BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Presentations Streaming from Apache Iceberg - Building Low-Latency and Cost-Effective Data Pipelines

Streaming from Apache Iceberg - Building Low-Latency and Cost-Effective Data Pipelines

Bookmarks
48:48

Summary

Steven Wu discusses the design of the Flink Iceberg, comparing the Kafka and Iceberg sources for streaming and how the Iceberg streaming source can power many common stream processing use cases.

Bio

Steven Wu is a software engineer at Apple. He is working at the AIML data platform team focusing on stream processing and data lake technologies. Previously, he worked at Netflix where he helped build the real-time data infrastructure. He is passionate about building scalable distributed systems and empowering people with data.

About the conference

We've been helping software development teams adopt new technologies and practices for over 17 years. The driving force behind every QCon conference is our passion to accelerate the software side of human progress.

Transcript

Steven Wu: I'm going to talk about streaming from Iceberg data lake. We all want to build data pipelines that have low latency, are cost effective, and easy to operate. I'm going to show you how to achieve those goals by using Apache Iceberg as a streaming source. I'll first give a quick introduction about Apache Iceberg and Apache Flink. Then we'll look at some of the motivation why we're thinking about using Apache Iceberg as a streaming source. Next, we'll talk about high level-design of the Iceberg streaming source and some of the benefits it brings in. After that, we're going to dive deeper into the Flink watermark alignment, and how it works in the Iceberg streaming source. Finally, we're going to look at the evaluation results.

Introduction to Iceberg and Flink

Apache Iceberg is an open table format for huge analytic datasets. What is a table format? You might be familiar with a file format like Apache Parquet. It is about organizing records within our data file. Table format like Apache Iceberg is about organizing data files within a table. Apache Iceberg was designed to adjust the corruptness and performance issue of the old Hive table format. Iceberg brings numerous benefits.

First, it provides serializable isolation. All table changes are atomic. Readers will never see partial or uncommitted data. Iceberg supports fast scan planning with advanced filtering using partitions and column level statistics. Iceberg supports safe schema and partition evolution. It supports time travel, which enables reproducible query using the exact same snapshot in the Iceberg table. Recently, Iceberg added a feature like branch and tagging. With branching, we can implement write, audit, and patch pattern. You can write new data to a staging branch first, run the data validation. Once the validation passed, you can merge the staging branch to the main trunk. You can merge the data at the Core, if you are familiar with the GitHub Git workflow.

Where does Apache Iceberg fit in the big data ecosystem? At the bottom level, we have data files like Parquet or Orc. They're stored on storage distributed file system, like HDFS or cloud storage like S3. Then we have a table format like Apache Iceberg, Delta Lake, or Apache Hudi. They provide SQL table semantics. Table format, they're all about metadata management. Compute Engines like Apache Flink, Apache Spark, Trino, they integrate with the table format to access underlying data files. Apache Flink is a distributed stream processing engine. It's one of the most popular stream processing engines. It is highly scalable.

A single Flink job can process trillions of events per day. It can run on sizable cores, and maintains terabyte of state in a single Flink job. Flink checkpoint provides strong state consistency or exactly once processing semantics. If the sink also supports transaction write, like Iceberg, we can achieve end-to-end exact once also. Flink's all event time processing semantics, it uses a watermark to reason about the event time progress within the application. Watermark also provide a flexible way to trade off between the completeness and the latency of the results. Flink provides a layered API from low-level data stream API to a high-level table API and SQL.

Motivation

Next, I'm going to explain why we're thinking about Iceberg as a streaming source. Here are the data pipelines you might be familiar with, where a device sends the raw events to edge API services, who then ingest the raw data into a Kafka message queue. This ingest [inaudible 00:04:46], a second or less. Then with stream processing engines like Flink, read data from Kafka and write a raw event to a data lake like Iceberg, after that is mostly batch jobs for ETL, feature engineering, and maybe offline model training. Those batch jobs are typically scheduled, hourly or daily. The overall latency is at least a few hours, if not days. We know latency matters for machine learning pipelines.

