Building and Scaling a Control Plane for 1000s of Kafka Clusters

Summary

Gwen Shapira and Vivek Sharma discuss some architectural highlights of building, evolving and scaling a control plane for thousands of Kafka clusters, and some challenges encountered.

Bio

Gwen Shapira is a software engineer at Confluent, a PMC member of Apache Kafka, and a committer on Apache Sqoop. Vivek Sharma is a senior software engineer at Confluent.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.

Transcript

Shapira: Welcome to this QCon Plus presentation about the lessons we learned while building the Confluent Cloud control plane. I'm Gwen Shapira. I'm a principal engineer and the engineering manager of the cloud-native Kafka team here at Confluent.

Sharma: I'm Vivek. I'm a senior engineer at Confluent. I come here from Uber, where I helped build a control plane, and I bring that experience with me. Together, we are building the control plane for Confluent.

Welcome To Confluent Cloud

Shapira: We're here to tell you about the control plane we already built and the problems we ran into with it, the control plane we're now building to fix those problems, and the new problems we've created along the way. Hopefully, there will be some lessons you will find useful when designing your own architectures.

First of all, Confluent Cloud. The main idea in Confluent Cloud is that we want to make it really easy for everyone to install and manage their own Kafka clusters, and other Confluent products like Connect and KSQL. We offer three flavors of clusters, which we call SKUs: basic, standard, or dedicated. Basic and standard are multi-tenant. Dedicated means you have your very own brokers in the cloud. We allow you to size the cluster, but note that we don't size it by number of brokers, because who knows what a broker can do? We size it by CKUs, which are capacity units. You get a certain amount of throughput, a certain number of partitions, connections, storage, and so on. It doesn't end when you provision the cluster; after you have a cluster, you can also change its size. All of this ease of use, one-click deployment, one-click resize of Kafka, which those of you who manage Kafka know is not always easy, is driven by our control plane.
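
To make the CKU idea concrete, here is a minimal sketch of a capacity-unit-to-physical-limits mapping. The per-CKU numbers and names below are purely illustrative assumptions, not Confluent's actual sizing.

```java
// Hypothetical sketch of mapping capacity units (CKUs) to physical limits.
// The per-CKU numbers below are illustrative only, not Confluent's actual sizing.
public final class CkuSizing {

    public record PhysicalLimits(int brokers, long ingressMBps, long egressMBps,
                                 int partitions, int connections, long storageGB) {}

    // Assumed per-CKU allowances; real values are defined by the product, not here.
    private static final int BROKERS_PER_CKU = 3;
    private static final long INGRESS_MBPS_PER_CKU = 50;
    private static final long EGRESS_MBPS_PER_CKU = 150;
    private static final int PARTITIONS_PER_CKU = 4_500;
    private static final int CONNECTIONS_PER_CKU = 3_000;
    private static final long STORAGE_GB_PER_CKU = 10_000;

    public static PhysicalLimits forCkus(int ckus) {
        return new PhysicalLimits(
                ckus * BROKERS_PER_CKU,
                ckus * INGRESS_MBPS_PER_CKU,
                ckus * EGRESS_MBPS_PER_CKU,
                ckus * PARTITIONS_PER_CKU,
                ckus * CONNECTIONS_PER_CKU,
                ckus * STORAGE_GB_PER_CKU);
    }

    public static void main(String[] args) {
        System.out.println(forCkus(4)); // e.g. a 4-CKU dedicated cluster
    }
}
```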

The reason our control plane is even interesting to talk about is that it's a really challenging problem. It's challenging because of scale, but not in the way you normally think about scale. It's not just that we manage thousands of clusters, which we do, but also that we have 20 engineering teams working either on the control plane or on products that the control plane manages. We have four different customer-facing services, and they all require slightly different variants of control. We have four different network options, and obviously all of those overlap, so now we're at 16 different combinations customers could choose. We have 3 SKUs, and product constantly changes exactly what basic and standard include, so we have to have flexibility there. We have to support three different cloud vendors with their lovely, ever-changing APIs. We need to have some security, because it's your data; it's not like every component or every service can just call anywhere in the world. I said the world because we also have a widely distributed problem, where our services are pretty much all over the world.

Confluent Cloud Control Plane

The way we solve the problem today is that we have the Confluent Cloud control plane. Essentially, it's a monolith. Not quite: we have the mothership, which is the control plane, and then we have the network regions. Those are all the places where customers actually have clusters and data. They are separated mostly for security, but also because there was really no other way to build it. The nice thing we got right is that the data plane is completely separate. No matter what we do in the control plane, customers only talk to their Kafka clusters, they get the service, and the data continues flowing no matter what. The only thing that could happen is that they will not be able to resize or provision.

The thing we didn't get right is that we have a monolithic scheduler. It is responsible for provisioning, resizing, and upgrading all our services. Everything we do always goes through there, which means that any change to the product, any change to the logic, anything that any team wants to do, goes through there. Every time we try to release it, we actually release a lot of features in different states from different teams. We really have to coordinate those releases to make sure that nobody has half a feature in there while we're releasing, and that we're not releasing anything that is broken. This is quite painful. In general, working with it is hard because it has so many circular dependencies and so many poorly understood dependencies. It's not an easy component to work with. This is what we're trying to solve with our new architecture.

New Requirements

Every new architecture starts with requirements. We wanted to keep the good stuff, so the data plane stays independent, and users don't notice anything we do on the control plane. The main thing is that we need to decouple resources. We really need to give each team its own thing to own, to allow it to work independently and move faster. While we do this decoupling, we still need a trusted source of truth about what our system has. At the very least, we need to be able to bill customers based on the current state of all their systems over the last hour. This view has to live somewhere. In addition, to make the whole thing work, we really need clean dependencies and clean interfaces for all those services to talk to each other and allow us to manage them. From an operational perspective, we really wanted to make it all self-healing: if a service goes down, it will get auto-restarted and syncing will just continue seamlessly. We also had to manage state for all those services. We took all those requirements, and we got a great developer to implement them.

Architecture Diagram of New State

Sharma: Let's start by taking a sneak peek at our new architecture. We have four user-facing products: Kafka, Connect, KSQL, and Traffic. All user requests are fronted by a Gateway. Depending on what resource a request is for, the Gateway hands it to the specific product. We'll speak about Kafka in this particular presentation, but it applies exactly the same to Connect, KSQL, or any other CP we might have in the future. The Kafka frontend service takes care of user requests and gives a handle or ID back to the user. It owns everything the user sees: it could be CKUs, it could be the name, anything a user sees. This translates into something we call the physical Kafka cluster config, which says how many brokers the CKUs need, and all the other physical requirements this logical Kafka needs to actually get provisioned or realized. This physical Kafka cluster is hosted on capacity, a Kubernetes (K8s) cluster, which is controlled by the capacity controller. The capacity controller takes care of spinning up and spinning down Kubernetes clusters; that's the underlying infrastructure we use to host our physical Connect and Kafka clusters. The request from the capacity controller then lands in the sync service, which sends it to the satellites, where it's actually realized.

Key Architecture Principles - Desired State System

Now that we've seen the new architecture, let's look at the basic guiding principles, the North Star for the architecture. The three guiding principles for us were: desired state system, Layer Cake, and the choreography pattern. Let's see what a desired state system means. Any system or service that's responsible for a resource always writes the desired state first, which then triggers downstream events for other services to act on. All the principle says is that we always write the desired state before we take any action that requires communication with another service.

Let's take an example where a customer already has a Kafka cluster and wants to expand it to 4 CKUs. The request lands on the Gateway. The Gateway sees it's a Kafka request and gives it to the Kafka frontend service. The Kafka frontend service now realizes it needs to expand the Kafka cluster to 4 CKUs. It does a bunch of validation and quota checks, and writes the desired state to the database. When it writes to the database, we have a Debezium connector that picks up the database change and writes an event to Kafka. This event triggers action in the PKC controller, which sees there is a request to expand the Kafka cluster and works out that it needs to expand to 12 brokers. It writes a new desired state of 12 brokers to its database, which again creates an event on Kafka. This triggers the downstream capacity controller. Fortunately, in this case we have an autoscaler, so we don't really need to spin up any more nodes or node pools; the capacity controller writes the new config to its database, which creates another event on Kafka. The sync service, which is the communication channel between the control plane and all the satellites, reads this and sends it to the satellites, where it actually gets realized. When the operations are done, the actual statuses are percolated all the way back up in the exact same manner, but on a different topic. This separation lets us pass the message all the way down and back up while adhering to the principle of desired state.
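
As a rough illustration of the first step in that chain, here is a minimal sketch, assuming a hypothetical logical_kafka_cluster table with a desired_ckus column, of what the frontend service's write might look like. The Debezium connector that turns this row change into a Kafka event is assumed to be configured separately; the real service is more involved.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Minimal sketch of the "desired state first" write. Table and column names
// (logical_kafka_cluster, desired_ckus) are hypothetical; Debezium is assumed
// to be capturing changes to this table and publishing them to a Kafka topic.
public final class KafkaFrontendService {

    private final String jdbcUrl;

    public KafkaFrontendService(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    public void resize(String clusterId, int desiredCkus) throws Exception {
        // Quota and validation checks would happen here before any write.
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             PreparedStatement stmt = conn.prepareStatement(
                     "UPDATE logical_kafka_cluster SET desired_ckus = ? WHERE id = ?")) {
            stmt.setInt(1, desiredCkus);
            stmt.setString(2, clusterId);
            stmt.executeUpdate();
            // No direct call to the PKC controller: Debezium picks up this row
            // change and emits an event; downstream services react to it.
        }
    }
}
```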

There were scenarios where we had failures downstream; how did our desired state help us get around them? Say we want to spin up a new Kafka cluster. The request goes to the PKC controller, then to the capacity controller. The capacity controller realizes we need to spin up a new Kubernetes cluster. Unfortunately, we get an HTTP 429 error from the cloud provider. It retries, and doesn't succeed, and hands the failure back to the capacity controller, saying the state could not be fulfilled. Everything from the capacity controller downward just keeps retrying, because the desired state isn't met yet. The errors stay contained from the capacity controller down, and it keeps retrying until the desired state is reached.
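
To make the retry behavior concrete, here is a hedged sketch of a reconciliation loop that keeps working toward the desired state and backs off when the provider rate-limits with HTTP 429. The CloudProvider interface and its methods are hypothetical stand-ins, not the real capacity controller's API.

```java
import java.time.Duration;

// Hypothetical reconciliation loop: keep retrying until the observed state
// matches the desired state, backing off when the cloud provider rate-limits us.
public final class CapacityReconciler {

    interface CloudProvider {
        int currentNodeCount(String poolId) throws RateLimitedException;
        void scaleTo(String poolId, int nodes) throws RateLimitedException;
    }

    static final class RateLimitedException extends Exception {} // e.g. HTTP 429

    private final CloudProvider provider;

    CapacityReconciler(CloudProvider provider) {
        this.provider = provider;
    }

    void reconcile(String poolId, int desiredNodes) throws InterruptedException {
        Duration backoff = Duration.ofSeconds(1);
        while (true) {
            try {
                if (provider.currentNodeCount(poolId) == desiredNodes) {
                    return; // desired state reached, nothing left to do
                }
                provider.scaleTo(poolId, desiredNodes);
                backoff = Duration.ofSeconds(1); // progress made, reset backoff
            } catch (RateLimitedException e) {
                // Provider said HTTP 429: wait and try again. The desired state
                // is durable, so nothing is lost by retrying later.
                Thread.sleep(backoff.toMillis());
                backoff = backoff.multipliedBy(2).compareTo(Duration.ofMinutes(5)) > 0
                        ? Duration.ofMinutes(5) : backoff.multipliedBy(2);
            }
        }
    }
}
```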

Let's look a bit more at how desired state helps with operations like expansions and shrinks. Say a customer wants to expand from 2 CKUs to 4 CKUs and puts in the request. This expansion could take up to 20 minutes, and it's not unlikely that in the meantime they request a different operation, like a shrink, or maybe an even bigger expansion. Each of those boils down to a different number of brokers. If this were a command-based system, it would go from 2 to 4, back to 2, and then again to 6. Since it's a desired state system and all we do is record the new state, get the config, and pass it down, the end state is that the satellites only need to go to 6 CKUs, without having to do any of the intermediate shrinks.
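
Here is a minimal sketch of the "latest desired state wins" idea described above. The types are hypothetical, and the real controllers persist the target rather than keeping it in memory.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "latest desired state wins": intermediate resize requests that
// arrive while an earlier one is still in flight are simply superseded.
public final class DesiredCkuTracker {

    private final AtomicInteger desiredCkus = new AtomicInteger();

    // Called whenever a new desired-state event arrives (2 -> 4 -> 2 -> 6 ...).
    void onDesiredStateEvent(int ckus) {
        desiredCkus.set(ckus);
    }

    // Called by the reconciliation loop: it only ever sees the latest target,
    // so the satellite goes straight to 6 CKUs without the intermediate shrink.
    int currentTarget() {
        return desiredCkus.get();
    }
}
```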

Benefits of Persisted Desired State

What are the benefits we get out of the desired state? Our systems are very recoverable. We have had incidents in the past where customers accidentally deleted their Kafka clusters and asked us, in a panic, to recover them. Since everything is just a desired state in our database, we could easily recover those Kafkas. We have a single source of truth: each service owns a database and is the source of truth for its particular resource. Because the communication between components is CDC events, the system is easily auditable. And since each component reacts to events independently, each component is easily testable.

Key Architecture Principles - Layer Cake

Let's dive into the second principle, which is Layer Cake. Why do we need this? First, say we want to patch our control plane or release new services; we need to know the order in which to release the services so that we can upgrade the control plane without taking any downtime. Second, we have one control plane today, which is HA, but if in the future we want to spin up multiple control planes, we need a deterministic way of spinning up the services so we can actually do that. In our current architecture, the base is the sync service, which comes up first. On the networking side, it's the network controller, which speaks to the different vendors. These are the ones that come up first. After that come the capacity controller, which speaks to the sync service, and the traffic service, which speaks to the network controller. These are the second ones to come up. The capacity controller is used by all the CPs we offer, so Connect and KSQL can now come up. It is also used by the PKC controller, which in turn is used by Kafka, so those come up in that order. The last to come up is the Gateway. The Gateway comes up last because if an underlying dependency is not up, it would return errors for that dependency. Now we have a clean way to spin up our control plane.
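
One way to picture the Layer Cake as something executable is to keep the dependency graph explicit and derive the startup order from it. This is an illustrative sketch, not Confluent's tooling; the service names follow the talk, and the exact graph shape is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative topological sort over the layer-cake dependency graph:
// a service may only start once everything it depends on is already up.
public final class StartupOrder {

    public static List<String> order(Map<String, List<String>> dependsOn) {
        Map<String, Integer> remaining = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        dependsOn.forEach((svc, deps) -> {
            remaining.put(svc, deps.size());
            for (String dep : deps) {
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(svc);
            }
        });
        Deque<String> ready = new ArrayDeque<>();
        remaining.forEach((svc, count) -> { if (count == 0) ready.add(svc); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String svc = ready.poll();
            order.add(svc);
            for (String dependent : dependents.getOrDefault(svc, List.of())) {
                if (remaining.merge(dependent, -1, Integer::sum) == 0) {
                    ready.add(dependent);
                }
            }
        }
        if (order.size() != remaining.size()) {
            throw new IllegalStateException("cycle in dependency graph");
        }
        return order; // base layers first, gateway last
    }

    public static void main(String[] args) {
        System.out.println(order(Map.of(
                "sync-service", List.of(),
                "network-controller", List.of(),
                "capacity-controller", List.of("sync-service"),
                "traffic-service", List.of("network-controller"),
                "pkc-controller", List.of("capacity-controller"),
                "kafka-frontend", List.of("pkc-controller"),
                "gateway", List.of("kafka-frontend", "traffic-service"))));
    }
}
```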

Now that we know what the layering is, let's see how the communication between services works. Say you come in with a request to increase to 4 CKUs at the Gateway. The Gateway sees it's a Kafka request and routes it to the Kafka frontend service. The Kafka frontend service does quota checks and validations, and writes to its database. This write is picked up by the Debezium connector, which reads the database change and writes it to a Kafka topic. A refiner on this topic creates an event for the PKC controller, which now realizes it needs to take some action to satisfy the request. This is the same pattern we try to follow everywhere, but there are some places, like the Kafka frontend speaking to the quota service or validation, that are still gRPC. That's where we have landed for now.
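
The "refiner" mentioned here can be thought of as a small stream processor that turns raw CDC records into the domain events the PKC controller consumes. Below is a hedged Kafka Streams sketch with hypothetical topic names and a deliberately simplified payload check; the real refiner logic is not described in the talk.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative "refiner": consume raw Debezium CDC records from the frontend
// service's table and republish only the changes the PKC controller needs.
// Topic names and the string-matching logic are placeholders, not the real ones.
public final class LogicalKafkaRefiner {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> cdc = builder.stream("cdc.logical_kafka_cluster");

        cdc.filter((clusterId, change) -> change != null && change.contains("desired_ckus"))
           .mapValues(change -> change) // real code would reshape the payload into a domain event
           .to("events.logical-kafka-config-changed");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lkc-refiner");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}
```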

Key Architecture Principles - Choreography of Microservices

Shapira: Vivek explained how two services talk to each other by writing state to the database, turning that into an event in Kafka, and having other services react to those events. We're going to show how services interacting with each other in this manner actually allow us to build a very resilient system out of all those microservices.

What we learned is that we have these services, and each one owns a resource. It owns the state of that resource, and it owns the changes to the state. The changes to state are propagated as events, and theoretically, any other service could consume those events. Each service that gets these events owns how to react to them. The entire logic of what to do when something changes in the system lives within each service. As an example, the Kafka frontend service owns the customer's Kafka configuration and stores it. Every time it changes, there is an event with the logical config change. The physical Kafka controller knows that every time there is a logical config change, it should examine it and decide whether it requires any physical changes: for example, a name change does not require a physical change, but a change to the size of the cluster obviously does. If there was a change to the physical configuration, it puts it in its database, which creates another event. The capacity controller knows that if the capacity of Kafka changed, we may need to also change the capacity in Kubernetes, and again stores the new capacity. We have all those services; they get events, they understand internally what to do with them, and they store the reaction somewhere else, which creates another chain of events.
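
As a rough illustration of how a service owns its reaction to events, here is a hypothetical handler in the spirit of the physical Kafka controller: it ignores changes with no physical impact (such as a rename) and writes a new physical desired state when the size changes. The types, field names, and broker-per-CKU ratio are all assumptions, not the real implementation.

```java
// Hypothetical event handler in the PKC controller: each service owns its own
// reaction to upstream events, which is the essence of the choreography pattern.
public final class PkcControllerHandler {

    record LogicalConfigChanged(String clusterId, String displayName, int ckus) {}

    interface PhysicalStateStore {
        int currentBrokerCount(String clusterId);
        void writeDesiredBrokerCount(String clusterId, int brokers); // triggers the next CDC event
    }

    private static final int BROKERS_PER_CKU = 3; // illustrative ratio only

    private final PhysicalStateStore store;

    PkcControllerHandler(PhysicalStateStore store) {
        this.store = store;
    }

    void onLogicalConfigChanged(LogicalConfigChanged event) {
        int desiredBrokers = event.ckus() * BROKERS_PER_CKU;
        if (desiredBrokers == store.currentBrokerCount(event.clusterId())) {
            return; // e.g. a display-name change: no physical work to do
        }
        // Persist the new physical desired state; Debezium turns this write into
        // the event the capacity controller reacts to next.
        store.writeDesiredBrokerCount(event.clusterId(), desiredBrokers);
    }
}
```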

This idea of a system where each service listens to changes in the world, decides how to react to them, and reacts independently is known as the choreography pattern. It's different from what is known as the orchestration pattern, where there is a single service that knows what has to be done. It owns the entire logic of the system, end-to-end, and it issues commands. In an orchestration system, it would be: the customer wants something, ok, Kafka frontend, do this; Kafka physical, do that; capacity, do this. All the logic would be in one place, which is a benefit. It also means that teams do not own components with their own logic; you have to work in a centralized system where either there's one team that tries to manage all the logic for every other team, which is not great, or teams have to somehow coordinate work in one service, which is also quite painful. We really liked the approach that gives teams independence, at the cost of not necessarily having all our logic in one place.

The reason this is super important to us is that we have a lot of engineering teams. Engineers are expensive and their time is valuable, so we really want to enable them to be as effective as they can be. Having each team own its logic, from the customer interaction, to storage, to everything else, is super critical for us. We make these tradeoffs to make everyone a lot more productive.

Challenges

We're moving to a new system. Clearly, it solves all the world's problems without any drawbacks at all. Except it really does not. Some of the challenges we have encountered: first of all, observability is really hard. We said we really want the system to be observable and to have a single source of truth. In reality, we still have 20 microservices, each with its own state. We connect it all through Kafka, and we build systems that let us aggregate the state and report on it. It required us to build a lot of those systems. There are some you can buy off the shelf, but we have an entire observability team responsible just for making sure we can report on the system, and another team just for billing, again collecting all this information. Migration turned out to be incredibly difficult. Obviously, we cannot just shut down one control plane one day and bring up a new one; we have to migrate service by service and area by area. Each team also owns its own migration, which makes things more fun. This is not trivial: you can create a new database, and you can even use change data capture to migrate data between databases, but at some point the service has to stop talking to database A and start on database B. We cannot really run two parallel services, because of our ownership principle, so there will be a point where we cut over services with a small downtime, luckily only on the control plane. The data plane will always stay up.

Testing. We made it really easy for each team to test its own component. That's great, but there are concerns that are very cross-cutting. Security, really, is not just each component being secure. Things like who is authorized to do what, and whether the user is authenticated, need to be taken care of across all of the systems in a very unified manner. We ended up with teams that are responsible for those cross-cutting concerns rather than for a component. We suspect their life is not quite as good as the life we built for the component owners, as they have to talk to a lot of other teams and do a lot of coordination. I wish we had a good solution for them; so far we don't. The last one is that we discovered that our internal needs and operations are not the same as what customers need: customers work on one cluster at a time, while we really need to do things on thousands of clusters at a time. We were behind in building our own tools, and we need to catch up. Again, a large body of work that we semi-forgot about doing.

Key Lessons

With all those challenges and all those principles, let's close with some of our key lessons. What worked really well for us is persisting state with Kafka events. It's really the foundation of building something recoverable. We have the entire auditable history of every change, because we record the state in an event, not just that we added or removed something. It makes things recoverable, auditable, and observable, and we depend on this a lot. Layer Cake is fantastic, both in avoiding the cyclic dependencies that make it hard to start a service, and in debugging and diagnosing: we know exactly who is calling whom, and calls only go in one direction. We have a very clear dependency graph, and it made our life much more manageable than it otherwise would be. Even with those improvements, breaking a monolith is always challenging. If you do it, we recommend not leaving the questions of how to test end-to-end concerns and how to build operational tools to the end like we did. Start thinking about them very early on.

Questions and Answers

Humble: Could we talk a bit about the Layer Cake model of dependencies and boundaries? I'm presuming that since each service has its own logic for reacting to events, you need to ensure that each layer only modifies its own resources. Did I get that right?

Shapira: Absolutely. We call it the single writer principle. It's the reason anything works at all.

Humble: That doesn't always quite work out as planned. You want to talk about that first?

Shapira: Basically, when we started rolling out this model, we were making modifications to the way we write our connectors, and while deploying, we deleted the Debezium connector in the development environment. The service that owns it, the Connect service, as part of handling that deletion, discovered: I'm the last thing that has been using this Kafka cluster, so let's delete the Kafka cluster. It sent an API request to delete the Kafka cluster. Luckily, it didn't work. That demonstrates how important it is to really define which service owns what responsibility. If you're an upstream service, you don't get to delete stuff that belongs to downstream services. You can only delete your own stuff and publish an event that says, "I deleted my stuff. Maybe you want to do something." Then the downstream services have their own logic, and they decide whether they want to actually delete something as a result, or maybe not do anything at all and ignore the event. I'm glad we caught it in development; it would have been quite disastrous in production.

Sharma: Somebody asked about those frameworks, how they compare to what we have in Confluent Cloud, and why we don't take one of them as a dependency in our control plane.

If you really look at it, what we would get out of those frameworks is what we already have with Kafka. We already have Kafka as a dependency for the communication between our control plane and the satellites. Having these additional frameworks would be one more dependency for us to worry about. By using Debezium and existing relational databases, we get those features right in our control plane without taking on any additional dependencies. That's the biggest takeaway, and why we did not go with an existing framework.

Kafka is event streaming, and those are very specific frameworks. If you're happy to use one in your control plane, it might need more work, but go for it.

Humble: In terms of resiliency, how do you react or manage the outage of the central control plane?

Shapira: Basically, it's one of our least impactful incidents, because everything that already exists keeps on working. If you have a Kafka cluster and you depend on it for your architecture, your own Kafka is not going away; that's on the data plane. If we have a control plane outage, we may post on the status page, "You now cannot create new clusters, or you now cannot expand your cluster." It is sad if you cannot create new clusters for an hour.

Sharma: There will be tickets. We do want to go multi-cloud; that's one of the principles we wanted to follow, because we see ourselves in multi-cloud. For that, we want a script we can just run to bring a control plane up. That's one of the reasons we have these principles going forward.

Shapira: When we have a control plane in both AWS and GCP, we can use Kafka to replicate the events back and forth. That would be fun.

Humble: Are there specific patterns that the microservices follow in order to persist data to its own database and then safely publish its associated event, as this publishing can fail after persisting to the database? Basically, it's a distributed transaction problem.

Shapira: It's not a distributed transaction problem, because we use the outbox pattern. Basically, we persist to a database, which has relational database guarantees: I do a commit, and that's it, the event is guaranteed to be persisted in the database no matter what. Then we have Debezium. Its guarantee is that, no matter what happens, the event will eventually make it from the database to Kafka. Kafka has its own guarantees that after an event has been published, consumers will eventually see it. If something goes wrong, it may be delayed for a while. If you create a cluster in our cloud, it tells you, "The cluster will be up in like 24 hours." Everyone's like, 24 hours is insane. We gave a very large time span. Essentially, we have a chain of events, and we know that each component has really strong guarantees on its own, which is not like distributed transactions, where you actually need two separate systems to do a two-phase commit in order to have any end-to-end guarantees.
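
Since the answer leans on the outbox pattern, here is a minimal sketch of what that write can look like: the state change and the event row are committed in one local transaction, and Debezium is assumed to relay the outbox row to a Kafka topic afterwards. Table and column names are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

// Sketch of the outbox pattern: one local transaction persists both the state
// change and the event describing it, so there is no two-phase commit between
// the database and Kafka. Debezium ships the outbox row to a Kafka topic later.
public final class OutboxWriter {

    public void recordResize(String jdbcUrl, String clusterId, int desiredCkus) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement state = conn.prepareStatement(
                         "UPDATE logical_kafka_cluster SET desired_ckus = ? WHERE id = ?");
                 PreparedStatement outbox = conn.prepareStatement(
                         "INSERT INTO outbox (id, aggregate_id, event_type, payload) VALUES (?, ?, ?, ?)")) {
                state.setInt(1, desiredCkus);
                state.setString(2, clusterId);
                state.executeUpdate();

                outbox.setString(1, UUID.randomUUID().toString());
                outbox.setString(2, clusterId);
                outbox.setString(3, "LogicalKafkaResized");
                outbox.setString(4, "{\"desiredCkus\":" + desiredCkus + "}");
                outbox.executeUpdate();

                conn.commit(); // both rows or neither: relational guarantees only
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
    }
}
```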

Humble: You mentioned the satellite abstraction briefly, can you talk a bit more about it? What are the satellites made up of? What's in there?

Sharma: It's easy, because the satellites are basically our data planes. Imagine AWS us-west-2: anything we spin up there is going to be a satellite. Any Kubernetes cluster that gets spun up there is a satellite. We call it a satellite because it directly serves customer operations. The orchestration of a satellite is done through the control plane: any creation, deletion, or expansion happens through the control plane, but once the request gets there, the satellite communicates directly with the customer's resources. That's what a satellite basically is.

Humble: Then, is that scoped to a single Kubernetes cluster?

Sharma: Is satellite a single Kubernetes cluster?

Humble: Yes.

Sharma: No. We can have multiple of those.

Humble: Can you talk a bit more about the data plane? What does the Kafka data plane consist of?

Shapira: Where is the Kafka data plane installed?

Humble: What's the scope of it? What's the data plane made up of?

Shapira: The data plane is made up of several thousand Kafka clusters, close to 1,000 KSQL clusters, and several thousand connectors. Basically, this is what our customers are paying us for. It's our own Kafka that we are running; it's a managed service. You go to our site, you spin up a Kafka cluster, you give us money, and we give you a Kafka cluster in return and run it for you. The entire control plane exists to allow us to run all those Kafka clusters for our customers. Because our customers are on AWS, GCP, and Azure, we have to spin up Kafka clusters there. They have different security requirements: some of them want to be on the public internet, some want to be in a VPC, some like PrivateLink. That's why we have to get creative about which Kubernetes cluster each customer gets. If you are ok with the public internet, you will be in a public Kubernetes cluster that has a lot of other clusters on it. That's why a satellite can have more than one Kafka cluster. On the other hand, if you want VPC peering, you will have your own VPC with your own Kubernetes and your own Kafka, and if you have KSQL, your own KSQL VPC peered to your cluster. The whole point is that our control plane is just a thin layer to manage all of this. The control plane is not really the thing we are selling to customers. In the best case, it's so easy to use that you ignore it completely.

Sharma: Also, if you go back to the Layer Cake, that's exactly how the data plane looks. You have a site, you have Kubernetes, PKCs, and LKCs, and the thousands of Kafkas Gwen is referring to are basically those customer-facing Kafkas. That's literally the site section of the data plane in the control plane. We manage all of that through the control plane, but that's how it looks in reality.

Humble: How do you orchestrate multiple Kubernetes resource deployment? Are you using federated mechanisms for that?

Sharma: I think we have our own operator, actually. That's what we use to manage it.

Shapira: We have our own operator, and we also run on managed Kubernetes, so EKS, AKS, and so on. Because every managed Kubernetes is different, we created our own Kubernetes controller on top of them. Internally, if I have some logic that requires a new Kubernetes cluster, I publish an event that says, I am going to need some more resources. The Kubernetes controller gets the event, sees that we need more resources and that we want those resources on Azure, and talks to AKS to create them, handling all the retries, all the rate limiting, everything that is required. Eventually, Gwen gets the resources she asked for. We abstract all of it. That's exactly how we created this Layer Cake: we abstract those low-level Kubernetes resources from engineers who only worry about Kafka.
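
One way to picture the abstraction described here is a per-provider interface behind the Kubernetes controller, so that the rest of the control plane never talks to EKS, AKS, or GKE directly. This is an illustrative sketch with assumed names, not the actual controller.

```java
// Illustrative abstraction over managed Kubernetes providers: the event handler
// picks the right implementation, and upstream services never see cloud APIs.
public final class KubernetesCapacityController {

    interface ManagedKubernetes {
        void ensureNodePool(String clusterId, int nodes) throws Exception;
    }

    record CapacityRequested(String cloud, String region, String clusterId, int nodes) {}

    private final ManagedKubernetes eks; // AWS
    private final ManagedKubernetes aks; // Azure
    private final ManagedKubernetes gke; // GCP

    KubernetesCapacityController(ManagedKubernetes eks, ManagedKubernetes aks, ManagedKubernetes gke) {
        this.eks = eks;
        this.aks = aks;
        this.gke = gke;
    }

    void onCapacityRequested(CapacityRequested event) throws Exception {
        ManagedKubernetes target = switch (event.cloud()) {
            case "aws" -> eks;
            case "azure" -> aks;
            case "gcp" -> gke;
            default -> throw new IllegalArgumentException("unknown cloud: " + event.cloud());
        };
        // Retries, rate limiting, and provider quirks live behind this call.
        target.ensureNodePool(event.clusterId(), event.nodes());
    }
}
```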

Humble: Do you somehow have a global or federated monitoring of all the Kafka clusters?

Shapira: Yes. Our monitoring is a little bit painful. We use Datadog to monitor basically all of it. We also have our own metrics pipeline for more advanced analytics on our metrics. How we actually observe those thousands of clusters would be its own architecture talk, because we've written our own log collection and metrics collection; it's a lot of telemetry work in its own right.

Sharma: We use Kafka.

Humble: Do you see the need for service mesh for service to service communication?

Sharma: Right now, the real proxies are at the site, where we have to do all of this routing within the data plane; only at the site. On the control plane, we have only a handful of services. A service mesh, to me, is yet another dependency. Right now, it's still manageable; we haven't even crossed a hundred services on the control plane yet. Maybe in the future, but I don't think we need it right now.

Shapira: What we've implemented is much closer to a data mesh than to a service mesh, because our services communicate more via persisted events than via gRPC. I can't say that we will never have a service mesh, but I think we went in a slightly different direction.

Humble: Do you have a need where you have to rebuild the entire control plane or data plane in the event of a major vulnerability?

Shapira: Rebuilding the entire data plane? We tried. It's time consuming. I think the same is true for the control plane, but it's a lot easier.

Sharma: Yes. We do have to build our data plane when a cloud provider starts a new region. It's time consuming. We never had a need for that on the control plane.

Shapira: We had to upgrade our entire data plane in about three days. Yes, it's intense, but definitely doable. If we need to rebuild the whole control plane, I think that's a lot easier. That's actually where the Layer Cake helps us a lot to replicate it. We do it every once in a while, when we decide to stand up a new testing environment.

Sharma: All our upgrades do follow that. Yes.

Humble: If you have a global customer who operates out of the U.S. and EU regions, are you able to segregate and keep the EU workloads in the EU and the U.S. workloads in the U.S.?

Shapira: Of course. It would not only be really weird to mix them up, it would also be quite illegal, I believe. Every resource, at the highest level, gets created within a cloud [inaudible 00:38:39] or within a region. This is information we maintain consistently across the whole stack. We also bill differently, so our own finance people would be really upset if we mixed everything up.

Sharma: Yes, the details, transfer cost, everything, what would happen to that?

Shapira: Definitely. You basically get assigned to a Kubernetes resource in your region and a network region, and everything stays in your region.

Sharma: Exactly.

Shapira: Now we have people asking for zones.

 

Recorded at: Nov 06, 2021
