BT

Facilitating the Spread of Knowledge and Innovation in Professional Software Development

Write for InfoQ

Topics

Choose your language

InfoQ Homepage Articles Democratizing Stream Processing with Apache Kafka and KSQL - Part 1

Democratizing Stream Processing with Apache Kafka and KSQL - Part 1

Bookmarks

Key Takeaways

  • Most stream processing technologies require you to write code in programming languages such as Java or Scala.
  • KSQL, the streaming SQL engine for Apache Kafka, can be used to perform stream processing tasks using SQL statements instead of writing a lot of code. 
  • KSQL, built on top of Kafka's Streams API, supports stream processing operations like filtering, transformations, aggregations, joins, windowing, and sessionization.
  • Use cases for KSQL include real-time reports and dashboards, monitoring infrastructure and IoT devices, and detection of anomalies and alert on fraudulent activities.

This is part one of the article Democratizing Stream Processing with Apache Kafka and KSQL. You can read part two here.

Would you cross the street with traffic information that is a minute old? Certainly not! Modern businesses have the same needs nowadays, whether it’s due to competitive pressure or because their customers have much higher expectations on how they want to interact with a product or service.

If I can rent and watch the latest movie at the flick of a button on my iPad, why should I need to wait several hours for a financial alert from my bank account?

Today’s businesses have data at their core, and this data is changing continuously at a rapid pace, with ever-increasing volumes. The technology that allows businesses to harness this torrent of information in real time is Stream Processing, and tens of thousands of companies like Netflix, Audi, PayPal, Airbnb, Uber, and The New York Times have picked Apache Kafka® as the de-facto streaming platform of choice to reshape their industries.

Whether you are reading a newspaper, shopping online, booking a hotel or a flight, taking a cab, playing a video game, or wiring money, many of these daily activities are already powered by Kafka behind the scenes.

Why Stream Processing

To show the value of stream processing, let’s pick a motivating example that is applicable across many different industries. Imagine that you need to create and maintain, in real-time, 360-degree profiles for your customers. This would be useful for many reasons such as:

  1. To create a better customer experience. For instance, "This premium customer tried several times in the past five minutes to checkout his shopping cart but failed because our latest website update was buggy, let’s offer him a discount immediately and apologize for the bad user experience."

  2. To minimize risk. For instance, "This new payment seems fraudulent because the payment originated outside the US, yet the customer’s mobile phone app reports she is in New York City. Let’s block the payment and contact the customer immediately."

What is needed for this use case is to bring together, in real-time, data from a variety of internal and possibly also external channels, and then consolidate this information in comprehensive customer profiles (sometimes called customer 360-degree profiles) that are continuously updated as soon as new information is available in any of the channels.

Here’s how we would sketch the high-level setup of this use case with Kafka. Customer data is continuously collected in streams of data from a variety of sources, from which a table of comprehensive customer profiles is built and continuously updated. All this happens in real-time and at scale.

Figure 1: Building comprehensive customer profiles in real-time from internal and external streams of customer data

The good news is that the sketch above is conceptually simple. It probably matches your understanding of how the body’s nervous system works to stream sensor data from our eyes, ears, limbs, etc. to our brain so that we can make informed decisions quickly – such as crossing the street when it is safe to do so. This is why Kafka is commonly described as "the central nervous system" for digital-native companies.

Until now, however, the world of stream processing has had a rather high barrier to entry. Today’s most popular stream processing technologies - including Apache Kafka’s Streams API - still require you to write code in programming languages such as Java or Scala - even for the simplest of tasks. This hard requirement on programming skills has prevented many companies from unlocking the benefits of stream processing at their full potential. But thankfully, now there is an easier way.

Enter KSQL, the streaming SQL engine for Apache Kafka

Launched in 2017, KSQL is the streaming SQL engine for Apache Kafka. KSQL lowers the bar to entry for the world of stream processing. Instead of writing a lot of code, all you need to get started with stream processing is a simple SQL statement, such as:

CREATE STREAM fraudulent_payments AS
    SELECT * FROM payments-kafka-stream 
    WHERE fraud_probability > 0.8

That’s it! And while this might not be immediately obvious, the above streaming query of KSQL is distributed, fault-tolerant, elastic, scalable, and real-time to meet the data needs of businesses today. KSQL achieves this because it is built on top of Kafka’s Streams API and therefore leverages Kafka’s strong technical foundation for distributed stream processing.