The industry is increasingly shifting towards real-time machine learning for online learning and online inference. How can we reduce the latency of our data pipeline? Kafka is probably the most popular streaming storage for Flink. It can achieve sub-second read latency. If you care about latency, why don't we switch everything to Kafka and Flink? There are pipelines built that way. Kafka is a fantastic streaming storage. I love Kafka. It can achieve sub-second read latency, which is great if you have really low latency requirement. As we know, system design is all about tradeoffs, nobody is good at everything. There are a few things in Kafka it's not very pleasant to work with. If you operate a stateful storage system, you know it's not easy. Setting up queries can be painful. Because we cannot give code to autoscale our stateful storage system, so we have to do careful capacity planning. We have to worry about the burst backfill workload and how to achieve isolation so that the bursty backfill workload does not affect the live traffic.

In a Flink Meetup, Sundaram from Netflix, demonstrated it's 38 times more expensive to store long-term data in Kafka compared to Iceberg. Kafka also recently introduced tiered storage within Kafka, that can help reduce difference, but it won't be able to completely bridge the gap. That's why many companies adopt the practice of tiered storage. They use Kafka to store the recent data for the last few hours or maybe few days. Then they store the long-term data on the Iceberg data lake. Kafka is useful for serving stream processing workload, and Iceberg used to serve the batch workload.

For availability reasons, Kafka brokers are typically placed on different availability zones, that could incur cross-AZ network traffic, from producer to broker, or broker to broker for replication, or broker to consumer. I did a back of envelope calculation using publicly listed pricing for a major cloud provider. My calculation shows that the cross-AZ network costs can be 10 times more expensive compared to the broker costs, compute and storage combined. Kafka do provide a rack-aware partition assignment. It can avoid the cross-AZ traffic from the broker to consumer, but we still have the producer to broker and the broker to broker cross-AZ traffic. That's still pretty significant. Kafka source doesn't provide filtering or projection at the broker side. If we had different consumer jobs, they're interested in different subset of data, like by filter or projection. All of them have to pull down the big pipe, and apply the filter and projection at the consumer side. This is obviously inefficient.

People have been setting up routing jobs just to apply the filter and projection to produce smaller substream, such as all the downstream jobs, they only need to consume the smaller substream. This definitely helps to improve the efficiency, but also, you create an actual routing job to maintain and the data duplications. In Kafka, a partition is an unbounded stream of workers. Kafka source statically assigned the partitions to the workers during the job initialization. This assignment remains static throughout the lifecycle of the job unless that topology changed, for example, adding a new worker.

This leads to a few limitations, first, in production, where sometimes we can see outlier worker node whose performance is significantly lower compared to the other peers. In this case, because the partition is unbounded stream, there's no work sharing or work stealing. Even if other workers they have extra capacity to spare, they cannot pick up the slack. Second, Kafka source parallelism is limited by the number of Kafka partitions. Adding a new worker won't help improve the read throughput because it's not getting a new partition assigned to it, overprovision the number of Kafka partition to help alleviate the problem here, but too many partitions does have performance penalty on the broker side and also on the consumer side.

Autoscaling can help improve the cost efficiency and reduce the operation burden. Let's assume initially we have six partitions assigned to three workers, each worker gets two partitions, would get a well-balanced workload assignment. Now let's say you're autoscaling, you need to scale up the number of workers because the traffic grows. Now we have to add a new worker, now we're assigning six partitions to four workers. We'll get an unbalanced workload assignment. Again, overprovisioning the number of partitions can help, but it also has performance implications. This type of thing, is there any other alternative streaming storage we can leverage?

Streaming from Iceberg

Next, I'm going to show you the high-level design of the Iceberg streaming source and the benefits it brings. As I showed earlier in the data pipeline graph, we typically have a stream processing engine at Flink, read data from Kafka, write them in data files, and commit the data file to the Iceberg table. Flink Iceberg sink commits the data files after every successful checkpoint in Flink. One to 10 minutes commit intervals are pretty common.

If we commit too frequently, like a second, we're going to create a lot of small files, and also create a lot of metadata files for Iceberg to keep track. Too many metadata files can also stress the Iceberg metadata system. If we commit too infrequently, like every hour, we can delay the availability of the data to the downstream consumers. The question we are asking, can a downstream Flink job stream data out of the Iceberg table as they are committed by the upstream job? The answer is yes. Let's see the kind of data the Iceberg table have snapshot at the end.

