Event correlation in SonataFlow

Event correlation plays a significant role in large event-driven applications. It allows matching one or more events with a particular workflow instance. Correlation rules with domain information can be used when defining consumed events to match a workflow instance. Also, event correlation is a practical alternative to the internal identifier processInstanceId since you can use any external domain identifier as a correlation.

A correlation definition consists of one or more attributes associated with an event and the respective workflow to which the event belongs. In Serverless Workflow specification, the correlation property defines the possible correlations for a given event. Each element must contain a contextAttributeName property, which is set for the value that matches an attribute from an event.

You can optionally set the contextAttributeValue property, which matches the value for the respective attribute defined in contextAttributeName property for the consumed events in a workflow.

The incoming events consumed by the engine must contain the correlation attributes, set in the definition as extension context attributes. The correlation attributes are compliant with the CloudEvent format, therefore, the attributes are not part of the event payload.

A new workflow instance must be created using an event, which must be declared in the workflow definition file, containing correlation attributes in the event definition section. For more information about events, see Event state in SonataFlow. Once the event is consumed, the engine extracts the correlation attributes and associates the attributes with the created workflow instance.

A start event does not trigger a correlation evaluation, but acts as a moment in which correlation attributes and values are set. The correlation attributes and values are evaluated against other incoming events that might trigger the given instance. Therefore, when a non-start event is consumed and correlation attributes are evaluated, then the engine continues the execution of the matched instances (if any).

The following figure shows how engine processes the correlation:

event correlation evaluation flow
Figure 1. Event correlation evaluation process

Example of event correlation in a workflow

You can see the correlation configuration in the serverless-workflow-correlation-quarkus example application, describing the setup and usage of event correlation in a workflow.

Event correlation evaluation process
Figure 2. Event correlation evaluation process

The workflow definition file contains the correlation information, in which the events section is defined as follows:

Example event correlation in a workflow
  "events": [
    {
      "name": "newAccountEvent",
      "source": "",
      "type": "newAccountEventType",
      "correlation": [
        {
          "contextAttributeName": "userid"
        }
      ]
    },
    {
      "name": "validateAccountEmailEvent",
      "source": "workflow",
      "type": "validateAccountEmail"
    },
    {
      "name": "validatedAccountEmailEvent",
      "source": "workflow",
      "type": "validatedAccountEmail",
      "correlation": [
        {
          "contextAttributeName": "userid"
        }
      ]
    },
    {
      "name": "activateAccountEvent",
      "source": "workflow",
      "type": "activateAccount"
    },
    {
      "name": "activatedAccountEvent",
      "source": "workflow",
      "type": "activatedAccount",
      "correlation": [
        {
          "contextAttributeName": "userid"
        }
      ]
    }
  ]

You can create a workflow by consuming events as defined in the New User Account Request event state. The New User Account Request event state contains a reference of the newAccountEvent event, containing a correlation definition for userid attribute.

Example New User Account Request event state definition
{
  "name": "New User Account Request",
  "type": "event",
  "onEvents": [
    {
      "eventRefs": [
        "newAccountEvent"
      ]
    }
  ],
  "transition": "Validate User Email"
}

When the workflow consumes a new event of newAccountEventType type, a workflow instance is created. After that, the events consumed by the same workflow must contain the same correlation attribute and value, such as userid attribute and 12345 value. This correlation attribute and value is used to evaluate and match the workflow instance to continue the workflow execution.

Example incoming start event newAccountEvent
{
  "specversion": "0.3",
  "id": "1d174d25-46ac-4785-bc76-457c2d37d2fe",
  "source": "",
  "type": "newAccountEventType",
  "time": "2022-07-25T16:30:35.461988261-03:00",
  "userid": "12345",
  "data": {
    "email": "test@test.com",
    "userId": "12345"
  }
}

In SonataFlow, correlating multiple events together is not supported, therefore, the events are evaluated with correlations independently.

Using the workflow definition in serverless-workflow-correlation-quarkus example application, you can define other events that are published and consumed by the workflow.

The serverless-workflow-correlation-quarkus example application uses Callback states, such as Validate User Email. This means that once the workflow execution reaches the Callback state, the workflow publishes an event of validateAccountEmailEvent type and waits to receive an event of validatedAccountEmailEvent type. For more information about callback state, see Callback state in SonataFlow.

Example Callback state definition
{
  "name": "Validate User Email",
  "type": "callback",
  "action": {
    "name": "publish validate event",
    "eventRef": {
      "triggerEventRef": "validateAccountEmailEvent"
    }
  },
  "eventRef": "validatedAccountEmailEvent",
  "transition": "Activate User Account"
}

The produced events contain the same correlation attributes when the workflow is created.

Example produced Callback state event validateAccountEmailEvent
{
  "id": "7640a0af-b7fb-4d94-9d9d-3aa1ace60e79",
  "source": "/process/correlation",
  "type": "validateAccountEmail",
  "time": "2022-07-25T16:22:53.735128049-03:00",
  "data": {
    "email": "test@test.com",
    "userId": "12345"
  },
  "specversion": "1.0",
  "kogitoprocinstanceid": "69019826-daef-4fb4-880b-c1658c4e49bc",
  "kogitoprocid": "correlation",
  "kogitoprocversion": "1.0",
  "kogitousertaskist": "1",
  "kogitoproctype": "SW",
  "userid": "12345"
}

All consumed events must contain the same correlation attributes since the consumed events are used to identify the workflow instance. The following example shows the consumed events containing same correlation attributes and values, such as userid and 12345:

Consumed Callback state event validatedAccountEmailEvent
{
  "specversion": "1.0",
  "id": "953f07a7-aea8-4956-8775-85ab59366fe6",
  "source": "",
  "type": "validatedAccountEmail",
  "time": "2022-07-25T16:29:27.320408379-03:00",
  "userid": "12345",
  "data": null
}

The engine stores the correlation information in the same persistence mechanism that is configured in the workflow application. If a persistence add-on is not configured, then the correlation information is stored in memory. This means that entire correlation information is lost when the workflow application restarts, therefore this process must be used for testing purposes. For more information about the persistence configuration, see Running a workflow service using PostgreSQL.

Currently, only kogito-addons-quarkus-persistence-jdbc persistence add-on supports correlation. The kogito-addons-quarkus-persistence-jdbc add-on is configured for PostgreSQL. Other persistence add-ons will be supported in a future release.

Found an issue?

If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!