Consuming and producing events on Knative Eventing
This document describes how you can configure a workflow to interact with Knative Eventing.
Knative Eventing abstracts event consumption through event source and sink components. An event source is a Kubernetes object that produces events, and a sink is another Kubernetes object that receives them. The workflow application can act as a source, a sink, or both in the Knative Eventing platform.
You need to add the Kogito Knative Eventing add-on dependency to indicate that you are using Knative Eventing. To enable a workflow to use Knative Eventing, run one of the following commands (Apache Maven or Quarkus CLI), or add the following dependency directly to the pom.xml file of your project:
mvn quarkus:add-extension -Dextensions="kogito-addons-quarkus-knative-eventing"
quarkus extension add kogito-addons-quarkus-knative-eventing
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-knative-eventing</artifactId>
</dependency>
If you have used the Knative workflow CLI to create your project, then the Kogito Knative Eventing extension is already present. For more information about creating a project using Knative workflow CLI, see Serverless Workflow plug-in for Knative CLI.
The Kogito Knative Eventing add-on takes care of the required dependencies and the additional configuration that the workflow application needs to interact with the Knative Eventing platform.
Although the default configuration that the Kogito Knative Eventing add-on provides is enough for most use cases, you might sometimes need additional configuration for a specific scenario.
Knative Eventing add-on source configuration
The configuration described in this section is useful if your workflow contains at least one event definition of the produced kind. In this scenario, the workflow application produces events and acts as a Knative source.
- HTTP transport configuration

  Knative injects the K_SINK environment variable in the workflow application when you deploy the application in the cluster. Kogito uses the K_SINK environment variable to address the produced events to the correct Knative sink. For more information, see SinkBinding in Knative Eventing documentation.

  The following table lists the HTTP transport configuration properties:

  Table 1. HTTP transport configuration properties

  | Property | Default value | Description |
  |---|---|---|
  | mp.messaging.outgoing.kogito_outgoing_stream.url | ${K_SINK:http://localhost:9090} | This property indicates where to POST the HTTP CloudEvent message. |
  | mp.messaging.outgoing.kogito_outgoing_stream.connector | quarkus-http | This property indicates the Quarkus SmallRye channel implementation. You might not need to change this property. |

  If the K_SINK variable is not present, then the default value is http://localhost:9090. You can override the default value in development environments.

- Health check configuration

  By default, the workflow application generates a health check probe to verify that the Knative platform injected a valid K_SINK variable. If the K_SINK variable is not present, then the pod is not ready to receive requests.

  The following table lists the health check probe configuration property:

  Table 2. Health check probe configuration property

  | Property | Default value | Description |
  |---|---|---|
  | org.kie.kogito.addons.knative.health_enabled | true | This property indicates if the health check is enabled to verify that the K_SINK variable is injected into the environment. |

- Knative target sink generation configuration

  The Kogito Knative Eventing add-on generates a few Knative objects during build time. By default, the add-on generates a Knative Broker named default if the workflow application is acting as an event source.

  The following table lists the configuration properties related to Knative sink generation:

  Table 3. Knative sink generation configuration properties

  | Property | Default value | Description |
  |---|---|---|
  | org.kie.kogito.addons.knative.auto_generate_broker | true | This property indicates if the Kogito Knative Eventing add-on generates a default Knative Broker in memory to sink and dispatch the messages. Set this property to false in case a broker is already installed in your namespace. Note that you can use org.kie.kogito.addons.knative.eventing.sink.* property to configure your custom sink. If this property is not set, then the auto-generated broker works as a sink. |
  | org.kie.kogito.addons.knative.sink.namespace | | This property indicates the namespace where the generated Knative sink is deployed. |
  | org.kie.kogito.addons.knative.sink.api_version | eventing.knative.dev/v1 | This property indicates the API group and version of the generated Knative sink. |
  | org.kie.kogito.addons.knative.sink.name | default | This property indicates the name of the generated Knative sink. |
  | org.kie.kogito.addons.knative.sink.kind | Broker | This property indicates the Kubernetes kind of the generated Knative sink. |
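
The K_SINK fallback and the health check described in this section can be pictured with a short Python sketch. It only illustrates the documented behavior and is not the add-on's implementation: the function names resolve_sink_url and sink_is_healthy are hypothetical, and the fallback URL is copied from the default value of mp.messaging.outgoing.kogito_outgoing_stream.url.

```python
import os

# Fallback taken from the documented default: ${K_SINK:http://localhost:9090}
DEFAULT_SINK_URL = "http://localhost:9090"


def resolve_sink_url(env=None):
    """Return K_SINK when it is present, otherwise the documented fallback URL."""
    env = os.environ if env is None else env
    return env.get("K_SINK", DEFAULT_SINK_URL)


def sink_is_healthy(env=None, health_enabled=True):
    """Illustrative readiness rule: with the check enabled, K_SINK must be injected.

    health_enabled stands in for org.kie.kogito.addons.knative.health_enabled.
    """
    env = os.environ if env is None else env
    if not health_enabled:
        return True
    return bool(env.get("K_SINK"))
```

For example, calling resolve_sink_url({}) yields the local fallback, while an environment that contains K_SINK yields the injected sink address. Passing health_enabled=False mirrors disabling the probe, which may be convenient in development environments where Knative does not inject the variable.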
Knative Eventing add-on sink configuration
The configuration described in this section is useful if your workflow contains at least one event definition of the consumed kind. In this scenario, the workflow application consumes events and acts as a Knative sink.
When the workflow application needs to consume events, the Knative Eventing add-on generates Knative triggers. The Knative triggers are configured to listen to a broker with the required event type, which is defined in your workflow definition.
The following table lists the configuration properties related to Knative sink generation:
| Property | Default value | Description |
|---|---|---|
| | | This property indicates the name of the default Knative broker that is deployed in the Kubernetes namespace. This broker is used as the reference to create the Knative triggers, which are responsible for delegating the events that the workflow service consumes. |
| | | This property indicates the HTTP path where the workflow application will listen for the CloudEvents in the default incoming channel. |
| | | This property indicates the HTTP path where the workflow application will listen for the CloudEvents in the specific given channel name. The channel name is the event |
Manually sending events to an HTTP endpoint
You can send HTTP CloudEvents to the workflow application endpoint by using any tool capable of producing HTTP requests. The only requirement is that the request conforms to the CloudEvents specification.
For example, with the help of curl, you can send an event to the workflow using the following command:
curl -X POST \
-H 'Content-Type: application/cloudevents+json' \
-d '{"datacontenttype": "application/json", "specversion":"1.0","id":"41495513-a9ef-4a81-8479-21bb14db61f0","source":"/local/curl","type":"kogito.serverless.loanbroker.aggregated.quotes.response","data": { "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2 }, {"bankId": "Bank2", "rate": 10}] } } ' \
http://localhost:8080
This example uses the CloudEvents structured format, which carries all event information, metadata and data alike, in the request payload. Note that the Content-Type header is set to application/cloudevents+json.
Alternatively, you can use the CloudEvents binary format, which carries the event metadata in HTTP headers prefixed with ce- and only the event data in the request body. For example, using the same event as before:
curl -X POST -i \
-H 'Content-Type: application/json' \
-H 'ce-specversion: 1.0' \
-H 'ce-id: 41495513-a9ef-4a81-8479-21bb14db61f0' \
-H 'ce-source: /local/curl' \
-H 'ce-type: kogito.serverless.loanbroker.aggregated.quotes.response' \
-d '{ "amount": 300000, "term": 30, "credit": { "score": 700, "history": 15 }, "quotes": [{ "bankId": "Bank1", "rate": 12.2 }, {"bankId": "Bank2", "rate": 10}] }' \
http://localhost:8080/
You can use curl in this way to test your Kogito application locally and verify that the workflow consumes the events correctly.
For more information about testing incoming and outgoing CloudEvents over HTTP, see Mocking HTTP CloudEvents sink using WireMock.
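
As an alternative to curl, the two CloudEvents HTTP formats can be sketched in Python using only the standard library. This is a minimal illustration, not part of Kogito: the helper names structured_request, binary_request, and send are hypothetical, and falling back to application/json when datacontenttype is absent is an assumption of this sketch.

```python
import json
import urllib.request

# Sample event, reduced from the curl examples in this section.
EVENT = {
    "specversion": "1.0",
    "id": "41495513-a9ef-4a81-8479-21bb14db61f0",
    "source": "/local/curl",
    "type": "kogito.serverless.loanbroker.aggregated.quotes.response",
    "datacontenttype": "application/json",
    "data": {"amount": 300000, "term": 30},
}


def structured_request(event):
    """Structured format: the whole event, metadata included, is the JSON body."""
    headers = {"Content-Type": "application/cloudevents+json"}
    return headers, json.dumps(event).encode("utf-8")


def binary_request(event):
    """Binary format: metadata goes into ce-* headers, the body holds only the data."""
    # Assumption: default to application/json when datacontenttype is missing.
    headers = {"Content-Type": event.get("datacontenttype", "application/json")}
    for name, value in event.items():
        if name not in ("data", "datacontenttype"):
            headers["ce-" + name] = str(value)
    return headers, json.dumps(event["data"]).encode("utf-8")


def send(url, headers, body):
    """POST the prepared request and return the HTTP status code."""
    request = urllib.request.Request(url, data=body, headers=headers, method="POST")
    with urllib.request.urlopen(request) as response:
        return response.status
```

With the workflow application running locally, a call such as send("http://localhost:8080", *binary_request(EVENT)) would post the event in binary format, and replacing binary_request with structured_request would post the same event in structured format.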
Generating Knative objects during build time
Kogito can generate Knative objects during the workflow application build time to facilitate the deployment in a Kubernetes cluster. However, you do not need to use the generated objects if you plan to create and deploy the Knative objects yourself.
Before you begin, make sure that a workflow application with the Knative Eventing add-on is created.

- Add the Quarkus Kubernetes and Jib container image extensions to your project by running one of the following commands, or by adding the dependencies to the pom.xml file of your project:

  mvn quarkus:add-extension -Dextensions="quarkus-kubernetes,quarkus-container-image-jib"

  quarkus extension add quarkus-kubernetes quarkus-container-image-jib

  <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
  </dependency>
  <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
  </dependency>

- Add the quarkus.kubernetes.deployment-target=knative property to your application.properties file.

- Build your workflow application using one of the following commands:

  mvn clean install

  quarkus build

  kn workflow build --image=<name>

  The target/kubernetes directory contains two files: knative.yml and kogito.yml. The knative.yml file contains the Knative service representing the workflow application. The kogito.yml file contains the required objects to connect the workflow application to the Knative Eventing platform.

- Deploy the workflow application in the Kubernetes cluster, either by applying the generated files with kubectl or by using the Knative workflow CLI:

  kubectl apply -f target/kubernetes/kogito.yml
  kubectl apply -f target/kubernetes/knative.yml

  kn workflow deploy

  For more information about building and deploying the workflow application, see Building workflow images using Quarkus CLI.
Example of workflow event definition in Knative
A workflow must contain at least one event definition for the Knative Eventing add-on to generate the event binding objects. The following is an example of a workflow containing produced and consumed events:
{
  "events": [
    {
      "name": "requestQuote",
      "type": "kogito.sw.request.quote",
      "kind": "produced"
    },
    {
      "name": "aggregatedQuotesResponse",
      "type": "kogito.loanbroker.aggregated.quotes.response",
      "kind": "consumed",
      "source": "/kogito/serverless/loanbroker/aggregator"
    }
  ]
}
A workflow application with event definitions needs a Knative SinkBinding to configure the target sink. The target sink is where the produced events (the kogito.sw.request.quote event in the previous example) are dispatched. In this case, the Knative Eventing add-on generates an object as shown in the following example:
SinkBinding generated by the add-on
apiVersion: sources.knative.dev/v1
kind: SinkBinding
metadata:
  name: sb-loanbroker-flow
spec:
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: ""
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: loanbroker-flow
Regardless of the number of produced events in the workflow definition, only one SinkBinding is generated.
For the kogito.loanbroker.aggregated.quotes.response event in the previous example, the Knative Eventing platform must be configured with a Knative trigger using an appropriate CloudEvent filter. The following example shows the Trigger generated by the Knative Eventing add-on:
Trigger generated by the add-on
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: kogito-serverless-loanbroker-aggregated-quotes-response-trigger
spec:
  broker: default
  filter:
    attributes:
      type: kogito.loanbroker.aggregated.quotes.response
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: loanbroker-flow
For each consumed event definition, the Knative Eventing add-on generates one Knative Trigger.
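
The mapping from event definitions to generated objects can be summarized in a small Python sketch. It is a simplified model under stated assumptions, not the add-on's generator: the function expected_knative_objects and the dictionary layout are hypothetical, object names are deliberately left out, and the sketch assumes a single SinkBinding covers all produced events while each consumed event gets its own Trigger filtered by event type.

```python
def expected_knative_objects(workflow, service_name, broker="default"):
    """List the Knative objects one would expect for a workflow definition.

    Simplified model: one SinkBinding if any event is produced,
    plus one Trigger per consumed event, filtered on the event type.
    """
    events = workflow.get("events", [])
    produced = [e for e in events if e.get("kind") == "produced"]
    consumed = [e for e in events if e.get("kind") == "consumed"]

    objects = []
    if produced:
        # A single binding points the service at the sink for every produced event.
        objects.append({"kind": "SinkBinding", "sink": broker, "subject": service_name})
    for event in consumed:
        # Each consumed event type needs its own filter on the broker.
        objects.append({
            "kind": "Trigger",
            "broker": broker,
            "filter": {"type": event["type"]},
            "subscriber": service_name,
        })
    return objects


# The event definitions from the example workflow in this section.
WORKFLOW = {
    "events": [
        {"name": "requestQuote", "type": "kogito.sw.request.quote", "kind": "produced"},
        {
            "name": "aggregatedQuotesResponse",
            "type": "kogito.loanbroker.aggregated.quotes.response",
            "kind": "consumed",
            "source": "/kogito/serverless/loanbroker/aggregator",
        },
    ]
}
```

Calling expected_knative_objects(WORKFLOW, "loanbroker-flow") returns one SinkBinding entry followed by one Trigger entry, which matches the two generated objects shown in this section.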