The upstream job commits the new data files, it will create a new snapshot in Iceberg table called n plus 1. Iceberg provides this incremental scaling API to discover the new data files appended between two snapshots. This is called the incremental scan. Then, the Iceberg source will discover the new files and the readers will read the records from those data files and admit them to the downstream operators.

For a streaming job, this cycle continues to evolve, because the stream job by definition never finish, it runs forever. This split discovery cycle continues forever. On the left, I have the code snippet for constructing the Kafka source in Flink. Basically, we set the bootstrap servers and Kafka topics, and StartingOffset strategy, I use latest offset, and deserializer. To construct an Iceberg source in Flink, we need to provide tableLoader, basically tell the Iceberg source how to load the Iceberg table from a catalog, like Hive metastore, or Glue. Then we also use StartingStrategy, here I chose starting from the latest snapshot in the Iceberg table. Also, monitorInterval. How often we want the Iceberg source to discover new data files from the table. Both Kafka and Iceberg source are available in the Flink SQL.

This paradigm of streaming from Iceberg works because we observe many streaming use cases, they're actually fine with minute-level latency. They're not looking for a sub-second level latency. With that, we can build low-latency data pipelines chained by Flink jobs streaming from Iceberg. We can achieve end-to-end latency maybe in minutes level. In a 2019 Flink Forward presentation, Stephan Ewen, put the data processing application in the spectrum of latency. At the most real time, we have the transaction processing, we have like the request-response model in a microservice. The latency here is typically milliseconds. Then we have the event-driven application. This is probably mostly the Kafka region come from. The latency may be the sub-seconds. Then we have streaming analytics, data pipelines, continuous processing, and batch processing. Batch processing, you can schedule it hourly or daily, so that latency is at least hours or days level.

Stephan thinks that the full category in the middle fit the stream processing paradigm. I think the Flink Iceberg streaming source fits well for the data pipelines and the continuous processing categories, where the latency expectation is probably minutes. You may wonder, this seems like a micro-batch model. It is. How is it different with incremental batch processing? Let me first explain, what do I mean by incremental batch processing? It means that we're scheduling the batch runs at a much shorter interval than we typically do, like every few minutes, or even every minute.

Typically, a batch job most commonly is hourly or daily. Each batch run process new files added since last run. Each batch run will produce a bookmark, tell you where to end, then you discover incremental data files. As the schedule interval shortens from hourly, or daily, or few minutes, or every minute, the line between streaming and batch actually becomes blurry. What the batch was streaming, you probably cannot tell.

What are the limitations of the incremental batch processing compared to the streaming execution we're talking about? First, as we shorten the scheduling interval to every minute or every couple minutes, it might be more expensive to tear down the job and bring it up again, just a few seconds later. We might as well just keep the job running all the time, that's essentially a streaming job, because a streaming job runs forever, they never terminate. Second, I've heard from batch users, they are favoring daily scheduling over the hourly scheduling, although hourly scheduling can bring lower latency. The reason is that with hourly scheduling, we can get 24 times more batch runs and higher chance of job failures, and the need to do backfill. That operation burden, push them away from the hourly scheduling to the daily scheduling.

Now imagine if we're going to schedule the batch runs every minute, or every couple minutes, we will get a lot more batch runs and a lot higher chance for job failures and the need to do backfill. This operation burden can be too high. Third, most non-trivial data processing involves state. For stateful batch processing, intermediate results are discarded after each run and are recomputed in the next run. With stream processing, the intermediate results are stored in Flink state, and they also checkpoint for fault-tolerance purpose.

FLIP stands for Flink Improvement Proposal. It is used to describe major new features or public API changes. FLIP-27 introduced a new source interface in Flink. It was designed to address the limitations with the old source function in Flink. The key idea in FLIP-27 source interface is to separate the work discovery with the actual reading. The enumerator runs on the job manager, which is the coordinator. It is responsible for discovery work. The parallel readers runs on the TaskManagers, those are the worker nodes. They are responsible for actually reading the data from storage. In FLIP-27, a unit of work is defined as a split. In the Kafka source, a split is a Kafka partition.

