BT

Facilitating the spread of knowledge and innovation in professional software development

Contribute

Topics

Choose your language

InfoQ Homepage Articles Building Latency Sensitive User Facing Analytics via Apache Pinot

Building Latency Sensitive User Facing Analytics via Apache Pinot

Bookmarks

Key Takeaways

  • User Facing analytics helps companies increase engagement with their customers and end users.
  • Apache Pinot is a great platform to build such user facing analytical applications that require ability to perform high throughput, low latency analytical queries.
  • Pinot’s columnar data layout, clubbed with its rich indexing and pre-aggregation capabilities enables it to serve complex queries in milliseconds.
  • Pinot has a distributed, scale out architecture that is built for multi-tenancy and simplifies overall cluster operation. 
  • Pinot provides powerful knobs to guarantee query throughput and response time SLA in production use cases.

 

QCon, a virtual conference for senior software engineers and architects covering the trends, Chinmay Soman talked about how you can use Apache Pinot as part of your data pipelines for building rich, external, or site-facing analytics. He has been working in the distributed systems domain for the past 10+ years – currently a founding engineer in StarTree which is a startup based on Apache Pinot.

Before this, he led the streaming group at Uber, whose mission was to build the next generation real-time analytics platform. Furthermore, Soman is a committer of Apache Pinot and some other open-source projects.

Outline

His talk began by discussing what exactly site-facing analytics is and why we need an underlying system that can support high throughput, low latency analytical queries. Furthermore, he introduced Apache Pinot and talked about its high-level architecture and how it's optimized for such use cases. Next, some scenarios where maintaining the p99th latency SLA becomes non-trivial. And finally, how to optimize input data pipelines and Pinot data layout to obtain predictable latency.

 LinkedIn - FeedRanking

What exactly is site-facing analytics? By this, we typically refer to the analytical tools and solutions exposed directly to the end-users or your company's audience. One of the best examples is the LinkedIn feed ranking use case.  Many of its users are familiar with the LinkedIn news feed. To keep the news feed interesting for LinkedIn users, it is essential to ensure that the user does not see the same thing repeatedly. In other words, for any given story, we need to know how many times has the user seen this in the last 14 days or so? Which can be done with a SQL query, something like this.

At a high level, this might look straightforward. Keep in mind; this query needs to be executed for every active user every time they visit the LinkedIn homepage, which translates to multiple thousands of such OLAP queries executed on a massive database of all the LinkedIn members. Each such query must execute pretty quickly, in the order of milliseconds. Otherwise, it's going to lead to a terrible experience for its users.

Uber Eats - Restaurant Manager

Let's take another example: the Restaurant Manager from Uber Eats - a tool that's given to restaurant owners across the globe to slice and dice the data from Uber Eats. When you open the tool, you can see a view of something like this

You see missed orders or inaccurate orders, which is more real-time, and other information such as top-selling items, menu feedback, and so on. You can imagine loading this view; one needs to execute multiple complex OLAP queries running concurrently. Multiply this with all the restaurant owners across the globe leads to a lot of QPS to the underlying database. All these queries must execute in the order of milliseconds for a good experience for our users. It is clear, to build such rich analytics for an external audience, one needs an underlying analytical system that can ingest data from real-time and historical sources and support high throughput, low latency queries reliably.

Apache Pinot at a Glance

Enter Apache Pinot. Apache Pinot is a distributed OLAP store that can ingest data from various sources such as Kafka, HDFS, S3, GCS, and so on and make it available for querying in real-time. At the heart of the system is a columnar store. It also features a variety of intelligent indexing techniques and pre-aggregation techniques for low latency. All these optimizations make it an excellent fit for many analytical use cases, such as business metrics, or dashboards, or the analytical application we saw with LinkedIn, or even things such as real-time, ad hoc exploration, anomaly detection, and so on.

 Pinot is a mature product today and used in many big data companies globally in production. The usage is rapidly increasing. Of course, no introduction is complete without vanity metrics. Some of the most significant Pinot clusters can ingest data at a rate of upwards of a million events per second, support hundreds of thousands of queries per second while still maintaining a low latency in the order of milliseconds.

