Event correlation in SonataFlow
Event correlation plays a significant role in large event-driven applications. It allows matching one or more events with a particular workflow instance. Correlation rules with domain information can be used when defining consumed events to match a workflow instance. Event correlation is also a practical alternative to the internal processInstanceId identifier, because you can use any external domain identifier as a correlation.
A correlation definition consists of one or more attributes associated with an event and the respective workflow to which the event belongs. In the Serverless Workflow specification, the correlation property defines the possible correlations for a given event. Each element must contain a contextAttributeName property, which names the event attribute used for the match.
You can optionally set the contextAttributeValue property, which specifies the value that the attribute named in contextAttributeName must have in the events consumed by a workflow.
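The matching rule described above can be sketched in a few lines of Python. This is only an illustration of the semantics, not the SonataFlow implementation: the function name matches_correlation and the representation of events and rules as plain dictionaries are assumptions made for the example.

```python
from typing import Any, Mapping, Sequence


def matches_correlation(event: Mapping[str, Any],
                        rules: Sequence[Mapping[str, str]]) -> bool:
    """Return True if the event satisfies every correlation rule.

    Hypothetical helper: each rule is a dict with a mandatory
    "contextAttributeName" and an optional "contextAttributeValue".
    """
    for rule in rules:
        name = rule["contextAttributeName"]
        if name not in event:
            return False  # the named attribute must be present on the event
        expected = rule.get("contextAttributeValue")
        if expected is not None and event[name] != expected:
            return False  # when a value is declared, it must match exactly
    return True


event = {"type": "newAccountEventType", "userid": "12345"}

# Only the attribute name is declared: any value of userid is accepted.
print(matches_correlation(event, [{"contextAttributeName": "userid"}]))

# A fixed value is declared and differs from the event: no match.
print(matches_correlation(event, [{"contextAttributeName": "userid",
                                   "contextAttributeValue": "99999"}]))
```

The first call prints True and the second prints False, because the declared contextAttributeValue differs from the value carried by the event.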
Incoming events consumed by the engine must carry the correlation attributes set in the definition as CloudEvents extension context attributes. Because they are context attributes, they are not part of the event payload.
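To make the distinction between context attributes and payload concrete, the following sketch separates the extension attributes of a JSON-formatted CloudEvent from its core attributes and its data. The helper name extension_attributes is hypothetical; the set of core attribute names comes from the CloudEvents 1.0 specification.

```python
from typing import Any, Dict, Mapping

# Context attributes defined by the CloudEvents 1.0 specification.
CORE_ATTRIBUTES = frozenset({
    "id", "source", "specversion", "type",
    "datacontenttype", "dataschema", "subject", "time",
})
# Members of the JSON format that carry the payload.
PAYLOAD_MEMBERS = frozenset({"data", "data_base64"})


def extension_attributes(event: Mapping[str, Any]) -> Dict[str, Any]:
    """Return only the extension context attributes of a CloudEvent dict."""
    return {
        key: value
        for key, value in event.items()
        if key not in CORE_ATTRIBUTES and key not in PAYLOAD_MEMBERS
    }


event = {
    "specversion": "1.0",
    "id": "1d174d25-46ac-4785-bc76-457c2d37d2fe",
    "source": "",
    "type": "newAccountEventType",
    "userid": "12345",                  # extension attribute used for correlation
    "data": {"email": "test@test.com", "userId": "12345"},
}

print(extension_attributes(event))  # {'userid': '12345'}
```

Note that the userId field inside data is ignored: only the top-level userid attribute is visible as a correlation candidate.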
A new workflow instance must be created by an event that is declared in the workflow definition file and whose event definition contains correlation attributes. For more information about events, see Event state in SonataFlow. Once the event is consumed, the engine extracts the correlation attributes and associates them with the created workflow instance.
A start event does not trigger a correlation evaluation. Instead, it is the point at which the correlation attributes and values are set. These attributes and values are then evaluated against other incoming events that might trigger the given instance. When a non-start event is consumed and its correlation attributes are evaluated, the engine continues the execution of the matched instances (if any).
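The lifecycle described above (a start event registers the correlation, later events are looked up against it) can be sketched as a small in-memory store. This is a simplified model, not the engine's code: the CorrelationStore class and its method names are hypothetical, and the sketch assumes at most one workflow instance per correlation key.

```python
import uuid
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple

CorrelationKey = Tuple[Tuple[str, Any], ...]


class CorrelationStore:
    """Hypothetical in-memory map from correlation values to instance IDs."""

    def __init__(self) -> None:
        self._instances: Dict[CorrelationKey, str] = {}

    @staticmethod
    def _key(event: Mapping[str, Any], names: Sequence[str]) -> CorrelationKey:
        # Sort so that the key does not depend on declaration order.
        return tuple(sorted((name, event[name]) for name in names))

    def on_start_event(self, event: Mapping[str, Any],
                       names: Sequence[str]) -> str:
        """Create an instance and record its correlation; nothing is evaluated."""
        instance_id = str(uuid.uuid4())
        self._instances[self._key(event, names)] = instance_id
        return instance_id

    def on_event(self, event: Mapping[str, Any],
                 names: Sequence[str]) -> Optional[str]:
        """Evaluate a non-start event; return the matched instance ID, if any."""
        if any(name not in event for name in names):
            return None
        return self._instances.get(self._key(event, names))


store = CorrelationStore()
started = store.on_start_event(
    {"type": "newAccountEventType", "userid": "12345"}, ["userid"])
matched = store.on_event(
    {"type": "validatedAccountEmail", "userid": "12345"}, ["userid"])
unmatched = store.on_event(
    {"type": "validatedAccountEmail", "userid": "67890"}, ["userid"])
print(matched == started, unmatched)
```

An event with a different userid value, or without the attribute at all, matches no instance and is simply not delivered to one in this model.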
The following figure shows how the engine processes the correlation:
Example of event correlation in a workflow
You can see the correlation configuration in the serverless-workflow-correlation-quarkus example application, which describes the setup and usage of event correlation in a workflow.
The workflow definition file contains the correlation information, in which the events section is defined as follows:
"events": [
  {
    "name": "newAccountEvent",
    "source": "",
    "type": "newAccountEventType",
    "correlation": [
      {
        "contextAttributeName": "userid"
      }
    ]
  },
  {
    "name": "validateAccountEmailEvent",
    "source": "workflow",
    "type": "validateAccountEmail"
  },
  {
    "name": "validatedAccountEmailEvent",
    "source": "workflow",
    "type": "validatedAccountEmail",
    "correlation": [
      {
        "contextAttributeName": "userid"
      }
    ]
  },
  {
    "name": "activateAccountEvent",
    "source": "workflow",
    "type": "activateAccount"
  },
  {
    "name": "activatedAccountEvent",
    "source": "workflow",
    "type": "activatedAccount",
    "correlation": [
      {
        "contextAttributeName": "userid"
      }
    ]
  }
]
You can create a workflow instance by consuming events as defined in the New User Account Request event state. The New User Account Request event state references the newAccountEvent event, which contains a correlation definition for the userid attribute.
New User Account Request event state definition
{
  "name": "New User Account Request",
  "type": "event",
  "onEvents": [
    {
      "eventRefs": [
        "newAccountEvent"
      ]
    }
  ],
  "transition": "Validate User Email"
}
When the workflow consumes a new event of the newAccountEventType type, a workflow instance is created. After that, the events consumed by the same workflow must contain the same correlation attribute and value, such as the userid attribute and the 12345 value. This correlation attribute and value are used to find the matching workflow instance and continue its execution.
newAccountEvent
{
  "specversion": "0.3",
  "id": "1d174d25-46ac-4785-bc76-457c2d37d2fe",
  "source": "",
  "type": "newAccountEventType",
  "time": "2022-07-25T16:30:35.461988261-03:00",
  "userid": "12345",
  "data": {
    "email": "test@test.com",
    "userId": "12345"
  }
}
In SonataFlow, correlating multiple events together is not supported. Therefore, each event is evaluated against the correlations independently.
Using the workflow definition in the serverless-workflow-correlation-quarkus example application, you can define other events that are published and consumed by the workflow.
The serverless-workflow-correlation-quarkus example application uses callback states, such as Validate User Email. This means that once the workflow execution reaches the callback state, the workflow publishes the validateAccountEmailEvent event and waits to receive the validatedAccountEmailEvent event. For more information about the callback state, see Callback state in SonataFlow.
{
  "name": "Validate User Email",
  "type": "callback",
  "action": {
    "name": "publish validate event",
    "eventRef": {
      "triggerEventRef": "validateAccountEmailEvent"
    }
  },
  "eventRef": "validatedAccountEmailEvent",
  "transition": "Activate User Account"
}
The produced events contain the same correlation attributes that were set when the workflow instance was created.
validateAccountEmailEvent
{
  "id": "7640a0af-b7fb-4d94-9d9d-3aa1ace60e79",
  "source": "/process/correlation",
  "type": "validateAccountEmail",
  "time": "2022-07-25T16:22:53.735128049-03:00",
  "data": {
    "email": "test@test.com",
    "userId": "12345"
  },
  "specversion": "1.0",
  "kogitoprocinstanceid": "69019826-daef-4fb4-880b-c1658c4e49bc",
  "kogitoprocid": "correlation",
  "kogitoprocversion": "1.0",
  "kogitousertaskist": "1",
  "kogitoproctype": "SW",
  "userid": "12345"
}
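As a rough illustration of how a produced event ends up carrying the correlation attributes of its workflow instance, the sketch below copies the stored correlation values into a new CloudEvent as top-level extension attributes. The function build_produced_event and its parameters are hypothetical and do not reflect the engine's internal API; the Kogito-specific attributes shown in the example above are omitted.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Mapping


def build_produced_event(event_type: str,
                         source: str,
                         correlation: Mapping[str, str],
                         data: Any) -> Dict[str, Any]:
    """Build a CloudEvent dict that carries the instance's correlation."""
    event: Dict[str, Any] = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": event_type,
        "time": datetime.now(timezone.utc).isoformat(),
        "data": data,
    }
    # Correlation values become extension attributes, next to the core ones,
    # rather than fields inside "data".
    event.update(correlation)
    return event


produced = build_produced_event(
    event_type="validateAccountEmail",
    source="/process/correlation",
    correlation={"userid": "12345"},   # captured when the instance was created
    data={"email": "test@test.com", "userId": "12345"},
)
print(produced["type"], produced["userid"])
```

A consumer that replies to this event can copy the userid attribute into its response so that the reply is routed back to the same workflow instance.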
All consumed events must contain the same correlation attributes, since the consumed events are used to identify the workflow instance. The following example shows a consumed event containing the same correlation attribute and value, such as userid and 12345:
validatedAccountEmailEvent
{
  "specversion": "1.0",
  "id": "953f07a7-aea8-4956-8775-85ab59366fe6",
  "source": "",
  "type": "validatedAccountEmail",
  "time": "2022-07-25T16:29:27.320408379-03:00",
  "userid": "12345",
  "data": null
}
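One way to deliver such an event to a running workflow application is an HTTP POST in CloudEvents structured mode. The sketch below only builds the request and does not send it. The URL http://localhost:8080/ is a placeholder assumption (the actual endpoint depends on how the application's incoming channel is configured), and the helper build_event_request is hypothetical; the application/cloudevents+json content type comes from the CloudEvents HTTP binding.

```python
import json
import urllib.request
from typing import Any, Mapping


def build_event_request(url: str,
                        event: Mapping[str, Any]) -> urllib.request.Request:
    """Wrap a CloudEvent dict in a structured-mode HTTP POST request."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )


request = build_event_request(
    "http://localhost:8080/",          # placeholder endpoint, adjust as needed
    {
        "specversion": "1.0",
        "id": "953f07a7-aea8-4956-8775-85ab59366fe6",
        "source": "",
        "type": "validatedAccountEmail",
        "userid": "12345",             # must match the value set at start
        "data": None,
    },
)

# To actually send the event, uncomment the following line:
# urllib.request.urlopen(request)
print(request.get_method(), request.full_url)
```

If the userid attribute is missing or carries a different value, the event cannot be matched to the waiting workflow instance.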
The engine stores the correlation information in the same persistence mechanism that is configured for the workflow application. If a persistence add-on is not configured, the correlation information is stored in memory. In that case, all correlation information is lost when the workflow application restarts, so the in-memory setup should be used only for testing purposes. For more information about the persistence configuration, see Running a workflow service using PostgreSQL.
Currently, only |