In Iceberg source, a split is a file, a slice of a large file or a group of small files. A split can be unbounded, like Kafka source case, or bounded like Iceberg source. In the Iceberg source, the enumerator will discover split from the Iceberg table and keep track with the pending split in an internal queue. The readers will request a split during the job initialization, or when it's done with the current split. Then we assign a split to the reader. Reader requests one split at a time, is a pull-based model. FLIP-27 unifies a batch and streaming sources. The only difference is whether the split discovery is one time for the batch execution or is periodic for the streaming execution.

What are the benefits of using Iceberg as a streaming source? They're tied to some of the pain points we talked about earlier. First, Iceberg can leverage managed cloud storage like S3. It offloads the operation burden to the cloud provider, like for the system upgrade, capacity planning, bursty workload, and isolation. Cloud storage is also very scalable and cost effective. This simplifies the storage architecture to a unified storage, because both recent data and historic data are stored in Iceberg. Iceberg is used to store both streaming workload and the batch workload. It unifies the live job and the backfill job source to Iceberg.

Most cloud blob storage like S3 don't charge cross-AZ network cost within a region. There's no cross-AZ network cost you were seeing earlier. Iceberg source supports advanced data pruning with filter and projection. You can provide a filter expression so that Iceberg scan planning will effectively prune out data files not matching the filter expression. With projection, the Iceberg source will only deserialize the columns it is interested in, because we use column-oriented file format like Parquet. In the Iceberg source, the split assignment is dynamic. It is pull-based, triggered by the reader. That outlier worker node, it allows other worker to pick up the slack because the split is bounded. That's an opportunity for the other workers to steer the work from the outlier worker node. The other workers, they can just pick up more data files and process them.

With the Iceberg source, we can keep together a lot more file segments than the number of Kafka partitions, this can bring some operational benefit. For example, with a higher number of file segment, we can support higher parallelism, which can be useful in some cases like backfill. You may want to be able to backfill much faster than the live job, so you want a higher parallelism. With more file segments, you cannot do that. With more file segments, it's also more autoscaling friendly. It is easier to assign the files to the readers in a more balanced fashion. We have merged this FLIP-27 Flink Iceberg source into the Apache Iceberg project. It is fully merged, but the only thing I want to call out is that for the stream and read, right now we only support the append only record. It does not support a CDC read on updates and the deletes. That needs to be addressed in the future.

Watermark Alignment

Next, I'm going to dive deeper into the Flink watermark alignment, and how it works in the Flink Iceberg source. I want to give a quick recap, what a watermark is. In Flink, event-based application, record can come out of order. It's necessary to buffer and wait to tolerate the late arrival data. We cannot wait forever, otherwise, the result will be delayed for a long time and the Flink state can go unboundedly. At a certain time, we have to stop waiting. That is that watermark is doing. It basically tells us all the data before the watermark have already arrived, so it's ok to emit a result. Occasionally, the watermark could be heuristic, could be wrong, hence the watermark is a tradeoff between the completeness and the latency of the results. We talk about watermark.

Then, what is watermark alignment? Why do you need it? Let's look at the use cases of stateful join. For example, let's say we join ad impression with ad click. For stateful join, we typically need to define join window, which can be determined by how late data can come. In this case, let's assume we have 6-hour join window. Let's assume everything works well, in steady state with live traffic. Let's say we need to replay the two Kafka sources to 24 hours ago, maybe there's outages, or there's something like a data issue. Because those two data streams, they may have different volume of data, let's assume the ads impression stream have 4x amount of data compared to clickstream. Because of different data volume, those two Kafka sources, they will proceed at a different pace. In Flink, the pace is measured as a watermark.

If we zoom into the Flink job a little bit, it can be simplified as four operators. You'd have two Kafka source operators, each need to form one Kafka stream. Then we have the co-process function, which does act as stateful join. This is a stateful operator. Then we have sink operator, which would write a join output to the sink storage. Because clickstream is much smaller, so let's assume that source 2 was able to catch up much faster, it was able to catch up with the live traffic, which it made the watermark as null. Source 1 was catching up on a much bigger stream, so it will catch up much slower, let's assume it made the watermark null minus 18-hour. Because the Flink watermark calculates the minimal value of all the inputs, so the stateful co-process function will have a watermark at null minus 18-hour. This is a problem. Because the watermark advanced slower in the stateful operator, now it needs to buffer 24 hours of that clickstream data versus 6-hour during steady state.