High-Level Architecture

How does Pinot look like under the hood? See the data sources at the bottom of the diagram, where to get the raw data from.

One can use Pinot as part of your real-time pipelines, where the data flows in through either Kafka, Kinesis, Pub/Sub, and so on. In this case, Pinot will be ingesting the data one event at a time. Furthermore, one could also use Pinot as part of your batch pipeline, where the raw data is sitting somewhere in HDFS or S3, and run an ingestion job to bulk load that data into Pinot. Pinot uses this classic lambda architecture to simplify data ingestion and querying.

Furthermore, one can run both these pipelines for the same table, and it will be presented as one unified view to the user, which is pretty convenient. On the top right of the diagram, there is a controller, which uses Apache Helix to coordinate different cluster operations, such as partitioning, replication, load balancing, instance allocation, etc. Finally, there is the data plane consisting of brokers and servers. Servers will get the incoming data, organize it in a columnar format, and make it available for local queries. Brokers will get the queries issued by users or applications and then do a distributed scatter/gather to compute the final result.

Pinot Data Flow (Real-time)

Let's take an example of a real-time pipeline with Pinot to illustrate the flow of data. Here we have the Pinot controller, Pinot broker, four servers, and a Kafka topic with four partitions. Let's assume we want to set up a table to ingest from this topic. The first thing that the controller does is to fetch the Kafka metadata, discover the four partitions, and decide that it needs to create four segments for each of these partitions. A segment here refers to the unit of partitioning and replication in Pinot. It basically refers to a chunk or subset of your data. You'll have many segments for a given table. You can see how the controller has also assigned these segments to the four servers. It will use Apache Helix to write this ideal state into the ZooKeeper, and the four servers will start reading from Kafka for the corresponding partition and create a local segment. At some point, the segment will be complete. Then the controller will create the next set of segments, and so on.

With any sound distributed system, we want to make sure our data is replicated. Let's set that up. Let's configure our table with a replication factor of 2. The controller will create this config and ensure that every segment is replicated on two distinct servers, as shown below.

 At this point, data is flowing from Kafka to all these servers in parallel. When the Pinot broker receives a query, it will do a distributed scatter/gather and contact all the servers responsible for this table. What it effectively does is to pass on this query to each such server. Each server will then locally execute this query for the corresponding segments from that table and return an intermediate response to the broker. The broker will do a final aggregation and then return the response back to the user. This is  an excellent scale-out design adopted by Pinot. If you're ever running out of capacity or want more throughput from your system, all you need to do is add more machines. The controller will then allocate new segments onto the new machines. The broker will automatically discover these new segments.

Pinot Segment: Designed for Low Latency

Let's take a deeper look into Pinot Segment, the secret sauce behind the low latency aspect of Pinot. A segment is a chunk or subset of the data. Let's assume the figure on the left is some number of rows of raw data, which needs to be converted into one segment. Internally, the segment will organize this data in a columnar format. All the data for a given column (eg: country) will be grouped together and stored in a contiguous manner – an excellent way for executing analytical queries. For example, if you get a simple query, something like count (*) where the country is the U.S., all we need to do is process this one column instead of all the rows from the segment, which obviously leads to low latency.

What's more, Pinot allows us to add various indexes on any of these dimensions. For example, we can add an inverted index on the country dimension. Instead of scanning the country column, we can just look up the exact entries corresponding to the U.S. and get an answer quickly. Similarly, we can add a sorted index on the country column or even a star-tree index. Star-tree is a particular type of index that allows us to maintain pre-aggregated values for some combination of dimensions, such as country and browser.

 For this particular query, we can get the result with a single lookup, almost like a key-value store, which is incredibly fast. These are just some of the examples of indexing techniques. You can use many others, for example, range index, text index, geospatial index, Bloom filters, and so on. We are constantly adding these things. All these optimizations allow us to execute queries in one segment quickly, which in turn lowers the overall server latency, leading to a low latency across the cluster.

