Change Data Capture for Microservices

Summary

Gunnar Morling discusses how change data capture (CDC) and stream processing can help developers with typical challenges they often face when working on microservices.

Bio

Gunnar Morling is a software engineer and open-source enthusiast by heart, currently working at Decodable on stream processing based on Apache Flink. In his prior role as a software engineer at Red Hat, he led the Debezium project, a distributed platform for change data capture. He is a Java Champion and has founded multiple open source projects such as JfrUnit, kcctl, and MapStruct.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Morling: Welcome to this talk about Change Data Capture for microservices. Let me set the scene a little bit with a maybe blunt statement and an observation. The world around us, this is happening in real time. People buy stuff in an online store, maybe they do some payment transactions. Maybe you have machinery or IoT devices, which send over measurements or all kinds of sensor data. Now we build software to model, to represent this real world. The data in this software also should be real time. It should be really up to date, because otherwise it's just losing very quickly in its value. Let's say you build some dashboard, which tells you how your business is going. This needs to be really up to date, because if it works on the data from yesterday, or on the data from last week, it would just not be as insightful to you, and the information wouldn't be as relevant to you. That's the idea. The world is real time, our data also needs to be real time.

Outline

In this talk, I would like to talk about one concept and one tool, Change Data Capture, which can help us to build software and systems which live up to this real time promise. This is what we are going to talk about, Change Data Capture as a tool, as part of our toolbox. Secondly, a few use cases in the context specifically of microservices for Change Data Capture, or CDC for short. Lastly, also I want to talk a little bit about some of the challenges which you might encounter when deploying CDC into practice.

Background

I work as a software engineer at Decodable, where we build a fully managed data stream processing platform. One of the tools is Debezium. It's all based on Apache Flink. This explains a little bit why I do this talk and why I'm interested in this space. Maybe you have also seen previous talks from me that I spoke about Debezium and CDC in more depth, just because I've been the Debezium project lead for quite a while in my previous job at Red Hat.

Debezium - Log-Based Change Data Capture

Let's talk about Debezium. I just want to do a brief recap here what the general idea is, and then touch on a few features, which are maybe not as widely known yet, and I would like to get out the word about those. What is it about? It is about capturing changes from your database. Whenever something happens in your database, let's say Postgres or MySQL, then this insert, or update, or delete, it will be appended to the transaction log. CDC is just a process of extracting those change events from the transaction log, and propagating them to all kinds of consumers. Now maybe you wonder, there might be different approaches for reacting to changes out of a database. Maybe I could implement a trigger-based approach, where I essentially set up triggers for all the tables I'm interested in. Or maybe you think about some polling-based approach where in a continuously running loop, you would go to your tables and query them for what are the latest changes happening in the last few minutes. Really, this log-based approach, it's the most advantageous one because it gives you a very low latency. It doesn't impact your data model. It also will never miss any changes. Let's say if you do a polling-based approach, you might for instance, miss two changes which happen between two of your polling loop runs. This could never happen with a log-based approach. That's essentially what it is.

The question is, why would you be interested in using such a tool or such a concept? I would always think about it as a huge enabler for your data. Once you are able to react to your data changes with a low latency, and it could, for instance, be two-digit or three-digit milliseconds of latency, you can use that information for driving all kinds of use cases. For instance, you could replicate your data into a data warehouse, such as Snowflake or Apache Pinot, or maybe into a search index or a cache. That's all replication in the widest sense. Then you also could use CDC, for instance, to run streaming queries, to continuously recompute queries and have up-to-date results, or maybe analytical query results over your data. You could use it to exchange data between microservices. You could use it to build audit logs. It's a very powerful tool. I've been working with Debezium and CDC for the last 5-and-a-bit years, and I still discover new use cases all the time. I feel it's a very versatile tool, which you should have in your tool belt if you work on data-driven applications.

Just to give a little bit of an overview of what there is in Debezium. It's a fully open source project. It's Apache 2 licensed. It's a fully capable CDC platform. It means it comes with all kinds of things like initial snapshotting support. There's a UI. There's all kinds of ways for massaging your data, like filtering it out or transforming your data, all this stuff. It has a very active, very lively community around it. Up to this point, more than 350 people have contributed to Debezium, in one way or another. There's massive deployments out in the field where people use it to capture changes out of hundreds or even thousands of databases. It really is battle proven, and has proven its value in this large-scale environment.

Debezium: Data Change Events

Just in terms of what those change events look like, let me recap a little bit if you haven't been using it before. The change events, most importantly, describe the old and the new state of a row in your database. It could also be, for instance, a document in the case of MongoDB, or rows in Cassandra, so it's not limited to relational databases. Those before and after parts are structured like your actual database tables. For each column, essentially, there will be a field by default. Then there's all kinds of metadata, like the database and table this event is coming from, maybe the transaction ID, the position in the transaction log. Maybe even the query which caused the change. For instance, we can capture that in the case of MySQL. Then of course, there are also things like a timestamp, a type discriminator, all this stuff, so you can process this event in a meaningful way.
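To make that structure concrete, here is a minimal sketch in Python of what such an event might look like once deserialized from JSON. The `before`, `after`, `source`, `op`, and `ts_ms` fields follow Debezium's event envelope; the table, the column values, and the `describe` helper are invented for illustration.

```python
# Illustrative change event for an update; all values are made up.
change_event = {
    "before": {"id": 1001, "first_name": "Sally", "email": "sally@example.com"},
    "after": {"id": 1001, "first_name": "Sally", "email": "s.thomas@example.com"},
    "source": {"connector": "postgresql", "db": "inventory", "table": "customers", "txId": 556},
    "op": "u",  # type discriminator: c = create, u = update, d = delete, r = snapshot read
    "ts_ms": 1700000000000,
}

OPERATIONS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe(event):
    """Summarize which table changed, how, and which columns differ."""
    before = event.get("before") or {}
    after = event.get("after") or {}
    changed = sorted(
        column for column in before.keys() | after.keys()
        if before.get(column) != after.get(column)
    )
    return {
        "table": event["source"]["table"],
        "operation": OPERATIONS[event["op"]],
        "changed_columns": changed,
    }


print(describe(change_event))
```

A consumer typically branches on `op` first and then reads `before` or `after` depending on the kind of change.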

Debezium: Becoming the De Facto CDC Standard

I would like to touch a little bit on some of the maybe newer or lesser-known features. One of the things I'm particularly excited about is it really is establishing itself as a de facto standard for CDC. That's a very interesting development over the last few years, I would say that not only the Debezium project itself is working on more connectors, but also other vendors. Let's take ScyllaDB as an example, who work on a Cassandra compatible NoSQL database. They decided to build their Kafka CDC connector based on the Debezium connector framework. Or just recently, Google announced their CDC connector for Google Cloud Spanner, their highly scalable database. Again, also based on Debezium. Actually, they even developed this as part of the Debezium project umbrella. All this is to say that all those connectors, they give you one unified change event format, as we have seen on the previous slides. This means now, for you, as a user, really it doesn't matter. Does this event come from Postgres? Does it come from ScyllaDB? Does it come from Google Cloud Spanner, or any of the other supported databases? You can implement your consuming logic in a uniform way, which makes it, of course, rather easy and simple for you to adopt this. Really, that's very interesting for me to see. Also, for instance, Yugabyte, they also develop a Debezium based CDC connector.

Debezium - Deployment Options

Most of the time, people use Debezium with Kafka. Maybe 90% of the Debezium users are working with Debezium on top of Apache Kafka, which means that in this deployment scenario, as we see it here on the left-hand side, Debezium is essentially a set of Kafka source connectors. They take data out of a database, and put those change events into Apache Kafka. By default, there would be one topic per table you're capturing. Then you could use any kind of Kafka sink connector to take the change events to your external systems. Then we also realized, there are people who would like to use CDC, they would like to use Debezium, but maybe they are not on Kafka. Maybe they use something like Apache Pulsar, AWS Kinesis, Google Cloud Pub/Sub, Redis Streams, all those kinds of things. Of course, we thought those users should also be able to benefit from Debezium. This is where Debezium Server on the right-hand side comes into the picture. Essentially, that's a ready-made runtime for connecting the Debezium connectors with all those kinds of messaging or streaming platforms.

At the heart of Debezium Server, there's what we call the Debezium engine. Essentially, that's just a library module, which allows you to take Debezium and its connectors and integrate them into any kind of Java or JVM-based application. This is also what is driving Debezium Server. It's also what many integrators of Debezium into external systems use. For instance, for Apache Flink, there is the Flink CDC project. Flink CDC to a large part is based on the Debezium engine. That way you can ingest change events out of those Debezium connectors without even having to go through Kafka or any other streaming platform. They would be ingested straight into Flink. There are all kinds of other integrators of the Debezium embedded engine. You could use it in your own application. Let's say you're working on a Spring Boot application, or you're working on a Quarkus application, you could use the embedded engine, and essentially, there would just be a callback method which you register, and then whenever a change event arrives from the database, this callback method would be invoked, and you would be free to do whatever you would like to do with this change event. Those are the different deployment options for Debezium. Again, most of the time people use it with Kafka, but there are quite a few other options, which give you much more flexibility if you need it.

Debezium CloudEvents

Then let me talk a little bit about CloudEvents, because I think that's also a lesser-known feature that deserves more attention. We have seen this change event format before. That's a bespoke format. Maybe you are working in a very wide ecosystem of companies, or maybe just with distributed teams in your organization. For them, it would make sense to align on one envelope for all your message or event payloads. Let's say CDC, Change Data Capture, is just one kind of event source which we have in this architecture. Maybe you also ingest IoT data, maybe you have other sources of real-time data flowing through this streaming platform. In that scenario, it makes sense to align on a standardized event envelope. This is essentially what CloudEvents is providing you. That's a standard developed by the CNCF, the Cloud Native Computing Foundation, which defines a specification for describing event data in a common way. That's verbatim from their spec. Essentially, what they aim for is to give you this unified event format, which you see here, which has well-defined specified attributes like, what's the source? In this case, it's the Debezium Postgres connector. What's the type? That's a Postgres change event. Time. What's the content type? JSON, and so on. Having those well-defined attributes allows you to implement your consumers in a homogeneous way, working with all kinds of different event sources. It makes it very easy for them to reason about the type of the event, filter out events, and so on.

Of course, the actual payload which we see here in this data part, that's the core part of the Debezium change event, with the before and after part. In this case, the before part is null because it's an insert event. So that's the actual payload of the event. Then you have all those attributes which are prefixed with iodebezium. Those are the metadata attributes with the timestamp information, the connector version, and so on, allowing your consumers to handle those events. Essentially, all this just helps you to implement your consumers in a consistent way and to have portability across different platforms. CloudEvents support is implemented as a message converter in Debezium. It's very easy for you to use if you are in this environment.
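As a rough illustration of how a consumer might work with such an envelope, the sketch below separates the standard CloudEvents attributes, the `iodebezium`-prefixed extension attributes, and the `data` payload. The attribute names `id`, `source`, `specversion`, `type`, `time`, and `datacontenttype` come from the CloudEvents specification; the concrete values, including the `type` string, are assumptions for the example and may differ between Debezium versions.

```python
# Illustrative CloudEvents envelope for an insert; all values are made up.
cloud_event = {
    "id": "name:dbserver1;lsn:29274832;txId:565",
    "source": "/debezium/postgresql/dbserver1",
    "specversion": "1.0",
    "type": "io.debezium.postgresql.datachangeevent",  # assumed value
    "time": "2020-01-13T13:55:39.738Z",
    "datacontenttype": "application/json",
    "iodebeziumop": "c",
    "iodebeziumconnector": "postgresql",
    "data": {"before": None, "after": {"id": 1, "name": "Bob"}},
}


def split_envelope(event):
    """Return (event type, Debezium extension attributes, payload)."""
    extensions = {
        name: value for name, value in event.items()
        if name.startswith("iodebezium")
    }
    return event["type"], extensions, event["data"]


event_type, extensions, payload = split_envelope(cloud_event)
print(event_type, extensions["iodebeziumop"], payload["after"])
```

Because the envelope attributes are the same for every event source, a consumer can filter or dispatch on `type` and `source` before it ever looks at the source-specific payload.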

Correlating Events from the Same Transaction

Next, there's a very interesting feature I wanted to briefly mention, and this is the support for what we call transaction metadata. Transaction metadata is essentially about giving you the tools to correlate events which originate from one and the same transaction. Very often, that's a use case people have. Let's say you are working on some domain-driven design application, and there, your aggregate manifests in multiple tables in a relational database. Whenever there's a change to this aggregate structure, you would like to combine the events and only emit them to downstream consumers once all the events have arrived for this aggregate and a particular transaction. This is exactly what these transaction metadata events allow you to do. If you enable this, then all your actual change events, as we see them here on the left-hand side, will have this transaction block where we essentially tell you, this is the transaction ID. Then, also, what's the order number of this particular event within its event type, and what's its global order number within the transaction, so they are ordered. Then on another topic, you also get those messages which tell you a transaction has started, or a transaction has been completed.

In the completed case, we also tell you how many events there are per table type, or essentially, per topic, if you were to break it down to Kafka terms. This allows you to implement some buffering logic, where you can essentially await all the events originating from the same transaction. Only once you're sure you have consumed all those events, then you would go and emit them to your downstream consumers. One of the users of this is actually Stripe. They did a very interesting presentation at a conference last year, Flink Forward, where they used this transaction metadata support for exactly doing this, buffering the events from one transaction and emitting them once they have received all the events from one transaction. I very much recommend to check out this talk. I think it's a super interesting capability which Debezium provides, and which isn't as widely used yet.
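The buffering logic described above could be sketched roughly as follows. This is a simplified, in-memory illustration rather than how Stripe or Debezium implement it: the `TransactionBuffer` class is hypothetical, while the `transaction.id`, `transaction.total_order`, `status`, and `event_count` fields follow Debezium's transaction metadata format. Since change events and transaction markers arrive on different topics, the sketch does not assume any particular arrival order.

```python
from collections import defaultdict


class TransactionBuffer:
    """Holds change events back until their transaction is complete."""

    def __init__(self):
        self._events = defaultdict(list)  # transaction id -> buffered change events
        self._expected = {}               # transaction id -> total count from END marker

    def on_change_event(self, event):
        tx_id = event["transaction"]["id"]
        self._events[tx_id].append(event)
        return self._flush_if_complete(tx_id)

    def on_transaction_event(self, marker):
        if marker["status"] != "END":
            return []  # BEGIN markers carry no event count
        self._expected[marker["id"]] = marker["event_count"]
        return self._flush_if_complete(marker["id"])

    def _flush_if_complete(self, tx_id):
        expected = self._expected.get(tx_id)
        if expected is None or len(self._events[tx_id]) < expected:
            return []  # still waiting for the END marker or for more events
        del self._expected[tx_id]
        events = self._events.pop(tx_id)
        return sorted(events, key=lambda e: e["transaction"]["total_order"])


def change(tx_id, total_order, table):
    """Build a minimal change event for the demo."""
    return {"transaction": {"id": tx_id, "total_order": total_order},
            "source": {"table": table}}


buffer = TransactionBuffer()
first = buffer.on_change_event(change("571", 2, "order_line"))
second = buffer.on_transaction_event({"status": "END", "id": "571", "event_count": 2})
third = buffer.on_change_event(change("571", 1, "purchase_order"))
print([e["source"]["table"] for e in third])
```

A production implementation would keep this state in a fault-tolerant store and could also use the per-table counts from the END marker, which this sketch ignores.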

The Outbox Pattern

That being said about Debezium itself, Change Data Capture itself, let's talk a little bit more in depth about some of the use cases, in particular in the context of microservices. The first one I would like to talk about is the outbox pattern. When would you want to use it, or what's the challenge you often face? Typically, if you are in a microservices environment, very often it happens that you receive, let's say, a web request. Let's say a request comes in to place a purchase order, so you need to update your own database. At the same time, you would like to send out a message to external services, maybe a shipment service, which should be notified so that this shipment can be prepared. Typically, you would use something like Kafka for that. Now the problem is updating your own local database and sending a message to Kafka. Those two things cannot happen as part of one shared transaction, because there is just no means of distributed transactions which would also involve Kafka. Essentially, that's what we call dual writes, trying to update those two resources, a service's database and Kafka, without shared transactional semantics. It's prone to inconsistencies because one of the operations might fail, the other might succeed, and then you end up in an overall inconsistent state. That's definitely something which you want to avoid.

The idea for the outbox pattern essentially is, well, if we can't update multiple resources, we can always update a single resource. The idea is, our application only writes to its database. In this case, it would update, let's say, its order and order line tables. Then at the same time, it would insert a message into what's called the outbox table. Only this outbox table will then be captured. Essentially, it's a message which you send there. This is an externally facing contract. Now you don't have this issue with those dual writes, because writing to your business tables and the outbox table happens as part of one local transaction. Either both things happen, or the entire transaction would be rolled back and nothing gets committed. That's the idea.

In terms of what this outbox table looks like, again coming back to the ideas of domain-driven design, you could have columns there like an aggregate type, which defines the type of my aggregate: is this about a purchase order or a customer? You could then, for instance, use this information for routing all the events for the same aggregate type to one topic within Kafka. Debezium provides you with support for that. You would have something there like an aggregate ID, which defines, in terms of Kafka, the message key, making sure that all the events which pertain to one aggregate end up in the same partition. You could have something like an event type to further discriminate different types of events, maybe something like order created, order line changed, order deleted, and so on. For instance, consumers could have different event handlers kick in. Then there's the actual payload itself, which is what you want to send to your consumer. This could be a JSON structure, it could be Avro, really whatever you want to have there.
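The write side of the pattern can be sketched in a few lines. The example below uses Python's built-in SQLite module purely so that it runs on its own; in the scenario from the talk this would be Postgres or MySQL, with Debezium capturing the outbox table. The column names are modeled on the columns just described, and the `place_order` function and the `purchase_order` table are hypothetical.

```python
import json
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchase_order (id INTEGER PRIMARY KEY, customer TEXT NOT NULL)")
conn.execute("""
    CREATE TABLE outbox (
        id TEXT PRIMARY KEY,
        aggregatetype TEXT NOT NULL,  -- e.g. used for routing to a topic
        aggregateid TEXT NOT NULL,    -- e.g. used as the message key
        type TEXT NOT NULL,           -- event type, such as OrderCreated
        payload TEXT NOT NULL         -- the externally facing message
    )
""")


def place_order(conn, order_id, customer, event_type="OrderCreated"):
    """Write the business row and the outbox message in one local transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute(
            "INSERT INTO purchase_order (id, customer) VALUES (?, ?)",
            (order_id, customer),
        )
        conn.execute(
            "INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload) "
            "VALUES (?, ?, ?, ?, ?)",
            (str(uuid.uuid4()), "PurchaseOrder", str(order_id), event_type,
             json.dumps({"orderId": order_id, "customer": customer})),
        )


place_order(conn, 1, "Alice")
try:
    # The outbox insert violates NOT NULL, so the order insert is rolled back too.
    place_order(conn, 2, "Bob", event_type=None)
except sqlite3.IntegrityError:
    pass

print(conn.execute("SELECT COUNT(*) FROM purchase_order").fetchone()[0])
print(conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0])
```

The second call shows the point of the pattern: when the outbox write fails, no order row is left behind, so the database and the outgoing messages cannot drift apart.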

What's important to keep in mind is that it's an external, user-facing contract. That's a contract between your source application and your downstream consumers. You should evolve it with a very strong consideration of backwards compatibility in mind, so as not to break those consumers. Having this bespoke contract also, of course, abstracts from your internal models. One concern some users have with CDC sometimes is that it exposes your internal table schema, which would be true if you were to capture the order and the order line tables themselves. As we only capture this outbox table here, we don't have this concern any longer. Coming back to Debezium, this is all integrated with distributed tracing. Right now, it's still OpenTracing, but there's work happening to move it all to OpenTelemetry, which then allows you, for instance, to use tools like Jaeger to examine and analyze this distributed messaging flow. You would have deep insight, you would learn where there's slowness. You could, for instance, do performance analysis, all that stuff, again, deeply integrated with Debezium.

There's one interesting variation of this pattern, which I would like to mention for Postgres. Most of the times this outbox pattern is implemented using a bespoke table. Actually, in the case of Postgres, we don't even have to use a table, we also could use what's called logical decoding messages. Logical decoding messages essentially, are just messages which are just written to the WAL. The write ahead log in Postgres, the transaction log there. Now this means we don't even have to write into a table. For instance, we also don't need to think about deleting an outbox message from this table once it has been sent. Also, we would be sure that a message will never be updated, because there should be an append only outbox of messages, really. All this cannot happen if we use this pg_logical_emit_message function. The way it works is, you essentially just say, should it be transactional? Meaning, does it participate in transactions? Yes or no. This is what we want to do. There's this prefix notion. In this case, we say this is an outbox message, which just is used to group different logical decoding messages. Then again, we have our payload, which pretty much would be the same as we have seen for the outbox table structure on the slide before, but in this case, it will never materialize in a table, it will only be written to the WAL. Still, Debezium can be used to extract those change events and propagate them to external consumers. I feel that's a very interesting variation, very smart way of implementing the outbox pattern.
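As a small sketch of what the application side might look like, the helper below builds the statement for emitting such a message. `pg_logical_emit_message(transactional, prefix, content)` is the Postgres function mentioned above; the `outbox_message` helper, the JSON field names, and the `%s` placeholder style (as used by psycopg-like drivers) are assumptions for illustration. The sketch only constructs the statement and does not connect to a database.

```python
import json

# transactional = true, so the message takes part in the surrounding transaction;
# 'outbox' is the prefix used to group this kind of logical decoding message.
EMIT_SQL = "SELECT pg_logical_emit_message(true, 'outbox', %s)"


def outbox_message(aggregate_type, aggregate_id, event_type, payload):
    """Return (sql, parameters) for emitting one outbox message to the WAL."""
    content = json.dumps({
        "aggregateType": aggregate_type,
        "aggregateId": aggregate_id,
        "type": event_type,
        "payload": payload,
    })
    return EMIT_SQL, (content,)


sql, params = outbox_message("PurchaseOrder", "1", "OrderCreated", {"customer": "Alice"})
print(sql)
print(params[0])
```

With a real driver, the statement would be executed on the same connection and inside the same transaction as the business table updates.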

CDC-Based Strangler Fig Pattern

Then let's talk a little bit more about the scenario where you would like to move to microservices. Maybe you are still in the world of monolithic application architecture, and now you would like to move to microservices. The challenge there is, this should happen in a gradual fashion. You don't want to do a big-bang migration, just because it's too risky. What you want to do is you want to take parts of the monolith step by step and extract them gradually into new bespoke microservices. Let's see how Change Data Capture can help us a little bit with that. Let's assume for the sake of the example, we have this monolithic application. Maybe it's still built using an old Spring version, an old Java version. We would like to extract those components there into microservices. I have components there because you oftentimes will think about monoliths as like one big ball of mud. Still, what I've realized or what I've often observed as well, is actually they have a structure. There's different components. They're actually quite well organized. It's just part of one large deployment monolith. Let's take those components and move them over into microservices.

What's this strangler fig pattern? The name was coined by Martin Fowler, who saw the strangler fig plant, which wraps an old tree and strangles it until it dies off. Apparently, this reminded him of this pattern. What we would do there is put a proxy component in front of the monolith, so it could be something like NGINX, or Envoy, or whatever. First of all, all the requests to this application would go to this proxy. We would then set up a CDC process. In this case, it's a MySQL database, so we use the Debezium MySQL connector to put the data into Kafka. Maybe we also want to change the database of our new service, and maybe this should use MongoDB. We would use the MongoDB sink connector to put the data into this MongoDB database. Then we could have a new service. Let's say we have this owners component. That's from a pet store scenario. We take this owners component and move it into an owners service, and maybe we also change the tech, so maybe that's built using Quarkus and Java 11, maybe Java 17. That's a new thing now. First of all, we would say all the read requests would be served by this new microservice. Maybe there are just specific views in this application, and we would like to serve them from this new microservice. We would have to set up the proxy component so that it routes all the reads which pertain to owners to the new microservice. Everything else, including the writes for the owners, will still go to the old monolith. Whenever changes are happening, they are propagated using CDC to the new service's database so that we can serve those requests.

Then, we can continue. We could say, at some point, everything which pertains to pet owners is now managed by this new microservice. The writes are also managed by the new microservice, and all the other parts of our domain are still handled by the monolith. This essentially would allow us already to take one component and extract it. What could happen is that we still have dependencies in the monolith which pertain to pet owners, so we need to have pet owners data still in the monolithic database. We can use CDC again, to take this data from the microservice and propagate it back into the monolith. We could do it both ways. What we just need to realize is we should always have one side which is in charge of a particular part of our domain. The write side for owners should be managed either by this microservice or by the monolith. We shouldn't have writes to the same part of the domain in both components, in both services, because then we would essentially end up circulating the same changes back and forth, potentially forever. That's not something which we want to do. There's a demo repo, https://bit.ly//ff21-sfp, which you can check out, which shows you all this in action, and you can follow along if you want to do it.
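The routing rule the proxy has to implement in those two phases is simple enough to sketch. The function below is a plain Python stand-in for what would really be NGINX or Envoy configuration; the `/owners` path prefix, the upstream names, and the `owners_writes_migrated` flag are assumptions for the example.

```python
MONOLITH = "monolith"
OWNERS_SERVICE = "owners-service"

READ_METHODS = ("GET", "HEAD")


def route(method, path, owners_writes_migrated=False):
    """Pick the upstream for a request during the migration.

    Phase 1: only owner reads go to the new service.
    Phase 2 (owners_writes_migrated=True): owner writes go there as well.
    """
    is_owners = path == "/owners" or path.startswith("/owners/")
    if not is_owners:
        return MONOLITH
    if method in READ_METHODS or owners_writes_migrated:
        return OWNERS_SERVICE
    return MONOLITH


print(route("GET", "/owners/42"))                                # phase 1 read
print(route("POST", "/owners"))                                  # phase 1 write
print(route("POST", "/owners", owners_writes_migrated=True))     # phase 2 write
```

Flipping the flag corresponds to the moment the microservice becomes the single writer for owners, at which point the direction of the CDC pipeline for that data reverses.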

Benefits

What we gain by this is that we can do the migration with a gradual approach. We can take component by component. We would always update this routing logic in front of it. Essentially, we would be able to do this migration step by step. We could even pause it. Maybe we could say we want to continue later on. Those two things could coexist for quite a while. Or maybe we could even say that's a desired state. Maybe we are happy just having extracted the owners component into its own service, and maybe another one. Then we want to leave the rest in the monolith with this proxy in front. That's something which we can do. All this is really done to minimize the risk and to be sure our business can continue to function.

CDC Pipeline Considerations

There are some questions which you might ask yourself at this point, in particular, aren't we exposing the internals of our old monolithic database? That comes back a little bit to the outbox pattern. It's a valid concern. Also, maybe we just don't want to have this one-to-one replication of our tables. Maybe if we are in MongoDB, we want to do something like putting multiple data structures into a single document and nesting them, which we could do in this document store. Let's see how we could go about those things. One means of doing that is what's called Kafka Connect single message transformations, or SMTs. Essentially, they are there for modifying, routing, and changing single messages, single change events in our case. We could use such an SMT, for instance, to adjust the structure of our data. Maybe we have old legacy column names which we want to rename, or maybe some odd data formats, some odd data representations for Booleans, and so on. We could use such an SMT to shield our newly extracted microservices from all those oddities in the monolithic database and provide a clean data contract. The same applies if you change schemas while all this is running. Maybe we rename a column in the monolithic database. We could use such an SMT to shield the consumers from such a schema change, because we could, for instance, just add back the field using the old name, so the field is in the change events twice, under the old and the new name. There are quite a few things, also in regards to schema changes, which we can do with those message transformations.
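The column rename case can be illustrated with a small stateless function. Note that real SMTs are Java classes plugged into Kafka Connect; the Python below only mimics the per-message logic, and the `add_legacy_aliases` function and the column names are made up for the example.

```python
def add_legacy_aliases(event, renames):
    """Add each renamed column back under its old name.

    `renames` maps new column name -> old column name. The function looks at
    one event at a time and keeps no state, as a single message transformation does.
    """
    transformed = dict(event)
    for part in ("before", "after"):
        row = event.get(part)
        if row is None:
            continue  # e.g. no 'before' state for inserts
        aliased = dict(row)
        for new_name, old_name in renames.items():
            if new_name in aliased and old_name not in aliased:
                aliased[old_name] = aliased[new_name]
        transformed[part] = aliased
    return transformed


# The monolith renamed 'tel' to 'phone_number'; consumers may still read 'tel'.
event = {"op": "c", "before": None,
         "after": {"id": 1, "phone_number": "555-0100"}}
print(add_legacy_aliases(event, {"phone_number": "tel"}))
```

Consumers that already know the new name keep working as well, since both fields are present.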

Then, they only go so far, because they only work on a single message. Coming back to this nesting example, this is not enough. Maybe we actually want to take multiple table streams and merge them into a single document. This is where we need something more powerful, something more capable: stateful stream processing, for instance, in the form of Kafka Streams, or Apache Flink, which seems to be establishing itself as the dominant solution in the stream processing space. We could use Apache Flink and, for instance, implement this joining logic. Let's say we have owners and pets, and they are stored within two tables in the monolith. This means we would have two CDC topics, one topic with owners, one topic with pets. What we actually would like to have is a nested structure, shown here in JSON for the sake of the example, which contains the data of one owner and the data of all their pets aggregated into a single document. This should be written to an owners-with-pets topic, which we then could take and ingest into MongoDB.

Flink provides us with different means for doing that. One way, for instance, could be Flink SQL, as we see it here, where essentially we just use a few custom functions, which I'm not showing here, to take the data from those two topics and join them based on the owner ID. Then we essentially group and aggregate the structured data per owner, so we have all the data for one owner in a single data structure. This we then insert into another Kafka topic. Or let's say, in this case, first of all, into a Flink SQL table, which then could be written to a Kafka topic, or which could even be written directly to MongoDB using a Flink connector for MongoDB. The bottom line of all this is that just by means of having this declarative logic expressed in SQL, we are capable of expressing this rather complex joining, grouping, and aggregating logic. There's definitely way more which could be said around Flink, but I hope it gives you a glimpse of what could be possible.
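To show what such a stateful join has to do under the hood, here is a deliberately simplified, single-process sketch in plain Python. It is not Flink SQL and not the query from the slides; the `OwnersWithPets` class, the `owner_id` foreign key, and the column names are assumptions. It keeps the latest state of both tables and re-emits the nested document for every owner affected by a change event.

```python
class OwnersWithPets:
    """Joins owner and pet change events into one nested document per owner."""

    def __init__(self):
        self.owners = {}  # owner id -> latest owner row
        self.pets = {}    # pet id -> latest pet row

    def apply(self, table, event):
        """Apply one change event and return the updated owner documents."""
        state = self.owners if table == "owners" else self.pets
        affected = set()
        before, after = event.get("before"), event.get("after")
        if before is not None:
            state.pop(before["id"], None)
            affected.add(before["id"] if table == "owners" else before["owner_id"])
        if after is not None:
            state[after["id"]] = after
            affected.add(after["id"] if table == "owners" else after["owner_id"])
        # Only owners we already know about can be emitted as documents.
        return [self.document(owner_id) for owner_id in sorted(affected)
                if owner_id in self.owners]

    def document(self, owner_id):
        pets = sorted(
            (pet for pet in self.pets.values() if pet["owner_id"] == owner_id),
            key=lambda pet: pet["id"],
        )
        return {**self.owners[owner_id],
                "pets": [{"id": pet["id"], "name": pet["name"]} for pet in pets]}


join = OwnersWithPets()
join.apply("pets", {"before": None, "after": {"id": 10, "owner_id": 1, "name": "Rex"}})
docs = join.apply("owners", {"before": None, "after": {"id": 1, "name": "Jean"}})
print(docs)
```

The state kept in `owners` and `pets` is exactly what makes this stateful stream processing, and it is the part that a single message transformation cannot provide.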

Challenges - Capturing Intent

Having spoken about CDC, and two use cases, the outbox pattern, and the strangler fig pattern. Let's talk a little bit about some of the challenges which you might encounter when using CDC solutions like Debezium, because it's a powerful concept, a powerful tool. Where there's great power, oftentimes there's great responsibility, but also, sometimes you need to be careful with how you're employing those tools. The first thing would be, how do we actually go about capturing intent? This means, if we apply Change Data Capture to the tables of our business application, what we typically are missing is metadata, which tells us what's the business user who did a specific change, or maybe what's the client device configuration, what's the client IP address, something like a use case identifier. All this metadata, which you typically don't store within the tables of your application itself, which means Debezium or any other CDC solution for that matter, would not be able to extract that metadata. Still, you would like to have this metadata as part of your change events. Again, stateful stream processing, for instance, with Apache Flink, provides us with an interesting way for capturing that metadata and enriching our change events with metadata. This is how it works. This again, uses this nice function in Postgres, a pg_logical_emit_message. We could also, of course, implement another form of this, and we could just insert that metadata into a bespoke table. In Postgres, logical decoding messages are a very interesting way of doing this. Here, we, again, would use this function. At the beginning of each transaction, we would write a logical decoding message with the metadata we are after, into our transaction log. Here, we would have something like client date, use case identifier, user name, but really, this is up to you what you would like to have. This is the first message which gets written to the transaction log for each transaction.

Then the idea is, well, we would like to use stateful stream processing to take this event and put it into all the subsequent change events which are originating from the same transaction. To give you an example, here, let's say we have this first transaction, there's a begin event, then this first message which captures the metadata. We have that stored. Then we have two actual data changes, two inserts in this case, and then this transaction gets committed. We would use Debezium. We would use Flink CDC, which takes Debezium and integrates it into the Flink universe. We would then propagate this metadata into each of the change events. Then we wouldn't have an event anymore for just this metadata. This would be part of the actual change events themselves.

Just to give you an idea how this could look like. Before that, I showed you how you can implement this stream processing logic in a declarative way using Flink SQL. There's other means for implementing stream processing in Flink, for instance, in this case, the DataStream API, which allows you to use Java, or Scala, or Python, or other kinds of languages for implementing more advanced logic, which you maybe cannot express in SQL by itself. In this case, it would give you this API where we have operators for mapping your data, filtering your data. Also, for instance, this flatMap operator. This means essentially, for each event which is coming in to our stream processor, we would emit either no event at all, or one or even more events. This is exactly what we want to do. Because if we receive such a logical decoding message, for instance, we don't want to emit this right away, we just want to capture this audit metadata which was inserted. We want to put this into a state store. This is what's happening here on the left-hand side. If the operation type equals m, so that's the logical decoding message, then we would take the metadata and put it into an internal state store, so we remember it.

Now, on the right-hand side, what you see there is, if an actual change event comes in, then we would go to our state store, and we would check, is this the same transaction still? Because there might also be events which didn't receive any metadata in their transaction, so we would just emit them unenriched. In case the metadata which we have in this state store is from the current transaction, then we will retrieve it from the state store and put it into our change events, and then send them out to whatever is on the sink side of the stream. Typically, it would be a Kafka topic, so that our consumers could then consume those enriched change events from the Kafka topic.
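The flatMap logic described above can be sketched in plain Python, without Flink, to show the control flow. This is an illustration under assumptions, not the code from the slide: the event shape (keys op, tx_id, metadata, row), the function name enrich, and the use of a dict in place of Flink's state store are all hypothetical.

```python
def enrich(events):
    """Yield change events, enriched with metadata from the same transaction.

    Each event is a dict with hypothetical keys: 'op' ('m' for a logical
    decoding message, anything else for a data change), 'tx_id', and either
    'metadata' (messages) or 'row' (data changes).
    """
    state = {}  # stands in for the stream processor's state store
    for event in events:
        if event["op"] == "m":
            # Remember the metadata; emit nothing for the message itself.
            state["tx_id"] = event["tx_id"]
            state["metadata"] = event["metadata"]
        elif state.get("tx_id") == event["tx_id"]:
            # Same transaction as the stored metadata: attach it.
            yield {**event, "audit": state["metadata"]}
        else:
            # This transaction wrote no metadata: emit the event unenriched.
            yield event


stream = [
    {"op": "m", "tx_id": 1, "metadata": {"user": "bob"}},
    {"op": "c", "tx_id": 1, "row": {"id": 10}},
    {"op": "c", "tx_id": 1, "row": {"id": 11}},
    {"op": "u", "tx_id": 2, "row": {"id": 10}},
]
enriched = list(enrich(stream))
```

Four events go in and three come out: the message event is swallowed, the two inserts of transaction 1 carry the audit metadata, and the update from transaction 2 passes through unchanged.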

Challenges - Snapshotting

Another challenge, this is about snapshotting. Debezium and CDC or, in particular, log-based CDC, this is about extracting change events from a transaction log. The problem there is, very often we don't have the transaction logs from a long time ago, because the database essentially uses transaction logs for its own purposes, for replication, or for transaction recovery. Typically, once it figures it doesn't need those transaction logs any longer, then it would discard them. This means if you set up such a new CDC pipeline, we wouldn't have the transaction logs from six months ago. Still, we would oftentimes like to start with one complete snapshot of our data. We want to do a snapshot or a backfill of our data. In the simplest case, this is how it could look like. This is how it essentially works in Postgres. There, we create what's called the replication slot. That's our handle for getting changes out of the transaction log. We say, we want to export a snapshot right at this offset in the transaction log. This exported snapshot allows us, essentially, to run a separate read transaction right at this snapshot offset. Within this transaction, we would scan all the tables which there are, and essentially, for each row, emit something which looks like an insert event. Once the snapshot is done, we would commit this read transaction. We would have an entire version of the data in the change event stream. Then we would continue to read from this replication slot right at this offset where the snapshot was taken. That's a very effective means of implementing such an initial backfill logic.

There's a few problems which are coming with that, and which we also got reported in the Debezium project again. For instance, it meant you couldn't update the filter lists. In Debezium, we can say, I want to just capture those 10 out of my 100 tables. Now maybe you say, there's another table which I also want to capture, and there was no good way for doing that. Also, such a snapshot can run for quite a long time. It can take hours to complete such a snapshot if you have millions of table rows, and this was one atomic operation, you couldn't pause it. This meant, if for whatever reason you had to restart this connector, or maybe there was a failure, then it had to redo the entire snapshot, although maybe it already was done by 90%. Also, it meant you couldn't begin to read changes from the transaction log while this snapshot was still running. Quite a few shortcomings in this classic snapshotting approach.

This is when we learned about this very interesting approach implemented by the project DBLog. DBLog is an internal CDC project by Netflix, by those two engineers, Andreas Andreakis and Ioannis Papapanagiotou. They implemented a bespoke CDC solution at Netflix, and they came up with a very interesting snapshotting approach. Thankfully, they wrote a research paper about this watermark-based snapshotting approach, which allowed us to also implement this in Debezium. This is essentially how it looks like. The idea there is, instead of doing those two things essentially after each other, first doing the snapshot, then doing the reads from the transaction log, those two things happen simultaneously, in an interleaved way. There's some deduplication logic in place, which allows us to not emit duplicated events from the snapshot and from what we retrieve from the transaction log. That's the basic idea.

To make it a bit more specific, this is how it works. First of all, we don't have a single transaction for reading, or a single select for reading all the data from a table. Instead, we do this in chunks. Here, let's say we step through this customer table in chunks of 1000 rows each time. It's configurable: could be more, could be less. Let's say we do 1000 customers at a time. What we also do is we insert watermarks into the transaction log. Essentially, whenever one of those snapshotting transactions runs, the process is that first of all, we insert what's called the low watermark into the transaction log. Of course, then if we read from the transaction log, we will receive this marker event. We insert the low watermark, then we run this next snapshot chunk transaction or selection, so let's say the next 1000 customers. Then we insert a high watermark. This concludes this chunk of data. This is what's happening step by step. Now, those watermarks, they allow us to correlate a specific snapshot select chunk with a part which we read from the transaction log.

Let's look a little bit closer at one of those chunk processing steps. Here we have our low watermark, then a few operations as they were happening in the database: so an update, an insert, an update, delete, few more updates, and then the high watermark. At the same time, we have the result set from this snapshot select, or this particular chunk. What happens is that, essentially, all this gets buffered in memory. Once we receive this high watermark, we apply this deduplication step. The logic there is, whatever we retrieve for this chunk from the transaction log, this will take precedence over what we read from the snapshot select. In this case, for the key 1, we don't take the read event from the snapshot chunk because there was a delete event for that key. Same for key 2, there were updates for that key. Whatever is in the transaction log for this chunk, it will be taken from there. Only the keys 3, 5, and 6 were not modified during this period, so for them we will backfill that data from this snapshot select. This essentially means, on a high level, we step through the data in our database with those chunk selects. At the same time, we continue to retrieve data from the transaction log, and we give precedence to what we retrieve from the transaction log. Very importantly, this means that there's a few differences to the behavior and to the semantics you would get from a classic snapshot. In particular, you wouldn't be guaranteed to get snapshot events or read events for all the data which you're capturing. It could also be update events from the transaction log, or it could even be delete events if a record was deleted during that time. For instance, it could also happen that you receive an update event, and you didn't get an insert event before that, because that's just what we read from the transaction log.
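The deduplication step for a single chunk can be sketched as follows. This is a simplified illustration, not Debezium's or DBLog's actual code: the function name deduplicate_chunk, the tuple and dict shapes, and the choice to emit the log events first and the surviving snapshot rows afterwards are assumptions.

```python
def deduplicate_chunk(chunk_rows, log_events):
    """Merge one snapshot chunk with the log events seen between its watermarks.

    chunk_rows: dict of key -> row, the result of the chunk select.
    log_events: list of (op, key, row) tuples read from the transaction log
                between the low and the high watermark.
    """
    touched = {key for _op, key, _row in log_events}
    # Events from the transaction log take precedence and keep their order.
    emitted = [{"op": op, "key": key, "row": row} for op, key, row in log_events]
    # Only keys untouched within the window are backfilled as read events.
    for key, row in chunk_rows.items():
        if key not in touched:
            emitted.append({"op": "r", "key": key, "row": row})
    return emitted


chunk = {1: "a", 2: "b", 3: "c", 5: "e", 6: "f"}
log = [("u", 2, "b2"), ("c", 4, "d"), ("d", 1, None)]
result = deduplicate_chunk(chunk, log)
```

For this input, keys 1 and 2 come only from the log (a delete and an update), key 4 appears as an insert, and keys 3, 5, and 6 are backfilled as read events, which mirrors the example from the slide.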

What is guaranteed, and that's what you really want to have, is that once this entire logic is completed, once a snapshot is completed, you have a complete set of your data in your streaming platform, in Kafka or wherever you take the data. That's guaranteed to you. This is a very powerful tool. It addresses all those shortcomings which I touched on before. For instance, now we can update our filter list or we can re-bootstrap one particular table, because those snapshots, they can be run in an ad hoc fashion. The way it works in Debezium is you interact with the connector using what we call a signal table. You insert into the signal table a command where you say, I want to do a snapshot for this table, inventory orders. Then this will run this watermark-based snapshot just for this table. You could re-snapshot specific tables, maybe because you lost a Kafka topic. Also, it means we keep track of how far we have gotten in this snapshotting process. For those snapshot chunk selects, we essentially store the IDs at the boundaries in the connector offsets, which means if you restart the connector, or you stop the connector and pause it, then you restart it, then we will be able to continue the snapshot from that particular point in time. Also, it means, and this is the gist of it, streaming and snapshotting happen at the same time. It means you don't have to wait for a multi-hour snapshot to complete before receiving the latest changes from your database, you will start to receive changes from the database happening right now. That's a very interesting concept. I definitely would recommend you to read this research paper by the Netflix guys. I think that's a huge improvement. I'm really excited about this means of implementing backfills for CDC systems.
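A signal row of this kind could be built as in the sketch below. The columns id, type, and data, the execute-snapshot signal type, and the data-collections field follow Debezium's signaling format; the table name debezium_signal and the helper name build_snapshot_signal are assumptions, since the actual table is whatever the connector's signal.data.collection option points at.

```python
import json
import uuid


def build_snapshot_signal(tables):
    """Return (sql, params) requesting an ad hoc incremental snapshot.

    'debezium_signal' is a hypothetical table name used for illustration.
    """
    data = json.dumps({"data-collections": list(tables), "type": "incremental"})
    sql = "INSERT INTO debezium_signal (id, type, data) VALUES (%s, %s, %s)"
    # The id only needs to be unique per signal; a random UUID is one option.
    return sql, (str(uuid.uuid4()), "execute-snapshot", data)


signal_sql, signal_params = build_snapshot_signal(["inventory.orders"])
```

Because the signal is an ordinary row in a captured table, the connector sees it in the change stream like any other insert and can start the snapshot for just the listed tables.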

Challenges - WAL Growth

The last challenge I wanted to mention, it's about WAL growth, write-ahead log growth in Postgres. This is a little bit of an oddity in Postgres. The challenge there is, in Postgres, you can have essentially a physical database host, so like a machine or VM. Then there can be multiple logical databases on this host. In this case, I have database 1 and database 2 on the same Postgres host. The problem there is, the WAL, the write-ahead log, that's global. That's shared between those two logical databases. Whereas the logical replication slot, which is our handle for getting changes out of a database, this is specific to one of those logical databases. We could have a replication slot either for database 1, or for database 2. A challenge can arise if there are differences in terms of how frequently changes are done to those databases. Maybe on database 1, we do many changes, and on database 2, we don't do any changes at all. This means if we have a replication slot for database 2, then we will not be able to advance this replication slot because we will never receive changes from the database. It also means we will never be able to acknowledge, this is the latest offset we have consumed, because we never consumed any offsets. This means, as the WAL is shared, that this replication slot retains larger and larger chunks of the transaction log. Here, in this case, you have this nice query where you can essentially take the offset where the replication slot is at. You can compare this to the current position in the transaction log. You would see this retained WAL gets bigger, because we have this replication slot on this low traffic database, and it has no way for making any progress because no changes are coming in. That's a challenge. At the end of the day, your database might run out of disk space, unless you delete this replication slot. Then of course, the connector state would be messed up, definitely.
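The query on the slide is not reproduced in the transcript. In Postgres, this kind of comparison is commonly done with pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) over the pg_replication_slots view. The arithmetic behind it can be sketched in Python; the function names and the sample LSN values below are made up for illustration.

```python
def parse_lsn(lsn):
    """Convert a Postgres LSN such as '16/B374D848' into a byte position.

    An LSN is printed as two hexadecimal numbers: the high and the low
    32 bits of a 64-bit position in the write-ahead log.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def retained_wal_bytes(current_lsn, restart_lsn):
    """Bytes of WAL the server must keep for a slot stuck at restart_lsn."""
    return parse_lsn(current_lsn) - parse_lsn(restart_lsn)


# Hypothetical values: the slot is stuck while the WAL position moves on.
retained = retained_wal_bytes("0/3000000", "0/1000000")
```

If this number keeps growing for a slot on a quiet database while the busy database on the same host keeps writing, that is the situation described above.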

One solution is to artificially induce changes into this low traffic database. This is what Debezium allows you to do with this feature of heartbeat action queries. Again, it's a very nice use case for those logical decoding messages, because there we could just emit messages into the WAL for this particular database, just some heartbeat record. Then this would allow this replication slot for this database to make progress, and we wouldn't have this issue. It's a rather peculiar or specific issue which you have with Postgres, but it can happen quite easily. Definitely something to keep in mind if you have this scenario with a high traffic and a low traffic logical database on the same host.
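As a sketch of how such a heartbeat could be configured, the following Python adds the two relevant options to a connector configuration. heartbeat.interval.ms and heartbeat.action.query are Debezium connector options; the helper name with_heartbeat, the 10-second interval, the 'heartbeat' message prefix, and the base configuration are assumptions chosen for illustration.

```python
def with_heartbeat(config, interval_ms=10000):
    """Return a copy of a connector config with a WAL heartbeat enabled.

    The action query emits a non-transactional logical decoding message,
    so no extra table is needed in the low traffic database.
    """
    updated = dict(config)
    updated["heartbeat.interval.ms"] = str(interval_ms)
    updated["heartbeat.action.query"] = (
        "SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)"
    )
    return updated


# Hypothetical base configuration for the connector on the quiet database.
base = {"database.dbname": "database2"}
connector_config = with_heartbeat(base)
```

The helper returns a new dict rather than mutating its input, so the same base configuration can be reused for connectors that do not need a heartbeat.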

Takeaways

The fresher our data is, the more actionable it is, the more valuable it is. If you build dashboards, if you want to run streaming queries against your data, all this needs to happen on fresh data. Otherwise, the insights we gain from that are just not as valuable. If it comes to processing data from a database, then log-based CDC, as implemented with Debezium, is a very powerful means for giving you fresh data, for giving you up to date data. Then you could, of course, use tools like Kafka Streams or Apache Flink for processing those change event streams: combine them, filter them, map them, do aggregations, do windowed analytics, all this stuff. Really taking it to the next level.

 


 

Recorded at:

Oct 04, 2023