This excessive data buffering can lead to performance issues and stability issues for the stateful Flink job. This is actually one of the biggest pain points we experienced when running large stateful jobs in Flink. That's why Flink introduces watermark alignment strategy in the source. It basically tried to ensure that both sources they're progressing at a similar pace. The source tool will stop reading from Kafka when its local watermark is too far ahead compared to the global watermark. This is the alignment part. This avoids excess data buffering in the downstream stateful operator. For example, in this case, if we allow one throttle of maximum watermark drift, then the co-processor operator function only needs to buffer 7 hours of click data, which is very close to the steady state. This is how Flink watermark alignment works internally.

Let's assume we have three readers, each reader from one Kafka partition. The Kafka readers instruct a watermark from a timestamp field in the record it consumes from Kafka. They periodically send a local watermark to the enumerator for global aggregations. The enumerator calculates the global watermark as the minimum value of all the reported local watermarks, which would be 10 times, in this case. The enumerator then broadcasts the global watermark to all the readers. Then the readers need to check the difference between the local watermark and the global watermark, to determine if throttle is needed. Let's assume the max allowed watermark drift is 30 minutes. In this case, we did 0, we'll stop reading because its local watermark is too far ahead. This is where the alignment can come in. We need to align the progress of all the source readers.

Before I talk about how the watermark alignment work in the Iceberg streaming source, let's recap some of the difference between the Kafka source and the Iceberg source. In Kafka source, the splits are unbounded partitions. In Iceberg source, the splits are bounded data files or file segments. In Kafka source, the split assignment is static during the job initialization. In the Iceberg source, the split assignment is dynamic, is pull-based from the readers. In Kafka, records are ordered within a partition, first-in, first-out. In Iceberg source, records may not be sorted by the timestamp field within a data file. In Kafka source case, only readers can extract the watermark information from the records that will be read from Kafka, that's the only way. In the Iceberg source, because Iceberg keep track of column level statistics, like min-max values, in the metadata files, the enumerator actually can extract the watermark information from the min-max test column level statistic in the metadata file without actually downloading or reading the data files.

The key idea is that the Iceberg source enumerator needs to assign the splits ordered by the time. Let's assume the enumerator discovered 9 files, and we order those 9 files by the minimum timestamp value from the column level statistic. Here the notation means that F1 contains timestamp with records from 9:00 to 9:03. Let's assume the enumerator assigned the first three files to the first three readers. The readers extract the local watermark by the minimum timestamp value from this data file.

Reader-0, which calculate the local watermark at 9:00, because that's the minimum timestamp value we have from F1. The global watermark is calculated at the minimum of all the local watermark. The global watermark here will be 9:00. The readers will check the difference between the local watermark and the global watermark to decide if throttle is needed. In this case, all the three readers are ok to proceed because they are within the threshold of 10 minutes watermark drift. Let's assume some time later, Reader-2 finishes a data file, so it's requesting a new file and got a fallback. Now Reader-2 will advance its local watermark to 9:13, because that's the minimum timestamp value from F4. In this case, Reader-2 needs to stop reading, because its local watermark is too far ahead, because the max allowed drift is 10 minutes. That's the throttling alignment part.

Let's assume some time later, Reader-0 also finishes file, and got a new file F5, and Reader-0 advanced its local watermark to 9:16 because that's the minimum timestamp value from F5. Now Reader-0 also needs to stop reading because its local watermark is too far ahead. After some propagation delay, maybe a couple seconds or 2, the global watermark will advance to 9:04 because that's the minimum timestamp value of the other local watermark. In this case, the Reader-2 can resume reading because its local watermark now is within the threshold when compared to the latest global watermark. Now Reader-2 will resume the normal reading. The max out-of-orderliness we can achieve in the Iceberg source watermark alignment is equal to the max allowed watermark drift plus the max timestamp range in any data files. Let's assume, in this case, the max timestamp range from F6 is 8 minutes. If the max watermark drift allowed is 10 minutes, then we can achieve max out-of-orderliness in 18 minutes.

