Democratizing Stream Processing with Apache Kafka® and KSQL - Part 2

Key Takeaways

  • Build data integration and processing applications using Apache Kafka and KSQL for use cases like customer operations, operational dashboard, and ad-hoc analytics.
  • The main benefits of stream processing include one-time data enrichment, low-latency processing, and real-time notifications to the customer ops team.
  • You can use a Change-Data-Capture (CDC) tool to mirror the data from the database, and any subsequent changes, into a Kafka topic.
  • Using KSQL, it's simple to join the stream of business data with the reference information originating from a database and maintained in a Kafka topic.
  • Scale out the application to handle a greater number of notifications, without needing to modify the filtering logic.
     

This is part two of the article Democratizing Stream Processing with Apache Kafka and KSQL. You can read part one here.

In this article we’ll see how Apache Kafka® and KSQL can be used to build powerful data integration and processing applications. The example is a simple one from the world of e-commerce: On a website, user reviews are tracked through a series of events. Information about these users such as their name, contact details, and loyalty club status is held on a database elsewhere. There are at least three uses for this review data:

  • Customer Operations - If a user with high loyalty club status leaves a poor review, we want to do something about it straight away, to reduce the risk of them churning. We want an application that will notify us as soon as a review meets this condition. By acting immediately we offer customer service that is far superior to waiting for a batch process to run and flag the user for contact at a later date.
  • Operational dashboard showing live feed of reviews, rolling aggregates for counts, median score, and so on - broken down by user region, etc.
  • Ad-hoc analytics on review data combined with other data (whether in a data lake, data warehouse, etc). This could extend to broader data science practices and machine learning use.

All of these need access to the review information along with details of the user.

We’re going to see how this can be implemented using a more modern pattern, based on a streaming platform. We’ll use the open-source Apache Kafka and KSQL projects to do this. KSQL is the streaming SQL engine for Apache Kafka, implemented on top of the Kafka Streams API which is part of Apache Kafka itself.

Figure 1 below shows how the sample streaming application works.

Figure 1.  Streaming data application

The events are the reviews that the users submit on the site, and these are streamed directly into Kafka. From here, they can be joined to the user information in real time and the resulting enriched data written back to Kafka. With this transformation done, the data can be used to drive the above applications and targets. The transformation logic is only required once. The data is extracted from the source system once. The transformed data can be used multiple times, by independent applications. New sources and targets can be added, without any change to the existing components. All of this is very low-latency.

Thus at a high level, the design looks like this:

  • Web app emits review directly to Kafka
  • Kafka Connect streams snapshot of user data from database into Kafka, and keeps it directly in sync with CDC
  • Stream processing adds user data to the review event, writes it back to a new Kafka topic
  • Stream processing filters the enriched Kafka topic for poor reviews from VIP users, writes to a new Kafka topic
  • Event-driven app listens to Kafka topic, pushes notifications as soon as VIP user leaves a poor review
  • Kafka Connect streams the data to Elasticsearch for operational dashboard
  • Kafka Connect streams the data to S3 for long-term ad-hoc analytics and use alongside other datasets

Some of the main benefits of this include:

  • Data enrichment is done once, and available for any consuming application
  • Processing is low latency
  • Notifications to customer ops team happen as soon as the VIP customer leaves a poor review - much better customer experience, more chance of retaining their business
  • Easy to scale by adding new nodes as required for greater throughput

Implementation

Let’s take a detailed look at building this. You can find the code for all of the examples, along with a docker-compose file, on GitHub.

Getting data into Kafka

Web applications have several options for streaming events into Kafka.

  • The Producer API is available through numerous client libraries, for languages including Java, .NET, Python, C/C++, Go, node.js, and more.
  • There is an open-source REST proxy, through which HTTP calls can be made to send data to Kafka.

In our example the application is using the Producer API.

The messages sent from the web application into the Kafka topic ratings look like this:

{
  "rating_id": 604087,
  "user_id": 7,
  "stars": 1,
  "route_id": 2777,
  "rating_time": 1528800546808,
  "channel": "android",
  "message": "thank you for the most friendly, helpful experience today at your new lounge"
}
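
As a rough illustration of the Producer API route, the following Python sketch builds an event with the same fields as the message above and hands it to a producer. It makes several assumptions that are not from the original application: the event is serialized as plain JSON for brevity (the ratings topic is later declared to KSQL as Avro, so a real producer would use an Avro serializer), the message is keyed by user_id, and the helper names are made up for this example.

```python
import json
import time


def build_rating_event(rating_id, user_id, stars, route_id, channel, message):
    """Return (key, value) as bytes for one rating event."""
    event = {
        "rating_id": rating_id,
        "user_id": user_id,
        "stars": stars,
        "route_id": route_id,
        "rating_time": int(time.time() * 1000),  # epoch milliseconds
        "channel": channel,
        "message": message,
    }
    # Assumption: key by user_id so that one user's ratings share a partition
    key = str(user_id).encode("utf-8")
    value = json.dumps(event).encode("utf-8")
    return key, value


def send_rating(producer, topic, key, value):
    """Send one event through any object offering produce() and flush()."""
    producer.produce(topic, key=key, value=value)
    producer.flush()


def main():
    # Assumption: confluent-kafka is installed and a broker is on localhost:9092
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})
    key, value = build_rating_event(
        604087, 7, 1, 2777, "android", "example review text")
    send_rating(producer, "ratings", key, value)
```

Calling main() sends a single event to the ratings topic. Keeping the event construction separate from the send makes it easy to check the payload without a running broker.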

Making Data from a Database Available in Kafka

When building applications it is a common requirement to use data stored in a database. In our example the user data is held in MySQL, although the design pattern is the same regardless of specific RDBMS technology.

When writing stream processing applications with Kafka, the standard approach to integrating with data held in a database is to ensure the data itself is stored, and maintained, in Kafka. This is easier than it sounds - we simply use a Change-Data-Capture (CDC) tool to mirror the data from the database, and any subsequent changes, into a Kafka topic.

The advantage of this is that we isolate the database from our processing. This has two key benefits: we don’t overload the database with our requests, and we are free to use the data as we choose, without coupling our development and deployment processes to that of the database owner.

There are multiple CDC techniques and tools, which we will not cover here. Since the data is in MySQL, we use the Debezium project for CDC. It snapshots the contents of the users table into Kafka, and uses MySQL’s binlog to detect and replicate instantly any subsequent changes made to the data in MySQL into Kafka.

Figure 2 below shows the change data capture process flow details.

Figure 2. Streaming application change data capture

The messages in the Kafka topic asgard.demo.CUSTOMERS streamed from the database look like this:

{
  "id": 1,
  "first_name": "Rica",
  "last_name": "Blaisdell",
  "email": "rblaisdell0@rambler.ru",
  "gender": "Female",
  "club_status": "bronze",
  "comments": "Universal optimal hierarchy",
  "create_ts": "2018-06-12T11:47:30Z",
  "update_ts": "2018-06-12T11:47:30Z",
  "messagetopic": "asgard.demo.CUSTOMERS",
  "messagesource": "Debezium CDC from MySQL on asgard"
}
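
To make the idea of "a snapshot plus every subsequent change" concrete, here is a minimal Python sketch of how a consumer of such a topic can rebuild the current state of the table: it keeps the most recent message seen for each id. The function name is hypothetical, the messages are assumed to have the flattened shape shown above, and deletes are ignored for simplicity.

```python
def apply_changes(table, change_events):
    """Fold change messages into `table` (a dict keyed by customer id).

    The latest message for an id replaces any earlier one.
    """
    for event in change_events:
        table[event["id"]] = event
    return table


# The snapshot row arrives first, followed by a later update to the same row
events = [
    {"id": 1, "first_name": "Rica", "club_status": "bronze"},
    {"id": 1, "first_name": "Rica", "club_status": "platinum"},
]
customers = apply_changes({}, events)
print(customers[1]["club_status"])
```

This is, in spirit, what treating the topic as a table means: the topic holds the history of changes, and the table is the latest value for each key.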

Enriching streams of events with information from a database

Using KSQL it is simple to join the stream of ratings with our reference information originating from a database and maintained in a Kafka topic.

The left join details are shown in Figure 3 below.

The first step is to ensure that the messages in the customer topic are keyed on the join column, which in this case is the customer ID. We can actually do this re-partitioning using KSQL itself. The output of a KSQL CREATE STREAM is written to a Kafka topic, named by default after the stream itself:

-- Process all data that currently exists in topic, as well as future data
SET 'auto.offset.reset' = 'earliest';

-- Declare source stream
CREATE STREAM CUSTOMERS_SRC WITH \
(KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

-- Re-partition on the ID column and set the target topic to
-- match the same number of partitions as the source ratings topic:
CREATE STREAM CUSTOMERS_SRC_REKEY WITH (PARTITIONS=1) AS \
SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;

Now every message that arrives on the asgard.demo.CUSTOMERS topic will be written to the CUSTOMERS_SRC_REKEY Kafka topic with the correct message key set. Note that we’ve not had to declare any of the schema, because we’re using Avro. KSQL and Kafka Connect both integrate seamlessly with the open-source Confluent Schema Registry to serialize/deserialize Avro data and store/retrieve schemas in the Schema Registry.
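
Why the key and the partition count matter can be sketched in a few lines of Python. A partitioner maps a message key to a partition, so two topics that use the same key and the same number of partitions place matching records in the same partition number. The hash used here (CRC32) is only a stand-in chosen for this illustration; Kafka clients use their own hash functions, and the function name is hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Stand-in partitioner: the same key always maps to the same partition."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# A customer record keyed on ID 7 and a rating keyed on user 7 end up in the
# same partition number when both topics have the same partition count.
customer_partition = partition_for("7", 4)
rating_partition = partition_for("7", 4)
print(customer_partition == rating_partition)
```

If the two topics had different partition counts, the same key could map to different partition numbers, which is why the re-keyed topic is created to match the ratings topic.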

To do the join we use standard SQL join syntax:

-- Register the CUSTOMER data as a KSQL table, 
-- sourced from the re-partitioned topic
CREATE TABLE CUSTOMERS WITH \
    (KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID');

-- Register the RATINGS data as a KSQL stream, sourced from the ratings topic
CREATE STREAM RATINGS WITH (KAFKA_TOPIC='ratings',VALUE_FORMAT='AVRO');

-- Perform the join, writing to a new topic - note that the topic
-- name is explicitly set. If the KAFKA_TOPIC argument is omitted the target
-- topic will take the name of the stream or table being created.
CREATE STREAM RATINGS_ENRICHED WITH \
    (KAFKA_TOPIC='ratings-with-customer-data', PARTITIONS=1) AS \
SELECT R.RATING_ID, R.CHANNEL, R.STARS, R.MESSAGE, \
       C.ID, C.CLUB_STATUS, C.EMAIL, \
       C.FIRST_NAME, C.LAST_NAME \
FROM RATINGS R \
     LEFT JOIN CUSTOMERS C \
       ON R.USER_ID = C.ID \
WHERE C.FIRST_NAME IS NOT NULL ;
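
For readers less familiar with stream-table joins, the following Python sketch mimics what the statement above does for each rating: look up the customer by key, attach the customer columns, and drop the row if no customer was found. It is an illustration of the semantics only, not how KSQL is implemented; the function names are hypothetical and the customers table is modelled as a plain dict keyed by id.

```python
def enrich(rating, customers):
    """LEFT JOIN one rating to the customers table (None where no match)."""
    c = customers.get(rating["user_id"], {})
    return {
        "RATING_ID": rating["rating_id"],
        "CHANNEL": rating["channel"],
        "STARS": rating["stars"],
        "MESSAGE": rating["message"],
        "ID": c.get("id"),
        "CLUB_STATUS": c.get("club_status"),
        "EMAIL": c.get("email"),
        "FIRST_NAME": c.get("first_name"),
        "LAST_NAME": c.get("last_name"),
    }


def enrich_stream(ratings, customers):
    """Yield enriched rows, applying WHERE C.FIRST_NAME IS NOT NULL."""
    for rating in ratings:
        row = enrich(rating, customers)
        if row["FIRST_NAME"] is not None:
            yield row
```

Each rating is joined against the state of the table at the moment the rating is processed, so a later change to a customer affects only the ratings that arrive after it.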

We can inspect the number of messages processed by this query:

ksql> DESCRIBE EXTENDED RATINGS_ENRICHED;

Name                 : RATINGS_ENRICHED
Type                 : STREAM
Key field            : R.USER_ID
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : ratings-with-customer-data (partitions: 4, replication: 1)

[...]

Local runtime statistics
------------------------
messages-per-sec:      3.61   total-messages:      2824     last-message: 6/12/18 11:58:27 AM UTC
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic ratings-with-customer-data)

In effect, this SQL statement is itself an application, just as we would code in Java, Python, or C: it’s a continually running process that takes input data, processes it, and outputs it. The output we see above is the runtime metrics for this application.

Filtering streams of data with KSQL

The output of the JOIN that we created above is a Kafka topic, populated in real-time driven by the events from the source ratings topic, as shown below in Figure 4.

We can build a second KSQL application which is driven by this derived topic, and in turn apply further processing to the data. Here we will simply filter the stream of all ratings to identify just those which are both:

  • negative ratings (which we define—on a scale of 1-5—as being less than 3)
  • ratings left by customers of 'Platinum' status

SQL gives us the semantics with which to express the above requirements almost literally. We can use the KSQL CLI to validate the query first:

SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
FROM   RATINGS_ENRICHED \
WHERE  STARS < 3 \
  AND  CLUB_STATUS = 'platinum';

platinum | ltoopinc@icio.us | 1 | worst. flight. ever. #neveragain
platinum | italboyd@imageshack.us | 2 | (expletive deleted)

And then as before, the results of this continuous query can be persisted to a Kafka topic simply by prefixing the statement with CREATE STREAM ... AS (often referred to as the acronym CSAS). Note that we have the option of including all source columns (SELECT *), or creating a subset of the available fields (SELECT COL1, COL2) - which we use depends on the purpose of the stream being created. We’re also going to write the target messages as JSON:

CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS \
       WITH (VALUE_FORMAT='JSON', PARTITIONS=1) AS \
SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE \
FROM   RATINGS_ENRICHED \
WHERE  STARS < 3 \
  AND  CLUB_STATUS = 'platinum';

Inspecting the resulting Kafka topic, we can see that it contains just the events in which we are interested. Just to reinforce the point that this is a Kafka topic - and I could query it with KSQL - here I’ll step away from KSQL and use the kafka-console-consumer tool to inspect it:

kafka-console-consumer \
--bootstrap-server kafka:9092 \
--topic UNHAPPY_PLATINUM_CUSTOMERS | jq '.'
{
  "CLUB_STATUS": {
    "string": "platinum"
  },
  "EMAIL": {
    "string": "italboyd@imageshack.us"
  },
  "STARS": {
    "int": 1
  },
  "MESSAGE": {
    "string": "Surprisingly good, maybe you are getting your mojo back at long last!"
  }
}
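
In the output above each value is wrapped in a single-entry object naming its type, such as {"string": "platinum"}. If a consuming application receives messages in that shape, a small helper can flatten them before use. The sketch below assumes exactly the wrapped layout shown above; the function name is hypothetical.

```python
def unwrap(record):
    """Flatten {"FIELD": {"type": value}} into {"FIELD": value}."""
    flat = {}
    for field, value in record.items():
        # A single-entry dict is treated as a type wrapper around the value
        if isinstance(value, dict) and len(value) == 1:
            value = next(iter(value.values()))
        flat[field] = value
    return flat


wrapped = {
    "CLUB_STATUS": {"string": "platinum"},
    "EMAIL": {"string": "italboyd@imageshack.us"},
    "STARS": {"int": 1},
}
print(unwrap(wrapped))
```

Values that are not wrapped pass through unchanged, so the helper is safe to apply to messages that are already flat.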

Before leaving KSQL, let’s remind ourselves that we’ve just, in effect, written three streaming applications:

ksql> SHOW QUERIES;

 Query ID                          | Kafka Topic                | Query String
------------------------------------------------------------------------------------------------------------
 CSAS_CUSTOMERS_SRC_REKEY_0        | CUSTOMERS_SRC_REKEY        | CREATE STREAM CUSTOMERS_SRC_REKEY  […]
 CSAS_RATINGS_ENRICHED_1           | RATINGS_ENRICHED           | CREATE STREAM RATINGS_ENRICHED  […]
 CSAS_UNHAPPY_PLATINUM_CUSTOMERS_2 | UNHAPPY_PLATINUM_CUSTOMERS | CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS  […]

Push notifications driven from Kafka topics

The above UNHAPPY_PLATINUM_CUSTOMERS topic that we’ve created can be used to drive an application that we write to alert our customer operations team if an important customer has left a poor review. The key thing here is that we’re driving a real-time action based on an event that has just occurred. It’s no use finding out as the result of a batch-driven analysis next week that last week we upset a customer. We want to know now so that we can act now and deliver a superior experience to that customer.

There are Kafka client libraries for numerous languages - almost certainly one for your language of choice. Here we’ll use the open-source Confluent Kafka library for Python. It is a simple example of building an event-driven application, which listens for events on a Kafka topic, and then generates a push notification. We’re going to use Slack as our platform for delivering this notification. The below code snippet omits any kind of error-handling, but serves to illustrate the simplicity with which we can integrate an API such as Slack’s with a Kafka topic on which we listen to events to trigger an action.

import json

from slackclient import SlackClient
from confluent_kafka import Consumer

sc = SlackClient('api-token-xxxxxxx')

settings = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python_kafka_notify.py',
    'default.topic.config': {'auto.offset.reset': 'largest'}
}
c = Consumer(settings)
c.subscribe(['UNHAPPY_PLATINUM_CUSTOMERS'])

try:
    while True:
        # Wait up to 0.1 seconds for the next message
        msg = c.poll(0.1)
        if msg is None or msg.error():
            # Nothing to process on this poll
            continue
        # The topic was created with VALUE_FORMAT='JSON', so parse the payload
        app_msg = json.loads(msg.value().decode('utf-8'))
        email = app_msg['EMAIL']
        message = app_msg['MESSAGE']
        channel = 'unhappy-customers'
        text = ('`%s` just left a bad review :disappointed:\n> %s\n\n_Please contact them immediately and see if we can fix the issue *right here, right now*_' % (email, message))
        sc.api_call('chat.postMessage', channel=channel,
                    text=text, username='KSQL Notifications',
                    icon_emoji=':rocket:')
finally:
    # Leave the consumer group cleanly on exit
    c.close()
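
Since the snippet leaves out error handling, here is one hedged way it could be added: separate the parsing of a message from the sending of the notification, and skip payloads that cannot be used. The function names and the `post` callable are assumptions made for this sketch; in the application above, `post` would wrap the Slack API call.

```python
import json

CHANNEL = "unhappy-customers"


def build_notification(raw_value):
    """Return the notification text, or None if the payload is unusable."""
    try:
        event = json.loads(raw_value)
        email = event["EMAIL"]
        message = event["MESSAGE"]
    except (ValueError, KeyError, TypeError):
        # Not valid JSON, not an object, or a required field is missing
        return None
    return "`%s` just left a bad review :disappointed:\n> %s" % (email, message)


def handle(raw_value, post):
    """Send a notification for one message; return True if one was sent."""
    text = build_notification(raw_value)
    if text is None:
        return False
    post(channel=CHANNEL, text=text)
    return True
```

With this split, the consumer loop only needs to call handle(msg.value(), post) for each message, and the parsing logic can be tested without Kafka or Slack.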

Figure 5 below shows the user notification using Slack API.

It’s worth restating here that the application we’re building (call it a microservice if you like) is event driven. That is, the application waits for an event and then acts. It is not trying to process all data and look for a given condition, and nor is it a synchronous request-response service responding to a command. We’ve separated out the responsibilities:

  • The filtering of a real-time stream of events for a determined condition is done by KSQL (using the CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS statement that we saw above), and matching events are written to a Kafka topic
  • The notification service has a sole responsibility and purpose for taking an event from the Kafka topic, and generating a push notification from it. It does this asynchronously.

The benefits here are clear:

  • We could scale out the application to handle greater number of notifications, without needing to modify the filtering logic
  • We could replace the application with an alternative one, without needing to modify the filtering logic
  • We could replace or amend the filtering logic, without needing to touch the notification application
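
On the first point, scaling out a Kafka consumer usually means starting more instances with the same group.id, after which the partitions of the topic are shared among them. The sketch below is a simplified round-robin model of that sharing, written for illustration; it is not Kafka's actual assignment code and the function name is hypothetical.

```python
def assign_partitions(partitions, members):
    """Share partitions among consumer group members, round-robin."""
    assignment = {member: [] for member in members}
    for i, partition in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(partition)
    return assignment


print(assign_partitions([0, 1, 2, 3], ["notify-1", "notify-2"]))
print(assign_partitions([0], ["notify-1", "notify-2"]))
```

The second call shows the limit of this approach: a topic with a single partition, as UNHAPPY_PLATINUM_CUSTOMERS was created above with PARTITIONS=1, can keep only one instance in the group busy, so the partition count needs to grow along with the number of instances.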

Kafka and the Request/Response pattern

A common challenge to the concept of using Kafka as a platform on which to write applications is that the event-driven paradigm isn’t applicable to the application’s flow, and thus by extension Kafka isn’t either. This is a fallacy, with two key points to remember:

  • It is fine to use both Event-Driven and Request/Response patterns - they are not mutually exclusive, and some requirements will demand Request/Response
  • The key driver should be the requirements; inertia of existing approaches should be challenged. By using an event-driven architecture for some or all of your application’s messaging you benefit from the asynchronicity that it brings, its scalability, and its integration into Kafka and thus all other systems and applications using Kafka too.

For extended discussion on this, see Ben Stopford’s series of articles and recent book, Designing Event Driven Systems.

Streaming data from Kafka to Elasticsearch for operational analytics

Streaming data from Kafka to Elasticsearch is simple using Kafka Connect. It provides scalable streaming integration driven just from a configuration file. An open-source connector for Elasticsearch is available both standalone and as part of Confluent Platform. Here we’re going to stream the raw ratings as well as the alerts into Elasticsearch:

{
  "name": "es_sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "ratings-with-customer-data,UNHAPPY_PLATINUM_CUSTOMERS",
    "connection.url": "http://elasticsearch:9200"
    [...]
  }
}
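
A configuration like this is typically submitted to a Kafka Connect worker through its REST interface, as a POST to the /connectors endpoint. The Python sketch below builds such a request using only the standard library. The worker address http://localhost:8083 is an assumption (8083 is Connect's default REST port), the function name is hypothetical, and the config contains only the settings shown above, so the elided ones would need to be added before it would work against a real worker.

```python
import json
import urllib.request


def build_connector_request(connect_url, name, config):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


es_config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "ratings-with-customer-data,UNHAPPY_PLATINUM_CUSTOMERS",
    "connection.url": "http://elasticsearch:9200",
}
request = build_connector_request("http://localhost:8083", "es_sink", es_config)
```

Passing the request to urllib.request.urlopen(request) would send it to the worker.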

Using Kibana on the data streaming into Elasticsearch from Kafka Connect, it is easy to build a real-time dashboard on the enriched and filtered data, as shown below in Figure 6.

Streaming data from Kafka to a Data Lake

Finally, we’re going to stream the enriched ratings data to our data lake. From here it can be used for ad-hoc analysis, training machine learning models, data science projects, and so on.

Data in Kafka can be streamed to numerous types of target using Kafka Connect. Here we’ll see S3 and BigQuery, but could just as easily use HDFS, GCS, Redshift, Snowflake DB, and so on.

As shown above with streaming data to Elasticsearch from Kafka, setup is just a matter of a simple configuration file per target technology.

{
  "name": "s3-sink-ratings",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "ratings-with-customer-data",
    "s3.region": "us-west-2",
    "s3.bucket.name": "rmoff-demo-ratings",
    [...]
  }
}

With the data streaming to S3, we can see it in the bucket, as shown in Figure 7.

We’re also streaming the same data to Google’s BigQuery:

{
  "name": "gbq-sink-ratings",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "ratings-with-customer-data",
    "project": "rmoff",
    "datasets": ".*=ratings",
    [...]
  }
}

One of the many applications that can be used to analyze the data from these cloud object stores is Google’s Data Studio:

The point here is less the specific technology illustrated than the fact that, whatever technology you choose to use for your data lake, you can stream data to it easily using Kafka Connect.

Into the future with KSQL and the Streaming Platform

In this article we’ve seen some of the strong arguments for adopting a streaming platform as a core piece of your data architecture. It provides the scalable foundations that enable systems to integrate and evolve in a flexible way due to its decoupled nature. Analytics benefits from a streaming platform through its powerful integration capabilities. That it is streaming and thus real-time is not the primary motivator. Applications benefit from a streaming platform because it is real-time, and because of its integration capabilities.

With KSQL it is possible to write stream processing applications using a language familiar to a large base of developers. These applications can be simple filters of streams of events passing through Kafka, or complex enrichment patterns drawing on data from other systems including databases.

To learn more about KSQL you can watch the tutorials and try them out for yourself. Sizing and deployment practices are documented, and there is an active community around it on the Confluent Community Slack group. The examples shown in this article are available on github.

About the Author

Robin Moffatt is a Developer Advocate at Confluent, the company founded by the creators of Apache Kafka, as well as an Oracle ACE Director and Developer Champion. His career has always involved data, from the old worlds of COBOL and DB2, through the worlds of Oracle and Hadoop, and into the current world with Kafka. His particular interests are analytics, systems architecture, performance testing and optimization. You can read his blog posts here and here (and previously here), and he can be found tweeting grumpy geek thoughts as @rmoff. Outside of work he enjoys drinking good beer and eating fried breakfasts, although generally not at the same time.
