Transcript
I'm Joy [Gao]. I'm a senior software engineer at WePay. If you haven't heard about WePay, we provide payment solutions for platform businesses through our API. For this talk, I'm going to be talking about database streaming. We live in a world where we expect everything to be streamed, like our music is streamed. Our TV shows are streamed. I want to argue that the data in your data warehouses should not be considered as second class citizen. We should allow everything to be streamed in real-time so we can access these data as soon as they arrive into the database.
This talk is about our journey at WePay going from an ETL data pipeline into a streaming based real-time pipeline. The talk is going to be broken down into three sections. We're first going to go over what our current ETL or our previous ETL process looked like and what are some of the pain points that we're going through. We're also going to introduce change data capture, which is the mechanism that we use to stream data from our database. Next, we're going to take a look at a real-world example which is how we're actually streaming data from MySQL into our data warehouse. And finally, we are going to go a little experimental and take a look at some of the ongoing work we're doing with streaming Cassandra into BigQuery which is our data warehouse as I mentioned.
The Beauty of Change Data Capture
Let's get started. At WePay, we use BigQuery. For those of you who are in AWS land, this is the equivalent of Redshift. It's basically Google's cloud data warehouse. It uses ANSI-compliant SQL as its core language, which makes it really easy for developers and engineers to pick up. It supports nested and repeated data structures for things like lists or structs, and even geospatial data types, which is actually something very useful for CDC as you will see later on. And it has a virtual view feature, which lets you create views on top of the base tables. And because these views are not materialized, when you're querying the view, you're essentially querying the underlying table. And this will allow you to access real-time data even through views. And that's another feature that we're leveraging very heavily at WePay for our streaming pipeline, which we'll also go into later on.
So at WePay, we use a microservice architecture. Most of our microservices are stateful and the state is typically stored in a MySQL database. And we use Airflow as a tool to orchestrate our data pipelines. For anyone who hasn't heard about Airflow, you can think of it as cron on steroids that's designed for data pipelines and complex workflows. And the way we're using Airflow is basically by periodically polling the MySQL database for changes. The way we detect these changes is by looking at the modified time column in each table, and if the modified time has changed in the most recent interval, we upload that information into BigQuery. It's pretty standard.
With this approach though, we're starting to hit a lot of limitations and operational overhead. So the first problem, which ties back to the talk from the introduction, is that it has very high latency. The data won't actually arrive into BigQuery until much later. Some of our jobs, we try to push the limit to once every 15 minutes, so the job runs in 15-minute intervals. But then we're getting to this inconsistency where an analyst that may be trying to do a join in BigQuery, and one of the tables is being uploaded on an hourly or daily basis, and another table is being uploaded every 15 minutes, and then the data becomes inconsistent, so it's like, why is it not in this other table but it's here?
The second problem is that, because of the way we use Airflow, we're creating one job for every single table. In Airflow, a job is called a DAG, or a directed acyclic graph. So we have basically hundreds of DAGs. Each of them is responsible for a table. And this is a whole lot of configuration as well as operational overhead when it comes to monitoring. So it's not quite ideal. Another problem is hard deletes. We can't allow hard deletes in our database because when you're polling the database, you're running these select queries. A select is not going to tell you which data has been deleted. It's only going to show you what's in the database.
We basically have to tell our microservice owners, "Hey, just don't delete anything in these tables," which is pretty error-prone. And that leads to the next point, that it is very error-prone. We are relying on our microservice owners to be doing the right thing. Not only must they not delete rows from these tables, they must also guarantee that they're always updating the modified timestamp every time, because otherwise we'll still get into a data inconsistency issue, since we won't be able to detect those changes.
Finally, the schema management is actually manual because if a DBA decides to go into the database and they want to say, “Add a column to a table,” Airflow doesn't know about it. So now we have to go into Airflow and we have to manage every single one of those tables, or whichever table needs to be modified, and we have to update the schema so that it propagates to BigQuery and so on. On top of all of these problems, our data ecosystem is constantly evolving. We're adding new tools that are optimized for different jobs. We may introduce a Redis that is optimized as a key-value cache. We may introduce Elasticsearch to do full-text search. We may want to add a graph database for fraud detection. Or we may want to add some live dashboards and alerting and monitoring systems that help us understand how our business is doing right now.
And Airflow being a batch-oriented tool, it's not meant for streaming. So we needed a better tool for this job. And as many of you probably already guessed, or if you already read the summary of this talk, we use Kafka. With Kafka, every single downstream derived application can now just listen to the Kafka log and apply the changes at its own pace, which is really nice, and because Kafka is designed for streaming, this solves the streaming problem.
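The polling approach and two of its blind spots can be sketched as follows. This is a minimal illustration, not WePay's Airflow code: it uses an in-memory SQLite database in place of MySQL, and the `users` table, its columns, and the `poll_changes` helper are all hypothetical.

```python
import sqlite3

def poll_changes(conn, last_seen):
    """Return rows modified since the previous poll, plus the new high-water mark."""
    rows = conn.execute(
        "SELECT id, name, modified_time FROM users "
        "WHERE modified_time > ? ORDER BY modified_time",
        (last_seen,),
    ).fetchall()
    new_mark = rows[-1][2] if rows else last_seen
    return rows, new_mark

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, modified_time INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?, ?)", [(1, "anne", 100), (2, "bob", 105)])

# First scheduled run: everything is new.
first_batch, mark = poll_changes(conn, 0)

# Between runs: one well-behaved update and one hard delete.
conn.execute("UPDATE users SET name = 'alice', modified_time = 110 WHERE id = 1")
conn.execute("DELETE FROM users WHERE id = 2")

# Second run: the update is picked up, the delete leaves no trace in the result.
second_batch, mark = poll_changes(conn, mark)
```

The second poll returns only the updated row. Nothing in the result says that row 2 is gone, and an update that forgot to bump `modified_time` would be missed in the same way.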
The next question is, how are we getting the data from these databases into Kafka? There are a couple of options. First one, we can just double-write to both systems, right? Every time we're updating the database, we make sure we're also sending a message to Kafka. Then the question is, should we do this synchronously or asynchronously? If we update asynchronously, we'll, again, get into data inconsistency issues because we don't know whether the data has been successfully written to Kafka when we're updating the database. If we do this synchronously, it means that every time we successfully send to Kafka, we commit the change, and every time we fail to send to Kafka, we abort the change. But we're talking about distributed systems here and errors are frequent. The problem is timeouts.
With a timeout, we don't quite know what happened. It could be a network glitch that caused the timeout, and the data could have been successfully written to Kafka or could have not. So we wouldn't know what to do. And to solve that properly requires a distributed transaction, which means something like two-phase commit. And two-phase commit is not trivial to implement and get right. It requires a set of interfaces and tools to actually implement it and vanilla Kafka doesn't support it. Not to mention that two-phase commit requires multiple round trips to reach consensus in order to have each write committed, and that's going to take a lot of time, and a lot of production databases cannot allow that kind of latency.
There's the second option; this is the cool kid on the block. It's event sourcing, which means we're using Kafka as the source of truth. We only write the data into Kafka and we're going to treat the database just like any other derived system. The database is just going to be pulling changes from this Kafka log and it's going to apply them one by one. This looks much cleaner and it would solve a lot of headaches. However, there is one problem with this for some use cases and it's Read-Your-Write consistency.
Read-Your-Write consistency is the idea that when you're updating some data and you're trying to read from what you've just updated, you're expected to get what you just wrote. But with this setup, we actually may potentially be reading stale data, because say if we have a traffic spike and we have a bunch of data that are being sent into Kafka and then the database is slow at catching up, so at that point if we're trying to do a read, we're going to be reading stale data. That's really bad when you're building an application, like an account balance where you need to guarantee that your users are not withdrawing money to go into negative balance and this is problematic for that.
Then there is the third option, which is change data capture using the write-ahead log. Change data capture is a design pattern in databases that basically captures every single database change into a stream of change events. And then anyone that is interested in these change events can listen to the stream and react accordingly. And we mentioned we're going to do this with the write-ahead log. A write-ahead log is implemented in pretty much every single database out there; it's kind of an implementation detail of each database rather than an API. And the idea of the write-ahead log is that before we write the data into the storage file, we're first going to write it into the write-ahead log, just like the name suggests.
There are some benefits to this approach. The first is crash recovery. Now, if the database crashes halfway while writing the data into the storage file, the database upon restart can look at the commit log, replay the changes, and restore the corrupted data, so that's great. The second benefit is improving write performance in certain scenarios. This is the case where you have a single transaction but you're updating a lot of tables, and these tables probably reside in different storage files. So instead of trying to update each of those tables individually, it's first going to sequentially write all of those changes into this log. That's only a single fsync versus fsyncing on each of those individual storage files, so it's much faster.
The third benefit is streaming replication. A lot of databases already apply this, like MySQL, where all the replicas are just tailing the commit log and applying the changes, so the replicas are updated asynchronously. One other detail that's worth mentioning about the write-ahead log, specifically for MySQL, is that it gives you two options. You can either do statement-based logging or you can do row-based logging. Statement-based logging means you're logging the queries, and row-based logging means you're actually logging the data after the change has been applied. And in terms of change data capture, row-based logging is very useful, since now you have the data for the entire row, not just the column you've updated.
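The log-first write and the crash-recovery replay can be sketched in a few lines. This is a toy model under stated assumptions, not how any real database lays out its log: `TinyStore`, its in-memory `log` list, and the `crash_before_apply` flag are all invented for illustration.

```python
import json

class TinyStore:
    def __init__(self):
        self.log = []    # stands in for the append-only write-ahead log file
        self.data = {}   # stands in for the storage files

    def write(self, key, value, crash_before_apply=False):
        # The change is recorded in the log before storage is touched.
        self.log.append(json.dumps({"key": key, "value": value}))
        if crash_before_apply:
            return  # simulate a crash between the log write and the storage write
        self.data[key] = value

    def recover(self):
        """Rebuild storage by replaying every logged change in order."""
        self.data = {}
        for line in self.log:
            entry = json.loads(line)
            self.data[entry["key"]] = entry["value"]

store = TinyStore()
store.write("balance:1", 50)
store.write("balance:1", 75, crash_before_apply=True)
state_after_crash = dict(store.data)   # storage missed the second write
store.recover()                        # replaying the log restores it
```

The same ordered log that makes recovery possible is what a replica, or a CDC connector, tails to learn about every change.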
So by using change data capture with the write-ahead log, we get the best of all the worlds. We don't have to worry about implementing distributed transaction, but we get all of the transactional guarantees. And because we're asynchronously tailing this MySQL Binlog or some kind of a write-ahead log, we don't have to worry about impacting the performance when we're writing the data into the database because it's asynchronous.
Real-World Example: Streaming MySQL
Now, let's take a look at how exactly we're using CDC at WePay to stream data from MySQL into BigQuery. Under the hood, we leverage the Kafka Connect framework for this job. The source connector is responsible for getting data from external sources and publishing it into Kafka. The sink connector is responsible for reading from Kafka and storing the data into external sinks. Applied at WePay, our data source is MySQL, our data sink is BigQuery, and our source connector is Debezium, which is an open-source project. And our sink connector is KCBQ, which stands for Kafka Connect BigQuery; it's something we named ourselves because we wrote it.
We're going to break this up into two sections and talk about each part separately. So first, MySQL into Kafka, and I definitely have to talk about Debezium before going into any details. Debezium is an open-source project. It's basically meant for CDC and it's built on top of the Kafka Connect framework. And the way it does CDC is by reading the write-ahead log, converting it into individual changes, and recording them on a row-level basis. Debezium guarantees at-least-once semantics, which is the same guarantee as Kafka. And this means we don't have to worry about ever losing data, but we may potentially get duplicates. And finally, Debezium currently already supports MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server.
So what does Debezium look like in action? Before we start the Debezium connector, we probably already have some database running in production. It's probably already replicating to some replica. So when we first start the connector, it's going to ask the database to give it the filename and the position of the most recent write and it's going to record that information.
Next, it's going to run a select star from table, every single table from the database, and it's going to convert the result set into individual create event and publish these events into Kafka. And because some tables are huge, this could potentially take a couple of hours. And during this time, the database may be having additional writes and that may be replicating to the replica, and Debezium is just going to temporarily ignore that. Once the snapshotting is complete, Debezium is going to start to catch up and it knows where to catch up because they recorded the filename and the position of the most recent write. And then once it's finally caught up, it will start streaming the data in real time just like any other replica except instead of storing the information, it's sending that information to Kafka.
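The start-up sequence described above (record the position, snapshot, catch up, then stream) can be modeled roughly like this. It is a simplified simulation, not Debezium's code: the `snapshot_then_stream` function, the list standing in for the binlog, and the event dictionaries are assumptions made for the example.

```python
def snapshot_then_stream(table, binlog, writes_during_snapshot):
    """Return the events a connector would publish on its first start-up."""
    published = []

    # 1. Record the current log position before reading any table data.
    position = len(binlog)

    # 2. Snapshot: read every existing row and publish it as a create event.
    for row in list(table.values()):
        published.append({"op": "c", "before": None, "after": dict(row)})

    # Meanwhile the database keeps accepting writes, which land in the log.
    binlog.extend(writes_during_snapshot)

    # 3. Catch up from the recorded position; from here on it is plain tailing.
    published.extend(binlog[position:])
    return published

table = {1: {"id": 1, "name": "anne"}, 2: {"id": 2, "name": "bob"}}
binlog = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "anne"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "bob"}},
]
late_write = {
    "op": "u",
    "before": {"id": 1, "name": "anne"},
    "after": {"id": 1, "name": "alice"},
}
events = snapshot_then_stream(table, binlog, [late_write])
```

Log entries older than the recorded position are never replayed, because the snapshot already covers them; only the write that arrived during the snapshot is published from the log.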
Let's take a look at what a Debezium event looks like. The before section is what the data looks like before the change. The after section is what the data looks like after the change. The source section provides a bunch of metadata about the data source, like the server ID, the filename and position, as well as the database and the table that it's coming from. And if you're familiar with MySQL, since 5.6 it has supported GTIDs, so Debezium is able to use GTIDs as well, instead of the filename and position. The op section represents the type of operation: U is for update, C is for create, and D is for delete. And the timestamp is the timestamp of when this event was created in Debezium. If it's a create event, before will be null; if it's a delete event, after will be null.
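As a rough illustration of that layout, here is one update event written as a Python dictionary, plus a small function that applies such events to an in-memory copy of the table. The section names follow the description above; every value (server ID, file name, position, database and table names) is invented, and `apply_event` is a hypothetical helper, not part of Debezium.

```python
update_event = {
    "before": {"id": 1, "first_name": "Anne"},
    "after": {"id": 1, "first_name": "Alice"},
    "source": {
        "server_id": 1,
        "file": "mysql-bin.000003",
        "pos": 154,
        "db": "payments",
        "table": "users",
    },
    "op": "u",              # "c" = create, "u" = update, "d" = delete
    "ts_ms": 1540000000000,
}

def apply_event(rows, event, key="id"):
    """Apply one change event to a dict that mirrors the source table."""
    if event["op"] == "d":
        # Deletes carry only a before image; after is None.
        rows.pop(event["before"][key], None)
    else:
        # Creates and updates carry the full row in the after image.
        rows[event["after"][key]] = event["after"]
    return rows

create_event = {"before": None, "after": {"id": 1, "first_name": "Anne"}, "op": "c"}
delete_event = {"before": {"id": 1, "first_name": "Alice"}, "after": None, "op": "d"}

mirror = {}
apply_event(mirror, create_event)
apply_event(mirror, update_event)
after_update = dict(mirror)
apply_event(mirror, delete_event)
```

Because deletes arrive as explicit events with a before image, the hard-delete problem from the polling pipeline goes away.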
This original pipeline I've showed you at the very start is pretty different from what we’re actually running in production; it's a little bit more complicated than this. Let's take a look at why. So, we're not going to basically be directly reading from the master of the MySQL instance because we can potentially have snapshots that could take hours. We don't want to impact the performance. So we set up a MySQL replica. And this replica is dedicated for Debezium and we're just going to be tailing from the replica. But having just one replica is not enough, because what if it goes down? So we set up a secondary replica and this is responsible in case the primary is down. In order to handle failover, we add a proxy in front of it so that if the primary is down, we read from the secondary instead.
But, of course, we don't just have a single microservice, we have many microservices and each one of them will be replicating to the same primary and secondary MySQL replica. And the reason that we're using just a single cluster of primary and secondary replica for Debezium is for operational cost. We know that as we add more microservices, this could potentially become problematic and we may potentially add additional cluster as well, but for now, this is sufficient for us because we're a startup. But even though we only have a single Debezium dedicated MySQL cluster, we do have an individual Debezium connector that corresponds to every single one of those microservices, and this is important because it allows us to configure each microservice Debezium connector based on what works for that particular connector and it also allows us to bring up and down the specific connector in case we're doing any kind of troubleshooting, without affecting the rest of the entire streaming pipeline basically. We run these connectors in distributed mode for fault tolerance. So this is what it actually looks like in production, just a little bit more complicated.
Now that we've got our data into Kafka, the next question is, how are we getting the data from Kafka into BigQuery? The reason we built KCBQ is that, at that time, there was no existing Kafka-to-BigQuery connector. We have open-sourced it, so if you're interested, it's there on the WePay GitHub. There are a couple of nice features about this connector. First of all, it has configurable retry logic. BigQuery will sometimes give you these retryable transient errors, and the connector is intelligent enough to know about them and is going to retry in order to not drop any messages. But because sometimes this error could last for a while, we've implemented the retry logic with exponential backoff so that it won't hit the API too frequently in case it's down for a long time.
Secondly, KCBQ is capable of lazily updating the schema for our tables. What lazily means here is that KCBQ itself is actually going to cache the schema for every single table. And when a new message arrives, it's going to leverage the data in that cache and try to send the message to BigQuery with the version in the cache. In the case where it gets a schema error back, it knows that the schema is outdated. It will then go fetch the latest schema from the schema registry and retry with that latest schema. So that helps us deal with automatic schema evolution.
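Retrying with exponential backoff can be sketched as below. This is a generic illustration, not KCBQ's implementation: `TransientError`, `send_with_retry`, and the default delays are assumptions, and the `sleep` parameter exists only so the example can run without actually waiting.

```python
import time

class TransientError(Exception):
    """Stands in for a retryable error returned by the warehouse API."""

def send_with_retry(send, batch, max_retries=5, base_delay=0.5,
                    max_delay=30.0, sleep=time.sleep):
    """Call send(batch), doubling the wait after each transient failure."""
    for attempt in range(max_retries + 1):
        try:
            return send(batch)
        except TransientError:
            if attempt == max_retries:
                raise  # give up and surface the error instead of dropping data silently
            sleep(min(max_delay, base_delay * 2 ** attempt))

# Usage: a fake backend that fails twice before accepting the batch.
calls = {"count": 0}
delays = []

def flaky_send(batch):
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("backend unavailable")
    return len(batch)

result = send_with_retry(flaky_send, ["row-a", "row-b"], sleep=delays.append)
```

Capping the delay with `max_delay` keeps a long outage from pushing the wait between attempts to unreasonable lengths.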
And finally, KCBQ supports both batch and streaming-based uploading or it basically uses BigQuery's batch API and BigQuery's streaming insertion API. The benefit of the batch API is when you're doing snapshotting, it's the faster option. And when the snapshotting is complete, you can then basically flip the switch to use the streaming-based API which allows you to access data in real-time.
There is one additional piece of information that we had to add to the KCBQ event and that's the Kafka offset. I'll explain why in a second. But the Kafka offset, if you're not familiar with it, is essentially the position of this record in Kafka. So here is what an example table looks like when we're querying for all the fields in this table, and I've also included the Kafka offset there as well. Notice that this is actually not very useful. We're getting every single record, every single change event. What we really want is just the final change. So we leverage Kafka offsets to do deduplication and compaction and determine what we actually need to show to the user. The reason we can trust the Kafka offset is that the data is partitioned by primary key, and in Kafka, anything in a partition is guaranteed to be ordered. So now we know that any data with a larger offset arrived at a later time. So with the Kafka offset, we can dedup our data by primary key and we now have a version that mirrors what's in MySQL.
An additional benefit of using BigQuery views is that we can actually mask any column that we don't want users to see. For example, email is PII, sensitive data, and we don't want most of the users to see that information. So we create another view on top of the view I showed you earlier, and this view does not have the email information. And because BigQuery has access control configuration, we can give different users different permissions to different tables.
There's one final piece in this pipeline that I briefly mentioned but didn't really get into, and that's the schema registry. At WePay, we use the Confluent Schema Registry and this is basically a registry that stores a version history of all of the data schemas. What's really cool about the Confluent Schema Registry is that it dog-foods on Kafka. What that means is that it uses Kafka as its underlying storage for all of the schemas, so you don't have to spin up a new storage engine or a database of some sort to handle schemas. And the schema registry supports Apache Avro as its serialization format, which guarantees both forward and backward compatibility, which is always a good thing.
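The deduplication rule, keep the record with the largest Kafka offset for each primary key, looks roughly like this in plain Python. In the pipeline described here the same logic lives in a BigQuery view; the `latest_by_key` function and the field names `id` and `kafka_offset` are assumptions made for the sketch.

```python
def latest_by_key(records, key="id"):
    """Keep only the record with the highest Kafka offset for each primary key."""
    latest = {}
    for record in records:
        current = latest.get(record[key])
        # Offsets are only comparable within one partition; this works because
        # all events for a given primary key land in the same partition.
        if current is None or record["kafka_offset"] > current["kafka_offset"]:
            latest[record[key]] = record
    return sorted(latest.values(), key=lambda r: r[key])

records = [
    {"id": 1, "name": "anne", "kafka_offset": 10},
    {"id": 2, "name": "bob", "kafka_offset": 11},
    {"id": 1, "name": "alice", "kafka_offset": 12},
    {"id": 1, "name": "alice", "kafka_offset": 12},  # redelivered duplicate
]
current_rows = latest_by_key(records)
```

A redelivered message has the same key and offset as the original, so at-least-once duplicates collapse without any extra handling.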
And finally, we don't want our schema registry to become our single point of failure because that defeats the whole purpose of a resilient pipeline. And the schema registry is designed to be both distributed and single-master. It leverages Zookeeper to do any kind of failover but essentially, it is resilient to failure.
To put it all together, here is what schema evolution looks like. Before that, one thing worth mentioning is that MySQL Binlog doesn't just store the data change. It also stores every single schema change. This is really useful because now Debezium, upon receiving a schema change, is going to cache this schema change and it's going to update this information to schema registry. Any following data change event it receives, it can now use this new cached version of the schema instead.
So by the time the data gets into KCBQ, KCBQ doesn't know about the schema change yet so it's just going to send the data with its older cached version. But BigQuery is going to give us an error saying the schema is wrong and KCBQ can now fetch the latest schema from the schema registry and then send the data to BigQuery using this new schema. So that completes this automatic schema evolution, which is really useful.
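The lazy schema update on the sink side can be sketched as follows. This is a toy model and not KCBQ's code: `FakeWarehouse`, `LazySink`, `SchemaError`, and the dictionary standing in for the schema registry are all hypothetical.

```python
class SchemaError(Exception):
    """Stands in for the warehouse rejecting a row that does not match the schema."""

class FakeWarehouse:
    def __init__(self):
        self.rows = []

    def insert(self, row, schema):
        # Reject rows that contain fields the supplied schema does not know.
        unknown = set(row) - set(schema)
        if unknown:
            raise SchemaError(f"unknown fields: {sorted(unknown)}")
        self.rows.append(row)

class LazySink:
    def __init__(self, warehouse, registry, table):
        self.warehouse = warehouse
        self.registry = registry              # table name -> latest column list
        self.table = table
        self.cached_schema = list(registry[table])
        self.registry_fetches = 0

    def put(self, row):
        try:
            # Optimistically use the cached schema.
            self.warehouse.insert(row, self.cached_schema)
        except SchemaError:
            # Only on failure: refresh from the registry and retry once.
            self.cached_schema = list(self.registry[self.table])
            self.registry_fetches += 1
            self.warehouse.insert(row, self.cached_schema)

registry = {"users": ["id", "name"]}
warehouse = FakeWarehouse()
sink = LazySink(warehouse, registry, "users")

sink.put({"id": 1, "name": "anne"})
registry["users"] = ["id", "name", "email"]   # the source side registered a new column
sink.put({"id": 2, "name": "bob", "email": "bob@example.com"})
sink.put({"id": 3, "name": "cat", "email": "cat@example.com"})
```

The registry is consulted once, at the first mismatch, rather than on every message, which is the point of doing the update lazily.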
Future Challenge: Streaming Cassandra
As I mentioned, this final part is going to be a little bit experimental as it's something we're currently working on, but it's interesting enough and relevant enough to CDC and I'm really excited to share it with you. At WePay, as our company grew, we began to see a need for a NoSQL database that's optimized for high write throughput, for horizontal scalability, and for high availability. And Cassandra became the obvious top contender. By introducing Cassandra to our stack though, we also need to figure out how we want to do CDC for Cassandra.
At first, we thought, we figured this out for MySQL. How hard could it be? It turns out that it's a little bit more complicated. And because this talk is not a Cassandra-focused talk, I'm going to be skipping over a lot of details on Cassandra. I'm only going to talk about the Cassandra details that are directly related to CDC. The thing that makes Cassandra really difficult for change data capture is its replication model. Unlike MySQL, which uses a primary-replica replication model, Cassandra uses a peer-to-peer replication model. This means that all the nodes are equal. It also means that every single node is able to handle both reads and writes. And this also means that if we look at the data in a single node, it only contains a subset of the entire cluster's data, which makes sense, because that's how you do horizontal scalability. You don't want a node to contain all the data.
So the next question is, how exactly then does Cassandra determine which node each piece of data goes to? The way Cassandra handles this is that it divides the data across a cluster of nodes, which is typically visualized as a ring. And each of the nodes in this ring is responsible for a subset of all the data, which is called its token range. So in this naïve example, we have total possible token values from 0 to 19 and each node is responsible for a quarter of them.
When a request comes in, it's going to have a primary key or partition key value. The reason there is always going to be a partition key is that Cassandra schemas require you to specify a partition key for every single table. So, in this case, the partition key is foo and one of the nodes is going to be picked as the coordinator node. The job of the coordinator node is to hash this partition key, convert it into a token value, and depending on what this token value is, the coordinator is going to forward this request to the node that is responsible for writing this change.
But what if this node C dies? Then this is no longer fault tolerant. So the way Cassandra solves this is by increasing the replication factor. This example here has a replication factor of one. In reality, it typically has a replication factor of three. With a replication factor of three, the way Cassandra distributed this token range is by walking along this token ring and then basically replicating this range to its neighbors until the replication factor is reached. There are more sophisticated ways of distribution but this is just a naïve example. With this approach now, when the coordinator is forwarding the data, three of these four nodes are actually going to all store this data. So now we don't have to worry about not being able to write when one of the nodes is down.
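The naïve ring from this example (tokens 0 to 19, four nodes, replicas chosen by walking the ring) can be written down directly. This is a toy sketch only: the node names, the use of MD5 as the hash, and the helper functions are assumptions, and real Cassandra uses its own partitioner and more sophisticated placement strategies.

```python
import hashlib

NODES = ["A", "B", "C", "D"]
TOKEN_COUNT = 20                                 # tokens 0..19, as in the example
TOKENS_PER_NODE = TOKEN_COUNT // len(NODES)      # each node owns a quarter

def token_for(partition_key):
    """Hash a partition key onto the toy token ring (MD5 chosen for determinism)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % TOKEN_COUNT

def replicas_for_token(token, replication_factor=3):
    """Start at the owning node and walk the ring until enough replicas are chosen."""
    owner = token // TOKENS_PER_NODE
    return [NODES[(owner + i) % len(NODES)] for i in range(replication_factor)]

def replicas_for_key(partition_key, replication_factor=3):
    return replicas_for_token(token_for(partition_key), replication_factor)

single_copy = replicas_for_token(12, replication_factor=1)
three_copies = replicas_for_token(12, replication_factor=3)
foo_replicas = replicas_for_key("foo")
```

With a replication factor of three on a four-node ring, every write lands on three of the four nodes, which is why a CDC agent reading each node's commit log will see the same change three times.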
How does this relate to CDC? Well, there is actually also a write-ahead log in every single one of these nodes in the cluster. And this is called a commit log in Cassandra. This commit log will only record the writes that are specific to that node, and the way we can handle CDC is that we can put one agent, a CDC agent, on each of these nodes, and this agent is going to be responsible for reading the data from this commit log and sending it off to Kafka. In fact, since Cassandra 3.0, they actually introduced a CDC feature, and this feature provides us with a file reader and a file-read handler, and the handler has already deserialized the information from the commit log. So we thought all we have to do then is to take this mutation, which is what they call a change event, extract the data that we care about, convert it to Avro, package it, and then send it off to Kafka.
But as you're probably already thinking, there are a couple of problems with this approach. First of all, we get duplicated change events. Because we have a replication factor of three, when we are reading from all these logs, we're going to get three copies of all the data. So somewhere down in our pipeline, we need to figure out how to do the deduplication.
The second problem is out-of-order events. This one is a little bit more subtle, because we're dealing with distributed system here, it is possible that when two different clients are writing to the same row at the same time, to two different values, in this case, maybe one client is changing the first name to Anne and the other client is changing the first name to Alice, and node one and node two receive Alice first and then Anne, while node three receives Anne first and then Alice. Now, these three different nodes actually have a different understanding of what's the most recent data.
The way Cassandra cleverly handles this is using the concept of last-write-wins. So when a client is sending a request, it actually generates a client-side timestamp. And this timestamp gets propagated into every single column of this row of the data. This way, when the client is reading the data from these nodes, if it sees a discrepancy between two or more nodes, it's going to always pick the row with the latest timestamp. But because our CDC pipeline is outside of the read path of Cassandra, we basically have to figure out how to do this ourselves.
The third problem is incomplete change events. Cassandra is optimized for writes, so unlike MySQL, which is going to do a read before every single write, Cassandra is just going to blindly write the data into the database. And because of this, we're only going to know the columns that have changed. We're not going to know the rest of the columns of that row. So our change events are incomplete and we need to somehow figure out how to piece together this information in our pipeline.
The fourth problem is unlogged schema change. You can modify your schema in Cassandra, however, it uses a completely different read-write path from a data change event. It uses gossip protocol to handle or propagate a schema change. This means that this data is never going to be recorded into the commit log. So if we're only listening to the commit log, we're not going to know about any schema change.
Our current solution that we're working on, we call it the bridging-the-gap solution, is that we're going to ignore all of these problems, at least until the data gets into BigQuery. So basically, the agent is just going to parse all of this data and send it off to Kafka. Kafka is going to send all of this data into BigQuery. Everything in BigQuery is unordered, it's duplicated, and it's incomplete. But then we're going to heavily leverage BigQuery views to handle all of this. In order for BigQuery to know how to do this, it needs a little bit more information. Each column is not only going to store its value, it's also going to record the timestamp, which is when the data was updated. It's going to record a deletion timestamp in the case the data is deleted, and it's also going to record a boolean field that just represents whether this column is a primary key field or not.
Let's take a look at the data now that we have stored into BigQuery. This query specifically looks at the first name column and if we want to also query for the last name column, notice that the second row is null and that's because the second event is just an update so we only updated first name. This is not quite useful, because what we actually want is the second event for our first name but the first event for last name. So the way we can handle this is basically by looking at the timestamp field, compare them and find the one with the latest timestamp. And then we can return the user or create a view that returns the user with the data that's being deduplicated. It's being ordered and it's complete. And in order to do this, we have to heavily leverage BigQuery including its UDFs as well as a lot of like Group By and so on.
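The per-column comparison that the view performs can be sketched in Python. This only illustrates the merge rule and is not the BigQuery SQL or UDFs used in practice: the event layout (a `columns` map of `value` and `ts`) and the `assemble_rows` function are assumptions, and deletion timestamps are left out to keep the example short.

```python
def assemble_rows(events):
    """Merge partial, duplicated, unordered change events into one row per key."""
    cells_by_key = {}
    for event in events:
        cells = cells_by_key.setdefault(event["id"], {})
        for column, cell in event["columns"].items():
            current = cells.get(column)
            # Last write wins, decided independently for every column.
            if current is None or cell["ts"] > current["ts"]:
                cells[column] = cell
    return {
        key: {column: cell["value"] for column, cell in cells.items()}
        for key, cells in cells_by_key.items()
    }

insert_event = {
    "id": 1,
    "columns": {
        "first_name": {"value": "Anne", "ts": 100},
        "last_name": {"value": "Smith", "ts": 100},
    },
}
update_event = {
    "id": 1,
    "columns": {"first_name": {"value": "Alice", "ts": 200}},
}

# Replication factor three: every event arrives three times, in no particular order.
arrived = [update_event, insert_event, insert_event,
           update_event, insert_event, update_event]
complete_rows = assemble_rows(arrived)
```

Picking the newest cell per column handles all three problems at once: duplicates tie on timestamp and change nothing, arrival order stops mattering, and the untouched `last_name` is filled in from the earlier event.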
There are some advantages to this approach. The first advantage is quick iteration, because we basically didn't change anything in our pipeline and we're doing all the heavy lifting in BigQuery. A BigQuery view is very cheap to create, to modify, to delete, so as we experiment with Cassandra, we can basically modify the view as necessary. The second benefit is that there's very little operational overhead. Notice that aside from the Cassandra CDC agent, we didn't introduce anything new. So as we're solving this problem, we don't have to be thinking about the uptime of other services or applications that support this pipeline. And finally, because we're working off the base table in BigQuery, we're not going to impact Cassandra in production, because we don't have to read back from Cassandra on every write to get the full row. All of our data is already in BigQuery. But, of course, it comes at a cost.
The first cost is that it's very expensive, because every time a user queries this view, we have to do all of this piecing the data together. On top of this, we're storing data with a replication factor of three, which means the data is amplified and the table is going to get really big really fast, so we'd have to do some maintenance work to keep the size down. And the way to do compaction is by just materializing this view periodically, but that is going to be another overhead. And finally, notice we've only solved this problem for BigQuery, which means that if any other downstream derived system is trying to read this data, they are out of luck. They basically have to re-implement all of this themselves and that's not quite ideal.
For the sake of completeness, I've included a potential future solution that we're considering. It's a little bit more complicated because now it introduces a stream processing engine, a cache, a database, and a second Kafka. Let's go through how this would work. The messages are still going to arrive into Kafka, duplicated and out of order. The first thing the stream processing engine is going to do is check against the cache to see whether this data has been processed or not. If it hasn't been processed, then we will process it; otherwise, we can drop the message.
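A minimal sketch of that cache check in Python, assuming an in-memory dictionary stands in for the real cache. The class name `SeenCache`, the fingerprinting scheme (a SHA-256 hash of the event serialized as JSON), and the 30-minute TTL are assumptions for illustration; the talk does not specify how the cache is keyed or how long entries live.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict


class SeenCache:
    """In-memory stand-in for the cache: remembers event fingerprints for a limited time."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._expires_at: Dict[str, float] = {}

    def first_time_seen(self, event: Dict[str, Any]) -> bool:
        """Return True if the event should be processed, False if it is a recent duplicate."""
        now = self.clock()
        # Drop expired fingerprints so the cache does not grow without bound.
        self._expires_at = {k: t for k, t in self._expires_at.items() if t > now}
        fingerprint = hashlib.sha256(
            json.dumps(event, sort_keys=True).encode("utf-8")
        ).hexdigest()
        if fingerprint in self._expires_at:
            return False
        self._expires_at[fingerprint] = now + self.ttl_seconds
        return True


# Hypothetical event sent once by each of three replicas.
cache = SeenCache(ttl_seconds=30 * 60)
event = {"key": 1, "ts": 200, "columns": {"first_name": "Anna"}}
decisions = [cache.first_time_seen(event) for _ in range(3)]
print(decisions)
```

Only the first of the three identical messages is let through. A duplicate that arrives after its fingerprint has expired would pass this check, so in this sketch the cache only reduces work and some later step still has to reject stale data.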
Next, the stream processing engine is going to check against the database. It's going to compare the timestamp of what's in the database against the timestamp of this event, and in the case where we have an older timestamp in our event, we can drop it as well. And finally, because we've done a read on this database, we now get both the before and the after of every single event. So we can send this complete information into our second Kafka. Kafka can then send this information into KCBQ, which can then propagate it into BigQuery, and the benefit here is that if we have any other derived system that is reading from Kafka, they now have a much nicer outcome.
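The timestamp comparison and the before/after record can be sketched as follows. This assumes a plain Python dictionary in place of the database and the same hypothetical event shape (`key`, `ts`, `columns`); `apply_event` is an illustrative name, not part of any real pipeline. It compares one timestamp per row, as described above. Note that this is a simplification: a late event that touches a different column than the newer write is also dropped here.

```python
from typing import Any, Dict, Optional


def apply_event(
    store: Dict[Any, Dict[str, Any]], event: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Apply one change event to the store; return a before/after record, or None if stale."""
    stored = store.get(event["key"])
    # Drop events that are not newer than what the store already holds.
    if stored is not None and event["ts"] <= stored["ts"]:
        return None
    before = None if stored is None else dict(stored["columns"])
    after = dict(before or {})
    after.update(event["columns"])  # layer the partial update on top of the previous row
    store[event["key"]] = {"ts": event["ts"], "columns": after}
    return {"key": event["key"], "ts": event["ts"], "before": before, "after": after}


# Hypothetical sequence: an insert, a partial update, then a late (older) event.
store: Dict[Any, Dict[str, Any]] = {}
records = [
    apply_event(store, {"key": 1, "ts": 100, "columns": {"first_name": "Anne", "last_name": "Smith"}}),
    apply_event(store, {"key": 1, "ts": 200, "columns": {"first_name": "Anna"}}),
    apply_event(store, {"key": 1, "ts": 150, "columns": {"last_name": "Smyth"}}),
]
for record in records:
    print(record)
```

Each record that is not `None` carries the complete row before and after the change, which is what would be published to the second Kafka so that downstream consumers do not have to reassemble rows themselves.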
In summary, there are three things I'm trying to get across in this talk. The first thing is that a database as a stream of change events is a really natural and useful concept. It would make a lot of sense for every single database out there to provide a CDC interface so the data can be sent into other derived systems, because otherwise we're talking about a very closed system where the database expects that it is going to be the final destination and the data is not going to go anywhere else, and that's kind of selfish.
The second point is that a log-centric architecture is at the heart of a streaming data pipeline. It helps us solve a lot of problems when it comes to distributed transactions, and it's very simple to implement and understand. And finally, CDC for peer-to-peer databases is not trivial, as you probably already noticed. However, we're hoping that as the tools get better, and as our understanding of these databases gets better, it will become easier over time.
Some additional information: if you're interested in the MySQL to BigQuery pipeline, there is a blog post on our website that explains it in a little bit more detail. I've also included the KCBQ GitHub link in case you're interested in using that. And finally, the last piece is actually a blog post that my colleague wrote this morning. It talks about schema evolution in the case where it breaks backwards compatibility. We can use Avro's forward and backward compatibility to deal with schemas that are compatible. But what happens if you made a change that is not compatible in the database? So the last post kind of goes a little bit more into that and I think it's super interesting and relevant for CDC. And that's the end of my talk. Thank you, guys.
Questions & Answers
Moderator: So for the future work, you have a database sitting on the side. I have two questions. One is, you could hydrate by just reading directly from one of the source databases, do you?
Gao: Yes. The reason we don't want to read directly from the source is because we don't want to impact the production of the source database. It is possible to create a second cluster, a Cassandra cluster that is made specifically for the CDC purpose which is kind of what this could potentially be as well, so.
Moderator: But you have to keep it in sync with the sources? That's the challenge, right?
Gao: It's okay if it's asynchronous because for every single table, it's essentially serialized so then we know that it's going to be in order.
Participant 1: What do you think about writing the event from the application itself? Because we've implemented the future solution that you showed here, using events in the application write to Cassandra, write to Kafka, and then we have two events before and after into Kafka. And then we use the stream processing engine within [inaudible 00:42:53] to handle [inaudible 00:42:55] and then distribute the updates to multiple databases.
Gao: Yes. One of the problems that we're seeing is the distributed transaction problem. If you guys have potentially solved that, then that's great. But the other problem is that we do want to be able to get the before and after. At least in the MySQL case, if we were to use this event sourcing approach, then we only get the columns that have changed. So that was something we're trying to avoid. But in this case with Cassandra, it's simply not possible because the database itself doesn't do read before write. That's also why this is kind of an event sourcing approach. Okay, so now I kind of understand what you're saying, so you're talking about why not just update Kafka first and then basically read from.
Participant 1: That approach works with Cassandra, you don't have to worry about multiple copies and stuff.
Participant 2: So can you go into the detail, you read from Cassandra, send it to Kafka, you write to Cassandra and then write to Kafka, so you get a bit of an update.
Participant 3: Before you write to Cassandra, you write an event to Kafka and then once it's successful, you write another event to Kafka.
Participant 2: It's kind of but it kind of gets grounded.
Gao: And another thing is if you are using Cassandra for any kind of transactional things where you care about read-your-write consistency, that could potentially become a problem I think. Where you need to guarantee that every time you're reading from the database, it's the latest thing, it has what you've written already.
Participant 4: Yes, I know. The read or the write in Cassandra helps with this, but when you're replicating that information [inaudible 00:44:36].
Gao: It's okay. Yes. We can take it offline after. Thank you.
Participant 5: What's the motivation for going to Cassandra and given that it sounds like this is quite an effort to go to it? Is BigQuery not sufficient? Or what's the limitation there?
Gao: We want to use Cassandra more in the sense of a production database, in the sense of OLTP database whereas BigQuery is more so for analytical database, OLAP. So we want to optimize for write and Cassandra is our best contender for that.
Participant 6: So I had a quick question about caching. That essentially materializes the view of the row within the cache, right? So how do you know how long to keep that cache before you throw it out?
Gao: The cache is going to be like an optimization, but it's not going to be a source of truth because it is possible - say you've set your TTL to 30 minutes but for whatever reason, one of the nodes is down for a longer amount of time, then you can get the data later. But the database can then catch those problems by the time the data gets there, so.
Participant 7: Hi. Just a quick question. I was just listening through the future solution for Cassandra where you will get three copies of the data, and out of order as well at times. Could you not just use one of the replicas as master and you could use Zookeeper for keeping that state and just have that one replica push it out?
Gao: That's actually something we considered, where we would actually coordinate the different agents so that only one of them is sending the message. The problem is that Cassandra is meant to be a peer-to-peer database and it's meant where all the nodes should be equal. If we're starting to introduce Zookeeper into the picture, it's a little bit against Cassandra's philosophy which is that any node can be taken up or down because now we're basically setting one of the nodes to be the master, and it's definitely possible but the reason we're not considering it is because we kind of want to follow what Cassandra is known for which is the tow [SP] like peer-to-peer, everyone is equal.
Participant 8: Do you write three? Write equals three on all writes?
Gao: Right now, we do, yes.
Participant 9: Thanks, it's a great talk. Just curious, if you think there'll be any people to build downstream applications off some of these streams, or if it all goes to analytics? I've seen some interesting use cases of using this to actually generate other applications.
Gao: I think it's definitely possible which is why we want to have this future solution that allows other systems to be able to read from Kafka. If the only thing we cared about was analytics, then our existing pipeline could kind of work for a while, so yes.
Participant 10: I'm curious if you ever end up missing events due to a network failure talking to Kafka or something like that, and if so, how do you deal with that?
Gao: That's a good question. A lot of things that we're dealing with right now are all in POC, so we haven't had to spend a lot of time and effort in terms of guaranteeing that our messages are not lost and whatnot. I think it's potentially possible, as I've heard about similar scenarios in other pipelines before with Kafka, but we'll probably get a clear answer as we experiment more with this pipeline.
Participant 11: My question is, you're paying an awful lot of the data duplication with a view on BigQuery. How much of a performance penalty are you incurring by lazily evaluating the data this way?
Gao: That's something that's just going to get worse over time. We are currently in the process of building this so I don't have a lot of numbers for you but it is definitely pretty expensive. BigQuery is great because it's able to do parallel execution, but even with that, it is a concern that we could potentially take way too long for a single query to do, which is why we're hoping compaction could help us down the road, but, yes. Sorry, I don't have a good number for that.
Participant 11: Is that a business tradeoff, you think, whether or not to throw away the out data …?
Gao: The business tradeoff is expense. BigQuery cost is based off of execution, so this is going to get expensive, yes.
Participant 12: One question is if you do, and this [inaudible 00:49:51] but if you are cached and you're just windowing in the stream processor, you could probably dedup there at least for a time window data.
Gao: Cassandra itself actually has the TTL feature. If we actually want to use Cassandra as this intermediary database, it's possible as well.
Participant 13: This is more in line with someone who had asked the question before. If data consistency is such a big requirement here, what is the rationale for using an eventual thing like Cassandra?
Gao: That's a really good question. Because we're building a data pipeline, we want to optimize for different use cases. So maybe for one user, their goal is to use Cassandra for write-only, and another might be using Cassandra for something that's a little bit more consistent. We're trying to make a more generic solution essentially, that covers all these cases but it's true though.
Participant 13: In this picture, you are envisioning heterogeneous sources like Cassandra and other MySQL and things like that?
Gao: Oh, no. So this pipeline is specifically for Cassandra only.