Change Data Capture for Distributed Databases @Netflix


Summary

Raghuram Onti Srinivasan covers the challenges associated with capturing CDC events from Cassandra, discussing the Flink ecosystem and the use of RocksDB.

Bio

Raghuram Onti Srinivasan is a senior software engineer at Netflix, where he works on the Data Integrations Platform. He has contributed a distributed locking mechanism to Dynomite.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.

Transcript

Srinivasan: At Netflix, we have microservices using different kinds of databases based on the capabilities of each database. When a new movie is added, the information is written to a Cassandra database. That same information may then be needed by the UI, which accesses it through Elasticsearch, or for analytics, where we would use a data warehouse solution like Iceberg. We'll focus on the challenges involved in keeping data synced from Cassandra, which is a distributed NoSQL database, to other databases using Change Data Capture.

I'm Raghu. I'm a software engineer at Netflix, working on our data integrations platform.

Databases

The data sync problem, which we saw just now, is not specific to Netflix. Applications end up requiring varying kinds of access patterns for the use cases they support. Due to this, we have different databases with trade-offs resulting from their architectures. What are some of the access patterns? For example, you could use Apache Cassandra, which is known for high availability and high write throughput, because it uses memtables and commit logs while providing eventual consistency, but it doesn't provide APIs for search. For that, you could use Elasticsearch, which can support those query patterns but cannot handle analytical queries, for which we would need a data warehouse solution like Hive or Iceberg.

Solutions

What are some solutions to these problems? We could have every write go to multiple databases, or perform every write as a distributed transaction. With dual writes, one issue is reconciling state when only one of the writes succeeds. Distributed transactions need management outside of the database, and that is a hard problem to tackle. Alternatively, we could have a separate process which takes care of extracting the data from the source, optionally transforming it, and then loading it into a sink database. These processes would need verifiers to make sure that there are no bugs in the system, and that the source and sink databases always have the same information.
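To make the dual-write problem concrete, here is a minimal sketch, with hypothetical client objects and method names, of why reconciliation becomes necessary as soon as two databases are written independently.

    // Hypothetical clients, shown only to illustrate the dual-write failure mode.
    void saveMovie(Movie movie) {
        cassandra.insert(movie);             // the first write succeeds
        try {
            elasticsearch.index(movie);      // the second write can fail independently...
        } catch (Exception e) {
            // ...leaving Cassandra and Elasticsearch with different data; some
            // process outside both databases now has to detect and repair this.
        }
    }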

What Is CDC?

Where does CDC come into the picture? We can look at it as a solution to this problem, and we'll see how that is the case. For a particular database, you could be writing to it from a shell client or from an application; the changes to a database could be coming in from different systems. We're mainly concerned with inserts, updates, and deletes, which we'll term mutations.

Stream of Mutations

When we convert these changes at a database to a stream of events, we have Change Data Capture, or CDC. CDC involves capturing data which gets written to a database and sending it to downstream consumers as a stream of events. Why would we need this? Let's take a look at the previous example. Here, we have an application which has a backend service with a UI. The backend service has business logic and maintains state in a robust database like Cassandra. The UI needs a more flexible database like Elasticsearch to perform text search, with search indices based on different fields to support user requests. We can solve the data sync problem between these two databases using CDC. Here we see another example, where we need the data in both Cassandra and an analytics-oriented data warehouse solution like Iceberg or Hive. By having a Change Data Capture stream from Cassandra being written into the data warehouse solution, we simplify the problem.

Let's look at how CDC operates within Cassandra. Cassandra provides high availability guarantees by keeping three copies of data on different nodes in a cluster. Since writes to memtables and commit logs are extremely fast, the database performs well despite the replication. What are the challenges in such a system? The data is partitioned, so multiple nodes have to emit the change stream. Data is replicated, which means the same change will be emitted by multiple nodes. The system is active-active, which is to say that there is no single authoritative node to get the change stream from, unlike databases like MySQL or Postgres. When multiple replicas are emitting this change stream, there is no ordering guarantee in the resultant stream. The stream is also unfiltered, which is to say that all changes happening in the cluster are part of the stream, irrespective of which column family they belong to or which columns are being changed. Cassandra also only provides partial changes, which is to say that updates and deletes contain only the affected columns and keys, not the full row image. A change stream with these properties forces the downstream systems that consume it to deduplicate the events, recreate ordering somehow, and filter them according to their requirements. This is an issue with the open source Debezium Cassandra connector as well. We'll see how we can address these problems.

Change Stream Properties

What are the ideal properties of a change stream? You would mainly want it to be unique, so there are no duplicate changes. Ordered, so changes are emitted in the order in which they were committed in the source database. Full row images, so all columns, including pre- and post-images of a row, are emitted in the stream. And filtered, so that there is a change stream per database per table, and specific columns can be filtered out for security requirements where they contain personally identifiable information, which should not leave the boundaries of the database cluster.
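As a rough illustration of these properties, one event in such an output stream could carry the fields below. This is a sketch only; the field names are illustrative and not the actual Netflix schema.

    import java.util.Map;

    enum Operation { CREATE, UPDATE, DELETE }

    record RowChangeEvent(
        String keyspace,            // filtered: one stream per keyspace/table
        String table,
        String partitionKey,
        Operation op,
        Map<String, Object> before, // full pre-image of the row (null for a create)
        Map<String, Object> after,  // full post-image of the row (null for a delete)
        long commitTimestampMicros  // used to preserve commit order
    ) {}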

Possible Solutions

What are some possible solutions we can look at for this particular problem? The Cassandra clusters at Netflix have incremental backups where SSTables from the cluster are written to S3 at regular intervals. The backup system is robust, but this approach could take minutes to hours to emit changes; it's not near real-time, which is one of our requirements. We could use audit loggers or full query loggers, which would severely degrade cluster performance and require changes to Cassandra, and we would still need to recreate ordering outside of Cassandra. We could perform CDC in the coordinator only, but this has issues with availability and continuity of the change stream when the coordinator goes down and a newly elected node becomes the coordinator. CDC could be performed in an additional copy, where basically we are saying that if the replication factor is three, we keep a fourth copy, which would be in the asynchronous write path and not on the read path for the Cassandra cluster. This would make sure that the change stream has all of the properties we want, but the complexity of this implementation would require major changes to Cassandra.

Solution - System Design

Our solution instead uses an intermediate system which holds information about the data so that we can solve these challenges. It improves upon the approach developed by Yelp, where they talk about streaming Cassandra into Kafka. Here, we can see what the internal architecture of our connector looks like. The mutations are read from the commit log of Cassandra and written to a Kafka topic. It's important to note that the Flink application and the Cassandra cluster need to have the same version of the Cassandra storage engine to be able to read objects correctly, since we make use of it to generate the correct full row image. We will see how we can use a KeyValueStore within Flink to store data, and make sure that the Cassandra output, which is unordered, has duplicates, has only partial updates, and is unfiltered, can be converted to one which is ordered, deduplicated, has full row images, and is filtered.
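A minimal sketch of the sidecar's write path could look like the following, assuming a serialized mutation and its partition key have already been extracted from the commit log; the topic name and serialization are illustrative, not the actual connector's.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    void publishMutation(KafkaProducer<byte[], byte[]> producer,
                         byte[] partitionKey, byte[] serializedMutation) {
        // Keying by partition key means every change for a given row lands on
        // the same Kafka partition, and therefore the same Flink consumer,
        // which is what makes per-key ordering possible downstream.
        producer.send(new ProducerRecord<>("cdc-raw-mutations", partitionKey, serializedMutation));
    }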

System Properties

What are some of the properties of the system which make it ideal for us? The output of the Cassandra cluster ends up being a Kafka topic with n partitions. We then have workers which process events on each partition. Having this intermediary Kafka topic decouples the availability of the consuming system. It also ensures that the CDC solution has minimal impact on the Cassandra cluster's performance. We have a linearizable datastore: we retrieve the most recent information stored for a particular partition key, if present, apply the incoming change, and write that back into the datastore. This helps us achieve the same ordering which Cassandra gives its clients. We can also scale the system independently from the Cassandra cluster to account for changes in the stream rate. We would also like this system to be easy to open source.
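The read-apply-write loop over the per-key store could be sketched as a Flink KeyedProcessFunction like the one below. RawMutation (the deserialized commit log mutation) and RowImage (a stored full row) are hypothetical types introduced for illustration; the keyed ValueState plays the role of the linearizable datastore, since Flink processes all events for one key serially on one task.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    class MutationReconciler extends KeyedProcessFunction<String, RawMutation, RowChangeEvent> {
        private transient ValueState<RowImage> latestRow;

        @Override
        public void open(Configuration parameters) {
            latestRow = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("latest-row", RowImage.class));
        }

        @Override
        public void processElement(RawMutation mutation, Context ctx,
                                   Collector<RowChangeEvent> out) throws Exception {
            RowImage previous = latestRow.value();   // read the most recent image for this partition key
            // ...apply the incoming change, write the merged image back, and emit
            // a full-row event; the create, update, and delete branches are
            // sketched in the following sections.
        }
    }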

Apache Flink

Apache Flink is a streaming system which acts as the data plane in our solution. The major components here include the input Kafka topic and the Flink application which reads from it. The application itself maintains state, which in our case is kept in RocksDB, an embedded key-value store local to each Flink worker. We can process incoming data from the Kafka input and emit it to an output stream. The Flink application is able to process the stream of data and recover from failures by having the application state and Kafka offsets checkpointed at regular intervals. We can also scale this Flink application independently of the Cassandra cluster. We will see how this intermediary state is leveraged to provide a CDC solution for Cassandra.
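A sketch of how the job could be wired together is shown below. It assumes Flink's Kafka connectors (rawMutationSource and rowChangeEventSink here are hypothetical, built elsewhere), and the class and method names are from recent Flink releases, so exact APIs may differ across versions.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new EmbeddedRocksDBStateBackend());   // keyed state lives in local RocksDB
    env.enableCheckpointing(60_000);                           // checkpoint state and Kafka offsets every minute

    env.fromSource(rawMutationSource, WatermarkStrategy.noWatermarks(), "cdc-raw-mutations")
       .keyBy(RawMutation::partitionKey)        // all changes for a row go to the same task
       .process(new MutationReconciler())       // the read-apply-write sketch above
       .sinkTo(rowChangeEventSink);             // fanned out to per-table topics downstream

    env.execute("cassandra-cdc-connector");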

Example - Create

Let's look at this with an example. Let us say there is a new movie being written into the database, with movie ID 842, for Fargo, and the status private. The sidecar reads this mutation and writes it to the CDC Kafka topic. It is then read by the Flink application and written into the RocksDB store with the partition key and the mutation information. As this is a new show, the mutation has all of the information for that particular row. The second Kafka topic then gets a record for that row with a Create operation. As you can see, we maintain a fourth copy of the data outside of Cassandra. Let's see why this is required.
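In the processElement sketch above, this is the branch where no prior state exists for the partition key; the names and accessors remain hypothetical.

    if (previous == null) {
        // First mutation for this partition key: the incoming mutation already
        // carries the full row, e.g. {movie_id: 842, title: "Fargo", status: "private"}.
        RowImage created = RowImage.fromColumns(mutation.columns());
        latestRow.update(created);
        out.collect(new RowChangeEvent(mutation.keyspace(), mutation.table(),
                mutation.partitionKey(), Operation.CREATE,
                null, created.columns(), mutation.timestamp()));
    }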

Update

Let's update that row to have different information in the columns: we want to change the status of Fargo from private to public. When we make that update in Cassandra, the sidecar emits the change to the CDC Kafka topic. It is important to note that the record in the Kafka topic only has the changed columns, not all of the row information. Specifically, we are missing the information that the title, Fargo, is part of the row. When this is sent to the Flink application, we have the previous mutation which came for this particular partition key, and we can reconcile it with the incoming event to make sure we generate a full row image in the datastore. This is then emitted as an update for that particular row into the Source Connector Kafka topic.
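Continuing the same sketch, the update branch merges the partial mutation with the stored image so the emitted event still carries the full row, including the title that the replica did not resend.

    else if (!mutation.isDelete()) {
        Map<String, Object> before = previous.columns();   // {movie_id: 842, title: "Fargo", status: "private"}
        Map<String, Object> after = new HashMap<>(before);
        after.putAll(mutation.columns());                  // the replica only sent {status: "public"}
        latestRow.update(RowImage.fromColumns(after));     // the stored copy keeps the full row
        out.collect(new RowChangeEvent(mutation.keyspace(), mutation.table(),
                mutation.partitionKey(), Operation.UPDATE,
                before, after, mutation.timestamp()));
    }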

Delete

Let's look at how a delete would happen. If we delete that same row, Cassandra stores a delete for that particular partition key as a tombstone, and removes the tombstone after gc_grace_seconds. The sidecar serializes this mutation, we store that particular delete within the datastore here, and we emit the delete information to the Source Connector Kafka topic. At the same time, we store a timer within Flink state to make sure that we remove this partition key from the map if no future updates arrive for it with a higher timestamp. This way, we can make sure that the state grows and shrinks correctly, according to whatever is happening in Cassandra.
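A sketch of the delete branch and its cleanup timer, still within the same KeyedProcessFunction; the 24-hour retention value is illustrative, not Netflix's actual setting.

    else {
        // Emit the delete, keep a tombstone-like marker, and set a timer so the
        // key is dropped from Flink state if nothing newer arrives for it.
        out.collect(new RowChangeEvent(mutation.keyspace(), mutation.table(),
                mutation.partitionKey(), Operation.DELETE,
                previous == null ? null : previous.columns(), null, mutation.timestamp()));
        latestRow.update(RowImage.deleted(mutation.timestamp()));
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 24L * 60 * 60 * 1000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<RowChangeEvent> out) throws Exception {
        RowImage current = latestRow.value();
        if (current != null && current.isDeleted()) {
            latestRow.clear();   // no newer mutation arrived for this partition key, so drop it
        }
    }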

Unique and Ordered

How do we guarantee uniqueness and ordering in our output stream? Unique mutations in the Source Connector Kafka stream can be achieved by checking whether we have already processed this particular information from a previous replica. When a second replica emits the same information, we do not send the row update to the Source Connector Kafka topic. Ordering is guaranteed by making the CDC Kafka topic a keyed topic, where the key is the partition key of the change. This way, all changes for a particular partition key arrive at the same consumer in Flink, and because we have the same storage engine, we can recreate the order of these changes correctly. There is a cost to be paid for the ordering: we need to deserialize the mutation to know which partition key to use, and serialize it back before writing it to the CDC Kafka topic.
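One way the duplicate check could look in the sketch above: if the stored image already reflects this mutation, no event is re-emitted. The lastWriteTimestamp accessor is hypothetical.

    private boolean isDuplicate(RowImage previous, RawMutation mutation) {
        return previous != null
                && mutation.timestamp() <= previous.lastWriteTimestamp()
                && previous.columns().entrySet().containsAll(mutation.columns().entrySet());
    }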

Schema Changes

What happens when the schema for a column family changes? The Cassandra storage engine in Flink needs to have the right schema information in order to deserialize the mutation correctly. For this, we listen for schema changes by registering with the Cassandra cluster, to make sure we have up-to-date information.
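A sketch of how such a listener could be registered, assuming the DataStax Java driver 3.x; schemaRegistry is a hypothetical local cache that the deserializer would consult.

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.SchemaChangeListenerBase;
    import com.datastax.driver.core.TableMetadata;

    Cluster cluster = Cluster.builder().addContactPoint("cassandra-host").build();
    cluster.register(new SchemaChangeListenerBase() {
        @Override
        public void onTableChanged(TableMetadata current, TableMetadata previous) {
            // Refresh the schema used to deserialize mutations for this table.
            schemaRegistry.update(current.getKeyspace().getName(), current.getName(), current);
        }
    });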

Filtering

How do we filter the output Change Data Capture stream? This is required because the original CDC stream from Cassandra has changes from all of the column families, or tables, in that particular cluster. We would normally want these as separate streams, and with specific fields only. We can perform this filtering in the Flink application and generate messages to multiple Kafka topics, one per column family.
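A sketch of the filtering and fan-out logic, reusing the hypothetical event type from earlier; the allow-lists and topic naming scheme are illustrative (java.util collections assumed).

    String topicFor(RowChangeEvent event) {
        return "cdc." + event.keyspace() + "." + event.table();   // one output topic per column family
    }

    boolean shouldEmit(RowChangeEvent event, Set<String> subscribedTables) {
        return subscribedTables.contains(event.table());          // only emit tables someone subscribed to
    }

    RowChangeEvent stripSensitiveColumns(RowChangeEvent event, Set<String> allowedColumns) {
        // Drop PII columns so they never leave the cluster boundary; a fuller
        // version would strip the pre-image the same way.
        Map<String, Object> after = event.after() == null ? null : new HashMap<>(event.after());
        if (after != null) {
            after.keySet().retainAll(allowedColumns);
        }
        return new RowChangeEvent(event.keyspace(), event.table(), event.partitionKey(),
                event.op(), event.before(), after, event.commitTimestampMicros());
    }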

Challenges

What are some challenges with this solution? We obviously need to keep an additional copy of the data, so if your replication factor is already three for Cassandra resiliency, you now have four copies. This added cost is in both infrastructure and maintenance. What happens when the write throughput on Cassandra is higher than what Kafka can safely handle? We could drop the stream to keep Cassandra available, but this would mean we need to backfill from SSTables at scale. We would also need to make sure that the Flink system and the Cassandra cluster which is emitting changes use the same version of the storage engine, which is also an operational overhead.

Next Steps

Here, we have seen how we can get an eventually consistent change stream from an active-active database like Cassandra. We have a stream which is unique, ordered, has full row images, and is filtered. Our solution could be extended to support other databases which have architectures similar to Cassandra, where CDC is not supported out of the box. There are still a lot of questions left, with some solutions on the horizon. For example, how can we make sure we have a schema attached to the output stream and make schema evolution possible? How do we scale this to handle data at the scale at which Netflix systems operate? How does this operate in prod? What are some operational challenges for managing this infrastructure at scale? Stay tuned for answers to all these questions and more in a future blog post.

 


 

Recorded at:

May 09, 2021
