




Building & Operating High-Fidelity Data Streams



Sid Anand discusses building high-fidelity nearline data streams as a service within a lean team.


Sid Anand currently serves as the Chief Architect for Datazoom. Prior to joining Datazoom, Sid served as PayPal's Chief Data Engineer, focusing on ways to realize the value of data. Prior to joining PayPal, he held several positions including Agari's Data Architect, a Technical Lead in Search @ LinkedIn, Netflix’s Cloud Data Architect, Etsy’s VP of Engineering, and several technical roles at eBay.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.


Anand: My name is Sid Anand. Given that this is a data track, I'm sure many of you are already familiar with streams, but for those of you who are new to them, this might serve as a good introduction. Why do streams matter? In our world today, machine intelligence and personalization drive engaging online experiences. Think of a learning-to-rank system that improves search quality at your favorite search engine, a recommender system that recommends music or movies, one that recommends whom you should follow, or a ranking system that re-ranks the feed on your social platform of choice. Disparate data is constantly being connected to drive predictions that keep us engaged. While it may seem that some magical SQL join is powering these connections, the reality is that data growth has made it impractical to store all of this data in a single database. Ten years ago, we would have used a single monolithic database to store all our data; today, the picture on the right of this slide is more representative of the modern data architectures we see. It is a collection of point solutions tied together by one or more data movement infrastructure services. How do companies manage this complexity? A key piece of the puzzle is data movement, which usually comes in two forms: batch processing or stream processing.

Why Are Streams Hard?

What makes streams hard? Let's have a look at this picture. There are lots of moving parts. It has a very large surface area, which means there are many places where errors can occur. In streaming architectures, any gaps in non-functional requirements can be very unforgiving: you end up spending a lot of your time fighting fires and keeping systems up. By "ilities," I mean non-functional requirements such as scalability, reliability, and operability. If you don't build your systems with these ilities as first-class citizens, you will end up paying a very steep operational tax. That typically translates to unhappy customers and burnt-out key team members who will eventually leave your team. In this talk, we will focus on building high-fidelity streams from the ground up.

Start Simple

This is a pretty ambitious endeavor, and with any large project, I like to start simple. Let's start by defining a goal: build a system that can deliver messages from source S to destination D. First, let's decouple S and D by putting a message broker between them. This is not a controversial decision; it's quite conventional and used throughout the industry. Here I've picked Kafka, but you can use any message broker. In this system, I've created a single topic called the events topic, E, and used it to decouple S and D. This means that if D fails, S can continue to publish messages to E, and if S fails, D can continue to consume messages from E.

Let's make a few more implementation decisions about this system. Let's run it on a cloud platform. For many, that just means a public cloud like AWS or GCP, but it can also mean running on Kubernetes on-prem. To start with, let's operate at low scale, which means we can run Kafka with a single partition in our topic. However, since we want things to be reliable, let's run three brokers split across three availability zones and set the replication factor (RF) to 3. We will run S and D on single but separate EC2 instances. To make things a bit more interesting, let's provide our stream as a service: S accepts inbound messages at an API endpoint. When messages arrive, S processes them and sends them to the events topic E. Process D consumes from E and sends each message to some third-party endpoint on the internet.


The first question I ask is: is this system reliable? Let's revise our goal: we want to build a system that can deliver messages reliably from S to D. To make this goal more concrete, I'd like to add the following requirement: zero message loss. Once process S has acknowledged a message to a remote sender, D must deliver that message to a remote receiver. How do we build reliability into our system? Let's first generalize it. Instead of just S and D, let's say we have three processes, A, B, and C, connected linearly by Kafka topics. To make this system reliable, let's treat this linear topology as a chain. Like any chain, it's only as strong as its weakest link: if each process, or link, is transactional in nature, the whole chain will be transactional. My definition of transactionality here is at-least-once delivery, since this is the most common way Kafka is used today. How do we make each link transactional? Let's break the chain into its component links. First, we have A, an ingest node. Then we have B, an internal node, which reads from Kafka and writes to Kafka. Finally, we have C, an expel or egest node, which reads from Kafka and sends messages out to the internet.

How do we make A reliable? A receives a request via its REST endpoint, does some processing on it, reliably sends the data to Kafka, and then sends an HTTP response back to its caller. To reliably send data to Kafka, it calls kProducer.send with two arguments, a topic and a message. It then immediately calls flush, which flushes the producer's internal buffers and forces the messages out to the brokers. Since we have the producer config acks=all, it waits for acknowledgement from all in-sync replicas (here, all three brokers) before it responds to its caller. How about C? What does C need to do to be reliable? It reads data, typically a batch, from Kafka, does some processing on it, and then reliably sends the data out. Here, reliably means waiting for a 200 OK response code from the external service. Since we've disabled autocommit (set it to false), once C receives that response, it manually acknowledges back to Kafka so that its read checkpoint moves forward. If there are any problems, C negatively acknowledges ("nacks") Kafka, which forces a reread of the same data. Last but not least, what does B need to do? B is a combination of A and C: it needs to act like a reliable Kafka producer, like A, and a reliable Kafka consumer, like C.
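The following is a minimal, in-memory Python sketch of the ingest (A) and egest (C) links described above. It does not use a real Kafka client: FakeTopic, ingest, egest, and flaky_send are hypothetical names, and appending to a list stands in for kProducer.send followed by flush with acks=all. What it illustrates is the ordering of steps: A responds only after the write is durable, and C advances its committed offset only after every send returns 200.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class FakeTopic:
    """Hypothetical in-memory stand-in for a single-partition Kafka topic."""
    log: List[str] = field(default_factory=list)
    committed: int = 0  # offset of the next record the consumer will read

    def append_durably(self, msg: str) -> None:
        # Stands in for send() + flush() with acks=all: once this returns,
        # the record is assumed to be replicated and safe.
        self.log.append(msg)


def ingest(topic: FakeTopic, messages: List[str]) -> int:
    """Node A: persist every message first, then answer the HTTP caller."""
    for msg in messages:
        topic.append_durably(msg.strip())  # trivial "processing" step
    return 200  # only reached after all writes are durable


def egest(topic: FakeTopic, send: Callable[[str], int], batch_size: int = 10) -> int:
    """Node C: deliver a batch; commit only if every send returned 200."""
    start = topic.committed
    batch = topic.log[start:start + batch_size]
    for msg in batch:
        if send(msg) != 200:
            return 0  # "nack": offset unchanged, so the batch is re-read
    topic.committed = start + len(batch)  # manual commit (auto-commit off)
    return len(batch)


# Usage: the external endpoint rejects "b" once, then recovers.
topic = FakeTopic()
status = ingest(topic, [" a", "b ", "c"])
delivered: List[str] = []
pending_failures: Dict[str, int] = {"b": 1}


def flaky_send(msg: str) -> int:
    if pending_failures.get(msg, 0) > 0:
        pending_failures[msg] -= 1
        return 503
    delivered.append(msg)
    return 200


first_pass = egest(topic, flaky_send)   # fails on "b", commits nothing
second_pass = egest(topic, flaky_send)  # re-reads the batch, commits 3
```

Note that "a" is delivered twice: re-reading an uncommitted batch is exactly the duplication that at-least-once delivery permits. With a real Kafka consumer, the same behavior comes from setting enable.auto.commit=false and committing offsets manually.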

What can we say about the reliability of our system? What happens if a process crashes? If A crashes, we have a complete outage at ingest: our system will not accept any new messages. If C crashes, we stop delivering messages to external consumers; A will continue to receive messages, but C will not deliver them anywhere. The most common solution is to wrap each service in an autoscaling group of size T. If we do so, each group can tolerate T-1 concurrent failures. While "autoscaling group" is a term coined by Amazon, the equivalent is available on all cloud platforms and in Kubernetes.


For now, we appear to have a pretty reliable data stream. How do we measure its reliability? This brings us to observability, which is a story about lag and loss. If you've worked with streams before, you might be familiar with these metrics; if you're new to them, here's an introduction. Let's start with lag. Lag is simply a measure of message delay in a system: the longer a message takes to transit the system, the greater its lag. The greater the lag, the greater the impact on a business, especially one that depends on real-time insights. Hence, our goal is to minimize lag in order to deliver insights as quickly as possible. How do we compute it? First, we need a concept called event time. Event time is the creation time of a message. It's typically stored within the message body and travels with it as it transits our system. The lag of any message m_i at any node N in the system is then lag(m_i, N) = T_arrival(m_i, N) - T_0(m_i), where T_0(m_i) is the event time of m_i and T_arrival(m_i, N) is the time m_i arrives at N.

Let's look at a real example. Say we have a message created at noon. It arrives at node A at 12:01 p.m. Node A processes the message and sends it on to node B, where it arrives at 12:04 p.m. B processes it and sends it on to C, which receives it at 12:10 p.m. In turn, C processes the message and sends it on its way. Using the lag equation, the lag at A is T1-T0, or one minute; the lag at B is T3-T0, or four minutes; and the lag of message 1 at node C is 10 minutes. In reality, times in these systems are on the order of milliseconds, not minutes.
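As a quick check of the arithmetic, here is a small Python sketch that applies the arrival-lag definition (arrival time minus event time) to the timestamps in this example. The calendar date is arbitrary; only the times of day come from the talk, and lag_in is a hypothetical helper name.

```python
from datetime import datetime


def lag_in(event_time: datetime, arrival_time: datetime):
    """Arrival lag of one message at one node: arrival time minus event time."""
    return arrival_time - event_time


# Times from the worked example; the date itself is arbitrary.
t0 = datetime(2022, 1, 1, 12, 0)  # event time: message created at noon
arrivals = {
    "A": datetime(2022, 1, 1, 12, 1),
    "B": datetime(2022, 1, 1, 12, 4),
    "C": datetime(2022, 1, 1, 12, 10),
}

lags = {node: lag_in(t0, arrived) for node, arrived in arrivals.items()}
for node, lag in lags.items():
    print(node, lag)
```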

Note that we've been talking about message arrival at these nodes, so this is called arrival lag, or lag-in. Also observe that lag is cumulative: the lag computed at node C accounts for the lag upstream of it at nodes A and B, and the lag computed at node B accounts for the lag upstream of it at node A. Besides arrival lag, there is another type called departure lag, or lag-out, which is calculated when messages leave nodes, in the same way as arrival lag. The most important lag metric in any streaming system is end-to-end lag: the total time a message spends in the system. End-to-end lag is easy to compute because it's simply the departure lag at the last node in the system, here node C.
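Extending the same idea to departure lag, the sketch below uses hypothetical departure times (the talk only gives arrival times), chosen so that each node holds the message for a minute or two. End-to-end lag falls out as the departure lag at the last node, and the assertion checks the cumulative property: interleaving lag-in and lag-out in pipeline order never decreases.

```python
from datetime import datetime

t0 = datetime(2022, 1, 1, 12, 0)  # event time of the message

# (node, arrival, departure) in pipeline order; departure times are hypothetical.
path = [
    ("A", datetime(2022, 1, 1, 12, 1), datetime(2022, 1, 1, 12, 2)),
    ("B", datetime(2022, 1, 1, 12, 4), datetime(2022, 1, 1, 12, 6)),
    ("C", datetime(2022, 1, 1, 12, 10), datetime(2022, 1, 1, 12, 11)),
]

lag_in = {node: arrived - t0 for node, arrived, _ in path}
lag_out = {node: departed - t0 for node, _, departed in path}

# End-to-end lag is the departure lag at the last node in the chain.
end_to_end = lag_out[path[-1][0]]

# Lag is cumulative: lag-in and lag-out, interleaved in path order,
# form a non-decreasing sequence.
sequence = [lag for node, _, _ in path for lag in (lag_in[node], lag_out[node])]
assert all(a <= b for a, b in zip(sequence, sequence[1:]))
print("end-to-end lag:", end_to_end)  # end-to-end lag: 0:11:00
```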

While it is interesting to know the lag for a particular message, it's of little use on its own, since we typically deal with millions of messages. Instead, we prefer statistics that capture population behavior. I prefer P95; many people prefer max or P99. Let's look at some of the statistics we can build. We can compute the end-to-end lag over the population at P95. We can also compute lag-in or lag-out at any node. With lag-in and lag-out, we can compute what's called the process duration: the time spent at a given node in the chain, that is, its lag-out minus its lag-in. Why is that useful? Let's look at a real example. Imagine this topology: messages pass through a system with four nodes, a red node, a green node, a blue node, and finally an orange node. This is a system we actually run in production, and the graph below is taken from CloudWatch for that service. We took the process duration for each node and put it in a pie chart, which gives us the lag contribution of each node. The lag contribution is approximately equal for each node; no node stands out. In fact, this is a very well-tuned system. If we take this pie chart and spread it out over time, we get the graph on the right, which shows that the performance is also consistent over time.
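To make the population statistics concrete, here is a hedged Python sketch over a handful of hypothetical per-message measurements (lag-in and lag-out, in milliseconds, at each of the four nodes). It computes the P95 end-to-end lag using the nearest-rank definition (one of several percentile definitions; monitoring tools may compute percentiles differently), each node's process duration, and each node's share of total process time, which is the quantity the pie chart shows.

```python
from typing import List, Sequence, Tuple

NODES = ["red", "green", "blue", "orange"]

# Hypothetical measurements in ms: one row per message, one (lag_in, lag_out)
# pair per node in pipeline order.
samples: List[List[Tuple[int, int]]] = [
    [(2, 10), (12, 21), (23, 30), (33, 41)],
    [(3, 12), (14, 22), (25, 33), (35, 44)],
    [(2, 11), (13, 20), (22, 31), (34, 42)],
]


def p95(values: Sequence[int]) -> int:
    """Nearest-rank 95th percentile, using integer math to avoid float rounding."""
    ordered = sorted(values)
    rank = -(-95 * len(ordered) // 100)  # ceil(0.95 * n), 1-based
    return ordered[max(rank, 1) - 1]


def process_durations(message: List[Tuple[int, int]]) -> List[int]:
    """Time spent at each node: lag-out minus lag-in."""
    return [lag_out - lag_in for lag_in, lag_out in message]


# End-to-end lag is the lag-out at the last node.
end_to_end = [message[-1][1] for message in samples]

# Sum each node's process duration across messages, then normalize to shares.
per_node_totals = [sum(col) for col in zip(*(process_durations(m) for m in samples))]
total = sum(per_node_totals)
contribution = {node: t / total for node, t in zip(NODES, per_node_totals)}

print("P95 end-to-end lag (ms):", p95(end_to_end))
for node, share in contribution.items():
    print(f"{node:>6}: {share:.0%} of process time")
```

Roughly equal shares, as in this toy data, are the signature of the well-tuned system described above; one dominant slice would point to the node to optimize first.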


Now that we've talked about lag, what about loss? Loss is simply a measure of messages lost while transiting the system. Messages can be lost for various reasons, most of which we can mitigate. The greater the loss, the lower the data quality. Hence, our goal is to minimize loss in order to deliver high-quality insights. How do we compute it? Loss can be computed as the set difference of messages between any two points in the system. Let's return to our topology from before; the one difference is that instead of a single message, 10 messages now transit the system. We can use a loss table to compute loss. Each row in the loss table is a message, and each column is a node in the chain. As a message transits each node, we tally a 1. For example, message 1 successfully transits the entire system, so there's a 1 in each column. Message 2 also transits each node successfully. However, message 3, while successfully processed at the red node, does not make it to the green node, and therefore doesn't make it to the blue or orange node either. At the bottom, we compute the loss per node, and in the lower right, the end-to-end loss of the system, which in this case is 50%.
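The loss table can be sketched in Python as set differences over the message IDs each node has seen. The IDs below are hypothetical, chosen only so that message 3 drops out after the red node and the end-to-end loss comes to 50% as in the example. Per-node loss here is measured cumulatively against the first node; a hop-by-hop variant would compare each node with its predecessor instead.

```python
from typing import Dict, List, Set

NODES = ["red", "green", "blue", "orange"]

# Hypothetical message IDs observed at each node (illustrative data only).
seen: Dict[str, Set[int]] = {
    "red": set(range(1, 11)),
    "green": {1, 2, 4, 5, 6, 7, 8, 9, 10},
    "blue": {1, 2, 4, 5, 6, 7},
    "orange": {1, 2, 4, 5, 6},
}


def loss(upstream: Set[int], downstream: Set[int]) -> float:
    """Fraction of upstream messages missing downstream (a set difference)."""
    return len(upstream - downstream) / len(upstream) if upstream else 0.0


# Loss table: one row per message, one column per node, 1 if the node saw it.
loss_table: Dict[int, List[int]] = {
    msg: [int(msg in seen[node]) for node in NODES] for msg in sorted(seen[NODES[0]])
}

# Cumulative loss at each node, measured against the first node.
per_node = {node: loss(seen[NODES[0]], seen[node]) for node in NODES}
end_to_end = loss(seen[NODES[0]], seen[NODES[-1]])
print(f"end-to-end loss: {end_to_end:.0%}")
```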

In a streaming data system, messages never stop flowing. How do we know when to count? The key is to allocate messages to 1-minute-wide time buckets using their event time. For example, for the 12:34 minute of the day, we can compute a loss table that includes all the messages whose event times fall within 12:34. We can do the same for every other minute of the day. Imagine that the time is now 12:40 p.m. As we know, messages in these systems may arrive late, and we can see that four of the tables are still receiving updates to their tallies. However, the 12:35 p.m. table is no longer changing, so every message that will arrive for that minute has arrived, and at this point we can compute its loss. Any table older than that can be aged out and dropped, which lets the system scale by trimming tables we no longer need. To summarize, we wait a few minutes for messages to transit, compute loss, and raise an alarm if loss exceeds a configured threshold, say 1%. Now we have a way to measure both the reliability and the latency of our system.
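A minimal Python sketch of the bucketing scheme, assuming a fixed settle window: the talk describes waiting until a bucket's table stops changing, and a fixed wait of a few minutes is a simplification of that. LossTracker and its parameters are hypothetical. Messages are bucketed by the minute of their event time, a bucket is only evaluated once it is older than the settle window, and evaluated buckets are dropped so memory stays bounded.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, Set, Tuple


class LossTracker:
    """Hypothetical per-minute loss tables keyed by event-time bucket.

    Uses a fixed settle window as a stand-in for "the table stopped changing".
    """

    def __init__(self, first_node: str, last_node: str,
                 settle_minutes: int = 5, threshold: float = 0.01) -> None:
        self.first_node = first_node
        self.last_node = last_node
        self.settle_minutes = settle_minutes
        self.threshold = threshold
        # bucket (minute index) -> node -> message IDs seen at that node
        self.tables: DefaultDict[int, DefaultDict[str, Set[int]]] = defaultdict(
            lambda: defaultdict(set))

    def record(self, msg_id: int, event_time_s: float, node: str) -> None:
        """Tally a message at a node, bucketed by the minute of its event time."""
        self.tables[int(event_time_s // 60)][node].add(msg_id)

    def close_settled(self, now_s: float) -> Dict[int, Tuple[float, bool]]:
        """Compute loss for buckets past the settle window, then drop them.

        Returns {bucket: (loss, alarm)}, where alarm means loss > threshold.
        """
        current = int(now_s // 60)
        results: Dict[int, Tuple[float, bool]] = {}
        for bucket in sorted(self.tables):  # sorted() copies keys, so pop is safe
            if bucket > current - self.settle_minutes:
                continue  # late arrivals may still update this table
            table = self.tables.pop(bucket)
            sent, arrived = table[self.first_node], table[self.last_node]
            loss = len(sent - arrived) / len(sent) if sent else 0.0
            results[bucket] = (loss, loss > self.threshold)
        return results


# Usage with times as seconds since midnight: 100 messages created at 12:34,
# two of which never reach the last node.
tracker = LossTracker("A", "C", settle_minutes=5)
t_1234 = 12 * 3600 + 34 * 60
for msg_id in range(100):
    tracker.record(msg_id, t_1234 + 10, "A")
    if msg_id not in (3, 7):
        tracker.record(msg_id, t_1234 + 10, "C")

too_early = tracker.close_settled(t_1234 + 4 * 60)  # 12:38: bucket still open
settled = tracker.close_settled(t_1234 + 6 * 60)    # 12:40: bucket settled
```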


Have we tuned our system for performance yet? Let's revise our goal: we want to build a system that can deliver messages reliably from S to D with low latency. To understand streaming system performance, we need to understand the components of end-to-end lag. The first component is the ingest time: the time from the last byte in of the request to the first byte out of the response. It includes any overhead we incur in reliably sending messages to Kafka. At the end of our pipeline is the expel, or egest, time: the time to process and egest a message at D. Any time between these two is called transit time. Together, these three times make up the end-to-end lag.

Performance Penalties

When discussing performance, we have to acknowledge that there are performance penalties; in other words, we need to trade off latency for reliability. Let's look at some of these penalties. The first is the ingest penalty. In the name of reliability, S needs to call kProducer.flush on every inbound API request, and it needs to wait for three acks from Kafka before sending its API response. Our approach here is to amortize: we support batch APIs, so we get multiple messages per web request and can spread the cost over them, limiting this penalty. Similarly, there is the expel penalty. Here is an observation to consider: Kafka is very fast, many orders of magnitude faster than typical HTTP round trips. In fact, the majority of the expel time is the HTTP round trip time. Again, we use amortization. In each D node, we add batching and parallelism: we read a batch from Kafka, re-batch it into smaller batches, and use parallel threads to send these out to their destinations. This way, we maximize throughput and minimize the expel cost, or penalty, per message.
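The expel-side amortization (re-batching a Kafka batch and sending sub-batches on parallel threads) might look like this in Python. send_batch is a hypothetical stand-in for one HTTP request that carries several messages, and the sizes are illustrative defaults, not tuned values. Returning True only when every sub-batch succeeds lets the caller commit the Kafka offset for the whole batch, which preserves the at-least-once behavior of the egest node.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def rebatch(batch: Sequence[str], size: int) -> List[List[str]]:
    """Split one large Kafka batch into sub-batches of at most `size` messages."""
    return [list(batch[i:i + size]) for i in range(0, len(batch), size)]


def expel(batch: Sequence[str], send_batch: Callable[[List[str]], int],
          sub_batch_size: int = 50, workers: int = 8) -> bool:
    """Send sub-batches on parallel threads; True only if every one got a 200.

    The caller should commit the Kafka offset for `batch` only when this
    returns True, so a partial failure leads to a re-read (at-least-once).
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        statuses = list(pool.map(send_batch, rebatch(batch, sub_batch_size)))
    return all(status == 200 for status in statuses)


# Usage with a fake destination that records sub-batch sizes.
sizes: List[int] = []


def fake_send_batch(sub_batch: List[str]) -> int:
    sizes.append(len(sub_batch))  # list.append is atomic under CPython's GIL
    return 200


ok = expel([f"msg-{i}" for i in range(120)], fake_send_batch, sub_batch_size=50, workers=4)
```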

Last but not least, there is the retry penalty. To run a zero-loss pipeline, we need to retry messages at D that will succeed given enough attempts. We have no control over the remote endpoints: there could be transient failures, there might be throttling, or other things might be going on. Failures that can succeed through retries are called recoverable failures. However, some errors are not recoverable. For example, if we receive a 4xx HTTP response code other than a 429 (throttling), we should not retry, because the request will not succeed no matter how many times we try. To summarize, we pay some latency penalty on retries, so we need to be smart about what we retry, which means never retrying non-recoverable failures, and about how we retry.
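The recoverable versus non-recoverable split can be captured in a small classifier. The talk only specifies that 4xx responses other than 429 should not be retried and that throttling and transient failures should be; treating 5xx responses and timeouts (represented here by None) as recoverable is an assumption of this sketch, and is_recoverable is a hypothetical name.

```python
from typing import Optional


def is_recoverable(status: Optional[int]) -> bool:
    """Decide whether a failed delivery at D is worth retrying.

    `status` is the HTTP status code, or None for a timeout or connection
    error (a convention assumed for this sketch).
    """
    if status is None:
        return True   # transient network failure (assumption: retry)
    if status == 429:
        return True   # throttled: retry later
    if 400 <= status < 500:
        return False  # other 4xx errors fail no matter how often we retry
    if 500 <= status < 600:
        return True   # server errors are often transient (assumption)
    return False      # not a retryable failure in this sketch


for code in (None, 200, 400, 404, 429, 500, 503):
    print(code, "retry" if is_recoverable(code) else "do not retry")
```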

Performance - Tiered Retries

One idea that I use is called tiered retries. With tiered retries, we have two tiers: a local retry tier and a global retry tier. In the local tier, D tries to send a message a configurable number of times. If it exhausts its local retries, D transfers the message to a global retrier, which retries it over a longer span of time. The architecture looks something like this. D tries multiple times to send a message. If it exhausts those local retries, it sends the message to a topic called retry_in. A fleet of global retriers waits a configurable amount of time before reading from this topic, then immediately sends each message to the retry_out topic. D re-consumes the message from retry_out and tries again. The beauty of this approach is that, in practice, far fewer than 1% of messages end up in global retries, typically fewer than 0.1%. Therefore, even though these retries take longer, they don't affect our P95 end-to-end lag.
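Below is a hypothetical, in-memory Python sketch of tiered retries. Two deques stand in for the retry_in and retry_out topics, deliver_with_local_retries is the local tier at D, and global_retrier_tick is one polling pass of a global retrier that forwards messages once they have waited a configurable delay. A production version would also back off between local attempts and classify failures before retrying; both are omitted to keep the flow visible.

```python
import time
from collections import deque
from typing import Callable, Deque, List, Optional, Tuple

# In-memory stand-ins for the retry_in and retry_out topics.
retry_in: Deque[Tuple[float, str]] = deque()  # (time enqueued, message)
retry_out: Deque[str] = deque()


def deliver_with_local_retries(msg: str, send: Callable[[str], int],
                               max_local_attempts: int = 3) -> str:
    """Local tier at D: a few immediate attempts, then escalate to the global tier."""
    for _ in range(max_local_attempts):
        if send(msg) == 200:
            return "delivered"
    retry_in.append((time.monotonic(), msg))
    return "escalated"


def global_retrier_tick(delay_s: float, now: Optional[float] = None) -> int:
    """One pass of a global retrier: forward messages that waited `delay_s`."""
    now = time.monotonic() if now is None else now
    moved = 0
    # retry_in is FIFO with increasing enqueue times, so the head is the oldest.
    while retry_in and now - retry_in[0][0] >= delay_s:
        _, msg = retry_in.popleft()
        retry_out.append(msg)  # D re-consumes retry_out and tries again
        moved += 1
    return moved


# Usage: the endpoint is down during the local attempts, then recovers.
attempts: List[str] = []


def endpoint_down(msg: str) -> int:
    attempts.append(msg)
    return 503


first = deliver_with_local_retries("m1", endpoint_down)  # escalated
moved_early = global_retrier_tick(delay_s=60)            # has not waited long enough
moved_later = global_retrier_tick(delay_s=60, now=time.monotonic() + 61)
second = deliver_with_local_retries(retry_out.popleft(), lambda m: 200)
```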


At this point, we have a system that works well at low scale. How does it scale with increasing traffic? First, let's dispel a myth: there is no such thing as a system that can handle infinite scale. Many believe that by moving to Amazon or some other hosted platform, they can achieve this. The reality is that each account has limits, so your traffic will be capped. In essence, every system, no matter where it runs, is traffic-rated. The traffic rating comes from running load tests, and we only achieve higher scale by iteratively running load tests and removing bottlenecks. When autoscaling, especially for data streams, we usually have two goals. Goal one is to automatically scale out to maintain low latency, for example, to minimize end-to-end lag. Goal two is to scale in to minimize cost. For this talk, I'm going to focus on goal one. When autoscaling, the first thing to consider is what we can autoscale. For at least the last 10 years or so, we have been able to autoscale compute, at least in Amazon, so all of our compute nodes are autoscalable. What about Kafka? Kafka currently does not support autoscaling, but it is being worked on.

In this talk, we'll discuss autoscaling compute. The most important part of autoscaling is picking the right metric to trigger autoscaling actions. We need a metric that preserves low latency, that goes up as traffic increases, and that goes down as the microservice scales out. In my experience, average CPU is the best measure. There are a few things to be wary of. If your code has locks, synchronization, or I/O waits, you will never be able to saturate the CPU on your box. As traffic increases, your CPU will reach a plateau, and if that plateau sits below your threshold, the autoscaler stops scaling out and your latency increases. If you see this in your system, the simple workaround is to lower your threshold below the CPU plateau.
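The CPU-plateau trap can be reproduced with the scaling rule documented for the Kubernetes Horizontal Pod Autoscaler, desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue), which skips scaling when the ratio is within a tolerance (0.1 by default). The load model in observed_cpu is hypothetical: per-pod CPU follows demand until contention caps it at a plateau. This simplified loop ignores HPA details such as stabilization windows.

```python
import math


def desired_replicas(current: int, current_cpu: float, target_cpu: float,
                     tolerance: float = 0.1) -> int:
    """HPA rule: ceil(current * currentMetric / targetMetric), with a tolerance band."""
    if abs(current_cpu / target_cpu - 1.0) <= tolerance:
        return current  # close enough to target: no scaling action
    return math.ceil(current * current_cpu / target_cpu)


def observed_cpu(load: float, replicas: int, plateau: float) -> float:
    """Hypothetical model: per-pod CPU % tracks demand until contention caps it."""
    return min(load / replicas, plateau)


def settle(load: float, replicas: int, target: float, plateau: float,
           rounds: int = 10) -> int:
    """Apply the scaling rule repeatedly and return the replica count it settles on."""
    for _ in range(rounds):
        replicas = desired_replicas(replicas, observed_cpu(load, replicas, plateau), target)
    return replicas


# Demand equal to 400% of one pod's CPU; CPU plateaus at 60% per pod.
stuck = settle(load=400, replicas=2, target=70, plateau=60)    # target above plateau
healthy = settle(load=400, replicas=2, target=50, plateau=60)  # target below plateau
print(stuck, healthy)  # 2 8
```

With a 70% target above the 60% plateau, the autoscaler never scales out and each pod stays saturated; lowering the target to 50% lets it climb to 8 replicas, where per-pod CPU sits below the plateau.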


At this point, we have a system with the non-functional requirements we desire. While we've covered many key elements, we've left out many more: isolation, autoscaling, multi-level autoscaling with containers, and the cache architecture.

Questions and Answers

Reisz: The first question I want to talk about is decomposing the monolithic database into streams, and what smells or signals you might be able to detect. Another was around event duplication; there were some comments and questions there. Then you made a comment about ordering constraints. I want to hit on those three. Starting with streaming systems: you have this monolithic database, and maybe you have some joins working. You talked about some of the signals that tell you it's time to start decomposing it, but I want to double-click on that a bit. As someone who's done this many times, what do you recommend? What do you look for? What are the smells? What are the signals?

Anand: What I've generally seen is that storage scale is not the typical reason for moving beyond the monolithic database. Most databases have disaggregated storage: they'll attach a SAN, or, if you're in the cloud, EBS volumes, and you can scale that up quite a lot. To give an example, PayPal had 20 petabytes of storage, all sitting in SANs. That's a lot of storage: 20 petabytes of OLTP data in Oracle Database, all hot. Most companies, when they think of 20 petabytes, think of their offline data in Hadoop or S3, not 20 petabytes attached to an operational Oracle database. So storage is usually not what drives a move away from the monolithic database; it's typically access scale or availability. A company might say, "We need a DR solution," so that's the first step. They'll have a master-follower failover strategy, which could be a failover to a warm standby that takes about five minutes in some cases, and they'll have these databases distributed across multiple data centers.

In other cases, they just split the read and write traffic, which is a very common pattern. On availability, beyond DR, a small number of companies run an active-active database. That's the availability story. The access-scale story typically starts with your BI team running very long queries against your database. They'll ask for a lot of indexes to be built to avoid full table scans, and all of that query traffic can cause your write traffic to fail, because some databases will run out of undo space or rollback segments: writes actually get cancelled because of long-running queries. Usually at that point, you tell the BI team to use their own OLAP database: you replicate data from the OLTP database (Oracle, Postgres) to Snowflake or something similar, and they run their queries against that.

Reisz: Let's break out data corruption and event duplication. First data corruption, and then we'll talk about duplicates, and maybe even Kafka processing guarantees. How do you think about data corruption? Is it just something you push to a dead letter queue? How do you think about corruption when you're working with what you just described?

Anand: Data corruption is very hard to define. Let's say we're talking about databases: Oracle, Postgres, MySQL, whatever it is. You write data to that database, and there are generally two ways that data can be replicated to a follower database: physical replication and logical replication. Physical replication is when the blocks on disk get replicated out. If you have disk corruption on the master you're writing to, you replicate that garbage out, and you're in trouble, because the bad disk's contents have been quickly replicated to all the followers.

Reisz: If you replicate that data, then what happens?

Anand: Then you're in trouble, and the only thing you can do is recover from a snapshot. You typically take nightly snapshots, so you would lose any data written since the last one. That's one case. The other case is logical replication, which just means you replay the commit log on your follower databases. What does corruption mean there? If you're replaying the transactions and one database has disk corruption, the others won't, so you just fail over: you promote a follower to master. The third case is semantic corruption. Say I roll out some code, and that code writes garbage. I detect it and roll back the code, but whatever it wrote to the database is still there. That is semantic corruption caused by a bug in processing.

That can happen in stream processing, too, but it's actually easier to solve in stream processing than in databases. Let's say we had those streams A, B, C, D. When I roll new code out, I save the consumer's checkpoint, wherever it was reading, somewhere. If I have observability that looks for errors or corruption, it automatically tells the deployer that the deployment is bad, say after 10 minutes. The deployment then gets rolled back and the checkpoint gets reset. Whatever garbage went out in those 10 minutes, when the code is rolled back, the old checkpoint comes with it, so the system replays 10 minutes of data, and this time the output is good. That's usually how we solve it at the sink.
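A toy Python model of the checkpoint-and-replay idea, assuming (as Anand describes later in this Q&A) that the sink writes by primary key, so replayed records overwrite the bad ones. Deployer, process, and the version names are hypothetical, and release v2 is simply hard-coded as the buggy one.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Deployer:
    """Hypothetical deploy controller that snapshots the consumer checkpoint."""
    version: str = "v1"
    committed_offset: int = 0
    saved_checkpoints: Dict[str, int] = field(default_factory=dict)

    def deploy(self, new_version: str) -> None:
        # Save where the consumer was reading before the new code takes over.
        self.saved_checkpoints[new_version] = self.committed_offset
        self.version = new_version

    def rollback(self, bad_version: str, previous_version: str) -> None:
        # Restore the old code and its checkpoint so the bad window is replayed.
        self.committed_offset = self.saved_checkpoints[bad_version]
        self.version = previous_version


log: List[Tuple[str, int]] = [(f"k{i}", i) for i in range(6)]  # (primary key, value)
sink: Dict[str, str] = {}  # keyed sink: a write to an existing key overwrites it


def process(d: Deployer, upto: int) -> None:
    """Consume records up to offset `upto`; release v2 has a bug that writes garbage."""
    while d.committed_offset < upto:
        key, value = log[d.committed_offset]
        sink[key] = "garbage" if d.version == "v2" else f"ok-{value}"
        d.committed_offset += 1


d = Deployer()
process(d, 2)           # v1 handles offsets 0-1
d.deploy("v2")          # checkpoint 2 is saved
process(d, 5)           # buggy v2 writes garbage for offsets 2-4
d.rollback("v2", "v1")  # detected as bad: back to v1 at offset 2
process(d, 6)           # v1 replays 2-4, overwriting garbage, then continues
```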

Reisz: The second half of that question was about duplicates. Before you answer in terms of the final sink, talk a little bit about Kafka's processing guarantee semantics.

Anand: I use Kafka with at-least-once delivery, because sinks typically have a primary key, and last write wins. No ordering guarantees whatsoever. I've heard that Kafka has this thing called exactly-once, but I don't think true end-to-end exactly-once is possible, though they can do a good job of getting close. Any call to another process can hit a read timeout. When two processes communicate, there's a read timeout setting: I send a request to you, I don't hear back in time, I assume it didn't get to you, so I send it again. That can happen in any distributed system. You are using Kafka along the way, but any point around it can cause duplicates, so exactly-once semantics doesn't really hold across a full stream. You need to solve duplicates anyway, so I just assume at-least-once delivery.

Reisz: You said, "I heavily advise against introducing ordering constraints in the system; ordering can always be taken care of at the time of writing to a sink, in other words, DBs or search engines, or soon afterward." Can you double-click on that a little bit?

Anand: An ordering guarantee means using keyed topics, for example, in Kafka, and other systems have this concept as well. You'll have a stream, and at some point that data has to land in persistent storage. We already talked about the fact that retries can happen. Unless you route data along a specific partition using a topic's keyed partitions, you can't really guarantee in-order delivery. And when you use keyed partitions, you may not get uniform scalability or usage: if your keys are not uniformly distributed, your load is not distributed evenly. That's why I consider keyed partitioning an anti-pattern here. When data lands in a database or a search engine, you can use a primary key, and it will resolve any out-of-order delivery and handle retries. If you're writing to blob storage, like S3, Azure Blob Storage, GCS, or HDFS, you use post-processing: after the data lands, you post-process it to remove duplicates, and you can run that within a few seconds of it landing.
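Two hedged Python sketches of resolving order and duplicates after the fact. upsert_latest models a keyed sink; comparing event times so that a stale record cannot overwrite a newer one is an assumption here, since the talk only says the primary key resolves out-of-order delivery. dedupe models the post-processing pass over blob storage. Both function names are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, int, str]  # (primary key, event time, payload)


def upsert_latest(table: Dict[str, Tuple[int, str]], records: Iterable[Record]) -> None:
    """Keyed sink: keep the newest version of each key by event time.

    Ties go to the later write (last write wins), so duplicate deliveries of
    the same record are harmless, and arrival order does not matter.
    """
    for key, event_time, payload in records:
        current = table.get(key)
        if current is None or event_time >= current[0]:
            table[key] = (event_time, payload)


def dedupe(landed: Iterable[Record]) -> List[Record]:
    """Post-process blob-store output: drop exact duplicates, keep first-seen order."""
    seen = set()
    out: List[Record] = []
    for rec in landed:
        if rec not in seen:
            seen.add(rec)
            out.append(rec)
    return out


# Out-of-order arrivals with one duplicate delivery of ("u1", 2, "b").
arrivals: List[Record] = [("u1", 2, "b"), ("u1", 1, "a"), ("u2", 5, "x"), ("u1", 2, "b")]
table: Dict[str, Tuple[int, str]] = {}
upsert_latest(table, arrivals)
print(table)             # {'u1': (2, 'b'), 'u2': (5, 'x')}
print(dedupe(arrivals))  # [('u1', 2, 'b'), ('u1', 1, 'a'), ('u2', 5, 'x')]
```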

Reisz: Could you give us a little preview of what you mean by multi-level autoscaling with containers? Could you talk a little bit about your thinking for that post?

Anand: At the company I'm at right now, we built our streams with all of these ilities. Someone asked about the concept of infinite scale, and autoscaling is part of that story. Our system, for example, can scale up 50x within 2 minutes of the start of a traffic burst. Right after that 2-minute period, it delivers sub-second end-to-end latency for 99% of traffic, and the same for the duration of calls into the API. It settles very quickly. To achieve that, we run on Kubernetes on Amazon. We use the HPA (Horizontal Pod Autoscaler) with a CPU target, and our EC2 autoscaling for the container hosts uses memory. I'll be talking about how we achieve this scalability in a future blog post, along with all the issues we had with the control plane being overwhelmed at high scale, route tables, and a bunch of other things.




Recorded at:

Apr 07, 2022