Why is it important to keep the max out-of-orderliness small? Remember the earlier graph I showed you, when we keep the max out-of-orderliness small, we can avoid excessive data buffering in the Flink stateful operator. My colleague Peter Vary is working on the upstream contribution for this watermark alignment in the Flink Iceberg source. This is not fully merged yet.

Evaluation Results

Finally, we're going to look at some of the evaluation results. This is a test pipeline setup. We have a Kafka source job that reads data from Kafka and writes the same data into Iceberg table. Then we have an Iceberg source job streaming data from the input Iceberg table, and write the same data in the output Iceberg table. It's like an Iceberg middle maker. The traffic volume is about 4000 messages per second. Each message is about 1 kilobyte. The TaskManager container at 1 CPU and 4 gigabyte memory, so it's a small test setup.

What are we evaluating? First, we want to look at the read latency of the Iceberg streaming source, because for streaming job, latency matters. We want to understand that. Second, we want to understand how the upstream commit interval affects the downstream consumption. The bursty consumption in the downstream. Third, we want to compare the CPU utilization between the Kafka source and the Iceberg source. We measure the latency from the Kafka broker to the Iceberg source with processing time. If we do [inaudible 00:35:14] three segments. First, we have the Kafka source, it's Kafka read latency, which is typically very fast, like sub-second. Then we have the commit interval. How often the upstream job commits a data file to the Iceberg table. Third segment is the poll interval, how often the Iceberg source polls the Iceberg table to discover the new data files.

The latency is most determined by the last two segments, the commit interval and the poll interval. Here we're going to look at some of the latency histogram with 10-second commit interval and 5-second poll interval. The x-axis is time, y-axis is latency in milliseconds. We can say the max latency is less than 40 seconds. The median latency is fluctuating around 10 seconds, which corresponds to the 10 second upstream commit interval, which makes sense. Because we commit a batch of files and downstream read a batch of files. Because the Iceberg data commit is a global transaction, that can lead to stop-and-go consumption pattern.

The top graph shows the CPU usage for the Kafka source job. The x-axis is time, y-axis is the number of cores it used. Remember, we're using a 1-core container. From this graph, we can see the Kafka source job is always busy, is always pulling data from Kafka, processing them. If we look at the Iceberg source job, here, we're using 5 minutes commit interval and 30-second poll interval. You can see the Iceberg source job was busy processing a batch of files for maybe 2 to 3 minutes. After it's done with the batch, it's idle for the next 2 minutes, maybe. Until the new batch commits in, it will be busy again, then it's idle for the next 2 minutes. This stop-and-go consumption pattern is what we expect out of the global transaction write in Iceberg. I want to show you, as we shorten the upstream commit interval and Iceberg source poll interval, the CPU usage can become much smoother for the Iceberg source job. The top graph is from the previous slide, is for 5 minutes commit interval, and 30 second poll interval.

The middle graph is for 60 seconds commit interval and 10 second poll interval. You can see, we don't have those long idle gaps of 2 minutes or 3 minutes anymore, although it is still much choppier, compared to Kafka source job. It's still burstier compared to Kafka source. As we further shorten the schedule interval to 10 seconds commit interval, and 5 second poll interval, we probably cannot see much difference in terms of CPU usage between the Iceberg source job and the Kafka source job. The pattern maybe looks very similar.

Finally, we're going to compare the CPU usage between the Kafka source job and the Iceberg source job. The only difference between those two jobs is in the streaming source: one in the Kafka, the other one in Iceberg. Everything else is actually pretty much the same. Here the CPU usage comparison between the Kafka source job and the Iceberg source job, we apply smooth functions so that the CPU usage does not look choppy, so it's easier to read the overall trend. You can say that Kafka source job has CPU utilization at 60%, Iceberg source job has CPU utilization at 36%. I know benchmark is tricky. I think the main takeaway is that Iceberg source job is as performant as Kafka streaming source, if not better.

Key Takeaway

We can build the low latency data pipelines chained by Flink job streaming from Iceberg. We can achieve overall latency in minutes level. They're cost effective, and they're also operation friendly.

Flink and Spark vs. Data Lake Solutions, for Batch Processing

