Building Kafka Event-Driven Applications with KafkaFlow

Key Takeaways

  • KafkaFlow is an open-source project that streamlines Kafka-based event-driven applications, simplifying the development of Kafka consumers and producers.
  • As a .NET framework, KafkaFlow offers an extensive range of features, including middleware, message handlers, type-based deserialization, concurrency control, and batch processing.
  • By utilizing middlewares, developers can encapsulate the logic for processing messages, which leads to better separation of concerns and maintainable code.
  • The project can be extended, creating the possibility of customization and the growth of an ecosystem of add-ons.
  • Developers benefit from KafkaFlow by being able to focus on what matters, spending more time on the business logic rather than investing in low-level concerns.

KafkaFlow is an open-source framework by FARFETCH. It helps .NET developers working with Apache Kafka to create event-driven applications. KafkaFlow lets developers easily set up "Consumers" and "Producers." The simplicity makes it an attractive framework for businesses seeking efficiency and robustness in their applications.

In this article, we will explore what KafkaFlow has to offer. If you build Apache Kafka Consumers and Producers using .NET, this article will provide a glance at how KafkaFlow can simplify your life.

Why Should You Care About It?

KafkaFlow provides an abstraction layer over the Confluent .NET Kafka client that makes Kafka consumers and producers easier to use, maintain, and test.

Imagine you need to build a Client Catalog for marketing initiatives. You will need a service to consume messages that capture new Clients. Once you start laying out your required service, you notice that existing services are not consistent in how they consume messages.

It’s common to see teams repeatedly solving the same basic problems, such as graceful shutdowns. To name just one of the challenges, you discover four different implementations of a JSON serializer across the organization.

Adopting a framework like KafkaFlow simplifies the process and can speed up the development cycle. KafkaFlow has a set of features designed to enhance the developer experience:

  1. Middlewares: KafkaFlow allows developers to create middlewares to process messages, enabling more control and customization of the Kafka consumer/producer pipeline.
  2. Handlers: Introduces the concept of message handlers, allowing developers to forward message processing from a topic to a message-type dedicated handler.
  3. Deserialization Algorithms: Offers a set of Serialization and Deserialization algorithms out-of-the-box.
  4. Multi-threaded Consumers: Provides multi-threading with message order guaranteed, helping to ensure optimal use of system resources.
  5. Administration API and Dashboard: Provides API and Dashboards to manage Consumers and Consumer groups, with operations such as pausing, resuming, or rewinding offsets, all at runtime.
  6. Consumer Throttling: Provides an easy way to bring priorities to topic consumption.

Let’s explore them so you can see the potential to address a problem like this.

KafkaFlow Producers: Simplified Message Production

Let’s start with message producers.

Producing a message to Kafka is not rocket science. Even so, KafkaFlow provides a higher-level abstraction over the producer interface from Confluent’s .NET Kafka client, simplifying the code and increasing maintainability.

Here’s an example of how to send a message with a KafkaFlow producer:

await _producers["my-topic-events"]
    .ProduceAsync("my-topic", message.Id.ToString(), message);

This way, you can produce messages to Kafka without dealing directly with serialization or other complexities of the underlying Kafka client.

Defining and managing Producers is equally straightforward: it is done through a fluent interface in your service configuration.

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "host:9092" })
        .AddProducer(
            "product-events",
            producer =>
                producer
            ...
        )
    )
);

Producers tend to be simple, but there are some common concerns to address, like compression or serialization. Let’s explore that.

Custom Serialization/Deserialization in KafkaFlow

One of the attractive features of Apache Kafka is being agnostic of data formats. However, that transfers the responsibility to producers and consumers. Without a thoughtful approach, it may lead to many ways to achieve the same result across the system. That makes serialization an obvious use case to be handled by a client framework.

KafkaFlow has serializers available for JSON, Protobuf, and even Avro. Those can be used simply by adding them to the middleware configuration.

.AddProducer<ProductEventsProducer>(producer => producer
    ...
    .AddMiddlewares(middlewares => middlewares
        ...
        .AddSerializer<JsonMessageSerializer>()
    )
)

The list is not restricted to those three, because KafkaFlow can also use custom serializers/deserializers for messages. While Confluent’s .NET Kafka client already supports custom serialization/deserialization, KafkaFlow simplifies the process by providing a more elegant way to handle it.

As an example, to use a custom serializer, you would do something like this:

public class MySerializer : ISerializer
{
       public Task SerializeAsync(object message, Stream output, ISerializerContext context)
       {
             // Serialization logic here
       }

       public async Task<object> DeserializeAsync(Stream input, Type type, ISerializerContext context)
       {
             // Deserialization logic here
       }
}

// Register the custom serializer when setting up the Kafka consumer/producer

.AddProducer<MyProducer>(producer => producer
    ...
    .AddMiddlewares(middlewares => middlewares
        ...
        .AddSerializer<MySerializer>()
    )
)
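
To make the serializer contract more concrete, here is a minimal, self-contained sketch of a JSON round trip over streams using System.Text.Json. It does not depend on KafkaFlow: `ProductCreated` and `JsonRoundTrip` are hypothetical names chosen for this illustration, and the two methods only mirror the shape of the serialize/deserialize pair shown above without the serializer context argument.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical message type used only for this illustration.
public record ProductCreated(int Id, string Name);

public static class JsonRoundTrip
{
    // Writes the message to the stream as UTF-8 JSON, using its runtime type.
    public static Task SerializeAsync(object message, Stream output) =>
        JsonSerializer.SerializeAsync(output, message, message.GetType());

    // Reads UTF-8 JSON from the stream into the requested type.
    public static async Task<object?> DeserializeAsync(Stream input, Type type) =>
        await JsonSerializer.DeserializeAsync(input, type);

    // Serializes into memory, rewinds the buffer, and deserializes again.
    public static async Task<object?> RoundTripAsync(object message)
    {
        using var buffer = new MemoryStream();
        await SerializeAsync(message, buffer);
        buffer.Position = 0;
        return await DeserializeAsync(buffer, message.GetType());
    }
}
```

Bodies like these could plausibly sit behind the placeholder comments of a custom serializer, but treat that as an assumption to verify against the KafkaFlow version you use.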

Message Handling in KafkaFlow

Consumers bring a ton of questions and possibilities. The first one is "How do you handle a message?"

Let’s start with the simplest way. With the advent of libraries like MediatR that popularized the CQRS and Mediator patterns, .NET developers got used to decoupling message handlers from the request/message receiver. KafkaFlow brings that same principle to Kafka Consumers.

KafkaFlow message handlers allow developers to define specific logic to process messages from a Kafka topic. KafkaFlow’s message handler structure is designed for better separation of concerns and cleaner, more maintainable code.

Here’s an example of a message handler:

public class MyMessageHandler : IMessageHandler<MyMessageType>
{
    public Task Handle(IMessageContext context, MyMessageType message)
    {
        // Message handling logic here.
        return Task.CompletedTask;
    }
}

This handler can be registered in the consumer configuration:

.AddConsumer(consumer => consumer
    ...
    .AddMiddlewares(middlewares => middlewares
        ...
        .AddTypedHandlers(handlers => handlers
            .AddHandler<MyMessageHandler>()
        )
    )
)

With this approach, it’s easy to separate Consumers from Handlers, simplifying maintainability and testability.
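
The idea of forwarding each message to a handler dedicated to its type can be sketched in plain C#. The following is a simplified stand-in, not KafkaFlow's implementation: `TypedDispatcher`, `ClientCreated`, and `ClientRemoved` are hypothetical names, and handlers are plain delegates rather than handler classes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical message types for the illustration.
public record ClientCreated(string Name);
public record ClientRemoved(string Name);

// Minimal stand-in for type-based dispatch; not KafkaFlow's implementation.
public class TypedDispatcher
{
    private readonly Dictionary<Type, Func<object, Task>> _handlers = new();

    // Registers a handler for one concrete message type.
    public TypedDispatcher AddHandler<T>(Func<T, Task> handler)
    {
        _handlers[typeof(T)] = message => handler((T)message);
        return this;
    }

    // Looks up the handler by the runtime type of the message.
    // Returns false when no handler is registered for that type.
    public async Task<bool> DispatchAsync(object message)
    {
        if (!_handlers.TryGetValue(message.GetType(), out var handler))
        {
            return false;
        }

        await handler(message);
        return true;
    }
}
```

Because the consumer only knows about the dispatcher, each handler can be unit tested on its own with a plain message instance.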

This may look like unneeded complexity if you have a microservice handling one topic with only one message type. In that case, you can take advantage of middlewares.

Middleware in KafkaFlow

KafkaFlow is middleware-oriented. You may have noticed a reference to "Middlewares" in the Message Handler snippets and wondered what a Middleware is.

Middlewares are what make Typed Handlers possible. Messages are delivered to a middleware pipeline whose members are invoked in sequence. You might be familiar with this concept if you have used MediatR pipelines. Middlewares can also be used to apply a series of transformations. In other words, a given Middleware can transform the incoming message before passing it to the next Middleware.

A Middleware in KafkaFlow encapsulates the logic for processing messages. The pipeline is extensible, allowing developers to add behavior to the message-processing pipeline.

Here’s an example of a middleware:

public class MyMiddleware : IMessageMiddleware
{
    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        // Pre-processing logic here.
        await next(context);
        // Post-processing logic here.
    }
}

To use this middleware, it can be registered in the consumer configuration:

.AddConsumer(consumer => consumer
    ...
    .AddMiddlewares(middlewares => middlewares
        ...
        .Add<MyMiddleware>()
    )
)

This way, developers can plug custom logic into the message processing pipeline, providing flexibility and control.

Typed Handlers are themselves a form of Middleware. You can therefore handle a message without a Typed Handler by implementing your own middleware, or you can use Middlewares to build a message pipeline that performs validation, enrichment, and similar steps before the message is handled.
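
To illustrate how a pipeline invokes its members in sequence, with pre-processing on the way in and post-processing on the way out, here is a minimal sketch in plain C#. It uses simplified stand-ins rather than KafkaFlow's types: `Next`, `IStep`, `LoggingStep`, and `Pipeline` are hypothetical names, and the message is a plain string.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Simplified stand-ins for illustration; not KafkaFlow's types.
public delegate Task Next(string message);

public interface IStep
{
    Task Invoke(string message, Next next);
}

public class LoggingStep : IStep
{
    private readonly string _name;
    private readonly List<string> _log;

    public LoggingStep(string name, List<string> log) => (_name, _log) = (name, log);

    public async Task Invoke(string message, Next next)
    {
        _log.Add($"{_name}:before");   // Pre-processing.
        await next(message);           // Hand over to the rest of the pipeline.
        _log.Add($"{_name}:after");    // Post-processing.
    }
}

public static class Pipeline
{
    // Composes the steps back to front, so the first step runs first and
    // each step receives the remainder of the chain as `next`.
    public static Next Build(IEnumerable<IStep> steps, Next terminal)
    {
        Next next = terminal;
        foreach (var step in steps.Reverse())
        {
            var rest = next; // Capture the chain built so far.
            next = message => step.Invoke(message, rest);
        }

        return next;
    }
}
```

With two steps named "outer" and "inner", the log reads outer:before, inner:before, then the terminal handler, then inner:after, outer:after, which is the nesting that registration order produces.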

Handling Concurrency in KafkaFlow

Once you start thinking about infrastructure efficiency, you will notice that many Kafka Consumers are underutilized. The most common implementation is single-threaded, which caps resource utilization. So, when you need to scale, you do it horizontally to keep the desired throughput.

KafkaFlow brings another option to achieve infrastructure efficiency. KafkaFlow gives developers control over how many messages can be processed concurrently by a single consumer. It uses the concept of Workers that can work together consuming a topic. This functionality allows you to optimize your Kafka consumer to better match your system’s capabilities.

Here’s an example of how to set the number of concurrent workers for a consumer:

.AddConsumer(consumer => consumer
    .Topic("topic-name")
    .WithGroupId("sample-group")
    .WithBufferSize(100)
    .WithWorkersCount(10) // Set the number of workers.
    .AddMiddlewares(middlewares => middlewares
        ...
    )
)

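KafkaFlow preserves message order even with concurrent workers. With its default distribution strategy, messages that share the same message key are routed to the same worker, so they are processed in the order in which they were consumed.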
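
One common way to keep order while processing in parallel is to send every message with the same key to the same worker. The sketch below illustrates that idea with a byte-sum modulo. `WorkerRouter` and `PickWorker` are hypothetical names, and the code is an illustration of the technique rather than something taken from KafkaFlow's source.

```csharp
using System;
using System.Linq;
using System.Text;

public static class WorkerRouter
{
    // Picks a worker index from the message key: same key, same worker.
    // A simple byte-sum modulo is used here purely for illustration.
    public static int PickWorker(string messageKey, int workersCount)
    {
        if (workersCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(workersCount));
        }

        int sum = Encoding.UTF8.GetBytes(messageKey).Sum(b => (int)b);
        return sum % workersCount;
    }
}
```

Messages with different keys may land on different workers and run concurrently, while messages for a single key stay in sequence because one worker processes them one after another.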

Batch Processing

As you scale, you will face a tradeoff between latency and throughput. To help manage that tradeoff, KafkaFlow offers a feature called "Batch Consuming," which consumes and processes Kafka messages in batches. It is useful in cases where a group of messages needs to be processed together rather than individually.

What Is Batch Consuming?

Batch consuming is an approach where, instead of processing messages one at a time as they arrive, the system groups several messages together and processes them all at once. This method is more efficient for dealing with large amounts of data, particularly if messages are independent of each other, because the fixed cost of each operation is paid once per batch rather than once per message.

KafkaFlow’s Approach to Batch Consuming

KafkaFlow takes advantage of the system of Middlewares to provide batch processing. The Batch Processing Middleware lets you group messages according to batch size or timespan. Once one of those conditions is reached, the Middleware will forward the group of messages to the next middleware.

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { "host:9092" })
        .AddConsumer(
            consumerBuilder => consumerBuilder
            ...
            .AddMiddlewares(
                middlewares => middlewares
                    ...
                    .BatchConsume(100, TimeSpan.FromSeconds(10))
                    .Add<HandlingMiddleware>()
            )
        )
    )
);
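
The "batch size or timespan" rule can be sketched in a few lines of plain C#. This is a simplified, single-threaded stand-in and not KafkaFlow's implementation: `Batcher<T>` is a hypothetical name, the current time is passed in explicitly to keep the logic easy to test, and the time limit is only checked when a new item arrives.

```csharp
using System;
using System.Collections.Generic;

// Simplified, single-threaded stand-in for size-or-time batching;
// not KafkaFlow's implementation.
public class Batcher<T>
{
    private readonly int _batchSize;
    private readonly TimeSpan _timeout;
    private readonly List<T> _buffer = new();
    private DateTime _firstItemAt;

    public Batcher(int batchSize, TimeSpan timeout)
    {
        _batchSize = batchSize;
        _timeout = timeout;
    }

    // Adds an item and returns a completed batch when either limit is hit,
    // otherwise null. `now` is supplied by the caller.
    public IReadOnlyList<T>? Add(T item, DateTime now)
    {
        if (_buffer.Count == 0)
        {
            _firstItemAt = now; // The time window starts with the first item.
        }

        _buffer.Add(item);

        bool full = _buffer.Count >= _batchSize;
        bool expired = now - _firstItemAt >= _timeout;
        if (!full && !expired)
        {
            return null;
        }

        var batch = _buffer.ToArray();
        _buffer.Clear();
        return batch;
    }
}
```

A production-grade batcher would also flush on a timer so that a partially filled batch is not held indefinitely when no further messages arrive. That part is omitted here for brevity.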

The Impact of Batch Consuming on Performance

With batch processing, developers can achieve higher throughput in their Kafka-based applications. It allows for faster processing as the overhead associated with initiating and finalizing each processing task is significantly reduced. This leads to an overall increase in system performance.

Also, this approach reduces network I/O operations as data is fetched in larger chunks, which can further improve processing speed, especially in systems where network latency is a concern.

Consumer Administration with KafkaFlow

KafkaFlow also simplifies administrative tasks related to managing Kafka consumers. You can start, stop, pause consumers, rewind offsets, and much more with KafkaFlow’s administrative API.

The Administration API can be used through a programming interface, a REST API, or a Dashboard UI.

KafkaFlow administration Dashboard

Consumer Throttling

Often, the systems behind a consumer, such as databases or downstream services, cannot absorb high-load periods as well as Kafka Consumers can. That can cause stability problems. That is where throttling comes in.

Consumer Throttling is an approach to managing the consumption of messages, enabling applications to dynamically fine-tune the rate at which they consume messages based on metrics.

Prioritization

Imagine you’re running an application where you want to segregate atomic and bulk actions into different consumers and topics. You may prefer to prioritize the processing of atomic actions over bulk actions. Traditionally, managing this differentiation could be challenging, given the potential discrepancies in the rate of message production.

Consumer Throttling is valuable in such instances, allowing you to monitor the consumer lag of the consumer responsible for atomic actions. Based on this metric, you can apply throttling to the consumer handling the bulk actions, ensuring that atomic actions are processed as a priority.

The result? An efficient, flexible, and optimized consumption process.

Adding throttling to a consumer is straightforward with KafkaFlow’s fluent interface. Here’s a simple example:

.AddConsumer(
    consumer => consumer
        .Topic("bulk-topic")
        .WithName("bulkConsumer")
        .AddMiddlewares(
            middlewares => middlewares
                .ThrottleConsumer(
                    t => t
                        .ByOtherConsumersLag("singleConsumer")
                        .WithInterval(TimeSpan.FromSeconds(5))
                        .AddAction(a => a.AboveThreshold(10).ApplyDelay(100))
                        .AddAction(a => a.AboveThreshold(100).ApplyDelay(1_000))
                        .AddAction(a => a.AboveThreshold(1_000).ApplyDelay(10_000)))
                .AddSerializer<JsonCoreSerializer>()
        )
)
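
To reason about what a configuration like this implies, the threshold-to-delay mapping can be sketched as a small pure function. The sketch assumes that "above" means strictly greater than the threshold, that the highest exceeded threshold wins, and that delays are in milliseconds. Those assumptions should be checked against the KafkaFlow documentation. `LagThrottle` and `PickDelay` are hypothetical names, and this is not KafkaFlow's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LagThrottle
{
    // Returns the delay of the highest threshold that the lag exceeds,
    // or TimeSpan.Zero when the lag is at or below every threshold.
    public static TimeSpan PickDelay(
        long lag,
        IEnumerable<(long Threshold, TimeSpan Delay)> actions) =>
        actions
            .Where(a => lag > a.Threshold)
            .OrderByDescending(a => a.Threshold)
            .Select(a => a.Delay)
            .FirstOrDefault();
}
```

Under these assumptions, and with the three actions from the configuration above, a lag of 50 on the atomic consumer would slow the bulk consumer by 100 ms, and a lag of 5,000 would slow it by 10 seconds.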

KafkaFlow: Looking Toward the Future

As of now, KafkaFlow provides a robust, developer-friendly abstraction over Kafka that simplifies building real-time data processing applications with .NET. However, like any active open-source project, it’s continually evolving and improving.

Given the project’s current trajectory, we might anticipate several developments. For instance, KafkaFlow could further enhance its middleware system, providing even more control and flexibility over message processing. We might also see more extensive administrative APIs, providing developers with even greater control over their Kafka clusters.

Because KafkaFlow is extensible by design, we can expect its community to grow, leading to more contributions, innovative features, extensions, and support. As more developers and organizations adopt KafkaFlow, we’re likely to see an increase in learning resources, tutorials, case studies, and other community-generated content that can help new users get started and existing users get more from the library.

Conclusion

KafkaFlow is a handy and developer-friendly tool that simplifies work with Kafka in .NET. It shines in the area of developer experience and usability. The framework design lends itself well to clean, readable code. With a clear separation of concerns through middlewares and message handlers, as well as abstractions over complex problems when building applications on top of Apache Kafka, KafkaFlow helps to keep your codebase manageable and understandable.

Furthermore, the community around KafkaFlow is growing. If you’re using Kafka and looking to improve productivity and reliability, KafkaFlow is certainly worth considering.
