Orchestrating AsyncAPI Services

This document describes how to trigger and publish events using an AsyncAPI specification file. Underneath, SonataFlow uses the AsyncAPI Quarkiverse extension. This extension automatically generates all the required Smallrye properties to make SonataFlow work with the servers declared in the AsyncAPI file. Therefore, you do not need to add any property specific to your event broker to make AsyncAPI work. This is a huge advantage over regular event states, where whether using Kafka or Knative, you have to provide additional configuration

AsyncAPI Quarkiverse extension only supports kafka and http protocols at the moment.

Consuming an event

Let’s assume there is an AsyncAPI specification file called asyncAPI.yaml, located in the classpath of your project.

This file contains a servers definition which protocol is Kafka:

Example of an AsyncAPI kafka server definition
servers:
  development:
    url: localhost:9092
    description: Development server
    protocol: kafka
    protocolVersion: '1.0.0'

and a subscription channel named wait:

Example of an AsyncAPI subscribing channel definition
channels:
  wait:
    description: A message channel
    subscribe:
      operationId: consumeWait
      summary: Get messages
      message:
        $ref: '#/components/messages/message'

You can define a function in your workflow, of type asyncapi, that points to consumerWait operationId:

Example of an AsyncAPI subscribing function definition
{
   "functions": [
      {
          "name": "consumeEvent",
          "type": "asyncapi",
          "operation": "asyncAPI.yaml#consumeWait"
       }
   ]
}

Remember that, as happens with REST and RPC types, the URI part of the operation property supports http, https, and file schemas. If no schema is present, it assumes the specification file is on the project classpath.

Once the function is defined, you can invoke it within an operation state:

Example of an AsyncAPI subscribing function invocation
   {
      "name": "waitForEvent",
      "type": "operation",
      "actions": [
          {
           "functionRef": "consumeEvent"
          }
       ]
    }

When your flow reach the waitForEvent state, it will stop until it receives a CloudEvent published over the Kafka topic wait.

Publishing an event

Let’s assume there is an AsyncAPI specification file called asyncAPI.yaml, located in the classpath of your project.

This file contains a servers definition which protocol is HTTP:

Example of an AsyncAPI http server definition
servers:
  development:
    url: localhost:8080
    description: Development server
    protocol: http
    protocolVersion: '1.0.0'

and a publishing channel named resume:

Example of an AsyncAPI publishing channel definition
channels:
  resume:
    description: A message channel
    publish:
      operationId: sendResume
      summary: Get messages
      message:
        $ref: '#/components/messages/message'

You can define a function in your workflow, of type asyncapi, that point to sendResume operationId:

Example of an AsyncAPI publishing function definition
{
   "functions": [
      {
          "name": "sendResume",
          "type": "asyncapi",
          "operation": "asyncAPI.yaml#sendResume"
       }
   ]
}

Once the function is defined, you can invoke it within an operation state:

Example of an AsyncAPI publishing function invocation
   {
      "name": "sendEvent",
      "type": "operation",
      "actions": [
          {
           "functionRef": "sendResume",
           "arguments" : {
              "name" : "Javierito"
            }
          }
       ]
    }

When your flow reach the sendEvent state, it will send an event to the endpoint http://localhost:8080/resume, with the payload {"name":"Javierito"}

Found an issue?

If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!