Orchestrating AsyncAPI Services
This document describes how to trigger and publish events using an AsyncAPI specification file. Under the hood, SonataFlow uses the AsyncAPI Quarkiverse extension, which automatically generates all the SmallRye properties required for SonataFlow to work with the servers declared in the AsyncAPI file. Therefore, you do not need to add any broker-specific property to make AsyncAPI work. This is a significant advantage over regular event states, where you have to provide additional configuration whether you use Kafka or Knative.
AsyncAPI Quarkiverse extension only supports |
Consuming an event
Let’s assume there is an AsyncAPI specification file called asyncAPI.yaml, located in the classpath of your project. This file contains a servers definition whose protocol is Kafka:
servers:
  development:
    url: localhost:9092
    description: Development server
    protocol: kafka
    protocolVersion: '1.0.0'
and a subscription channel named wait:
channels:
  wait:
    description: A message channel
    subscribe:
      operationId: consumeWait
      summary: Get messages
      message:
        $ref: '#/components/messages/message'
You can define a function of type asyncapi in your workflow that points to the consumeWait operationId:
{
  "functions": [
    {
      "name": "consumeEvent",
      "type": "asyncapi",
      "operation": "asyncAPI.yaml#consumeWait"
    }
  ]
}
Remember that, as happens with |
Once the function is defined, you can invoke it within an operation state:
{
  "name": "waitForEvent",
  "type": "operation",
  "actions": [
    {
      "functionRef": "consumeEvent"
    }
  ]
}
When your flow reaches the waitForEvent state, it stops until it receives a CloudEvent published on the Kafka topic wait.
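To show how the two fragments above fit together, here is a minimal sketch of a complete workflow definition. It assumes Serverless Workflow specification version 0.8; the id, version, name, and description values are hypothetical and chosen only for illustration, and the workflow simply ends once the event has been consumed.

```json
{
  "id": "asyncapi_consumer",
  "version": "1.0",
  "specVersion": "0.8",
  "name": "AsyncAPI consumer sketch",
  "description": "Illustrative only: id, version and name are hypothetical values",
  "start": "waitForEvent",
  "functions": [
    {
      "name": "consumeEvent",
      "type": "asyncapi",
      "operation": "asyncAPI.yaml#consumeWait"
    }
  ],
  "states": [
    {
      "name": "waitForEvent",
      "type": "operation",
      "actions": [
        {
          "functionRef": "consumeEvent"
        }
      ],
      "end": true
    }
  ]
}
```

In a real flow, waitForEvent would typically transition to a further state that processes the received event data instead of ending immediately.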
Publishing an event
Let’s assume there is an AsyncAPI specification file called asyncAPI.yaml, located in the classpath of your project. This file contains a servers definition whose protocol is HTTP:
servers:
  development:
    url: localhost:8080
    description: Development server
    protocol: http
    protocolVersion: '1.0.0'
and a publishing channel named resume:
channels:
  resume:
    description: A message channel
    publish:
      operationId: sendResume
      summary: Get messages
      message:
        $ref: '#/components/messages/message'
You can define a function of type asyncapi in your workflow that points to the sendResume operationId:
{
  "functions": [
    {
      "name": "sendResume",
      "type": "asyncapi",
      "operation": "asyncAPI.yaml#sendResume"
    }
  ]
}
Once the function is defined, you can invoke it within an operation state:
{
  "name": "sendEvent",
  "type": "operation",
  "actions": [
    {
      "functionRef": "sendResume",
      "arguments": {
        "name": "Javierito"
      }
    }
  ]
}
When your flow reaches the sendEvent state, it sends an event to the endpoint http://localhost:8080/resume with the payload {"name":"Javierito"}.
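As a sketch of how the publishing pieces combine, the following is a minimal complete workflow definition, again assuming Serverless Workflow specification version 0.8. The id, version, name, and description values are hypothetical. This sketch passes the arguments through the object form of functionRef (refName plus arguments), which is how the 0.8 specification nests action arguments; treat that as an assumption about the accepted syntax rather than as the form this guide prescribes.

```json
{
  "id": "asyncapi_publisher",
  "version": "1.0",
  "specVersion": "0.8",
  "name": "AsyncAPI publisher sketch",
  "description": "Illustrative only: id, version and name are hypothetical values",
  "start": "sendEvent",
  "functions": [
    {
      "name": "sendResume",
      "type": "asyncapi",
      "operation": "asyncAPI.yaml#sendResume"
    }
  ],
  "states": [
    {
      "name": "sendEvent",
      "type": "operation",
      "actions": [
        {
          "functionRef": {
            "refName": "sendResume",
            "arguments": {
              "name": "Javierito"
            }
          }
        }
      ],
      "end": true
    }
  ]
}
```

The arguments object becomes the event payload, so with these values the published data would be the same {"name":"Javierito"} payload described above.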