If we were to implement the KSQL query above directly with Kafka’s Streams API in Java or Scala, our application code would look somewhat like the code snippet below. Of course, we would also have to compile, package, and deploy this application.

// Using Kafka’s Streams API
object FraudFilteringApplication extends App {

  val builder: StreamsBuilder = new StreamsBuilder()
  val fraudulentPayments: KStream[String, Payment] = builder
    .stream[String, Payment]("payments-kafka-topic")
    .filter((_ ,payment) => payment.fraudProbability > 0.8)
  fraudulentPayments.to("fraudulent-payments-topic")

  val config = new java.util.Properties 
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "fraud-filtering-app")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")

  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()
}

For Java or Scala programmers, Kafka Streams API is a powerful library with which to incorporate stream processing in their applications—but KSQL gives a much broader base of developers the option to simply use SQL to express their stream processing requirements.

Of course, you can do much more with KSQL than I have shown in the simple example above. KSQL is open source (Apache 2.0 licensed) and built on top of Kafka’s Streams API. This means it supports a wide range of stream processing operations, including filtering, transformations, aggregations, joins, windowing, and sessionization. This way you can easily:

  • Drive real-time reports and dashboards
  • Monitor infrastructure and IoT devices
  • Detect anomalies and alert on fraudulent activities
  • Conduct session-based analysis of user activities
  • Perform real-time ETL
  • And much more.

Here are some further examples of what you can do with KSQL.

Example: KSQL for online data integration and enrichment

Most of the data processing done in companies falls in the domain of data enrichment and wrangling: take data coming out of several systems, transform it, join it together and store it into a key-value store, RDBMS, search index, cache, or other data-serving system. KSQL, when used with connectors for Kafka Connect for systems like Oracle, MySQL, Elasticsearch, HDFS, or S3, among many others, enables a move from batch data integration to real-time data integration.

As shown in the KSQL query below, you can enrich streams of data with metadata stored in tables using stream-table joins:

CREATE STREAM vip_users_clickstream AS
     SELECT user_id, user_country, web_page, action
     FROM website_clickstream c
     LEFT JOIN users u ON u.user_id = c.user_id
     WHERE u.level = 'Platinum';

You can also filter out data such as personally identifiable information (PII) before loading the stream into another system in order to be GDPR compliant. Here we remove the user_id field from the vip_users stream we created in the previous example by not including it in the resulting stream (only the fields user_country, web_page, and action make it into the result stream):

CREATE STREAM anonymized_vip_clickstream AS
     SELECT user_country, web_page, action
     FROM vip_users_clickstream;

Example: KSQL for real-time monitoring and analytics

While real-time monitoring and real-time analytics are rather different use cases, the stream processing functionality needed to implement is very similar. KSQL makes it straightforward to define appropriate metrics on a stream of raw events, whether these are generated from database updates, applications, mobile devices, vehicles, or any other kind.

The following query computes, in real-time, vehicles who may possibly fail based on the number of errors observed in the vehicles’ telemetry data during five-minute windows. This is an example of special type of aggregation: a windowed aggregation, where the input data is first grouped into windows of data (in this query the grouping/windowing is based on the timestamps in the input data), and each window is then aggregated separately.

CREATE TABLE possibly_failing_vehicles AS
     SELECT vehicle, COUNT(*)
     FROM vehicle_monitoring_stream
     WINDOW TUMBLING (SIZE 5 MINUTES)
     WHERE  event_type = 'ERROR'
     GROUP BY vehicle
     HAVING COUNT(*) >= 3;

Another use of KSQL is defining custom business-level metrics that are computed in real-time and from which you can monitor and alert. For example, showcasing the number of concurrent online players for a triple-A video game franchise ("Has the latest game expansion increased playtime?") or reporting on the number of abandoned shopping carts for an e-commerce website ("Has the latest update to our online store made it easier for customers to checkout?"). Similarly, you can also use KSQL to define a notion of correctness for your business applications and then check whether they are meeting this as they run in production.

The last query example above also happens to be an example of a stateful query. Stateful stream processing is arguably the most commonly needed feature in stream processing, and also a very challenging one to implement and to get right. So let’s take a closer look.

Stream Processing that Remembers: Stateful Stream Processing

