
In-Process Analytical Data Management with DuckDB


Summary

Hannes Mühleisen discusses DuckDB, an analytical data management system that is built for an in-process use case. DuckDB speaks SQL, is integrated as a library, and uses vectorized query processing techniques.

Bio

Hannes Mühleisen is a creator of the DuckDB database management system and co-founder and CEO of DuckDB Labs, a consulting company providing services around DuckDB. He is also a senior researcher in the Database Architectures group at the Centrum Wiskunde & Informatica (CWI), and Professor of Data Engineering at Radboud Universiteit Nijmegen.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Mühleisen: Welcome to my presentation, in-process analytical data management with DuckDB. My name is Hannes Mühleisen. I'm one of the creators of DuckDB. I'm speaking to you from the distant past of approximately 4 weeks ago. What motivated us to start building a new database system in the first place? It was this man, and it was this quote. The man is Hadley Wickham. He is one of the leading figures of the R community. He has said, if your data fits in memory, there is no advantage to putting it in a database: it will only be slower and more frustrating. That really hurt our feelings, because I come from a database background, I'm a database researcher. For somebody influential like him to call the fruits of our labor frustrating and slow was of course a bit of a challenge. We started thinking about what it actually is that makes databases slow and frustrating. The first thing that we could find was the client-server model. If you're doing data analysis, pushing a lot of data into a database management system from an application, or pulling a lot of data from a database management system into, let's say, an analysis environment like R or Python, is really slow. We started digging a bit and tried to find out when this architectural pattern of client-server really came about, and found that it is something that Sybase apparently invented in 1984.

Then we looked a bit into how these systems communicate with each other. We found that they're using these ancient client protocols. We actually wrote a paper about it some years back, where we compare the database client protocols of various data management systems. I should explain the plot: we measure the time it takes to transfer a fixed dataset between a client program and these various database systems on the left here. The baseline is the Netcat utility, which you can just use to basically byte dump the same dataset in CSV format over a network socket. That took 10 seconds. If we transfer the same amount of data with MySQL, it took 100 seconds. If you transfer, yet again, the same amount of data with Hive or MongoDB, it took over 600 seconds. That was pretty damning. The client-server model seems to be a bit problematic, especially given the client protocols that we have. We were able to get somewhere in the area of Netcat after some work. It was still not really nice.

SQLite

Then we thought, maybe we can learn something from SQLite, because the most widely deployed SQL system in existence, with over a trillion installations according to its authors, is SQLite. It is literally everywhere. Right now, you're probably interacting with tens or hundreds of SQLite instances without realizing it. What's interesting about SQLite is that it is in-process, which is a radically different architectural style for building data management systems: it works by linking the database management system into a client application directly, instead of relying on the client-server model. Now you can send data back and forth in the same memory address space, which is beautiful, because if you can just share pointers back and forth, you don't actually have to copy and serialize a lot of data over sockets, which was the problem with these client-server systems. In-process is really a great idea. However, SQLite is not a system that is built for large-scale data analysis. It is built for transactional workloads. The authors themselves say it's built as an fopen replacement, because too many people are storing application logic in flat files. I really agree with that notion. For analytics, it's not really great.
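To make the in-process idea concrete, here is a minimal sketch (not from the talk) using Python's built-in sqlite3 module: the database engine runs inside the same process, so results come back as ordinary Python objects, with no socket or serialization involved. The table and values are purely illustrative.

```python
import sqlite3

# The database engine runs inside this Python process; there is no server.
conn = sqlite3.connect(":memory:")  # an in-memory database, purely illustrative
conn.execute("CREATE TABLE sale (cid INTEGER, revenue INTEGER, tax INTEGER)")
conn.executemany("INSERT INTO sale VALUES (?, ?, ?)",
                 [(42, 1233, 422), (43, 10, 1)])

# Query results are plain Python tuples handed over in the same address space.
rows = conn.execute("SELECT cid, revenue + tax FROM sale").fetchall()
print(rows)  # [(42, 1655), (43, 11)]
```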

DuckDB

This is why we started writing a new database management system a couple of years ago, called DuckDB. DuckDB really had to start from scratch in its implementation, because the in-process world is rather different from the client-server world. It's a rather harsh environment. DuckDB was written from scratch. It's written in C++. It is a SQL system, so you can speak complex SQL to it. We sometimes call it the SQLite for analytics. It has some quite interesting internals. It uses, for example, a vectorized execution engine. Also, importantly, it is a free and open source project, it is MIT licensed, which is a very permissive license. The project is governed by a foundation, which is funded by donations instead of the usual VC game. Why is it called DuckDB? DuckDB is called DuckDB because I used to have a pet duck named Wilbur. He has since left us, but his legacy lives on in the name of our database management system.

How does this look like if I interact with DuckDB? Here's an example from Google callup. It's really very simple. In this very short script, has three lines in total, import the DuckDB system as a Python package. I load an extension to be able to talk to HTTPS resources. Then I just read a Parquet file from a URL. It's a Parquet file that has some user data in it. I convert it back 2DF. We can see some interesting things here. First of all, there's a lot of magic hidden in this little 2DF at the back there, because what happens here is that we transfer a result set, in this case it's only 10 rows, but it could be millions of rows into a DF, which is a Panda's DataFrame. Because we are in the same address space, we don't have to serialize anything or transfer anything over a socket, we can simply dress up the data bit differently, share some pointers in the same address space and move on. This makes this very fast. DuckDB already, as you can see from the example, has native support for Parquet files. It's the new CSV, and therefore DuckDB supports it natively. You can also see this load httpfs call up there, which shows you how DuckDB can be extended with plugins because there's a bunch of plugins, like the HDFS extension that you can just install, add to your thing. This is an example for users of DuckDB from Python.
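A reconstruction of that short script might look like the sketch below. The INSTALL/LOAD httpfs steps and the DataFrame conversion are standard DuckDB Python usage; the URL is a placeholder for the userdata Parquet file shown in the talk, not the actual one.

```python
import duckdb  # the database engine is just a Python package in this process

con = duckdb.connect()           # an in-memory DuckDB database
con.execute("INSTALL httpfs")    # extension for reading HTTP(S) resources
con.execute("LOAD httpfs")

# Read a remote Parquet file and hand the result over as a pandas DataFrame.
# The URL below is a placeholder for the file used in the talk.
df = con.execute(
    "SELECT * FROM 'https://example.com/userdata1.parquet' LIMIT 10"
).df()
print(df)
```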

We also have a command line client. We actually spent a lot of time on the command line client. It has things like query autocompletion and SQL syntax highlighting. In this example, I'm just starting a DuckDB shell on my computer, and I'm just reading the same Parquet file directly. What you can also see there, if you look at the query and you're a bit aware of how SQL works, is that this would normally not work in a SQL system where you say SELECT * FROM userdata.parquet, as userdata.parquet is of course not a table, it is a file. How does this work in DuckDB? This table does not exist, yet there is a Parquet file with that name in this directory. We have this thing where, if we fail to find a table with a specific name, we go searching for other things with that name, like, for example, Parquet files. This is why we find the file, and we can directly run queries on it. This is an arbitrary query.
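The same replacement-scan behavior works outside the shell as well; here is a minimal sketch from Python, assuming a file named userdata.parquet sits in the working directory.

```python
import duckdb

con = duckdb.connect()
# 'userdata.parquet' is not a table; when DuckDB fails to find a table with
# that name, it searches for a matching file, here a Parquet file, and scans it.
print(con.execute("SELECT * FROM 'userdata.parquet' LIMIT 5").df())
```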

In-Process Analytics

If you think about this from more of an architectural perspective, we actually accidentally invented a new class of data management systems, which is in-process analytics, or in-process OLAP. OLAP and OLTP stand for different sectors of the SQL market. OLTP is online transactional processing, which is more about the transactional use case, the traditional database use case, where it is a lot about updating single rows and reading single rows. OLAP, online analytical processing, stands for crunching through large portions of a table and preparing reports. SQLite is an in-process system, but it is meant for OLTP. If you think about a traditional client-server architecture for OLTP, what comes to mind is Postgres, because it's the undisputed champion of open source, at least for OLTP.

Then on the OLAP side, there have been a couple of client-server systems. I think ClickHouse is maybe the most well-known. There had never been an in-process OLAP system before DuckDB came around. I think we are the first in-process OLAP SQL system. We were excited when we realized that. Other people have also been excited. It's been pretty crazy the last couple of months, DuckDB has enjoyed a constant rise in popularity. Here, we just had a tweet from Peter, where he wished us a happy 2023. DuckDB crossed at that point 1 million downloads per month on PyPI, which is of course only one of the environments where we exist. There's also the command line interface, and we also integrate with a bunch of other environments like JavaScript, R, Java, ODBC, you name it. There have also been some developments on the commercial front. A new company called MotherDuck has sprung up; from the name, you will already guess that it has something to do with DuckDB. This is a company that has raised money to build a service offering around DuckDB. We're quite excited about that.

Technical Perspective of DuckDB

Now let's try to go a bit more into the innards of DuckDB. What makes DuckDB special from a technical perspective? To do that, I'd like to start with a fairly simple query. I'm going to walk you through the various stages that DuckDB goes through in processing the answer to this query. If you don't know SQL, then I shall explain what it does. If you know it, you can just read the query. We have a query here that selects a name and a sum from the JOIN of two tables. We have a table called customer, and we JOIN it with a table called sale, for sales maybe. They have a common column called cid, maybe customer ID. We basically want to compute the amount of revenue per customer. We want to sum up all this revenue per customer, but we also need to add the tax for each transaction. That's just a bit of a constructed example here, but it serves the purpose later. When we run this query, the system will join these two tables. We gather customers based on equality on this cid column. Then we will compute the revenue plus tax projection. Then later we will do the GROUP BY, which is a grouped aggregation by the customer ID, where we compute first(name), which is essentially the name, and the sum of revenue plus tax.
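A plausible reconstruction of that query as a runnable sketch is shown below. The table and column names (customer, sale, cid, name, revenue, tax) come from the talk; the tiny example rows are borrowed from the walkthrough later in the talk, and the exact text on the slide may differ.

```python
import duckdb

con = duckdb.connect()
con.execute("CREATE TABLE customer (cid INTEGER, name VARCHAR)")
con.execute("CREATE TABLE sale (cid INTEGER, revenue INTEGER, tax INTEGER)")
con.execute("INSERT INTO customer VALUES (42, 'ASML')")
con.execute("INSERT INTO sale VALUES (42, 1233, 422), (42, 10, 2)")

# Revenue (plus tax) per customer: JOIN on cid, then a grouped aggregation.
query = """
    SELECT first(name) AS name, sum(revenue + tax) AS total
    FROM customer
    JOIN sale USING (cid)
    GROUP BY cid
"""
print(con.execute(query).df())
```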

It's pretty straightforward and simple. You can imagine that these things become hideously complicated; this is just an example. If you look at what DuckDB does with this, it of course goes through the regular standard phases of query planning, query optimization, and physical planning. Next, we split up this query plan into what we call pipelines. This query actually has three pipelines. What is a pipeline? A pipeline is something that can be run in a streaming fashion. When does something stop being streaming? It stops when it hits something called a breaking operator, a pipeline breaker. These are operators that need to see the entire input before continuing. Here we have the first pipeline. It will scan the customer table, and it will actually build a hash table. The hash join is split into two phases. One is the building of the hash table on the one side of the JOIN, and the other one is the probe, which you see on the right side of the JOIN. The build of the hash table will need to see all data from the left-hand side of the JOIN. In this case, we need to run through the entire customer table and read all of it into this HASH JOIN BUILD phase until we're done with this pipeline.

Once this pipeline is done, we can schedule pipeline 2, which is bigger because it has more streaming operators: we can scan the sale table, look into the hash table that we've built before to find JOIN partners from the customer table, project the revenue plus tax column, and then run the aggregate. Obviously, the aggregate itself is also a breaking operator, because it needs to see the entire input before it can continue. We run the GROUP BY BUILD phase, and then we're done with this pipeline. Once everything is finished, we can schedule pipeline 3, which is very simple: it reads the result of the GROUP BY and outputs a result. This is so far pretty standard stuff. I think many database systems out there look at the pipelines when they do the query planning.
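You can inspect the physical plan that these pipelines are built from yourself; a minimal sketch, continuing with the connection and tables from the earlier sketch. The exact operator names and layout of the output vary by DuckDB version.

```python
# EXPLAIN returns the physical plan as text rows (key, value pairs).
plan = con.execute("""
    EXPLAIN
    SELECT first(name), sum(revenue + tax)
    FROM customer
    JOIN sale USING (cid)
    GROUP BY cid
""").fetchall()
for _, text in plan:
    print(text)  # typically shows a hash group-by, a hash join, and the scans
```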

Row-at-a-time

How does this then actually work during execution? Traditionally, we have something called the Volcano model, which is basically a sequence of iterators: every operator exposes an iterator and has a bunch of iterators as its input. Execution starts by trying to read from the top of this pipeline, the GROUP BY BUILD phase. It cannot read anything yet, because nothing has been read, so it starts reading from its child operator, the projection, and there's nothing there yet either. That one, in turn, starts reading from its child operator, the HASH JOIN PROBE, which in turn starts reading from the sale table. The sale table produces a tuple, in this case 42, 1233, 422. These are the three columns: the ID, the revenue, and the tax. That one tuple then moves up to the HASH JOIN PROBE. The HASH JOIN PROBE has built a hash table, so it knows, for example, that ID 42 stands for ASML, the company, and it will output a new row that has the join result, which is basically the name, ASML, and the two other columns unchanged, 1233 and 422.

Then this row goes into the next operator, the projection, which essentially adds up the last two columns. We get a new row outputted by this operator, with ASML and 1655. Then that row finally goes into the GROUP BY BUILD. This is very common, like everyone uses this approach: Postgres uses it, MySQL uses it, Oracle uses it, SQL Server uses it, SQLite uses it. This tuple-at-a-time, row-at-a-time model is really great for transactional use cases, because you only ever look at single rows. The execution engine there is usually built around an interpreter, which is really great, because then you don't have any upfront costs in running queries. The flip side is that this creates a lot of overhead if you want to do analytical processing, because you can imagine that this constant switching around between operators and iterators, and child operators and parent operators and different things to do, creates a lot of overhead. There's one possible improvement, also in the literature and in the real world, which is to JIT compile this entire pipeline, and that is certainly an option. There are other options, as we will see.

Here you can look at how such an operator could be implemented, for example the projection, because it's a streaming operator that's fairly simple. It's really simple. We have this row coming in. On the right, we have some pseudocode: input.readRow, so we read a row of input. The first value in this row is unchanged, so we just assign input zero to output zero. Then the second entry in the output becomes the result of the addition, where we do an integer addition on the second and the third value of the input. Then we write this to the output. This is very simple. It's nice to implement. The problem is that, in typical implementations, this creates a function call for every value that you read in, and that can be very slow.
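To make the row-at-a-time model concrete, here is a small illustrative sketch in Python (not DuckDB's actual code, which is C++): each operator is an iterator that pulls one tuple at a time from its child, mirroring the pipeline described above. The sample rows and hash table contents are assumptions taken from the walkthrough.

```python
def scan(rows):
    # Table scan: yields one tuple (cid, revenue, tax) at a time.
    for row in rows:
        yield row

def hash_join_probe(child, customers):
    # Probe the hash table built from the customer table, one tuple at a time.
    for cid, revenue, tax in child:
        yield (customers[cid], revenue, tax)

def project(child):
    # The projection from the slide: keep the name, add revenue and tax.
    for name, revenue, tax in child:
        yield (name, revenue + tax)

def group_by_build(child):
    # Pipeline breaker: consumes its entire input before producing results.
    sums = {}
    for name, value in child:
        sums[name] = sums.get(name, 0) + value
    return sums

sale = [(42, 1233, 422), (42, 10, 2)]
customers = {42: "ASML"}
print(group_by_build(project(hash_join_probe(scan(sale), customers))))
# {'ASML': 1667}
```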

Vector-at-a-time

This is why an improvement over this row-at-a-time or tuple-at-a-time model is the vector-at-a-time model, which was first proposed by my colleagues Peter, Marcin, and Niels back in 2005. This model is based on the idea that we process not single values at a time, but short columns of values in one go. These are what we call vectors. They are vectors because, instead of looking at one value of each row at a time, we're looking at a number of values for each column at a time. What that does is amortize the interpretation overhead. We still do this type switching, but we do it on a vector of values instead of a row of values. It's a compromise between columnar and row-wise execution. Columnar is, of course, more efficient, but has memory issues. Vectorized execution is a compromise where we just limit the size of the columns to something small. We don't need a JIT, and we have really great performance; it's really competitive with JITing the whole query plan. We also get something that is called cache locality, and cache locality is actually super critical.

This is a pretty famous chart. It comes from the Google people, like Peter Norvig and Jeff Dean, who have popularized these numbers. The interesting thing from our perspective is the difference between the L1 cache reference, which is the top left there, at 0.5 nanoseconds, and the main memory reference, which is 100 nanoseconds. That's a factor of 200, or something like that. It's interesting, because the L1 cache reference has become 200 times faster since 1990, while the memory reference has only become twice as fast. What this basically means is that we have a factor of 200 between reading something that is in our L1 cache versus reading something that is in main memory, which is why we want all the things we operate on to fit in the CPU cache. This is exactly where vectorized query processing is beautiful: we change our operators slightly. Here's the same projection of this addition, revenue plus tax. We get these three vectors of values as input instead of a single row, and we output two vectors of values. If you look at the right, you can see the implementation, where, instead of a single row, we read a chunk, which is a collection of small vectors of columns. The first vector is unchanged, so we just assign it to the output again. We create a new result vector. Then we have a range that goes from 0 to 2048, and we essentially call an addition on every individual value. By doing that, we can generate very efficient code. We can have a compiler that autovectorizes the code and inserts special instructions automatically. We can also avoid the function call overhead by doing our interpretation, our switching around data types, operators, and so on, only on a vector level instead of a value level. That greatly decreases the overall overhead. That's vectorized processing.
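For contrast with the row-at-a-time sketch above, here is a vector-at-a-time version of the same projection, again as an illustrative Python/NumPy sketch rather than DuckDB's actual C++ implementation. The vector size of 2048 matches the number mentioned in the talk; the data values are assumptions.

```python
import numpy as np

VECTOR_SIZE = 2048  # each vector holds up to 2048 values, as in the talk

def project_chunk(names, revenue, tax):
    # One interpretation step handles a whole vector: the name column is
    # passed through unchanged, and a single call adds 2048 values at once
    # (a tight loop a compiler can autovectorize in the C++ case).
    return names, revenue + tax

# A chunk of input: three small column vectors instead of one row.
names = np.array(["ASML"] * VECTOR_SIZE)
revenue = np.full(VECTOR_SIZE, 1233, dtype=np.int64)
tax = np.full(VECTOR_SIZE, 422, dtype=np.int64)

out_names, out_total = project_chunk(names, revenue, tax)
print(out_total[:3])  # [1655 1655 1655]
```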

Exchange-Parallelism

Of course, vectorized processing being fast on one CPU is not enough, we also have to be good on multiple CPUs. How can we do parallelism? Traditionally, there is what the great Goetz Graefe wrote in the Volcano paper, the same paper that described the Volcano-style query processing I mentioned earlier. In that same paper, he also described exchange operator parallelism. The way this works is, on the left, you see a representation of a plan. We will read from three partitions at the same time, we will apply filters, and we will pre-aggregate these values. Then we will hash the values, split them up based on the values of the hash, do a re-aggregation, and then combine the output. Then we have basically parallelized most of our query. You can see, for example, that Spark does this. In this plan representation for this fairly simple query, above the scan level, this bottom FileScan Parquet thing, we have the hash aggregate that does a partial_sum. Then we do another [inaudible 00:22:04] partitioning, and then we do a re-aggregation that actually computes the sum. That's all well and good, but it's actually quite cumbersome in general. The reason why people have designed query engines with an exchange operator is that it allows you to keep your individual operators, like the aggregate and the JOINs, unaware of parallelism. It has been shown to be a problematic approach.
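A schematic sketch of that exchange pattern is below, with hypothetical partitions and group keys: each worker pre-aggregates its own partition, and an exchange step reshuffles the partial results by hash of the key so each key can be finalized in one place. This is an illustration of the general pattern, not Spark's or Volcano's actual code.

```python
from collections import defaultdict

def partial_aggregate(partition):
    # Stage 1: each worker pre-aggregates its own partition (a partial sum).
    sums = defaultdict(int)
    for key, value in partition:
        sums[key] += value
    return sums

def exchange_and_reaggregate(partials, num_workers):
    # Exchange: repartition the partial results by hash of the group key, so
    # every key ends up at exactly one worker, then re-aggregate per bucket.
    buckets = [defaultdict(int) for _ in range(num_workers)]
    for partial in partials:
        for key, value in partial.items():
            buckets[hash(key) % num_workers][key] += value
    return buckets

# Three hypothetical input partitions of (customer, amount) rows.
partitions = [
    [("ASML", 10), ("Acme", 5)],
    [("ASML", 7)],
    [("Acme", 1), ("Acme", 2)],
]
partials = [partial_aggregate(p) for p in partitions]
for bucket in exchange_and_reaggregate(partials, num_workers=2):
    print(dict(bucket))
```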

Morsel-Driven Parallelism

The modern approach to parallelism in SQL engines is called Morsel-Driven Parallelism. Similarly to before, we split up our scans on the input level, where we have partial scans. In this case, in the second pipeline of our example query, we have partial scan one on sale, which you can imagine reads the first half of the table, and partial scan two on the same table, which reads the second half. We do the HASH JOIN PROBE exactly the same, because we just probe the same hash table from two pipelines. Then we do a projection that is independent. Then we both sink all these results into the GROUP BY operator, which is our pipeline breaker, our blocking operator. You see no exchange operator here. The difference from the exchange-operator-based model, the traditional model, is that the GROUP BY is now aware of the fact that parallelization is happening, and can internally deal efficiently with the contention that arises from having different threads reading groups that could collide. It's entirely possible, if one thread reads the first half of the table and the other thread reads the second half, that the groups collide.

The way this is done in Morsel-Driven Parallelism is that, on the very left side here, we have this first phase where every individual thread starts pre-aggregating its values. We have these morsels, which are these splits of the input. Those individually get built into hash tables. Then there's the second phase, where we aggregate, but we do this partition-wise. In the local hash tables, we can already split the data based on the radixes of the group keys, so that we basically create individual hash tables that we know cannot have keys that are present in any of the other hash tables. This allows us, when we have later read all the data and, in phase 2, have to finalize our hash table and our aggregates, to pick the same partition from each participating thread and schedule more threads to read them all. This is fairly complicated compared to a normal aggregate hash table. What it gives us is this Morsel-Driven Parallelism model, where the operators are aware of the parallelism, which allows them to not deal with the problems of the exchange operator, but to efficiently build an aggregation over multiple inputs.
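Here is a schematic sketch of that two-phase, partition-wise aggregation, using threads and a radix-style partition on the hash of the group key. The thread counts, partition counts, and data are illustrative assumptions; DuckDB's real aggregate operator is considerably more involved.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4  # illustrative; real engines derive this from radix bits

def phase1_local_aggregate(morsel):
    # Phase 1: each thread builds partitioned local hash tables for its morsel,
    # so partition i on one thread never shares keys with partition j != i.
    parts = [defaultdict(int) for _ in range(NUM_PARTITIONS)]
    for key, value in morsel:
        parts[hash(key) % NUM_PARTITIONS][key] += value
    return parts

def phase2_finalize(partition_id, all_local_parts):
    # Phase 2: one thread merges the same partition from every local table;
    # no other thread touches these keys, so there is no contention.
    merged = defaultdict(int)
    for parts in all_local_parts:
        for key, value in parts[partition_id].items():
            merged[key] += value
    return merged

morsels = [[("ASML", 1655), ("Acme", 3)], [("ASML", 12), ("Acme", 4)]]
with ThreadPoolExecutor() as pool:
    local = list(pool.map(phase1_local_aggregate, morsels))
    finals = list(pool.map(lambda p: phase2_finalize(p, local),
                           range(NUM_PARTITIONS)))

result = {key: value for part in finals for key, value in part.items()}
print(result)  # {'ASML': 1667, 'Acme': 7}
```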

Quick Benchmark

What does this all buy us? Quite a lot, actually. I've done a quick benchmark for this talk. I've taken our example query, slightly more complicated, I just added an ORDER BY and a LIMIT. We select the name and the sum of revenue plus tax from these two tables, customer JOIN sale, and group them by the customer ID. I have thrown two tables into this, with 1 million customers and 100 million sales entries. It's not so crazy, it's about 1.4 gigabytes of CSV data. On the right you can see some results. DuckDB can finish this query on my laptop in 0.5 seconds. We have Postgres, after some optimization by me, being able to do this in 11 seconds. We have Postgres on the default settings doing this in 21 seconds, so a factor of 40, because of all these optimizations. Of course, it is a bit of an apples to oranges comparison, because Postgres is focusing on an OLTP workload. That hasn't stopped people before.
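If you want to try something similar yourself, a rough sketch of generating comparable data and timing the query in DuckDB might look like the following. The schema, data distribution, and query text are assumptions based on the talk, not the actual benchmark script, and timings will of course differ on your hardware.

```python
import time
import duckdb

con = duckdb.connect()
# Synthetic data roughly matching the talk: 1M customers, 100M sales rows.
con.execute("""
    CREATE TABLE customer AS
    SELECT range AS cid, 'customer_' || range::VARCHAR AS name
    FROM range(1000000)
""")
con.execute("""
    CREATE TABLE sale AS
    SELECT (random() * 999999)::INTEGER AS cid,
           (random() * 100)::INTEGER    AS revenue,
           (random() * 10)::INTEGER     AS tax
    FROM range(100000000)
""")

start = time.time()
con.execute("""
    SELECT first(name) AS name, sum(revenue + tax) AS total
    FROM customer JOIN sale USING (cid)
    GROUP BY cid
    ORDER BY total DESC
    LIMIT 10
""").fetchall()
print(f"query took {time.time() - start:.2f}s")
```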

Summary

I hope I've given you some idea about how DuckDB works. We call it a SQLite for analytics. I've covered how it is designed, what it does, and why it was designed that way. What it really provides is a state-of-the-art data engine in quite a tiny package. It is a library you just link into your process. It is fairly small and has no dependencies. You can just add it to your application, and you have a state-of-the-art SQL engine. I've talked about how in-process is so powerful, because we can transfer result sets to our client in a very efficient way, and we can also write data to our database in a very efficient way, trying to fix Hadley's slow and frustrating thing. I've explained how vectorized query processing works, which is a way of amortizing interpretation overhead during query execution: we have efficient in-cache operation and don't incur any of the function call overhead that we would otherwise suffer from. I've also tried to explain a bit about the parallelism model that DuckDB uses, this Morsel-Driven Parallelism, which allows you to efficiently parallelize over an arbitrary number of cores, with the operators themselves being aware of multi-core.

 


 

Recorded at:

Feb 28, 2024
