
Laying the Foundations for a Kappa Architecture - the Yellow Brick Road


Summary

Sherin Thomas discusses strategies to evolve Data Infrastructure to enable Kappa architecture in an organization.

Bio

Sherin Thomas is a Software Engineer with over 12 years of experience at companies like Google, Twitter, Lyft, Netflix and Chime. She works in the field of Big Data, Streaming, ML/AI and Distributed Systems. Currently, she's building a shiny new data platform at Chime. Sherin has presented on the topic of ML and Streaming at various reputable conferences.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Sherin Thomas: We will be talking about an interesting architecture in the streaming world, the Kappa architecture. Before we dive into all of that technical stuff, I wanted to share a story with you that I heard just a few months ago. Does anyone recognize these paintings? These are Mark Rothko's famous Seagram murals. They live at the Tate Museum in London. I got a chance to look at them in person back in 2020 while I was in London for QCon.

When Mark Rothko set out to create these paintings, his goal was for people to feel certain emotions, the kind they feel when they listen to music or watch movies; paintings until then hadn't had that effect on people. It was a very ambitious effort. He was a fairly unknown, almost starving artist when he received a fat commission from the Four Seasons Hotel in New York.

His paintings were supposed to adorn the dining hall at the Four Seasons. One day, before he submitted the commission, he went there for lunch with his wife. As he sat among the excess and extravagance, it dawned on him that the people who ate there were not going to appreciate his work. They were not going to look at it and contemplate it, and it wouldn't evoke the emotions he had hoped for. He canceled the commission, and he donated almost two to three years' worth of his time, the time it took him to create those paintings.

He donated all of that to the Tate Museum, where the paintings hang today for every man, woman, and child to admire and contemplate in that quiet room. Why am I telling you all this? Because when I heard the story for the first time, my main takeaway was that sometimes we chase ideals and forget the practicalities: who are we building this for? What are the practical parameters we are bounded by? Sometimes imperfect is good.

My name is Sherin. I'm a software engineer at Chime. I've previously worked at Netflix, Lyft, Twitter, and Google. I've been doing data stuff for the past decade. I want to show you how you can build a multi-purpose data platform that can support a range of applications on the latency and consistency spectrum, using principles from Kappa architecture.

Consistency

In any typical organization, you have applications with a range of latency and consistency needs. Start with BI and revenue reporting: latency is not much of a problem, since you probably generate these just once a month, but consistency is. You want to make sure those revenue numbers are exactly correct. Then you have the easy ones where neither latency nor consistency is much of a challenge, something like experimentation metrics.

Then you have things like metrics and observability, where latency is super important. You want to know if your users are suddenly hitting 404s. Consistency matters less because we're looking at scale: if there are 1,000 people hitting 404s versus 1,001, it doesn't really matter as much. The most challenging one is the first quadrant, where both consistency and latency become super important. These are your event-driven systems, where, say, you want to count the number of transactions in real time, or you want to deliver incentives based on some event-driven logic, and you want to make sure those incentives are delivered exactly once.

Lambda Architecture

The most challenging part about achieving consistency in the streaming world was the fact that early stream processors were not considered to be consistent; some of them provided at-most-once delivery guarantees at best. Lambda set out to solve that problem. Lambda combined the best of two worlds, the slow but consistent results from batch with the fast but possibly inconsistent results from streaming, into one store from which results can be served. It came with its own challenges, all of them stemming from having two code paths.

You have two sources in the system, because the batch processor reads from a batch source, a data lake or data warehouse, while the stream processor reads from a stream. Moreover, let's say you make changes to your logic. Now you have to deploy that to two different processing systems, probably built on two different technologies, and applying the logic consistently in both paths can be challenging. Upgrades are hard. This is operationally very complex.

Kappa Architecture

In 2014, Jay Kreps, the co-founder of Confluent, wrote a memo first introducing the Kappa architecture, in which he posited that the main reason lambda existed was that stream processors were not considered to be consistent. With modern stream processors such as Flink and Dataflow, that's no longer true, so do we still need the batch component? In the Kappa world, there is a streaming-first single path, and there is a single source of truth, which is a stream.

If you have to run batch in this world, it runs as a special case of streaming. Kappa has its challenges as well, which is why, even in 2023, it is not as widely used as we would like it to be. What are those challenges? One, a streaming-first single path may not be practical for many different reasons. Let's say you want to run backfills and replays, or you want to bootstrap your state. For that, you need history, and history doesn't really fit the streaming paradigm.

The second challenge is having a single source of truth. Most organizations already have a data lake and a warehouse, and moving to a stream as the single source of truth requires a paradigm shift. The third is accepting batch as a special case of streaming, because we are all used to Spark and Airflow workflows, so this is, again, a big shift.

I want you to remember these five things. One, modern stream processors are capable of low latency and consistency; the only reason lambda is still in use is for bootstrapping, backfills, and replays. Two, the reality is that batch and streaming applications will continue to coexist. Three, Kappa is great, but it is not a silver bullet, and neither is lambda. Four, the dual code path makes lambda hard to manage. Five, a backward compatible, cost effective, versatile, and easy to manage data platform might be a combination of lambda and Kappa, and we'll see how.

Let's talk about the ideal world. Let's build a data platform with all three ingredients of Kappa in place, starting with the single source of truth. In the Kappa world, since there is a single source of truth, that source needs to serve real-time applications as well as batch, backfills, and replays. For those, you need history, which means you might need infinite retention. Traditionally, in a system like Kafka, retention is increased by adding brokers.

This is expensive, because broker instances are meant to serve live traffic; they have compute resources, CPUs, and memory. Also, whenever you add new brokers, partitions are moved around, which takes up precious network resources that could be used for serving production traffic. So it is expensive and it affects performance. Moreover, not all data in a stream is created equally or read equally. If you break down your stream, you'll realize that most of your real-time applications read from the tail end of the log.

Even accounting for batching or restarts from checkpoints and so on, they would be consuming maybe the last few seconds or minutes, or up to an hour's worth of data. Anything older than an hour is only read whenever there is a big enough downtime; depending on your latency SLOs, that could be a couple of times a week, if not fewer. Then the really old data, the days-old data, is only ever used for backfills and replays, so maybe a few times a month.

The makers of Kafka also realized this, so they introduced tiered storage in Kafka fairly recently. With tiered storage, all the data on the brokers is also flushed to a cheaper remote store such as S3. Different retention policies can be applied to the data on the brokers and in the remote store. You could choose to keep data in the remote store indefinitely, and on the brokers apply a retention policy of maybe a day, two days, three days, something like that.

Each broker has a reference to the objects in the remote store. Now, when you have to read data from history, the broker knows where to pull it from and can serve it to you. From a user's point of view, all of this is completely abstracted away. The biggest benefit is that you can use something cheap like S3 for the remote tier, and SSDs in your brokers to serve the high-throughput writes and reads at the tail end of the log.
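To make that concrete, here is a minimal sketch of what a tiered topic could look like through Kafka's Java AdminClient, assuming a cluster that already has remote storage enabled; the topic name, partition count, retention values, and bootstrap address are illustrative assumptions, not recommendations.

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TieredTopicSetup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address

        try (AdminClient admin = AdminClient.create(props)) {
            // Keep the full history in the cheap remote tier, but only ~2 days on broker disks.
            NewTopic events = new NewTopic("events", 32, (short) 3).configs(Map.of(
                    "remote.storage.enable", "true",    // tiered storage for this topic (KIP-405)
                    "local.retention.ms", "172800000",  // ~2 days kept on the brokers' local SSDs
                    "retention.ms", "-1"));             // unlimited overall retention (remote store)
            admin.createTopics(List.of(events)).all().get();
        }
    }
}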

Flink Unified Batch and Streaming

The next order of business is figuring out compute in an ideal Kappa world. Flink is a modern stream processor capable of very high consistency, exactly-once processing. It does this with event-time-based processing, watermarks, a checkpointing strategy for fault tolerance, and so on. You would also be pleased to know that about two years ago, Flink came out with a unified DataStream API that lets you write your job once and then run it in either batch mode or streaming mode.

Flink uses the different characteristics of batch and streaming to optimize for each runtime. For example, batch jobs work on bounded, complete data, while streaming jobs run on unbounded streams with constantly incoming data. In the batch case, Flink deploys the job much like a MapReduce job, where operators run in stages. Your first stage runs on the complete set of data and produces an intermediate result, which is passed on to the next stage, and so on.

In the streaming case, the job is deployed as an always-on DAG where, as events come in, they are processed by each operator and forwarded on with no batching or buffering, except for the little bit of buffering that happens at the network layer. For this reason, you don't really need checkpoints in the batch world, because the job runs in stages.

Even if your job dies and you restart it, it can just pick back up from the incomplete stage, whereas checkpoints are enabled for a streaming job. There are also a few differences in how results are materialized. In a streaming job, results are materialized incrementally, so think of upserts, whereas in batch, results are materialized once, right at the end, when the job finishes.
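As a rough illustration of that unified API, here is a minimal word-count sketch where the same pipeline definition can run in either mode simply by switching the runtime mode; the element values and job name are placeholders.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnifiedWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Same pipeline definition; BATCH runs in stages on bounded input, while
        // STREAMING runs as an always-on DAG emitting incremental updates.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING); // or RuntimeExecutionMode.BATCH

        DataStream<String> words = env.fromElements("kappa", "lambda", "kappa");

        words.map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(pair -> pair.f0)
                .sum(1) // streaming mode emits upserts; batch mode materializes once at the end
                .print();

        env.execute("unified word count");
    }
}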

Beam Model

If you don't want to use Flink, say you are already using Spark or some other batch or streaming technology, you are still covered in this world, by the Beam model. The Beam SDK is open source, I call it a pipeline expression language, and it came out of Google. With the Beam model, you write your code once.

When you're writing your code, or expressing your logic, all you have to care about is: what are you computing? Where in event time are you aggregating data? When in processing time are you triggering the results? And how do these results relate to each other, are they append-only, are they upserts? Nowhere in this do you care about where it's running or which engine. That's the beauty of the Beam model.

You write your code once, and you can run it on any of the supported engines. Because Beam is an open source project, there are many different engines coming into the fold. It supports Flink, Spark, Dataflow, and many more. Portability is a big plus: if you want to port over to something else later, you don't have to change the application logic, just the underlying engine. The only caveat is to take a look at the capability matrix, because not all features are available in all engines. When you're thinking about portability, or multi-engine support, look at the capability matrix, because you might have to take the lowest common denominator.
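Here is a small sketch of that what/where/when/how separation in the Beam Java SDK; the engine is chosen by the runner option you pass at launch time (for example --runner=FlinkRunner), and the input values, window size, and allowed lateness are placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BeamClickCounts {
    public static void main(String[] args) {
        // The engine comes from the runner option, not from the code.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> clicks = pipeline.apply(Create.of("home", "checkout", "home"));

        clicks
                // WHERE in event time: fixed 5-minute windows.
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
                        // WHEN in processing time: fire when the watermark passes the window end.
                        .triggering(AfterWatermark.pastEndOfWindow())
                        .withAllowedLateness(Duration.standardMinutes(1))
                        // HOW results relate to each other: each firing is independent (append-only).
                        .discardingFiredPanes())
                // WHAT is being computed: a count per element.
                .apply(Count.perElement());

        pipeline.run().waitUntilFinish();
    }
}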

Write to Both Stream and Lake

We talked about the ideal Kappa world. The truth is that it may not be practical for everyone. In my personal experience, the biggest challenge people run into is reconciling the single source of truth part. I have you covered. Let's discuss how you could still get the benefits of Kappa without having that single source of truth. Instead of having a single infinite-retention stream as the source of truth, what you can do is, as you are ingesting data, try to ingest it through a single entry point.

From that entry point, you may fan out the data to a data lake, and you put a warehouse or catalog on top of it and use it for BI and for all your reporting use cases. You can fan out that same data to streams for any kind of stream processing jobs. Because you have data available in these two places, you may choose to have really high retention or infinite retention in the data lake, which is usually cheaper, and get away with short retention in the streams. This is where things get a little interesting, the short retention in the streams.

Now let's say you have a streaming job, and the streaming job is counting the number of clicks on your website over a 30-day window, and this window progresses every 5 minutes. It's like a sliding window of 30 days, and it slides forward every 5 minutes because you want to keep that information fresh. Let's say you kick off your streaming job today, and it's working on data as it's coming in.

To get complete data for that first window, your job would have to run for 30 days. What if, after 30 days, you realize there's a bug or something else? It's definitely not ideal. With bootstrapping, you can do a lookback instead. You can look back into history, bootstrap your first state, and then keep it going as new data comes in through your stream.
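For reference, the kind of job being described, a 30-day window sliding forward every 5 minutes, looks roughly like this in Flink's DataStream API; the ClickEvent type, the key, and the out-of-orderness bound are hypothetical placeholders.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class ClickCounts {

    /** Hypothetical event type: who clicked, and when (in epoch millis). */
    public record ClickEvent(String userId, long timestampMillis) {}

    public static DataStream<Long> clicksPerUser(DataStream<ClickEvent> clicks) {
        return clicks
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<ClickEvent>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                                .withTimestampAssigner((event, ts) -> event.timestampMillis()))
                .keyBy(event -> event.userId())
                // A 30-day window sliding every 5 minutes: the first complete result
                // needs 30 days of history, which is why bootstrapping matters.
                .window(SlidingEventTimeWindows.of(Time.days(30), Time.minutes(5)))
                .aggregate(new CountAggregate());
    }

    /** Counts the events in each window. */
    public static class CountAggregate implements AggregateFunction<ClickEvent, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(ClickEvent value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}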

Here's where the problem lies. Your stream only has 7 days of retention, and you need 30 days of history. For this, you may consider using something that I call a lambda source. A lambda source is essentially the source operator in your streaming job, or, for example, your Flink job. This source operator can read data from two sources, the low retention stream and the high retention data lake. On read, it will union the data and forward it on to the rest of the DAG.

You may choose to use a cutoff timestamp to reduce any duplicates arising from the overlap between the stream and the lake. Let's say you choose a cutoff time x, and you may say, only read data newer than x from the stream and data older than x from the lake. With that cutoff time, what you'll realize is that after a point, once your job is bootstrapped and it's running along, you won't really need any more data from the lake because you have fresh data coming in through the stream.
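Here is a minimal sketch of that cutoff-based union, assuming both inputs have already been mapped to a common event type; the names and types are hypothetical, not the exact implementation described in the talk.

import org.apache.flink.streaming.api.datastream.DataStream;

public class LambdaSource {

    /** Hypothetical event type shared by both inputs. */
    public record ClickEvent(String userId, long timestampMillis) {}

    /**
     * Unions a short-retention stream with a high-retention lake read, using a cutoff
     * timestamp so the overlap between the two sources does not produce duplicates.
     */
    public static DataStream<ClickEvent> lambdaSource(
            DataStream<ClickEvent> fromStream,
            DataStream<ClickEvent> fromLake,
            long cutoffMillis) {
        DataStream<ClickEvent> newData =
                fromStream.filter(e -> e.timestampMillis() >= cutoffMillis); // only data newer than x
        DataStream<ClickEvent> history =
                fromLake.filter(e -> e.timestampMillis() < cutoffMillis);    // only data older than x
        return newData.union(history); // forwarded on to the rest of the DAG
    }
}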

Also, because no more data is being read from the lake, the throughput the job is processing goes down. At this point, you may choose to lower the parallelism, or essentially reduce the resources of your job, by taking a savepoint of the job and restarting it from the savepoint, but with a lower parallelism and fewer resources. However, depending on the way you have designed your application, you may want to keep the job graph the same, especially in Flink.

In Flink, operators maintain their state in savepoints, and state is restored from a savepoint per operator. If you remove an operator, its state goes away, and if you re-add that operator later, it will show up as a fresh node and none of its state will be retained. So you may have to keep your job graph the same. That's fine: you can just reduce the parallelism enough that it's not really wasting any resources.

The next order of business is that, to union these two things, you have to convert your lake into a stream. How might one do that? To understand this, let's think about how a stream works, or how a stream is perceived by a stream processor. A stream is a constantly incoming flow of events, of data, roughly monotonically increasing in time. I say roughly because there can be some amount of disorder in the stream; these are distributed systems, so things can arrive out of order or arrive late. In the streaming world, there is a concept called watermarks.

Watermarks provide some ordering to the stream. Watermarks are like breadcrumbs in your system. A watermark communicates to your stream processor: any data older than my timestamp should have already arrived, so if you were holding back any results waiting for older data, feel free to go ahead and trigger them. It's an indication of how late data can be. Let's say you have implemented your watermarking strategy perfectly, and this is a perfect scenario.

This is what it would look like. Between two watermarks, you will see that things are a little out of order. You can see the 21, 22, 19; they are slightly out of order. But if you take one watermark block, which is the data between two watermarks, then all of the data that arrives in it will have a smaller timestamp than all of the data that arrives in the next watermark block. Your watermarks are essentially monotonically increasing in time, each one greater than the one before.
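In Flink, that tolerance for lateness is expressed directly in the watermark strategy. A minimal sketch, assuming a hypothetical ClickEvent type and an arbitrary five-second out-of-orderness bound:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class Watermarks {

    /** Hypothetical event type carrying its own event time. */
    public record ClickEvent(String userId, long timestampMillis) {}

    // A watermark that trails the largest event time seen by 5 seconds. It tells the
    // processor: "anything older than (max timestamp - 5s) should already have arrived."
    public static WatermarkStrategy<ClickEvent> strategy() {
        return WatermarkStrategy
                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previous) -> event.timestampMillis());
    }
}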

To convert a lake into a stream, you do something similar: you essentially partition the data in your lake by time. When you're writing data to the lake, you partition by day, by hour, something like that. When you're reading these partitions, consider reading them in sequence. If you're reading 30 days' worth of data, read the 30th day back first, then the 29th, then the 28th, and insert a watermark after each partition. That watermark can be equal to the largest timestamp in the partition.

This way you're indicating to the stream processor how long to wait for late-arriving data, and it also accounts for the data within a partition not being sorted by event time. The other point to note with this strategy is that with really large partitions, the watermarks can be really spaced out. This can lead to some interesting problems, such as windows remaining open for a long time. How does that work?

In a stream processor, as data comes in, state is updated. Let's say you have windowed state, keyed by window. As the data comes in, that state is updated, but the windows are not triggered just yet. They are triggered once a watermark shows up, and the stream processor can determine that it doesn't have to wait any longer for late data, so the windows that are already baked can just go ahead and fire.

If your watermarks are really spaced out, it means your stream processor might be waiting a long time and those windows would be open and they would be taking up memory. Consider using smaller partitions, maybe hourly or smaller depending on your use case.
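One way to express that per-partition watermark idea in Flink is a custom WatermarkGenerator. This is a rough sketch under the assumption that the lake reader tags the last record of each time partition; the LakeRecord type and its isPartitionBoundary flag are hypothetical.

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

/** Hypothetical record type produced by the lake reader. */
interface LakeRecord {
    boolean isPartitionBoundary();
}

public class PartitionBoundaryWatermarks implements WatermarkGenerator<LakeRecord> {

    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(LakeRecord event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        // Emit a watermark only when a time partition (say, one hour of data) has been
        // fully read, set to the largest timestamp seen in that partition.
        if (event.isPartitionBoundary()) {
            output.emitWatermark(new Watermark(maxTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // No periodic watermarks here: smaller partitions keep the gaps between watermarks
        // short, so downstream windows are not held open (and in memory) for too long.
    }
}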

The other interesting challenge that comes from combining these two sources is watermark skew. As you recall, we are reading new data from the stream and old, historic data from the lake. What does this do? Because you're reading both at the same time, the newer data from the stream and the older data from the lake are all coming into your system together.

This is how watermarks typically propagate within a Flink DAG. Let's look at the first source. Each operator makes a local decision on how to process the watermarks. Watermarks are always processed in order, the smallest one first, ascending from there. When an operator is reading from two sources, say one that has watermark 33 and another with watermark 88, it will process and forward the 33 first, and the 88 much later, kind of like a tournament tree.

The effect this has in the lambda source is that the distance between your latest watermark and your oldest watermark is really high, which means that, as the watermarks are processed in ascending order, a lot of this data is updating windows, and those windows all remain open waiting for the watermark to progress. At a previous company, we ran into this issue when we were bootstrapping over 90 days' worth of data. We were constantly running into out-of-memory issues, really high memory usage, and things like that.

At that time, we developed what we called watermark, or source, synchronization. Remember, I told you in the beginning that in a Flink DAG, sources and operators make local decisions about watermarks. In this world, what we did was publish a global state for the entire job to look at. That global state holds the lowest watermark being processed at any time. Let's say I'm an operator reading from Kinesis.

I look at the data, and I see that what I'm reading is from a few seconds ago. Then I check the global watermark state, and I see that the global watermark is from 30 days ago or some such. Based on this, I know that if I read data from Kinesis and forward it, it's not going to be used for a long while; it's just going to sit around in buffers, not ready to be used yet. So I'll just wait and keep checking that global watermark.

I'll periodically check in with the global watermark, and once it advances close to the event time of the data my source is producing, I'll start reading from Kinesis again, update the shared state if I need to, and produce data. This way we remove the problem of newer data sitting around in buffers while older data is being processed.

The next order of business is schema unification. Very often, when we store data in lakes, we flatten out nested fields or make other kinds of enhancements, so the data coming in through the stream might have a slightly different structure. When you are reading data from two sources, make sure a unified schema can be applied.

The other option, if you do not want to use a lambda source, is to use a batch job: a batch job that reads from the data lake and builds an initial state. Once that initial state is ready and baked, a new streaming job can be kicked off that reads the initial state and then the stream, and continues processing from there. This is being actively discussed in the Flink community.

Recap

The single source of truth requirement in Kappa is probably the hardest one to adopt. To get around that, consider ingesting your streaming data into a data lake as well as into streams. A lambda source is a union of data from the lake and the stream, and you can use it for bootstrapping large windows in a streaming job. You can also reuse the lambda source for backfills, replays, and even batch jobs. Consider schematizing your data so that you can easily and reliably combine data from the two sources.

Takeaways

Modern stream processors are capable of low latency and consistency. The only reason lambda is still in use is for bootstrapping, backfills, and replays. Batch and streaming jobs will continue to coexist. Kappa is great, but it is not a silver bullet, and neither is lambda; the dual code path makes lambda hard to manage. A backward compatible, cost effective, versatile, and easy to manage data platform might be a combination of Kappa and lambda.

What Is Streaming?

Participant 1: I've always been curious about what people actually mean when they talk about streaming. If we start to use persistent storage like S3, are we still talking about streaming? Because to me, streaming is a combination of persistent storage [inaudible 00:32:21]. Can you elaborate a little more: if I combine it with S3, is that actually real-time streaming? Is that still considered streaming in general?

Thomas: In my mind, when you think about streaming, streams are what they are: kind of like a log of events that are roughly ordered by some notion of event time, with new data coming in as processing time, the clock time, progresses. That is a stream, with some amount of disorder. I agree with you, a data lake or persistent store is data at rest, by definition. But for the purposes of the lambda source I talked about, even though it is data at rest, you can treat it differently if you are capturing the event time correctly.

Let's say your events or your data have a notion of when they were created; then you may be able to convert that into a stream by adding a rough ordering, and by inserting watermarks in between to communicate to the consumer how much disorder there can be, or how long to wait for late data, and things like that. It's a way of reading it as a stream instead of materializing it as a stream.

Challenges of Idempotence and Streaming

Participant 2: I wanted to touch on idempotence of the data in the streaming model. Let's say you do event sourcing across a multi-domain space, and you need to create that idempotence in streaming. Can you speak about the challenges you had?

Thomas: At one of my previous companies, our feature store for machine learning was a combination of batch features and streaming features, and we sometimes used batch for populating the historic features. Because all our windows were based on event time and not processing time, what we did was give every result that was emitted a notion of event time.

For example, let's say I have a hopping window, or a sliding window, and the window is taking data from x0 to xN; then the timestamp for the result, the output, would be xN. We would associate that event time with the result. Now, when it is written to an idempotent store, because we are using event time, even if you replay that data from history, that timestamp will not change. When writing, we would only keep the latest data for each key. That's how we did it. Event time is key for this problem.
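A small sketch of that idea in Flink terms: stamp each emitted result with the window's end timestamp, so a replay produces the same (key, event time) pair and an upsert-style store overwrites rather than duplicates. The Event and FeatureResult types are made up for illustration.

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Hypothetical input event. */
record Event(String key, long timestampMillis) {}

/** (key, windowEnd) identifies each result, so replays overwrite rather than duplicate. */
record FeatureResult(String key, long windowEndMillis, long count) {}

public class CountToWindowEnd extends ProcessWindowFunction<Event, FeatureResult, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<Event> events, Collector<FeatureResult> out) {
        long count = 0;
        for (Event ignored : events) {
            count++;
        }
        // Stamp the result with the window end (event time), not processing time, so the
        // same result is produced even when the data is replayed from history.
        out.collect(new FeatureResult(key, ctx.window().getEnd(), count));
    }
}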

Practical Implementations of Writing to a Unified Schema

Participant 3: You mentioned that the batch and streaming systems would be writing to a unified schema. Can you share with us some practical implementations of that? I imagine, how would you do it without compromising performance? How would you unify something like a nested structure versus a flat data warehouse or data lake?

Thomas: When this was implemented in practice, all the events in our system had a schema, a protocol buffer schema. In the first diagram that you saw, we had one entry point for all the events coming in, and from there the data was fanned out to a data lake and the stream. In the data lake, what we would sometimes do is write it out in Parquet format, with some flattening done. The source of truth is still that protocol buffer schema for all of those purposes.

Now, when we read that data, we know that for event A, this is the protocol buffer schema it relates to, so when we read the Parquet data, we can munge it back into that protocol buffer representation. The challenges arise with respect to time travel: you need some kind of schema registry so that, as you are reading, you know what the schema was when that data was produced.

Storing that reliably somewhere is, I think, the main challenge. Also, having a single entry point for all of these events before fanning out to the data lake and Kinesis helps.

Partitions in a Data Lake

Participant 4: You talked about how you use partitions in your data lake basically as watermark boundaries over S3. How do you reconcile wanting smaller partitions there with a batch processor that is already active on the data lake and wants really large partitions and files to consume, [inaudible 00:38:05]? Do you duplicate the data, or do you reconcile the two?

Thomas: We did not duplicate the data. At one point our partitions were by day, and even within a day it varies quite a bit: there might be some sparse events for which a day makes a small partition, and then certain click events that produce tons of data even in a single day. We kept that the same, and we solved the problem with the synchronization bit.

One small change we made was to order the data within each partition by time. Initially it was unordered, and the main problem with spaced-out watermarks really comes in if your data within a partition is unordered. You may consider ordering it by event time, so that when you read, you can insert watermarks more frequently.

Source Synchronization, and Global Watermark

Participant 5: I'm interested in how you do the source synchronization. You need to maintain this as a global watermark. At which layer do you do that, with a customized source function in the stream processor, or something else?

Thomas: We saved the global watermark in the Flink JobMaster. The only downside of that is that there is quite a lot of communication between the task managers and the job manager as they read and update the global state, and that's the main challenge, depending on how often you check in on it. The source synchronization bit we already contributed to open source Flink, so it is available. Flink also recently came out with watermark alignment; I think it was in beta in 1.17, and it should be out of beta now. Look into watermark alignment, where this is solved out of the box.
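For reference, watermark alignment in recent Flink versions is enabled on the watermark strategy itself; a rough sketch, where the group name, drift bound, and update interval are arbitrary values, not recommendations:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class AlignedWatermarks {

    /** Hypothetical event type. */
    public record ClickEvent(String userId, long timestampMillis) {}

    public static WatermarkStrategy<ClickEvent> alignedStrategy() {
        return WatermarkStrategy
                .<ClickEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previous) -> event.timestampMillis())
                // Sources sharing the "lambda-source" alignment group pause reading when their
                // watermark drifts more than a minute ahead of the group's minimum watermark.
                .withWatermarkAlignment("lambda-source", Duration.ofMinutes(1), Duration.ofSeconds(1));
    }
}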

The Cost of Streaming and Batching

Participant 6: I'm just curious: you have a very nice system built up for large amounts of data. Do you need to worry about the cost when you have streaming and then batching on the side, or you duplicate data to the product [inaudible 00:40:40]? Why do we have such a system for [inaudible 00:40:45]? What is the return on investment? Do you apply techniques to reduce cost or keep it at a lower level?

Thomas: In the companies I have worked for, the streams were managed Kinesis Data Streams, and in those we often used 24 hours of retention. The cost varies with the amount of retention you have in your streams. This is where having the data in the data lake came to our rescue, and again, that was the driver behind building the lambda source, because a lot of our features required a really large lookback. There were some that even required a global state to be carried forward, like, just give me the count of transactions by user. These are some knobs you can turn with respect to how much retention you have.

The second place where the cost comes in, and this was before AWS introduced enhanced fan-out: before enhanced fan-out, a Kinesis stream could effectively be read by at most two consumers. For a given write throughput, the only way to scale out the number of consumers was to increase the number of shards; otherwise, in the best case, there is a one-to-two ratio between write and read throughput. We would essentially duplicate or replicate the streams for different jobs so that we could support multiple consumers. With enhanced fan-out, I think the cost works out to be per shard, just a few cents more. When we did the math, it came out that for four or more consumers it was easier to use enhanced fan-out than to replicate the streams.

 


Recorded at:

Nov 06, 2023
