
Monitoring and Tracing @Netflix Streaming Data Infrastructure


Summary

Allen Wang talks about the design and implementation details of the dev/ops tools used by Netflix and highlights the critical roles they play in operating its data infrastructure. Wang showcases how active and targeted tools development for operational use can quickly pay off with improved product quality and overall agility.

Bio

Allen Wang is an architect and engineer on the Real-Time Data Infrastructure team at Netflix. He architected the multi-cluster Kafka infrastructure for Netflix in the cloud and is heavily involved in developing the tools needed for operating the streaming data infrastructure. He is an open source contributor to Apache Kafka and NetflixOSS and a frequent speaker on Kafka.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Wang: My name is Allen Wang, and I'm with the Netflix Real-Time Data Infrastructure team. Today my talk will focus on the external monitoring and tracing systems that we have developed at Netflix to improve the production readiness of our streaming data infrastructure.

Today, when I look back, one thing that strikes me is that the tools that we have developed share one common theme, and that is they provide powerful observations made from outside of our core system. You may be wondering what's so special about observing from outside. Let me first talk about a situation that may sound familiar to many of you.

You wake up at night to a PagerDuty call, and you realize that the reason you have been woken up is just a latency spike on one of your dependency services. You keep wondering why the people responsible for that service are not the first ones to wake up, so that they can fix their issue and you can have a good night of sleep. The truth is they may not have the same insight as you do. Their dashboards may look perfectly normal, but it doesn't matter. In a distributed system, the liveness of a service totally depends on the observations made from outside.

Here is another example. This is how the existence of the first black hole was confirmed. A black hole, as the name suggests, is invisible. In order for scientists to find a black hole, they have to observe the objects around it. In this case, scientists found a star called Cygnus X-1, and there is one interesting fact about this star: it changes color from red to blue and from blue to red periodically. If you still remember your physics, that matches the description of the Doppler effect: when an object moves towards you or away from you, the wavelength of its light changes. This star, Cygnus X-1, must be moving towards the earth and away from the earth periodically. A natural explanation of that behavior is that it is actually circling something. So what is it circling?

It is something invisible, and scientists were able to calculate the mass of this invisible object given the motion of the star. The mass matched the criteria for a black hole. That is how the conclusion was made that this invisible object was probably a black hole. This story tells us how powerful external observation is. I would argue that it might be the only way for you to find something invisible in your system.

Our Vision

Next I will share our vision for production readiness. Our vision maps to four levels of needs. You start with the basics; once you satisfy your basic needs, you move up to the next level. At the bottom, we have observability. You should have clear insight into your product, and you should share that insight with your customers to earn their trust. Without observability, nothing works. If you feel that you are short of ideas on how to improve your production readiness, I definitely recommend starting with observability.

Once you have achieved observability, the next step will be availability. That means you want your product to be available and reliable to your users most of the time. For our streaming data infrastructure, that means we should make sure that the data keeps flowing end-to-end in our system without interruption.

Once you have achieved availability, you should look into operability. Having a great product doesn't mean that you have to burn out doing operations. For most issues, you should have simple and repeatable processes to deal with them, and most operations should be automated. At Netflix, our streaming data infrastructure handles about 1,000 streams and data pipelines. Without operability, things would be totally untenable.

Once you are satisfied with operability, availability, and observability, here comes the crown jewel: data quality. Here you want to establish some key indicators for your data quality, and maybe also some enforceable SLOs. For this talk I'll be focusing on data transport quality, and specifically on data loss rate, duplicate rate, and end-to-end latency.

To make sense of my talk, I'd like to share the architecture of our streaming data infrastructure at Netflix. On the left side we have event producers, and they send events to a set of Kafka clusters, which we call the Fronting Kafka clusters. As you can see in this picture, we use Flink for stream processing. Once the data gets into the Fronting Kafka clusters, it is consumed by a set of stream processing jobs that operate at the infrastructure level. Those stream processing jobs are also called routers in our infrastructure. They do some simple ETL, like projection and filtering. Once the data is processed, it is sent to downstream sinks, including S3, Elasticsearch, and another set of Kafka clusters, which we call the Consumer Kafka clusters. There, the data is consumed by our customers' stream processing jobs and other Kafka consumers.

Now that we have covered our vision, our principle of external observation, and our architecture, let's start with the basics: how we have improved observability with our Kafka broker and consumer monitoring.

Improved Observability

As you can see in the previous architectural chart, Kafka plays a vital role in our infrastructure. It is very important to make sure that Kafka is operational and healthy. Let's say you create a Kafka cluster, and all the internal metrics look normal to you. Are you confident enough to put production traffic on it? The Kafka cluster is not operational until you can produce messages to it and consume messages from it. We came up with the simple idea of creating an external service that constantly sends heartbeat messages to the Kafka cluster and, at the same time, consumes those messages. We also do a little bit of analytical work here: we make sure that the messages it produces match the messages it has consumed. By doing that, we gain confidence that the Kafka cluster is truly operational.
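To make the idea concrete, here is a minimal sketch of such a heartbeat check using the standard Kafka Java clients. The topic name, bootstrap address, timeout, and matching logic are illustrative assumptions, not Netflix's actual implementation.

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import java.time.Duration;
import java.util.*;

public class HeartbeatMonitor {
    public static void main(String[] args) {
        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", "kafka-fronting:9092");   // hypothetical address
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Properties cprops = new Properties();
        cprops.put("bootstrap.servers", "kafka-fronting:9092");
        cprops.put("group.id", "heartbeat-monitor");
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops)) {
            consumer.subscribe(Collections.singletonList("heartbeat"));  // hypothetical topic
            Set<String> outstanding = new HashSet<>();

            // Produce one heartbeat message with a unique id.
            String id = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>("heartbeat", id, Long.toString(System.currentTimeMillis())));
            producer.flush();
            outstanding.add(id);

            // Consume and check that every heartbeat we sent comes back within a deadline.
            long deadline = System.currentTimeMillis() + 30_000;
            while (!outstanding.isEmpty() && System.currentTimeMillis() < deadline) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
                    outstanding.remove(rec.key());
                }
            }
            if (!outstanding.isEmpty()) {
                System.err.println("Kafka cluster failed heartbeat check, missing: " + outstanding);
            }
        }
    }
}
```

A real monitor would run this loop continuously against every Fronting Kafka cluster and feed the result into alerting.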

Another important function of this external monitoring service is to monitor the Kafka metadata, and we do this in two ways. We watch the ZooKeeper nodes and look at the broker metadata there, and we monitor the ISR state in ZooKeeper as well. At the same time, this external monitoring service also talks to the brokers to get the metadata directly, and we try to match the metadata we get from the brokers against the metadata we get from ZooKeeper to make sure they agree. If there's a mismatch, it is usually an indication that some kind of network partitioning has happened: either the broker cannot talk to ZooKeeper, or the broker cannot connect to the rest of the Kafka cluster. At that point, you need to trigger some automated actions. For example, you might restart the JVM of that Kafka broker or completely replace the broker.
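A rough sketch of that comparison might look like the following, assuming a pre-KRaft cluster that registers brokers in ZooKeeper under /brokers/ids. The connection strings are hypothetical, and a real monitor would also compare topic and ISR metadata, not just broker ids.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.apache.zookeeper.ZooKeeper;
import java.util.*;
import java.util.stream.Collectors;

// Compares the broker ids registered in ZooKeeper with the brokers reachable
// through the Kafka API. A mismatch hints at a network partition.
public class MetadataChecker {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zookeeper:2181", 10_000, event -> { });  // hypothetical address
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-fronting:9092");                 // hypothetical address

        try (AdminClient admin = AdminClient.create(props)) {
            // Broker ids as registered in ZooKeeper.
            Set<Integer> fromZk = zk.getChildren("/brokers/ids", false).stream()
                    .map(Integer::valueOf).collect(Collectors.toSet());

            // Broker ids as reported by the brokers themselves.
            Set<Integer> fromBrokers = admin.describeCluster().nodes().get().stream()
                    .map(Node::id).collect(Collectors.toSet());

            if (!fromZk.equals(fromBrokers)) {
                System.err.printf("Metadata mismatch: ZooKeeper=%s brokers=%s -- possible network partition%n",
                        fromZk, fromBrokers);
            }
        } finally {
            zk.close();
        }
    }
}
```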

Another important function of this monitoring service is to monitor the consumers and the streaming jobs. Kafka consumers periodically commit their offsets to the Kafka cluster. The committed consumer offset becomes a vital sign that tells us whether the consumer is healthy. By monitoring the committed offsets, we can derive the consumer lag as well as a signal of whether the consumer has become completely stuck. What we do here is fetch the log end offsets from the Kafka cluster, and we also fetch the committed consumer offsets from the Kafka cluster. Then we do a simple calculation to get the consumer lag, which is the log end offset minus the committed offset for any topic partition.
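A minimal sketch of that calculation with the Kafka Java AdminClient and consumer might look like this; the bootstrap address and the monitored consumer group name are hypothetical.

```java
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class ConsumerLagCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-fronting:9092");  // hypothetical address
        props.put("group.id", "lag-monitor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        String monitoredGroup = "router-job-1";  // hypothetical consumer group to monitor

        try (AdminClient admin = AdminClient.create(props);
             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Committed offsets of the monitored consumer group.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(monitoredGroup)
                     .partitionsToOffsetAndMetadata().get();

            // Log end offsets for the same partitions.
            Map<TopicPartition, Long> logEnd = consumer.endOffsets(committed.keySet());

            // Lag = log end offset - committed offset, per topic partition.
            for (TopicPartition tp : committed.keySet()) {
                long lag = logEnd.get(tp) - committed.get(tp).offset();
                System.out.printf("%s lag=%d%n", tp, lag);
            }
        }
    }
}
```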

Another signal that we try to derive is whether the consumer is completely stuck. We do this by observing continued staleness of the committed consumer offset within a given window. Let's say we set this window to three minutes, as shown in this example. In the first three minutes, the committed consumer offset has not changed; it has stayed at 3, while the log end offset has been making progress. At the fourth minute, upon observing that the consumer offset is still unchanged, we declare this consumer to be stuck. At the fifth minute, we notice an advancement of the committed consumer offset, so we clear the stuck signal.
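As a sketch, the staleness check for a single topic partition can be captured in a small piece of state like the following; the class name and the way it is polled are assumptions for illustration only.

```java
import java.time.Duration;
import java.time.Instant;

// Tracks the committed offset of one topic partition and flags the consumer as
// stuck when the offset stays unchanged for longer than the window while the
// log end offset keeps advancing.
public class StuckDetector {
    private final Duration window;          // e.g. Duration.ofMinutes(3)
    private long lastCommitted = -1;
    private Instant lastAdvance = Instant.now();

    public StuckDetector(Duration window) {
        this.window = window;
    }

    /** Call on every polling cycle with the latest observed offsets. */
    public boolean isStuck(long committedOffset, long logEndOffset, Instant now) {
        if (committedOffset != lastCommitted) {
            lastCommitted = committedOffset;
            lastAdvance = now;               // progress observed, clear staleness
            return false;
        }
        boolean hasBacklog = logEndOffset > committedOffset;
        boolean stale = Duration.between(lastAdvance, now).compareTo(window) > 0;
        return hasBacklog && stale;          // stuck only if there is work to do and no progress
    }
}
```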

We provide REST endpoints for this monitoring service that expose the consumer lag information as well as the broker metadata information. By doing this, we can build a lot of useful automations on top of them.

If you are familiar with Kafka, you may notice that Kafka already provides internal metrics for consumer lag. You may be asking, why bother creating your own service to monitor the consumer lag? Those internal metrics work well until the consumer stops [inaudible 00:13:24] or completely crashes. At that point, the metric may become completely stale or be dropped from the metrics system, which is very confusing and misleading.

The second reason that we want to have this service is that we can easily derive the stuck signal; relying purely on Kafka's internal metrics, there is no way to get that stuck signal.

Raise your hand if you know this song. It's kind of stuck in my head, because it actually reflects the feeling that we have towards our stuck consumer alert. It's always a mix of love and hate. I'll explain why.

First, the love. This stuck consumer alert helps us catch all kinds of issues, whether they are internal or external to our system. It dramatically improves our service quality. If you run a streaming data service, the last thing you want is for your customer to come to you to complain about their job getting stuck while you have no idea about it. The stuck consumer alert helps us avoid that kind of embarrassment. It has also helped us earn the customers' trust.

Why the hate? It is truly a nightmare for us. Since its creation, this alert immediately became our number one wake-up call from PagerDuty. It's actually very difficult to debug, because there are so many possibilities that can cause a consumer to get stuck. Here I have listed some of the possible causes behind the consumer stuck alert. You can see they range across the source, the infrastructure, the sink, and the processors. It may even be caused by problems in committing offsets. Imagine that you wake up at night and you are facing so many possibilities to deal with.

To truly identify the cause of a consumer getting stuck, we need to dig further into debugging logs and possibly try to find some metrics. In some cases, after we identify a pattern or a failure mode, we create additional logging or additional metrics so that next time we know where to look. The biggest help, however, comes from auto remediations.

Improved Availability and Operability

In the next section, I'm going to talk about how auto remediations help us improve the operability and availability of our data infrastructure.

The first thing that we noticed is that for a stateless streaming job, simply relaunching the job upon getting the stuck consumer alert has a good chance of actually getting that consumer out of the stuck state. We created a tool called first responder. As the name implies, it automatically relaunches a stateless job upon getting the first consumer stuck alert.

Of course, we still go and investigate the root cause afterwards. Also, when the first relaunch does not fix the problem, further relaunches are suppressed. Here is a screenshot of the first responder in action. As you can see, it has indeed saved the night for us a couple of times.

The second useful automation that we derived from our consumer lag monitoring is streaming job autoscaling. As you can see, for most of our streams, the traffic fluctuates on a daily basis. It is very inefficient to use a fixed capacity for all streaming jobs, but without autoscaling, all we could do initially was pin our capacity high.

Even when we pin our capacity high, sometimes it is still not sufficient, because the traffic can grow organically. On this graph, the gray area shows the traffic volume, and the red line shows the consumer lag. You can see that after the traffic exceeds a certain threshold, the consumer lag increases dramatically. Only after the peak hour is over does the lag disappear and the job finally catch up. The direct impact of this is that the customer sees increased latency at peak hours.

In order to get rid of this catch-up game, we want to do autoscaling. The idea of autoscaling is this. Say you notice a lag for your streaming job, and you want to get rid of it in 15 minutes. You need to calculate the new capacity needed for the job. Once you have calculated the new capacity, you need to relaunch the job, and the job relaunch takes about two minutes. That leaves about 13 minutes to do the actual catch-up.

The first thing you need to figure out is the number of events expected to arrive in that 15-minute time frame; these are the new events coming into the pipeline. You also need to know the current consumer lag. You add them together to get the total number of events that you will be processing in those 15 minutes. You divide that total by the catch-up time and you get the target processing rate. Once you know the target rate, it's easy to figure out the new expected capacity for your job.
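Here is a worked sketch of that arithmetic. The per-container throughput, the relaunch overhead, and the example numbers are all hypothetical; they only illustrate the calculation described above.

```java
// A minimal sketch of the capacity calculation for autoscaling a streaming job.
public class AutoscaleCalculator {

    /**
     * @param expectedIncomingEvents events predicted to arrive during the window
     * @param currentLag             events already waiting in the backlog
     * @param windowSeconds          total time budget to eliminate the lag (e.g. 15 minutes)
     * @param relaunchSeconds        time lost relaunching the job (e.g. 2 minutes)
     * @param eventsPerContainerPerSecond measured throughput of a single container
     */
    public static int requiredContainers(long expectedIncomingEvents,
                                          long currentLag,
                                          long windowSeconds,
                                          long relaunchSeconds,
                                          double eventsPerContainerPerSecond) {
        long catchUpSeconds = windowSeconds - relaunchSeconds;
        long totalEvents = expectedIncomingEvents + currentLag;
        double targetRate = (double) totalEvents / catchUpSeconds;   // events per second
        return (int) Math.ceil(targetRate / eventsPerContainerPerSecond);
    }

    public static void main(String[] args) {
        // e.g. 9M events expected, 3M events of lag, 15-minute window, 2-minute relaunch,
        // and 2,000 events/sec per container (all hypothetical numbers).
        int containers = requiredContainers(9_000_000, 3_000_000, 15 * 60, 2 * 60, 2_000);
        System.out.println("containers needed: " + containers);   // prints 8
    }
}
```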

One thing we need in order to make this work is to predict the future workload. What we do here is try to fit a quadratic regression to our traffic curve; if the error turns out to be too large, we fall back to linear regression. Of course, this sounds overly simplified, and in practice we do find that this calculation does not cover all cases and there is a lot of room for improvement. But at least it gave us a head start on autoscaling, so that we can save cost and reduce the operational burden.

This is a screenshot showing streaming job autoscaling in action. The upper part of the graph shows the traffic volume. The lower part shows the number of containers needed for that streaming job, which basically translates to the resources and capacity of the job. As you can see, the number of containers scales up and down quite nicely according to the traffic pattern.

Improved Data Quality

Finally, having covered operability, availability, and observability, we can get into the discussion of data quality. Here I want to talk about how we use loss detection to find lost messages and even auto recover those messages to improve our data quality. As I mentioned at the beginning of the talk, I'll be focusing on data transport quality, which means loss rate, duplicate rate, and end-to-end latency.

Let's look into data loss detection in detail. I have seen several places where people rely on counts to detect loss. The idea is simple. You first count the number of messages sent by your producer in a time frame. You also count the number of messages received by your consumer in the same time frame. Then you compare the received count with the sent count.

The question is, if your received count is always greater than the sent count, are you confident that you delivered every message? The answer is no, because the received count probably contains some duplicates. The duplicates can be caused by the producer doing retries under the hood, which you cannot account for, or by the consumer resetting its offset to an earlier position. Another problem with counts is that the aggregation is tricky: are we talking about event time or processing time, and how do we handle late arrivals? The third reason I don't like using counts is that the identity of the messages is lost in aggregation. Even if you can detect a loss from the counts, there is no way to recover what was lost.

We decided to turn to a widely accepted concept in the microservices world: tracing. The basic idea is that we trace the data as it moves along the pipeline, generating traces at points of interest. At the data ingestion point, we determine whether we want to trace a particular message by random sampling, and the sampling rate is configurable. If you configure the sampling rate to be 100%, that just means you want to trace every single message. Once we gather those traces, we analyze them externally.

We also do this quite differently from tracing in the microservices world. Our emphasis here is on data loss detection: we want to identify lost messages and be able to automatically recover them. It is not about creating the call graph, because we already have the data lineage information from the control plane of our management service. Instead, we want to leverage that data lineage information to derive the data loss information. In other words, we know the call graph, but we want to figure out who didn't make the call.

We created our tracing system, called Inca. Inca is a famous mountain trail in South America. It is worth noting that we typically use mountain names for our services in our streaming data infrastructure; Inca is a clear exception from that naming convention. You can tell that we want to rely on Inca to reach higher ground.

This is how we do tracing. Assume you have a typical data pipeline, where your event producer produces messages to Kafka cluster A, and the messages are consumed by your stream processing job. Let's assume it does some simple enrichment and keeps the identity of the message unchanged. The final message is then produced to cluster B.

We wanted to keep using Kafka for our tracing, so we created a dedicated Kafka cluster to hold all the trace messages. When the producer sends a message with ID 0001 to Kafka cluster A, it sends a trace message to the tracing cluster at the same time. The trace message carries the message ID, which is the same as the original message ID, the type "sent", the remote location A, and the offset of that message. Similarly, the stream processing job, upon receiving the message with ID 0001, sends a trace to the tracing Kafka cluster with the same ID, the type "received", and the remote location A. Once the processing is done and the final message is produced to Kafka cluster B, another trace message is generated; this time it has the same ID, the type "sent", and the remote location B.
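As a rough illustration, a trace record along these lines could be emitted next to every sampled produce or consume call. The field names, the sampling helper, and the topic name "trace" are assumptions for the sketch, not the actual Inca schema.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative trace record: message id, action type, remote Kafka cluster, and offset.
class Trace {
    enum Type { SENT, RECEIVED }

    final String messageId;
    final Type type;
    final String remoteLocation;  // e.g. "cluster-A" or "cluster-B"
    final long offset;
    final long timestampMs;

    Trace(String messageId, Type type, String remoteLocation, long offset) {
        this.messageId = messageId;
        this.type = type;
        this.remoteLocation = remoteLocation;
        this.offset = offset;
        this.timestampMs = System.currentTimeMillis();
    }

    String toJson() {  // simplistic serialization for the sketch
        return String.format(
            "{\"id\":\"%s\",\"type\":\"%s\",\"location\":\"%s\",\"offset\":%d,\"ts\":%d}",
            messageId, type, remoteLocation, offset, timestampMs);
    }
}

class TraceEmitter {
    private final KafkaProducer<String, String> traceProducer;  // points at the dedicated tracing cluster
    private final double samplingRate;                          // e.g. 0.01 for 1%

    TraceEmitter(KafkaProducer<String, String> traceProducer, double samplingRate) {
        this.traceProducer = traceProducer;
        this.samplingRate = samplingRate;
    }

    /** Decide at ingestion time whether a message is traced. */
    boolean shouldTrace() {
        return ThreadLocalRandom.current().nextDouble() < samplingRate;
    }

    /** Key by message id so all traces for one message land in the same partition. */
    void emit(Trace trace) {
        traceProducer.send(new ProducerRecord<>("trace", trace.messageId, trace.toJson()));
    }
}
```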

Now we have gathered all our trace messages in our Kafka cluster, and we want to do stream processing on those messages to get real-time results, because we don't want to wait hours or days to detect a loss; we want to detect it within minutes. The first thing the job does is a key-by on the trace ID, which is the same as the message ID. By doing this, you can imagine that a lot of messages are being shuffled to different places, and the traces for the same message end up in the same task.

Now that we have all the traces for the same message in one place, that gives us tremendous insight into what is going on. If we find that, say, one trace is missing for a given location and a given action type, we can claim that the message was lost there. If we receive more than one trace for a certain location and a certain action type, we can claim duplicates. The same goes for latency: we can easily calculate the latency of any data hop using the timestamps in the traces.
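A condensed sketch of that per-message analysis could look like this. TraceEvent is a minimal stand-in for the trace record above, and the expected hops would come from the data lineage information; everything here is illustrative rather than Inca's actual code.

```java
import java.util.*;

// Given all traces collected for one message id, derive loss, duplicates, and latency.
public class TraceAnalyzer {

    record TraceEvent(String location, String type, long timestampMs) { }
    record Hop(String location, String type) { }

    static void analyze(String messageId, List<TraceEvent> traces, List<Hop> expectedHops) {
        // Count traces per (location, type) pair.
        Map<Hop, Long> counts = new HashMap<>();
        for (TraceEvent t : traces) {
            counts.merge(new Hop(t.location(), t.type()), 1L, Long::sum);
        }
        for (Hop hop : expectedHops) {
            long seen = counts.getOrDefault(hop, 0L);
            if (seen == 0) {
                System.out.printf("message %s LOST at %s%n", messageId, hop);
            } else if (seen > 1) {
                System.out.printf("message %s DUPLICATED %d times at %s%n", messageId, seen, hop);
            }
        }
        // End-to-end latency: gap between the earliest and latest trace timestamps.
        LongSummaryStatistics stats = traces.stream()
                .mapToLong(TraceEvent::timestampMs).summaryStatistics();
        if (stats.getCount() > 1) {
            System.out.printf("message %s end-to-end latency %d ms%n",
                    messageId, stats.getMax() - stats.getMin());
        }
    }
}
```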

We immediately faced a challenge: in the stream processing job, how long should we wait for a trace to arrive? Trace arrival time is unpredictable in our system. That is not unusual, because it matches the behavior of the messages flowing through our data system: while most messages are delivered end-to-end within a few seconds, some messages can take hours to be delivered, simply because a streaming job is severely lagging behind.

As I mentioned before, we use Flink for stream processing. In Flink, it's very hard to find a built-in window function that can efficiently deal with this kind of long tail. We decided to use a Flink custom trigger with a global window. What that essentially means is that we take complete control of how long to wait and what actions to take after the wait is over.
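Flink does expose global windows and custom triggers; the sketch below only illustrates the general pattern of re-evaluating a keyed global window some time after each trace arrives, and is not Inca's actual trigger. The delay value and class name are made up.

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Fires the window a fixed processing-time delay after each trace arrives, so the
// window function can decide whether the message is lost or still in flight.
// A real implementation would also clean up window state once a message is resolved.
public class WaitForTracesTrigger extends Trigger<Object, GlobalWindow> {
    private final long delayMs;

    public WaitForTracesTrigger(long delayMs) {
        this.delayMs = delayMs;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        // Re-evaluate the window a fixed delay after each trace arrives.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + delayMs);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // Fire without purging, so later traces can still join the same window.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;   // purely processing-time driven in this sketch
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) { }
}
```

It might be wired up roughly as traces.keyBy(t -> t.messageId).window(GlobalWindows.create()).trigger(new WaitForTracesTrigger(300_000)), with the window function making the lost-or-keep-waiting decision described below.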

We still face the challenge of how long to wait. If we wait too long but the message is already lost, then we are just wasting resources, and the job becomes unscalable. If we wait too short a time, the system may [inaudible 00:30:49] a lot of false alarms. If we prematurely declare a message as lost, a false alarm will definitely result, because the actual message simply has not arrived yet when we declare it lost. It's like waiting for a train that doesn't always stick to the schedule. When you rush to the platform and find the train is not there, you face the dilemma of whether to wait or not. If you choose to wait, you might keep wondering whether you are waiting for a train that will never arrive. To solve that problem, we needed some external input.

We decided to turn to our old friend, the committed consumer offset. Recall that in our trace messages, we record the actual offset of the message. What we can do is compare the committed offset against the message offset. If the committed offset is greater than the message offset, but we still don't have the trace, the message is probably lost.

Here I'm showing the simplified logic of our stream processing job. It always starts with some kind of event; the event can be a trace being received or a timer being triggered. At that point, we check whether all expected traces have been received for that message. If we have received all the traces, we are good. If not, we compare the committed offset against the message offset. If the committed offset is greater than the message offset, we report the message as lost, as I explained on the previous slide. If not, that just means the consumer is still trying to catch up; in that case, we set a new timer and keep waiting.
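That decision can be boiled down to a few lines; the names and the enum below are hypothetical and only mirror the flow on the slide.

```java
// A condensed sketch of the per-message decision described above.
public class LossDecision {
    enum Outcome { ALL_TRACES_RECEIVED, LOST, KEEP_WAITING }

    /**
     * @param receivedTraces  number of traces gathered so far for this message
     * @param expectedTraces  number of traces expected from the data lineage
     * @param committedOffset committed consumer offset of the relevant topic partition
     * @param messageOffset   offset recorded in the message's "sent" trace
     */
    static Outcome evaluate(int receivedTraces, int expectedTraces,
                            long committedOffset, long messageOffset) {
        if (receivedTraces >= expectedTraces) {
            return Outcome.ALL_TRACES_RECEIVED;   // nothing to do
        }
        if (committedOffset > messageOffset) {
            return Outcome.LOST;                  // consumer has passed it, trace never came
        }
        return Outcome.KEEP_WAITING;              // consumer still catching up: set a new timer
    }
}
```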

Here is the complete picture of our tracing system, Inca. The data producers and processors constantly send all kinds of trace messages to our dedicated Kafka cluster for tracing. We have a stream processing job that consumes those trace messages from that Kafka cluster. It also gets external input from our control plane and from the Kafka clusters: it gets the data lineage information from our control plane, so that we can figure out what the expected traces are for any message, and it gets the committed offsets from the Kafka clusters, so that we know how long to wait for the trace messages.

After the processing is done, it produces its final output to two locations. It publishes the loss rate, duplicate rate, and latency metrics to our telemetry system, called Atlas. Most importantly, it publishes the identities of the lost messages to a Kafka topic. Our customers can subscribe to that lost-message Kafka topic and apply their own auto recovery.

You might be asking: those trace messages are also Kafka messages and can be lost. The answer is definitely yes. We understand that a lost trace can lead to false alarms or some inaccuracy in our analysis. I would also argue that when it comes to validating your streaming data, there is no perfect solution that gives you 100% confidence or 100% accuracy. You will probably end up combining multiple data validation solutions to get the best confidence you can have. Tracing is just one of those data validation solutions.

That being said, we do maximize the delivery guarantee for our traces. For example, we have a dedicated tracing Kafka cluster, and it uses very durable AWS EBS volumes. Finally, let's say a false alarm is generated due to a lost trace. That is still okay. One way to handle this kind of false alarm is to apply your auto recovery anyway. With at-least-once delivery, this may introduce a few duplicates, but given at-least-once semantics, that is totally acceptable, especially since our loss rate for the traces is very low.

Now you might be wondering: you have this wonderful tracing system, so what kinds of problems have you detected? We have detected three major categories of data loss. The first category is caused by less durable configurations. This is understandable, because when we initially created our Kafka clusters and configured our streams, we took cost, availability, and durability into account and balanced among them. We may end up choosing a less durable configuration to save cost. For example, if you have a Kafka topic with a replication factor of two and producer acks set to one, that might naturally lead to a tiny bit of message loss. It can happen in some extreme cases during replication, especially when there is a change of leader for a partition.

The second major category of data loss we found involves extreme situations. Let's say you have a Kafka topic with a replication factor of nine, but you just happen to lose all nine of your replicas. That is very unfortunate, but it can happen. In one interesting case, we found that a partition leader had significant clock drift, which led to unexpected log truncation happening at an unexpected time. Luckily, we had the tracing in place at the time, and we were able to detect the message loss caused by that unexpected log truncation.

The third major category of data loss is actually human factors: human errors, code errors, or operational errors. I can give you one example. Our deployment tool had a design flaw where, in some rare cases, it could assign the same consumer ID to two different stream processing jobs. Those two stream processing jobs can step on each other's toes when they restart. When one job shuts down, it commits its last offset to the Kafka cluster. When it comes back again, it tries to fetch its last committed offset, but what it gets is the other job's committed offset. That naturally leads to message loss.

I want to point out that the second and third categories can happen regardless of how durable your Kafka configuration is. I would even argue that they can happen in any system, regardless of how strong its delivery guarantees are. That leads me to think that detecting message loss is not that different from finding a black hole. As you recall from the beginning of my talk, to find a black hole, you have to observe the objects around it. In this case, the black hole is the Kafka broker with bad hardware that eats our messages. What are the objects around it? The producers and consumers. If you just look at your Kafka broker, there is no way you can find that elusive message. If you look at the producer and the consumer and the traces they generate, it is very easy to figure out that some message has been lost.

Takeaway

This is the takeaway of my talk. We started with observability, where we use Kafka broker and consumer monitoring tools to improve observability. On top of that, we created auto remediations to help improve our operability and availability. Specifically, we use our first responder to automatically relaunch our stateless jobs once they get into the stuck state, and we use autoscaling to help us use our resources efficiently while keeping the consumer lag at a minimal level. Finally, we use message tracing to help us detect message loss and auto recover lost messages to improve our data quality.

Again, I would like everybody to remember the power of external observation. I hope someday you can use that to find your own black hole.

By the way, I have a complete blog post about our message tracing system on the Netflix tech blog, so check it out if you are interested. And if you are interested in tackling problems in real-time data infrastructure, we are hiring; welcome to join us.

 


 

Recorded at: Mar 20, 2020
