Testing Quarkus Web Applications: Reactive Messaging, Kafka, and Testcontainers

Key Takeaways

  • Quarkus is a full-stack, Kubernetes-native Java framework that supports many coding styles, including reactive programming.
  • Writing clean unit/component/integration tests for Quarkus applications when a reactive approach is used is vitally important. Here we demonstrate testing reactive code, reactive messaging, and full integration testing.
  • Writing tests for validating reactive code can sound complicated, and the strategy often followed might be to either mock the classes or spin up a Kafka cluster. However, there are other alternatives.
  • It’s important to write integration tests to validate that everything works under circumstances similar to production. When managing the tradeoff between realism and testing execution time, you can use Testcontainers to initialize ephemeral instances of databases and distributed logs/queues.
  • Unit tests are a central pillar to maintain the quality of applications, but component and integration tests are also important. Quarkus excels in supporting these kinds of tests.

Quarkus is a full-stack, Kubernetes-native Java framework made for Java virtual machines (JVMs) and native compilation, optimizing Java specifically for containers and enabling it to become an effective platform for serverless, cloud, and Kubernetes environments.

Instead of reinventing the wheel, Quarkus uses well-known enterprise-grade frameworks backed by standards/specifications and makes them compilable to a native binary using GraalVM.

In this article, we’ll learn how to write clean unit/component/integration tests for Quarkus applications when reactive messaging is used. We’ll see how we can write simple and clean tests for the following scenarios:

  • Reactive code
  • Reactive messaging
  • Full integration test

Let’s see how we can write tests for these use cases.

The application

Let’s consider a small use case of a checkout process for an online shop that sells books.

The process of the checkout is composed of the following steps:

  1. The shopping cart is received by the checkout service.
  2. A synchronous HTTP call is made to the discount service to apply any possible discount.
  3. The purchase is stored in the database.
  4. An event is emitted to a Kafka topic.
  5. The event is received asynchronously by the delivery service.
  6. The delivery service processes the purchase (e.g., prepares the package and sends it to the customer).

We aren’t going to inspect the application code in-depth, only the necessary parts required for the tests.

Checkout Service

The checkout service is a REST API endpoint implementing the checkout process of the shopping cart.

@Inject
CheckoutProcess checkoutProcess;


@POST
@Transactional
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response checkout(ShoppingBasket shoppingBasket) {

    Long id = checkoutProcess.checkout(shoppingBasket);

    UriBuilder uriBuilder = UriBuilder.fromResource(ShopResource.class);
    uriBuilder.path(Long.toString(id));

    return Response.created(uriBuilder.build()).build();
}

Business logic is delegated to the CheckoutProcess class.

CheckoutProcess

This class implements the flow of the checkout process. The steps involved are:

  1. The discount service call is done to apply any discount to the final price.
  2. Data is persisted in the database.
  3. An event is emitted to the messaging system.

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.json.bind.JsonbBuilder;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class CheckoutProcess {

   @Inject
   PriceService priceService;

   @Inject
   @Channel("delivery")
   Emitter<String> delivery;

   public Long checkout(ShoppingBasket shoppingBasket) {
       Double total = this.priceService.calculate(shoppingBasket);
      
       shoppingBasket.persist();
      
       Invoice invoice = new Invoice();
       invoice.shoppingBasket = shoppingBasket;
       invoice.total = total;

       invoice.persist();

       delivery.send(JsonbBuilder.create().toJson(shoppingBasket));

       return invoice.id;

   }
}

  • The Emitter class sends an event message to the configured channel.
  • The Channel annotation sets delivery as the channel where messages are sent.
  • The shopping cart Java object is marshaled to a String using the JSON-B spec and sent as the event body.

DeliveryService

This class listens for any event emitted in the messaging system and processes it.

import javax.enterprise.context.ApplicationScoped;
import javax.json.bind.JsonbBuilder;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class DeliveryService {
 
   @Incoming("delivery-warehouse")
   public void deliverPurchase(String shoppingBasket) {
       final ShoppingBasket cart = JsonbBuilder.create().fromJson(shoppingBasket, ShoppingBasket.class);

       System.out.println(String.format("Sending to %s the following items %s", cart.address.toString(), cart.toString()));

   }

}

  • The Incoming annotation configures the channel from which events are read.
  • The deliverPurchase method is called for each event, and the body content is injected as a parameter.

Configuration

Finally, we need to configure the database connection, the Kafka cluster, and the messaging system using the application.properties file.

quarkus.datasource.db-kind=mariadb
quarkus.datasource.username=developer
quarkus.datasource.password=developer
quarkus.datasource.jdbc.url=jdbc:mariadb://localhost:3306/mydb
quarkus.hibernate-orm.database.generation = drop-and-create

%test.quarkus.datasource.db-kind=h2
%test.quarkus.datasource.username=developer
%test.quarkus.datasource.password=developer
%test.quarkus.datasource.jdbc.url=jdbc:h2:mem:mydb
%test.quarkus.hibernate-orm.database.generation = drop-and-create

org.acme.DiscountGateway/mp-rest/url=http://localhost:9000

%prod.kafka.bootstrap.servers=localhost:9092
%prod.mp.messaging.outgoing.delivery.connector=smallrye-kafka
%prod.mp.messaging.incoming.delivery-warehouse.connector=smallrye-kafka

mp.messaging.incoming.delivery-warehouse.topic=delivery
mp.messaging.incoming.delivery-warehouse.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.outgoing.delivery.topic=delivery
mp.messaging.outgoing.delivery.value.serializer=org.apache.kafka.common.serialization.StringSerializer

The first thing you might notice here is that some properties start with a % prefix. The next section explains them, as they're an important concept that simplifies test configuration in Quarkus.

  • The first part of the file defines two data sources: a default one that connects to a MariaDB server and a second one used during the test phase.
  • Next comes the location of the discount service, configured through the MicroProfile Rest Client extension.
  • Finally, the Kafka and messaging configuration is used only in the production phase (JAR/native packaging).

Regarding the messaging configuration, each channel used in the application (delivery and delivery-warehouse) should be configured; otherwise, an in-memory channel is used by default. This is a key aspect for testing these kinds of applications, as we'll see in a moment.

But before moving on to testing, let's see what exactly the % at the beginning of some lines means and why it's so helpful for testing.

Quarkus Profiles

Quarkus supports the notion of configuration profiles. These allow you to have multiple configuration values in the same file and select between them via a profile name.

The syntax for this is %{profile}.config.key=value.

Quarkus comes with default profiles:

  • dev: Activated in development mode (quarkus:dev)
  • test: Activated when running tests
  • prod: The default profile when not running in development or test mode

Properties not prefixed with a profile are used as default values if they’re not specified in any other profile.

So, if we look back at the configuration file used previously, we’ll notice that:

  • An H2 in-memory database is used when running tests; in all other cases, MariaDB is used.
  • Reactive messaging with Kafka is only used when the production profile is active. In all other cases, no connector is configured, so the channels fall back to in-memory ones.
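
The lookup rule described above can be sketched in plain Java. This is a simplified model for illustration only, not the actual Quarkus or SmallRye Config implementation; the class and method names (ProfileLookupSketch, resolve) are hypothetical.

```java
import java.util.Map;
import java.util.Optional;

/** Simplified model of profile-aware property lookup; not the real Quarkus code. */
final class ProfileLookupSketch {

    private ProfileLookupSketch() {
    }

    /**
     * Returns the value stored under "%<profile>.<key>" when it exists,
     * otherwise the unprefixed default, otherwise an empty Optional.
     */
    static Optional<String> resolve(Map<String, String> properties, String profile, String key) {
        String profiled = properties.get("%" + profile + "." + key);
        if (profiled != null) {
            return Optional.of(profiled);
        }
        return Optional.ofNullable(properties.get(key));
    }
}
```

Applied to the configuration file shown earlier, resolving quarkus.datasource.db-kind under the test profile would yield h2, while any profile without its own entry would get the unprefixed mariadb default. The delivery connector resolves to nothing under test, because it is only declared with the %prod prefix.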

You can also create custom profiles and enable them by setting either the quarkus.profile system property or the QUARKUS_PROFILE environment variable.

quarkus.http.port=9090
%staging.quarkus.http.port=9999

The staging profile is then enabled by starting Quarkus with the -Dquarkus.profile=staging flag.

Additional files

Sometimes there are many configuration properties to set, and keeping them all together quickly leads to one big configuration file in which editing any property is error-prone. To avoid this problem, the smallrye.config.locations property can be used to point to additional configuration files.

The main benefit of this approach is that we can have one configuration file per running environment: application.properties, application-staging.properties, application-it.properties.

For example, if staging properties are required, we start the Quarkus application setting the following system property: -Dsmallrye.config.locations=application-staging.properties

Setting Profiles in Tests

By default, when running tests, the configuration properties of the test profile are loaded and used during test execution. But some tests might require different configuration values. Quarkus gives you this flexibility through the TestProfile annotation. Test profiles must implement the QuarkusTestProfile interface to set the new parameters for the test.

All methods of the QuarkusTestProfile interface are default methods, so it's not mandatory to implement all of them; we only need to implement the ones the test requires.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.quarkus.test.junit.QuarkusTestProfile;

public class MyTestProfile implements QuarkusTestProfile {
 
   @Override
   public Map<String, String> getConfigOverrides() {
       return Collections.singletonMap("greeting.message"," HW");
   }

   @Override
   public Set<Class<?>> getEnabledAlternatives() {
       return Collections.emptySet();
   }


   @Override
   public String getConfigProfile() {
       return "staging";
   }

   @Override
   public List<TestResourceEntry> testResources() {
       return Collections.emptyList();
   }

   @Override
   public boolean disableGlobalTestResources() {
       return false;
   }

   @Override
   public Set<String> tags() {
       return Collections.singleton("test1");
   }

}

  • getConfigOverrides returns additional configuration to apply to the test. It overrides any existing configuration.
  • getEnabledAlternatives sets the CDI alternatives to enable in the test.
  • getConfigProfile sets the profile to be used in the test. It can be test (the default) or any custom profile.
  • testResources sets a list of QuarkusTestResourceLifecycleManager classes to start for the test.
  • disableGlobalTestResources: if it returns true, only the test resources returned from testResources are started, and globally annotated test resources are ignored.
  • tags sets the tag names this profile is associated with.

@QuarkusTest
@TestProfile(MyTestProfile.class)
public class MyQuarkusTest {}

The MyTestProfile profile is applied when MyQuarkusTest is executed.

Tagging

A test profile can be categorized using tags. In the previous example, MyTestProfile is tagged with test1. To limit test execution by tag name, set the quarkus.test.profile.tags property to the tags to execute. If it isn't set, all tests are executed.

In the following section, we’re going to see a real example of Quarkus profiles.

Now that we know how to use Quarkus profiles and how they can help us for testing purposes, let’s move forward and learn how to write component tests for the reactive messaging part.

Component Tests for Reactive Messaging

At this point, we want to write a component test for the checkout logic to validate that all the parts that compose the process work and that the integrations with the Quarkus framework operate as expected. Of course, as it's a component test, we don't want any external dependencies (e.g., a remote database or a Kafka cluster), as they might slow down our component test suite or even make it flaky.

The CheckoutProcess has 3 external dependencies that we need to deal with (price service, persistence, and Kafka cluster):

1.    @Inject PriceService priceService;
2.    shoppingBasket.persist();
3.    @Inject @Channel("delivery") Emitter<String> delivery;

Price Service

There are several strategies for testing the price service; we can choose using mocks, stubs, or a service virtualization tooling like Hoverfly to test the full communication stack. We covered this approach in part two of this series of testing posts Testing Quarkus Web Applications: Writing Clean Component Tests.

Persistence

This topic was covered in part one of this series of testing posts Testing Quarkus Web Applications: Component & Integration Tests. For this specific case, and as you might have noticed in the application.properties file, an in-memory database is used.

Kafka cluster

Writing tests for validating reactive code can sound complicated, and the strategy you may follow might be to either mock the classes or spin up a Kafka cluster. But the reality is simpler than that, thanks to how reactive messaging works.

Quarkus uses MicroProfile Reactive Messaging to interact with Apache Kafka. By default, when a stream (or channel) isn't configured to connect to a message broker like Apache Kafka or Artemis, it's treated as an in-memory stream, which makes the communication between streams fast and reliable.

For testing purposes, an in-memory stream is the best approach because no external server is required, but we also need to validate the content sent on the stream. This isn't supported by default, but SmallRye Reactive Messaging, the MicroProfile Reactive Messaging implementation used by Quarkus, provides an artifact that gives access to the content of the in-memory channel.

Let’s register the smallrye-reactive-messaging-in-memory dependency so we can get access to the in-memory stream from our tests.

For example in Maven you should add the following section in pom.xml:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>smallrye-reactive-messaging-in-memory</artifactId>
  <scope>test</scope>
</dependency>

Then the connector needs to be configured to be the in-memory one instead of the Kafka one. In this case, the test profile is used to configure in-memory connectors only for testing purposes.

%test.mp.messaging.outgoing.delivery.connector=smallrye-in-memory
%test.mp.messaging.incoming.delivery-warehouse.connector=smallrye-in-memory

CheckoutProcess Test

The CheckoutProcessTest isn't very different from any other Quarkus test; the only big difference is that the InMemoryConnector class is injected to inspect the stream content.

import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySink;

@QuarkusTest
@Transactional
public class CheckoutProcessTest {

   @Inject
   CheckoutProcess checkoutProcess;

   @Inject @Any
   InMemoryConnector connector;

   @Test
   public void should_apply_checkout() {
       
       ShoppingBasket shoppingBasket = ShoppingBasketObjectMother.theHobbiBasket();
       checkoutProcess.checkout(shoppingBasket);

       Invoice invoice = Invoice.findInvoiceByTransaction(ShoppingBasketObjectMother.TRANSACTION_ID);

       assertThat(invoice.id).isNotNull();

       InMemorySink<String> queue = connector.sink("delivery");
       String payload = queue.received().get(0).getPayload();

       final ShoppingBasket cart = JsonbBuilder.create().fromJson(payload, ShoppingBasket.class);
       assertThat(cart.transactionId).isEqualTo(ShoppingBasketObjectMother.TRANSACTION_ID);

   }
}

  • The checkout method stores the shopping cart in the database, generates a transaction id, and emits an event to the delivery channel.
  • Messages sent to the delivery channel are stored in an in-memory sink named delivery.
  • The InMemorySink instance is used to retrieve the messages and assert on them in the test.

DeliveryService Test

The DeliveryServiceTest verifies that an event is processed correctly. In this case, the InMemoryConnector is used to emit an event to the delivery-warehouse stream.

import io.smallrye.reactive.messaging.connectors.InMemoryConnector;
import io.smallrye.reactive.messaging.connectors.InMemorySource;

@QuarkusTest
public class DeliveryServiceTest {
 
   @Inject
   DeliveryService deliveryService;

   @Inject @Any
   InMemoryConnector connector;

   @Test
   public void should_process_deliveries() {
       InMemorySource<String> deliveries = connector.source("delivery-warehouse");
       String content = JsonbBuilder.create().toJson(ShoppingBasketObjectMother.theHobbiBasket());
       deliveries.send(content);

       assertThat(deliveryService.getProcessedItems()).hasSize(1);
   }
}

  • The InMemorySource instance is used to send messages to the delivery-warehouse channel/stream.
  • DeliveryService listens to any message sent to the delivery-warehouse channel/stream and processes it.
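
The assertion in this test calls deliveryService.getProcessedItems(), which the DeliveryService listing earlier in the article doesn't show. One hypothetical way to provide it is to have the service remember each payload it handles. The sketch below is an assumption about how this could look, not the code from the article's repository; the class name RecordingDeliveryService is made up, and the Quarkus annotations are only mentioned in comments so that the example compiles with the JDK alone.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Hypothetical variant of DeliveryService that records what it processed.
 * In the application, the class would carry @ApplicationScoped and
 * deliverPurchase would carry @Incoming("delivery-warehouse").
 */
final class RecordingDeliveryService {

    // Thread-safe list: messages may arrive on a different thread than the one asserting.
    private final List<String> processedItems = new CopyOnWriteArrayList<>();

    /** Called once per event; the real method would unmarshal the JSON and handle the delivery. */
    void deliverPurchase(String shoppingBasket) {
        processedItems.add(shoppingBasket);
    }

    /** Returns an immutable snapshot of the payloads processed so far. */
    List<String> getProcessedItems() {
        return List.copyOf(processedItems);
    }
}
```

Returning a snapshot rather than the internal list keeps a test from accidentally modifying the service state while asserting on it.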

QuarkusTestResourceLifecycleManager

We’ve seen we can configure in-memory channels in the application.properties file, but there’s also a second way to do it by using a programmatic approach. A QuarkusTestResourceLifecycleManager is required to override the channel configuration to use the in-memory channel. This class was covered in part 2 of this series of posts Testing Quarkus Web Applications: Writing Clean Component Tests.

The InMemoryConnector class has two static methods to set up the in-memory channels, one for incoming and another for outgoing.

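import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import io.smallrye.reactive.messaging.connectors.InMemoryConnector;

public class InMemReactiveMessagingLifecycleManager implements QuarkusTestResourceLifecycleManager {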
 
   private Map<String, String> params = new HashMap<>();

   @Override
   public void init(Map<String, String> params) {
       this.params.putAll(params);
   }

   @Override
   public Map<String, String> start() {
       Map<String, String> env = new HashMap<>();
       for (Entry<String, String> con : this.params.entrySet()) {
           switch (con.getValue()) {
               case "incoming": env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory(con.getKey())); break;
               case "outgoing": env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory(con.getKey())); break;
           }
       }
      
       return env;
   }

   @Override
   public void stop() {
       InMemoryConnector.clear();
   }
}

  • The init method is invoked before the start method and stores the configuration parameters in the class. The tuples stored in the map have the form <channel name>=<incoming|outgoing>.
  • The start method iterates over the configuration map to switch each listed channel to an in-memory one.
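
To make the effect of the start method more concrete, the following plain-Java sketch builds the kind of configuration map it is expected to return. It assumes the static helpers produce the same connector properties as the %test entries shown earlier for application.properties (without the profile prefix); the class InMemoryChannelConfigSketch and its method are hypothetical and don't call the real InMemoryConnector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: models the connector overrides expected from the lifecycle manager. */
final class InMemoryChannelConfigSketch {

    static final String IN_MEMORY_CONNECTOR = "smallrye-in-memory";

    private InMemoryChannelConfigSketch() {
    }

    /** Turns <channel name>=<incoming|outgoing> tuples into connector properties. */
    static Map<String, String> connectorOverrides(Map<String, String> channels) {
        Map<String, String> env = new LinkedHashMap<>();
        for (Map.Entry<String, String> channel : channels.entrySet()) {
            String direction = channel.getValue();
            // Unlike the switch in the listing above, unknown directions fail fast here.
            if (!direction.equals("incoming") && !direction.equals("outgoing")) {
                throw new IllegalArgumentException("Unknown direction: " + direction);
            }
            env.put("mp.messaging." + direction + "." + channel.getKey() + ".connector", IN_MEMORY_CONNECTOR);
        }
        return env;
    }
}
```

Failing on an unknown direction is a design choice of this sketch; the lifecycle manager in the listing silently ignores such entries.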

Then we annotate one test with QuarkusTestResource setting the initArgs attribute with the required channels for the tests.

@QuarkusTest
@QuarkusTestResource(value = InMemReactiveMessagingLifecycleManager.class, initArgs = {
   @ResourceArg(value = "incoming", name = "delivery-warehouse"),
   @ResourceArg(value = "outgoing", name = "delivery")
})
public class DeliveryServiceTest {}

  • Two in-memory channels are created: an incoming channel named delivery-warehouse and an outgoing channel named delivery. During test execution, these channels are configured and ready to use.

Integration Test for Reactive Messaging

It’s important to write integration tests to validate that everything works under circumstances similar to production. For this specific example, we need a MariaDB and a Kafka server up and running before integration tests are run. As we showed in part 2 Testing Quarkus Web Applications: Writing Clean Component Tests, Testcontainers is the best solution here to spin up these servers inside a Docker host.

Let’s register the Testcontainers dependencies so both MariaDB and Kafka containers start at the beginning of the integration tests.

For example in Maven you should add the following section in pom.xml:

<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.15.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>mariadb</artifactId>
  <version>1.15.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.15.1</version>
  <scope>test</scope>
</dependency>

Integration Test

The CheckoutProcessIntegrationTest verifies that the checkout process ends up correctly -- not by relying on in-memory dependencies but on real instances. This test uses a custom TestProfile to set up the application only for integration tests.

@QuarkusTest
@TestProfile(IntegrationTestProfile.class)
public class CheckoutProcessIntegrationTest {

   @Test
   public void should_do_a_checkout() throws InterruptedException {

       ShoppingBasket shoppingBasket = ShoppingBasketObjectMother.theHobbiBasket();
       given()
           .contentType(ContentType.JSON)
           .body(shoppingBasket)
           .when()
           .post("/checkout")
           .then()
           .log().ifValidationFails()
           .statusCode(201);
   }
}

Integration Test Profile

As we’ve seen before, to create a test profile, we need to create a class implementing QuarkusTestProfile. This class must set the specific Quarkus profile for the integration tests and register a QuarkusTestResourceLifecycleManager for starting/stopping the MariaDB and Kafka containers using the Testcontainers framework.

public class IntegrationTestProfile implements QuarkusTestProfile {
 
   @Override
   public String getConfigProfile() {
       return "int-test";
   }

   @Override
   public List<TestResourceEntry> testResources() {
       return Collections.singletonList(new TestResourceEntry(InfrastructureTestResource.class));
   }
}

  • int-test is the active profile when running tests annotated with this test profile.
  • InfrastructureTestResource implements all the logic needed to boot up the required containers.

Quarkus Test Resource

The last piece is a class implementing QuarkusTestResourceLifecycleManager that integrates with Testcontainers and starts/stops the containers required for the integration tests. One important characteristic of this class is that the configuration parameters must be prefixed with %int-test, as this is the active profile when using the IntegrationTestProfile.

public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {

   static MariaDBContainer<?> db = new MariaDBContainer<>("mariadb:10.3.6")
                                           .withDatabaseName("mydb")
                                           .withUsername("developer")
                                           .withPassword("developer");

   static KafkaContainer kafka = new KafkaContainer();

   @Override
   public Map<String, String> start() {
       db.start();
       kafka.start();

       return configurationParameters();
   }

   private Map<String, String> configurationParameters() {
       final Map<String, String> conf = new HashMap<>();
       conf.put("%int-test.quarkus.datasource.jdbc.url", db.getJdbcUrl());
       conf.put("%int-test.quarkus.datasource.username", "developer");
       conf.put("%int-test.quarkus.datasource.password", "developer");
       conf.put("%int-test.quarkus.datasource.db-kind", "mariadb");
       conf.put("%int-test.quarkus.hibernate-orm.database.generation", "drop-and-create");

       conf.put("%int-test.kafka.bootstrap.servers",  kafka.getBootstrapServers());
       conf.put("%int-test.mp.messaging.outgoing.delivery.connector", "smallrye-kafka");
       conf.put("%int-test.mp.messaging.incoming.delivery-warehouse.connector", "smallrye-kafka");
       return conf;
   }

   @Override
   public void stop() {
       if (db != null) {
           db.stop();
       }

       if (kafka != null) {
           kafka.stop();
       }
   }
}

  • Both MariaDB and Kafka containers start before any integration test is executed.
  • The application is configured with the MariaDB and Kafka container parameters.

This test uses real MariaDB and Kafka instances instead of the in-memory approach used in the component tests.

Reactive Code

Quarkus uses Mutiny as its reactive framework. When using reactive messaging, it's easy to end up using Mutiny classes in the code. Because of the nature of reactive programming, writing tests for reactive code isn't the easiest thing to do, as most things occur asynchronously. To avoid these complexities, Mutiny provides a library to wait for events and assert on them.

Counting Checkouts

Suppose we want to stream every 5 seconds the number of checkouts processed by the platform. The code might look like this:

import java.time.Duration;

import io.smallrye.mutiny.Multi;

public Multi<Long> totalCheckouts() {
       return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
           .map(tick -> ShoppingBasket.count());
}

Every 5 seconds, the number of processed elements is counted and sent as an event in the created Multi object.

Testing

Let’s register the Mutiny test utils dependency to start using the helper classes it provides.

For example, in Maven you should add the following section in pom.xml:

<dependency>
  <groupId>io.smallrye.reactive</groupId>
  <artifactId>mutiny-test-utils</artifactId>
  <version>0.13.0</version>
  <scope>test</scope>
</dependency>

The test for this use case only needs to process one shopping cart, wait until the event is fired, and assert that the event value is 1 as it’s the number of processed items.

import io.smallrye.mutiny.helpers.test.AssertSubscriber;

@Test
public void should_stream_total_checkouts() {

    ShoppingBasket shoppingBasket = ShoppingBasketObjectMother.theHobbiBasket();
    checkoutProcess.checkout(shoppingBasket);

    checkoutProcess.totalCheckouts()
        .transform().byTakingFirstItems(1)
        .subscribe().withSubscriber(AssertSubscriber.create(1))
        .await()
        .assertItems(1L)
        .assertCompleted();
}

  • AssertSubscriber is a special class provided by Mutiny for testing Mutiny classes. AssertSubscriber.create(1) takes 1 as its argument because that is the number of items the test requests.
  • The await method waits until the element is available.
  • The test asserts that the event value is 1 because that is the total number of checkouts performed.
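
The idea behind this style of test (request a fixed number of items, block until the stream terminates, then assert on what arrived) can be illustrated without Mutiny. The sketch below uses the JDK's java.util.concurrent.Flow API instead; it is not how AssertSubscriber is implemented, and the names AwaitingSubscriber, awaitItems, and collectSingleCount are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** JDK-only illustration of "request N items, wait for termination, then assert". */
final class AwaitingSubscriber<T> implements Flow.Subscriber<T> {

    private final long demand;
    private final List<T> items = new CopyOnWriteArrayList<>();
    private final CountDownLatch terminated = new CountDownLatch(1);
    private volatile Throwable failure;

    AwaitingSubscriber(long demand) {
        this.demand = demand;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(demand); // ask upstream for exactly the items the test needs
    }

    @Override
    public void onNext(T item) {
        items.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        failure = throwable;
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        terminated.countDown();
    }

    /** Blocks until the stream terminates or the timeout elapses, then returns the received items. */
    List<T> awaitItems(long timeout, TimeUnit unit) {
        try {
            if (!terminated.await(timeout, unit)) {
                throw new IllegalStateException("Stream did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        if (failure != null) {
            throw new IllegalStateException("Stream failed", failure);
        }
        return List.copyOf(items);
    }

    /** Usage example: publish a single count, close the stream, and collect the result. */
    static List<Long> collectSingleCount(long count) {
        AwaitingSubscriber<Long> subscriber = new AwaitingSubscriber<>(1);
        try (SubmissionPublisher<Long> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit(count);
        } // closing the publisher signals completion once the pending item is delivered
        return subscriber.awaitItems(5, TimeUnit.SECONDS);
    }
}
```

In the Mutiny test, truncating the stream to its first item plays the role that closing the publisher plays here: it turns a never-ending stream of ticks into one that completes, so the test has something to wait for.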

Conclusions

We’ve been digging into Quarkus testing in this series of three posts, from persistence and integration tests to using Docker containers for testing purposes and testing reactive applications.

Remember that unit tests are a central pillar to maintain the quality of applications, but component and integration tests are also important and Quarkus excels in supporting these kinds of tests.

Source code in the GitHub repository.

About the Author

Alex Soto is a Director of Developer Experience at Red Hat. He is passionate about the Java world and software automation, and he believes in the open-source software model. Soto is the co-author of Testing Java Microservices (Manning) and Quarkus Cookbook (O'Reilly) and a contributor to several open-source projects. A Java Champion since 2017, he is also an international speaker and teacher at Salle URL University. You can follow him on Twitter (Alex Soto ⚛️) to stay tuned to what’s going on in the Kubernetes and Java world.
