Practical Change Data Streaming Use Cases with Apache Kafka & Debezium



Gunnar Morling discusses practical matters, best practices for running Debezium in production on and off Kubernetes, and the many use cases enabled by Kafka Connect's single message transformations. He talks about how to leverage CDC for reliable microservices integration, e.g. using the outbox pattern, as well as many other CDC applications (maintaining audit logs, driving streaming queries).


Gunnar Morling is an open source software engineer at Red Hat. He leads the Debezium project, a tool for change data capture (CDC). He is a Java Champion, the spec lead for Bean Validation 2.0 (JSR 380), and has founded multiple open source projects such as Deptective and MapStruct. Prior to joining Red Hat, he worked on a wide range of Java EE projects in the logistics and retail industries.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Morling: When I was learning from Gwen [Shapira] that there would be this track about data architectures, I really was happy because I feel oftentimes data is an aspect of our applications, which we tend to forget. Very often, let's say we talk about microservices, then we tend to discuss, what's the best stack? What's the best framework we should use? How can we run this on Kubernetes? How can we scale this to 10 or 100 instances? Very often, we tend not to think about our data. That's a very important aspect. Your microservices, your applications, they don't exist in isolation. They need to exchange data. One source needs to propagate data to another. That's an important aspect.

Let's say you start with a microservice architecture. Oftentimes you won't do this on the greenfield. You will have existing monolithic applications, and now somehow, you need to get your data from the monolith over to the microservice. There's something about data, and it's good we talk about it here. Luckily, we are not the only ones. There's this guy, Bill Clinton, and already in '92, he knew "It's about the data, stupid." This was his statement there. Or, take this one, the former CEO of Microsoft, "Data, data, data." This was his big statement. Also, finally, some people would even name their child Data. There must be something to it. Today, I would like to talk about a tool which helps us deal with some of the aspects and challenges around data. This is Change Data Capture.

To give you a rough idea of what I'm going to talk about, it will be in three large blocks. I will talk about dual writes, which is a very common issue. I will tell you what this is about and how Change Data Capture can help you to avoid those dual write issues. I will talk about a few CDC use cases and patterns, and finally, I will talk about a few practical matters: how can we run this in a highly available topology? How can we run this on Kubernetes?

Why am I standing here? I work as a software engineer at Red Hat, and I've been there for a few years. Mostly, I've been working on data-related projects. I've been working on Hibernate, I've been working as the spec lead for the Bean Validation 2.0 spec, and lately, I'm working on this project, Debezium. This is what I'm going to talk about today. It always was related to data.

A Common Problem

Let's get into it. I would like to start with a very common problem which you might encounter in your everyday job. Let's take this example. You have this e-commerce application, you have a microservice, and this probably has a REST API. This allows the user to place purchase order requests, and typically, you would save the data in a database. That's a very common thing to do, nothing too exciting here. Maybe now this business goes well, and you feel you are becoming a little bit of a victim of your own success. Maybe some views in this application, some parts, they are not as performant as you would like them to be. What do you typically do? You add a cache. Now things look like this: you have this database, and you also have this cache. Now your application has to do two writes. It has to update the data in the database, and at the same time, it needs to update the data in the cache, so that if you go to the cache and query data from there, you don't get stale results.

Maybe again, some time continues or progresses, and your product manager comes around, and they would like to have some new functionality. They would like to have some search functionality. Typically, search isn't as great in our relational databases. What do we do? We add something like Elasticsearch, a dedicated search service there. Now we have to do three things. Whenever there's, like, a new request coming in or a purchase order gets updated, we need to update the data in those three resources.

On the surface of it, this could seem like an acceptable thing, but really, there are some problems with this solution. We are tied in terms of availability. This means the availability of our order service is limited by the availability of all those three things. Let's say the search service isn't available, so we cannot go there and send the update for the search index. Then what should we do? We could retry this request, we could try to buffer it, but for how long should we do this? This puts us into a bad spot. Or even worse, maybe you have already updated the data in the database, but then for some reason, you cannot go to the search index and update the data there. You have already partially applied this change, and you cannot complete it. Now your data is inconsistent. That's a problem.

Consistency even takes this a bit further. Let's say we have two requests coming in, two updates, and they apply to the same purchase order. They update maybe the delivery type or whatever. Let's call them A and B. Now it could happen that you apply A before B in the database, but then B before A in the search index, just because there is no guarantee how those requests would be routed. You have applied all the changes but in an inconsistent order. This means now those resources, they are in an inconsistent state, and you would get back outdated data from your search index. That's the problem.
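The ordering problem can be shown with a small sketch. It uses two in-memory maps as hypothetical stand-ins for the database and the search index; the order ID and the update values are made up for illustration. Both stores receive both updates, only in a different order, and they end up disagreeing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-ins for a database and a search index.
class DualWriteOrderingSketch {

    // Applies updates A and B to the same purchase order in a different
    // order per store and returns the final value held by each store.
    static String[] run() {
        Map<Integer, String> database = new HashMap<>();
        Map<Integer, String> searchIndex = new HashMap<>();

        int orderId = 42;
        String updateA = "delivery=standard";
        String updateB = "delivery=express";

        // The database happens to see A before B ...
        database.put(orderId, updateA);
        database.put(orderId, updateB);

        // ... while the search index happens to see B before A.
        searchIndex.put(orderId, updateB);
        searchIndex.put(orderId, updateA);

        return new String[] { database.get(orderId), searchIndex.get(orderId) };
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("database:     " + result[0]);
        System.out.println("search index: " + result[1]);
    }
}
```

Nothing failed in this run, and still the search index returns the outdated delivery type.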

If you were to leave this talk right now, I would just send you home with this message: friends don't let friends do dual writes. We call this dual writes, going to multiple resources and updating them without having some global transactional boundary. That's not something you should do, and it's not something which you should let your friends do. Don't do that.

A Better Solution

The question is, what can we do instead? If we cannot update multiple resources, we can always update a single one. We can always go to our own database. I would argue, if we cannot update our own database, then we are in deep trouble anyways. We can take this as a given, this thing must be there for us. We just update this, and then the Change Data Capture process comes into play. What it does is it takes the change events from the database and sends them to something like Apache Kafka. It hooks into the transaction log of the database, and then whenever there is a new record, like an insert or an update or a delete, we will capture this from the transaction log and we will create a corresponding change event and send it to Apache Kafka. Now we could set up consumers for the topics in Kafka, and they would update those other resources. There would be a consumer which would update the search index, there would be another consumer which would update our cache.

We don't have those concerns which we had before. We don't have the availability concern because, if the search index isn't there, we just don't care. We can still write to our own database. This will all eventually catch up. Also, if CDC itself wouldn't be available or Kafka, we wouldn't care because when we get back up again, we could continue to read the transaction log from the point where we left off. We don't have the availability concern, and we also don't have this ordering concern because the transaction log in the database, that's the canonical source of the changes as they were applied. It's the serialized form of all the changes. If you get change events out of the transaction log, we have the data changes in their exact order. We don't have this A before B, B before A issue there.
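A minimal sketch of why a log with positions removes both concerns, assuming an in-memory list as a stand-in for the transaction log (the `Log` and `Consumer` classes are hypothetical, not Debezium or Kafka APIs). The consumer remembers the position of the next entry to read, so after an outage it resumes where it left off and sees the changes in the order they were written.

```java
import java.util.ArrayList;
import java.util.List;

class TransactionLogSketch {

    // Append-only list standing in for the database's transaction log.
    static final class Log {
        private final List<String> entries = new ArrayList<>();
        void append(String change) { entries.add(change); }
        int size() { return entries.size(); }
        String get(int position) { return entries.get(position); }
    }

    // A consumer that remembers the position of the next entry to read.
    static final class Consumer {
        private int nextPosition = 0;
        final List<String> applied = new ArrayList<>();

        void catchUp(Log log) {
            while (nextPosition < log.size()) {
                applied.add(log.get(nextPosition));
                nextPosition++;
            }
        }
    }

    static String run() {
        Log log = new Log();
        Consumer searchIndexUpdater = new Consumer();

        log.append("A");
        searchIndexUpdater.catchUp(log); // consumer is up and sees A

        // The consumer is down now; the database keeps accepting writes.
        log.append("B");
        log.append("C");

        searchIndexUpdater.catchUp(log); // back up, resumes right after A
        return String.join(",", searchIndexUpdater.applied);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```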

Once we have data and change events in Kafka, this opens up many more doors for us because we can now add more and more consumers for those change events and support more use cases down the road. We can keep data in Kafka for a long time. If we have the disk space, we can keep it for a month, for a year, for however long we want to, and we could set up new consumers, and they could read those change event topics from the beginning. They could then update a data warehouse or serve whatever new use case you have in mind. That's it in a nutshell.

Change Data Capture with Debezium

Let's talk a little bit about how Debezium fits into all of this. What it essentially is, it's transaction log-based Change Data Capture for a variety of databases. The idea to go to the transaction log and get change events from there is great, but unfortunately, there is no single API, no single way we could do this for all of the databases. Instead, we need to figure out, how do we get changes out of Postgres? How do we get them out of MySQL? That's a separate engineering effort each time, and this is what Debezium is doing. We figured out how to get changes out of those different databases. Then we produce change events which are rather abstract, rather generic, so you as a consumer, if you go to Kafka, don't have to care too much about which particular database an event is coming from.

There are many more things which could be said around it. I just want to quickly mention this notion of snapshotting. The transaction logs exist in the database, but not for our purposes. They exist essentially for two reasons, which would be transaction recovery and replication. This means if the database figures, "I don't need those logs anymore," it will discard them. If your database has been running for some time and now you set up the CDC process, well, it wouldn't have the transaction log files from two years ago. This is where the snapshotting comes in. Essentially, it goes to all the tables you're interested in, which you can configure, and it takes a consistent snapshot of the current data, just scanning all the tables there and producing something like an insert event for each of those records. Then once the snapshot is done, it will automatically go over to this log reading mode and continue to read the transaction log from this very exact point in time.
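The two phases can be sketched as follows. This is an illustration under simplifying assumptions, not Debezium's implementation: the table is an in-memory map, the retained part of the log is a list, and `snapshotPosition` is a hypothetical marker for the log position at which the snapshot was taken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SnapshotThenStreamSketch {

    // Phase 1 emits one event per existing row; phase 2 continues with the
    // log entries written after the position at which the snapshot was taken.
    static List<String> capture(Map<Integer, String> tableAtSnapshot,
                                List<String> retainedLog,
                                int snapshotPosition) {
        List<String> events = new ArrayList<>();
        for (Map.Entry<Integer, String> row : tableAtSnapshot.entrySet()) {
            events.add("snapshot " + row.getKey() + "=" + row.getValue());
        }
        for (int i = snapshotPosition; i < retainedLog.size(); i++) {
            events.add(retainedLog.get(i));
        }
        return events;
    }

    static String run() {
        // Current table contents; a TreeMap keeps the output order stable.
        Map<Integer, String> customers = new TreeMap<>();
        customers.put(1, "Alice");
        customers.put(2, "Bob");

        // Only the recent part of the log is still available.
        List<String> retainedLog = new ArrayList<>();
        retainedLog.add("update 1=Alice"); // already reflected in the snapshot
        retainedLog.add("update 2=Bobby"); // written after the snapshot
        retainedLog.add("insert 3=Carol"); // written after the snapshot

        return String.join("; ", capture(customers, retainedLog, 1));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```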

This is all open-source, and there's a very active community around it. I think by now more than 130 people contributed to this, so there's a very active community. You can use it with Kafka, and this is definitely what most of the people do, but then you also can use it as a library within your Java application. That's a possibility. Even, for instance, if you were to use something like Apache Pulsar, it's integrated there right out of the box. That's Debezium in a nutshell.

Debezium Connectors

Let's talk about the connectors. I was mentioning we need to have a dedicated connector for each of those databases. Those are the ones we have: MySQL, Postgres, Mongo, SQL Server. Those are rather mature and stable, and they are used a lot in production. There are a few more which we are working on. One I would like to highlight is the Cassandra one because this is one where the community is taking the lead. This morning, Chris [Riccomini] from WePay was here, and actually, they contributed this Cassandra connector to the Debezium project, and they continue to lead this effort. It is very cool for me to see that people from the community also step up and take the lead on those connectors.

We might add some more. There's DB2, maybe you have heard about this acquisition. Red Hat got bought by IBM, and this means there's this love relationship going on. There are contributors from IBM who already filed a pull request for Debezium to have a DB2 connector. This is really awesome. We will have an open-source CDC connector for DB2 sometime soon.

Log- vs. Query-Based CDC

I was talking a lot about log-based Change Data Capture, and maybe you're wondering, "I could also do this myself. I could just go to the database and do something like the snapshotting. I could ask my tables, 'Is there a new or an updated record?' I could keep running this in a loop." I would like to spend a few words on that: query-based CDC versus log-based CDC. Say you have this polling loop, where you go and ask repeatedly, "Are there new records? Are there updated records in this table?" This creates a conflict of interest because you would like to do this as often as possible, so your data is as up to date as possible.

Then at the same time, this will create lots of load on the database. You would like to run this, let's say, every second, but maybe then your DBA comes and says, "Don't run this query all the time. This is just too much load." There's a problem there. Also, no matter how often you do this, you still would never be sure that you have seen all the changes. It always could happen that between those two polling attempts, you miss updates, or maybe something got inserted and deleted right away, so you would miss out on that. That's a problem.

Then for a polling-based approach, you need to have some column in your table which indicates the last update timestamp. There's an impact on your model, which you wouldn't have with the log-based approach. Also, the log-based approach can give you deletes. If something has been deleted from a table and you poll for it, it would just not be there. You couldn't get it. Whereas a delete also is an additional event in the database's transaction log. This is like an append-only log. We can also get deletes in the log-based way.
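A small sketch of what polling can miss, using an in-memory map as a hypothetical table and a list as a hypothetical log. Between two polls, one row is inserted and then updated, and another row is inserted and deleted again. The poll notices a single changed row, while the log holds all four changes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PollingVersusLogSketch {

    // Returns { changes noticed by comparing two polls, changes in the log }.
    static int[] run() {
        Map<Integer, String> table = new HashMap<>();
        List<String> log = new ArrayList<>();

        Map<Integer, String> firstPoll = new HashMap<>(table);

        // Four changes happen between the two polls.
        table.put(1, "new");  log.add("insert 1");
        table.put(1, "paid"); log.add("update 1");
        table.put(2, "new");  log.add("insert 2");
        table.remove(2);      log.add("delete 2");

        Map<Integer, String> secondPoll = new HashMap<>(table);

        // Polling only sees rows whose current value differs from last time.
        int seenByPolling = 0;
        for (Map.Entry<Integer, String> row : secondPoll.entrySet()) {
            if (!row.getValue().equals(firstPoll.get(row.getKey()))) {
                seenByPolling++;
            }
        }
        return new int[] { seenByPolling, log.size() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("seen by polling: " + result[0]);
        System.out.println("entries in log:  " + result[1]);
    }
}
```

The intermediate state "new" of row 1 and the entire lifetime of row 2 are invisible to the poll.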

There's one thing which I should mention as an advantage of the query-based approaches, and this is, it's easier to install and to configure. As I mentioned, you need to have a dedicated connector for all those databases. If there isn't a log-based connector for the particular database you're using, then you would be out of luck. Whereas with querying, as long as there is some JDBC support, you could do this, and it would be possible. Otherwise, I always would recommend the log-based approach.

Change Event Structure

Let's talk about the structure of change events. You all know that messages in Kafka have a key and a value. In our case, the key of Kafka messages is the primary key of our tables. This means if you have multiple change events for the same customer, for the same purchase order, they would, for instance, go into the same partition within this Kafka topic. That's what we have as the key. For the value, we have this complex structure which has three parts. We have the before part, the after part, and then we have this source metadata, which describes where this change is coming from, and so on.

The before and after parts describe the structure of the table. For each column, we would have a field there. If it's an update event, we would have both before and after. If it's an insert event, we would just have the after part. Then if it's a delete event, we would just have the before part. For the source metadata, we have things like the position in the log file, the transaction ID, maybe the query which was causing this change, and a few more things like timestamps. That's the value.
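The shape described above can be modelled roughly like this. The class and field names mirror the three parts mentioned in the talk, but the Java types, the sample column, and the `partitionFor` helper are assumptions for illustration; in particular, the hash used here is only a stand-in and not the partitioner Kafka actually uses.

```java
import java.util.HashMap;
import java.util.Map;

class ChangeEventSketch {

    static final class ChangeEvent {
        final Map<String, Object> before; // row state before the change, null for inserts
        final Map<String, Object> after;  // row state after the change, null for deletes
        final Map<String, Object> source; // metadata such as log position or transaction ID

        ChangeEvent(Map<String, Object> before, Map<String, Object> after,
                    Map<String, Object> source) {
            this.before = before;
            this.after = after;
            this.source = source;
        }

        // Derives the kind of change from which parts are present.
        String kind() {
            if (before == null) return "insert";
            if (after == null) return "delete";
            return "update";
        }
    }

    // Builds a single-column row; real rows would have one entry per column.
    static Map<String, Object> row(String column, Object value) {
        Map<String, Object> fields = new HashMap<>();
        fields.put(column, value);
        return fields;
    }

    // Stand-in for key-based partitioning: equal keys map to equal partitions.
    static int partitionFor(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    static String describe() {
        Map<String, Object> source = row("txId", 556L);
        ChangeEvent insert = new ChangeEvent(null, row("email", "a@example.com"), source);
        ChangeEvent update = new ChangeEvent(row("email", "a@example.com"),
                                             row("email", "b@example.com"), source);
        ChangeEvent delete = new ChangeEvent(row("email", "b@example.com"), null, source);
        return insert.kind() + "," + update.kind() + "," + delete.kind();
    }

    public static void main(String[] args) {
        System.out.println(describe());
        System.out.println("partition for key 1001: " + partitionFor(1001, 3));
    }
}
```

Because the message key is the primary key, every event for customer 1001 lands in the same partition, which is what preserves the per-row order.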

In terms of serialization, in Kafka everything is byte arrays. We need to figure out, how can we get this semantic structure we see here into the binary representation? This is where this notion of serializers or converters comes into play. Those allow us to take our data and serialize it into the binary representation. There are a few choices you have: something like JSON, so then messages would look like this. Most of the people in production, I think, would use Avro, which is a very efficient binary format, which means messages are very small. Avro typically is also used with a schema registry. This means we don't have to have schema information in our messages, but instead, this would be stored in this registry, again making our messages very small and efficient. That's definitely what people would use most of the time in production, I would say.

CDC – "Liberation for Your Data"

With that being said, let's talk about some use cases. I hope by now you have some idea of how this Change Data Capture works in general. Let's talk about, why would you want to use it, and some patterns around it. Generally, the way I would like to think about it is, this is like liberation for your data. You have this data sitting there in your database, and it's eager for you to come and do something with it, and you don't have to go there and ask, but instead, whenever something changes, it will push to you automatically, then you can react to those changes with very low-latency.

There are many things you could do. You could replicate data; just take the data and put it somewhere else. The search index case would fit into this category, as would updating a cache. Then you also could use it for things like streaming queries. That's something I find very interesting. You have this continuously running query, which you would re-execute whenever something in the underlying data has changed, using Kafka Streams or Flink or something like that. Those CDC events could feed into such a streaming query. That's pretty cool.

Microservices are another very interesting use case. We can use CDC to propagate data between microservices. I will touch on that a bit in more depth later on. Ultimately, it's enabling really many use cases.

Data Replication

Let's talk about replication. At the heart of it, we would have Kafka and we would have different topics there. There's this other thing which is called Kafka Connect. Kafka Connect, that's a framework and runtime environment for connectors. We have source connectors, which get data into Kafka, and then we have sink connectors, which take data out of Kafka and put them somewhere else. By now I guess you would have figured Debezium is a set of source connectors. We would deploy the Debezium connectors into Kafka Connect. They would subscribe to the binlog in case of MySQL, to the logical replication stream in case of Postgres, and then they would propagate those changes into corresponding Kafka topics. By default, that would be one topic per captured table.
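As a rough sketch, a source connector is described by a set of configuration properties. The property names below follow my recollection of the Debezium MySQL connector documentation from around the time of this talk and may differ in other versions, so treat them as assumptions and check the documentation for the version you run. The host name, credentials, and logical server name are hypothetical. The `topicFor` helper illustrates the one-topic-per-table convention, assuming a `<logical server name>.<database>.<table>` naming scheme.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConnectorConfigSketch {

    // Properties one would register with Kafka Connect for a MySQL source.
    // Names are assumptions based on older Debezium versions; values are made up.
    static Map<String, String> mysqlSourceConfig() {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        config.put("database.hostname", "mysql.example.internal"); // hypothetical host
        config.put("database.port", "3306");
        config.put("database.user", "debezium");                   // hypothetical user
        config.put("database.password", "change-me");              // hypothetical secret
        config.put("database.server.name", "dbserver1");           // logical name, used as topic prefix
        return config;
    }

    // Assumed default naming: one topic per captured table.
    static String topicFor(String logicalServerName, String database, String table) {
        return logicalServerName + "." + database + "." + table;
    }

    public static void main(String[] args) {
        mysqlSourceConfig().forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println(topicFor("dbserver1", "inventory", "customers"));
    }
}
```

In practice this configuration is usually submitted to Kafka Connect as JSON rather than built in code; the map here only shows which pieces of information the connector needs.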

We could deploy any sink connector. Let's say we would come back to this Elasticsearch use case, we would take the Elasticsearch sink connector, and this would subscribe to those topics, and then take the data and write it out to Elasticsearch. The nice thing about this is, all this is configuration only. I am a Java developer, I like Java coding all day long, but many of our users are not necessarily in that camp. They would like just to use this stuff and maybe configure it. This is what you can do here. You don't have to code, you don't have to implement, you just configure those connectors, and then this pipeline will be running for you.

There's a rich ecosystem of connectors. You could take the JDBC sink connector, and this would allow you to write data to any kind of JDBC-accessible store - maybe a database, a data warehouse, and so on. Or, coming back to the caching use case, if you were to use something like Infinispan as an in-memory data grid, there's a connector for that. It will take the data and write it to Infinispan. There is some sort of sink connector for pretty much every data store. This is very cool.

This is zero-coding, and also, it's low-latency. This is something which I find very interesting. There's this company called Convoy, and I think they are something like Uber for freight forwarding, if I'm correct. They wrote a very nice blog post about their CDC pipeline, which they use essentially to stream data out of their production databases into their data warehouses. Before that, they were looking at something like one to two hours of latency between the change in the primary database and the point in time when it would be in the data warehouse. Now, they moved to Apache Kafka and Debezium. After some time, it looked like this. You could pretty much see where they switched over to their new pipeline. They were down to a very small amount of time. From what I hear from people in the community, you would look at an end-to-end latency of maybe seconds or even milliseconds typically. With Debezium into Kafka, you would look at tens or hundreds of milliseconds. It's very low-latency. Definitely check out their blog post. It's very interesting to see what they have done there. That's replication.


Auditing

Let's talk a little bit about auditing. I was at this open space earlier today, and somebody was asking about auditing. This is a very common use case. You are obligated to keep the history of your data, of your customers' purchase orders and so on, for some time. If you squint a little bit, a Kafka topic with those change events already is like an audit log. We have the history of all the changes there. To make this a true audit log in the strictest sense, something is missing. This is metadata. Typically, we are interested in things like, who is the business user who made this change? Maybe what's their client IP address, device configuration, maybe a use case identifier, and all these kinds of things. We cannot get this information out of those tables because, typically, you would not store this information in your tables.

How can we get this? One idea there is, you could do it with a little bit of help from your application. You would have another table which would contain this metadata. This is scoped by transactions. You would have a record in this table for each of your transactions, and then the columns would be all the metadata you're interested in, like the user, use case, and so on. You would just insert [inaudible 00:21:25] for each transaction which is going on. You would use Debezium to capture changes from your actual tables, like customer and so on, and you also would capture the inserts from this transaction table. You would have those two topics. Then you can use some stream processing magic to actually correlate those two things. You could use Kafka Streams, which I really like for that. You would join those two streams of change events, and you would write the result back into Apache Kafka. You would have this enriched customer topic there.

What does this look like in terms of an implementation? Let's say this is our actual change event. This is an update, we have some change there for our customer, and you already see there we have this transaction ID in the source metadata block. That's the customer change event. This is our transaction change event. We have the key and the value, and the key is the transaction ID. The value would be just the insert event for this record. We can correlate this because we have the transaction ID on both sides. It's the key for the transaction topic, and then we have it again in the source block for the actual change event. We can correlate this, and then we could write it back. Let's say we could add it into this source block, taking the user, use case identifier, and so on, and write it back into this enriched customer topic. That's the general idea.

Implementing this is surprisingly non-trivial. Don't try to figure out the code, I just put it there to make the point - ok, there's some coding involved. The problem is that you don't have ordering guarantees across multiple Kafka topics. What it means is, at the point in time when you get to process a customer change event, it might happen that you don't yet have the corresponding transaction event in your hands. This means we need to do some buffering. We need to wait with this customer event until we have received the transaction event, and then we can enrich it, write it back, and continue to process. If you have been using Kafka Streams before, you might think, "Ok, I just use a stream-to-stream join there." The problem is, you can only join on the same key. We would have to have the transaction ID as the key on both sides. This would mean we would lose the ordering of our customer change events. Again, that's not something you should do, or really could do. If you are interested in this, check out this blog post, which discusses this at great length, and you have a complete example which shows the particular implementation of that. That's auditing.
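The buffering idea can be sketched in plain Java. This is not the Kafka Streams implementation from the blog post mentioned above; it is a simplified, single-threaded illustration with hypothetical class and method names. Customer events are kept in arrival order, and the head of the queue is only released once the metadata for its transaction has been seen, so the order of customer events is preserved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AuditEnrichmentSketch {

    // A customer change waiting for the metadata of its transaction.
    static final class Pending {
        final long txId;
        final String change;
        Pending(long txId, String change) { this.txId = txId; this.change = change; }
    }

    private final Map<Long, String> userByTx = new HashMap<>();
    private final Deque<Pending> pending = new ArrayDeque<>();
    final List<String> enriched = new ArrayList<>();

    void onCustomerEvent(long txId, String change) {
        pending.add(new Pending(txId, change));
        drain();
    }

    void onTransactionEvent(long txId, String user) {
        userByTx.put(txId, user);
        drain();
    }

    // Emits buffered events from the head for as long as metadata is known.
    private void drain() {
        while (!pending.isEmpty() && userByTx.containsKey(pending.peek().txId)) {
            Pending next = pending.poll();
            enriched.add(next.change + " by " + userByTx.get(next.txId));
        }
    }

    static String run() {
        AuditEnrichmentSketch sketch = new AuditEnrichmentSketch();
        sketch.onCustomerEvent(7, "update customer 1"); // metadata not here yet
        sketch.onCustomerEvent(8, "update customer 2");
        sketch.onTransactionEvent(8, "bob");   // head (tx 7) still blocks
        sketch.onTransactionEvent(7, "alice"); // both events can be released
        return String.join("; ", sketch.enriched);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A production version would also need to persist the buffer and to clean up metadata for completed transactions, which this sketch leaves out.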

Microservice CDC Patterns

Let's talk about microservices. Everybody is doing microservices, it's the architecture du jour. We should make this point: our microservices don't exist in isolation. They need to have data from each other. Let's say you have this e-commerce application with the systems order, item, stock, and so on. In microservices, you don't want to share databases. They all should have their own database. Maybe it's specifically chosen for this particular purpose, maybe it was the preference of the team, you don't know. They are different databases, and they are not shared.

If this order system receives a request to process a purchase order, it will need to have data from these other two systems. It will have to have the information from the item system. Let's say, what's the weight of this particular item, so we can figure out the shipping fees? It will have to have the information from the stock system. Do we even have this item in the warehouse anymore? Can we sell it? What could we do? We could go there and ask some REST API, but then again, we would be back to this point of tight coupling. If one of those systems, say the item system, weren't available, we could not go there with our synchronous REST request, and we would have a problem. We can use CDC to turn things around. We could set up the CDC process, and we could publish change event topics for the item and for the stock database. We would have Kafka topics with all the item changes and all the stock changes.

What we could do is have the order system subscribe to these topics, and it would take the change events there and materialize a duplicated, redundant view of this data in its own local database. Probably you would just take a subset of the data. You definitely would use a representation which is optimized for the particular needs of the system; that might be denormalized, all these kinds of things. Once the data is in the order system's own local database, it doesn't have to go to those other systems to do this synchronous data retrieval. That's a good thing.
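A minimal sketch of such a local view, assuming a hypothetical item table with a `weight_kg` and a `description` column. The order system keeps only the subset it needs (the weight) and applies each change event as an upsert or a delete; an in-memory map stands in for its local database.

```java
import java.util.HashMap;
import java.util.Map;

class LocalItemViewSketch {

    // The order system's own copy: item ID -> weight, nothing else.
    private final Map<Integer, Double> weightByItem = new HashMap<>();

    // Applies the "after" state of an item change event; null means delete.
    void apply(int itemId, Map<String, Object> after) {
        if (after == null) {
            weightByItem.remove(itemId);
        } else {
            weightByItem.put(itemId, ((Number) after.get("weight_kg")).doubleValue());
        }
    }

    // Local lookup, no synchronous call to the item system needed.
    Double weightOf(int itemId) {
        return weightByItem.get(itemId);
    }

    // Builds the "after" part of a hypothetical item change event.
    static Map<String, Object> itemRow(double weightKg, String description) {
        Map<String, Object> row = new HashMap<>();
        row.put("weight_kg", weightKg);
        row.put("description", description); // ignored by the view
        return row;
    }

    public static void main(String[] args) {
        LocalItemViewSketch view = new LocalItemViewSketch();
        view.apply(1, itemRow(1.5, "garden chair")); // insert
        view.apply(1, itemRow(2.0, "garden chair")); // update
        view.apply(2, itemRow(0.3, "flower pot"));   // insert
        view.apply(2, null);                         // delete
        System.out.println("item 1: " + view.weightOf(1));
        System.out.println("item 2: " + view.weightOf(2));
    }
}
```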

We have solved this coupling aspect there, but now we should be aware this is all eventually consistent. It might mean we haven't seen the latest item change or we haven't seen the latest stock change. I would argue that reality itself is not transactional. It always could happen that you run into this situation. At some point, I had a friend, and he was working at this e-commerce company, and they were selling flowers. I was meeting him at a birthday party, and he got a call - he was the head of IT - and he had to do some data patch because in the warehouse, somebody had knocked over a rack with a couple of hundred flowers, and now they were still in the system. He was sitting at the birthday party, connected through his mobile to the VPN to patch the data there, all this stuff. The point is, reality is not transactional. You need to be prepared to handle these kinds of situations. That's why I think this is a feasible way.

There's another concern. You expose the internal structure of your tables. Maybe that's not something you want; you don't want to expose the internal structure of the item table in the item system's database to the outside world. That's definitely a concern I could understand. This is where this outbox pattern comes into play. Maybe you take a step back again and you say, "I would like to update my own database, and then I would like to send a message to our downstream systems using Kafka." Again, friends don't let friends do dual writes. You cannot - you can, but you shouldn't - update your database and then send a message to Kafka and hope for the best because eventually this would run out of sync, and you would have a problem there. Don't update your database and Kafka without any sort of transactional guarantees.

Outbox Pattern

This outbox pattern is a very nice way around it. The way this works is, in your source database, you have essentially two tables. Let's say we have this order system here, and now it would like to update its database, and then also would like to notify the shipment service and the customer service that this new purchase order has arrived. What it does is it updates the data in this orders table - that's like the business table - then within the same database there is this outbox table. The outbox table is just a container for events. We produce in [inaudible], and the payload in this outbox table contains the event structure which we would like to send to those other consumers. This happens within one transaction. If you roll back, we would not update the order, and we also would not produce this insert into the outbox table.

Now you see what this structure could look like. The most important part is the payload column. This is the message which we would like to send. This is now decoupled from the internal structure of the orders table. We could evolve those things independently. We would be very careful when changing the format of this message structure because we know this might impact our downstream consumers. This is naturally decoupled.
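The all-or-nothing behaviour can be sketched without a real database. In this illustration, an in-memory map and list stand in for the orders and outbox tables, and a staged write that is only published on commit stands in for a database transaction. The outbox columns other than the payload (`aggregatetype`, `aggregateid`, `type`) and the event name are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OutboxSketch {

    final Map<Integer, String> orders = new HashMap<>(); // business table
    // Outbox rows: { aggregatetype, aggregateid, type, payload }
    final List<String[]> outbox = new ArrayList<>();

    // Writes the order and its event together, or neither of them.
    // failBeforeCommit simulates an error that rolls the transaction back.
    boolean placeOrder(int orderId, String item, boolean failBeforeCommit) {
        // Stage both writes; nothing is visible until commit.
        Map<Integer, String> stagedOrders = new HashMap<>();
        List<String[]> stagedOutbox = new ArrayList<>();

        stagedOrders.put(orderId, item);
        String payload = "{\"orderId\":" + orderId + ",\"item\":\"" + item + "\"}";
        stagedOutbox.add(new String[] { "Order", String.valueOf(orderId), "OrderCreated", payload });

        if (failBeforeCommit) {
            return false; // rollback: neither the order nor the event exists
        }

        // Commit: both writes become visible together.
        orders.putAll(stagedOrders);
        outbox.addAll(stagedOutbox);
        return true;
    }

    public static void main(String[] args) {
        OutboxSketch service = new OutboxSketch();
        service.placeOrder(1, "book", false);
        service.placeOrder(2, "pen", true);
        System.out.println("orders: " + service.orders.size());
        System.out.println("outbox: " + service.outbox.size());
    }
}
```

With a real database, both inserts would simply run inside one local transaction, and CDC would pick up the outbox insert from the transaction log.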

I mentioned that, by default, we would have one topic per table which we are capturing. This means we would end up with one huge outbox topic in Kafka. Likely, this is not something we want. Instead, we would like to have a topic for each kind of event. We would like to have a topic with all the order events, one with the credit worthiness check events, and whatnot. This is where another component from Debezium comes into play. It's the outbox router. This is a configurable thing which allows you to implement this pattern, and it would route those events from this outbox table to different topics based on a configurable column like this aggregate type there.
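The routing step itself is simple to illustrate. The following sketch is not Debezium's outbox router, only a plain-Java model of the idea: each outbox row carries an aggregate type, and that value selects the destination topic. The `outbox.event.` prefix and the lower-casing are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class OutboxRouterSketch {

    // Hypothetical naming scheme: one topic per aggregate type.
    static String topicFor(String aggregateType) {
        return "outbox.event." + aggregateType.toLowerCase(Locale.ROOT);
    }

    // Each row is { aggregateType, payload }; returns payloads grouped by topic.
    static Map<String, List<String>> route(List<String[]> outboxRows) {
        Map<String, List<String>> topics = new TreeMap<>();
        for (String[] row : outboxRows) {
            topics.computeIfAbsent(topicFor(row[0]), t -> new ArrayList<>()).add(row[1]);
        }
        return topics;
    }

    static String run() {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] { "Order", "order 1 created" });
        rows.add(new String[] { "CreditCheck", "check 9 passed" });
        rows.add(new String[] { "Order", "order 2 created" });
        return route(rows).toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```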

We have implemented a very extensive demo, blog post, and everything. You could check this out at this URL to see the full implementation. That's definitely something which I find very interesting. This has seen a huge uptake of interest in the community. I think we were first blogging about this earlier this year, and since then, maybe three or five blog posts by other people came out where they were exploring this and sharing this idea. That's the outbox pattern.

Strangler Pattern

Another thing I would like to talk about is, how do we even get to a world of microservices? Very often, we don't start on the greenfield. We have this monolith, and it would just be prohibitively expensive to redo everything at once; we cannot do this. We must find some incremental approach. This strangler pattern is very helpful there. Martin Fowler came up with the name. Apparently, he saw these strangler fig plants which wrap around some tree. They grow there, they strangle it, and at some point, the old tree dies off and just those strangler figs continue to live. This is the analogy there.

In our case, what does it mean? Let's say we have this application there. It's a monolith. Hopefully, it's sensibly structured. We have different components there for customer, and so on. Now we would go to our database. Let's say we would want to add some new functionality, maybe a new view with all the purchase orders of this customer. We would like to implement this in a new nicely, cleanly implemented microservice. What could we do? We could use an architecture like this. We would implement this new view in this new microservice there. For now, let's say this is just a read-based use case, just to show data. The key is to have this routing component at the top there. Based on the URL which the user is invoking, this would route this request either to the old monolith, or if it's the URL of this new purchase order view, it would send it to the new microservice.
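The routing component can be sketched in a few lines of Python. This is only an illustration of the decision it makes; the backend addresses and the URL prefix are hypothetical, and a real setup would typically put this logic into a reverse proxy or API gateway.

```python
# Hypothetical backend addresses, for illustration only.
MONOLITH = "http://monolith.internal"
ORDER_VIEW_SERVICE = "http://order-view.internal"

# URL prefixes that have already been moved to a new microservice.
MIGRATED_PREFIXES = {
    "/customers/purchase-orders": ORDER_VIEW_SERVICE,
}


def resolve_backend(path: str) -> str:
    """Return the backend that should handle the given request path."""
    for prefix, backend in MIGRATED_PREFIXES.items():
        # Match the prefix itself or any sub-path below it.
        if path == prefix or path.startswith(prefix + "/"):
            return backend
    # Everything not migrated yet is still served by the monolith.
    return MONOLITH
```

As more functionality moves out of the monolith, entries are added to the prefix table, and the default branch handles less and less traffic.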

Of course, this microservice needs to have this customer data, which still is maintained and written by the old monolith. We would use CDC to stream the changes from the monolithic database over to the microservice's database. That's the basic idea, and very likely you would have some transformation component in there. Let's say the tables are not structured in the way you would like to have them. The names are not what you want. You would like to have some transformation which cleans this up a little bit, so you have a nicer event structure for the microservice. If you were to work with the DDD approach, this would be like some anti-corruption layer there.
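Such a transformation step could look roughly like the following Python sketch. The legacy column names and the target field names are made up for illustration; the point is only that the microservice sees a cleaned-up structure and nothing else.

```python
# Hypothetical mapping from legacy column names to the event structure
# the new microservice should see.
FIELD_MAPPING = {
    "CUST_ID": "id",
    "CUST_NM": "name",
    "CUST_EML": "email",
}


def to_customer_event(legacy_row: dict) -> dict:
    """Translate a row from the monolith's table into a clean customer event.

    Columns missing from the mapping are dropped, so internal details of the
    monolith's schema do not leak into the microservice.
    """
    return {
        new_name: legacy_row[old_name]
        for old_name, new_name in FIELD_MAPPING.items()
        if old_name in legacy_row
    }


event = to_customer_event({"CUST_ID": 7, "CUST_NM": "Ada", "LEGACY_FLAG": "X"})
```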

You would add more and more use cases, and you also could think about adding write use cases to this. The idea there is you should take one component, let's say one table, or, again, in DDD terms, one aggregate and move this over to this new microservice. This service would be in charge of maintaining everything which deals with the customer aggregate, let's say. It could also happen that the other components in the monolith need to be aware of customer changes. Again, you could use CDC to also stream changes back to the monolithic database, so we have the data also there. You just should be very careful that you always have one side which is in charge of the writes. You don't want to update your customer with both the monolith and the microservice. This would give you some conflict. You might end up with propagating the same changes back and forth in a loop. That's not something you want. That's the strangler pattern, you can use it to get to the world of microservices.

This pretty much wraps up what I wanted to mention in terms of use cases. There are many more things to mention, but I hope you could get an idea what you can use CDC for. In the last part or the last minutes, I would like to talk about a few practical matters - things like, how can we achieve high availability?

Deployment Topologies

This is the most basic setup you could think of. You would have a database, you would set up the CDC process, and it would just go there and get the changes out of MySQL and write them into Apache Kafka. That's the simple setup. In terms of achieving HA, you would add a secondary node. You would run MySQL in some clustered environment. It would look like this. Debezium would be connected to the primary node, but then at some point, this might go away and the previous secondary node becomes the new primary. This means now we need to connect to the previous secondary node and get the changes from there. In terms of MySQL, what's important there is you need to have global transaction IDs for that. You can do it.

One problem there is you would have to reconfigure the connector, so it goes now to this new primary node with a different hostname, and so on. It requires a little bit of configuration. What people often do is they add something in the middle, like HAProxy. This would take care of the failover for us. When we have this failover, we wouldn't have to reconfigure the Debezium connector, it still would go to HAProxy, and this would be the one source where it gets the changes from. We just would have to restart it because currently, the connectors, if they lose the database connection, they would not automatically restart, so you would have to do that once, but then otherwise, it's very easy to use. That's HA for the database.

All the databases, typically, must be configured in a particular way so we can use CDC. For MySQL, this means the binlog, the transaction log in MySQL, this must be configured in the row-based mode. There's statement and row-based mode. Maybe you have this application running and this database is running, and this is in the statement-based mode and you don't feel like changing this or it would take some time and something you cannot afford. You could rely on the MySQL replication there, and you could connect Debezium to this secondary node. This would be set up with the correct binlog mode. It would receive the changes from the primary, but then the CDC process doesn't touch the primary process or the primary database directly. This would add, of course, a little bit of latency because we have this additional hop there, but you might want to do this for the reasons explained.

Sometimes people also have this requirement, they would like to have HA for the connectors, so always the guarantee is no change events will get lost. Let's say the Debezium connectors are not running for some time. Maybe you are doing some upgrade, you go to a new version. During that downtime, we wouldn't proceed to stream changes to Kafka, but then when the connector gets restarted, we would continue to read the log from the last offset, which we memorized. You would never lose events, but then sometimes the requirement still is you just don't want to have any downtime there. This is a setup which some Debezium users did and which they shared in a blog post too. They are running two instances of Debezium, and they are connected to the same database, so they have all the change events twice.

Let's say they wanted to update one of those instances, they would still receive the change events from the other one. They wouldn't have the downtime there, but the trade-off is they would have all the change events twice in Kafka. You would need some deduplication functionality which figures out, "I've already seen this change event, I will discard the second one." Otherwise, you would end up with duplicate data. You can quite easily do this based on the offset information which is stored in this source metadata block, so you would be able to figure out, "I've seen this event." That's HA.
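A deduplication step along these lines could be sketched as follows in Python. It assumes MySQL change events whose source block carries the binlog file name, the position, and the row number within the event (`file`, `pos`, `row`); treat these field names as assumptions and check them against the connector version you use. The in-memory set of seen positions is unbounded here, so a real consumer would have to bound or persist that state.

```python
def source_position(event: dict) -> tuple:
    """Build a comparable position from the event's source metadata block.

    Assumes MySQL-style fields: binlog file, position, and row number.
    """
    source = event["source"]
    return (source["file"], source["pos"], source.get("row", 0))


def deduplicate(events):
    """Yield each change event once, dropping repeats from a second instance."""
    seen = set()
    for event in events:
        position = source_position(event)
        if position in seen:
            # The other connector instance already delivered this change.
            continue
        seen.add(position)
        yield event


events = [
    {"source": {"file": "mysql-bin.000003", "pos": 154, "row": 0}, "after": {"id": 1}},
    {"source": {"file": "mysql-bin.000003", "pos": 154, "row": 0}, "after": {"id": 1}},
    {"source": {"file": "mysql-bin.000003", "pos": 420, "row": 0}, "after": {"id": 2}},
]
unique = list(deduplicate(events))
```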

Running on Kubernetes

There are different ways you could run Kafka and Debezium and all those things on Kubernetes. It might be managed, it might be on-prem, under your own control. I don't even want to discuss which one you should use. There are different reasons for those different approaches. If you were to use Kafka on your own Kubernetes setup under your own control, what I definitely would recommend is use this operator-based approach. An operator in Kubernetes terms is a component which takes a declarative resource description and produces some resource in the Kubernetes cluster. In the case of Kafka, our resource descriptions would be YAML files, which describe a Kafka cluster, like how many nodes, or it would describe topics with particular replication factor, all these things. Then this operator would take this description, this YAML file, and it would produce a Kafka cluster or a topic or a user and so on which adheres to this description.

The nice thing is, if you change something, so let's say you would like to rescale your Kafka cluster, you would just have to modify this custom resource. You would replace like three nodes with five nodes, and the operator automatically would rescale the Kafka cluster based on that. This allows you very easily to deploy Kafka, Kafka Connect, and also to scale it up, scale it down. Upgrading gets very easy. Let's say you wanted to do a rolling upgrade to a new version, the operator would take care of that.

There are different ones which you could use. The one I just would mention is Strimzi. This is an effort which has been started by colleagues of mine, and this is now a sandbox project at the Cloud Native Computing Foundation. This all, again, is open-source, and the idea is to grow this open-source operator together with the open-source community. One of the things they've just added lately is also support for connectors. You would have a custom resource like this, which would allow you to deploy a Debezium connector based on this YAML resource.

Let's talk about running Kafka Connect on Kubernetes. If you have been using Kafka Connect before, you know there are essentially two modes. We have the distributed mode and we have the standalone mode. The one I would always recommend for Kubernetes would be the distributed mode. This is because this makes your pod essentially stateless in Kubernetes terms: the Kafka Connect node needs to memorize the offsets of the connectors it's running, and in the distributed mode, this happens within a Kafka topic. If you were to use the standalone mode, this would be based on files of the local file system, so this gets more complicated. Use the distributed mode.

The interesting thing is, what I hear from people in the community, they have this distributed mode, but then they actually run it with a single node and also a single connector. There are a few reasons for that, one is rebalancing. Before Kafka 2.3, so before the current version, if you were to deploy a new connector, it could happen that existing connectors would be moved to different nodes in the cluster. This, of course, is very disruptive. If you add a new connector, you would not want to impact existing ones. In particular, if a connector is doing a snapshot, this might take a few hours. If this is a large database you're snapshotting, restarting this in the middle is really painful. The rebalancing has gotten much better in 2.3. I have not tried it, but I think it's much better. This is probably solved. Then there's this issue of health checks. You might want to use a Kubernetes liveness probe and things like that to automatically restart your pods if there's a problem with the connector.

If you were to run multiple connectors within a single Connect node, you would restart all of them. Kubernetes only can restart the entire pod. That's why people oftentimes use a single node and also a single connector. I guess it's a useful structure, in particular, if those connectors just have a single task. We could have connectors with multiple tasks, but those CDC ones, they use a single task, and this means this model is pretty cool.

Then finally, one thing I want to mention, oftentimes or sometimes people have this multi-tenancy use case. They have a database for each of their tenants, and now they would like to stream changes out of all those databases, which have the same structure, so they would end up deploying the same connector configuration many times. Somebody mentioned they're using these Jsonnet templates, which is like a JSON extension, which allows them to have variables in there. If you look at the database name or the server name, here they refer to an external variable, which would be like the tenant ID, and then they would process those templates before they deploy this configuration. This allows them to deploy many similar connector configurations.
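The same templating idea can be sketched in plain Python instead of Jsonnet, which is a swap made here purely for illustration. The host name, the naming scheme, and the tenant IDs are hypothetical, and the Debezium property names shown (`database.server.name`, `database.whitelist`) are assumptions that should be checked against the connector version in use.

```python
import json


def connector_config(tenant_id: str) -> dict:
    """Render one connector registration for the given tenant.

    Only the tenant-specific values vary; everything else is shared.
    """
    return {
        "name": f"inventory-connector-{tenant_id}",
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "database.hostname": "mysql.example.internal",  # hypothetical host
            "database.server.name": f"tenant-{tenant_id}",
            "database.whitelist": f"inventory_{tenant_id}",
        },
    }


# Hypothetical tenant IDs; in practice these would come from a tenant registry.
configs = [connector_config(tenant) for tenant in ("acme", "globex")]
rendered = json.dumps(configs[0], indent=2)
```

Each rendered document could then be registered with Kafka Connect in whatever way the deployment already uses, for example through its REST interface.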

Single Message Transformations

The last thing to mention in terms of practical matters is single message transformations. Very often, if people come to us, they ask, "I have this date field, and now I would like to change its representation. I would like to have a different format." Very often, the answer is, "Use this single message transformation." The idea there is they can modify messages, either as they are sent into Kafka or as they are taken out of Kafka. You could use it for format conversions, like the time and date stuff, you could use it for routing messages. If you think about this outbox routing component, this is also an SMT, which based on this configurable table column, would send messages to different topics. You could also use it for keeping compatibility. If you think about this strangler pattern and this anti-corruption layer there, this could be implemented using SMTs. This could allow you to rename fields or remove fields, change the types, and so on. That's a very versatile thing and definitely a good thing to be aware of.

One particularly interesting use case is this one. You have large table columns, maybe you have a user table, and it has a column with the profile picture of the user, like binary data. You shouldn't really do this in your database, but sometimes it happens. You have this large column value there, and you don't want to send this to Kafka whenever something has changed in this particular table row. You could use an SMT to externalize this data. It would take the binary, the blob field from the message, and it would write it to some external storage, like the Amazon S3 API, whatever you want to do. Then, within the actual Kafka message, you would just have a placeholder which identifies the element in the external storage. In the case of S3, it would be the bucket ID and some identifier. Then, a consumer would be able to retrieve the binary data, the image data from this storage [inaudible 00:45:18]. If you have read the "Enterprise Integration Patterns" book, this is also known as the claim check pattern, and it's interesting to see that we also can implement these kinds of patterns using SMTs.
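The claim check idea itself is small enough to sketch in Python. This is not an SMT, which would be written against the Kafka Connect Java API; an in-memory dictionary stands in for the external storage, and the class, function, and field names are all hypothetical.

```python
import uuid


class InMemoryBlobStore:
    """Stand-in for an external object store such as an S3 bucket."""

    def __init__(self):
        self._objects = {}

    def put(self, data: bytes) -> str:
        """Store the data and return the identifier needed to fetch it again."""
        key = str(uuid.uuid4())
        self._objects[key] = data
        return key

    def get(self, key: str) -> bytes:
        return self._objects[key]


def externalize_field(message: dict, field: str, store) -> dict:
    """Producer side: replace a large field by a small reference."""
    slim = dict(message)  # shallow copy, the input message stays untouched
    slim[field] = {"claim_check": store.put(message[field])}
    return slim


def resolve_field(message: dict, field: str, store) -> bytes:
    """Consumer side: fetch the original value using the reference."""
    return store.get(message[field]["claim_check"])


store = InMemoryBlobStore()
original = {"id": 7, "name": "Ada", "picture": b"\x89PNG" + b"\x00" * 1024}
slim = externalize_field(original, "picture", store)
```

The message that goes through Kafka now carries only the reference, and a consumer that needs the picture calls `resolve_field(slim, "picture", store)` to get the bytes back.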


With that, I'm almost done. I hope I could make the case that Change Data Capture is a very powerful tool. It allows you to react to your data changes with very low latency, and it's like liberation for your data. You can do things like replication, streaming queries. CQRS - I couldn't talk about that, but it's another very interesting use case where you have multiple read models which are optimized for particular use cases. Now you need to keep those read models in sync with this canonical write model. CDC would allow you to implement this. For microservices, we have seen the outbox pattern and the strangler pattern. There are many, many applications for CDC, and Debezium would be one particular open-source implementation for that.

Of course, the most important takeaway is friends don't let friends do dual writes. That's the thing you should take away from the talk. As I was leading into this, I hope I could make this point - don't forget thinking about your data. This is a very important aspect, and CDC can play an important role there.

There are some resources. We have everything on the website. Those two things which I particularly recommend are the online resources. That's a huge list of blog posts by many other people who blogged about this, how they use Debezium, how they use CDC. In our own blog, we have things like the auditing stuff, the outbox stuff, and so on. You can go to the Debezium Twitter handle to be informed about new releases and everything.




Recorded at:

Jan 01, 2020