For people coming from a batch background, is there a difference for a different compute engine like Flink, Spark. Do they work better with one data lake solution like Iceberg, or Hudi, or Delta Lake? I think initially, for example, Iceberg were coming from the batch analytics dataset background, and Hudi was coming from the streaming use cases. That's why I think Hudi probably have better support for the CDC read or something. Last couple years, we can see some convergence on those data lake solutions. They're also trying to learn from these jobs, trying to address the weakness with its own system. I think all the common data lake solutions like Iceberg, Delta Lake, Hudi, now they have all pretty good integration with other popular engines, like Flink, Spark, Trino. I don't think that I will say there's a significance of one is working better with the other. It sometimes depends on what technology you are familiar with or you're more comfortable with.

How Kafka Key Topics are Handled in Iceberg Source

Kafka supports key topic, will the record for the same key go to the same Kafka partition? With the Iceberg source, how is that handled? Iceberg supports a partition scheme called bucketing, it's like hashing. You can send all the records with the same bucket ID, which is basically the hashing function, [inaudible 00:42:00] by normal bucket, to the same partition bucket. That way, you can group all the records for the same key go to same bucket, then we can read, you can assign the file for the same bucket to the real reader.

The Difference in CPU Benchmark for Kafka Source vs. Iceberg Source

In the CPU benchmark, the Kafka source has 60% CPU usage and the Iceberg source has 36% CPU usage, why is it such a difference, the way we look into that? I did not look into what exactly is causing the difference. I think a couple things might affect that. Because in the Iceberg source, you have to pull in data in a much bigger chunk, because data comes from S3, we get a data file and [inaudible 00:43:08], so it's a much bigger batch compared to the Kafka source read. That's probably one of the reasons. Again, I don't want to read too much to say, Iceberg is more efficient than Kafka. It's as performant as it is.

Q&A

Participant 1: In the graphs you had the smoother graphs versus the very spiky graphs, on the smoother as you shorten the intervals, do you run a greater risk with backups?

Greater Risk with Backups with Shortened Intervals

Wu: Not really. As we shorten the interval, the problem is small data files. If we commit a data file every second, we're going to only write 1 second of data into a data file, that data file is typically going to be much smaller, maybe a few megabytes compared to an ideal data file size, maybe like 100 megabytes we're looking for. Yes, they are producing much small data file which is the downside. That's why we typically don't recommend to go very low. That's why I think 1 to 10 minutes commit is pretty common.

Iceberg Optimization Based on its Latency, Considering Flink Checkpoints and Recoverability

Participant 2: I have a question about the optimization for Iceberg based on rate of latency of Iceberg itself, you said 10, 11 minutes was optimal interval. Interestingly, you're not considering Flink checkpoints and recoverability, because you just mentioned operational costs, the larger the checkpoints the more costly it will become to maintain this.

Wu: The Iceberg source after the sink only committed data after every successful checkpoint, that's why the commit interval and the Flink checkpoint interval, they're actually identical. They're the same. The main thing is that, you don't want to commit too frequently or checkpoint too frequently because it can produce small data files, and also too many metadata files. You also don't want to commit too infrequently, that can lead to latency and the delay of the result. In production, most common I have seen is 1 to 10 minutes. Some people do far shorter, like every 5 seconds or 10 seconds. Some people do even longer than 15 minutes, but 1 to 10 minutes is more common.

Pros and Cons of Hybrid Source & Lambda Source, Compared to Iceberg

Participant 3: In the previous presentation given by Sharon, she talked about the hybrid source and the Lambda source where you read the historical data from data lake and latest data using Kafka. How do you see the pros and cons, purely in the line of the Iceberg source comparing with the notion of the Lambda source.

Wu: Iceberg can be used to store long-term data. You can store months, years' data in Iceberg. That's no problem. People will put the data in the blob storage like S3. I think the main problem with that, when you push up, let's say, for 30 days of data, the main problem is actually not to read all the state of data in memory, you want to use the watermark alignment to post the old data first, then move them along nicely. You don't want to read both new data and old data at the same time. The Lambda source is implemented in Flink as the hybrid source. The key idea with the hybrid source is, in Flink you're reading the historic data first, then you transition to the streaming source.

See more presentations with transcripts

 

Recorded at:

Jul 24, 2023

BT