
Tales of Kafka @Cloudflare: Lessons Learnt on the Way to 1 Trillion Messages


Summary

Andrea Medda and Matt Boyle discuss Kafka on the way to one trillion messages, and the internal tools used to ease adoption as well as improve resiliency.

Bio

Andrea Medda is Senior Systems Engineer @Cloudflare. Matt Boyle is Engineering Manager @Cloudflare.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Boyle: One trillion is one million million. One trillion is over 60 times the amount of UK currency currently in circulation. If you were to start counting to 1 trillion now, it would take you over 31,000 years to finish. However, it took Cloudflare less than 6 years to emit 1 trillion messages to Kafka, and that's only inter-service communication messages. If we were to consider other message types as well, we would have done this in less than a year.

We're going to tell you the story of how, as Cloudflare grew as a company, and as the engineering function grew, we had to build and adapt our tooling to enable us to continue to deliver fast. We're going to do this by first walking through a timeline from the early days of working in distributed, domain-based teams, and how we built abstractions on top of Kafka that helped us reach that magic 1 trillion message mark.

Then I'm going to share with you some real incidents we faced in recent years as our tooling began to hit scalability limitations, as well as describing the steps and patterns we've applied to ensure we can deal with Cloudflare's ever-increasing demand. We'll then wrap up the talk by sharing some of the next steps we're thinking about, as well as sharing some of our key lessons.

I'm Matt Boyle. I'm joined by Andrea Medda. We're both in the application services team at Cloudflare. We'll talk a little bit more about what the application services team at Cloudflare does. We're going to share with you tales of Kafka at Cloudflare.

What Is Cloudflare?

Before we get too deep into explaining the things we've learned along the way, we just want to do some baselining so we all ensure we have the same context. We're going to start at the beginning which is, what is Cloudflare? Cloudflare has a vision of building a better internet, and to do this we provide a global network to our customers.

It enables them to secure websites, APIs, and internet traffic, protect their corporate networks, and also run and deploy entire applications on the edge. We do this through products such as CDN, Zero Trust, and Cloudflare Workers. At its simplest, it looks like this diagram here. Bad people can't get to your website, good people can. Cloudflare does all the work to figure out what counts as good and what counts as bad.

If we dig a tiny bit deeper into this, from an engineering perspective, we have two main components to the Cloudflare network. We have the Global Edge network. A surprising amount of Cloudflare is built using Cloudflare's own products, so we actually do build and deploy Cloudflare Workers on our Global Edge, the same way as our customers do.

We also have the data centers where we deploy Cloudflare's control plane. This is a collection of data centers where we run Kubernetes, Kafka, and databases. We run Kafka and the databases on bare metal, and we run Kubernetes on bare metal too. All our Kafka consumers and producers are typically deployed into Kubernetes.

Where we deploy depends on the workload and what we're trying to achieve. For the purpose of this talk, we're going to be really focusing on the control plane and how we scale inter-service communication and enablement tools there.

Kafka

Then the final thing I just want to cover before we get into the meat of this talk: this isn't going to be a talk about Kafka itself. I just want to do some level setting in terms of the language and concepts we're going to use. Kafka is made up of clusters. A Kafka cluster is made up of many brokers. One of these brokers is the leader, and it's responsible for coordination. In our image, Broker 2 is the leader.

Messages that are emitted to Kafka are organized as topics. For example, you may have a topic called user events, and you may emit messages there such as user created, and user info updated, things like that. Topics are further split into partitions. Partitions are how we allow Kafka to scale horizontally. For example, in the above image, you can see that we have partitions for topic A on both brokers. Partitions have leaders too.

These are used to determine who's the source of truth for that partition. Partitions are replicated by a predetermined replication factor for resilience. At Cloudflare we tend to use three as the minimum replication factor, but there can be good reasons to pick a much higher number, or occasionally a lower one. A lower number is usually chosen for speed. Services that push messages to Kafka are called producers. Services that read messages from Kafka are called consumers. A service can be both a producer and a consumer.
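As a rough, hypothetical illustration of these roles (not Cloudflare's internal tooling), a minimal producer and consumer in Go using the open-source segmentio/kafka-go client could look like the sketch below; the broker address, topic, and consumer group are placeholders.

```go
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	ctx := context.Background()

	// Producer: push a message onto the "user-events" topic.
	writer := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"), // placeholder broker address
		Topic: "user-events",
	}
	if err := writer.WriteMessages(ctx, kafka.Message{
		Key:   []byte("user-123"),
		Value: []byte(`{"type":"user_created"}`),
	}); err != nil {
		log.Fatal(err)
	}
	_ = writer.Close()

	// Consumer: read messages from the same topic as part of a consumer group.
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "user-events",
		GroupID: "audit-log-consumer", // placeholder group
	})
	msg, err := reader.ReadMessage(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("consumed offset %d: %s", msg.Offset, msg.Value)
	_ = reader.Close()
}
```

The consumer group is what lets several consumer instances share the partitions of a topic between them.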

Cloudflare Engineering Culture

Cloudflare engineering culture is critical to a lot of the things we're going to describe here. Cloudflare doesn't really believe in having a diktat for specific tools, or even programming languages. Teams are encouraged to build, run, and own their own services. We therefore encourage experimentation and advocacy for good tools and methods.

This is where application services comes in. Application services is a team that exists within our engineering productivity organization. We're a pretty young team in our current format, but our mission is simple, we want to make it easy for other teams to do the right thing. This means providing batteries-included tooling that enables them to move fast with best practices, and also to automate the automatable.

We allow teams to reduce toil and ensure they can focus on delivering value. It hasn't always been like this, though. Cloudflare started life as a monolithic PHP application, much of which is still used in production today, and we have many teams contributing to it. Cloudflare as a company does many things, and the monolithic approach was really starting to impact our ability to deliver, and to deliver safely.

Tight Coupling

This talk isn't about monolith versus microservices, though. I'm going to really zone in on the one problem we identified and set out to solve, which is tight coupling. As our engineering team and product offering grew, we needed to start exploring better ways of allowing teams to deliver at their own pace, decoupled from their peers. We also needed to give engineers the ability to retry and back off requests, and to have more guarantees about work being completed.

At this point, Cloudflare was already running Kafka clusters that were handling huge volumes of data, so it seemed a great place to invest time. We created the message bus cluster. The message bus cluster is the most general-purpose cluster we run and is available to all our application teams at Cloudflare. Onboarding to it is really simple. It's a case of creating a pull request into a repository we have which we call Salt, and once it's merged and applied, you get a new topic.

It's going to have your chosen replication strategy, a retention period, and ACLs, all set up and ready to go. If you look at this diagram, you can see some of the decoupling starting to appear. We have three teams emitting messages that our audit log system is interested in. It can listen to all of those without even being aware that those services exist. If you wanted to add another service that also needed to emit some audit logs, it would just need to produce to the right topic, and we could pick that up immediately. We're already starting to see this looser coupling in effect.

Unstructured Communication

One problem that we did see with this particular approach is unstructured communication. The challenge with having systems that are unaware of each other is that if they don't have a strong contract about how to talk to each other, it can lead to situations where messages become unprocessable. In fact, if you're building an event-driven system correctly, you actually don't want your systems to necessarily be aware of each other, because you don't want them to have that coupling.

Initially, we didn't enforce any message format, and we left it up to the producer teams to decide how to structure their messages. This was a mistake. It led to strong coupling and lots of unprocessable messages. For example, maybe a producer sent a JSON blob, but certain keys were missing, and the consumer team expected them to be there.

To fix this, we started to look around. In the Kafka world, there are only two really good options available here: Apache Avro or protobuf. We decided to use protobuf, and we've been really happy with that decision. We'd been using JSON previously, but we found it harder to enforce forward and backward compatibility, and the messages were substantially larger than the protobuf equivalent.

Protobuf provides strict message types, and it gives us the forward and backward compatibility out of the box that we desire. The ability to generate code in multiple languages is a huge plus as well. We encourage heavy commentary on our protobuf messages, and when changes are merged, we also use a tool called Prototool, which was open sourced by Uber, to do breaking change detection. It enforces some stylistic rules, and it is able to generate code for various languages.

On the board here, you can see that we have Email Message View1. This is one of the protobuf Kafka messages that we actually use here at Cloudflare. As you can see, it's very readable. Protobuf really is self-documenting as well, which makes it really nice to work with.

We then took this solution one step further. Because, even though we were using protobuf, we felt like there was more we could do to make this even easier for teams. There were still some issues we were seeing. An example you can see on the board here. Teams could emit different messages to the same topic, and maybe the audit log consumer wasn't expecting that and it wasn't able to process the firewall message because it wasn't in the format they expected.

There are also some things around configuration. Configuring Kafka consumers and producers isn't the easiest thing to do; there's a lot of intricacy that goes into making sure you've got the right settings for your workload. Because nearly every team at Cloudflare was using Go, and we were starting to get a pretty good understanding of what good Go looks like, we decided to build what we call the message bus client library, which baked in a bunch of best practices and allows teams to move much faster than they had been able to previously.

One controversial decision we did make was to enforce one protobuf message type per topic to avoid exactly this problem you can see here. This is only configured client side and it's enforced by the client. This was a really good decision to enable easy adoption, but it has led to numerous topics existing.

When you consider that for each topic we create, we have numerous partitions replicated with a replication factor of at least three, there's lots of potential to optimize for compute here. This worked really well for us, but it's not necessarily something I would advise every company does.
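The message bus client library itself is internal, so the following is only a hedged sketch of how "one protobuf message type per topic" could be enforced purely on the client side; the Producer type, the registration step, and the emit hook are all hypothetical.

```go
package messagebus

import (
	"fmt"

	"google.golang.org/protobuf/proto"
)

// Producer is a hypothetical wrapper around a raw Kafka producer that only
// accepts the protobuf message type registered for each topic.
type Producer struct {
	topicTypes map[string]proto.Message          // topic -> expected message type
	emit       func(topic string, b []byte) error // stands in for the real Kafka producer
}

// RegisterTopic pins a topic to a single protobuf message type.
func (p *Producer) RegisterTopic(topic string, msgType proto.Message) {
	if p.topicTypes == nil {
		p.topicTypes = map[string]proto.Message{}
	}
	p.topicTypes[topic] = msgType
}

// Produce marshals the message and refuses anything that doesn't match the
// registered type for the topic, which is how the constraint is enforced
// purely on the client side.
func (p *Producer) Produce(topic string, msg proto.Message) error {
	want, ok := p.topicTypes[topic]
	if !ok {
		return fmt.Errorf("topic %q has no registered message type", topic)
	}
	if proto.MessageName(want) != proto.MessageName(msg) {
		return fmt.Errorf("topic %q expects %s, got %s",
			topic, proto.MessageName(want), proto.MessageName(msg))
	}
	value, err := proto.Marshal(msg)
	if err != nil {
		return err
	}
	return p.emit(topic, value)
}
```

Because the check lives in the client rather than the broker, it's cheap to adopt, but nothing stops a producer that bypasses the library, which is part of why it's a trade-off rather than universal advice.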

Teams Using Kafka for the Same Style of Job

Things are looking pretty good. We have a bunch of tooling and abstractions available over Kafka now, which makes it pretty easy for teams to get started. We probably could have left things here and moved on. As we spoke to more engineers, we discovered there were some common use cases and patterns emerging.

This isn't a huge problem, but we did see an opportunity for our team to add more value and to make it even easier for teams to follow best practices, and do the right thing. To fix this, we came up with something that we call the connector framework. The connector framework is based on Kafka connectors, and it allows our engineers to spin up services that can read from one system of record and push it somewhere else, such as Kafka itself, or even Cloudflare's own Quicksilver, which is an Edge database that we wrote.

To make this as easy as possible, we use Cookiecutter to template, and it allows engineers to enter just a few parameters into a CLI, and in return, they get a ready to go service. You can see a rough example of how this might work here. You have a connector, some transformation, and you have a consumer service. I have a more concrete example of something we've built in a couple of slides.

Configuring the connector is really simple. We allow this to happen simply with environment variables. You don't even need to make any code changes. You can see an example here in which we have a reader, which is Kafka, and the writer is Quicksilver, which is our Edge database. Then we have a couple of transformations we're going to apply. We're going to read from topic 1 and topic 2, and we're going to apply a function called pf_edge.

This is it. This is all the configuration you need, and it comes with metrics, alerts, and everything you need to get into production. This makes it really easy for teams to do the right thing. If they want to, they can register custom transformations, which are the only pieces of code they would need to write here.
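The connector framework itself is internal, but a sketch of the general shape, with a reader and a writer chosen by environment variables and a chain of named transformations in between, might look something like this; the variable names, interfaces, and the pf_edge placeholder are illustrative, not the real contracts.

```go
package main

import (
	"context"
	"log"
	"os"
	"strings"
)

// Message is the unit of data flowing through the connector.
type Message []byte

// Reader, Writer, and Transformation are hypothetical interfaces; the real
// framework's contracts may differ.
type Reader interface {
	Read(ctx context.Context) (Message, error)
}
type Writer interface {
	Write(ctx context.Context, m Message) error
}
type Transformation func(Message) (Message, error)

// Registry of custom transformations, keyed by the name used in config.
var transformations = map[string]Transformation{
	"pf_edge": func(m Message) (Message, error) { return m, nil }, // placeholder
}

// Stubs standing in for the real Kafka reader and Quicksilver writer.
type stubReader struct{}

func (stubReader) Read(ctx context.Context) (Message, error) { return Message("example"), nil }

type stubWriter struct{}

func (stubWriter) Write(ctx context.Context, m Message) error { log.Printf("wrote %s", m); return nil }

func newReader(kind string) Reader { return stubReader{} } // would switch on "kafka", etc.
func newWriter(kind string) Writer { return stubWriter{} } // would switch on "quicksilver", etc.

func main() {
	ctx := context.Background()

	// e.g. READER=kafka WRITER=quicksilver TRANSFORMATIONS=pf_edge
	reader := newReader(os.Getenv("READER"))
	writer := newWriter(os.Getenv("WRITER"))
	var chain []Transformation
	for _, name := range strings.Split(os.Getenv("TRANSFORMATIONS"), ",") {
		if t, ok := transformations[name]; ok {
			chain = append(chain, t)
		}
	}

	// The real connector would loop forever; one pass is enough for the sketch.
	msg, err := reader.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range chain {
		if msg, err = t(msg); err != nil {
			log.Fatal(err)
		}
	}
	if err := writer.Write(ctx, msg); err != nil {
		log.Fatal(err)
	}
}
```

In this shape, the only code a team writes is a new entry in the transformation registry; everything else is configuration.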

Here's an example of how our team has used connectors. We own a service called the communication preferences service. If you want to opt out of marketing information in the Cloudflare dashboard, this is the service you would be communicating with to do that. It stores your communication preference update in its own database, but it also emits a message to Kafka.

We then have three different connectors, which are used to sync that change in communication preference to three different downstream systems: a transactional email service, the customer management system, and the marketing email system. This makes it an eventually consistent system. For this particular workload, that is acceptable, and we can use some of the guarantees Kafka provides to make sure it happens.

Too Many Abstractions

At this point, in our story, we built tons of abstractions over Kafka, in an attempt to make it as easy as possible to deliver value. This was actually working really well for us until it didn't. As we entered the pandemic, the amount of internet usage and Cloudflare customers increased rapidly. We saw a huge increase in throughput through a lot of our systems, and we started to identify scalability issues in some of the abstractions we created.

Visibility

Medda: We're going to go through a bunch of issues that we've seen, we can call them war stories, and how we approached and fixed them by enhancing our cluster as well as our SDK. To start with, we can give an example with audit logs. If you're a Cloudflare customer, you might already know this, but we also handle audit logs, and specifically our team built a system to manage them.

We allow producer teams to produce these events, and we listen for them and we just record them in our own database. We usually surface this information to our customers via API, and also with this new integration which is called log push, which enables us to push this information into data buckets, such as R2, or S3, or anything else you like.

Just for context, an audit log usually represents an event of a change to a certain resource in your system. For example, if someone goes and deletes a zone on Cloudflare, so a website, this will emit an event which will contain information such as the ID of the zone, the name, the actor or the email of who did this, the IP, what time this happened, and so on.

We have many customers. As we mentioned, we also had a spike in traffic due to the pandemic, so we started registering many more audit logs. Most of our customers care deeply about audit logs, and so they started polling our APIs to get fresh data. Unfortunately, that wasn't really scalable, and that's why we decided to invest in this pipeline. We created a very tiny service that just listens for these events and transforms them into the correct format to be pushed directly into a bucket.

This is great, because we avoid customers polling us. Unfortunately, when we released this in production, we saw massive issues, because we started accruing lag and we were not able to clear it very fast. We were breaching our SLOs. Lag is, in turn, the delay between a message being produced to a partition and that message being consumed. Of course, we also set SLAs on this.

We were not really sure why this was taking so long, because we didn't have any instruments or tooling baked into the SDK to understand it, so we couldn't tell whether it was a matter of reading from Kafka, doing the transformation, or even just saving this data to the database.

We decided to invest some time and enrich our SDK with basic Prometheus metrics. Specifically, in this case, we relied on histograms. Histograms allow us to measure how long the whole flow for each message takes, as well as how long each different step takes. This already helped us a lot in understanding that some steps were slower than others, but there was a lack of correlation: we couldn't tell whether a given step was taking longer for a certain message compared to the other steps for that same message.
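As an illustration of that kind of instrumentation (not the actual SDK code), a per-step histogram with the official Prometheus Go client might look like this; the metric name and step labels are invented.

```go
package sdkmetrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// stepDuration records how long each stage of message handling takes,
// labelled by step so slow stages stand out on a dashboard.
var stepDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "messagebus_step_duration_seconds", // hypothetical metric name
	Help:    "Duration of each step of message processing.",
	Buckets: prometheus.DefBuckets,
}, []string{"step"})

// ObserveStep times a single step, e.g. "consume", "transform", "logpush".
func ObserveStep(step string, fn func() error) error {
	start := time.Now()
	err := fn()
	stepDuration.WithLabelValues(step).Observe(time.Since(start).Seconds())
	return err
}
```

Histograms like this show which step is slow in aggregate, but on their own they still don't correlate the steps of one particular message.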

For example, if consuming from Kafka was taking a long time, we couldn't relate it to log push taking a long time for the same message. At this point, we decided to invest in something else: we decided to explore OpenTelemetry. OpenTelemetry is a collection of SDKs and APIs that allow us to collect telemetry in a standardized way from different services, even ones written in different programming languages, which is pretty great.

We specifically decided to invest our efforts into OpenTracing, though this wasn't really easy. There are not many good integrations for OpenTracing on Kafka specifically. Also, it's very easy to get this wrong and not have your traces propagated across different services. For us, it was pretty tricky to do this while we had a production incident.

Just to give you an overview, we were able to wrap our SDK and enrich it with this tracing. With that, we were able to have a span which tracks the total duration of this workflow, in this example 7.91 seconds, and then correlate it with each different piece that composes it, each of which is also a span. Here, we can see that the longest part was definitely pushing into the data bucket, but reading from Kafka was slow as well.
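The exact wiring lives in the internal SDK, but a rough sketch of the pattern with the OpenTelemetry Go API, with one parent span for the whole workflow and one child span per step, might look like this; the tracer and span names are made up.

```go
package tracing

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("messagebus") // hypothetical instrumentation name

// HandleMessage wraps one message's journey in a parent span with a child
// span per step, so slow steps show up correlated within the same trace.
func HandleMessage(ctx context.Context, consume, transform, push func(context.Context) error) error {
	ctx, span := tracer.Start(ctx, "handle_message")
	defer span.End()

	steps := []struct {
		name string
		fn   func(context.Context) error
	}{
		{"consume_from_kafka", consume},
		{"transform", transform},
		{"push_to_bucket", push},
	}
	for _, step := range steps {
		if err := runStep(ctx, step.name, step.fn); err != nil {
			span.RecordError(err)
			return err
		}
	}
	return nil
}

func runStep(ctx context.Context, name string, fn func(context.Context) error) error {
	ctx, span := tracer.Start(ctx, name, trace.WithSpanKind(trace.SpanKindInternal))
	defer span.End()
	return fn(ctx)
}
```

Propagating the trace across services additionally means injecting and extracting the span context through Kafka message headers with a propagator, which is the part that's easy to get wrong.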

In this case, we were able to decide what to prioritize and fix, and we actually did so and were able to clear the lag. Up to today, we still do this. Just for context, here we see an average of 250 messages per second, which is great. Sometimes we have spikes of even 500 messages per second.

This led to something interesting, because once we started to add metrics to our SDK, we were in a good position to have a general overview of the health of the cluster, as well as the health of the services that leverage our SDK and cluster. We were able to see how many messages are being produced and how many messages are being consumed per topic. We also started to set up alerts on those, even standardized ones, such as alerts on p50, p90, and so on.

Noisy On-call

This is great, but of course now we have lots of metrics and alerts, and we ran into the following issue, which is a very noisy on-call. We started having a lot of alerts to do with unhealthy applications that were not able to reach the broker, many more lag alerts, just because maybe the lag was already there but now we were actually aware of it, or issues to do with the health of the Kafka cluster itself.

We have a very basic alerting pipeline: we register these metrics in Prometheus, and we have Alertmanager deployed to watch them continuously and page us on PagerDuty. Unfortunately, the usual solution to these issues with services that deal with Kafka was to literally restart the service, because most of the time there was a deteriorated network condition, and this led to us getting paged even during the night and having to restart these services manually.

Or sometimes we would have to scale up or scale down, which is not really ideal. We started to think about how we could leverage Kubernetes and what we know to keep things running smoothly. We decided to invest in health checks. This concept applies to other types of orchestrators too, wherever you might deploy your application: the implementation of health checks, which essentially tell whether your application is ready to operate or not.

In Kubernetes, we have three types of health checks: liveness probes, readiness probes, and startup probes. A liveness probe tells Kubernetes whether your service is actually alive and running. A readiness probe tells Kubernetes whether your service is ready to receive HTTP traffic. The startup probe you can imagine as a glorified liveness probe for very slow-starting services. In brief, it works like this.

Kubernetes, at recurring intervals, pings your service at a certain endpoint, for example _livez if that's what you specify. Your service has to work out whether it's healthy or not, and reply with a successful status code, or a successful code depending on how you implement this, or an unsuccessful one. If it's successful, Kubernetes says, "Your service is fine." Otherwise, it's not. There are some policies that tell Kubernetes how many times your service can fail before being restarted, and so on, just to give you a rough idea.

For Kafka specifically, it doesn't make much sense to implement the readiness probe, because we don't usually expose an HTTP server there; we tend to keep things simple and just implement a Kafka consumer with its business logic, nothing else. We asked ourselves, if we usually have these issues, how can we attack them very simply? When we get the request for the liveness check, we can just go and try to perform a very basic operation against the broker, for example, list the topics.

When we get the response, if we can't actually get it and it's erroring, it probably means there is the issue I talked about earlier, and so we can just fail the check. Otherwise, we can pass it. This is all really great, and in some instances it actually improved things a lot. Unfortunately, there are cases where our application is still healthy and still able to talk with the broker, but a producer or a consumer is stuck and can't produce or consume messages, which is very unfortunate.
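Before getting to the smarter consumer checks, here is a hedged sketch of that basic liveness handler, using the open-source segmentio/kafka-go client's metadata call as the "list the topics" style operation; the endpoint path, port, and broker address are placeholders.

```go
package main

import (
	"context"
	"net/http"
	"time"

	"github.com/segmentio/kafka-go"
)

// livezHandler answers Kubernetes' liveness probe by doing one cheap
// metadata request against the broker; if that fails, report unhealthy.
func livezHandler(broker string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
		defer cancel()

		conn, err := kafka.DialContext(ctx, "tcp", broker)
		if err != nil {
			http.Error(w, "cannot reach broker", http.StatusServiceUnavailable)
			return
		}
		defer conn.Close()

		if _, err := conn.ReadPartitions(); err != nil { // roughly "list the topics"
			http.Error(w, "metadata request failed", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}

func main() {
	http.HandleFunc("/_livez", livezHandler("localhost:9092")) // placeholder address
	_ = http.ListenAndServe(":8080", nil)
}
```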

We had this situation where the application says, "I'm healthy," but actually can't operate and deal with the traffic. We decided to invest some time in implementing smarter health checks for the consumers; for the producers, we handle it differently, by restarting the application when we receive a certain error from the broker.

In regards to consumers, it's important to keep in mind two concepts: the current offset and the committed offset. The current offset of a partition is the last available offset on the partition. This means that if we have 10 messages on the partition, the current offset is number 9, which is literally the last one. The committed offset, instead, is the last offset committed by a certain consumer for the partition. If we successfully consume 9 messages, then the last committed offset will be number 8.

With these concepts in mind, every time we receive a health check, what we do is retrieve these two offsets. If for some reason we can't, there are probably some underlying issues, and therefore we report unhealthy. If we can, we compare the last committed offset with the current offset. If they are the same, it means that no new messages have been appended to the partition, and therefore we can consider ourselves fine.

Otherwise, we check the last committed offset against what we had at the previous check. If it hasn't changed, it means that our application is probably stuck and needs to be restarted. It might be for different reasons, maybe rebalances or other issues, but it's just better like this. This led to massive improvements for us, and we started to have a much better on-call life, as well as making our customers happy.
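A minimal sketch of that offset comparison is below; the OffsetFetcher interface and its methods are hypothetical stand-ins for however the real SDK fetches the partition's current offset and the consumer group's committed offset.

```go
package health

import "context"

// OffsetFetcher is a hypothetical interface; the real SDK gets these values
// from the broker's metadata and offset-fetch APIs.
type OffsetFetcher interface {
	CurrentOffset(ctx context.Context) (int64, error)   // last offset on the partition
	CommittedOffset(ctx context.Context) (int64, error) // last offset committed by this consumer
}

// ConsumerCheck remembers the committed offset seen at the previous probe so
// it can tell "idle" apart from "stuck".
type ConsumerCheck struct {
	offsets       OffsetFetcher
	lastCommitted int64
}

func (c *ConsumerCheck) Healthy(ctx context.Context) bool {
	current, err := c.offsets.CurrentOffset(ctx)
	if err != nil {
		return false // can't even talk to the broker
	}
	committed, err := c.offsets.CommittedOffset(ctx)
	if err != nil {
		return false
	}

	// Nothing new on the partition: being idle is fine.
	if committed == current {
		c.lastCommitted = committed
		return true
	}

	// There is new work, but nothing has been committed since the last probe:
	// the consumer is probably stuck and should be restarted.
	if committed == c.lastCommitted {
		return false
	}

	c.lastCommitted = committed
	return true
}
```

The only state the check keeps between probes is the committed offset it saw last time, which is what lets it tell an idle consumer apart from a stuck one.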

Inability to Keep Up

Of course, now we have a way of keeping our applications healthy, as well as understanding which are the slower bits, but we haven't accounted for another thing we started to experience during the pandemic, which is load. We have another story for this. At Cloudflare we also manage the email system. It's a very simple system where we allow teams to produce events to Kafka.

These events contain a template, which might be, for example, the "you are under attack" email template, which contains information about your website being under attack, details of the attack, and so on. This usually comes with some metadata. We listen for this event, we retrieve the template for that specific email from our template registry, we enrich it, we template it, and we dispatch it to our customers. That's it.

This was fine until it wasn't, because we often saw spikes in the production rate, even 40 requests per second, which isn't even that much, to be honest, but we were doing very expensive operations here. Unfortunately, we were not able to keep up on the consumption side and were always lagging behind, as you can see here. We had multiple cases where people were not happy with us on social media or even on Hacker News, like Jimmy here, not so happy about it for sure. We really needed to look into this, because we also send really important OTP messages, ones that expire after 10 minutes, and we don't want to breach those SLOs.

Batching

We started exploring a bunch of solutions for this. The first one we thought of was to just scale the number of partitions and the number of consumers and see what we could do. Actually, we didn't see many improvements with that. We decided to take a step back and look at a simpler concept, which is batch consuming. It's literally consuming n messages at a time, applying a transformation on those n messages at a time, and dispatching the n messages at a time.

This can be applied to many other workflows, and we've implemented it elsewhere too. Thanks to this, we were able to keep up very well with high production rates. There was a case where we were able to chew through over 10,000 messages in a matter of seconds, making everyone happy, even our favorite Jimmy here.
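A hedged sketch of such a batch-consume loop, using the open-source segmentio/kafka-go client; the batch size, flush interval, topic, and process function are illustrative, and the reader needs a GroupID so messages can be committed explicitly.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

// consumeBatches fetches up to batchSize messages (or whatever arrives within
// flushEvery), hands them to process as one unit, then commits them together.
func consumeBatches(ctx context.Context, r *kafka.Reader, batchSize int,
	flushEvery time.Duration, process func([]kafka.Message) error) error {

	for {
		batch := make([]kafka.Message, 0, batchSize)
		batchCtx, cancel := context.WithTimeout(ctx, flushEvery)
		for len(batch) < batchSize {
			msg, err := r.FetchMessage(batchCtx)
			if err != nil {
				break // flush timeout or shutdown: process whatever we have
			}
			batch = append(batch, msg)
		}
		cancel()

		if len(batch) == 0 {
			if ctx.Err() != nil {
				return ctx.Err() // shutting down
			}
			continue
		}
		if err := process(batch); err != nil {
			return err // nothing committed, so the batch will be redelivered
		}
		if err := r.CommitMessages(ctx, batch...); err != nil {
			return err
		}
		log.Printf("processed and committed %d messages", len(batch))
	}
}

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder
		Topic:   "email-events",             // placeholder
		GroupID: "email-dispatcher",         // required for explicit commits
	})
	defer r.Close()

	err := consumeBatches(context.Background(), r, 500, 2*time.Second,
		func(batch []kafka.Message) error {
			// Template and dispatch the whole batch of emails in one go.
			return nil
		})
	if err != nil {
		log.Fatal(err)
	}
}
```

Because nothing is committed until the whole batch is processed, a failure means the batch is redelivered rather than lost, which matters for things like OTP emails.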

Lots of Inbound Questions, Chatops + Documentation

Of course, now we are able to do all this, and people really like our SDK. We started to see many people using it, and many problems with it: bugs, or people not being really sure how to implement health checks, how to interpret certain errors, how to use the SDK capabilities, and so on. The solution to this is quite simple.

There is actually a human side to all of this, which is not surprising, and it consists of chat operations and documentation. We set up channels on our Google Chat to allow people to come and ask us questions about this. We had one person on call to deal with these questions and answer them as fast as they could, while also spending time documenting the findings and the answers and putting them into our wiki. This helped us a lot, of course.

Key Lessons

To conclude, we can draw some key lessons from all of this. The first one is about configurability versus simplicity. This is a very tricky trade-off, and it varies from organization to organization. Before, we were more on the configurable side, where each team, or even different people, could really experiment and make different changes very fast, for example, adding tracing or adding metrics.

Having simplicity instead enables you to standardize things. In the case of tracing, for example, standardizing the library allows us to get spans and traces working across different pipelines, which is great. What we really do in our case is keep it simple and have a very simple API for our SDK. We don't expose much functionality.

We tend to add things on top only when they're actually needed, even though we might have to scratch our heads during incidents. The second lesson is about visibility. Try to think about adding metrics, especially to your SDKs, as soon as you can. This will help you move faster and make better decisions, especially when incidents happen. You will be able to understand how your system is behaving and take the next steps to keep it up and running all the time.

The next one is about contracts. Here, we made a very interesting choice, which is to force people to have one contract, a very strong and strict contract, on a certain topic. This gives us very good visibility into what is actually going on inside the topic: who's writing to it, who's reading from it, what's actually there, what version it is. You might want to consider this if you can afford it.

Of course, there are trade-offs to this. The last one is about documentation. It is very obvious: make sure to also document the good work that you do, so that you don't have to spend too much time replying to people or helping them debug their production issues.

What's Next?

Boyle: We're going to finish off this presentation by talking through the next iterations of all the things we've talked about here. We're in a pretty good spot now. We've got highly scalable libraries and configuration available to our teams, but we want to take that a little bit further. One thing we've been working on recently is a tool called Gaia. Gaia was the mother of creation, and that's exactly what we want it to be. We have a very simple UI where you enter a few details like a project name, a namespace, and maybe the topic you want to consume from or produce to.

We're then going to do all the other things that are necessary to get a service into production, beyond the code. We are going to bootstrap the code too, but there may be other tickets we need to create. We need to make a Kubernetes configuration, we need to generate you a CI/CD pipeline, and we want to generate you a certificate for connecting to Kafka. We're going to wait for all these things to complete, and then send you some instructions about what you need to do next.

This is really powerful, because it means that engineers who are new to Cloudflare, or perhaps people who haven't made a service in a while, can just come along and be guaranteed to get the best practices when they start from scratch, rather than cloning an old repo where they might get outdated configuration. It can also serve teams at Cloudflare that maybe don't always use a particular language. For example, I spend a lot of time writing Go, but not a lot of time writing TypeScript.

Let's say I need to generate a Cloudflare Worker, and I don't know too much about that space. Being able to go to a tool like Gaia and generate a Cloudflare Worker that's production ready and has all the best practices from all the Cloudflare teams that use Cloudflare Workers is a really powerful paradigm. This is something we've currently built. It's in an open beta with our teams, and we're going to continue to invest in this product for sure.

The next thing is, you probably noticed at the very top of this talk, I spent a lot of time talking about how we don't constrain engineering teams to use a specific language. We don't do it on purpose, but we have done it by accident in our team. As you've seen throughout this talk, we spent a lot of time talking about how we've made it very easy for teams that use Go to get all the best practices around using Kafka.

We haven't really done that for other languages. Say a team mainly uses Python but wants to produce to Kafka; it's probably in their best interest to write that service in Go, because they're going to get all of our support, and we don't want that to be the case. Something else we would like to be able to do is generate client code for basically any language with ease. Python and Rust are two very popular languages here at Cloudflare. We'd love to be able to have a base protobuf contract and generate all of the client configuration from that. That's something I imagine we're going to be doing in the not-so-distant future.

 


 

Recorded at:

Nov 08, 2023
