
Anti-Entropy Using CRDTs on HA Datastores @Netflix



Sailesh Mukil briefly introduces Dynomite, an open-source distributed datastore primarily backed by Redis and built to be highly available, and offers a deep dive into how anti-entropy is implemented. Mukil talks about the underlying principles of CRDTs that make this possible.


Sailesh Mukil is a Senior Software Engineer at Netflix, where he works on Dynomite, a highly available in-memory database backed by Redis. Prior to Netflix, he worked at Cloudera, where he contributed to the Apache Impala and Apache Kudu projects as a PMC member and committer on both.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Mukil: My name is Sailesh Mukil. I'm a Senior Software Engineer at Netflix. Today I want to talk to you about how we achieve anti-entropy using CRDTs, or Conflict-free Replicated Data Types, on a highly available datastore called Dynomite, which we built in-house. The purpose of this talk really is to show how CRDTs can allow a highly available datastore like ours to provide some stronger guarantees. I want to point out that CRDTs are not something brand new. I'd like to talk about how we went about implementing them, what some of the practical challenges we faced along the way were, and some optimizations that we made. I hope, after this talk, the audience can leave with a better understanding of CRDTs, and if you're new to them, hopefully you can identify some use cases where CRDTs would be a good fit.

In 2011, Netflix began its adoption of Cassandra. There was a heroic migration from the traditional Oracle database to Cassandra, because we liked the highly available nature of it. In 2013, Netflix as a business was gearing up to go global, and we started our first multi-region deployments. There was an incident in 2012 which definitely helped as well. In 2012, when we were a single region deployment, AWS went down for about 24 hours and Netflix was inaccessible. This definitely helped us push our multi-region story. As we started going multi-region, a lot of our app developers started using Cassandra as a primary database. They understood the trade-offs that a system like Cassandra could provide. For a lot of use cases, though, a system like Cassandra was still quite heavy, and a lot of use cases could do with lower latency. This was the thinking that brought about the system Dynomite in 2016.


What is Dynomite? Dynomite, in a nutshell, takes non-distributed datastores and makes them distributed. It's inspired by the Dynamo white paper, just like Cassandra was. It follows the token ring architecture, which is basically to say, if you have a single datastore instance, Dynomite can take multiple of them and shard your data across them. The way you shard your data across these nodes is you pass your keys through a hash function. The hash function spits out tokens, and each token falls somewhere on the ring. The node that owns that portion of the ring owns the data. In this case, we can see a three node ring, which means that the data is roughly divided by a third in each node.
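To make the hashing step concrete, here is a hedged Python sketch of token-ring ownership. The node names, the ring size, and the use of MD5 are illustrative assumptions for the example, not Dynomite's actual implementation:

```python
import hashlib
from bisect import bisect_left

class TokenRing:
    """Toy token ring: a key is owned by the node whose token range covers it."""

    def __init__(self, nodes):
        # Space node tokens evenly around a ring of 2**32 positions.
        step = 2**32 // len(nodes)
        self.tokens = [i * step for i in range(len(nodes))]
        self.nodes = list(nodes)

    def owner(self, key):
        # Hash the key to a token; walk clockwise to the first node token
        # at or past it, wrapping around at the end of the ring.
        token = int(hashlib.md5(key.encode()).hexdigest(), 16) % 2**32
        idx = bisect_left(self.tokens, token) % len(self.nodes)
        return self.nodes[idx]

ring = TokenRing(["node-a", "node-b", "node-c"])
assert ring.owner("user:42") in ring.nodes
```

Because the hash is deterministic, every coordinator computes the same owner for a given key, which is what lets any node route a request it does not own.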

Each ring is basically a full copy of the data. The way we deploy it, each ring is contained within one rack, or Amazon availability zone. We replicate this ring across racks, within the same region, or even across regions. A client can connect to any node in the cluster, and each node can act as a coordinator for any query. If the node is the token owner for that key, it applies the write locally and replicates it to its other replicas. If the node is not the token owner for a key, it forwards it to the token owner, which then takes the responsibility of replicating it across the cluster.

What are the features that Dynomite provides? It provides global replication. It's a highly available datastore. It's shared-nothing, which means that each node can operate independently. It does auto-sharding of your keys across the cluster. It scales linearly with the size of data. It allows you to plug datastores underneath: we can run with Memcached, Redis, or RocksDB. I want to point out that we primarily run with Redis in production today, and this talk is geared towards how we use CRDTs with Redis. It offers multiple quorum levels: no quorums, quorums within a single region, or quorums across regions. Lastly, it supports the same API as the underlying datastore. If you're backed by Redis, then any Redis client can talk to Dynomite, and Dynomite will automatically understand those commands and just offer a distributed Redis.

What is the Dynomite footprint at Netflix currently? Today we have about 1000 customer-facing nodes. We collectively handle about a million operations per second. Our largest cluster today holds about 6 TB. Dynomite, when it began, was typically used as a distributed cache in front of datastores like Cassandra and Elasticsearch for faster access. Over the years, we've had a growing number of use cases, so it was about time we made the system more resilient to onboard newer use cases. Today we have use cases ranging from maintaining session information for devices, to A/B testing, to determining when to send notifications to customers through devices. We've also started onboarding more critical use cases, like connecting customers to support agents for a 150-million subscriber base.

The Problem

Let's get into what the problem is. The problem simply is entropy in the system. What do we mean by entropy? Entropy in this context simply means replicas going out of sync. Let's look at a simple example of how that can happen. Here we have a three replica system. I'm just running a Redis command which says, set the value of a key K to 123. How it typically works is we apply the write locally and replicate it to the other replicas. If this is a quorum write, we would respond to the client after a quorum number of nodes have applied the write. Let's assume we have a network partition which isolates one of our replicas in the system. Replica 3 is isolated and cannot talk to any of the other replicas. We get a command to update the value of key K to 456. This replica can apply the write locally, but it fails to replicate the data because of the network partition. If this is a quorum write, we respond with an error back to the client.

Let's say there's yet another partition which isolates all replicas from each other. We have yet another update to the same key, at a different replica. That replica applies the write locally, but again fails to replicate. At this point, you see that for the same key K, we have three different values across replicas in the cluster. If different clients were to ask different replicas, get me the value for the key K, you would get back different answers. This is if you're doing a non-quorum read. If you were to do a quorum read, which means that a majority of the replicas have to agree on the value, the coordinating node would ask the other replicas, give me your value for K, and they would respond with their values. We can see that since everybody has a different value, we cannot achieve a quorum. The only thing we can do at this point is reply to the client saying that we were not able to achieve a quorum. At this point, if you want to use a quorum read, this key is basically unreadable. The only way to unblock this key would be to do a non-quorum read on the key or to overwrite it, at which point we can read it again. This is obviously not desirable.

What can we see? From this we can see that replicas will go out of sync. They can and they will go out of sync. Systems must be built to be resilient to entropy. We could do things like run consensus on every operation, but this is an AP system; we compromise on strong consistency to maintain availability. This is the problem that brought about implementing CRDTs within Dynomite.

Achieving Anti-Entropy (Traditionally)

Before I dive into CRDTs, let's look at how some other systems generally deal with anti-entropy. The most common one is last writer wins. The way last writer wins works is each replica maintains a timestamp of the latest update for every key that it holds. When entropy is detected across replicas, these timestamps are used to find the latest value for the key: the timestamp that wins is the greatest timestamp. This is not always 100% correct because of clock skew. You generally know that clocks on different servers can go out of sync; they don't always agree on the time. Doing periodic NTP syncs, which is to sync your clock with a remote time server, always helps. We are quite aggressive with NTP syncs at Netflix, but it still doesn't mean it's 100% correct.

The next is vector clocks. Vector clocks could fill their own presentation, so I'll just talk about the guarantees they offer. They allow you to create causal relationships between updates, to find out which updates are truly the latest and which are not. They don't allow you to find causal relationships between concurrent updates. For concurrent writes, while using vector clocks, we still have to fall back either to a last writer wins mechanism, or to what some systems do, which is pass all the conflicting values for the key back up to the application, and let the application decide. As we go through this talk, we'll see that for some cases, we still need to rely on last writer wins. The goal is to avoid it when possible.

What is the solution that we implemented? We ended up using CRDTs to achieve anti-entropy. What is a CRDT? It's basically a data structure that can be replicated across the network. Each replica can update its local state independently without having to coordinate with the other replicas, and with CRDT state it is always mathematically possible to resolve any inconsistencies across replicas. Today we're going to talk about state-based CRDTs, or convergent replicated data types. They were formally defined in 2011 by Marc Shapiro, Carlos Baquero, and a few others.

Operations on CRDTs must obey three properties. The first is that they need to be associative, which means that the grouping of operations should not matter. Then they need to be commutative, which means that the order of operations should not matter. Lastly, they need to be idempotent, which means that duplication of operations should not matter. We can see that not all data can be represented as CRDTs. Fortunately, most Redis native data types can be. There are two basic types of operations on CRDTs. One is an update, where a replica updates its local state. The second is a merge, where all the replicas share their local states with each other and converge to a consistent state.
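For intuition, an element-wise maximum is the canonical state-based merge, and it satisfies all three properties. This small Python illustration uses a dict of per-replica counts, which is an assumption made for the example:

```python
# Merge two replica states by taking the per-replica maximum.
# max() is associative, commutative, and idempotent, so the merge is too.
def merge(a, b):
    keys = set(a) | set(b)
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in keys}

x = {"r1": 3, "r2": 0}
y = {"r1": 1, "r2": 5}
z = {"r3": 2}

assert merge(merge(x, y), z) == merge(x, merge(y, z))  # associative
assert merge(x, y) == merge(y, x)                      # commutative
assert merge(x, x) == x                                # idempotent
```

Because duplication is harmless, a replica can safely re-send or re-apply state it is unsure about, which is what makes repair traffic in such a system so forgiving.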

Introduction to CRDTs

In the context of Dynomite, when we do a write, we basically do an update operation. We update the local replica. When we're doing a repair, we're basically doing a merge operation. A merge operation on the read path is basically a read repair. Most of today's talk will focus on read repairs. I will also talk about asynchronous repairs towards the end. I have to mention that it's still some ongoing work.

CRDTs provide this notion of strong eventual consistency. What is strong eventual consistency? It's basically eventual consistency with the added property of safety. Eventual consistency gives us a liveness property, which means that something good will eventually happen. That basically means your system will not hit a deadlock and will continue to function. A safety property tells us that something bad will never happen. What this means is that the value that you end up with will always be the correct value.

Let's look at an example to understand the difference between strong eventual consistency and eventual consistency. This is, unfortunately, a real-world example where someone tried to use a system like Dynomite for a distributed counter use case. In the interest of time, I'm going to explain only two types of CRDTs. The first is the counter, which is the simplest, to help you understand the guarantees that CRDTs can provide. The same guarantees apply to other CRDTs as well.

Let's take a naïve distributed counter use case. We have a command called INCR in Redis, which basically increments a counter by one. All counters start with a base value of 0 in Redis. When we get this command, we update our local state. We replicate it to the other replicas, so now everyone has a counter value of 1. We have our network partitions come in, which isolate all replicas from each other. We have two commands to two different replicas for the same counter: one is decrementing the counter and one is incrementing it. They update their local states, but they're not able to replicate across the cluster. We are at the state where we have three different values for the same counter. Let's say we want to repair the system. We want to reach an eventually consistent state. How do we reach that? Let's say we use last writer wins, and let's say that replica 3 has the latest value for the counter because it got the last increment. Replica 3 states that the value of the counter is 2, which is actually incorrect. If you remember, we had two increments and one decrement, which means that the value of the counter has to be 1. If we do last writer wins in this case, we will end up with an incorrect value. How can we represent counters as a CRDT to avoid this?

CRDT: PNCounters

This CRDT is called the PNCounter, which is a positive-negative counter. How it basically works is each replica splits the logical counter into two physical counters. One is a positive counter, which tracks the number of increments that that replica has seen. One is a negative counter, which tracks the number of decrements that that replica has seen. The final logical counter value is basically the sum of all positive counters minus the sum of all negative counters. You can see that each physical counter will only grow in one direction: it will always grow upwards. The more increments you get, the more your positive counter grows; the more decrements you get, the more your negative counter grows. You'll never have a case where any physical counter falls in value. Implementing this in a system has some memory overhead, bounded by the number of replicas in the system, as each replica maintains its two local counters plus copies of the other replicas' counters. Let's try to visualize it.
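As an illustration, a PNCounter fits in a few lines of Python. This is a hedged sketch of the idea, not Dynomite's implementation; the replica ids and method names are assumptions for the example:

```python
class PNCounter:
    def __init__(self, replica_id):
        self.replica_id = replica_id
        self.p = {}  # per-replica increment counts
        self.n = {}  # per-replica decrement counts

    def incr(self):
        self.p[self.replica_id] = self.p.get(self.replica_id, 0) + 1

    def decr(self):
        self.n[self.replica_id] = self.n.get(self.replica_id, 0) + 1

    def value(self):
        # Logical value: sum of positives minus sum of negatives.
        return sum(self.p.values()) - sum(self.n.values())

    def merge(self, other):
        # Element-wise max of both physical counters: each only grows.
        for rid, c in other.p.items():
            self.p[rid] = max(self.p.get(rid, 0), c)
        for rid, c in other.n.items():
            self.n[rid] = max(self.n.get(rid, 0), c)

# Replay the talk's scenario: one replicated increment, then a partitioned
# decrement on r2 and a partitioned increment on r3.
r1, r2, r3 = PNCounter("r1"), PNCounter("r2"), PNCounter("r3")
r1.incr()
r2.merge(r1); r3.merge(r1)   # initial replication succeeds
r2.decr(); r3.incr()         # updates during the partition
r1.merge(r2); r1.merge(r3)   # partition heals, a read triggers the merge
assert r1.value() == 1       # two increments, one decrement
```

Unlike last writer wins, the merge lands on 1 no matter which order the states arrive in.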

Here we have the same scenario: three replicas, and a counter replicated across them. Each replica has a local positive and negative counter, and also copies of the other replicas' counters. Let's walk through the same scenario again. We get an increment command for the counter, and the value of the local positive counter is updated to 1. The node replicates the state of its counter across the system, and the other replicas apply the update locally as well. We have the same network partitions where replicas are isolated. Then we have the same commands, where one replica gets a decrement and one gets an increment. If you notice, replica 2's negative counter has increased by 1, and replica 3's positive counter has increased by 1. They still fail to replicate across the system due to the network partition. Since we've not reached an eventually consistent state yet, all the replicas have a different value for the same counter, and that's ok. We would want to converge to a consistent value. Since in Dynomite we've implemented this as a read repair, we need something to trigger the merge operation. The way to trigger a merge operation in this case would be to read that key.

When we say, get me the value of this counter, it automatically triggers a merge operation where the coordinating node asks for the state of the same counter from the other replicas. They respond with what the counter looks like to them. The coordinating node applies the changes locally, so it repairs itself. It also sends a repair message to the other replicas to make sure that everybody is on the same page. We can see that it's the same scenario, but because we represented the counter as a CRDT, we were able to reach a safe, eventually consistent state. We have the final value of the counter, which is 1, which is correct in this case.

CRDT: LWW - Element Set

Let's move on to the next CRDT I want to talk about. It's called the last writer wins element set. We use this CRDT for registers, which in Redis we just call strings, for hashmaps, and for sorted sets. We use the CRDT to maintain key metadata. The last writer wins element set basically has two things. The first is an add set, which we use to maintain the latest timestamps for keys seen on that replica. The second is a remove set, which maintains the timestamps for keys that were deleted: the metadata for the key and the time at which it was deleted.

To contrast with the counter: a counter's value can only move in two directions, up or down, and representing it as two physical counters allows us to mathematically converge to a safe value. Registers have the property of being able to take arbitrary values at any given time. You can set a key to ABC now, and you can set it to something completely different a second from now. There is no mathematical way to arrive at which one is truly the latest. In these cases, we still fall back to last writer wins.

Let's look at what the last writer wins element set looks like in Dynomite. Each replica has an add set and a remove set. I'll be talking only about regular key values, or Redis strings, for this example. We get a command saying, set the key K1 to the value 123, and it's tagged with the timestamp t1. We add this information to our add set, saying, I saw a key K1, and it was updated at timestamp t1. We also apply the write locally. This gets replicated across the cluster; everybody adds this information to their add set and also applies the write locally.

We have a network partition that isolates replica 3 from the system, and we get an update for the same key. We are updating the value from 123 to 456 at a later timestamp t2. This write is applied locally, but it fails to replicate because of the network partition. We then get a new key written to replica 2, called K2, with a value of 999, at timestamp t3. We apply that write locally, add the information to the add set, and try to replicate it across the cluster, but it can only make it to replica 1. At this point, we have replicas 1 and 2 agreeing on the values of keys K1 and K2, but replica 3 not having the key K2 and having a completely different value for the key K1. Let's try to repair this. To repair it, we have to do a read to trigger the merge operation. When you try to get the value of key K1, the coordinating node asks the other replicas, what are your values for K1? They send back their respective values. One of them says, mine is 123 with the timestamp t1. The other says, my value is 456 with the timestamp t2. The coordinating node can make a decision here. It sees that the timestamp t2 is greater than t1, which means that 456 has to be the latest value. It repairs itself by applying this write locally. It also sends a repair message to the replica that is out of date, which is R-2. It also responds to the client with the latest value of the key, which is 456. We can still see that replica 3 does not have the key K2. That was not repaired because we had not triggered a merge operation for that key. Let's do that now. We try to get the value of key K2. The coordinating node asks for the values of K2 from the other replicas. R-3 says, I do not have this value at all. The coordinating node repairs R-3 with the value of K2, and sends the value 999 back to the client.

Let's look at how a delete would work. Let's say we have a new network partition which isolates replica R-2 from the rest of the system. We get a delete for the key K2 at a timestamp t4, which is greater than all the timestamps we've seen so far. What this would do is it would take the key K2 from the add set, add it to the remove set with the new timestamp t4, and get rid of the key completely. Because of the network partition, we are unable to replicate this information across the cluster. While this network partition is still in effect, someone can try to read the key K2 from one of the other replicas. They would still get back the value that is contained in those replicas. If this is a quorum read, we have two out of three replicas in the system agreeing that the value is 999, so they would get back the value 999. Because of the network partition, they are unable to see that K2 was deleted.

Let's say the partition has healed. We try to get the value of K2 again. At this point, R-3, which is the coordinating node, tries to get the values of K2 from the other replicas. R-1 says my value is 999 at t3. Replica 2 says, I've deleted it at timestamp t4. The coordinating node knows that the latest value for the key is gone. It's deleted. It updates the state locally, adds the key to its remove set. Repairs the replica that it knows does not have this value, and responds back to the client with the value of nil.
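Putting the add set and remove set together, a hedged Python sketch of a last-writer-wins register store could look like the following. The class and method names, and the convention that ties break toward the newer operation, are assumptions for the example; in Dynomite this metadata lives inside Redis, not a Python object:

```python
class LWWStore:
    def __init__(self):
        self.data = {}     # key -> current value
        self.adds = {}     # add set: key -> latest write timestamp
        self.removes = {}  # remove set: key -> delete timestamp

    def set(self, key, value, ts):
        # Discard the write if we have already seen a newer write or delete.
        if ts >= max(self.adds.get(key, -1), self.removes.get(key, -1)):
            self.data[key] = value
            self.adds[key] = ts
            self.removes.pop(key, None)

    def delete(self, key, ts):
        # Move the key's metadata from the add set to the remove set.
        if ts >= self.adds.get(key, -1):
            self.data.pop(key, None)
            self.adds.pop(key, None)
            self.removes[key] = ts

    def get(self, key):
        return self.data.get(key)

s = LWWStore()
s.set("K1", "123", 1)
s.set("K1", "456", 2)   # later write wins
s.set("K1", "000", 0)   # stale replicated write is discarded
s.delete("K1", 4)
assert s.get("K1") is None
```

A repair message from another replica goes through the same `set` or `delete` path, so stale repairs are discarded by the same timestamp check as stale client writes.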

Before I move on to the implementation challenges: I showed you this example of how it works for registers, which are basic key values. We can extend the same implementation to hashmaps and sorted sets, simply by having a separate add set and a separate remove set for each hashmap or each sorted set. What that means is, basically, each field within the hashmap would have its own metadata, and each member within the sorted set would have its own metadata. The crux of it is that you treat each secondary key or each field as you would treat a register.

Implementation Challenges (LWW Element Set)

Let's move on to the implementation challenges. Let's start with the easiest one. Redis doesn't maintain timestamps. This is easy to fix, because Dynomite can track the timestamps of client requests. Secondly, we'd like Dynomite to remain stateless. What I mean by this is, the Dynomite process currently does not hold any state. If the Dynomite process itself crashes, the only action you need to take is spin the process back up and connect it to the local Redis, and it should be good to go. We don't want to add state to the Dynomite process, which would open up a whole new set of failure scenarios to cover. We'd still like Dynomite to remain stateless, so we need to find a place to put this metadata. The best place to put this metadata is back into Redis.

The next challenge is we need to make sure that all data and metadata are updated atomically. We don't want to have a case where we updated the metadata but failed to write the data. We want to make sure that we update both of them atomically. Redis allows us to write Lua scripts, and it guarantees that all Lua scripts will be executed atomically. This is possible because script execution is done on a single thread. We can basically capture all write commands in Dynomite, rewrite them into Lua scripts, and send them down to Redis.

Lastly, the remove set contains the metadata for all keys that were removed. Does that mean it grows forever? If you keep deleting keys, the remove set can hog all the memory in the cluster, and we wouldn't have any space for the remaining keys and values. There are some things we can do here to make sure that the remove set doesn't grow too large. The first thing is, if the coordinating node hears back from all replicas that a delete operation was successfully processed, that is, all replicas have removed the key and recorded in their remove sets that this key was deleted at a certain time, then we know that all replicas agree that this key was deleted. Therefore, we can remove the element from the remove set. This way we can keep cleaning up the remove set, as we know that all replicas agree that a key has been deleted.

For the remaining cases, we need to keep the information around in the remove set, because we don't know whether some replicas have processed the delete or not. For these cases, we can have background threads on the Dynomite processes which constantly look into the remove set to check that all replicas are in sync. If they're not, repair them, and subsequently remove the information from the remove set. To make it easier for the background threads, we maintain the remove set as a sorted set, so that the oldest keys are at the back of the sorted set, and the background thread can start looking from the back to make sure it deletes the oldest metadata first. The add set, it should go without saying, is maintained as a hashmap, because as we add keys, we want to make sure that all metadata update operations are just O(1). We don't want to sift through complicated data structures on the write path.

What would this Lua script look like? It's very simple. It takes the timestamp of the operation on the key and checks it against the timestamp for the same key within its add and remove sets. If it's an old update, it discards it. If it's a new update, it updates the data and the metadata. It does all of this atomically per replica.

Currently, repairs are implemented for point reads, which are basically: get this key, get me this field from this hashmap, what is the score of this member in the sorted set, and so on. For the rest, we rely on asynchronous repairs, or background repairs.

Let's talk about background repairs. I want to point out that this is still ongoing work. It's basically the same as read repairs, but instead of doing it on the read path, we do it in the background. Why do we want to do this? Repairing on range reads is quite expensive. These are operations that basically say, give me all the members of the set, or show me everything in this hashmap, or return me a subset of the sorted set. If you have millions of items in a set or a hashmap, trying to repair all of them on the read path would be prohibitively expensive, so we try to do them asynchronously.

The next problem is, when we want to do asynchronous repairs, how do we know which keys need repairing and which don't? Let's look at what another system, Cassandra, does. It basically does a full key walk, which is manually triggered. It maintains Merkle trees of key hashes, and using those Merkle trees, it detects inconsistencies across replicas. If it detects any inconsistencies, it goes and repairs them. This is quite slow and expensive. In practice, it's usually only a subset of your keys that needs repairing, and the rest do not, so doing a full key walk is slow and wasteful. Also, trying to follow the same approach in our scenario would be quite complicated, because representing Merkle trees in Redis could get quite complicated. Lastly, this needs to be manually kicked off. Can we do something better, where we don't need manual intervention?

The next thing we can do is maintain a list of recently updated keys. We keep maintaining this list, and we have a background thread that's constantly running merge operations on those keys in the background. This way, we don't need a manual repair process; the repair is always happening in the background without any human intervention. However, we know that merge operations on large structures are expensive. If only a few elements of a set, or maybe one element, are updated, we don't want to run a merge operation on the entire set or the entire hashmap. We just want to make sure that those few mutations have made it across the system. Can we do something even better?

Delta-state CRDTs

Here, we enter delta-state CRDTs. This is inspired by a paper of the same name. Basically, what it states is that we can maintain a list of the mutations done to keys, instead of just the names of the keys. We maintain a list saying, this operation was done on this key, and this was the result of that operation. Applying these mutations has to follow the same three properties that I mentioned before: they have to be associative, commutative, and idempotent. Instead of shipping the entire local CRDT state across for a merge operation, we only ship the delta-state. We keep confirming which replicas have applied the mutations that we've shipped, and we move on. We don't need to send each mutation one by one; we can send them in batches.

What do I mean by ship only the delta-state? I'm going to use the PNCounter as an example, because that's the easiest to understand. Let's say we have a two replica system with the current value of the counter as 0: it has 1 in the positive counter and 1 in the negative counter. We increment it, so the local positive counter goes from 1 to 2. If, when we replicate, we send the entire state of the counter, that is shipping the full state to the other replica. This is not necessary. We can send only the delta-state, which says, "I received an operation, and it changed my positive counter from 1 to 2." That is all the state we need to get the point across to the other replicas. This is a very simplified example to show the difference between shipping the full state and shipping only a delta-state.
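In code, applying a delta is the same element-wise max as a full-state merge, just over fewer entries. This is an illustrative sketch under the assumption that the shipped state is a dict of per-replica positive counts; the names are made up for the example:

```python
# Apply a shipped delta to a replica's local state.
def apply_delta(state, delta):
    # Same element-wise max as a full merge, so re-applying the same
    # delta (duplicate delivery, retries) is harmless: it's idempotent.
    for rid, count in delta.items():
        state[rid] = max(state.get(rid, 0), count)
    return state

local = {"r1": 2}             # r1's positive counter went from 1 to 2
delta = {"r1": 2}             # ship only the changed entry, not everything
remote = {"r1": 1, "r2": 4}   # the peer's full state

apply_delta(remote, delta)
apply_delta(remote, delta)    # duplicate delivery, same result
assert remote == {"r1": 2, "r2": 4}
```

The payload shrinks from the whole per-replica map to just the entries that changed, which is the entire point of delta-state shipping.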

How would this work in the background? Each coordinating node would maintain a list of recently applied mutations. This list keeps growing as writes happen. We can batch multiple mutations together to send them across replicas. Say we're batching two mutations. We send them to replicas R-2 and R-3. They both apply them locally, but only one of the acknowledgements makes it back. The coordinating replica marks its local state saying, R-2 has seen these mutations but R-3 has not yet. Even though R-3 may have seen them, we need to send them again to make sure it has applied the state. Because of the idempotence property, it doesn't matter if we send the same states again; the end result is going to be the same. Once both replicas in the system have acknowledged these writes, we can garbage collect those items from the list and move on to the next.
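The bookkeeping described above can be sketched as a small mutation log with per-replica acks. This is a hypothetical illustration of the idea, not Dynomite's data structure; the class, the ack protocol, and the entry format are all assumptions:

```python
from collections import deque

class MutationLog:
    """Coordinator-side log of shipped mutations, GC'd once all peers ack."""

    def __init__(self, peers):
        self.peers = set(peers)
        self.log = deque()  # entries: [mutation, set of peers that acked]

    def append(self, mutation):
        self.log.append([mutation, set()])

    def ack(self, peer, count):
        # Peer acknowledges the first `count` entries it was sent.
        for entry in list(self.log)[:count]:
            entry[1].add(peer)
        # Garbage collect from the front once every peer has acked.
        while self.log and self.log[0][1] == self.peers:
            self.log.popleft()

log = MutationLog({"r2", "r3"})
log.append("p[r1]: 1 -> 2")
log.append("p[r1]: 2 -> 3")
log.ack("r2", 2)          # r2's ack arrives; r3's ack is lost
assert len(log.log) == 2  # must resend to r3; idempotence makes that safe
log.ack("r3", 2)
assert len(log.log) == 0  # everyone acked, entries garbage collected
```

Resending to R-3 may deliver mutations it already applied, but because applying a mutation is idempotent, the duplicate delivery does no harm.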

What are some challenges with getting delta-state CRDTs to work? The first thing is to make sure we don't lose writes. We have to make the list of mutations durable, which means that we have to have some way of recovering these writes in case something goes wrong. What are the challenges with making it durable? Redis is an in-memory datastore. The primary advantage of using a system like Redis is the low latency offered by having data in memory. If we were to persist a list like this for every write, it would completely negate the benefits provided by Redis.

Since this is still ongoing work, the current thinking is that use cases that are ok with losing a few seconds of writes would use non-quorum connections to Dynomite. In the worst case, they would lose a few seconds' worth of writes. All other use cases have to use quorum connections, which means that if a client receives an acknowledgment for a write, it has been applied across a quorum number of replicas in the cluster.

Another challenge is, what is the practical overhead of maintaining a large list of mutations? What if one replica is down for an extended period of time? Every coordinating node would have to maintain a very large list. These are challenges we are still thinking through. Another challenge is gossip, which lets the cluster grow or shrink without any downtime. How would that affect this design, given that new replicas join the system and old replicas leave? These are the problems we're trying to solve in the coming months.

Questions and Answers

Participant 1: Am I correct in my understanding that this is essentially a way to clusterize your Redis without necessarily clusterizing it by simply running this thing next to it and with Redis running in essentially standalone mode?

Mukil: Basically, yes. Dynomite takes non-distributed key-value stores like Redis and makes them distributed. In this case, it makes them distributed in a highly available fashion.

Participant 1: What about the performance? Redis is very well known for being super fast. What's the performance hit that you are imposing on it by using CRDTs?

Mukil: I have not included benchmarks in this because this work is brand new. I didn't want to add numbers without having tested it properly. We will follow with a blog post with extensive numbers. To give you a basic understanding, without CRDTs and without quorums, we get the same performance as you would with Redis. With quorums, our 99th percentile is about 1 millisecond if you have quorums within a single region. If we add CRDTs, there's definitely some overhead to it. I don't have specific numbers, but from preliminary testing, we see that it's about 20% to 30% overhead. I'll be releasing benchmarks in a blog post soon.

Participant 2: I did not really understand the reasoning behind the removal process. When you remove K, why is it removed from the add set, as opposed to waiting for garbage collection, given that the timestamp is also stored for this operation?

Mukil: Basically, the remove set, we could also think of it as a tombstone set. It's a way to denote that this key has been deleted on this replica at this time. The reason we want to maintain this tombstone set is because when a delete operation is received by a replica, we don't know if all replicas have seen it yet. This replica, the only thing it's doing, is saying, "I have deleted this key at this time." If anybody asks me, when I'm running a merge operation, I need to give them this information saying that this key was deleted at this time. While running a merge operation, we can decide if this delete was the latest operation or not. That's primarily why we need to maintain this metadata around.

Participant 2: I didn't follow why the add part portion was removed from the set as well?

Mukil: Because at that point, the key no longer exists. The add set is only there to maintain the latest timestamps for updates of keys that are present in that replica.
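The add set and tombstone set described in these answers behave like a last-writer-wins element set. Here is a minimal sketch, assuming per-operation timestamps; all names are illustrative, not Dynomite's implementation:

```python
class LWWSet:
    """Sketch of a last-writer-wins element set with tombstones."""

    def __init__(self):
        self.adds = {}      # key -> timestamp of the latest add/update
        self.removes = {}   # key -> timestamp of the delete (tombstone)

    def add(self, key, ts):
        # A local add happens after any local delete,
        # so the tombstone for this key can be dropped.
        self.adds[key] = ts
        self.removes.pop(key, None)

    def remove(self, key, ts):
        # The key no longer exists here; keep only the tombstone so a
        # merge can tell other replicas when the delete happened.
        self.adds.pop(key, None)
        self.removes[key] = ts

    def contains(self, key):
        return key in self.adds

    def merge(self, other):
        # Last writer wins: the operation with the later timestamp
        # decides whether the key is present.
        for k, ts in other.adds.items():
            if ts > self.removes.get(k, -1):
                self.adds[k] = max(ts, self.adds.get(k, -1))
        for k, ts in other.removes.items():
            if ts >= self.adds.get(k, -1):
                self.adds.pop(k, None)
                self.removes[k] = max(ts, self.removes.get(k, -1))

a = LWWSet(); a.add("k", 1)
b = LWWSet(); b.add("k", 1); b.remove("k", 2)
a.merge(b)                    # the delete at t=2 wins over the add at t=1
deleted = a.contains("k")     # False

later = LWWSet(); later.add("k", 3)
a.merge(later)                # a newer add at t=3 brings the key back
```

This shows both points from the answers: the add set only tracks keys that currently exist, and the tombstone carries the delete timestamp so the merge can decide whether the delete was the latest operation.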

Participant 3: How frequently do you get into an inconsistent state? It seems there are some situations where you have to use the last updated time. You said that there was clock skew, even if you try to keep the clocks in sync. If you have clock skew, and you use the latest write, you must end up with inconsistent state?

Mukil: That basically boils down to the question, are we ok with using inconsistent clocks? At Netflix, we have multiple datastores, and many of them use this notion of last writer wins. We currently support use cases that are ok with some level of clock skew. Since we are aggressive with our NTP time syncs, the clock skews won't be very large. For example, many years back we had a situation where clocks were skewed by four or five hours, which is really bad. In our situation, we would probably see clock skews of, in the worst case, a few seconds. Currently, this system supports use cases that are ok with that level of clock skew. If they need stronger guarantees, they have to go to systems that provide them, things like Spanner, and so on.

Participant 4: You talked a lot about replicating datasets across multiple regions. Have you explored using CRDTs to create consistent objects from multiple event streams with different partition keys?

If you had an event stream of taxis and riders, the order is guaranteed on those two streams but it's not guaranteed when you combine them. If you use a CRDT maybe that's one way of creating an object state that's consistent regardless of the order.

Mukil: We've not done that yet. We initially started with supporting registers, sorted sets, hashmaps, but we've not gotten to other data types, lists included. I know Redis has a recent implementation for streams. We've not started supporting those yet. That's some of the future work that will happen.

Participant 5: At the start of the talk you spoke on sharding with Dynomite. Does Dynomite with CRDTs support sharding well? Are there additional concerns there with sharding?

Mukil: The sharding is decoupled from the CRDT story. The sharding, basically, all it does is it takes the name of a key, runs it through a hash function, and sends it to the node that should own that key, basically. You can assume if your tokens range from 0 to 100, and you have a three node ring, all the keys that hash to the token 0 to 33, would fall on node 1, and so on. The CRDTs are a lower level implementation. They're decoupled.
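The token-ring lookup described in that answer can be sketched like this toy example, using the 0-to-100 token space from the answer (the hash function and node names are illustrative; Dynomite's actual hashing differs):

```python
import hashlib

# Start token for each node's range on a 0-100 ring:
# node1 owns 0-33, node2 owns 34-66, node3 owns 67-100.
TOKENS = [(0, "node1"), (34, "node2"), (67, "node3")]

def owner(key):
    # Hash the key name onto the 0-100 token space, then pick the
    # node whose range contains that token.
    h = int(hashlib.md5(key.encode()).hexdigest(), 16) % 101
    node = TOKENS[0][1]
    for start, candidate in TOKENS:
        if h >= start:
            node = candidate
    return node
```

Any node can run this lookup, which is why a request sent to the wrong node can simply be forwarded to the owner, independently of the CRDT machinery underneath.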

Participant 6: Curious from your work. It sounds like a lot of the work that the Riak community has also done on CRDTs. I was just curious how that had influenced your choice to build a novel database layer.

Mukil: For us, it was more about, we have a system like Dynomite. We have hundreds of use cases on it. When we need to make it more resilient, how do we go about doing it? We looked at multiple systems, like Cassandra, Riak included. Obviously, some of the inspiration has come from how Riak has implemented things inside. This was not to create a new datastore which does something different. It was more to support our existing use cases and make sure we can onboard more to our system.

Participant 7: At what point do you want to bring in CRDTs? If you stay within the same availability zone, partitions are very rare. Obviously, if you want to maintain state globally, across the globe, you have to have them. At what point would you have a few replicas in the same region, but in different zones?

Mukil: It depends on the deployment. We can have multiple replicas within the same region or across regions. CRDTs only look at how many replicas we have in the system.

Participant 7: At what point do they become necessary, because if you have it within the same availability zone, you can use old-school methods to maintain consistency.

Mukil: We would still like to be as fast as possible. When you say the old-school methods, like last writer wins, and so on? We're still using that but in a CRDT setting. The way the Redis data types are laid out, it's just natural to use CRDTs. It's not like we're going out of our way to do this. We have to do some work to do anti-entropy, and using CRDTs just felt more natural in the system.

Participant 4: Do you pre-compute the answer within Redis to enable reading directly from Redis? Or do you have to go through Dynomite?

Mukil: When you say pre-compute the answer?

Participant 4: You showed the counter has a more complex metadata data object, but the answer itself is 2.

Mukil: That's what I meant by using Lua scripts. We can push the logic down to Redis by having Dynomite supply Lua scripts, pass in certain parameters from the query, and have Redis do the operations for us. All the state is actually within Redis and the logic is being run within Redis. All Dynomite is doing is providing the right scripts to Redis to make sure it does the write operation.

Participant 4: There's no resiliency or high availability reason to be able to read the answer directly from Redis if Dynomite failed? You're expecting that Dynomite is always working.

Mukil: Exactly. You could always connect to the Redis process on the same instance. If you do that, you can read the same data that is on that instance. The reason you would go through Dynomite is because Dynomite would shard the key. If it's not the owner for that key it would send it to the right node and so on. In the worst case, yes, we can read directly from Redis. Typically, everything just goes through Dynomite.

Participant 4: Then my second question was around the developer experience. In a situation where you have concurrent values with siblings, have you found that developers using the system find this understandable or are there other challenges that come from reading concurrent results?

Mukil: When you say concurrent, you mean conflicting results. We resolve those cases ourselves using last writer wins, because when clients talk to Dynomite, they expect to talk to it as though they are talking to Redis. When we have conflicting values for a key, and a client asks for the value of that key, it's not natural to send back two values and say, "You choose," since we want any Redis client to be able to talk to us. We use last writer wins to resolve the conflict ourselves before passing back the value.

Participant 4: My understanding is in Riak, they had some challenges with the developer experience where it's possible to read a value, write your new value, and then read the old value. They included the option of returning a sibling as well to make it more understandable. I didn't know if that was a cause for confusion?

Mukil: For us, the guarantee we give is that within the same session, you will be able to read your writes. If a client does a write to one node, breaks the session, connects to a different node, and tries to read, we don't guarantee that it will read the write it just made. Developers use the system knowing these trade-offs and guarantees.

Participant 8: How do you compare a system with CRDTs like a multi-master with overhead to a single master system, which is, write to one master, but read from other replicas?

Mukil: The challenge with that typically becomes availability. What if the master goes down? Then we need some process to elect a new master. The reason we went with this model for a highly available system like ours is that most of our microservices are distributed: they have instances running all over the world, and they want consistent access to data. In that world, with a master-slave model, we would have services in Europe trying to talk to the master in U.S. East, which could be very slow. For our use cases, a highly available setup like this seemed more natural. It's more useful for the developers that way.


Recorded at:

May 13, 2020