CockroachDB: Architecture of a Geo-Distributed SQL Database



Peter Mattis talks about how Cockroach Labs addressed the complexity of distributed databases with CockroachDB. He gives a tour of CockroachDB’s internals, covering the usage of Raft for consensus, the challenges of data distribution, distributed transactions, distributed SQL execution, and distributed SQL optimizations.


Peter Mattis is the co-founder of Cockroach Labs where he works on a bit of everything, from low-level optimization of code to refining the overall design. He has worked on distributed systems, designing and implementing the original Gmail back-end search and storage system at Google and designing and implementing Colossus, the successor to Google's original distributed file system.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Mattis: Today I'm going to walk you through some of the high-level components of the CockroachDB architecture, at times diving down into some of the details. First, I want to give you the elevator pitch for CockroachDB: Make Data Easy. This is actually the mission statement at Cockroach Labs. Our thesis at Cockroach Labs was that too much of a burden had been placed on application developers. Traditional SQL databases didn't provide horizontal scalability or geo-distribution of data, and NoSQL databases, which promised horizontal scalability, required you to give up transactions, indexes, and other goodies that made application development easier. So, we came up with CockroachDB.

CockroachDB is a geo-distributed SQL database. By distributed, we mean it's horizontally scalable to grow with your application. It's geo-distributed to handle a data center failure. We place data near where it's being used and we also push computation closer to your data.

SQL is the lingua franca for rich data storage. There's still debate about whether SQL is hard to use, and yet it is known by almost everyone. SQL provides schemas, indexes, and transactions, and these all make your life as an app developer easier.

As Wes said when he introduced me, my name's Peter Mattis, and I'm the CTO and co-founder of Cockroach Labs, the company behind CockroachDB. I've been in the data storage industry for a little over 20 years, which shocks me when I think about it. Some highlights of my career: I built the original search and storage engine for Gmail at Google, which feels like a lifetime ago. I also worked on Google's second-generation distributed file system, Colossus. I've been working on CockroachDB for the past five years. It's my obsession right now; I love it.

Here's the agenda for today. There's actually quite a bit to cover. We're going to go pretty fast, so let's just jump right in.


There are many places we could start in covering the architecture of CockroachDB, and I'm going to build things from the bottom up. The bottom is a distributed, replicated, transactional key-value store. I'm going to start right out and disappoint you: this key-value store is not available for your use; it's purely for internal use, sorry. There are many reasons for this, but mainly we want to be able to tailor and evolve this key-value store for the SQL layer that sits on top, and focus all our energy on making the SQL exceptional.

Where do we start with this distributed, replicated, transactional key-value store? The first thing to notice is that it contains keys and values, and keys and values are arbitrary strings. At this level, the keys and values don't have any structure; a little later I'll talk about how higher levels impose some structure on them. Everything is ordered by key in the key-value store. We use multi-version concurrency control (MVCC), which means keys and values are never updated in place. Instead, to update a value, you write a newer version, and that shadows the older versions. Tombstone values are used to delete values. What this provides is a snapshot view of the system for transactions. We describe CockroachDB as having a monolithic key space: there aren't separate key spaces for different tables; it uses one big key space, and we just do tricks with how we structure the keys.
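To make the MVCC description concrete, here is a minimal in-memory sketch, assuming integer timestamps and `None` as the tombstone. The class and method names are hypothetical, and this is not CockroachDB's storage format; it only shows versions shadowing each other and snapshot reads at a timestamp.

```python
class MVCCStore:
    """Toy multi-version store: writes append versions; nothing is updated in place."""

    TOMBSTONE = None  # hypothetical marker meaning "deleted as of this timestamp"

    def __init__(self):
        self._versions = {}  # key -> list of (timestamp, value), ordered by timestamp

    def put(self, key, value, ts):
        # A newer version shadows older ones, but the older ones are kept.
        versions = self._versions.setdefault(key, [])
        versions.append((ts, value))
        versions.sort(key=lambda v: v[0])

    def delete(self, key, ts):
        # Deletion is just another write whose value is a tombstone.
        self.put(key, self.TOMBSTONE, ts)

    def get(self, key, ts):
        # Snapshot read: return the newest version written at or before `ts`.
        result = None
        for version_ts, value in self._versions.get(key, []):
            if version_ts > ts:
                break
            result = value
        return result
```

A reader at timestamp 3 keeps seeing the old value even after a newer write at timestamp 5, which is what gives transactions a consistent snapshot.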

Here's a depiction of what the monolithic key space looks like. I have a set of keys, and these are just dogs' names; some of them are the names of dogs belonging to people who work at Cockroach Labs. It's a monolithic key space, ordered by key. For many of the remaining slides, I'm just going to show keys, not keys and values. In reality, behind the scenes, there are values, and the values are versioned. I'm leaving that out for clarity; reality is always more complex than presentations like this.

The key space is divided into 64-megabyte ranges. Sixty-four megabytes was chosen as an in-between size: small enough that ranges can be moved around and split fairly quickly, but large enough to amortize the indexing overhead, which I will talk about shortly. I should mention that ranges don't occupy a fixed amount of space; they only occupy the space they're actually using, and they grow and shrink as data is added to and deleted from them.

There's an indexing structure that sits on top of these ranges. The ranges can't just float out there; we have to be able to find a range. This indexing structure is itself stored in another set of ranges, in a special part of the key space within CockroachDB known as the system key space. This presents a bit of a chicken-and-egg problem: how do we find these indexing ranges? Well, there's another index that sits on top of that. This forms a two-level indexing structure, and if any of you are familiar with Bigtable, HBase, or Spanner, this is exactly analogous to what's done there.
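The two-level lookup can be sketched as two ordered searches. This assumes each index entry is keyed by the start key of the range it points to; the names `meta1`/`meta2` and the range IDs are illustrative, not CockroachDB's actual addressing records.

```python
from bisect import bisect_right


def lookup(index, key):
    """Return the target of the entry with the greatest start key <= key.

    `index` is a list of (start_key, target) pairs sorted by start key.
    """
    starts = [start for start, _ in index]
    return index[bisect_right(starts, key) - 1][1]


# Hypothetical second-level index ranges, each mapping start keys to data ranges.
meta2_a = [("", "r1"), ("Lady", "r2"), ("Muddy", "r3")]
meta2_b = [("Pepper", "r4"), ("Sunny", "r5")]
# Hypothetical top-level index pointing at the second-level index ranges.
meta1 = [("", meta2_a), ("Pepper", meta2_b)]


def find_range(key):
    second_level = lookup(meta1, key)  # level 1: which indexing range covers the key?
    return lookup(second_level, key)   # level 2: which data range covers the key?
```

For example, `find_range("Ozzie")` goes through `meta2_a` and lands on `"r3"`. Only the small top level has to be found by some other means, which is what breaks the chicken-and-egg loop.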

I should mention that this structure, this order-preserving data distribution, is a very conscious design decision. It imposes quite a bit of complexity on the CockroachDB internals, yet it's extremely important. If we had chosen something like consistent hashing, we wouldn't be able to implement full SQL. You'll see a little bit about why later.

Because the key space is fully ordered, we can do scans that span multiple ranges. Here's a simple example. If I want to search for the dogs with names between Muddy and Stella, I go to the index, see that this corresponds to ranges two and three, and then go right to ranges two and three and read the data there. Something I forgot to mention earlier is that each range's key-value data is stored in a local key-value store on the node, and we use RocksDB for that purpose.
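Here is a minimal sketch of the Muddy-to-Stella scan, assuming a hypothetical three-range layout where each range is described by its start key. Because keys are globally ordered, finding the overlapping ranges is two binary searches, and the scan is just a walk through those ranges in order.

```python
from bisect import bisect_right

# Hypothetical layout: (start_key, sorted keys stored in that range).
# Range i covers [start_key_i, start_key_{i+1}).
RANGES = [
    ("", ["Bear", "Dotty"]),                   # range 1
    ("Lady", ["Lady", "Muddy", "Ozzie"]),      # range 2
    ("Pepper", ["Pepper", "Stella", "Sunny"]), # range 3
]


def ranges_for_span(start, end):
    """1-based numbers of the ranges overlapping the inclusive span [start, end]."""
    starts = [s for s, _ in RANGES]
    first = bisect_right(starts, start) - 1
    last = bisect_right(starts, end) - 1
    return list(range(first + 1, last + 2))


def scan(start, end):
    """Visit the overlapping ranges in key order and keep keys inside the span."""
    out = []
    for number in ranges_for_span(start, end):
        _, keys = RANGES[number - 1]
        out.extend(k for k in keys if start <= k <= end)
    return out
```

`ranges_for_span("Muddy", "Stella")` returns `[2, 3]`, matching the example. With consistent hashing, adjacent names would land on unrelated ranges, and a scan like this would have to ask every range.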

Transactions are used to insert data into and delete data from ranges. I'm going to go into the transaction details a bit later in this talk; right now, this is just a simple example. If we're trying to insert the key Sunny, we go to the indexing structure and see that it corresponds to range three. We go to range three, insert the data, and the insert is done. What happens if the range is full? We go to insert, see it's full, and split the range. Splitting the range involves creating a new range, moving approximately half the data from the old range into the new one, and then updating the indexing structure. That update is performed using the exact same distributed transaction mechanism I was using to insert data into the range itself.
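A minimal sketch of the split, assuming a key-count limit (`MAX_KEYS`, hypothetical) as a stand-in for the 64 MB size limit. It inserts first and then splits when the range overflows, which has the same effect as splitting a full range before inserting. The index update here is a plain list insert; in CockroachDB it is done with a distributed transaction, which this toy does not model.

```python
from bisect import bisect_right

MAX_KEYS = 4  # hypothetical stand-in for the 64 MB size limit


class Range:
    def __init__(self, start_key, keys):
        self.start_key = start_key
        self.keys = sorted(keys)


class RangedStore:
    def __init__(self):
        # The "index": ranges kept sorted by start key; the first covers everything from "".
        self.ranges = [Range("", [])]

    def _locate(self, key):
        starts = [r.start_key for r in self.ranges]
        return bisect_right(starts, key) - 1

    def insert(self, key):
        i = self._locate(key)
        rng = self.ranges[i]
        rng.keys = sorted(rng.keys + [key])
        if len(rng.keys) > MAX_KEYS:
            self._split(i)

    def _split(self, i):
        old = self.ranges[i]
        mid = len(old.keys) // 2
        # Create a new range holding roughly the upper half of the data...
        new = Range(old.keys[mid], old.keys[mid:])
        old.keys = old.keys[:mid]
        # ...then update the index so lookups can find it.
        self.ranges.insert(i + 1, new)
```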


I said this is a distributed, replicated, transactional key-value store; now let's talk about replication. We use Raft for replication. Raft is a distributed consensus protocol similar to Paxos. Paxos is notoriously hard to implement, and I chalk some of that up to the early descriptions of the algorithm. Having implemented Raft and lived with it for the past five years, I can tell you that implementing Raft is no walk in the park either; it's quite challenging and complex. Each range in CockroachDB is a Raft group. I want to reiterate that: the unit of replication is a range. Sixty-four megabytes of data at a time are replicated and kept consistent. This is important. It allows easy data movement within the cluster, and it also allows us to tune and control the replication factor on a per-range basis.

CockroachDB defaults to three replicas per range, but this is configurable, and we default to a higher replication factor for some important system ranges, notably the indexing ranges I talked about earlier. Finally, some people wonder, "Can I have a range that only has two replicas?" It doesn't actually make sense, because of how consensus works. Consensus requires a strict majority. In this example, there are three replicas in the Raft group, and to achieve consensus I need a strict majority, which is two of them. If I only had two replicas in a range, the majority would still be two, and I would have achieved nothing except reducing my availability.

I like to describe what Raft, and consensus protocols in general, provide as atomic replication. Commands are proposed into Raft, and once they are written to a majority of the replicas, they're atomically committed. It doesn't matter which replicas make up that majority. In this case, with three replicas, as soon as any two have written a command, it is committed, and it survives the system crashing and coming back up. The range can only make further progress, though, while a majority of its replicas is available.
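The majority arithmetic behind the two-replica argument and the commit rule can be written out directly. This is plain quorum math, not CockroachDB code; the function names are illustrative.

```python
def majority(replicas: int) -> int:
    """Smallest strict majority of a Raft group with `replicas` members."""
    return replicas // 2 + 1


def tolerated_failures(replicas: int) -> int:
    """How many replicas can be lost while a majority can still be formed."""
    return replicas - majority(replicas)


def is_committed(acks: int, replicas: int) -> bool:
    """A command commits once any majority has acknowledged it."""
    return acks >= majority(replicas)
```

`tolerated_failures(2)` is 0, the same as a single replica, while `tolerated_failures(3)` is 1. A second replica adds another machine that must be up for the majority, but buys no extra fault tolerance.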

Commands in Raft are proposed by the leaseholder and distributed to follower replicas; I'll get into the details a little later. They're only accepted when a quorum of replicas has acknowledged receipt. If you're familiar with Raft, you might be slightly confused right now, because I'm using the term leaseholder, and Raft talks about a Raft leader. They are separate concepts inside CockroachDB, but they're almost the same, and for most of the talk I'm just going to talk about leaseholders. The Raft leader is the replica that actually proposes commands, and the leaseholder is an optimization, which I will touch on shortly, that helps optimize reads. For the most part in CockroachDB they're exactly the same replica; they're almost never different, and we have a background process to make sure they're the same.

Range leases are where the concept of leaseholders comes from. Range leases are important because we want to be able to do reads in CockroachDB without going through a consensus round trip. Without range leases, to do a read I'd have to talk to a quorum of the replicas in the range, and the latency of the read would be determined by that quorum read. We'd also be doubling or tripling the amount of traffic we send over the network, and all of this stinks. What we'd like to do is read from a single replica. You can't read from an arbitrary replica, though, because that replica might be behind. But there is a special replica within the Raft group, the leaseholder, and because it has been coordinating all the writes, it knows which writes have been committed. We send all reads to the leaseholder, which can then serve them without consensus. In addition to performing these quorumless reads, the leaseholder coordinates writes. This is important because it comes up in our serializable isolation level, in terms of key range locking and some other functionality.

I've talked about replication and about keys and values; let's move on to distribution. A CockroachDB cluster has more than just three or four nodes. You have many nodes, and we can place these replicas anywhere in the cluster. Where do we place them? This is the replica placement problem, and it's actually a hard problem. CockroachDB uses four signals to drive its replica placement heuristics: space, diversity, load, and latency. I'm not going to talk much about space; it's the easiest one. It can be summed up as: we try to balance space across the cluster to make the best use of disk capacity.

Let's start with diversity. We've all heard this term used throughout the industry recently. We know diversity improves teams and companies; it also improves availability for ranges. Diversity comes into play in replica placement in that we want to spread replicas across failure domains. What is a failure domain? It's a unit of failure: a disk, a machine, a rack in a data center, a data center itself, a region. California could be hit by a meteor, and we want to be able to survive that. I don't actually know if we can survive that, but the idea is real. CockroachDB looks at these failure domains and tries to spread the replicas across them. If we didn't take failure domains into consideration, at the silly extreme we could end up placing all the replicas for a range on a single node, and that's obviously bad. Rather than using a fixed hierarchy, the failure domains are user-specified, so they can be customized for each deployment.
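One simple way to express "spread replicas across failure domains" is a greedy choice over locality tuples. This is a sketch under assumptions: localities are hypothetical `(region, datacenter, rack)` tuples, and the scoring rule (how many tiers differ, from the top down) is illustrative rather than CockroachDB's allocator.

```python
def diversity(a, b):
    """Number of locality tiers that differ, counting from the top tier down.

    Nodes in different regions score highest; nodes in the same rack score 0.
    """
    shared = 0
    for x, y in zip(a, b):
        if x != y:
            break
        shared += 1
    return len(a) - shared


def place_replicas(nodes, count):
    """Greedily pick `count` nodes, each time maximizing the minimum diversity
    to the replicas already chosen. `nodes` maps node name -> locality tuple."""
    chosen = []
    candidates = dict(nodes)
    while len(chosen) < count and candidates:
        best = max(
            candidates,
            key=lambda n: min((diversity(candidates[n], nodes[c]) for c in chosen), default=0),
        )
        chosen.append(best)
        del candidates[best]
    return chosen
```

With two nodes in the same data center and one node in each of two other regions, the greedy pick takes one node per region and skips the second node that shares a data center with the first.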

The second signal we use for replica placement is load. Load matters because balancing on space and diversity, and spreading the ranges throughout the cluster, doesn't actually balance load. The first place this comes up is leaseholders. There's an imbalance between leaseholder and follower replicas: the leaseholder, by coordinating all the writes and performing all the reads, has significantly higher network traffic and CPU usage than the followers. If we didn't take care to balance leaseholders throughout the cluster, we could end up with one or a few nodes holding all the leaseholders and a severe imbalance. We actually saw this in practice before we implemented this heuristic.

The second place this comes up is that not all ranges are created equal. In this example, the blue range has higher load than the other ranges in the system. CockroachDB notices this: it measures per-range load, and using that measurement it can spread ranges, and therefore load, across the cluster. In this case, the blue range ends up alone on its nodes. In practice, this might occur when you have a small reference table that's accessed by every query, or a few hot rows, in which case the ranges those rows live in would be very hot.
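Per-range load measurements make a classic greedy assignment possible: place the hottest ranges first, each onto the currently least-loaded node. This is a sketch of that textbook heuristic with hypothetical load numbers, not CockroachDB's rebalancer, and it ignores replication and diversity.

```python
import heapq


def balance(range_loads, node_names):
    """Assign ranges to nodes, hottest first, each onto the least-loaded node so far.

    range_loads: dict range_id -> measured load (units are hypothetical).
    """
    heap = [(0.0, name) for name in node_names]
    heapq.heapify(heap)
    assignment = {name: [] for name in node_names}
    for range_id, load in sorted(range_loads.items(), key=lambda kv: -kv[1]):
        total, name = heapq.heappop(heap)
        assignment[name].append(range_id)
        heapq.heappush(heap, (total + load, name))
    return assignment
```

With one range carrying 90 units of load and five ranges at 10 units across three nodes, the hot range gets a node to itself and the cold ranges share the other two, which mirrors the blue-range example.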

Finally, CockroachDB is a geo-distributed database, and with geo-distribution comes geographic latency. Over short distances, geographic latencies can be tens of milliseconds, and over longer distances, hundreds of milliseconds. We need to take this into consideration during replica placement: we want to place data close to its users and move data to where it's being accessed. There are two mechanisms for this inside CockroachDB. On this slide, I've taken the same dogs data we had before and divided it up by putting a prefix on the keys: European dogs, East Coast dogs, and West Coast dogs.

Once that's done, the administrator or the application developer can apply constraints and preferences to the table saying, "I want the European dogs' ranges, those replicas, to be housed in a European data center. I want the West Coast dogs to be in a California data center," and so on. Those are the manual controls we have over replica placement to improve latency. There are also automatic controls. We track, on a per-range basis, where in the system reads are coming from, and then proactively move data toward where those reads are happening. That's called follow-the-workload, because it's meant to adjust to changes in the workload over the course of a day or a week.
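The automatic control can be pictured as a per-range counter of where reads originate. This is a minimal sketch under stated assumptions: the `RangeStats` class is hypothetical, and the "2x the current region's reads" threshold is an invented hysteresis rule to avoid bouncing data between regions; CockroachDB's actual follow-the-workload logic is not described in the talk.

```python
from collections import Counter


class RangeStats:
    """Hypothetical per-range tracker of where reads come from."""

    def __init__(self):
        self.reads = Counter()

    def record_read(self, region):
        self.reads[region] += 1

    def preferred_region(self, current):
        # Move toward the busiest region, but only if it clearly dominates the
        # current one (hypothetical 2x threshold), so data doesn't ping-pong.
        if not self.reads:
            return current
        busiest, count = self.reads.most_common(1)[0]
        if busiest != current and count > 2 * self.reads[current]:
            return busiest
        return current
```

The manual constraints described above would sit on top of this: a tracker like this should only choose among placements the constraints allow.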

What I've described so far is replica placement in a static cluster, but clusters aren't static. We add nodes to clusters and remove nodes from clusters; nodes fail permanently, or sometimes temporarily. We need to make all of this easy; that's our mission statement: make data easy. Adding a node should be seamless, and the administrator shouldn't have to do anything. What happens when you add a new node to the cluster? The cluster automatically sees the node and starts rebalancing replicas onto it, using the previously described replica placement heuristics to decide which replicas to move. What does this look like? We first add an additional replica to the range, and then we delete the old replica from its node. Moving a replica is decomposed into these two steps: an add followed by a delete.

Another common occurrence is permanent node failure. When you run on thousands of machines, nodes will go down permanently and never come back. When this happens, the system notices the node is gone, and the ranges that had replicas on it start creating new replicas on other nodes and removing the replicas from the failed node. We can't talk to the failed node, so it doesn't even know this happened. In some sense, a permanent failure is just an excessively long temporary failure, and that's how the system treats it.

There's also temporary node failure, which is actually much more common. Permanent failure happens, but it's rare. The most common reason for a temporary failure might be upgrading your system; the other reason, and this is going to be exceptionally hard for you all to believe, is that there's a bug in CockroachDB, it crashes, and the node has to restart. That's a very short temporary failure. Even during that short period, a minute or so, the replicas on that node can fall behind, and when the node comes back up we have to catch them up. How does this catch-up occur?

There are two Raft mechanisms we use to catch up replicas. One is to send a complete snapshot of the current range data; if the node has been down for minutes and there's been significant churn on the range, this is the right thing to do. You send all the data, and the replica is now up to date. But what if the node was only down for a handful of seconds, and its replicas are only behind by a few records? The other mechanism sends a log of the recent commands to the range, and the replica replays those commands; for a handful of records, that's the right thing to do. Which one do we use? That's a careful balancing of heuristics within CockroachDB: we look at how many writes occurred while the replica was unavailable and determine whether it's better to send a snapshot or a log of commands.
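The snapshot-versus-log decision can be sketched as a cost comparison. The cost model is an assumption for illustration only: replaying the log is taken to cost roughly the bytes of the missed commands, and a snapshot roughly the size of the range's data. The talk only says the decision is based on how many writes were missed.

```python
def choose_catch_up(missed_writes, avg_write_bytes, range_bytes):
    """Pick the cheaper way to bring a lagging replica up to date.

    Hypothetical cost model: log replay ~ bytes of missed commands,
    snapshot ~ size of the range's current data.
    """
    if missed_writes == 0:
        return "none"
    log_bytes = missed_writes * avg_write_bytes
    return "log" if log_bytes < range_bytes else "snapshot"
```

A replica that missed three small writes gets the log; one that missed millions of writes on a 64 MB range gets a snapshot.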


I've talked about replication, distribution, and keys and values. This is also a transactional system, so I want to give you a flavor of what transactions in CockroachDB mean. ACID, you've probably all seen this: atomicity, consistency, isolation, and durability. Transactions in CockroachDB are serializable; we implement serializable isolation. Isolation levels, in databases and in general, are a deep and frequently confusing topic, and I'm not going to get into all the details today. Serializable isolation is actually an intuitive isolation level. If you were to think about what the isolation between transactions should be, you'd probably imagine serializability: transactions run as if in some serial order. We don't actually know what that serial order is, but you could lay it out; there is one. It's the gold standard isolation level. It's not the strongest isolation level, but it's right near the top. There are tons of weaker isolation levels, and we embrace serializable isolation as part of our mission to make data easy.

Weaker isolation levels are a frequent source of bugs, sometimes bugs that application developers don't even know about. You don't realize you're messing up; you just have a bug that can later be exploited. Some research at Stanford actually showed this: a year or two ago, they published a paper called ACIDRain that showed how many websites have bugs due to weaker isolation levels.

Transactions in CockroachDB can span arbitrary ranges. There are no limitations; it's not like you only get microtransactions or something like that. They're also conversational, and this is important: the full set of operations for a transaction is not required upfront. Why does this matter? Frequently, application developers working with a SQL database will start a transaction, read some data, apply some application logic to what they've read, write data back, and then commit. That back and forth is the conversation of the transaction, and some other systems don't support it. It's hard to do, it complicates our lives, and yet it's important for providing a full SQL database.

I'm going to dive into some of the details of our transaction implementation. Transactions provide a unit of atomicity, and in a distributed database there's a question of how you actually provide atomicity for the transaction as a whole. We bootstrap transaction atomicity on top of the atomic replication that Raft provides. Remember, Raft provides atomic replication of a single record within a range: I can write that record, and it's atomically replicated. We build transaction atomicity on that functionality by having a transaction record. Every transaction has one. The transaction record is just a key-value record like any other in the system, and we atomically flip it from pending to committed in order to commit the transaction.

Let's walk through an example to make this a little clearer. Here we have a cluster with four nodes. There are three ranges spread across these four nodes, and this is our dogs data set again. We have a query inserting two rows into this table. The SQL query is sent to a SQL gateway, which is just any node in the system, and SQL execution decomposes the query into a set of KV operations. It starts by beginning a transaction and writing the key Sunny. When this occurs, the gateway talks to the leaseholder of the Sunny range, and the first thing it does is create a transaction record. The transaction record is always created on the same range as the first key written in the transaction; that's done for locality purposes. The transaction starts out in a pending state. Something important that isn't depicted here is that the transaction record is replicated just like everything else.

After writing the transaction record, the leaseholder proposes a write of the Sunny key by sending that write command to the followers as well as to itself, and the Sunny write is now in progress. One of the followers replies, "Yes, I wrote that to disk," the leaseholder also acknowledges the write to disk, and then we move on to the next write. Now we're writing Ozzie.

Something I should point out is that we only required a quorum of the replicas to respond to the write of the Sunny key. Another thing to notice is that Sunny is still highlighted in yellow, because other transactions can't see this key at this point. This is part of what isolation is about: until the transaction is committed, another transaction shouldn't be able to read the Sunny key. Internally, this is implemented with markers associated with each key, and each marker contains the ID of the transaction that is writing that key.

Moving on, a very similar process happens with Ozzie. We send Ozzie to the leaseholder, the leaseholder proposes it to the followers, one of them replies, and then we acknowledge that KV operation back to the SQL gateway. At this point, the SQL gateway marks the transaction record as committed; this commit is itself replicated, so it's a Raft write as well. Once that's acknowledged, the SQL statement is done, we reply, and that's it.
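The walkthrough can be condensed into a single-process sketch of transaction records and per-key markers (write intents). Everything here is a simplification for illustration: the `TxnKV` class is hypothetical, there is no replication, and, as the talk says it is leaving out, there is no handling of read-write or write-write conflicts.

```python
class TxnKV:
    """Toy model of transaction records and per-key write markers (intents)."""

    def __init__(self):
        self.committed = {}    # key -> committed value
        self.intents = {}      # key -> (txn_id, provisional value)
        self.txn_records = {}  # txn_id -> "PENDING" or "COMMITTED"

    def begin(self, txn_id):
        self.txn_records[txn_id] = "PENDING"

    def write(self, txn_id, key, value):
        # The marker on the key records which transaction wrote it.
        self.intents[key] = (txn_id, value)

    def read(self, key):
        if key in self.intents:
            txn_id, value = self.intents[key]
            if self.txn_records[txn_id] == "COMMITTED":
                return value
        # A pending transaction's writes are invisible; fall back to the last committed value.
        return self.committed.get(key)

    def commit(self, txn_id):
        # The single atomic step: flipping one record makes every marked write visible.
        self.txn_records[txn_id] = "COMMITTED"

    def cleanup(self, txn_id):
        # After commit, replace this transaction's markers with ordinary committed values.
        if self.txn_records.get(txn_id) != "COMMITTED":
            return
        for key, (owner, value) in list(self.intents.items()):
            if owner == txn_id:
                self.committed[key] = value
                del self.intents[key]
```

The design point is that atomicity across ranges reduces to atomically updating one record, which Raft already guarantees.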

I left out a number of details there, because this is a high-level architecture talk. I didn't talk about read-write conflicts, write-write conflicts, distributed deadlock detection, or large transactions. All of that is implemented in CockroachDB; there's only so much detail I can go into in an overview talk.

One thing I want to highlight: you've probably noticed that a number of round trips take place while handling a transaction. What I described was the original transaction protocol implemented by CockroachDB. We've been slowly evolving that protocol over time to make it faster and to reduce the effect of network round trips. What I just described is the serial model of transactions; we call the new version pipelined, and the pipelined one is what we're currently using.

Let me step through that same example again, but look at the round trips involved. With serial transactions, we wrote the transaction record and waited for that write. Then we wrote Sunny and waited for that round trip. With pipelined transactions, we just write Sunny; we don't even write the transaction record first. The reason this is safe is a little subtle: we give transactions a grace period, so if another transaction encounters the key Sunny shortly after it's been written, it considers the writing transaction still pending.

Then we write the key Ozzie. The thing I want you to notice is that in the serial model, we wrote Ozzie and then waited for it, but in the pipelined model, we start writing Ozzie before Sunny has even returned. Sunny is still in flight, still being written, and now we're writing Ozzie as well. Finally, we write the transaction record and get to the commit. Something subtle here is that in both cases the transaction record includes a list of the keys touched by the transaction. This is necessary to clean up the markers on those keys. It also allows something else: in pipelined transactions, those keys let anybody who comes across the transaction record go out and determine whether the transaction is committed or aborted. In the pipelined model, the transaction is committed once all the operations involved in it have completed.
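The "distributed commit marker" idea can be sketched as a status check that anyone holding the transaction record could run. Assumptions: the record is a hypothetical dict, `"IN_FLIGHT"` is an invented status name for a record written while its writes may still be landing, and `"UNDECIDED"` stands in for the case where a real protocol needs extra steps to settle the outcome; the talk does not go into those steps.

```python
def resolve_status(record, landed_writes):
    """Decide a pipelined transaction's fate from its record.

    record: hypothetical {"txn_id": ..., "status": ..., "keys": [...]}.
    landed_writes: key -> txn_id of the write marker actually found on that key.
    """
    if record["status"] != "IN_FLIGHT":
        return record["status"]  # explicitly COMMITTED or ABORTED already
    missing = [k for k in record["keys"] if landed_writes.get(k) != record["txn_id"]]
    # Committed exactly when every write listed in the record has landed.
    return "COMMITTED" if not missing else "UNDECIDED"
```

This is why the record must list the touched keys: without them, a third party could not check whether every write made it.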

Essentially, we took what was previously a centralized commit marker indicating the transaction was committed and replaced it with a distributed one. As you might imagine, this is complex and challenging to get right. We took the effort to model it with TLA+, and joined the cool kids club when we did. It was a fun experience, and it gave the engineers involved reassurance that they had gotten everything right. I should note that we've been evolving the transaction protocol over time, and what I just described is the protocol that's going to be in the CockroachDB version released in the fall. The code is written; it just hasn't been released yet.

SQL Data in a KV World

I started out describing CockroachDB as a geo-distributed SQL database, and so far all I've talked about is key-value stuff. Where does SQL come in? Now it's time to talk about SQL. How many of you feel like you know SQL pretty well? Wow, that's impressive. I had the same thought, and then I started implementing a SQL database, and it exposed vast gaps in my knowledge. We're not going to get into those gaps today; I just want to say that it's fascinating. You get into the implementation and learn what you didn't know, and I didn't know a lot. Let's get started.

Those of you who are familiar with SQL will recognize this: it's a declarative language, not an imperative one. You specify the results you want, but not the sequence of operations to get those results. This is great; it's powerful. It's also confusing, because very frequently the question is, "How do I structure my SQL appropriately?" But it gives the SQL database a ton of freedom in deciding how to execute queries. That's also a ton of burden, but we feel that's where the burden belongs: with the SQL database, not with the application developer.

Data in SQL is relational. We have tables composed of rows and columns. The columns are typed, with types like integer, float, and string. We have foreign keys to provide referential integrity, which is useful both for data correctness and because it gives the optimizer more opportunities. I should mention that CockroachDB SQL is full SQL, not half SQL or light SQL. We implement the PostgreSQL dialect, compatible apart from a few esoteric edge cases, and we also implement the PostgreSQL wire protocol, so there are drivers for pretty much every language in existence.

"Wait a minute," you say. I talked about keys and values, and keys and values were just strings, but columns have types. What do we do? At first glance this looks like a severe impedance mismatch, but it's not that bad. The question, though, is how we store this typed column data in keys and values. Let me give you a flavor of how it's done. This is a low-level detail, but it's useful to explain at a high level, just to make clear that it's possible. Here's an example: an inventory table with three columns: ID, which is the primary key, name, and price. Inside CockroachDB, every table has a primary key. If you don't specify one, which is valid in SQL, one is created automatically behind the scenes.

Every index on a table, including the primary index, creates another logical key space inside CockroachDB, and a single row in that index creates a single KV record. In this case we just have the primary index, so each row creates a single KV record. The values are straightforward: the non-indexed columns, here name and price, get placed in the value. How they're encoded isn't super interesting. We could have used something like protocol buffers, or JSON, which would be somewhat expensive, or Avro. We actually have a custom encoding, because this is performance-critical code and we want it to be very efficient. It's not that surprising that this can be done; the surprising part is how we encode the keys. Unfortunately, I'm not going to get into the details. If you're really interested, come talk to me afterward, and I'll point you to where this is done in the code.
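A minimal sketch of the row-to-KV mapping for the primary index. JSON is used only for readability; as noted above, CockroachDB uses a custom encoding, and the `encode_row` function and the sample row are hypothetical. The key is shown in the logical `/1` form from the slides, not a physical encoding.

```python
import json


def encode_row(row, pk="id"):
    """Turn one primary-index row into a single (key, value) pair."""
    key = f"/{row[pk]}"  # logical key, like the /1, /2, /3 on the slides
    # Non-indexed columns go into the value.
    value = json.dumps({col: v for col, v in row.items() if col != pk}, sort_keys=True)
    return key, value
```

`encode_row({"id": 1, "name": "bat", "price": 1.11})` yields the key `/1` with `name` and `price` packed into the value.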

Essentially, we have this problem: we have this ID column, it's an integer, and we need to encode it into a key in such a way that the sort order of the encoded strings matches the sort order of the integer values. I'm just going to wave my hands at this point. This is possible to do, it's possible to do with [inaudible 00:31:15] arbitrary tuples, and it's not something CockroachDB invented. I first learned about this technique at Google years ago. It's not something Google invented either; I think I saw a paper from the late 1980s that describes it, and I'm not even sure that was the first time it was mentioned. What you're seeing here when I show /1, /2, and /3 isn't the physical encoding of the key; it's just a logical representation of the key. That's what we print out in debug logs and other places.
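To make the hand-waving slightly more concrete, here is a minimal sketch of one textbook way to encode signed 64-bit integers so that byte-wise comparison of the encoded keys agrees with numeric comparison. It's a fixed-width illustration under that assumption, not CockroachDB's actual key encoding (which the talk doesn't describe), and the name `encode_int_key` is hypothetical.

```python
import struct

INT64_MIN, INT64_MAX = -(1 << 63), (1 << 63) - 1

def encode_int_key(v: int) -> bytes:
    """Encode a signed 64-bit integer so that byte order matches numeric order.

    Adding 2**63 maps [-2**63, 2**63) monotonically onto [0, 2**64), which is
    the same as flipping the two's-complement sign bit. Big-endian packing puts
    the most significant byte first, so comparing the bytes lexicographically
    compares the numbers.
    """
    if not INT64_MIN <= v <= INT64_MAX:
        raise ValueError(f"{v} is outside the int64 range")
    return struct.pack(">Q", v + (1 << 63))

values = [3, -1, 0, 42, INT64_MIN, INT64_MAX, -7]
by_number = sorted(values)
by_bytes = sorted(values, key=encode_int_key)
print(by_number == by_bytes)  # True
```

Variable-length schemes save space for small values, but they have to be designed so that shorter encodings still sort correctly; the fixed-width version sidesteps that question at the cost of always using 8 bytes.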

We support more than a single index and more than a single table. How does that work? We prefix these keys with the table name and the index name. What does that look like for the inventory table? All of these keys get an /inventory/primary prefix. Now, you're probably thinking, "Oh, my goodness, that is quite expensive. We're repeating /inventory/primary in every key just to store the small /1, /2, /3." Under the hood, we don't actually put names there; we store the table ID and index ID. We use IDs because, A, they're smaller, and B, they allow us to do very fast renames of tables and indexes. Then, down at the lowest level inside RocksDB, key prefix compression removes some of the remaining overhead.
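A minimal sketch of the row-to-key-value mapping just described, assuming a fixed-width big-endian integer encoding in place of CockroachDB's real key encoding and a plain Python tuple in place of its custom value encoding. The table ID 51, index ID 1, the helper names, and the sample rows are all hypothetical.

```python
import struct

def encode_uint(v: int) -> bytes:
    # Fixed-width big-endian: byte order matches numeric order for 0 <= v < 2**64.
    return struct.pack(">Q", v)

def primary_kv(table_id: int, index_id: int, row: dict, pk_col: str, value_cols: list):
    """Map one table row to one key-value pair in the table's primary index."""
    # Key = table ID + index ID + primary key. Numeric IDs keep the prefix small,
    # and renaming a table or index never requires rewriting any keys.
    key = encode_uint(table_id) + encode_uint(index_id) + encode_uint(row[pk_col])
    # Non-indexed columns go in the value; a tuple stands in for a real value encoding.
    value = tuple(row[c] for c in value_cols)
    # Logical form of the key, like what gets printed in debug output.
    pretty = f"/{table_id}/{index_id}/{row[pk_col]}"
    return key, value, pretty

INVENTORY_TABLE_ID, PRIMARY_INDEX_ID = 51, 1  # hypothetical IDs
rows = [  # made-up sample rows
    {"id": 2, "name": "glove", "price": 2.22},
    {"id": 1, "name": "bat", "price": 1.11},
]
kvs = sorted(primary_kv(INVENTORY_TABLE_ID, PRIMARY_INDEX_ID, r, "id", ["name", "price"]) for r in rows)
for key, value, pretty in kvs:
    print(pretty, value)
# /51/1/1 ('bat', 1.11)
# /51/1/2 ('glove', 2.22)
```

Sorting on the encoded bytes puts rows in primary key order, which is what lets a contiguous span of keys correspond to a contiguous span of rows.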

What does this key encoding look like if we have a secondary index? Here I've added an index on the name column. Something I want you to notice is that the key doesn't just contain the name column; it still contains /1, /2, and /3. Why is that? Because this is a non-unique index, and a non-unique index means I can insert multiple rows that have the same value in the name column. Here I've added another row, which duplicates the bat name. We have two bats, one for $4.44 and one for $1.11.

What does this translate into in terms of keys? The new key has a suffix of /4. What I've done here is make the keys unique, and the way I made them unique was to append the columns in the primary key. We know the primary key columns are unique, because primary keys are unique by definition; the primary key is a unique index on the table. This is just a very quick high-level overview of how the SQL data mapping works.
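A small sketch of the uniqueness trick for non-unique secondary indexes, using Python tuples as logical keys (tuples compare lexicographically, like the encoded keys). The helper names and sample rows are made up for illustration; only the duplicate "bat" name and the /4 suffix come from the example above.

```python
def index_key(row: dict, index_cols: list, pk_cols: list) -> tuple:
    """Logical key for a row in a non-unique secondary index.

    Indexed columns come first, so entries sort by them; the primary key
    columns are appended as a suffix, which keeps every key unique even
    when the indexed values repeat.
    """
    return tuple(row[c] for c in index_cols) + tuple(row[c] for c in pk_cols)

def pretty(key: tuple) -> str:
    return "/" + "/".join(str(part) for part in key)

rows = [  # made-up sample rows; ids 1 and 4 share the name "bat"
    {"id": 1, "name": "bat"},
    {"id": 2, "name": "glove"},
    {"id": 3, "name": "ball"},
    {"id": 4, "name": "bat"},
]

# Keyed on name alone, the two bats would collide: only 3 distinct keys.
print(len({(r["name"],) for r in rows}))  # 3

name_index = sorted(index_key(r, ["name"], ["id"]) for r in rows)
print([pretty(k) for k in name_index])  # ['/ball/3', '/bat/1', '/bat/4', '/glove/2']
```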

SQL Execution

SQL execution: there's actually a relatively small number of SQL operators, or relational operators. We have projection, selection, aggregation, join, and scanning from a base table, and there's order by, which is technically not a relational operator, but most people treat it as one. These are specified in SQL queries via select, where, and group by. Joins and set operations like intersect are written directly in the query, and you specify tables using the from clause.

These relational operators are part of relational expressions, and all relational expressions have zero, one, or two inputs. There's often a scalar expression that sits off to the side. An example is the filter operator: a filter operator has a child input operator feeding it rows, and it has a scalar expression, which is the filter that's applied to decide whether a row should be emitted on the output. A query plan is just a tree of these relational expressions. What SQL execution does is take a query plan and run the operations to completion.

Let me give you a simple example, a flavor of how this works. You have an inventory table, and I'm just going to scan it, filter it, and return the names of inventory items that start with B. The way we break down basic SQL execution is that we start with the table; that's the base of the operation. We're scanning the inventory table. The rows output from the scan operation are sent into a filter operation, which looks at each row and applies the scalar expression to filter the rows. The output from that is sent to a projection operation, which projects out the name column so that's all that remains in the output, and then we send the results to the user.
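The scan, filter, and project pipeline can be sketched as a tree of operators, each pulling rows from its input one at a time (the classic iterator style). This is an illustration under simple assumptions (rows as Python dicts, scalar expressions as plain functions, made-up inventory rows), not CockroachDB's execution engine. A join would be an operator with two children.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List

Row = dict

@dataclass
class Scan:
    """Zero inputs: emits every row of a base table."""
    table: List[Row]

    def rows(self) -> Iterator[Row]:
        yield from self.table

@dataclass
class Filter:
    """One input plus a scalar expression that decides whether a row is emitted."""
    child: object
    predicate: Callable[[Row], bool]

    def rows(self) -> Iterator[Row]:
        for row in self.child.rows():
            if self.predicate(row):
                yield row

@dataclass
class Project:
    """One input; keeps only the listed columns."""
    child: object
    columns: List[str]

    def rows(self) -> Iterator[Row]:
        for row in self.child.rows():
            yield {c: row[c] for c in self.columns}

inventory = [  # made-up sample rows
    {"id": 1, "name": "bat", "price": 1.11},
    {"id": 2, "name": "glove", "price": 2.22},
    {"id": 3, "name": "ball", "price": 3.33},
]

# SELECT name FROM inventory WHERE name LIKE 'b%'
plan = Project(Filter(Scan(inventory), lambda r: r["name"].startswith("b")), ["name"])
print(list(plan.rows()))  # [{'name': 'bat'}, {'name': 'ball'}]
```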

If you haven't been exposed to SQL execution before, you might be thinking, "Shouldn't there be more to it than that?" At some level, there isn't anything more to it than that. It's actually fairly straightforward. There are complexities with regard to handling the generality: you have user-defined schemas, you have arbitrary types, and whatnot, and yet I want to shrug and say that at some level it's just a small matter of programming to handle all that stuff, easy peasy. It's a little bit more than a small matter of programming, but it's not conceptually difficult. Efficiency also comes into play. We don't actually want to do a full table scan for every query. Let me tie this back to how this works at the KV level. We did a scan of the inventory table, a whole table scan, and it would be correct SQL to always do a full table scan. It would also be incredibly inefficient, so what you actually do is push the filter down as low as possible. Here, we can push the filter down. If we added an index on the name column, we could push it all the way down into the scan, so that we're only scanning a small subset of keys. We still send the output from the scan on to the projection, and then we get the results.
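A minimal sketch of what pushing a prefix filter into an index scan buys, assuming the index is a sorted list of (name, id) keys like the secondary index keys described earlier. Instead of reading every row and testing it, the scan seeks to the first key at or after the prefix and stops at an exclusive upper bound. The data and function names are hypothetical, and `bisect` on a Python list stands in for a seek in the key-value store.

```python
import bisect

inventory = {1: "bat", 2: "glove", 3: "ball", 4: "cap", 5: "bag"}  # made-up id -> name

# A secondary index on name: (name, id) keys kept in sorted order.
name_index = sorted((name, row_id) for row_id, name in inventory.items())

def full_scan_filter(prefix: str):
    """Read every row, then apply the filter to each one."""
    matches = [name for name in inventory.values() if name.startswith(prefix)]
    return matches, len(inventory)  # rows read = whole table

def index_range_scan(prefix: str):
    """Filter pushed into the scan: read only keys in [prefix, next_prefix)."""
    assert prefix, "sketch assumes a non-empty prefix"
    # Exclusive upper bound: every string starting with `prefix` sorts below it
    # (assumes the last character is not the maximum code point).
    next_prefix = prefix[:-1] + chr(ord(prefix[-1]) + 1)
    lo = bisect.bisect_left(name_index, (prefix,))
    hi = bisect.bisect_left(name_index, (next_prefix,))
    return [name for name, _ in name_index[lo:hi]], hi - lo  # rows read = matches only

print(full_scan_filter("b"))  # (['bat', 'ball', 'bag'], 5)
print(index_range_scan("b"))  # (['bag', 'ball', 'bat'], 3)
```

Both return the same names; the index scan reads only the matching keys, and its output arrives sorted by name as a side effect.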

There are two large areas of concern for SQL execution. The first is correctness. There's just a ton of bookkeeping, an absolute ton of bookkeeping, inside SQL to handle the generality and all the semantics. We have user-defined tables and indexes and foreign keys and all that lovely stuff. We also have the semantics of SQL; some of them are well understood and some are a bit more esoteric. The biggest stumbling block in terms of semantics, both for SQL optimization and for SQL execution, is the handling of NULLs. I want to point this out and highlight it because NULL handling is the bane of database implementers' existence; it's the most frequent source of bugs in SQL execution and SQL optimization. I'm not going to get into the details.

The other area of concern is performance. I touched on this already with the scan example. We have three main tactics for SQL performance. The first is tight, well-written code: you avoid allocations, especially per-row allocations, you use good data structures, you're frugal with CPU, and you avoid redundant operations. The second is operator specialization, which I'm going to get into in just a moment. Often there's a general way to implement an operator, and you have to implement that general way. For example, aggregation has a hash group by operator, and I'm going to walk through an example of that. There's also a specialized operator, the streaming group by operator, which can be used if the input is sorted on the grouping columns. There's a ton of these operator specializations, used in the special cases where they apply. Another good example of operator specialization is that we implement hash join, merge join, lookup join, and zig-zag join, each used in the cases where it's appropriate. Lastly, we distribute execution, pushing it as close to the data as possible.

Let me work through an example of group by. Here we have a query that reads from a customers table, does an aggregation, a group by on country, and emits a count of the number of customers per country. I have some sample data here: two users in the United States, two in France, and one in Germany. One thing you should notice is that the data is currently sorted on name, so we must have read it through some index on name. The hash group by operator maintains an in-memory hash table. It consumes rows from its input source, takes the grouping column, country in this case, looks it up in the hash table, and increments the count.

I'll just walk through how this works; it's really straightforward. We get through the France users, but we can't emit France yet: we then get to another United States user and have to go back and increment the count for the United States. It's really nothing more complicated than that. There are, again, details in terms of generality and handling arbitrary types and tuples of grouping columns, but this is more or less it. Now you might be thinking, "That seemed relatively straightforward. How do we make this better?" Hash tables are fast; it's an O(1) lookup. What could be faster than a hash table? The answer is: don't have a hash table at all.
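A minimal sketch of the hash group by for this query, using made-up customer names that match the example's shape (two in the United States, two in France, one in Germany) and arrive sorted by name. The function name is hypothetical; a Python dict plays the role of the in-memory hash table.

```python
from collections import defaultdict

def hash_group_by_count(rows, key):
    """Hash aggregation: input can arrive in any order, but every group's
    running count stays in memory until the input is exhausted."""
    counts = defaultdict(int)
    for row in rows:
        counts[row[key]] += 1  # expected O(1) hash lookup per row
    return dict(counts)  # nothing can be emitted before this point

# Made-up customers, read in name order (e.g. through an index on name).
customers = [
    {"name": "Alice", "country": "United States"},
    {"name": "Bob", "country": "France"},
    {"name": "Carol", "country": "France"},
    {"name": "Dave", "country": "United States"},
    {"name": "Erika", "country": "Germany"},
]
print(hash_group_by_count(customers, "country"))
# {'United States': 2, 'France': 2, 'Germany': 1}
```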

How do we get rid of the hash table? Let's take our sample data again and imagine it's sorted on the grouping column. How did it get sorted? We could sort it ourselves, but more likely there's an index on the country column, and that index allowed us to read the data already sorted on country. Now what happens? Something obvious but useful: all the groups are now contiguous. As we walk through the data and move from one group to the next, we can just emit the finished group's row. We don't have to maintain a hash table any longer; we just maintain a single variable holding the name of the group we're in, along with its running count.

We walk down through the data again: we get through our France users, and now we can emit the France row. We get through our German users and emit the German row, and we get through our United States users and we're done. I want to highlight that one of the big pieces of work in SQL execution is all these specialized operators. Streaming group by versus hash group by is one of the easier ones. There's the zig-zag join I mentioned earlier, which is more complex. There's also work on future optimizations and specializations such as a loose index scan, which we discovered by paying attention to what other SQL databases do.
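A minimal sketch of the streaming group by, assuming the input arrives already sorted on the grouping column, for example read through an index on country. The customer rows and the function name are made up. Only the current group and its running count are kept, and each group's row is yielded as soon as the next group begins; the sketch also raises if it notices the input isn't sorted.

```python
def streaming_group_by_count(rows, key):
    """Streaming aggregation: requires input sorted on the grouping column.

    Keeps only the current group and its running count, and yields each
    group's result as soon as the next group starts.
    """
    current, count = None, 0
    for row in rows:
        k = row[key]
        if count and k != current:
            if k < current:
                raise ValueError("input is not sorted on the grouping column")
            yield current, count  # previous group is complete: emit it now
            count = 0
        current = k
        count += 1
    if count:
        yield current, count  # last group

# Made-up customers, read in country order (e.g. through an index on country).
customers_by_country = [
    {"name": "Bob", "country": "France"},
    {"name": "Carol", "country": "France"},
    {"name": "Erika", "country": "Germany"},
    {"name": "Alice", "country": "United States"},
    {"name": "Dave", "country": "United States"},
]
print(list(streaming_group_by_count(customers_by_country, "country")))
# [('France', 2), ('Germany', 1), ('United States', 2)]
```

Because it's a generator, downstream operators receive France's row before the Germany and United States rows have even been read.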

Onto distributed SQL execution. CockroachDB is a geo-distributed database, and in a geo-distributed database, network latency and network throughput are important considerations. To avoid those network latencies, we push fragments of the SQL query's computation as close to the data as possible. Let's walk through what this looks like for the streaming group by.

Revisiting that streaming group by example: if we're scanning over the customers index, actually the country index of the customers table, we would instantiate three different processors, one in each of the data centers that contains a fragment of the customers table. The output from each scan is sent to a local group by operator, [inaudible 00:42:10] data centers. So we're doing the scan and the group by, and it's all local to the data. Nothing is being sent over geographic distances. If the query had come into the East Coast, we then do the final aggregation in the East Coast data center. What's being sent over the network here is vastly smaller than the original customers data; it's only a handful of rows.
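A minimal sketch of this two-phase aggregation: each data center reduces its own fragment to one row per country, and only those partial counts travel to the gateway, which merges them. The data center names and the placement of rows are hypothetical; `collections.Counter` stands in for the local group by processors.

```python
from collections import Counter

# Hypothetical placement of customer rows (country column only) across three data centers.
fragments = {
    "us-east": ["United States"],
    "us-west": ["United States"],
    "europe": ["France", "France", "Germany"],
}

def local_group_by(countries):
    """Runs in the data center holding the fragment: one (country, count) row per group."""
    return Counter(countries)

def final_group_by(partials):
    """Runs on the gateway: COUNT partials are merged by summing them per group."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts rather than replacing them
    return dict(total)

partials = [local_group_by(rows) for rows in fragments.values()]
rows_scanned = sum(len(rows) for rows in fragments.values())
rows_shipped = sum(len(p) for p in partials)  # rows crossing the network
result = final_group_by(partials)
print(result, rows_scanned, rows_shipped)
```

Merging by summation works for COUNT and SUM; an aggregate like AVG would need each site to ship a (sum, count) pair instead. With real tables, the shipped row count is bounded by the number of distinct countries per site, not by the number of customers.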

SQL Optimization

Onto SQL optimization. I mentioned this earlier: SQL is declarative, so a SQL database has many possibilities, many choices, for how to execute a SQL query. It's the job of the SQL optimizer to choose among all these logically equivalent query plans and decide which one to execute. This is a high-level depiction of what it looks like to process a SQL query. The first step is to take the textual SQL query and parse it. The output of parsing is an abstract syntax tree. That abstract syntax tree is then sent into a stage called prep, which prepares the query: it translates the abstract syntax tree into a relational algebra expression tree. This is what I mentioned earlier, and it's essentially what SQL execution can process.

At this stage we do semantic analysis, such as resolving table names into table IDs and reporting semantic errors. We also fold constants and check column types. There's some other stuff that takes place here as well: other cost-independent transformations. The output of the prep stage is something called a memo. The memo isn't something I get to talk to you about today, other than to say it is this fantastic data structure; everybody who comes across it while working on a SQL optimizer goes, "Holy crap, this is amazing." Essentially, the memo is able to represent a forest of query plans. Instead of storing the single query plan a query represents, it stores an entire forest of query plans, and it stores it compactly so that we can explore alternative query plans efficiently. The memo is sent into the search stage, which is called that because we're searching for the best query plan in this large forest. The search stage repeatedly applies cost-based transformations to the query. At the very end, we choose the lowest-cost query plan we've found and execute it.
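The talk doesn't detail the memo, so here is only a toy illustration of the general idea behind memo-style structures: expressions are grouped into sets of logically equivalent alternatives, and children point at groups rather than at concrete subtrees, so each alternative is stored once while the number of complete plans it represents multiplies. The tables `a` and `b`, index `idx_x`, and group names are hypothetical; this is not CockroachDB's memo layout.

```python
from math import prod

# Hypothetical memo for "a JOIN b". Each group holds logically equivalent
# expressions; children are referenced by group ID instead of by subtree.
memo = {
    "G1": [("scan", "a", "primary"), ("scan", "a", "idx_x")],
    "G2": [("scan", "b", "primary")],
    "G3": [("join", "G1", "G2"), ("join", "G2", "G1")],
}

def count_plans(group: str) -> int:
    """Number of complete plans a group represents: for each expression, the
    product of its child groups' plan counts (math.prod of nothing is 1)."""
    total = 0
    for _op, *args in memo[group]:
        child_groups = [a for a in args if a in memo]
        total += prod(count_plans(g) for g in child_groups)
    return total

stored = sum(len(exprs) for exprs in memo.values())
print(stored, count_plans("G3"))  # 5 4
```

At this size the savings are invisible, but as joins and index choices are added, the count of complete plans grows multiplicatively while each group still stores every alternative only once.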

Let me go into a bit more detail about what cost-independent transformations look like. These are transformations that always make sense. Constant folding is the canonical example: you always want to fold a constant once, up front, instead of recomputing it on every row as you execute the query. There's also filter pushdown and decorrelating subqueries. Inside CockroachDB, we call these transformations normalizations, because they normalize fragments of the query into some common pattern.
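A minimal sketch of constant folding over a tiny expression tree, assuming expressions are nested tuples (a representation invented for this example). Any operator whose operands both fold to constants is evaluated once, at planning time. The sketch ignores NULLs, which a real implementation has to handle carefully.

```python
import operator

OPS = {"+": operator.add, "-": operator.sub, "*": operator.mul}

def fold(expr):
    """Fold constants bottom-up.

    An expression is ("const", value), ("col", name), or (op, left, right).
    Any operator whose operands both fold to constants is evaluated once,
    here, instead of once per row at execution time.
    """
    kind = expr[0]
    if kind in ("const", "col"):
        return expr
    left, right = fold(expr[1]), fold(expr[2])
    if left[0] == "const" and right[0] == "const":
        return ("const", OPS[kind](left[1], right[1]))
    return (kind, left, right)

# price * (1 + 2)  ->  price * 3
print(fold(("*", ("col", "price"), ("+", ("const", 1), ("const", 2)))))
# ('*', ('col', 'price'), ('const', 3))
```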

These transformations, whenever they can be applied, are always applied; that's what it means to be cost-independent. The way this is implemented inside CockroachDB is kind of cool. There is a domain-specific language for transformations: a tiny language that specifies patterns to match against fragments of the query. That DSL is compiled down to code which efficiently matches the patterns within the memo and then applies the transformations. I counted this morning, and there are a little over 200 transformations defined in the code. We add more in every release. The vast majority of them are cost-independent, and a smaller number are cost-based.

Let's look at a simple example of a cost-independent transformation. Here we have a query that joins two tables and then filters. The initial plan, the initial translation from the AST to a relational algebra expression, has us scan the two tables, join them, filter, and then return the results. The filtered column is actually present on both sides of the join, and this is something the DSL supports: it allows matching on the shape of the tree as well as on various properties of the tree. So we can push the filter below the join. This is a good idea because the filtering operation is really fast and the join operation is relatively slow; the cost of the join depends on how much data comes into it. So we always want to do this filter pushdown. It also allows further transformations to be applied: later, we try to push the filter down into the scan operation.
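A minimal sketch of a filter-pushdown rewrite rule, written as plain Python rather than in CockroachDB's transformation DSL. It matches the shape Filter(Join(L, R)) plus a property (the predicate only references columns one side produces) and moves the filter below the join. The table and column names are hypothetical. The talk's example, where the filtered column appears on both sides through the join condition and the filter can go to both inputs, is a richer case this sketch doesn't model.

```python
from dataclasses import dataclass
from typing import FrozenSet, Union

@dataclass(frozen=True)
class Scan:
    table: str
    columns: FrozenSet[str]

@dataclass(frozen=True)
class Join:
    left: "Plan"
    right: "Plan"
    on: str  # join condition, kept opaque in this sketch

@dataclass(frozen=True)
class Filter:
    child: "Plan"
    predicate: str
    refs: FrozenSet[str]  # columns the predicate references

Plan = Union[Scan, Join, Filter]

def output_columns(p: Plan) -> FrozenSet[str]:
    if isinstance(p, Scan):
        return p.columns
    if isinstance(p, Join):
        return output_columns(p.left) | output_columns(p.right)
    return output_columns(p.child)

def push_filter_below_join(p: Plan) -> Plan:
    """Rewrite Filter(Join(L, R)) so the filter runs on L (or R) before the
    join, when the predicate only references columns that side produces."""
    if isinstance(p, Filter) and isinstance(p.child, Join):
        j = p.child
        if p.refs <= output_columns(j.left):
            return Join(Filter(j.left, p.predicate, p.refs), j.right, j.on)
        if p.refs <= output_columns(j.right):
            return Join(j.left, Filter(j.right, p.predicate, p.refs), j.on)
    return p  # rule doesn't match: leave the plan unchanged

customers = Scan("customers", frozenset({"c_id", "country"}))
orders = Scan("orders", frozenset({"o_id", "cust_id", "total"}))
plan = Filter(Join(customers, orders, "c_id = cust_id"), "country = 'France'", frozenset({"country"}))
rewritten = push_filter_below_join(plan)
print(rewritten)
```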

Cost-independent transformations are no big deal; almost every SQL database implements them. Cost-based transformations are where the real money is. These are transformations that aren't universally good. Some examples are index selection and join reordering. Because these transformations aren't always beneficial, the question is how we decide whether to apply a transformation or not. This is where the memo comes into play. You can't really decide on a transformation-by-transformation basis whether to apply it. You actually have to apply it and keep both the untransformed and the transformed query. This creates a state explosion, and handling that state explosion is one of the huge engineering challenges of a cost-based optimizer. The memo data structure keeps memory usage under control, but there's also a lot of fine engineering to make this really fast.

At the end of the cost-based transformation process, we estimate the cost of each query plan. I don't have time to go into costing in detail, but basically you look at table statistics, and using those statistics you work from the leaves of the query plan up to the root, estimating the cardinality of the inputs to each operator. Based on that cardinality, you estimate how long it's going to take to execute the query, and then you choose the plan with the lowest cost.
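A minimal sketch of bottom-up cardinality estimation, using textbook System R-style formulas (an equality filter keeps 1/distinct of the rows; an equi-join produces |L| * |R| / max of the two distinct counts) and made-up statistics. These are not CockroachDB's actual statistics or formulas, and reusing the base table's distinct count after a filter is a deliberate simplification.

```python
# Made-up statistics: row counts and distinct-value counts per column.
STATS = {
    "customers": {"rows": 1000, "distinct": {"c_id": 1000, "country": 50}},
    "orders": {"rows": 10000, "distinct": {"cust_id": 1000}},
}

def estimate_rows(plan) -> float:
    """Estimate output cardinality bottom-up, from the leaves to the root.

    Plans are ("scan", table), ("filter_eq", child, table, column), or
    ("join_eq", left, right, (left_table, left_col), (right_table, right_col)).
    """
    op = plan[0]
    if op == "scan":
        return STATS[plan[1]]["rows"]
    if op == "filter_eq":
        _, child, table, col = plan
        # Uniformity assumption: "col = constant" keeps 1/distinct of the rows.
        return estimate_rows(child) / STATS[table]["distinct"][col]
    if op == "join_eq":
        _, left, right, (lt, lc), (rt, rc) = plan
        # Textbook equi-join estimate: |L| * |R| / max(distinct(L.col), distinct(R.col)).
        ndv = max(STATS[lt]["distinct"][lc], STATS[rt]["distinct"][rc])
        return estimate_rows(left) * estimate_rows(right) / ndv
    raise ValueError(f"unknown operator: {op}")

plan = ("join_eq",
        ("filter_eq", ("scan", "customers"), "customers", "country"),
        ("scan", "orders"),
        ("customers", "c_id"), ("orders", "cust_id"))
print(estimate_rows(plan))  # 200.0
```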

I'm going to go through an example of this for index selection. Index selection is affected by a number of factors. It's affected by the filters in the query, including filters present in join conditions. There might be a required ordering in the query that affects index selection, such as an order by. There might be an implicit ordering, such as a group by, where if we read ordered data from the lower level we can use a streaming group by operator. There are covering versus non-covering indexes, which I won't talk about, and then there's locality: where the data is stored.

Let me give you an example of how required orderings affect index selection and why it's cost-based. As you probably all know, sorting is relatively expensive, and yet sorting can be the better option if there are very few rows to sort. Here we have a query that reads from a table, filters on one column, and sorts on a different column. The naive plan is to scan the primary index, filter, then sort. If we have an index on X, we might be able to push the filter all the way down into the scan, scanning index X and then sorting. This is almost certainly better than the first plan. If we have an index on Y, it's possible to scan index Y, which yields rows already sorted on Y, and then filter. Now the question is: which of these last two plans is better? Imagine the table has 100,000 rows and the output of the filter is only 10 rows. Sorting 10 rows is very efficient, so scanning X and sorting is really the best thing to do. On the other hand, the filter might output 50,000 rows, and sorting 50,000 rows is going to be super expensive. In that case, it's better to get the already-sorted data from index Y and then filter.
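A toy cost comparison for the three plans, assuming a made-up cost model (reading a row costs 1 unit, sorting n rows costs n * log2(n) units) and ignoring details such as non-covering index lookups. It is not CockroachDB's cost model, but it reproduces the crossover described above with the same row counts.

```python
import math

def sort_cost(n: int) -> float:
    # Comparison sort: roughly n * log2(n) units of work.
    return n * math.log2(n) if n > 1 else 0.0

def choose_plan(table_rows: int, filtered_rows: int) -> str:
    """Pick the cheapest plan for SELECT ... WHERE <filter on x> ORDER BY y.

    Toy cost model: reading a row costs 1 unit, sorting n rows costs n*log2(n).
    """
    costs = {
        # Full scan of the primary index, filter every row, sort the survivors.
        "scan primary, filter, sort": table_rows + sort_cost(filtered_rows),
        # Filter pushed into a range scan of index x: only matching rows are read.
        "scan index x, sort": filtered_rows + sort_cost(filtered_rows),
        # Index y returns rows already ordered by y: read everything, filter, no sort.
        "scan index y, filter": table_rows,
    }
    return min(costs, key=costs.get)

print(choose_plan(100_000, 10))      # scan index x, sort
print(choose_plan(100_000, 50_000))  # scan index y, filter
```

The naive plan never wins here, since it always costs at least as much as the index x plan; the real decision is between paying for a sort and paying for reading extra rows.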

Locality-Aware SQL Optimization

The last topic I'm going to cover is locality-aware SQL optimization. For a geo-distributed database, network latencies are important. What we can do is duplicate certain read-mostly data in the system, and then partition that data into various localities. What does this look like? Here I'm depicting a reference table, the postal codes table. We actually have three indexes on this table: a primary index and two secondary indexes, and the storing syntax says that these secondary indexes store exactly the same data as the primary index. Then we can use replication constraints on these indexes to say that all the ranges of the primary index are stored in U.S. East, while the European index and the U.S. West index have their ranges constrained to their own localities.

When a query comes into the system, we know which locality it's coming from, and the cost-based optimizer takes locality into account in its cost model. It can choose any of these indexes to read from, since they all contain the same data, and it preferentially chooses the index in the current locality. This is a very simple description, yet it's a general mechanism that the cost-based optimizer can use whenever you have data that's replicated like this.
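A toy sketch of locality-aware index selection: all three indexes hold the same data, so the only difference in cost is whether reading them crosses localities. The index names, locality labels, and penalty values are hypothetical, not CockroachDB's cost model.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class IndexChoice:
    name: str
    locality: str  # where the replication constraints place this index's ranges

# Hypothetical names for the three indexes; all store the same columns.
POSTAL_CODE_INDEXES = [
    IndexChoice("primary", "us-east"),
    IndexChoice("idx_europe", "europe"),
    IndexChoice("idx_us_west", "us-west"),
]

def pick_index(gateway_locality: str, local_cost: float = 1.0, remote_cost: float = 100.0) -> IndexChoice:
    """Toy locality-aware costing: reading from another locality pays a network penalty."""
    def cost(ix: IndexChoice) -> float:
        return local_cost if ix.locality == gateway_locality else remote_cost
    return min(POSTAL_CODE_INDEXES, key=cost)  # ties keep the first index listed

print(pick_index("europe").name)   # idx_europe
print(pick_index("us-west").name)  # idx_us_west
```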

Here's a very quick review of everything we talked about. At the base, there's a distributed, replicated, transactional key-value store with a monolithic key space broken into 64-megabyte ranges. We use Raft for replication. There are various replica placement signals (space, diversity, load, and latency) that we use to decide where replicas should reside inside a cluster. Transactions implement serializable isolation, and the operations are pipelined. On top of this key-value store, we map SQL data. SQL execution involves a lot of bookkeeping, a heavy dose of performance work, and specialized operators. Distributed SQL execution pushes fragments of SQL computation as close to the data as possible, and finally, we have an optimizer that ties all this together. At this point, we've wrapped up, and you're all certified to go implement a distributed SQL database.




Recorded at:

Sep 09, 2019
