
In-Process Analytical Data Management with DuckDB

Key Takeaways

  • DuckDB is an open-source OLAP database designed for analytical data management. Similar to SQLite, it is an in-process database that can be embedded within your application.
  • In an in-process database, the engine resides within the application, enabling data transfer within the same memory address space. This eliminates the need to copy large amounts of data over sockets, resulting in improved performance.
  • DuckDB leverages vectorized query processing, which enables efficient operations within the CPU cache and minimizes function call overhead.
  • DuckDB uses Morsel-Driven parallelism, which allows efficient parallelization across any number of cores while keeping its operators aware of the parallelism.

Why did I embark on the journey of building a new database? It started with a statement by the well-known statistician and software developer Hadley Wickham:

If your data fits in memory there is no advantage to putting it in a database: it will only be slower and more frustrating.

This sentiment was a blow and a challenge to database researchers like myself. What are the aspects that make databases slow and frustrating? The first culprit is the client-server model.

When conducting data analysis and moving large volumes of data into a database from an application, or extracting it from a database into an analysis environment like R or Python, the process can be painfully slow.

I tried to understand the origins of the client-server architectural pattern, and I authored the paper, "Don’t Hold My Data Hostage – A Case For Client Protocol Redesign".

Comparing the database client protocols of various data management systems, I timed how long it took to transmit a fixed dataset between a client program and several database systems.

As a benchmark, I used the Netcat utility to send the same dataset over a network socket.

Figure 1: Comparing different clients; the dashed line is the wall clock time for netcat to transfer a CSV of the data

Compared to Netcat, transferring the same volume of data with MySQL took ten times longer, and with Hive and MongoDB, it took over an hour. The client-server model appears to be fraught with issues.

SQLite

My thoughts then turned to SQLite. With billions and billions of copies in the wild, SQLite is the most widely deployed SQL system in the world. It is quite literally everywhere: you interact with dozens, if not hundreds, of instances every day without realizing it.

SQLite operates in-process, a different architectural approach integrating the database management system directly into a client application, avoiding the traditional client-server model. Data can be transferred within the same memory address space, eliminating the need to copy and serialize large amounts of data over sockets.

However, SQLite isn't designed for large-scale data analysis; its primary purpose is to handle transactional workloads.

DuckDB

Several years ago, Mark Raasveldt and I began working on a new database, DuckDB. Written entirely in C++, DuckDB is a database management system that employs a vectorized execution engine. It is an in-process database engine and we often refer to it as the 'SQLite for analytics'. Released under the highly permissive MIT license, the project operates under the stewardship of a foundation, rather than the typical venture capital model.

What does interacting with DuckDB look like?

import duckdb
# load the extension that enables reading files over HTTP(S)
duckdb.sql('LOAD httpfs')
# read a Parquet file from a URL and return the result as a pandas DataFrame
duckdb.sql("SELECT * FROM 'https://github.com/duckdb/duckdb/blob/master/data/parquet-testing/userdata1.parquet'").df()

In these three lines, DuckDB is imported as a Python package, an extension is loaded to enable communication with HTTPS resources, and a Parquet file is read from a URL, with the result converted to a Pandas DataFrame (DF).

DuckDB, as demonstrated in this example, inherently supports Parquet files, which we consider the new CSV. The LOAD httpfs call illustrates how DuckDB can be extended with plugins.

There's a lot of intricate work hidden in the conversion to a DataFrame, as it involves transferring a result set that can contain millions of rows. But because we operate in the same address space, we can bypass serialization and socket transfer, making the process extremely fast.
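
As a further illustration of the same-address-space benefit, DuckDB can also query a Pandas DataFrame that already lives in the Python process. The snippet below is a minimal sketch; the DataFrame df and its columns are made up for the example:

import duckdb
import pandas as pd

# a DataFrame that already lives in this process (hypothetical example data)
df = pd.DataFrame({"cid": [1, 2, 3], "revenue": [100.0, 250.0, 75.0]})

# DuckDB scans the in-memory DataFrame directly, with no socket transfer or copy of the input
result = duckdb.sql("SELECT cid, revenue * 1.21 AS revenue_with_tax FROM df").df()
print(result)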

We've also developed a command-line client, complete with features like query autocompletion and SQL syntax highlighting. For example, I can start a DuckDB shell on my computer and read the same Parquet file.

If you consider the query:

SELECT * FROM userdata.parquet;

you realize that would not typically work in a traditional SQL system, as userdata.parquet is not a table, it is a file. The table doesn't exist yet, but the Parquet file does. If a table with a specific name is not found, we search for other entities with that name, such as a Parquet file, directly executing queries on it.

In-Process Analytics

From an architectural standpoint, we have a new category of data management systems: in-process OLAP databases.
SQLite is an in-process system, but it is geared toward OLTP (Online Transaction Processing). For OLTP in a traditional client-server architecture, PostgreSQL is the most common option.

Figure 2: OLTP versus OLAP

On the OLAP side, there have been several client-server systems, with ClickHouse being the most recognized open-source option. However, before the emergence of DuckDB, there was no in-process OLAP option.

Technical Perspective of DuckDB

Let's discuss the technical aspects of DuckDB, walking through the stages of processing the following query:

Figure 3: A simple select query on DuckDB

The example selects a name and a sum from the join of two tables, customer and sale, which share a common column, cid. The goal is to compute the total revenue per customer, summing up revenue plus tax for each transaction.

When we run this query, the system joins the two tables, matching rows on the value of the cid column. It then computes the revenue + tax projection, followed by a grouped aggregation by cid, where it computes the first name and the final sum.
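
Figure 3 shows the query only as an image; the self-contained sketch below reproduces it in DuckDB's Python API with made-up data, so the exact table and column names are assumptions based on the description:

import duckdb

# hypothetical miniature versions of the two tables from the example
duckdb.sql("CREATE TABLE customer (cid INTEGER, name VARCHAR)")
duckdb.sql("CREATE TABLE sale (cid INTEGER, revenue INTEGER, tax INTEGER)")
duckdb.sql("INSERT INTO customer VALUES (42, 'ASML'), (43, 'Shell')")
duckdb.sql("INSERT INTO sale VALUES (42, 1233, 422), (42, 20, 2), (43, 100, 10)")

# total revenue (including tax) per customer
duckdb.sql("""
    SELECT first(name) AS name, sum(revenue + tax) AS total
    FROM customer JOIN sale USING (cid)
    GROUP BY cid
""").show()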

DuckDB processes this query through standard phases: query planning, query optimization, and physical planning. The resulting physical plan is further divided into so-called pipelines.

For example, this query has three pipelines, defined by their ability to run in a streaming fashion. A pipeline ends when we encounter a breaking operator, that is, an operator that needs to see its entire input before it can proceed.

Figure 4: First pipeline

The first pipeline scans the customer table and constructs a hash table. The hash join is split into two phases, building the hash table on one side of the join, and probing, which happens on the other side. The building of the hash table requires seeing all data from the left-hand side of the join, meaning we must run through the entire customer table and feed all of it into the hash join build phase. Once this pipeline is completed, we move to the second pipeline.

Figure 5: Second pipeline

The second pipeline is larger and contains more streaming operators: it scans the sale table and probes the hash table we built earlier to find join partners from the customer table. It then projects the revenue + tax column and runs the aggregate, a breaking operator. Finally, we run the GROUP BY build phase and complete the second pipeline.

Figure 6: Third pipeline

We can then schedule the third and final pipeline, which reads the results of the GROUP BY and outputs the result. This process is fairly standard, and many database systems take a similar approach to query planning.
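
As a rough, single-threaded illustration (not DuckDB's actual implementation), the three pipelines can be sketched in plain Python: build a hash table over customer, stream sale through probe, projection, and aggregation, and finally emit the groups:

# a minimal, single-threaded sketch of the three pipelines (illustrative only)
customers = [(42, "ASML"), (43, "Shell")]                # (cid, name)
sales = [(42, 1233, 422), (42, 20, 2), (43, 100, 10)]    # (cid, revenue, tax)

# pipeline 1: scan customer and build the join hash table
hash_table = {cid: name for cid, name in customers}

# pipeline 2: scan sale, probe the hash table, project revenue + tax, and build the groups
groups = {}
for cid, revenue, tax in sales:
    name = hash_table[cid]        # HASH JOIN PROBE
    amount = revenue + tax        # PROJECTION
    first_name, total = groups.get(cid, (name, 0))
    groups[cid] = (first_name, total + amount)   # GROUP BY BUILD

# pipeline 3: read the finished groups and output the result
for cid, (name, total) in groups.items():
    print(name, total)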

Row-at-a-time

To understand how DuckDB processes a query, let's first consider the traditional Volcano-style iterator model, which operates through a chain of iterators: every operator exposes an iterator and has a set of iterators as its input.

The execution begins by trying to read from the top operator, in this case, the GROUP BY BUILD phase. However, it can't read anything yet as no data has been ingested. This triggers a read request to its child operator, the projection, which reads from its child operator, the HASH JOIN PROBE. This cascades down until it finally reaches the sale table.

Figure 7: Volcano-style iterator model

The sale table generates a tuple, for example, 42, 1233, 422, representing the ID, revenue, and tax columns. This tuple then moves up to the HASH JOIN PROBE, which consults its built hash table. For instance, it knows that ID 42 corresponds to the company ASML and it generates a new row as the join result, which is ASML, 1233, 422.

This new row is then processed by the next operator, the projection, which sums up the last two columns, resulting in a new row: ASML, 1655. This row finally enters the GROUP BY BUILD phase.

This tuple-at-a-time, row-at-a-time approach is common to many database systems such as PostgreSQL, MySQL, Oracle, SQL Server, and SQLite. It's particularly effective for transactional use cases, where single rows are the focus, but it has a major drawback in analytical processing: it generates significant overhead due to the constant switching between operators and iterators.
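
A minimal way to picture this pull-based model is a chain of Python generators, where each request for a row cascades down to the scan; this is only a sketch, not how any of these systems are actually implemented:

# each operator is a generator that pulls one row at a time from its child
def scan(rows):
    for row in rows:
        yield row

def hash_join_probe(child, hash_table):
    for cid, revenue, tax in child:
        yield (hash_table[cid], revenue, tax)   # e.g. (42, 1233, 422) -> ('ASML', 1233, 422)

def projection(child):
    for name, revenue, tax in child:
        yield (name, revenue + tax)             # ('ASML', 1233, 422) -> ('ASML', 1655)

def group_by_build(child):
    groups = {}
    for name, amount in child:                  # each iteration cascades calls down to the scan
        groups[name] = groups.get(name, 0) + amount
    return groups

sales = [(42, 1233, 422), (42, 20, 2)]
hash_table = {42: "ASML"}
print(group_by_build(projection(hash_join_probe(scan(sales), hash_table))))   # {'ASML': 1677}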

One possible improvement suggested by the literature is to just-in-time (JIT) compile the entire pipeline. This option, though viable, isn't the only one.

Vector-at-a-time

Let's consider the operation of a simple streaming operator like the projection.

Figure 8: Implementation of a projection

We have an incoming row and some pseudocode: input.readRow reads a row of input, the first value remains unchanged, and the second entry in the output becomes the result of adding the second and third values of the input, with the output then written. While this approach is easy to implement, it incurs a significant performance cost due to function calls for every value read.
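
Rendered as a Python sketch (the read/write callbacks stand in for the readRow interface described above and are assumptions made for the illustration), the row-at-a-time projection looks like this:

# row-at-a-time projection: one read call and one write call per row processed
def project_row_at_a_time(read_row, write_row):
    while True:
        row = read_row()                       # e.g. ('ASML', 1233, 422)
        if row is None:
            break
        write_row((row[0], row[1] + row[2]))   # first value unchanged, then revenue + tax

rows = iter([("ASML", 1233, 422), ("Shell", 100, 10)])
out = []
project_row_at_a_time(lambda: next(rows, None), out.append)
print(out)   # [('ASML', 1655), ('Shell', 110)]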

An improvement over the row-at-a-time model is the vector-at-a-time model, first proposed in "MonetDB/X100: Hyper-Pipelining Query Execution" in 2005.

This model processes not just single values at a time, but short columns of values, collectively referred to as vectors. Instead of examining a single value for each row, multiple values are examined for each column at once. This approach reduces overhead, as type switching is performed once per vector of values instead of once per row.

Figure 9: The vector-at-a-time model

The vector-at-a-time model strikes a balance between columnar and row-wise execution. While full columnar execution is more efficient, materializing entire columns can lead to excessive memory use. By limiting vectors to a manageable size, the vector-at-a-time model avoids the need for JIT compilation while still amortizing interpretation overhead. It also promotes cache locality, which is critical for efficiency.

The importance of cache locality is illustrated by the well-known Latency Numbers Everyone Should Know.

Figure 10: Latency Numbers Everyone Should Know

The numbers, popularized by Google's Peter Norvig and Jeff Dean, highlight the disparity between an L1 cache reference (0.5 nanoseconds) and a main memory reference (100 nanoseconds), a factor of 200. Since 1990, the L1 cache reference has become roughly 200 times faster, while the main memory reference has become only about twice as fast, so there is a significant advantage in having operations fit within the CPU cache.

This is where the beauty of vectorized query processing lies.

Figure 11: Implementation of a projection with vectorized query processing

Let's consider the same projection of revenue + tax example we discussed before. Instead of retrieving a single row, we get as input three vectors of values and output two vectors of values. We read a chunk (a collection of small vectors of columns) instead of a single row. As the first vector remains unchanged, it's reassigned to the output. A new result vector is created, and an addition operation is performed on every individual value in the range from 0 to 2048.

This approach allows the compiler to automatically insert special instructions, such as SIMD instructions, and avoids function call overhead by interpreting and switching between data types and operators only at the vector level. This is the core of vectorized processing.
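
As a sketch of the same projection in vectorized form (using NumPy arrays here purely for illustration; DuckDB's engine is written in C++), the type dispatch and function call happen once per vector of up to 2048 values rather than once per value:

import numpy as np

VECTOR_SIZE = 2048

def project_vector_at_a_time(name_vec, revenue_vec, tax_vec):
    out_name = name_vec                 # the first vector is unchanged, so it is simply reassigned
    out_total = revenue_vec + tax_vec   # one call adds the whole vector; the per-value loop runs
                                        # in optimized (potentially SIMD) code, not in the interpreter
    return out_name, out_total

# one chunk of input: three vectors of (up to) 2048 values each
names = np.array(["ASML", "Shell"] * (VECTOR_SIZE // 2))
revenue = np.random.randint(0, 1000, size=VECTOR_SIZE)
tax = np.random.randint(0, 100, size=VECTOR_SIZE)

out_names, out_totals = project_vector_at_a_time(names, revenue, tax)
print(out_totals[:5])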

Exchange-Parallelism

Vectorized processing being efficient on a single CPU is not enough; it also needs to perform well across multiple CPU cores. How can we support parallelism?

Goetz Graefe, principal scientist at Google, described the concept of exchange operator parallelism in his paper "Volcano - An Extensible and Parallel Query Evaluation System".

Figure 12: Exchange operator parallelism

In this example, three partitions are read simultaneously. Filters are applied and values are pre-aggregated, then hashed. Based on the values of the hash, the data is split up, further aggregated, re-aggregated, and then the output is combined. By doing this, most parts of the query are effectively parallelized.

For instance, you can observe this approach in Spark's execution of a simple query. After scanning the files, a hash aggregate performs a partial_sum. Then, a separate operation partitions the data, followed by a re-aggregation that computes the total sum. However, this approach has proven problematic in practice: the partitioning of work is fixed in the plan, which makes it vulnerable to skew and load imbalance across workers.
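
To make the idea concrete, here is a minimal sketch in plain Python (not how Spark or Volcano are implemented) of the exchange pattern for a parallel sum: each worker pre-aggregates its partition, the partial results are repartitioned by a hash of the key, and each downstream worker produces the final sums for the keys it owns:

from collections import defaultdict

NUM_WORKERS = 2
partitions = [[("ASML", 1655), ("Shell", 110)],   # input partition for worker 0
              [("ASML", 22), ("Shell", 40)]]      # input partition for worker 1

# step 1: each worker pre-aggregates its own partition
partials = []
for part in partitions:
    local = defaultdict(int)
    for key, amount in part:
        local[key] += amount
    partials.append(local)

# step 2 (the exchange): route every partial result to a downstream worker based on hash(key)
exchanged = [defaultdict(int) for _ in range(NUM_WORKERS)]
for local in partials:
    for key, amount in local.items():
        exchanged[hash(key) % NUM_WORKERS][key] += amount

# step 3: each downstream worker now owns all the values for its keys and holds the final sums
print([dict(worker) for worker in exchanged])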

Morsel-Driven Parallelism

A more modern model for achieving parallelism in SQL engines is Morsel-Driven parallelism. As in the approach above, the scans at the input level are divided into partial scans. In our second pipeline, we have two partial scans of the sale table, with the first one scanning the first half of the table and the second one scanning the latter half.

Figure 13: Morsel-Driven parallelism

The HASH JOIN PROBE remains the same, as both partial pipelines probe the same hash table. The projection operation is independent, and all these results sink into the GROUP BY operator, which is our breaking operator. Notably, you don't see an exchange operator here.

Unlike the traditional exchange operator-based model, the GROUP BY is aware of the parallelization taking place and is equipped to efficiently manage the contention arising from different threads reading groups that could potentially collide.

Figure 14: Partitioning hash tables for parallelized merging

In Morsel-Driven parallelism, the process begins (Phase 1) with each thread pre-aggregating its values: the separate subsets, or morsels, of input data are built into separate, thread-local hash tables.

The next phase (Phase 2) involves partition-wise aggregation: within each thread-local hash table, the data is partitioned on the radixes of the group keys, ensuring that a given key can only ever appear in the same partition across all the local hash tables. When all the data has been read and it's time to finalize the aggregate, we can take the same partition from each participating thread and schedule separate threads to merge them in parallel.

Though this process is more complex than a standard aggregate hash table, it allows the Morsel-Driven model to achieve great parallelism. This model efficiently constructs an aggregation over multiple inputs, circumventing the issues associated with the exchange operator.
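
A compact sketch of the two phases, using plain Python threads and made-up data (and hash-based rather than radix partitioning), might look like this; it is illustrative only, not DuckDB's implementation:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

NUM_PARTITIONS = 4
morsels = [[("ASML", 1655), ("Shell", 110)],   # morsel processed by thread 0
           [("ASML", 22), ("Shell", 40)]]      # morsel processed by thread 1

def phase1(morsel):
    # Phase 1: pre-aggregate one morsel into a partitioned, thread-local hash table
    local = [defaultdict(int) for _ in range(NUM_PARTITIONS)]
    for key, amount in morsel:
        local[hash(key) % NUM_PARTITIONS][key] += amount
    return local

def phase2(partition_id, local_tables):
    # Phase 2: merge the same partition from every thread; no other partition shares these keys
    merged = defaultdict(int)
    for local in local_tables:
        for key, amount in local[partition_id].items():
            merged[key] += amount
    return dict(merged)

with ThreadPoolExecutor() as pool:
    local_tables = list(pool.map(phase1, morsels))
    results = list(pool.map(lambda p: phase2(p, local_tables), range(NUM_PARTITIONS)))

print(results)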

Simple Benchmark

I conducted a simple benchmark, using our example query with some minor added complexity in the form of an ORDER BY and a LIMIT clause. The query selects the name and the sum of revenue + tax from the customer and sale tables, which are joined and grouped by the customer ID.

The experiment involved two tables: one with a million customers and another with a hundred million sales entries. This amounted to about 1.4 gigabytes of CSV data, which is not an unusually large dataset.
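
The benchmark query appears in the article only as an image; roughly, and reusing the assumed customer and sale schema from the earlier sketch, it would look like this in DuckDB's Python API:

import duckdb

# roughly the benchmark query: total revenue (including tax) per customer,
# ordered by that total and limited to the top results
duckdb.sql("""
    SELECT first(name) AS name, sum(revenue + tax) AS total
    FROM customer JOIN sale USING (cid)
    GROUP BY cid
    ORDER BY total DESC
    LIMIT 10
""").show()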

Figure 15: The simple benchmark

DuckDB completed the query on my laptop in just half a second. On the other hand, PostgreSQL, after I had optimized the configuration, took 11 seconds to finish the same task. With default settings, it took 21 seconds.

While DuckDB could process the query around 40 times faster than PostgreSQL, it's important to note that this comparison is not entirely fair, as PostgreSQL is primarily designed for OLTP workloads.

Conclusions

The goal of this article is to explain the design, functionality, and rationale behind DuckDB, a data engine encapsulated in a compact package. DuckDB functions as a library linked directly into the application process; it has a small footprint and no dependencies, allowing developers to easily embed a SQL engine for analytics.

I highlighted the power of in-process databases, which lies in their ability to efficiently transfer result sets to clients and write data to the database.

An essential component of DuckDB's design is vectorized query processing: this technique allows efficient in-cache operations and eliminates much of the function call overhead.

Lastly, I touched upon DuckDB's parallelism model: Morsel-Driven parallelism supports efficient parallelization across any number of cores while keeping operators aware of the parallelism, contributing to DuckDB's overall performance and efficiency.
