Consuming and producing events using Apache Kafka in Quarkus

This document describes how you can configure a workflow to interact with Apache Kafka. Apache Kafka uses an abstraction called topic to classify events. Events are published to a topic, and events are consumed from a topic.

SonataFlow uses Smallrye connectors to encapsulate access to several brokers, enabling SonataFlow to support different brokers by changing the configuration and classpath dependencies. However, Smallrye connectors also introduce an entity called channel. The Smallrye channels are unidirectional and need to be declared as input (incoming) or output (outgoing). When using Apache Kafka, the Smallrye channels must be mapped to Apache Kafka topics through configuration.

You need to add the Kafka Quarkus Smallrye connector dependency to indicate that you are using Apache Kafka. To enable a workflow to use Apache Kafka Smallrye connector, add the following dependency to the pom.xml file of your project if using Apache Maven:

Add dependency for Apache Kafka Smallrye connector in pom.xml
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

The messaging capabilities are included in the Quarkus Serverless Workflow extension, even though the messaging capabilities are optional. This means you do not need to explicitly add the messaging add-on dependency when using SonataFlow.

Smallrye channels configuration for a workflow

You can configure Smallrye channels for a workflow using event definitions. The Smallrye channels are defined using Quarkus configuration. The format for Smallrye channel properties is mp.messaging.[incoming|outgoing].<channel name>.<property_name>.

SonataFlow allows the following channel mapping strategies:

  • Define one default incoming channel to receive all the incoming messages and one default outgoing channel to store all the published messages.

  • Define a channel for each CloudEvent type so that every message type has a dedicated channel.

  • Define a channel for certain CloudEvent types. The non-mapped CloudEvent types uses the default incoming or outgoing channel.

SonataFlow first searches for a channel name in the properties that is same as CloudEvent type. If the channel name is found, SonataFlow uses the channel for that CloudEvent type. However, if the channel name is not found, SonataFlow searches for the default channel definition. In case the default channel definition is not existing, then an error is reported.

The default incoming channel is kogito_incoming_stream and the default outgoing channel is kogito_outgoing_stream.

To change the default incoming and outgoing topic names, you can use the following properties:

Properties to change default incoming and outgoing topic names
kogito.addon.messaging.incoming.defaultName=<default channel name>
kogito.addon.messaging.outgoing.defaultName=<default channel name>

The following properties are mandatory for each channel:

  • connector: This property needs to be set to smallrye-kafka.

  • Depending on whether the channel is incoming or outgoing:

    • value.deserializer: This property is used for incoming channels. Unless you have specific marshaling requirements, you must set this property to org.apache.kafka.common.serialization.ByteArrayDeserializer or org.apache.kafka.common.serialization.StringDeserializer.

    • value.serializer: This property is used for outgoing channels. Unless you have specific marshaling requirements, you must set this property to org.apache.kafka.common.serialization.ByteArraySerializer or org.apache.kafka.common.serialization.StringSerializer.

topic is another relevant but optional property that might be set for a channel. The topic property contains the Apache Kafka topic name to be used for a channel. If topic property is not set, then the channel name is used as topic name.

For more information about the properties that you can use for a channel when using Apache Kafka connector, see the properties list.

Examples of Smallrye channel mapping

This section describes examples of channel mapping, using the properties mentioned in the previous section.

One Smallrye channel per CloudEvent type

The serverless-workflow-callback-quarkus example application uses two CloudEvent types, including wait (incoming) and resume (outgoing).

Kafka topic names match the CloudEvent types. Therefore, it is suitable to select the mapping strategy of one channel per CloudEvent type. This suggests that two channels must be configured. Note that you do not need to set the optional topic property as the channel name matches the CloudEvent type.

Example property configuration
mp.messaging.incoming.wait.connector=smallrye-kafka
mp.messaging.incoming.wait.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.resume.connector=smallrye-kafka
mp.messaging.outgoing.resume.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Default channel mapping

The serverless-workflow-events-quarkus example application uses two CloudEvent types, including applicants (incoming) and decisions (outgoing).

As mentioned before, Kafka topic names match the CloudEvent types. However, to use the default channel functionality, kogito_incoming_stream and kogito_outgoing_stream are used as channel names. As explained before, since there is no specific channel name for the CloudEvent type, then the default channels are used. Also, the default channels need to be mapped to the desired topic name using the topic property.

Example property configuration
mp.messaging.incoming.kogito_incoming_stream.connector=smallrye-kafka
mp.messaging.incoming.kogito_incoming_stream.topic=applicants
mp.messaging.incoming.kogito_incoming_stream.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.kogito_outgoing_stream.connector=smallrye-kafka
mp.messaging.outgoing.kogito_outgoing_stream.topic=decisions
mp.messaging.outgoing.kogito_outgoing_stream.value.serializer=org.apache.kafka.common.serialization.StringSerializer

OnOverflow handling

Smallrye provide means to manage emitter overflow through OnOverflow annotation

Kogito will annotate generated smallrye emitters for a particular channel using the information provided by a property of the form kogito.addon.messaging.emitter.<channel_name>.overflow-strategy. The possible values are BUFFER, NONE, UNBOUNDED, FAIL, and DROP. If the value of the strategy is BUFFER, then you must specify the buffer size by using the property of the form kogito.addon.messaging.emitter.<channel_name>.buffer-size

Therefore, for the wait channel, in the previous example, if we want to buffer as many as 100 events we will add these two properties

Example overflow configuration
kogito.addon.messaging.emitter.wait.overflow-strategy=BUFFER
kogito.addon.messaging.emitter.wait.buffer-size=100

If all your channels use the same strategy and this strategy differs from the BUFFER one (you can change buffer size globally by using the mp.messaging.emitter.default-buffer-size smallrye property). You can set it up by using kogito.addon.messaging.emitter.overflow-strategy=NONE|UNBOUNDED|FAIL|DROP

Found an issue?

If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!