
Migrating Netflix's Viewing History from Synchronous Request-Response to Async Events


Key Takeaways

  • Asynchronous processing can improve the availability of a web service by relieving backpressure
  • Apache Kafka is a good solution for implementing a durable queue between service layers
  • Operating Kafka at large scale introduces several challenges, including data loss, processing latency, and duplicate or out-of-order records
  • Choosing a Kafka consumer platform depends on the use case and includes several tradeoffs
  • Use shadow testing and incremental rollout for a confident and smooth deployment

Suppose you are running a web-based service. A slowdown in request processing can eventually make your service unavailable. Chances are, not all requests need to be processed right away. Some of them just need an acknowledgement of receipt. Have you ever asked yourself: “Would I benefit from asynchronous processing of requests? If so, how would I make such a change in a live, large-scale, mission-critical system?”

I'm going to talk about how we migrated a user-facing system from a synchronous, request-response based system to an asynchronous one. I'm going to talk about what motivated us to embark on this journey, what system design changes we made, what challenges we encountered in the process, and what design choices and tradeoffs we made. Finally, I'm going to touch upon the validation process we used as we rolled out the new system.

Original Architecture

Netflix is a streaming video service available to over 200 million members worldwide. Members watch TV shows, documentaries, and movies on many different supported devices. When they come to Netflix, they are given a variety of choices through our personalized recommendations. Users press play, sit back, and enjoy watching the movie.

While the movie plays, we collect a lot of data, for both operational and analytical use cases. Some of the data drives product features like continue watching, which lets our members stop a movie in the middle and come back later to continue watching from that point on any device. The data also feeds our personalization and recommendation engines and our core business analytics.

I'm going to talk about our experience migrating one of the product features, viewing history, which lets members see their past viewing activity and optionally hide it. Let's look at our existing system before the migration. At a high level, we have the Netflix client on devices such as mobile phones, computers, laptops, and TVs, that is sending data during playback into the Netflix cloud.

First the data reaches the Gateway Service. From there it goes to the Playback API, which manages the lifecycle of the playback sessions. In addition, it sends the playback data to the Request Processor layer. The Request Processor, among other things, stores both short-term and long-term viewing data in our persistence layer, Apache Cassandra, and in a caching layer, EVCache, which lets us do very quick lookups.

Backpressure

Most of the time, this system worked absolutely fine. Every once in a while, an individual request would be delayed because of a network blip, or because one of the Cassandra nodes slowed down for a brief time. When that happened, since this is synchronous processing, the request-processing thread in the Request Processor layer had to wait. That in turn slowed down the upstream Playback API service, which in turn slowed down the Gateway Service itself.

Beyond a few retry strategies within the cloud, the slowdown can reach the Netflix client running on the user’s device. This is sometimes referred to as backpressure. Backpressure can manifest itself as unavailability in your system and can build up a queue of items that the client may have to retry. Some of this data is critical to what we do, and we want to avoid any data loss, which could happen, for example, if a client filled up its bounded local queue.

Our solution to this problem was to introduce asynchronous processing into the system. Between the Playback API service and the Request Processor, we introduced a durable queue. Now when the request comes in, it's put into the durable queue, and immediately acknowledged. There is no need to wait for that request to be processed.

It turns out that Apache Kafka fits this use case pretty well. Kafka presents a log abstraction to which producers like the Playback API can append, and from which multiple consumers can read at their own pace using offsets.
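As a rough illustration, here is what that producer/consumer split looks like with the standard Kafka Java client. This is a minimal sketch, not our actual setup: the topic name, addresses, and configuration values are placeholders, and the processing step is only indicated by a comment.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlaybackEventsSketch {

    // Producer side: the Playback API appends an event to the log and returns immediately.
    private static final Properties PRODUCER_PROPS = new Properties();
    static {
        PRODUCER_PROPS.put("bootstrap.servers", "localhost:9092"); // placeholder address
        PRODUCER_PROPS.put("key.serializer", StringSerializer.class.getName());
        PRODUCER_PROPS.put("value.serializer", StringSerializer.class.getName());
    }
    private static final KafkaProducer<String, String> PRODUCER = new KafkaProducer<>(PRODUCER_PROPS);

    static void publish(String sessionId, String eventJson) {
        // Append to the log; the caller does not wait for downstream processing.
        PRODUCER.send(new ProducerRecord<>("playback-events", sessionId, eventJson));
    }

    // Consumer side: the Request Processor reads the log at its own pace using offsets.
    static void consume() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "request-processor");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("playback-events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // process(record): store viewing data in Cassandra and EVCache (not shown)
                }
                consumer.commitSync(); // advance this group's offsets explicitly, at our own pace
            }
        }
    }
}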

This sounds simple. But if we simply introduce Apache Kafka in between two of our processing layers, can we call it done? Not quite. Netflix operates at a scale of approximately 1 million events per second. At that scale, we encountered several challenges in asynchronous processing: data loss, processing latencies, out of order and duplicate records, and intermittent processing failures. There are also design decisions around Kafka consumer platform choice as well as cross-region aspects.

Challenge: Data Loss

There are two potential sources of data loss. First: if the Kafka cluster itself were to be unavailable, of course, you might lose data. One simple way to address that would be to add an additional standby cluster. If the primary cluster were to become unavailable for unforeseen reasons, then the publisher, in this case the Playback API, could publish into the standby cluster instead. The consumer, the Request Processor, can connect to both Kafka clusters and therefore not miss any data.

Obviously, the tradeoff here is additional cost. For a certain kind of data, this makes sense. Does all data require this? Fortunately not. We have two categories of data for playback. Critical data gets this treatment, which justifies the additional cost of a standby cluster. The other less critical data gets a normal single Kafka cluster. Since Kafka itself employs multiple strategies to improve availability, this is good enough.
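As a sketch only, the publisher side of that failover could be built on two standard Kafka producers. The cluster addresses, topic name, and the fallback trigger below are illustrative; this is not our actual producer library.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DualClusterPublisher {

    private final KafkaProducer<String, String> primary = producerFor("primary-kafka:9092");
    private final KafkaProducer<String, String> standby = producerFor("standby-kafka:9092");

    void publishCritical(String sessionId, String eventJson) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("playback-events-critical", sessionId, eventJson);
        primary.send(record, (metadata, error) -> {
            if (error != null) {
                // The publish to the primary cluster failed; fall back to the standby
                // cluster so the event is not lost. The consumer reads from both clusters.
                standby.send(record);
            }
        });
    }

    private static KafkaProducer<String, String> producerFor(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }
}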

Another source of data loss is at publish time. Kafka has multiple partitions to increase scalability. Each partition is served by an ensemble of servers called brokers, one of which is elected as the leader. When you publish into a partition, you send the data to the leader broker. You can then decide to wait only for the leader to acknowledge that the item has actually been persisted to durable storage, or to also wait for the follower brokers to acknowledge that they have written it to persistence. If you're dealing with critical data, it makes sense to wait for acknowledgement from all brokers of the partition. At a large scale, this has implications beyond just the cost of waiting for multiple writes.
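That durability/latency tradeoff is expressed through the standard producer's acks setting. The sketch below uses illustrative values rather than our exact configuration.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class CriticalProducerConfig {
    static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=1 waits only for the leader broker to persist the record.
        // acks=all waits for all in-sync replicas, trading extra latency for durability.
        props.put("acks", "all");
        // On the broker/topic side, unclean.leader.election.enable=false prevents an
        // out-of-sync follower from being elected leader, the data-loss scenario below.
        return new KafkaProducer<>(props);
    }
}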

What would happen if you were to lose the leader broker? This happened to us just a couple of months after we deployed our new architecture. If a broker becomes unavailable and you are waiting for acknowledgement from it, obviously your processing is going to slow down. That slowdown causes backpressure and unavailability, which we're trying to avoid.

If we wait for acknowledgement from just the leader broker, there's an interesting failure mode. What if you then lose the leader broker after a successful publish? Leader election will pick a different leader. However, if the item acknowledged by the original leader had not been fully replicated to the other brokers, then electing that new leader could lose data, which is exactly what we're trying to avoid. This is called an unclean leader election.

How did we handle this situation? Again, there's a tradeoff to make. We have a producer library that is a wrapper on top of the Kafka producer client. Two optimizations are relevant here. First, because we use non-keyed partitions, the library can choose the partition it writes to. If one partition is unavailable because its leader broker is unavailable, our library can write to a different partition. Also, if a partition is under-replicated, that is, the leader broker has more items than the followers and replication has not fully caught up, then our library picks a different partition that is better replicated.
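Our producer library is internal, but the idea can be sketched with the standard Partitioner interface. The class below is a simplified, hypothetical version that prefers available, fully replicated partitions for non-keyed records.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class HealthyPartitionSketch implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Partitions whose leader is currently reachable.
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
        if (available.isEmpty()) {
            available = cluster.partitionsForTopic(topic); // fall back to all partitions
        }
        // Prefer partitions where every replica is in sync with the leader.
        List<PartitionInfo> fullyReplicated = available.stream()
                .filter(p -> p.inSyncReplicas().length == p.replicas().length)
                .collect(Collectors.toList());
        List<PartitionInfo> candidates = fullyReplicated.isEmpty() ? available : fullyReplicated;
        // Non-keyed records: any healthy partition will do, so pick one at random.
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size())).partition();
    }

    @Override public void configure(Map<String, ?> configs) {}
    @Override public void close() {}
}

Such a partitioner would be registered through the producer's partitioner.class configuration.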

With these strategies, we eventually chose to write in asynchronous mode, where the publisher puts the item into an in-memory queue and asynchronously publishes it to Kafka. This improves throughput, but we wanted an upper bound on the worst-case data loss we would incur if multiple failures happened at the same time. We were happy with the upper bound we could configure based on the in-memory queue size, combined with the strategy of avoiding under-replicated partitions.
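Continuing the producer configuration sketch above, the standard client exposes settings that bound that in-memory buffer; the values here are illustrative.

// Bounding the async producer's in-memory buffer (illustrative values).
props.put("buffer.memory", String.valueOf(16L * 1024 * 1024)); // at most ~16 MB of unsent records buffered
props.put("max.block.ms", "500"); // fail fast rather than block the Playback API when the buffer is full
props.put("linger.ms", "50");     // small batching window for throughput
// Worst case, a crash loses only what fits in this bounded buffer, which is the
// upper bound on data loss we monitor against.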

We monitor this data durability, and we consistently get four to five nines from it, which is acceptable for us. If your application must not lose any items of data, then you may want to require acknowledgment from all brokers before you consider an item published.

Challenge: Processing Latency and Autoscaling

One unavoidable side effect of introducing Kafka into our system is that there is now additional latency in processing a request; this includes the time required for the Playback API to publish the data to Kafka and for the Request Processor to consume it.

There is also the amount of time that the data waits in the Kafka queue. This is referred to as the lag, and it is a function of the number of consumer worker nodes and of the traffic volume. For a given number of nodes, as traffic volume increases, so will lag.

If you have a good idea of the peak traffic you're going to get, then you can figure out the number of consumer processing nodes you need in your system to achieve an acceptable lag. You could then simply provision your system to manage the peak traffic volume, or “set it and forget it.”
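To see what “set it and forget it” provisioning means in practice, here is a hypothetical back-of-the-envelope calculation; the throughput numbers are illustrative assumptions, not our actual figures.

// Hypothetical sizing for peak traffic; every number here is an assumption.
static long nodesForPeak() {
    long peakEventsPerSecond = 1_000_000L;  // assumed peak traffic
    long eventsPerNodePerSecond = 2_000L;   // assumed per-node throughput at an acceptable lag
    double headroom = 0.30;                 // spare capacity so lag stays near zero at peak
    return (long) Math.ceil(peakEventsPerSecond * (1 + headroom) / eventsPerNodePerSecond);
    // ceil(1,300,000 / 2,000) = 650 nodes, provisioned permanently for the peak
}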

For us, traffic changes over the course of the day and also across days of the week. We see a 5x change from the peak to the trough of our traffic. Because of such a big swing in volume, we wanted to be more efficient with our resources, and we chose to autoscale. Specifically, we add or remove a certain number of consumer processing nodes based on the traffic.

Whenever you change the number of consumers of a Kafka topic, all of that topic’s partitions are rebalanced across the new set of consumers. The tradeoff here is resource efficiency versus paying the price of a rebalance. Rebalance can affect you in different ways.

If your processing is stateful, then you have to do something more complex. For example, your consumers may have to pause processing, take any in-memory state, and checkpoint it along with the Kafka offset up to which they have processed. After the partitions are rebalanced, the consumers reload the checkpointed data and start processing from the checkpointed offset.

If your processing is a little simpler, or if you store state externally, then you can let the rebalance happen and just continue normally. Since some items may have been in flight when the rebalance started and not yet acknowledged to Kafka, those items will show up on another processing node, because that node now owns the partition after the rebalance. In the worst case, you are going to reprocess some number of items. If your processing is idempotent, or if you have other ways of dealing with duplicates, then this is not a problem.
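For the stateful case described two paragraphs above, the checkpoint-on-rebalance pattern can be sketched with the standard consumer's rebalance listener. The StateStore interface here is hypothetical and stands in for wherever you persist per-partition state.

import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical store for in-memory state checkpointed per partition.
interface StateStore {
    void save(TopicPartition partition, long offset); // persist state together with the offset
    void load(TopicPartition partition);              // restore state for a newly assigned partition
}

public class CheckpointingRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private final StateStore stateStore;

    CheckpointingRebalanceListener(KafkaConsumer<String, String> consumer, StateStore stateStore) {
        this.consumer = consumer;
        this.stateStore = stateStore;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Pause point: persist in-memory state along with the offsets processed so far.
        for (TopicPartition tp : partitions) {
            stateStore.save(tp, consumer.position(tp));
        }
        consumer.commitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Reload checkpointed state; consumption resumes from the committed offsets.
        for (TopicPartition tp : partitions) {
            stateStore.load(tp);
        }
    }
}

The listener would be passed to consumer.subscribe() along with the topic list.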

The next question is: when and by how much to auto scale? One might think lag is a good metric to trigger auto scaling. The problem is that you cannot easily scale down by this metric. When the lag is zero, how do we know if we should scale down by one processing node, 10, or 50? Removing too many nodes at once might result in “flapping”: removing and re-adding nodes over and over in a short time span.

In practice, many developers use a proxy metric; for example, CPU utilization. For us, records-per-second (RPS) turns out to be a good trigger for autoscaling. When our system is in a steady state, we measure the RPS to establish a baseline. Then we can add or remove nodes as our throughput changes relative to that baseline.

We also have different patterns for scaling up vs. scaling down. During scale-up we are already seeing a lot of incoming data, and rebalances temporarily slow down consumers, so we want to scale up quickly and keep the number of rebalances to a minimum. Scaling down can happen more gradually, since the current capacity is higher than it needs to be and we can absorb the slowdown from a rebalance.
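A minimal sketch of that scaling decision, assuming a baseline per-node RPS measured in steady state; the step sizes are illustrative, not our actual policy.

// Decide the target fleet size from the current RPS relative to the measured baseline.
static int desiredNodes(double currentRps, double baselinePerNodeRps, int currentNodes) {
    int neededNodes = (int) Math.ceil(currentRps / baselinePerNodeRps);
    if (neededNodes > currentNodes) {
        return neededNodes;                             // scale up quickly, in one step
    }
    if (neededNodes < currentNodes) {
        return Math.max(neededNodes, currentNodes - 1); // scale down gradually, one node at a time
    }
    return currentNodes;                                // steady state: avoid flapping
}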

Challenge: Out of Order and Duplicate Records

Out of order and duplicate records are going to happen in a distributed system. How you address this problem will depend on the specifics of your application. In our case, we apply sessionization, which is collecting events for a video playback session that has specific start and stop events. Therefore, we collect all events for a session within those boundaries.

Given multiple events of a session, and based on certain attributes within the data, we can order them and also deduplicate them. For example, each event could carry a monotonically increasing ID or a timestamp from the client. We can also deduplicate writes using the timestamp at which the event reaches our server.
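A simplified sketch of that ordering and deduplication within one session. The event fields here (eventId, serverTimestamp) are illustrative stand-ins for the attributes mentioned above.

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SessionSketch {

    record PlaybackEvent(String sessionId, long eventId, long serverTimestamp, String payload) {}

    static List<PlaybackEvent> orderAndDeduplicate(List<PlaybackEvent> sessionEvents) {
        // Key by the monotonically increasing event id: duplicates collapse onto one key,
        // and the TreeMap keeps the events in order.
        Map<Long, PlaybackEvent> byId = new TreeMap<>();
        for (PlaybackEvent e : sessionEvents) {
            byId.merge(e.eventId(), e,
                    // On a duplicate id, keep the copy that reached our server first.
                    (a, b) -> a.serverTimestamp() <= b.serverTimestamp() ? a : b);
        }
        return List.copyOf(byId.values());
    }
}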

Challenge: Intermittent Processing Failures

On the consumer side, we still have to deal with intermittent failures of processing. Typically, we wouldn't want to hold up processing the entire contents of the queue because of one failed item---sometimes referred to as head-of-line blocking. Instead, we put the failed item aside, process the rest of the queue, and then come back to it later.

One characteristic we would like for such a system is that a finite amount of time elapses before we try a failed item again; there is no need to retry it immediately. We can use a separate delay queue for these failed items. There's a variety of ways to implement this. You could write the item to another Kafka topic and build another processor that adds a delay.

For us, it's very easy to achieve this using Amazon Simple Queue Service (SQS), since we already operate on Amazon Elastic Compute Cloud (EC2). We submit failed items to an SQS queue, and then the queue has a feature to optionally specify an interval before the item is available to consume.
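With the AWS SDK for Java v2, parking a failed item with a delivery delay is a small amount of code. The queue URL and the five-minute delay below are placeholders; SQS caps per-message delays at 15 minutes.

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public class RetryQueueSketch {

    private final SqsClient sqs = SqsClient.create();
    private static final String RETRY_QUEUE_URL =
            "https://sqs.us-east-1.amazonaws.com/123456789012/request-processor-retries"; // placeholder

    void parkForRetry(String failedEventJson) {
        sqs.sendMessage(SendMessageRequest.builder()
                .queueUrl(RETRY_QUEUE_URL)
                .messageBody(failedEventJson)
                .delaySeconds(300) // the item becomes visible to consumers only after 5 minutes
                .build());
    }
}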

Consumer Platform

There are multiple platforms we could use for consuming and processing items from Kafka. At Netflix, we use three different ones. Apache Flink is a popular stream processing system. Mantis is a stream processing system that Netflix open-sourced a few years ago. Finally, Kafka has an embeddable consumer client, which allows us to write microservices that just process the items directly.

We started out with the question: which platform is the best one to use? Eventually we realized that's not the right question; instead, the question is: which processing platform benefits which use case? Each of the three has pros and cons, and we use all three for different scenarios.

If you're doing complex stream processing, Mantis and Apache Flink are a very good fit. Apache Flink also has built-in support for stateful stream processing, where each node can store its local state, for example, for sessionization. Microservices are very compelling, at least for us, because Netflix engineers have excellent support for the microservices ecosystem, all the way from starting with a clean code base to CI/CD pipelines and monitoring.

Cross-Region Aspects

Cross-region aspects are important because Netflix operates in multiple regions. Since we are running a large distributed system, it is possible that a region may become unavailable once in a while. We routinely practice for this, and multiple times a year, we take a region down just to make sure that we exercise our muscle of cross-region traffic forwarding.

At first glance, it might seem reasonable that an item meant for another region could simply be published remotely into a Kafka topic in that region, over a tunnel across the regions. That normally works, except when you encounter a real outage of that region: then the remote publish is not going to work.

A simple but subtle change we made is that we always want to publish locally. We publish to another Kafka topic and asynchronously have a region router send it to the other side. This way, all events of a single playback session can be processed together.
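A hedged sketch of that “always publish locally” rule follows. The topic names and the region check are illustrative, and the region router that drains the forwarding topic is a separate service not shown here.

// Events meant for another region still go to a local topic; a separate region router
// drains that topic and forwards asynchronously, so a remote outage cannot block the publish.
static void publishLocally(KafkaProducer<String, String> localProducer, String localRegion,
                           String targetRegion, String sessionId, String eventJson) {
    String topic = targetRegion.equals(localRegion)
            ? "playback-events"               // processed by consumers in this region
            : "playback-events-cross-region"; // drained asynchronously by the region router
    localProducer.send(new ProducerRecord<>(topic, sessionId, eventJson));
}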

Testing, Validation, and Rollout

Now that we have the challenges figured out and the tradeoffs made, how did we test and roll it out? Shadow testing is one technique. Chances are, you may already be using a similar strategy in your environment. For us, this consisted of having the Playback API dual-write to the existing synchronous system and to Apache Kafka, from which the asynchronous Request Processor was consuming. Then we had a validator process that checked that the in-flight request processing was identical in both paths.

The next step was to also make sure that the stored artifacts were identical. For this we created a shadow Cassandra cluster. Here, you're trading off costs for higher confidence. If you have an environment where it is relatively easy to get additional resources for a short period of time, which is certainly possible in a cloud environment like ours, then it gives you the additional benefit of confidence before rolling it out.

We rolled this out by splitting traffic by userId; that is, all traffic for a given userId was consistently written to either the new system or the old one. We started with 1% of users’ data written to the new system, then incrementally increased the percentage all the way to 100% of users. That gave us a really smooth migration, without impact to upstream or downstream systems.
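A minimal sketch of that kind of userId-based split, assuming a stable hash into 100 buckets; the hash choice is illustrative, not necessarily what we used.

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class RolloutSketch {

    // A stable hash puts each user into one of 100 buckets, so a given userId is
    // consistently routed to the new or the old path as the percentage grows from 1 to 100.
    static boolean useNewAsyncPath(String userId, int rolloutPercentage) {
        CRC32 crc = new CRC32();
        crc.update(userId.getBytes(StandardCharsets.UTF_8));
        int bucket = (int) (crc.getValue() % 100); // 0..99, stable for a given userId
        return bucket < rolloutPercentage;
    }
}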

The figure below shows where we are today and where we're going next. The items in blue (the Playback API, the Viewing History Processor, and the Bookmark Processor, along with Kafka) are in production now. The rest of the system deals with additional data. There's an Attributes processor and a Session Logs service, which will be interesting because the size of that data is very large: much larger than what you would normally write into Kafka. We will have some new challenges to solve there.

Conclusion

We’ve seen how asynchronous processing improved the availability and data quality for us, and how we reasoned about the design choices and what tradeoffs made sense for our environment. After implementation, shadow testing and incremental rollout gave us a confident and smooth deployment. With this information, think about how you could apply these lessons to your environment, and what other tradeoffs you may make for a similar journey.
