
Datadog: a Real-Time Metrics Database for One Quadrillion Points/Day



Ian Nowland and Joel Barciauskas talk about the challenges Datadog faces as the company has grown its real-time metrics systems that collect, process, and visualize data to the point they now handle trillions of points per day. They also talk about how the architecture has evolved, and what they are looking to in the future as they architect for a quadrillion points per day.


Ian Nowland is the VP Engineering Metrics and Alerting at Datadog. Joel Barciauskas currently leads Datadog's distribution metrics team, providing accurate, low latency percentile measures for customers across their infrastructure.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Nowland: My name's Ian Nowland, and this is Joel Barciauskas. We work for a company called Datadog; our headquarters is here in New York, and Joel [Barciauskas] works out of Boston. Today, we're going to talk about one of our oldest, largest-scale systems at Datadog, our real-time metrics database. For those who [inaudible 00:00:17], there's been a power-of-10 change between the quadrillions and the trillions. As we are pre-IPO, our lawyers are very nervous about us saying only certain things. Quadrillions is what we hope to be capable of. Trillions is what we publicly say we're capable of today. That's the reason for the change.

For those who haven't heard of Datadog, I just want to do a quick couple of slides about what we do; it's pretty important for the talk. Our customers tend to be either developers or operations teams throughout the industry. The way we really think of it is, we want to be able to offer the telemetry that they need to understand their systems in production. This is just right off our website, a bunch of customers. This is the type of data we store. We integrate with a whole bunch of open source and a whole bunch of vendor systems. We want to pull back anything that we can about their systems so that people can really understand what's happening. There's a whole bunch of different types of data, but today's focus is going to be on metrics, which is the original product Datadog launched eight years ago.

Let's talk about the motivation for this talk and what we hope for you to take away. Datadog has been in a place where, like a lot of us in the data processing industry, we have seen this explosion in the amount of data people would like us to process: to the extent that we can cost-effectively process more and more data, people just keep finding new ways to throw data at us. There are a couple of particular dimensions where we see this. When Datadog launched, people were just getting out of data centers. They had these data sources that were very big, server-sized, living for months or years. What's happened over the last eight years is those sources have gotten smaller and smaller, and more and more limited in life span. It's really changed the type of data that people have been sending to us. The model here is Functions as a Service, like Lambda, where the application instance doesn't live for anything more than minutes. We've had to cope with this big change in dimension.

At the same time as that, we're seeing this great change in data volume. When Datadog launched, all it was was system metrics, things like CPU usage and disk usage. What we've discovered is, to the extent we can bring more and more data about richer and richer application stacks into the one place, that's what people really want to understand their systems. They don't really want to go to multiple places. I'll walk from bottom to top here, starting with system metrics. Then people start wanting application metrics, say garbage collection time, things like that. Then people start wanting SLIs for the APIs that they have; they really want to know how those are performing. Where we're seeing pressure today is even getting per-user or per-device metrics, aggregated across all the applications serving customers. Today we have people asking us for this. So there is this two-dimensional increase in data. That's really what we want to talk about today: lessons in terms of what we've learned in dealing with that.

In framing those lessons, these mantras come from the performance engineering community, from Brendan Gregg, who's a big blogger about performance engineering. These are drawn straight from his website. We don't want to claim that we take these mantras and apply them and that's how we decide how to change our system. Rather, what Joel [Barciauskas] and I found as we were working on this talk is that they form a great way of framing everything that we have to do to build something that can really scale, and scale cost-effectively. The major lesson to take away is not focusing on one or two of these things. It's by focusing on all of them that we've been able to grow to where we are today.

This is the talk structure for today. We're going to start with what metrics databases are, just quickly, for people who haven't run into them. We'll talk a little bit about architecture, then dive deep into our datastores. I think that's one of the key places where we've done a good job of using those mantras. Then we'll talk about synchronization, which falls out of the datastores [inaudible 00:03:44]. Then Joel [Barciauskas] is going to take over and talk a little bit about aggregation. Aggregation has been a great way of scaling with historical methods, and Joel [Barciauskas] is going to introduce this concept of sketches, which is something that we're doing to bring it to the next order of magnitude of scale.

What Are Metric Databases?

First of all, what are metrics databases? This is the typical metrics query. It's like Metrics 101, but just for those who haven't seen it: someone wants to know, "What's the system load been and what are my [inaudible 00:04:10] across the last 30 minutes?" This introduces the ideas that there's a metric, system load; there's a host generating it; and then there's this notion of time. That's very key to what a metrics database does. What does this look like? This is just a graph of this metric across 30 minutes. We call the thing on the left a time series. The metric name here is just the load. I'll talk about how names and tags are really important ways to think about metrics. It's made of a bunch of points that were reported by a particular host, by an agent on a host. Of course, it has a value at any particular point in time. That's really important when we get to the next slide.

This is very simple. What does a metrics database do on top of this, though? Once we get to a more advanced query, that actually introduces the three main things that metrics databases do that are hard, and definitely hard at scale. The first thing is this notion of alerting. The general use case of a metrics database is people wanting to understand what's happening in their system right now on dashboards, but people also want it to be proactive: page me when the system load on my fleet hits a certain value. What that "take action" brings is a high throughput of queries that you're running over time, as well as very tight windows on how long you can take to serve queries. It's where the "real-time" comes from.

The next thing is aggregation. While you might occasionally dive down into what one host is doing, generally people's applications today are running across a fleet. They don't really want to see a time series that represents one line for one host. What they want is one line that represents the aggregate across 10 hosts or 100 hosts. Tied to this notion of aggregation are also these tags that I mentioned: people will be running these queries across all their fleet, and whether they want to view a UI or set up an alert, it's always along these dimensions. This notion of dimension is very key to a metrics database.

Diving in a little on what these dimensions mean: the metric name is the specific thing that you're measuring. These tags, these dimensions, are capturing all of the different ways that you might want to slice and dice that data to understand your system. You walk from left to right along the bottom there. You see host because it's the easy one: I want to know what's happening on a host, "I want to see what's happening in my AZ." As people get more and more advanced, it's, "I want to see if my application runs differently on different versions of the kernel," and on the next kernel. What you see is this expansion of dimensions that people want to send with every single point.

That's really the point. I talked earlier about how we're seeing so many more points that people want to send us, and people want to send so many more dimensions on those points. These are the same categories as I talked about earlier. What I dearly want everyone to record is: what host was it on? What were the module aspects of what was done? What was the path or the call that leads me to want to make this metric? Ideally, what was the customer? What query was I serving for what customer? You get this big expansion of dimensions. This dimensionality, this cardinality, is a big part of a metrics database.

Our Architecture

Talking through our architecture, I wanted to start with the 1,000-foot view; this is just a simple pipeline architecture. It's actually nice that a simple pipeline architecture from 1,000 feet matches our architecture pretty well. We're taking large amounts of data, putting it through a pipeline, and getting out, at the lower left there, what customers want. One of the interesting things is that the query system is aware of all the different types of datastores, and intake just emits to both types. We do streaming processing. We have the one query system for both the UI and the monitors and alerts.

I want to fold in the mantras for the first time. The first one to focus on is "Do it, but don't do it again." For everyone in the room, of course, caching is very important to us. We cache both in the query system and in the front end. There are caches for different types of things: one is rendered queries, the other can be subqueries. Caching is super important to us; it's a big part of how we scale. But it turns out that in this type of system, because time is constantly moving and the focus is always on what's happening right now, caching is an important part of the story, but it doesn't get you all the wins that you need. Where we really need to focus to make the big wins in our system is this pipeline from intake and, in particular, into our datastores. That's what I wanted to dive into.

One of the important things with metric stores, and it's why we talk about a metrics database rather than a time-series database, is that in a metrics database there's this general property that even a Lambda, something that just lives for one or two minutes, is reporting time series, not just points. What that means is that you can separate out a tag set, you can separate out a time series, and store it very differently from your points. You can make optimizations on top of that. It leads to a very scalable system, in a way that a more generalized time-series database sometimes struggles with. What does that look like? Let's dive in on our intake and datastores.
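As a rough sketch of the separation Nowland describes (all names here are illustrative, not Datadog's actual code): the tag set is interned once per series, and every subsequent point is just a small (series_id, timestamp, value) record.

```python
from dataclasses import dataclass, field

@dataclass(frozen=True)
class TagSet:
    metric: str
    tags: frozenset  # e.g. frozenset({"host:i-abc", "az:us-east-1a"})

@dataclass
class SeriesStore:
    ids: dict = field(default_factory=dict)      # TagSet -> series_id
    points: list = field(default_factory=list)   # (series_id, ts, value)

    def report(self, tagset, ts, value):
        # Intern the tag set: it is stored once, however many points follow.
        sid = self.ids.setdefault(tagset, len(self.ids))
        self.points.append((sid, ts, value))
        return sid

store = SeriesStore()
series = TagSet("system.load.1", frozenset({"host:i-abc"}))
for t in range(3):
    store.report(series, t, 0.5 + t)

assert len(store.ids) == 1     # one tag set stored
assert len(store.points) == 3  # three cheap point records
```

The payoff is that the points stream and the tag-set stream can then be shipped, stored, and scaled separately, which is exactly the split the next slides make in Kafka.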

The main heavy-lifting bit of open source across our company is Kafka. Kafka has served us really well in terms of its durability and its scalability. The first thing you can see in these gray boxes is that I've separated out the points traffic from the tag set traffic. That's that key thing: we can treat these as separate streams with different scaling characteristics. The next thing is that we can also use Kafka as a [inaudible 00:09:18] so the different types of stores can consume at their own rate, with different types of datastores taking the exact same data source and serving up different queries. What this has allowed us to do is both put a bunch of different types of datastores into production, and also iterate on those datastores over time to improve them completely independently of the rest of the system.

Another aspect of this system that's very important to point out is that, because we can do all this off Kafka, we can basically "do it later," minimizing work in the intake part of the system. Generally, we want to keep, and we have kept, our intake system really simple. Make intake simple, dump it to Kafka. For us, Kafka is a persistence layer. We store data in there for hours, sometimes for days. What that means is we can really optimize work in the storage layer and in the query layer, as opposed to having to do it all at intake.

Coming back to this picture: it looks like a good picture, but the natural question I have when I see it is, "How does this horizontally scale?" Kafka itself handles horizontal scaling, but there's no real notion here of how these different datastores horizontally scale at all. So the question is, how do we horizontally scale? The answer is, and has been for a long time, fairly naïve, naïve in a good, simple sense: Kafka sharding. Intake does do a bit of work; it has to decide the partition. The kinds of things we partition on are customer ID or a hash of the metric name. We put those into different Kafka partitions. Then it's nice, because the stores can horizontally scale just by knowing which Kafka partitions they have to consume from. It's been a nice way for them to scale independently of each other. The struggle with this system, and the reason we ended up moving away from it, is that as we get to very high scale, where you start talking about thousands of partitions, it gets very hard to avoid hot spots or cold spots in your stores.
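A minimal sketch of that partitioning decision, under the assumption (stated in the talk) that the key is the customer ID plus a hash of the metric name; the partition count and function names are hypothetical:

```python
import hashlib

NUM_PARTITIONS = 32  # illustrative; a real deployment uses far more

def partition_for(customer_id: str, metric_name: str) -> int:
    # A stable hash of customer + metric decides the Kafka partition,
    # so every downstream store finds a given series in the same place.
    key = f"{customer_id}:{metric_name}".encode()
    digest = hashlib.sha256(key).digest()
    return int.from_bytes(digest[:8], "big") % NUM_PARTITIONS

p = partition_for("customer-42", "system.load.1")
assert 0 <= p < NUM_PARTITIONS
# The same key always lands on the same partition:
assert p == partition_for("customer-42", "system.load.1")
```

This also makes the hot-spot problem visible: one very large customer hashes to one partition, and no amount of adding partitions rebalances that customer's traffic, which is the limitation the talk says drove the move away from pure Kafka sharding.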

What we're moving to right now is more independent scaling of these systems, independent of the Kafka sharding. I'd say Datadog's success story to date has been very tied to Kafka sharding; it's worked really well for us. Coming back to the mantras, what's the learning here? It's "do it concurrently." With the use of Kafka, what we've been able to do is use independently, horizontally scalable datastores. It's been a great way for us to scale as we discover our customers' needs, and to scale the right things independently of each other.

Deep Dive on Our Datastores

Now I want to deep dive on those datastores; so far it's been abstract what they mean. One of the things I wanted to bring to light is that when you deal with a metrics database, you're talking about a lot of data. This is just a ballpark; this isn't any particular customer. It's just ballparking the way you get to the amount of data one customer can send at you. Say I have about 10,000 apps; as I said, things are getting smaller: smaller containers, fewer monoliths, more microservices. And as I talked about earlier, people want all these different types of metrics today: system metrics, application metrics, JVM metrics. You easily get 1,000 metrics emitted from just one application. Then one point per second: that's one of those numbers in the middle. Some data, like per-query metrics, you want faster than one point per second; other things are one point a minute. This is just trying to keep it to powers of 10.

One point a second is a good [inaudible 00:12:35] just to work out the amount of data. There are about 100,000 seconds in a day. The part I'm just ballparking is that we use 64-bit floats as our primary point data. Trying to keep it to powers of 10, and adding in the fact that we do have tags [inaudible 00:12:48] as well, let's call it 10 bytes per point. What you get off that, just doing the multiplication, is about 10 terabytes a day for an average customer.
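The ballpark arithmetic from the last two paragraphs, written out (all the inputs are the round powers-of-10 figures from the talk, not measured numbers):

```python
apps = 10_000               # containers / instances for one customer
metrics_per_app = 1_000     # system + application + JVM metrics, etc.
seconds_per_day = 100_000   # ~86,400, rounded up to a power of 10
bytes_per_point = 10        # 8-byte float plus amortized tag overhead

bytes_per_day = apps * metrics_per_app * seconds_per_day * bytes_per_point
assert bytes_per_day == 10**13  # 10 TB/day for one average customer
print(bytes_per_day / 1e12, "TB/day")
```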

Then it's, "How do we make 10 terabytes a day queryable?" Obviously, the first place to go is, "S3 is great at storing bulk amounts of data." You just run the numbers through: ten terabytes for a month is $210. That's really cheap, it's fantastic. But you tend to want to make it queryable for maybe a month. You do the multiplication through for a rolling store: $60,000. Not quite as cheap but, again, something that seems pretty feasible to build.

The key challenge, I think, for any of us who have worked with S3, is that it's great at bulk durable storage where you just want the blob back. But for any type of high throughput, definitely with our real-time requirements, S3 in and of itself is never going to perform well enough. What this tells us is that S3 is going to be great as a long-term durable store, but to really scale this, we need to do something faster. That turns into the question of what.

I put together this slide, generated by me crawling the AWS website looking at different options. What I've tried to represent here is the cost of different options for maximum-capacity storage. That first line is one of the X1E instance types. What you see is DRAM pretty much where it's always been: very expensive but incredibly fast, low latency. S3, of course, is towards the bottom: very cheap, very slow, but it also has that characteristic of being very durable. The interesting ones are in the middle. SSD has come up, particularly over the last five years; when you look at the modern instance types, the bandwidth and the latency are getting very close to DRAM with a big order-of-magnitude cost difference. That highlights that it's a really interesting option for us. What I want to talk through is how we apply this.

Let's start with what everyone who wants to do things fast is doing today, which is just going to in-memory databases. What does the math of that look like? 300 terabytes: that's 80 x1e.32xlarge instances for a month, which takes it to $300,000 a month. Now you're getting into a really expensive system for one customer. And this is with no indexes or overhead; there are generally going to be multiples of this to actually get the system into production. You also run into the problem that people want to query much more than a month.

When people are having that urgent issue in the middle of the day, they want to focus on the last five minutes, but they might want to compare it to what happened six months ago and see the trend for the last six months. So we can't just over-focus on in-memory storage. What we see is that neither of the options works for us on its own; we've really got to do a hybrid approach.

The first step of taking a hybrid approach, and this is like anyone building a datastore: if you want to build a perfectly generalized, highly indexed datastore, what you will find is that it's very expensive. There are just way too many things you could index on compared to what customers are actually going to query on. You really want to focus on your particular patterns of query. This is just the query I showed earlier. What it shows is the notion of time, that we want to select a particular time interval, and that we want to aggregate across dimensions. What it highlights for us is the major things that we need to build into our query store.

It turns out that this has far fewer indexing needs than a lot of other types of problems. I'll just list them off. The first one is describe tags. Because tags are very dynamic, when people want to work out what tags their system has been generating for the last month, they need a describe function, so we need a tag index: a describe thing. The next thing is just having that tag index: "This was a time series, reported for five minutes; these were the tags that were used." That's important. The actually more important one is the inverted index. Generally, with the pattern that we've got, people are coming to us with, say, five tags. All we really want to look at is the time series that had those tags and nothing else. The inverted index solves that well; it's a very important part of the system. Finally, because we have the point store, we actually need at some point to get the points back to customers. So the point store is an important part as well.
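A minimal sketch of the inverted index just described (illustrative code, not Datadog's): for each tag, keep the set of series IDs that carried it; a query with several tags is then just a set intersection.

```python
from collections import defaultdict

class InvertedIndex:
    def __init__(self):
        self.by_tag = defaultdict(set)  # tag -> set of series IDs

    def add(self, series_id, tags):
        for tag in tags:
            self.by_tag[tag].add(series_id)

    def query(self, tags):
        # Intersect the smallest postings set first to keep work low.
        postings = sorted((self.by_tag[t] for t in tags), key=len)
        result = set(postings[0]) if postings else set()
        for p in postings[1:]:
            result &= p
        return result

idx = InvertedIndex()
idx.add(1, ["host:a", "az:us-east-1a"])
idx.add(2, ["host:b", "az:us-east-1a"])
idx.add(3, ["host:a", "az:us-east-1b"])

# "All series with BOTH of these tags":
assert idx.query(["host:a", "az:us-east-1a"]) == {1}
```

The returned series IDs are then what you hand to the point store to fetch actual values.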

How do we actually use that notion that we can break out these different datastores to do it cheaper? The answer is [inaudible 00:17:04]; it's been very hybrid in both how we approach data storage and how we approach technologies.

I talked about this table earlier. The ones that really appeal to us are S3 for its long-term durability and cost, DRAM for its speed and high-throughput support, and then SSD as basically a good compromise between the two: pretty high throughput, very low cost, because it puts the instance-failure volatility on us. What does that look like? There are the four types of queries I talked about earlier, which you're seeing on the left here. I've also added query results, which is the cache that I talked about earlier.

How do we actually implement these? What I'm showing here, and I know it's introducing a lot of complexity, is what we've done. We've tried to pick the right technology for the right index. If that meant picking multiple technologies to deal with different [inaudible 00:17:54], that's what we've done. We've chosen a very hybrid strategy to build this hot, cost-effective, high-throughput system. Not only have we taken this approach with the storage types, but we've done it with technologies as well. This isn't just engineers picking their favorites. This is very much engineers being thoughtful about what's right for what they need. What you're seeing in the examples here is: we use LevelDB for SSD storage, where it's very high-performing. DRAM is for in-memory. We like Cassandra, where we can trust it to horizontally scale, for more mid-performance. We use RocksDB and SQLite where we want a lot of flexibility in the types of queries that we want to run.

The other thing is, particularly when we're storing stuff in memory, what we found is that there is no real substitute for just picking the right data structures and storing them in memory. We use a lot of Go code with the right indexes and the right clever patterns to store everything else. The key takeaway: this is a lot of data, but what we've found is that to do this well and at scale takes a very hybrid approach compared to traditional systems.

Handling Synchronization

All of that sounds great, so why don't more databases do this? Generally, when you build these very independent datastores, your biggest issue is handling synchronization-type issues. Everything becomes big global locks across everything that's been developed independently. The nice thing about a metrics database is that synchronization is important for us, but we don't have to do it with traditional synchronization mechanisms.

The first thing to realize is that for the human sitting at the UI clicking refresh, eventual consistency is good enough. If you can get to where maybe that last second isn't totally up-to-date, but if I hit refresh, I get that last second, people are going to be completely fine with that. Like a lot of eventually consistent systems today, the UI requirement is "good enough, one-second-type eventual consistency," and we're totally fine.

The power here is all about alerts and monitors. This is where synchronization is very important. The way that I think about these is that they're level-sensitive. We end up bucketing the points that come in, and we want to evaluate a function across them. If that changes too much between one interval and the next, we really might end up paging someone and waking them up in the middle of the night.

What that means for this system is that false positives are almost as bad as false negatives. False negatives: you miss an event; that's very bad. But false positives: someone gets woken up in the middle of the night because Datadog has one of its many storage systems delayed for a few milliseconds; that's just as bad. That's the key synchronization problem we're trying to solve for.

A really important characteristic of this, though, is that a small delay is actually preferable to evaluating incomplete data. You can't take this to extremes; if people have to wait 20 minutes to get paged, they're going to be pretty upset. But to the extent you can wait just seconds for lag in a big distributed system to be resolved, it works very well if you can make it work.

The key synchronization, what drives the implementation here, is just this notion of buckets. We want the same window of points in each bucket before our evaluation system looks at them. The way that we've solved this is a heartbeat system. The way that I think of a heartbeat is: at intake, we inject data that's going to flow through the system, so when it comes out the back, we know a set amount of time has passed, very deterministically. What that ends up looking like is we inject fake datapoints that we know are fake datapoints, but they have to go through the exact same path through our systems as the real datapoints. That means that our monitors and alerts, right there at the end, are the ones that have to know "Has this heartbeat got through?" in deciding whether it's the right time to take action or whether to delay a bit until all the data gets through.

What does that mean in terms of details? We have a one-second tick time for metrics; that's the fastest you can send us data, which is pretty important. In terms of handling concurrency, the agents might send us multiple points for the same tick; we have last-write-wins semantics as the tiebreaker for keeping this clean. Then it really just comes down to injecting the fake data into our intake exactly in a way that it's going to flow through our system like real data, and checking for it is the monitor evaluation's job.
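A toy version of that heartbeat idea (my own sketch of the mechanism described, not Datadog's implementation): intake injects a known fake point per tick, and the evaluator refuses to evaluate a tick's bucket until that tick's heartbeat has come out the back of the pipeline.

```python
HEARTBEAT = "__heartbeat__"  # hypothetical sentinel metric name

class Evaluator:
    def __init__(self):
        self.buckets = {}      # tick -> list of real values
        self.complete = set()  # ticks whose heartbeat has arrived

    def consume(self, tick, metric, value):
        if metric == HEARTBEAT:
            self.complete.add(tick)
        else:
            self.buckets.setdefault(tick, []).append(value)

    def evaluate(self, tick, threshold):
        if tick not in self.complete:
            return None  # delay: a small wait beats a false page
        return max(self.buckets.get(tick, []), default=0) > threshold

ev = Evaluator()
ev.consume(1, "system.load.1", 0.9)
assert ev.evaluate(1, threshold=2.0) is None   # heartbeat not yet seen
ev.consume(1, HEARTBEAT, None)
assert ev.evaluate(1, threshold=2.0) is False  # now safe to evaluate
```

The `None` branch is the "small delay is preferable to evaluating incomplete data" trade-off from two paragraphs up.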

The nice thing about this is that it's a really unique way of doing synchronization. The challenge is that you have all these different teams building all these different datastores with different sharding characteristics, so you end up with a lot of independent paths the heartbeat has to go through. That's definitely been a big design challenge for us, but we've tried to stay on top of it while building a richer and richer system. This has allowed us to have these independently scaling stores without any type of global-locking issues.

That comes back to the performance mantras. The final one to apply here is "just don't do it." Rather than go full-on with synchronization, build the bare minimum of synchronization needed. That's been the lesson here. With that, I'm going to hand it over to Joel [Barciauskas], who's going to talk about aggregation, which is a way of getting to the next couple of orders of magnitude of scale.

Introducing Aggregation

Barciauskas: I'm going to talk about aggregation, and why aggregation is important enough that it merits its own dedicated team and its own portion of this talk. To talk about aggregation, I first need to talk about the types of metrics that we support in our system. You can think of all of our metrics fundamentally as counters or gauges. These types really define the aggregation that we use over them. Counters are things like requests and errors per second, or total time spent; you can think of a stopwatch that you're continually clicking, like a chess clock.

Those are things that are aggregated by summation, whereas gauges are continuous functions where we're taking readings at certain times. There's always a value for a gauge; there's always something happening, and you're just looking at it at specific points in time. That means that generally what you want to do is aggregate by the last value that you've seen, or by an average of some of the recent values. These tend to be system metrics like CPU, network, or disk usage, but they can also be higher-level application metrics like queue length.

How does this translate into how we interpret the data that we see in a time series context? This example here is showing four sources of time series data. It could be four servers or four containers or four Lambda functions, each generating independent time series data over 10 time intervals. You can see that the output of a query is really dictated by the type of data that we classify those time series as. The counter comes up with a very different query output than a gauge. Even gauges can differ, depending on whether you're choosing to aggregate by the average of all of the data in that time window or just by the most recent value. Both of those have valid uses; it's really up to you to determine how to derive the meaning from your data.
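The slide's example can be sketched numerically (the sources and values here are made up for illustration): four sources reporting into one time window, with the metric type deciding the aggregation.

```python
# Each source's values within one time interval (illustrative data).
window = {
    "web-1": [3, 5],
    "web-2": [2],
    "web-3": [4, 1],
    "web-4": [5],
}

# Counter: requests/errors per interval aggregate by summation.
as_counter = sum(v for vs in window.values() for v in vs)

# Gauge, averaged: treat readings as samples of a continuous value.
flat = [v for vs in window.values() for v in vs]
as_gauge_avg = sum(flat) / len(flat)

# Gauge, last: keep only each source's most recent reading, then combine.
as_gauge_last = sum(vs[-1] for vs in window.values()) / len(window)

assert as_counter == 20
assert round(as_gauge_avg, 2) == 3.33
assert as_gauge_last == 3.25
```

Three different, all-valid answers from the same points, which is why the metric type has to travel with the data.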

Ian [Nowland] alluded to some of this, but I just want to talk a little bit about some of the common characteristics that we see in user queries specifically. This is in the vein of queries that are diagnostic in nature: you're in your metrics system, you're investigating an incident, you want to see what went wrong. These kinds of queries tend to be bursty and unpredictable; you may go long periods of time between incidents. They're latency-sensitive: you want to get answers to your queries quickly; you don't want to sit there staring at a spinner. And it skews to recent data: you're most likely looking at data that's related to something that's happening in your system right now. But you often want some reference data to compare that to, whether that's yesterday or a week ago or three months ago.

There's another subset of user queries that have their own set of characteristics separate from the diagnostic ones, and that's dashboards. These are actually much more predictable than the diagnostic kinds of queries, and they're really important. Someone has taken the time to think through in advance what this query looks like, what the dimensions are, and what will give you the most meaning; it comes from past experience. Generally, what you're looking for in these graphs are things like step-function changes: performance regressions, large changes in usage patterns, etc.

How do those characteristics drive some of the decisions that we're making in terms of aggregation? We really want to focus on the output of these kinds of queries. You can make assumptions about the kinds of queries people want to put in dashboards. You see two examples of the same data being queried in different ways here. On one hand, you have this many-layered cake where, if you took a bite of it, all the flavors would be super muddied. You really wouldn't have any idea of what's going on there. It's really hard to get any meaning out of all of those 3,000 different lines; you can barely perceive that there are that many. If you had to guess, you'd probably guess there are a couple hundred. On the right, you see a much more human-digestible level of data.

What's happening here is, these graphs are both aggregating 70,000 series of data under the hood. That's actually not a ton in the context of some of the numbers that Ian was talking about, but in each of these graphs, we're seeing aggregation where we're reducing the number of time series from input to output by between 10 and 2,000 times. That's a huge reduction.

How can we use this to our benefit? We can think about how this data is getting queried and when it's getting queried. How can we do it when the user isn't looking? Aggregation typically happens in these kinds of systems in two places. It will happen at the source: if you have a collector agent of some sort, you'll see that the data is being summed, or aggregated by last or average, at the agent level. Then at the query system, it will be aggregated based on the user's queries, the time window over which they select the query, and the dimensions they select.

Because of the properties of these dashboard queries, we can introduce a streaming aggregator component to our architecture that knows about the common dashboard queries being made, takes them out of the query path, and moves them between the intake and the datastore. That means the customer experiences a 10x to 2,000x improvement in response time, because we're doing that much less work on the front end.
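As a toy illustration of that pre-aggregation idea (the function and tag names here are hypothetical, not Datadog's actual code), an aggregator that knows a dashboard only groups by `service` can sum away the `host` dimension on ingest, so the query path never touches the per-host series:

```python
from collections import defaultdict

def preaggregate(points, group_by):
    """Reduce incoming (tags, value) points to only the dimensions a
    known dashboard query needs, before any query runs."""
    out = defaultdict(float)
    for tags, value in points:
        # Keep only the tags the dashboard groups by; sum everything else away.
        key = tuple(sorted((k, v) for k, v in tags.items() if k in group_by))
        out[key] += value
    return dict(out)

points = [
    ({"service": "web", "host": "a"}, 1.0),
    ({"service": "web", "host": "b"}, 2.0),
    ({"service": "db", "host": "c"}, 5.0),
]
# Three per-host series reduce to two per-service series at ingest time.
print(preaggregate(points, {"service"}))
```

At query time, the dashboard reads the already-reduced per-service series instead of aggregating every host on the fly.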

Pre-aggregation is a really powerful tool for us for doing it when the user's not looking and moving things out of that user-sensitive, latency-sensitive flow. It does have limitations, of course. Once we've aggregated that data, taking an average of averages can lead to inaccuracy; you don't have the original weights and things like that. We can do really well on specific sets of data, but there are limitations in how much we can reuse that data throughout the system. We'll talk a little bit more about what if that limitation didn't exist.

Aggregation for Deeper Insights Using Sketches

Let's talk a little bit about another kind of metric type. I mentioned at the top that we support two kinds of metric types. I was misleading you a little bit; we actually support a third type of metric. These are distribution metrics, and these are very different than counters and gauges. Rather than a continuous function or a counter where you're monotonically increasing a number, what you're observing here are discrete events in the system. You're looking at things like a specific request latency or a request size. Generally, the way that you want to aggregate these is by percentile. You want to know what your 99th percentile, or even your four-nines, of latency or request size is. You really want to see what's happening at the tail of your distribution, so you have confidence that 99.9% of your users are having what you consider to be a good experience, for instance. That last one would be an example of an SLO that your organization might have.

How do we calculate distributions? Using this same set of example data, the traditional way that you would calculate a distribution: you first need to take all of your data across space and time. You need to sort all that data, then you need to observe where the value is that you want to take the percentile reading from, what rank that is in your sorted list. In this example, you wanted the P50, or the median value. In a time series of 40 values, that would be between the 20th and 21st values. Traditionally, you might average those two; in this case they're the same, so it doesn't really matter - it's five. If you wanted the 90th percentile value out of a series of 40, you would get the 36th value, which is 32 in this case. Your 90th percentile for this series is 32.
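The naive procedure described above can be sketched as follows (the talk's example series isn't reproduced here, so the code uses a stand-in series of 40 values, and the nearest-rank convention; the talk averages the two straddling values for the median, a slightly different convention):

```python
import math

def percentile_nearest_rank(values, p):
    """Exact percentile: store every point, sort, pick by rank.
    O(n log n) time and O(n) memory per query -- the cost sketches avoid."""
    s = sorted(values)
    rank = max(1, math.ceil(p * len(s)))  # nearest-rank convention
    return s[rank - 1]

data = list(range(1, 41))  # a stand-in series of 40 values
print(percentile_nearest_rank(data, 0.90))  # the 36th of the 40 sorted values
```

The expense is in `sorted(values)`: every individual point must be retained and re-sorted for every dimension combination queried.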

Storing each individual point and sorting it across whatever dimensions have been chosen, you can imagine, gets expensive relatively quickly. We really want to think about how we can do this a bit more cheaply. That brings us to a class of data structures that are often referred to as sketches. Sketches all share two characteristics. First, you examine each item in the data stream a limited number of times; this comes from the mathematical study of data stream processing. Ideally, you're only observing each discrete value once, but depending on the data structure, it might be multiple times - a fixed, constant number. Second, memory usage is bounded, growing logarithmically or better with the size of the stream; in the best case, the bound is constant.

Some examples that you might actually be familiar with fall into this category of data structure. HyperLogLog is a very commonly used one; it's provided as a feature of Redis through the PFADD, PFCOUNT, and PFMERGE operations. Another is the Bloom filter, which is used for set membership estimation, similar in spirit to HyperLogLog but with different characteristics. Frequency sketches are another category: for instance, if you wanted to know the top 10 or top 100 most-viewed pages in your system.

Of course, as with any engineering decision and application of clever data structures, there are tradeoffs. We have to think about the speed at which we can process this data, the accuracy, and the space that it's taking up. We can think about the characteristics of the queries that our customers are making to help us make these tradeoffs. We're thinking about the range of inputs that we expect and what those queries are that we're answering: those percentile and SLO kinds of queries.

Thinking about distribution metrics specifically, it's really important that we have a bounded error. I'll go into some more detail about what that means. In general, our customers expect that the value that we give them from a particular query is accurate, and we can tell them that the answer is accurate within plus or minus 1%. We're very interested in and sensitive to the performance, both in terms of the data structure, as I mentioned before, and also in terms of the speed of inserting new data. That comes up because of our interest in minimizing the impact of the agent on our customers' systems. Then there are the properties of aggregating across these data structures: it's great if we can observe a single time series through this data structure, but what if I have two of these? How do I put them together?

How do you compress a distribution so that you're able to have a different view of it that reduces the size but still maintains some amount of accuracy? If we think about it in terms of these buckets and counts, you can draw a representation of a curve by just splitting it up into these discrete buckets. This is actually an example from OpenMetrics/Prometheus from their documentation about how they do this. Internally, they support these kinds of buckets and counts.

This is an example from their documentation showing how you might record a distribution of request latency. We're going to talk a little bit about how they handle queries for this data. You can see here that there are six buckets. If you want to know the median value, you start from the last one and you say, "The total number of values is 144,000." If you want to know the median, you're looking at the 72,000th value. That falls somewhere between the 100 millisecond and 200 millisecond bucket. But where does it fall inside that? We don't really know, but we can guess. We can draw a line between those two buckets and say, "If we linearly interpolate between these two buckets, what value do we get?" We get 158 milliseconds. If you queried Prometheus for the median value of this data set, it would tell you 158 milliseconds because it uses linear interpolation.
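That calculation can be sketched as follows. The cumulative bucket counts below are hypothetical (the transcript doesn't reproduce the exact table), chosen so the math lands on the same 158 ms answer; Prometheus exposes this logic as its `histogram_quantile` query function:

```python
def histogram_quantile(q, buckets):
    """Estimate a quantile from cumulative (upper_bound, count) buckets
    by linearly interpolating inside the bucket that holds the rank."""
    total = buckets[-1][1]
    rank = q * total
    prev_bound, prev_count = 0.0, 0
    for bound, count in buckets:
        if count >= rank:
            # Draw a line between the bucket's two edges and read off
            # where the rank falls along it.
            return prev_bound + (bound - prev_bound) * (rank - prev_count) / (count - prev_count)
        prev_bound, prev_count = bound, count
    return buckets[-1][0]

# Hypothetical cumulative counts: 144,000 total observations, with the
# median (the 72,000th value) falling in the 100-200 ms bucket.
buckets = [(0.05, 20000), (0.1, 50000), (0.2, 88000), (0.5, 130000), (1.0, 144000)]
print(round(histogram_quantile(0.5, buckets), 3))  # 0.158 seconds
```

The interpolation assumes values are spread uniformly inside a bucket, which is exactly why the open-ended last bucket (anything greater than 1 second) gives it nothing to work with.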

What if we wanted to query this data for the 99th percentile? The 99th percentile actually falls in the last bucket, which is defined as anything greater than 1 second. We actually don't really know where that value falls. It could be around 2 seconds or it could be 100,000 seconds. We really have no way of knowing from this particular way of storing these buckets.

There's another approach that's called rank error. This is used by a data structure referred to as the GK sketch, for its authors, Greenwald and Khanna. What it tells you is specifically that the value returned by the data structure when you query it for the 99th percentile definitely falls between the maximum value observed and the 98th percentile. That's pretty good; it tells you that the answer falls in some range that you have some confidence about. But what we see really frequently is that we have long-tailed, skewed distributions like the one that you see here, and that presents a real challenge. We can actually see that in subsequent queries over time for the 99th percentile of a data structure like this, the value can bounce around pretty wildly. That's not really what you want to see from your distribution. You want to know that the value you're getting is really close to the value that you actually want to observe.

That's why we've looked at this. We've come up with an approach that guarantees a relative error, that is, an error relative to the value itself, where we can say that when you query our sketch, the value that will be returned is plus or minus 1% of the actual 99th percentile value that we observed. What does that mean in terms of how it gets used? Going back to the SLO example, I want to know: is my 99th percentile value under 500 milliseconds? With these relative error bounds we can say, "99% of requests are less than 500 milliseconds, plus or minus 1%." We can restate that as: 99% of your requests are guaranteed to be less than 505 milliseconds. That's a really tight guarantee that we can give you on the answer to this question.

That was talking a bit about the bounded error. The fast insertion property that's really important to us we get, again, from this approach of buckets and counts: each insertion, each new value observed, is two operations. You find the bucket you want, then you increment the count. Sometimes there's an allocation, but that tends to get amortized over the density of the sketch and essentially goes to zero as your sketch becomes denser.
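A minimal illustration of such a relative-error bucketing scheme (a toy in the spirit of the approach described, not the actual open-source implementation): buckets are logarithmically sized, so the bucket midpoint is always within a relative error alpha of any value the bucket holds, and insertion is just a bucket lookup plus a count increment.

```python
import math
from collections import defaultdict

class RelativeErrorSketch:
    def __init__(self, alpha=0.01):
        # Bucket i covers (gamma^(i-1), gamma^i]; every value in a bucket
        # is within relative error alpha of the bucket's midpoint estimate.
        self.gamma = (1 + alpha) / (1 - alpha)
        self.counts = defaultdict(int)
        self.total = 0

    def insert(self, value):
        # Two operations per point: find the bucket, increment its count.
        key = math.ceil(math.log(value, self.gamma))
        self.counts[key] += 1
        self.total += 1

    def quantile(self, q):
        # Walk the buckets in order until we pass the requested rank,
        # then report that bucket's midpoint estimate.
        rank = q * (self.total - 1)
        seen = 0
        for key in sorted(self.counts):
            seen += self.counts[key]
            if seen > rank:
                return 2 * self.gamma ** key / (self.gamma + 1)

sketch = RelativeErrorSketch(alpha=0.01)
for v in range(1, 1001):
    sketch.insert(v)
# The p99 estimate is guaranteed within 1% of the true value (990 here).
print(sketch.quantile(0.99))
```

Note how memory depends only on the number of distinct buckets touched, not on the number of points inserted.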

How do we achieve the fixed size bound? For certain distributions, it's possible for us to reach it. We've set a maximum number of buckets, essentially, in order to achieve this fixed size guarantee; in our case, we set the number of buckets to 4,000. What do we do if a distribution is such that that's not enough buckets to maintain our relative error guarantees? The way that we handle it is by rolling up the lower buckets. Our users want to know the tail latency: the 99th percentile, the four-nines or five-nines of their latency. They generally do not care as much about the 5th or 10th percentile; generally, the first percentile that people tend to find interesting is the median. We can roll up a bunch of those extra buckets in order to maintain the accuracy at the tail end that people are really interested in. It's also worth noting that while this is a guarantee we provide, we have yet to encounter a data set in real-world usage that exhausts the number of buckets, where we've actually needed to roll things up that way.

We talked about the insertion and performance characteristics, and the size characteristics. The last characteristic that we care a lot about is aggregation and merging. I've included the textbook definition of commutativity from Wikipedia, of course. This is the typical notion where changing the order of the operands doesn't change the result. Multiplication is a great example of a commutative operation; taking the average of averages is a great example of an operation that is not. Why do we care about having commutative aggregation?

Sketches Enabling Flexible Architecture

I'm going to talk about how that property specifically enables us to have an even more flexible architecture. I talked a bit about how we have these different points in the system where we're doing aggregation. Two-way mergeable sketches, specifically sketches that can be merged in both directions, a larger sketch inserted into a smaller one or a smaller one inserted into a larger one, mean that we can put this operation anywhere in our architecture, and anywhere we calculate the aggregation, it can be reused. Whether it's streaming during ingestion, at query time, or actually within the datastore by saving partial results, we can really spread this work around. That enables us to do the work as cheaply as possible, in as many places as possible, taking it out of the query path and distributing it throughout the system.
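As a toy illustration (not Datadog's actual code), a sketch whose state is a bucket-to-count map merges by adding counts, an operation that is commutative and associative, so partial aggregates can be combined in any order at any point in the pipeline:

```python
from collections import Counter

def merge(a, b):
    """Merge two bucket->count sketch states; pure addition, so the
    order of operands (and of successive merges) never changes the result."""
    out = Counter(a)
    out.update(b)  # Counter.update adds counts rather than replacing them
    return dict(out)

agent_a = {10: 3, 11: 5}   # hypothetical partial sketches from two agents
agent_b = {11: 2, 12: 7}
assert merge(agent_a, agent_b) == merge(agent_b, agent_a)  # commutative
print(merge(agent_a, agent_b))
```

Contrast this with averages: merging two stored averages loses the original weights, which is exactly the limitation of plain pre-aggregated series noted earlier.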

Our implementation is referred to as the distributed distribution sketch. The distributed part refers to that architectural property, the mergeability where it can be merged both ways, a larger sketch into a smaller one and a smaller into a larger one. The distribution part refers to the kinds of queries that we're going to make of it: what is my median, my 99th percentile, what is my SLO, etc.? This is actually open source today; it's part of our agent, which is Apache-licensed, so you can go and see it. We're also presenting at VLDB 2019 in August, and at that time, we'll be open-sourcing more stand-alone versions. The Datadog agent version is in Go; we'll be open-sourcing versions in languages like Java, Python, etc.

Coming back to our performance mantras: doing it cheaper, using hybrid data storage types, but also thinking about your data structures and how you can select ones that enable you to meet your customers' needs in a specific way. All of these performance mantras build up to the idea that we have really invested in understanding how our data is used, and how we can optimize both our architecture and our data structures to answer the kinds of queries that our customers need, doing exactly the right amount of work to meet those queries and absolutely no more than that.


Recorded at:

Sep 17, 2019