OpenAPI Callback in SonataFlow

This document describes the OpenAPI Callback and related examples. Also, it covers the difference between OpenAPI Callback and the Serverless Workflow Callback state. The Serverless Workflow callback state can be implemented using both the OpenAPI callback and the Callback state functionalities. Both the functionalities perform an action and wait for an event to be produced as a result of the action to resume the workflow, but each of them follows a different approach in achieving it. Therefore, both OpenAPI callback and the Callback state approaches are suitable to perform fire & wait-for-result operations.

The OpenAPI callback is an asynchronous, out-of-band request that your service will send to some other service in response to specific events. In contrast, the Callback state performs an action that triggers an external activity/service which is responsible for sending a cloud event to the workflow. Both ways aim to send an event to the workflow to resume the flow.

As mentioned in the OpenAPI Callback document, when the workflow makes an asynchronous service call, you must provide the callback URL in the request body. The external service invokes the callback URL and sends a CloudEvent to the workflow.

From a workflow perspective, asynchronous service indicates that the control is returned to the caller immediately without waiting for the action to be completed. Once the action is completed, a CloudEvent needs to be published to resume the workflow.

For the workflow to identify the published CloudEvent it is waiting for, the external service developer includes the workflow instance ID in the CloudEvent header or uses the Event correlation. The following figure displays the process:

callbacks explained

The workflow correlation described in this document focuses on the former mechanism that is based on the fact that each workflow instance contains a unique identifier, which is generated automatically.

Example of the OpenAPI Callback

To understand the OpenAPI Callback, see the serverless-workflow-callback-events-over-http-quarkus example application in GitHub repository.

This example contains a simple workflow-service that illustrates callback state using OpenAPI callbacks functionality. A callback is a state that invokes an action and wait for an event (event that will be eventually fired by the external service notified by the action). This example consists of a callback state that waits for an event to arrive at the wait channel. Its action calls an external service named callback-event-service that publishes the wait event over HTTP. After consuming the wait event, the workflow prints the message received in the wait event and ends the workflow.

The serverless-workflow-callback-events-over-http-quarkus application is initiated with the following request to http://localhost:8080/callback:

{
  "message": "Hello"
}

Once the workflow is started, it makes an external service call with the callback URL and the workflow instance ID in the request body to callback-event-service. Then, as configured in the OpenAPI file, the callback URL is invoked to send a CloudEvent to the workflow.

Once the wait type CloudEvent is received by the callback-workflow-service, the workflow moves to the next state and ends successfully. The following figure shows the serverless-workflow-callback-events-over-http-quarkus image:

openapi callback

To use the OpenAPI callback in a workflow, the OpenAPI YAML file is configured with the callback as specified in the OpenAPI file.

To use the Callback state in a workflow, first CloudEvent type wait is declared that the workflow uses. Following is an example of CloudEvents declaration in a workflow definition:

Example of CloudEvent declaration in a workflow definition
 "events": [
    {
      "name": "waitEvent",
      "source": "",
      "type": "wait"
    }

After that, a Callback state is declared, which waits for a CloudEvent with the wait type. Following is an example of declaring a Callback state that handles the wait type CloudEvent:

Example of a Callback State declaration handling the wait CloudEvent
 {
      "name": "waitForEvent",
      "type": "callback",
      "action":
        {
        "functionRef": {
          "refName": "callBack",
          "arguments": {
            "uri": "http://localhost:8080/wait",
            "processInstanceId": "$WORKFLOW.instanceId"
          }
        }
      },
      "eventRef": "waitEvent",
      "transition": "finish"
    }

Please refer configure openapi service endpoints document to set the URL dynamically using an environment variable.

An event listener publishes a new wait type CloudEvent. Following is an example of a Java method that publishes the wait type CloudEvent:

Example of a Java method that makes a call to Callback URL and publishes the wait CloudEvent
    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    public void wait(EventInput eventInput) throws JsonProcessingException {
        logger.info("About to generate event for {}",eventInput);
        CloudEventBuilder builder = CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create(""))
                .withType("wait")
                .withTime(OffsetDateTime.now())
                .withExtension(CloudEventExtensionConstants.PROCESS_REFERENCE_ID, eventInput.getProcessInstanceId())
                .withData(objectMapper.writeValueAsBytes(Collections.singletonMap("message", "New Event")));

        webClient.postAbs(eventInput.getUri()).sendJson(builder.build()).toCompletionStage();
    }

The callback-workflow-service consumes the CloudEvent, it contains an attribute named kogitoprocrefid, which holds the instance ID of the workflow.

The kogitoprocrefid attribute is crucial because when the correlation is not used, then this attribute is the only way for the Callback state to identify that the related CloudEvent needs to be used to resume the workflow. For more information about correlation, see Event correlation in SonataFlow.

Note that each workflow is identified by a unique instance ID, which is automatically included in any published CloudEvent, as kogitoprocinstanceid CloudEvent extension.

HTTP transport configuration

The serverless-workflow-callback-events-over-http-quarkus example application consumes the Cloudevents using Knative Eventing. For more information about incoming and outgoing CloudEvents oer HTTP, see Consuming and Producing CloudEvents over HTTP.

The HTTP path where the workflow application will listen for the CloudEvents in the serverless-workflow-callback-events-over-http-quarkus example application, is configured in the application.properties file as shown below:

mp.messaging.incoming.wait.connector=quarkus-http
mp.messaging.incoming.wait.path=/wait

Found an issue?

If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!