The query in the example above performs aggregations on its input data. These aggregations are stateful operations, meaning that they are operations that require maintaining and updating state. Here, for example, the query needs to remember the previous error count per time window and per vehicle whenever a new error is observed because otherwise it can’t decide whether a vehicle’s five-minute error threshold was exceeded. One of the main challenges in distributed stream processing is to guarantee that such stateful operations work efficiently and correctly, taking into account factors such as machine crashes, network errors, and when running at scale.

In comparison, stateless operations are rather trivial: you must only move the computation from machine to machine. That’s cheap and easy. Stateful operations on the other hand require this plus being able to do things such as move the previous state from a failed machine to a live machine and doing so efficiently - given that you might have GBs of data to move. Most important of all, it must be done correctly. For example, in our last KSQL example query above, you certainly don’t want to send a false alert to a car driver about an imminent engine breakdown only because we have processed the same error message multiple times!

For faster processing and for better fault-tolerance, KSQL is typically run distributed across multiple machines, VMs, or containers. So how does it solve the stateful challenge? The answer is that KSQL is built on Kafka’s Streams API, and this gives all KSQL queries - including stateful queries - the following characteristics:

  • Fault-tolerant in case of machine failures, where state and computation is moved from the failed machine to a live machine automatically. This is achieved by continuous “streaming backups” of state from KSQL to Kafka, and by automatically restoring state back from Kafka when and where needed.
  • Elastic, so that you can add and remove new machines on the fly during live operations in order to scale processing in and out, without data loss and still with correct processing results.
  • Scalable, where processing load and state are automatically spread across machines that collaboratively process the data. This is achieved by leveraging Kafka’s processing protocol and its partitioned storage of data, where processing tasks are spread and parallelized across machines based on the data partitions.

Because these properties are provided out of the box by KSQL, you only need to focus on writing the desired SQL statements for your stream processing needs. Similarly, KSQL therefore fits very well into modern deployment environments that are built on top of technologies such as Docker and Kubernetes or cloud-native environments.

The Stream-Table Duality

One aspect that is unique to Kafka is the first-class support for streams and tables. We have already seen both streams and tables at play in the previous examples - did you spot it? In the very last example the result of the stateful query was a table, even though its input was a stream of data:

CREATE TABLE possibly_failing_vehicles AS
     SELECT vehicle, COUNT(*)
     FROM vehicle_monitoring_stream
     WINDOW TUMBLING (SIZE 5 MINUTES)
     WHERE  event_type = 'ERROR'
     GROUP BY vehicle
     HAVING COUNT(*) >= 3;

You may wonder, "What is the difference between streams and tables?" And, more importantly, "how does this help me in my daily work?" In short, it helps a lot! Tables and Streams give you the necessary primitives with which to reason about and model our data to answer the business questions asked of it. Here’s the best plain English analogy I can think of:

  • A stream in Kafka is the full history of world (or business) events from the beginning of time to today. It represents the past and the present. As we go from today to tomorrow, new events are constantly being added to the world’s history. In Kafka, events are written to, stored in, and read from Kafka topics. And just as we cannot change the past, Kafka is an immutable, append-only, log of events. In analytical RDBMS terms, you can think of a stream as how you model Facts.
  • A table in Kafka is the state of the world today (or, more generally, at a given point in time). It represents the present, or a date in the past. It is an aggregation of the history of world events, and this aggregation is changing constantly as we go from today to tomorrow. Tables are derived from streams by processing those streams, and more precisely by aggregating streams, and you do this processing with tools such as Kafka’s Streams API and KSQL. In analytical RDBMS terms, you can think of tables as how you model Dimensions, holding the current value for a key.

We call this intrinsic relationship the stream-table duality. If you want to learn more about the fascinating relationship between streams and tables, I recommend to read my article Of Streams and Tables in Kafka and Stream Processing.

But wait a minute. Where did the concept of tables originally come from? The answer is databases, which we have been using successfully for decades to build applications and services. In databases, the first-order construct is the table. This is what you work with. Streams actually also exist in databases in the form of the transaction log on which databases are built - for example in the form of MySQL’s binlog or Oracle’s Redo Log - but they are typically hidden from you in the sense that you do not interact with them directly. Following on from the earlier analogy, a database knows about the present, but it does not know about the past - if you need the past, fetch your backup tapes which are in effect hardware streams ...

