A Case for Event-Driven Architecture with Mediator Topology

Key Takeaways

  • Event-Driven Architecture is powerful and can be very simple to implement and support if a suitable topology is selected.
  • Open-source frameworks for durable workflows and orchestration can help build reliable systems and save many person-months of custom development.
  • KEDA supports many different metrics and can help configure advanced autoscaling rules where classical CPU-based scaling would not be efficient.
  • Even a seemingly trivial business case might require sophisticated architecture design to satisfy its requirements.
  • Event-Driven Architecture enables elastic scalability even with an orchestration approach.
     

Today, I want to share the story of a business case where we used Event-Driven Architecture with the Mediator topology, along with some interesting implementation details such as elastic scalability, reliability, and durable workflows. All of it was built using Kubernetes, KEDA, AWS, and .NET technologies.

The Business Problem

Let’s start with the business case. In our product, users upload files to later share and review online as part of due diligence processes. But behind the scenes, everything is much more complicated. Each file must be processed: we convert it to a base format and optimize it for viewing in the browser, generate previews, detect the language and recognize text in images, collect metadata, and perform other operations. The files include documents, pictures, technical drawings, archives (.zip), and even videos.

Sometimes we can get hundreds of thousands of files uploaded in a day, and sometimes there are days without activity. Still, users generally want to start collaborating on a file as soon as possible after uploading it. So we need an architecture that will scale elastically and be cost-effective.

Also, each file carries essential and sensitive business information for the customer. We cannot afford to lose a file somewhere in the middle of the process.

It is clear that when we are talking about hundreds of thousands or even millions of files overall, it is crucial to have good system observability so we can quickly identify and solve problems when they arise.

Another important detail that affects the architecture design is that processing one file can involve a dozen steps. Each step can take anywhere from a few seconds to an hour and consume a lot of CPU, RAM, and I/O. We also want to be able to modify the file-processing pipeline easily and quickly.

We use third-party SDKs to process files. They are not always reliable and can occasionally corrupt memory and crash with an access violation, a stack overflow, or similar errors.

The Implementation

Let’s now see how we implemented it.

Scalability requirements pushed us to the idea of building a solution based on events. But at the same time, we could not compromise on reliability, observability, and ease of system support.

We chose an Event-Driven Architecture with the Mediator topology pattern. There is a special service called Event Mediator (we call it internally Orchestrator). It receives the initial message to process the file and executes a file-processing script, which we call a workflow. The workflow is a declarative description of what must be done with a particular file as a set of discrete steps. Each step type is implemented as a separate stateless service. In pattern terms, they are called Event Processors, but we call them Converters.

Here is how it works in general. When the user has uploaded a file, we send a command to the Mediator to process it. Based on the file type, the Mediator selects the required workflow and starts it. Notably, the Mediator itself never touches files. Instead, it sends a command to a queue corresponding to a specific operation type and waits for a response. The service that implements this type of operation (a Converter) receives the command from the queue, processes the corresponding file, and sends a message back to the Mediator indicating that the operation is complete and where the result is stored. After receiving the answer, the Mediator starts the next step in the same way until the entire workflow is finished. The output of one step can be the input of the next. At the end, the Mediator notifies the rest of the system that processing is complete.
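
To make the message exchange concrete, here is a minimal sketch of what the commands between the Mediator and a Converter could look like. The type and property names are hypothetical, not our actual contracts.

```csharp
// Hypothetical message contracts between the Mediator and the Converters.
// Names and fields are illustrative only.

// Initial command that starts a workflow for an uploaded file.
public record StartFileProcessing(Guid FileId, string SourceS3Key, string FileType);

// Sent by the Mediator to an operation-specific queue (e.g., an OCR queue).
public record RunOperation(Guid WorkflowId, Guid FileId, string InputS3Key);

// Sent back by a Converter when the step has finished.
public record OperationCompleted(Guid WorkflowId, Guid FileId, string OutputS3Key, bool Succeeded, string? Error = null);
```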

Now that we understand how the solution works, let’s look at how the required architectural characteristics are achieved.

Scalability

Let’s start with scalability.

All services are containerized and run in a Kubernetes cluster in Amazon. Also, a Kubernetes-based Event-Driven Autoscaler (KEDA) component is installed in the Kubernetes cluster. Converters implement the Competing Consumers pattern.

On top of that, scaling rules are configured for each Converter type based on queue length. KEDA automatically monitors the queues; for example, if there are 100 text recognition commands in the queue, it will spin up 100 new pods in the cluster and later remove them as the queue drains. We specifically chose to scale on queue length because it works more reliably and transparently than classic CPU-based scaling: we have many different file processing operations running simultaneously, and the amount of pending work does not always correlate linearly with CPU load.
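
To illustrate, a KEDA ScaledObject for a converter scaled on SQS queue length could look roughly like this. The deployment name, queue URL, and target value are hypothetical, and authentication configuration is omitted.

```yaml
# Hypothetical KEDA ScaledObject: scale the OCR converter on its SQS command queue.
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ocr-converter-scaler
spec:
  scaleTargetRef:
    name: ocr-converter              # the converter Deployment
  minReplicaCount: 0                 # scale in fully when the queue is empty
  maxReplicaCount: 100
  triggers:
    - type: aws-sqs-queue
      metadata:
        queueURL: https://sqs.eu-west-1.amazonaws.com/123456789012/ocr-commands
        queueLength: "1"             # target roughly one in-flight message per pod
        awsRegion: eu-west-1
```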

Of course, running new pods requires more nodes in the cluster. The Cluster Autoscaler helps us with this. It monitors the load on the cluster and adds or removes nodes as KEDA scales pods.

One interesting nuance here is that during scale-in, you do not want to stop a container in the middle of processing a file and start over. Luckily, Kubernetes allows you to handle this. Kubernetes first sends a SIGTERM to signal its intent to terminate the pod; the container can keep running until it finishes processing the current message, and Kubernetes will wait up to terminationGracePeriodSeconds after the SIGTERM before force-killing the replica.
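
For illustration, the converter pod template simply needs a grace period long enough for the slowest conversion; the names and value below are hypothetical.

```yaml
# Illustrative fragment of a converter Deployment pod template.
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 3600   # allow up to an hour to finish the current file
      containers:
        - name: ocr-converter
          image: registry.example.com/ocr-converter:latest
```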

OK, converters scale elastically, but what about the Mediator? Could it be the bottleneck in the system?

It does not have to be: the Mediator and the workflow engine also scale horizontally. KEDA scales them too, but this time based on the number of active workflows. We monitor the length of certain Redis lists that the workflow engine uses internally, which correlates with the number of active workflows and the current load.

As I mentioned, the Mediator does not perform any operations other than the orchestration of processes, so its resource consumption is minimal. For example, when we have thousands of active workflows and scale out to 200 converters, only about five instances of Mediator are needed.
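
A KEDA trigger of this kind could look roughly as follows; the Redis address, list name, and target value are hypothetical.

```yaml
# Hypothetical KEDA trigger: scale the Mediator on the length of a Redis list
# that correlates with the number of active workflows.
triggers:
  - type: redis
    metadata:
      address: redis-master.default.svc.cluster.local:6379
      listName: workflow-queue       # illustrative name of an internal workflow-engine list
      listLength: "500"              # roughly 500 queued items per Mediator instance
```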

The cluster is homogeneous - we do not have separate node types to run converters and orchestrator instances.

Maintainability and Extensibility

Let’s talk about how easy it is to implement and maintain this system.

The converters are stateless services with fairly simple logic - take a command from the queue, process the specified file (by invoking methods from third-party libraries), save the result, and send a response.
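
A converter is essentially a receive-process-acknowledge loop. The sketch below uses the AWS SQS client directly and is heavily simplified; the queue URL and the ConvertFileAsync/PublishCompletedAsync helpers are hypothetical placeholders.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Microsoft.Extensions.Hosting;

// Sketch of a converter worker: take a command from the queue, process the file,
// send the result, and only then delete the message.
public class OcrConverterWorker : BackgroundService
{
    private readonly IAmazonSQS _sqs = new AmazonSQSClient();
    private const string QueueUrl =
        "https://sqs.eu-west-1.amazonaws.com/123456789012/ocr-commands";

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var response = await _sqs.ReceiveMessageAsync(new ReceiveMessageRequest
            {
                QueueUrl = QueueUrl,
                MaxNumberOfMessages = 1,
                WaitTimeSeconds = 20                 // long polling
            }, stoppingToken);

            foreach (var message in response.Messages)
            {
                // We deliberately do not pass stoppingToken here: after SIGTERM the
                // current file is still finished, within terminationGracePeriodSeconds.
                var result = await ConvertFileAsync(message.Body);
                await PublishCompletedAsync(result);

                // Deleting last keeps the step at-least-once: if the pod crashes before
                // this line, the message reappears after the visibility timeout.
                await _sqs.DeleteMessageAsync(QueueUrl, message.ReceiptHandle);
            }
        }
    }

    // Hypothetical helpers standing in for the real conversion and messaging code.
    private Task<string> ConvertFileAsync(string commandJson) => throw new NotImplementedException();
    private Task PublishCompletedAsync(string result) => throw new NotImplementedException();
}
```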

Implementing workflow functionality yourself is very difficult, and I don’t recommend it to anyone. There are quite mature solutions on the market into which many years and millions of dollars have been invested - for example, Temporal.io, Camunda, Azure Durable Functions, and AWS Step Functions, to name a few.

Because our stack is .NET and we are hosted in AWS, and for several other historical reasons, we chose Daniel Gerlag’s Workflow Core library.

It is lightweight, easy to use, and covers our use cases completely. However, Workflow Core is not under active development. As an alternative, you might look at MassTransit’s State Machine by Chris Patterson, which is actively maintained and has some additional features.

Implementing the Mediator is also simple - the source code is primarily a set of declaratively described workflows in the form of a sequence of steps for each type of file.
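
Below is a minimal sketch of such a declarative workflow in the style of Workflow Core. The step classes, property names, and workflow id are hypothetical; in the real system each step dispatches a command to a converter queue and the workflow waits for the reply rather than doing the work inline.

```csharp
using WorkflowCore.Interface;
using WorkflowCore.Models;

public class DocumentProcessingData
{
    public string SourceKey { get; set; } = "";
    public string PdfKey { get; set; } = "";
}

// Hypothetical step: in reality it would publish a command to the PDF-conversion queue.
public class ConvertToPdf : StepBody
{
    public string SourceKey { get; set; } = "";
    public string ResultKey { get; set; } = "";

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        ResultKey = SourceKey + ".pdf";      // placeholder for the real conversion result
        return ExecutionResult.Next();
    }
}

// Hypothetical step: in reality it would publish a command to the preview queue.
public class GeneratePreview : StepBody
{
    public string PdfKey { get; set; } = "";

    public override ExecutionResult Run(IStepExecutionContext context)
    {
        return ExecutionResult.Next();
    }
}

public class DocumentWorkflow : IWorkflow<DocumentProcessingData>
{
    public string Id => "document-processing";
    public int Version => 1;

    public void Build(IWorkflowBuilder<DocumentProcessingData> builder)
    {
        builder
            .StartWith<ConvertToPdf>()
                .Input(step => step.SourceKey, data => data.SourceKey)
                .Output(data => data.PdfKey, step => step.ResultKey)
            .Then<GeneratePreview>()
                .Input(step => step.PdfKey, data => data.PdfKey);
    }
}
```

Workflows like this are registered with the workflow host at startup and started per file with a data object describing that file.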

Testing the system is possible at many levels. Covering the workflows with unit tests is easy, as it does not require running the converters or instantiating the orchestrator service. It is also straightforward to check that all steps are invoked in the right order, that retry, timeout, and error-handling policies behave as expected, that steps update the workflow state, and so on - the Workflow Core library has built-in support for this. Finally, we can run end-to-end integration tests where we start all converters, the orchestrator, the database, Redis, and the queues; Docker Compose makes this an easy one-click or command-line option for local development.

So when we need to make changes, it is usually just a matter of editing a workflow description, and occasionally adding a new converter service to support a new operation or to try an alternative implementation.

Reliability

Finally, we come to perhaps the most critical aspect of the system - reliability.

Let’s start by identifying what can go wrong - any service can go down at any time, the system load can grow faster than the system can scale, services or infrastructure can be temporarily unavailable, and code defects can lead to files being processed incorrectly and needing to be re-processed.

The most straightforward cases for reliability involve the converter services. The service locks a message in the queue when it starts processing and deletes it when it has finished its work and sent the result. If the service crashes, the message will become available again in the queue after a short timeout and can be processed by another instance of the converter. If the load grows faster than new instances are added or there are problems with the infrastructure, messages accumulate in the queue. They will be processed right after the system stabilizes.

In the case of the Mediator, all the heavy lifting is again done by the Workflow Core library. Because all running workflows and their state are stored in the database, if an abnormal termination of the service occurs, the workflows will continue execution from the last recorded state.

Also, we have configuration for retrying failed steps, timeouts, alternative scenarios, and limits on the maximum number of parallel workflows.
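
As an illustration, in Workflow Core this kind of policy can be attached declaratively to the steps; the snippet below extends the Build method from the earlier workflow sketch, and the intervals are arbitrary.

```csharp
public void Build(IWorkflowBuilder<DocumentProcessingData> builder)
{
    builder
        .StartWith<ConvertToPdf>()
            // Retry the failed step every minute until it succeeds.
            .OnError(WorkflowErrorHandling.Retry, TimeSpan.FromMinutes(1))
        .Then<GeneratePreview>()
            // Or give up and fail the whole workflow.
            .OnError(WorkflowErrorHandling.Terminate);
}
```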

What’s more, the entire system is idempotent, so every operation can be retried safely without side effects, which mitigates the concern of duplicate messages. S3 lifecycle policies let us automatically remove temporary files and avoid accumulating garbage from failed operations.

Another benefit of Kubernetes is the ability to set resource requests and limits for each service, i.e., the minimum CPU and RAM it needs to start and the maximum it is allowed to consume. With them in place, you do not have to worry about the noisy-neighbor problem when several pods run on the same cluster node and one of them develops a memory leak or falls into an infinite loop.
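
An illustrative fragment of a converter container spec (the numbers are made up and would be tuned per converter type in practice):

```yaml
# Requests are the guaranteed minimum used for scheduling;
# limits are the hard ceiling that protects neighbors on the same node.
resources:
  requests:
    cpu: "500m"
    memory: 1Gi
  limits:
    cpu: "2"
    memory: 4Gi
```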

Thanks to the Mediator approach and durable workflows, we have very good system observability. At any moment, we know exactly which stage each file is at, how many files are in flight, and other important metrics. In case of defects, we can review the historical data, restart processing for the affected files, or take other actions as necessary.

We built dashboards with all the critical AWS infrastructure and application metrics.

Overall, the system can restore itself even after a large-scale failure.

Conclusion

The end result is a highly scalable system that is easy to extend, modify, and test, with good observability and cost-effectiveness.

Finally, some interesting statistics. It took us only two months to build the walking skeleton of the system, thanks to the use of off-the-shelf components. The recorded peak throughput was about 5,000 files per hour. This is not a capacity limit - we intentionally capped auto-scaling and are now raising the limits gradually to avoid unexpected infrastructure bills, such as those caused by a defect leading to a runaway process. The largest batch uploaded by users in a day was 250,000 files. In total, we have processed millions of files since switching to the new solution.

We did have some failures and incidents. Most were related to defects in third-party libraries appearing only in edge cases under heavy load. As responsible engineers, we tried our best to contribute diagnostic information and fixes, and we are very grateful to the OSS maintainers for their support and responsiveness. One lesson we learned here is to have easily configurable limits so that when something goes wrong, you can reduce the load, let the system stabilize, recover, and continue operations under degraded throughput while working on the fix.
