Consuming and producing events on Knative Eventing in Quarkus

This document describes how you can configure a workflow to interact with Knative Eventing.

Knative Eventing abstracts event consumption through event source and sink components. An event source is a Kubernetes object that produces events, and a sink is another Kubernetes object that receives them. The workflow application can act as a source, a sink, or both on the Knative Eventing platform.

To enable a workflow to use Knative Eventing, add the Kogito Knative Eventing add-on dependency to your project using one of the following methods:

  • Apache Maven

    mvn quarkus:add-extension -Dextensions="kogito-addons-quarkus-knative-eventing"

  • Quarkus CLI

    quarkus extension add kogito-addons-quarkus-knative-eventing

  • Manually, by adding the following dependency to the pom.xml file of your project

    <dependency>
        <groupId>org.kie.kogito</groupId>
        <artifactId>kogito-addons-quarkus-knative-eventing</artifactId>
    </dependency>

If you have used the Knative workflow CLI to create your project, then the Kogito Knative Eventing extension is already present. For more information about creating a project using Knative workflow CLI, see SonataFlow plug-in for Knative CLI.

The Kogito Knative Eventing add-on takes care of the required dependencies and the additional configuration that the workflow application needs to interact with the Knative Eventing platform.

Although the default configuration that the Kogito Knative Eventing add-on provides is enough for most use cases, you might sometimes need additional configuration to serve a specific scenario.

Knative Eventing add-on source configuration

The configuration described in this section is useful if your workflow contains at least one event definition of the produced kind. In this scenario, the workflow application produces events, acting as a Knative source.

HTTP transport configuration

Knative injects the K_SINK environment variable into the workflow application when you deploy the application in the cluster. SonataFlow uses the K_SINK environment variable to address the produced events to the correct Knative sink. For more information, see SinkBinding in the Knative Eventing documentation.

The following table lists the HTTP transport configuration properties:

Table 1. HTTP transport configuration properties
Property Default value Description

mp.messaging.outgoing.kogito_outgoing_stream.url

${K_SINK:http://localhost:9090}

This property indicates where to POST the HTTP CloudEvent message.

mp.messaging.outgoing.kogito_outgoing_stream.connector

quarkus-http

This property indicates the Quarkus SmallRye channel implementation. You usually do not need to change this property.

If the K_SINK variable is not present, the URL falls back to the localhost default declared in the mp.messaging.outgoing.kogito_outgoing_stream.url expression in Table 1. You can override the default value in development environments.
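As a minimal sketch of such a development override, assume that a local HTTP receiver (for example, a mock sink) listens on the hypothetical port 8181. The following application.properties entry uses the standard Quarkus %dev profile prefix, so the K_SINK-based default still applies when the application runs in the cluster:

```properties
# src/main/resources/application.properties
# Assumption: a local receiver listens on port 8181 (hypothetical value).
# The %dev prefix limits this override to the Quarkus dev profile.
%dev.mp.messaging.outgoing.kogito_outgoing_stream.url=http://localhost:8181
```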

Health check configuration

By default, the workflow application generates a health check probe that verifies whether the Knative platform injected a valid K_SINK variable. If the K_SINK variable is not present, the pod is not considered ready to receive requests.

The following table lists the health check probe configuration property:

Table 2. Health check probe configuration property
Property Default value Description

org.kie.kogito.addons.knative.eventing.health_enabled

true

This property indicates if the health check is enabled to verify that the K_SINK variable is injected into the environment.
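For local runs where Knative does not inject K_SINK, one option is to disable the probe outside the cluster only. The following sketch assumes that the property can be scoped with the standard Quarkus %dev and %test profile prefixes. This document does not state whether the property is read at build time or at run time, so verify the behavior in your own setup:

```properties
# src/main/resources/application.properties
# Assumption: the K_SINK check is only meaningful inside the cluster,
# so it is switched off for the dev and test profiles.
%dev.org.kie.kogito.addons.knative.eventing.health_enabled=false
%test.org.kie.kogito.addons.knative.eventing.health_enabled=false
```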

Knative target sink generation configuration

The Kogito Knative Eventing add-on generates a few Knative objects during build time. By default, the add-on generates a Knative Broker named default if the workflow application is acting as an event source.

The following table lists the configuration properties related to Knative sink generation:

Table 3. Knative sink generation configuration properties
Property Default value Description

org.kie.kogito.addons.knative.eventing.auto_generate_broker

true

This property indicates whether the Kogito Knative Eventing add-on generates a default Knative Broker in memory to sink and dispatch the messages. Set this property to false if a broker is already installed in your namespace. Note that you can use the org.kie.kogito.addons.knative.eventing.sink.* properties to configure your custom sink. If this property is not set, then the auto-generated broker works as a sink.

org.kie.kogito.addons.knative.eventing.sink.namespace

This property indicates the namespace where the generated Knative sink is deployed.

org.kie.kogito.addons.knative.eventing.sink.api_version

eventing.knative.dev/v1

This property indicates the API group and version of the generated Knative sink.

org.kie.kogito.addons.knative.eventing.sink.name

default

This property indicates the name of the generated Knative sink.

org.kie.kogito.addons.knative.eventing.sink.kind

Broker

This property indicates the Kubernetes kind of the generated Knative sink.
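The properties in Table 3 can be combined to point the workflow at a sink that already exists instead of the auto-generated one. The following is a sketch only: the broker name orders-broker and the namespace orders are hypothetical, while the API version and kind repeat the defaults listed in the table.

```properties
# src/main/resources/application.properties
# Do not generate the default Broker; one is assumed to exist already.
org.kie.kogito.addons.knative.eventing.auto_generate_broker=false

# Hypothetical existing Broker "orders-broker" in the hypothetical namespace "orders".
org.kie.kogito.addons.knative.eventing.sink.name=orders-broker
org.kie.kogito.addons.knative.eventing.sink.namespace=orders

# Same values as the documented defaults, repeated here for clarity.
org.kie.kogito.addons.knative.eventing.sink.api_version=eventing.knative.dev/v1
org.kie.kogito.addons.knative.eventing.sink.kind=Broker
```

With settings like these, the sink reference in the generated SinkBinding would be expected to name the existing broker rather than default.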

Knative Eventing add-on sink configuration

The configuration described in this section is useful if your workflow contains at least one event definition of the consumed kind. In this scenario, the workflow application consumes events, acting as a Knative sink.

When the workflow application needs to consume events, the Knative Eventing add-on generates Knative triggers. The Knative triggers are configured to listen to a broker with the required event type, which is defined in your workflow definition.

The following table lists the configuration properties related to Knative sink generation:

Table 4. Knative sink generation configuration properties
Property Default value Description

org.kie.kogito.addons.knative.eventing.broker

default

This property indicates the name of the default Knative broker that is deployed in the Kubernetes namespace. This broker is used as the reference to create the Knative triggers, which are responsible for delegating the events that the workflow service consumes.

mp.messaging.incoming.kogito_incoming_stream.path

/ (root path)

This property indicates the HTTP path where the workflow application will listen for the CloudEvents in the default incoming channel.

mp.messaging.incoming.<event type name>.path

/ (root path)

This property indicates the HTTP path where the workflow application will listen for the CloudEvents in the specific given channel name. The channel name is the event type as defined in the Serverless Workflow events definition.
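As a sketch of how the Table 4 properties might be set together, the following fragment makes the generated triggers listen to a hypothetical broker named orders-broker and moves the default incoming channel to the hypothetical path /events. This document does not describe how a non-root path is reflected in the generated Trigger, so check the generated objects before relying on it.

```properties
# src/main/resources/application.properties
# Hypothetical broker name; the generated triggers reference this broker.
org.kie.kogito.addons.knative.eventing.broker=orders-broker

# Hypothetical path for the default incoming channel (the default is the root path).
mp.messaging.incoming.kogito_incoming_stream.path=/events
```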

Manually sending events to an HTTP endpoint

You can send HTTP CloudEvents to the workflow application endpoint by using any tool that is capable of producing HTTP requests. The only requirement is that the request conforms to the CloudEvents specification.

For example, with the help of curl, you can send an event to the workflow using the following command:

Sending a CloudEvent over HTTP using the structured format.
curl -X POST \
     -H 'Content-Type: application/cloudevents+json'  \
     -d '{"datacontenttype": "application/json", "specversion":"1.0","id":"41495513-a9ef-4a81-8479-21bb14db61f0","source":"/local/curl","type":"kogito.serverless.loanbroker.aggregated.quotes.response","data": { "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2  }, {"bankId": "Bank2", "rate": 10}]  } } ' \
http://localhost:8080

This example uses the CloudEvents structured format, which includes all the event information within the request payload. Note that the Content-Type header is application/cloudevents+json.

Alternatively, you can use the CloudEvents binary format, which carries the event metadata in the HTTP headers and only the event data in the request body. For example, using the same event as before:

Sending a CloudEvent over HTTP using the binary format.
curl -X POST -i \
     -H 'Content-Type: application/json'  \
     -H 'ce-specversion: 1.0' \
     -H 'ce-id: 41495513-a9ef-4a81-8479-21bb14db61f0' \
     -H 'ce-source: /local/curl' \
     -H 'ce-type: kogito.serverless.loanbroker.aggregated.quotes.response' \
     -d '{ "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2  }, {"bankId": "Bank2", "rate": 10}]  }' \
http://localhost:8080/

You can use curl in this way to test your SonataFlow application locally and verify that the workflow consumes the events correctly.

For more information about testing incoming and outgoing CloudEvents over HTTP, see Mocking HTTP CloudEvents sink using WireMock.

Generating Knative objects during build time

SonataFlow can generate Knative objects during the workflow application build time to facilitate the deployment in a Kubernetes cluster. However, you do not need to use the generated objects if you plan to create and deploy the Knative objects yourself.

Prerequisites
  • A workflow application with the Knative Eventing add-on is created.

Procedure
  1. Add the Quarkus Kubernetes and Jib container image extensions to your project using one of the following methods:

    • Apache Maven

        mvn quarkus:add-extension -Dextensions="quarkus-kubernetes,quarkus-container-image-jib"

    • Quarkus CLI

        quarkus extension add quarkus-kubernetes quarkus-container-image-jib

    • Manually, by adding the following dependencies to the pom.xml file of your project

        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-kubernetes</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-container-image-jib</artifactId>
        </dependency>
  2. Add the quarkus.kubernetes.deployment-target=knative property to your application.properties file.

  3. Build your workflow application using one of the following commands:

    • Apache Maven

        mvn clean install

    • Quarkus CLI

        quarkus build

    • Knative CLI

        kn workflow build --image=<name>

    The target/kubernetes directory contains two files: knative.yml and kogito.yml. The knative.yml file contains the Knative service representing the workflow application. The kogito.yml file contains the required objects to connect the workflow application to the Knative Eventing platform.

  4. Deploy the workflow application in the Kubernetes cluster with the generated files, using one of the following methods:

    • Kubernetes command line tool

        kubectl apply -f target/kubernetes/kogito.yml
        kubectl apply -f target/kubernetes/knative.yml

    • Knative CLI

        kn workflow deploy

    For more information about building and deploying the workflow application, see Building workflow images using Quarkus CLI.
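The deployment target from step 2 of the procedure might sit in application.properties next to the container image settings, as in the following sketch. The quarkus.container-image.* entries are standard Quarkus container image properties that this document does not otherwise mention, and the registry, group, and tag values are placeholders. The image name reuses the loanbroker-flow service name from the examples in this document.

```properties
# src/main/resources/application.properties
# From step 2: generate Knative resources instead of plain Kubernetes ones.
quarkus.kubernetes.deployment-target=knative

# Assumptions: image coordinates for the generated Knative service (placeholder values).
quarkus.container-image.registry=quay.io
quarkus.container-image.group=my-org
quarkus.container-image.name=loanbroker-flow
quarkus.container-image.tag=1.0
```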

Example of workflow event definition in Knative

A workflow must contain at least one event definition for the Knative Eventing add-on to generate the event binding objects. The following is an example of a workflow containing produced and consumed events:

Example of a workflow with produced and consumed events
{
    "events": [
    {
      "name": "requestQuote",
      "type": "kogito.sw.request.quote",
      "kind": "produced"
    },
    {
      "name": "aggregatedQuotesResponse",
      "type": "kogito.loanbroker.aggregated.quotes.response",
      "kind": "consumed",
      "source": "/kogito/serverless/loanbroker/aggregator"
    }]
}

A workflow application with event definitions needs a Knative SinkBinding to configure the target sink. The target sink is where the produced events (the kogito.sw.request.quote event in the previous example) are dispatched. In this case, the Knative Eventing add-on generates an object as shown in the following example:

Example of a Knative SinkBinding generated by the add-on
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: sb-loanbroker-flow
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: ""
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: loanbroker-flow

Regardless of the number of produced events in the workflow definition, only one SinkBinding is generated. When you define multiple events, ensure that your sink is a Knative Broker. The listener services can configure the subscriptions or triggers to consume the events from the broker.

For the kogito.loanbroker.aggregated.quotes.response event in the previous workflow example, the Knative Eventing platform must be configured with a Knative Trigger that uses an appropriate CloudEvent filter. The following example shows the Trigger generated by the Knative Eventing add-on:

Example of a Knative Trigger generated by the add-on
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kogito-serverless-loanbroker-aggregated-quotes-response-trigger
spec:
  broker: default
  filter:
    attributes:
      type: kogito.loanbroker.aggregated.quotes.response
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: loanbroker-flow

For each consumed event definition, the Knative Eventing add-on generates one Knative Trigger.
