Building Latency Sensitive User Facing Analytics via Apache Pinot

Summary

Chinmay Soman discusses how LinkedIn, Uber and other companies managed to have low latency for analytical database queries in spite of high throughput.

Bio

Chinmay Soman has been working in the distributed systems domain for the past 10+ years. He started out at IBM, where he worked on distributed file systems and replication technologies. He then joined the Data Infrastructure team at LinkedIn. Until recently, he was a Senior Staff Software Engineer at Uber. Currently, he's a founding engineer at a stealth-mode company.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.

Transcript

Soman: I'll be talking about how you can use Apache Pinot as part of your data pipelines for building rich, external, or site facing analytics. I am Chinmay Soman. I'm at a stealth-mode startup right now. Before this, I was leading the streaming group at Uber, whose mission was to build the next generation real-time analytics platform. I'm an active contributor to Apache Pinot, and also part of some other open source projects.

Outline

I will begin by discussing what exactly site facing analytics is, and why we need an underlying system that can support high throughput, low latency analytical queries. I'll then introduce Apache Pinot, talk about its high level architecture, and explain how it's optimized for such use cases. Next, I'll discuss some scenarios where maintaining the p99th latency SLA becomes non-trivial. Finally, I'll show how we can optimize our input data pipelines and our Pinot data layout to obtain predictable latency.

LinkedIn - Feed Ranking

What exactly is site facing analytics? By this we typically refer to the analytical tools and solutions exposed directly to your end users, or the audience of your company. One of the best examples is the LinkedIn feed ranking use case. I'm sure most of you are familiar with the LinkedIn news feed. To make sure the news feed is interesting, we need to ensure the user is not seeing the same thing again and again. In other words, for any given story, we need to know how many times the user has seen it in the last 14 days or so. This can be answered with a SQL query, something like the sketch below. At a high level, this might look straightforward. Keep in mind, though, that this query needs to be executed for every active user every time they visit the LinkedIn homepage, which translates to many thousands of such OLAP queries executed against a huge database covering pretty much all LinkedIn members. Each query must execute quickly, on the order of milliseconds. Otherwise, it's going to lead to a terrible experience for our users.
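
As a rough illustration, the impression-count query might be shaped like the sketch below. The table and column names are hypothetical, since the actual LinkedIn schema isn't shown in the talk, and the 14-day cutoff is assumed to be precomputed by the application:

```sql
-- Hypothetical impression-tracking table; names are illustrative only.
SELECT COUNT(*)
FROM impression_events
WHERE viewerId = 'member-123'
  AND storyId = 'story-456'
  AND impressionTimeMillis > 1610000000000  -- now minus 14 days, computed by the caller
```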

Uber Eats - Restaurant Manager

Let's take another example: the Restaurant Manager from Uber Eats. This is a tool given to restaurant owners across the globe so they can slice and dice their Uber Eats data. When you open the tool, you see a view something like this: sales metrics in both a real-time and a historical fashion, missed or inaccurate orders (which is more real-time), and other information such as top selling items, menu, feedback, and so on. You can imagine that to load this view, we need to execute multiple complex OLAP queries, all running concurrently. Multiply this by all the restaurant owners across the globe, and it adds up to a lot of QPS on the underlying database. All these queries must execute on the order of milliseconds for a good user experience. It is clear that to build such rich analytics for an external audience, we need an underlying analytical system that can ingest data from real-time and historical sources, and support high throughput, low latency queries in a reliable manner.
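
As an illustration, one of those dashboard widgets might be backed by a query shaped roughly like the one below; the table and column names are made up for the example and are not Uber's actual schema:

```sql
-- Hypothetical order-events table; a daily sales rollup for one restaurant.
SELECT orderDate,
       SUM(orderTotal) AS sales,
       COUNT(*) AS orders
FROM order_events
WHERE restaurantId = 'restaurant-789'
  AND orderDate >= '2021-01-01'
GROUP BY orderDate
ORDER BY orderDate
```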

Apache Pinot at a Glance

Enter Apache Pinot. Apache Pinot is a distributed OLAP store which can ingest data from a variety of sources, such as Kafka, HDFS, S3, GCS, and so on, and make it available for querying in real time. At the heart of the system is a columnar store. It also features a variety of smart indexing and pre-aggregation techniques for low latency. All these optimizations make it a great fit for a lot of analytical use cases, such as business metrics and dashboards, the kind of analytical application we saw with LinkedIn, or even things such as real-time ad hoc exploration, anomaly detection, and so on.

Pinot is a mature product today. It's used in production by a lot of big data companies across the globe, as shown here on the right, and usage is rapidly increasing. Of course, no introduction is complete without vanity metrics, so here you go: some of the largest Pinot clusters ingest data at a rate upwards of a million events per second and support hundreds of thousands of queries per second, while still maintaining low latency on the order of milliseconds.

High Level Architecture

Let's get our hands dirty and see what Pinot looks like under the hood. Here's a high level architecture of Pinot. At the bottom of the diagram, you can see the data sources we get the raw data from. You can use Pinot as part of your real-time pipelines, where the data flows in through Kafka, Kinesis, Pub/Sub, and so on. In this case, Pinot will be ingesting the data one event at a time. You could also use Pinot as part of your batch pipeline, where the raw data is sitting somewhere in HDFS or S3, and we run an ingestion job to bulk load that data into Pinot. The system supports a Lambda architecture: the cool thing here is that you can run both these pipelines for the same table, and it will be presented as one unified view to the user, which is pretty convenient. On the top right, we have the controller, which uses Apache Helix to coordinate different cluster operations, such as partitioning, replication, load balancing, instance allocation, and so on. Finally, we have the data plane, consisting of brokers and servers. Servers receive the incoming data, organize it in a columnar format, and make it available for local queries. Brokers receive the queries issued by users or applications, and then do a distributed scatter/gather to compute the final result.

Pinot Data Flow (Real-time)

Let's take an example of a real-time pipeline with Pinot to illustrate the flow of data. Here we have the Pinot controller, a Pinot broker, 4 servers, and a Kafka topic with 4 partitions. Let's assume we want to set up a table to ingest from this topic. The very first thing the controller will do is fetch the Kafka metadata, discover the 4 partitions, and decide that it needs to create 4 segments, one for each of these partitions. A segment here is the unit of partitioning and replication in Pinot; it's basically a chunk or subset of your data, and you'll have many segments for a given table. You can see how the controller has also assigned these segments to the 4 servers. It will use Apache Helix to write this ideal state into ZooKeeper, and the 4 servers will start reading from their corresponding Kafka partitions and creating local segments. At some point, a segment will be complete; the controller will then create the next set of segments, and so on.
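
For reference, a real-time table like this is driven by a stream config inside the table config. The sketch below shows roughly what ingesting from a 4-partition Kafka topic might look like; the table name, topic name, broker address, and flush threshold are placeholders, and the exact property keys vary across Pinot versions:

```json
{
  "tableName": "impression_events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "impressionTimeMillis",
    "replication": "1"
  },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "impression-events",
      "stream.kafka.broker.list": "kafka-broker:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "500000"
    }
  }
}
```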

With any good distributed system, we want to make sure our data is replicated, so let's set that up by configuring our table with a replication factor of 2. The controller will apply this config and make sure that every single segment is replicated on two distinct servers, as shown here. At this point, data is flowing from Kafka to all these servers in parallel. When the Pinot broker receives a query, it will do a distributed scatter/gather and contact all the servers responsible for this table. What it effectively does is pass on this query to each such server. Each server will then locally execute the query on its segments of that table and return an intermediate response to the broker. The broker will do a final aggregation and return the response to the user. You can see the good scale-out design adopted by Pinot here: if you're ever running out of capacity or you want more throughput from your system, all you need to do is add more machines. The controller will then allocate new segments onto the new machines, and the broker will automatically discover these new segments.

Pinot Segment: Designed for Low Latency

Let's take a deeper look at the Pinot segment, which is really the secret sauce behind Pinot's low latency. A segment is a chunk or subset of the data. Let's assume the figure on the left is some number of rows of raw data which need to be converted into one segment. Internally, the segment will organize this data in a columnar format, which means all the country values will be grouped together, all the browser values will be grouped together, and so on. This is great for executing analytical queries. For example, if you get a simple query, something like a count(*) where the country is US, I only need to process that one column, instead of all the rows in the segment. This obviously leads to a good speed-up and low latency.
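
Written out, the query from this example is simply the following; the table name events is hypothetical:

```sql
-- Only the 'country' column needs to be read to answer this query,
-- rather than materializing entire rows of the segment.
SELECT COUNT(*)
FROM events
WHERE country = 'US'
```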

What's more, Pinot allows us to add a variety of indexes on any of these dimensions. For example, I can add an inverted index on the country dimension; instead of scanning the country column, I can just look up the exact entries corresponding to US and get an answer very quickly. Similarly, I can add a sorted index on the country column, or even a star-tree index. Star-tree is a special index which allows us to maintain pre-aggregated values for some combination of dimensions, for example, country and browser. For this particular query, we can then get the result with a single lookup, almost like a key-value store, which is incredibly fast. These are just some examples of indexing techniques; there are many others you can use, such as range indexes, text indexes, geospatial indexes, Bloom filters, and so on, and we are constantly adding more. All these optimizations allow us to execute a query against a single segment very quickly, which in turn lowers the per-server latency and thus leads to low latency across the cluster.
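
As an illustration of how these are declared, index configuration lives in the table config as well. The sketch below (column names reused from the example, with keys that may differ slightly between Pinot versions) adds an inverted index on country and a star-tree index over country and browser with a pre-aggregated count:

```json
{
  "tableIndexConfig": {
    "invertedIndexColumns": ["country"],
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["country", "browser"],
        "functionColumnPairs": ["COUNT__*"],
        "maxLeafRecords": 10000
      }
    ]
  }
}
```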

P99th Latency Problem

Let's shift gears and discuss when things go wrong, especially with large scale deployments of Pinot. For most Pinot deployments, things will run fine out of the box. In large scale cases, though, your latency graph might look something like this. As you can see, your 50th percentile latencies are doing fine, somewhere between 50 and 100 milliseconds, but your p99th latency is quite bad; in fact, at some point it starts timing out at 10 seconds. Let's take a look at why this could happen. With large use cases, you might end up provisioning a lot of Pinot servers, for example 1000 servers in this case. Every now and then, one of these servers might slow down, because it's under memory pressure, going through garbage collection, or experiencing intermittent CPU spikes. This ends up degrading the overall query latency across the cluster: the more servers you have, the higher the probability of hitting such a slow server, and thus the worse your tail-end latencies get. In a similar vein, you might end up with a lot of segments per server, so each server has to do a lot of work to execute its local portion of the query. Note that in this case it will increase not only the p99th latency, but latencies across the board.

Optimizing Data Layout

What can we do about this? The general approach for getting more predictable latency is to limit the query span, that is, to limit the amount of work needed to execute the query. We have two approaches here: one, reduce the number of servers involved in executing a query; and two, reduce the number of segments processed. Let's look at these one by one. Keep in mind that these approaches are complementary to each other.

Replica Groups

For the first one, Pinot provides a mechanism known as replica groups. A replica group refers to a subset of servers which contains a complete copy of all the segments for a given table. We can configure this in our table config, as shown on the top right (and sketched below). Here we have two replica groups with two server instances per group. When the Pinot controller reads this config, it will make sure that each group has all the segments for the table. With this setup, when the Pinot broker receives a query, it needs to send the query to only one of these two groups to get a final result. As you can see, we have effectively reduced the number of servers involved by 50%, which will drastically help in improving your tail-end latencies.
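
A minimal sketch of what such a replica group setup might look like in the table config, matching the two-groups-of-two example above; the exact keys can differ between Pinot versions, so treat this as illustrative rather than authoritative:

```json
{
  "segmentsConfig": {
    "replication": "2",
    "replicaGroupStrategyConfig": {
      "numInstancesPerPartition": 2
    }
  },
  "routing": {
    "instanceSelectorType": "replicaGroup"
  }
}
```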

Partitioning

For approach two, where we want to minimize the number of segments processed when executing a query, we rely on data partitioning. In this case, we need to pre-process the data before it can be ingested into Pinot. Let's assume we want to set up a Pinot table, and we're going to execute a lot of queries that filter on the client ID dimension, as in the example shown here. We can pre-process the raw data by applying a partition function to the client ID, for example client_id % 4. We're effectively resharding the raw data into 4 partitions for each of these sources. This can be done with standard tools such as Flink or Spark, or whatever you have in your stack. At this point, the sharded data can be ingested into Pinot. When we set up the table, we specify the same partitioning function so that Pinot is aware of this strategy. With this config, Pinot will then track, for every single segment, which partition it belongs to.
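
A rough sketch of the corresponding partitioning section of the table config, assuming the modulo-on-client_id scheme from the example; the exact keys and supported function names can vary by Pinot version:

```json
{
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "client_id": {
          "functionName": "Modulo",
          "numPartitions": 4
        }
      }
    }
  },
  "routing": {
    "segmentPrunerTypes": ["partition"]
  }
}
```

The routing section tells the broker to prune segments by partition, which is what enables the behavior described next.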

Let's take a look at how this helps on the query side. When the Pinot broker receives such a query, it will retrieve the lookup value, apply the same partition function, and determine that it only needs to query partition 1. In this case, let's assume partition 1 is mapped to segment 1. Therefore, the broker only needs to query one of these two servers to compute the final result. With 4 partitions, we have effectively reduced the query span in terms of segments by 75%, which is pretty big. This optimization helps greatly in reducing latencies across the board, especially for large datasets.
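
For instance, under the hypothetical modulo-4 scheme above, a query filtering on client_id = 25 would be routed as follows: the broker applies the same function, 25 % 4 = 1, and only contacts the servers hosting segments tagged with partition 1:

```sql
-- client_id 25 maps to partition 25 % 4 = 1 under the hypothetical modulo scheme,
-- so the broker only queries the servers hosting partition 1's segments.
SELECT COUNT(*)
FROM order_events
WHERE client_id = 25
```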

Conclusion

We saw the high level architecture and the scale out design of Pinot. We looked at the segment architecture and the rich indexing support, which helps in getting low latency in addition to high throughput. We also saw some powerful knobs that Pinot provides in order to optimize data layout and therefore get more predictable latency.

Resources

If you want to know more, please visit our official page at pinot.apache.org. This is our official Twitter handle, @ApachePinot. I've also given the link for our Slack channel, where you'll find most of us, and we can answer questions online.

 


Recorded at: Jan 31, 2021
