Mocking HTTP CloudEvents sink using WireMock
This document describes how you can test your workflow application that uses HTTP CloudEvents and Knative SinkBinding.
The example described in this document is based on the serverless-workflow-order-processing
example application.
Overview
The workflow application that you want to test must be configured to use Knative Eventing. You must configure Knative Eventing using standard HTTP POST requests to send and receive events between event producers and sinks. The events between the event producers and sinks follow the CloudEvents specification, which enables creating, parsing, sending, and receiving events in any programming language.
When you create an event source, you can specify a sink where events are sent to, from the source. A sink is a transferable or a callable resource that can receive incoming events from other resources. The examples of sink include Kubernetes deployments, Knative services, channels, and brokers.
This document describes the testing of Knative service that is configured as a sink, and the same Knative service is mocked to verify if the CloudEvents are received correctly by the sink. In this process, the WireMock framework adds the mocked server, verifying the CloudEvents received by the sink during the workflow service execution.
Testing a workflow application using SinkBinding
You can test a workflow application using SinkBinding.
-
Your workflow application is working.
For more information about creating a workflow application, see Creating your first workflow service.
-
Your workflow application is configured to use HTTP CloudEvents using SinkBinding.
For more information about enabling event-driven architecture in your workflow application using Knative Eventing, see Consuming and producing events on Knative Eventing.
-
Add the required test dependencies to the
pom.xml
file of your workflow application:Add test dependencies topom.xml
file<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-junit5</artifactId> (1) <scope>test</scope> </dependency> <dependency> <groupId>io.rest-assured</groupId> <artifactId>rest-assured</artifactId> (1) <scope>test</scope> </dependency> <dependency> <groupId>com.github.tomakehurst</groupId> <artifactId>wiremock-jre8</artifactId> (2) <scope>test</scope> </dependency> <dependency> <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> (3) <scope>test</scope> </dependency>
1 quarkus-junit5
andrest-assured
dependencies are required for HTTP-based testing in JVM mode.2 wiremock-jre8
dependency allows you to mock the server that acts as a sink.3 awaitility
dependency is used to express the expectations of an asynchronous system. For more information, see Awaitility website.You can also see the dependencies added in
pom.xml
file ofserverless-workflow-order-processing
example application. -
Add failsafe maven plug-in to run integration tests:
maven-failsafe-plugin
in thepom.xml
plug-in section<plugin> <artifactId>maven-failsafe-plugin</artifactId> <version>${version.failsafe.plugin}</version> <configuration> <systemPropertyVariables> <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager> <maven.home>${maven.home}</maven.home> </systemPropertyVariables> </configuration> <executions> <execution> <goals> <goal>integration-test</goal> <goal>verify</goal> </goals> </execution> </executions> </plugin>
More information in maven-failsafe-plugin documentation.
-
Create a test class that mocks the sink using WireMock as shown in the following example:
Example of a test class@QuarkusTest(1) public class VerifyWorkflowExecutionIT { (2) private static WireMockServer sink; (3) static { (4) RestAssured.enableLoggingOfRequestAndResponseIfValidationFails(); } /** * Starts the "sink" server, which is the endpoint that will receive our produced events */ @BeforeAll (5) public static void startSink() { sink = new WireMockServer(options().port(8181)); (6) sink.start(); (7) sink.stubFor(post("/").willReturn(aResponse().withBody("ok").withStatus(200))); (8) } @AfterAll (9) public static void stopSink() { if (sink != null) { sink.stop(); (10) } } @Test void processDomesticOrderUnderFraudEval() throws JsonProcessingException, InterruptedException { final ObjectMapper objectMapper = new ObjectMapper(); final Order order = new Order(); order.setId(UUID.randomUUID().toString()); order.setDescription("iPhone 12"); order.setTotal(1001); order.setCountry("US"); given() (4) .header("ce-specversion", "1.0") .header("ce-id", order.getId()) .header("ce-source", "/from/test") .header("ce-type", "orderEvent") .contentType(MediaType.APPLICATION_JSON) .body(objectMapper.writeValueAsString(order)) .post("/") .then() .statusCode(200); await() (11) .atMost(60, SECONDS) .with().pollInterval(1, SECONDS) .untilAsserted(() -> { sink.verify(2, postRequestedFor(urlEqualTo("/")).withRequestBody(containing(order.getId()))); sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"fraudEvaluation\"").and(containing("\"id\":\"" + order.getId() + "\"")))); sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"domesticShipping\"").and(containing("\"id\":\"" + order.getId() + "\"")))); }); (12) } }
1 @QuarkusTest
starts the Quarkus server for the lifetime of the test execution run. For more information, see Quarkus - Testing your application.2 The test name ends with IT
to identify which test needs to be executed as an integration test. More details in maven-failsafe-plugin documentation.3 WireMockServer
is a mocked server instance that is used for SinkBinding for testing.4 given()
is used to test interactions with the application. For more information about testing interactions, see Testing your workflow application using REST Assured.5 @BeforeAll
annotation is used to signal that the annotated method must be executed before running all the tests.6 Creates a WireMockServer
instance, listening at the port that is passed as a parameter and must match with the sink configuration.7 Starts the server before the tests are executed. 8 Stubs the mocked API response. It accepts a MappingBuilder
instance that is used to build API mapping information, such as URL, request parameters, body, headers, and authorization.9 @AfterAll
annotation is used to signal that the annotated method must be executed after executing all the tests.10 Stops the server after executing all the tests. 11 await()
is added to wait for asynchronous operations.12 verify
verifies if the request hits the mock API using the expected event content.You can check the
VerifyWorkflowExecutionIT
class ofserverless-workflow-order-processing
example application.Start the server before executing the tests, and stop the server once the tests are completed. You can also reset the mock stubs between the tests.
-
Configure your test application to use the
WireMockServer
as a sink.Also, add the reference of
WireMockServer
in theapplication.properties
file as shown in the following example:Example adding sink connection property inapplication.properties
filemp.messaging.outgoing.kogito_outgoing_stream.url=http://0.0.0.0:8181 (1)
1 The port that needs to match with the passed parameter. The parameter is passed when the WireMockServer
is created in the test class.For more information, see
application.properties
file ofserverless-workflow-order-processing
example application. -
To run the tests, execute the following command:
Run the testsmvn clean verify
Example of test execution cycle
The testing example in this document is based on the serverless-workflow-order-processing
example application. The serverless-workflow-order-processing
example application contains three workflows as shown in the following figure:
serverless-workflow-order-processing
exampleThe Order Workflow in the serverless-workflow-order-processing
example application processes the incoming order event and starts a parallel state, which sends requests to two workflows including Fraud Handling and Shipping Handling. The Order Workflow ends when both Fraud Handling and Shipping Handling workflows are completed.
The Fraud Handling workflow produces a FraudEvaluation
event if the received order is more than 1000 USD. In the workflow architecture, any other system or service can read the FraudEvaluation
event and react upon it, such as canceling the order.
Simultaneously, regardless of evaluating the fraud, the Shipping Handling workflow produces events that classify the required shipping service, such as international or domestic. In this example, domestic shipping is classified for any order, containing the address within the United States.
The following figure shows the event flow among the components in the serverless-workflow-order-processing
example application:
Also, the testing components replicate the interactions to verify the events that are received by the sink as shown in the following figure:
Before executing a test, the WireMockServer
starts listening to the configured port as the sink. The sink listens to the events that are produced by the workflows. When a workflow produces an event to the sink, the produced event is received by the WireMockServer
, and then the test verifies the event content.
The processDomesticOrderUnderFraudEval
in the VerifyWorkflowExecutionIT
class, produces events, such as fraudEvaluation
(Total > 1000
) and domesticShipping
(country = "US"
). Also, the order event consumed by the Order Workflow needs to match the requirements as shown in the following example:
final ObjectMapper objectMapper = new ObjectMapper();
final Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setDescription("iPhone 12");
order.setTotal(1001);
order.setCountry("US");
given()
.header("ce-specversion", "1.0")
.header("ce-id", order.getId())
.header("ce-source", "/from/test")
.header("ce-type", "orderEvent")
.contentType(MediaType.APPLICATION_JSON)
.body(objectMapper.writeValueAsString(order))
.post("/")
.then()
.statusCode(200);
After matching the requirements, the test verifies if the sink is receiving the expected events as shown in the following example:
await()
.atMost(60, SECONDS)
.with().pollInterval(1, SECONDS)
.untilAsserted(() -> {
sink.verify(2, postRequestedFor(urlEqualTo("/")).withRequestBody(containing(order.getId())));
sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"fraudEvaluation\"").and(containing("\"id\":\"" + order.getId() + "\""))));
sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"domesticShipping\"").and(containing("\"id\":\"" + order.getId() + "\""))));
});
The await()
method in the previous example allows the test to retry the validations until the verifications are declared or until the specified time is expired. In this example, the specified time is 60 seconds.
The following example shows how to check if the sink (WireMockServer
) receives two events for the same order ID:
sink.verify(2, postRequestedFor(urlEqualTo("/")).withRequestBody(containing(order.getId())));
To check the content of the received events, the following verifications can be declared or performed on the types
:
sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"fraudEvaluation\"").and(containing("\"id\":\"" + order.getId() + "\""))));
sink.verify(1, postRequestedFor(urlEqualTo("/")).withRequestBody(containing("\"type\":\"domesticShipping\"").and(containing("\"id\":\"" + order.getId() + "\""))));
After declaring the verifications on received events, the test successfully ends and the WireMockServer
stops.
Found an issue?
If you find an issue or any misleading information, please feel free to report it here. We really appreciate it!