Thus, Kafka and stream processing turn the database inside-out. Here, the first-order construct is the stream. Tables are derivations of streams, as we have seen above. As Pat Helland put it in Immutability Changes Everything, "The truth is the log [stream]. The database is a cache of a subset of the log." Kafka knows about the present but also about the past. That’s why The New York Times store all articles ever published – 160 years of journalism going back to the 1850s – in Kafka as the source of truth.

In short, a database thinks table first, stream second. Kafka thinks stream first, table second. And by providing native support for streams and tables in both Kafka Streams and KSQL Kafka helps you bridge the worlds of stream processing and databases. To make this even more powerful, you can hook existing databases and tables in real-time into Kafka via Kafka Connect. Given these statements, we might rightfully conclude that Kafka is therefore a stream-relational rather than stream-only system.

Streams and Tables Galore

The stream-table duality is crucial in practice for two important reasons. First, because you probably already have a lot of data in existing databases in your company, and you’ll want to apply this data to use cases driven by stream processing. Second, once you start implementing your own stream processing applications you will soon realize that most use cases actually require to model data as both streams and tables - even when there are no "real" databases involved. That’s because tables represent state. Tables enter the picture whenever you are doing any stateful processing like aggregations (for example, computing 5-minute averages for key business metrics) or joins (such as real-time data enrichment joining a stream of facts with dimension tables).

Here is an example of streams and tables at play, where we use KSQL to compute the number of geo-location updates by users in real-time. For example, this could be a mobile application like Strava that allows users to check-in manually to a location or that periodically sends geolocation updates automatically. The input is a stream of geolocation updates, and the result is a continuously updated table. That’s because COUNT() is an aggregation and thus a stateful operation: in order to add +1 to the current count one must be able to remember what the current count was in the first place! This KSQL query below will work the same for a few geolocation updates per second but also for hundreds of thousands updates per second and more.

CREATE TABLE geo_location_checkins_per_user AS
    SELECT username, COUNT(*)
    FROM geo_location_updates
    GROUP BY username;

In the next example we compute hourly aggregates on an ‘orders’ stream based on the status of orders. This is yet another common use case in practice. Again, the result of this computation is a table (‘orders_hourly_aggregates’) that is continually updated whenever a new order arrives. The query also showcases some of the available scalar functions in KSQL.

CREATE TABLE orders_hourly_aggregates AS
    SELECT
        order_status,
        COUNT(*) AS order_count,
        MAX(ORDER_TOTAL) AS max_order_total,
        MIN(ORDER_TOTAL) AS min_order_total,
        SUM(ORDER_TOTAL) AS sum_order_total,
    FROM orders
    WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY order_status

Streaming Batteries Included

Kafka provides a complete streaming platform on which to build applications and systems. Whether you are implementing a simple streaming enrichment or something more sophisticated like fraud detection or 360-degree customer profiles, you want an easy-to-use stream processing solution such as Kafka with all batteries and core data structures included - which particularly includes first-class support for streams and tables. Without this support you will invariably end up building needlessly complex architectures in which you stitch together a stream(-only) processing technology with a remote datastore like Cassandra or MySQL to enable stateful processing, and probably also having to add Hadoop/HDFS just to enable fault-tolerance for said processing. How many technology balls can one juggle at a time?

Conclusion

In this article I gave a whirlwind tour of stream processing with KSQL, the streaming SQL engine for Apache Kafka. We took a look at several concrete examples, covered at a high-level how KSQL addresses the challenges of stateful stream processing, and how Kafka and KSQL help to bridge the world of streams and databases through their first-class support for streams and tables so that you can implement your own use cases end-to-end with minimal effort.

If you are interested in learning more on KSQL in the meantime, I suggest the following resources:


About the Author

Michael Noll is a product manager at Confluent, the company founded by the creators of Apache Kafka. Previously, Michael was the technical lead of DNS operator Verisign’s big data platform, where he grew the Hadoop, Kafka, and Storm-based infrastructure from zero to petabyte-sized production clusters spanning multiple data centers - one of the largest big data infrastructures in Europe at the time. He is a well-known tech blogger in the big data community. In his spare time, Michael serves as a technical reviewer for publishers such as Manning and is a frequent speaker at international conferences, including Strata, ApacheCon, and ACM SIGIR. Michael holds a PhD in computer science.

This is part one of the article Democratizing Stream Processing with Apache Kafka and KSQL. You can read part two here.

 

Rate this Article

Adoption
Style

BT