P99th Latency Problem

Let's shift gears and discuss when things go wrong, especially with large-scale deployments of Pinot. For most Pinot deployments, things will be running fine out of the box. Then in large-scale cases, your latency graph might look something like below:

As you can see, your 50th percentile latencies are doing fine, somewhere between 50 milliseconds to 100 milliseconds. Your p99th latency is quite alarming. In fact, at some point, it starts timing out at 10 seconds. Let's take a look at why this could happen. With significant use cases, you might end up provisioning a lot of Pinot Servers. Every now and then, one of these servers might slow down, which could be because it's undergoing some memory pressure and thus garbage collection, or has some intermittent CPU spikes. Subsequently, this ends up lowering the overall latency across the cluster. The more the number of servers, the higher the probability of hitting such a server, and thus, it might increase your tail end latencies. In a similar vein, you might end up with a lot of segments per server. Each server has to do a lot of work to execute the local query. Please note, in this case, it will not only increase the p99th latency but latencies across the board.

Optimizing Data Layout

What can we do about this? The general approach for getting more predictable latency is to limit the query span or to limit the number of things we need to process to execute the query. The two approaches we have here are reducing the number of servers processed to execute a query and reducing the number of segments processed for doing this. Let's look at this one by one. Please keep in mind; these approaches are complementary to each other.

Replica Groups

For the first one, Pinot provides a mechanism known as the replica group. A replica group refers to a subset of servers, which contain a complete copy of all the segments for a given table. For example, we can configure this in our table config, as shown on the top right. Here we have two replica groups with two server instances per group. The pinot controller reads this config; it will make sure that each group has all the segments for the table. With this setup, when the Pinot broker receives a query, it needs to send the query to only one of these two groups to get a final result. As you can guess, we have effectively reduced the number of servers to be processed by 50%, which will drastically help in improving your tail-end latencies.

Partitioning

For the second approach, where we want to minimize the number of segments processed, we rely on data partitioning. In this case, we need to pre-process the data before it can be ingested into Pinot. Let's assume we want to set up a Pinot table, and we're going to execute a lot of queries where we filter on the client ID dimension, as an example shown here. In this case, we can pre-process the raw data by using a partition function on a client ID. For example, you can do client_id % 4. We're effectively sharding the raw data into four partitions for each of these sources, which can be done with standard frameworks such as Flink, or Spark, or whatever you have in your stack. At this point, this sharded data can be ingested in Pinot. When we set up the table, we can specify the same partitioning function, so Pinot is aware of this strategy. With this config, Pinot will then track for every single segment what partition it belongs to.

 Let's take a look, how this helps on the query side. When the Pinot broker receives this query, it will retrieve the lookup value, apply the same partition function, and determine that it needs to query partition ‘1’. In this case, let's assume partition one is mapped to segment ‘1’. Therefore, the broker only needs to query one of these two servers to compute the final result. With four partitions, we have effectively reduced the query span in terms of segments by 75%, which is pretty big. This optimization helps significantly in lowering latencies across the board, especially for large datasets.

Conclusion

We saw the high-level architecture and the scale-out design of Pinot. We looked at the segment architecture and the rich indexing support, which helps in getting low latency and high throughput. And lastly, we also saw some powerful knobs that Pinot provides to optimize data layout and therefore get more predictable latency.

About the Author

Chinmay Soman has been working in the distributed systems domain for the past 10+ years. He started out in IBM where he worked on distributed file systems and replication technologies. He then joined the Data Infrastructure team at LinkedIn. Until recently, he was a Senior Staff Software Engineer in Uber. Currently, he’s a founding engineer in a stealth mode company.

 

 

 

 

Rate this Article

Adoption
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Community comments

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

BT