Real-Time Data Processing Using Redis Streams and Apache Spark Structured Streaming

Key Takeaways

  • Apache Spark's Structured Streaming brings SQL querying capabilities to data streams, allowing you to perform scalable, real-time data processing.
  • Redis Streams, the new data structure introduced in Redis 5.0, enables collecting, persisting and distributing data at a high speed with sub-millisecond latency.
  • Integrating Redis Streams and Structured Streaming simplifies scaling out continuous applications.
  • The open source, Spark-Redis library connects Apache Spark with Redis. The library offers RDD and Dataframe APIs for Redis data structures and allows you to use Redis Streams as a data source for Structured Streaming.
     

Structured Streaming, a capability introduced with Apache Spark 2.0, has gained a lot of traction in the industry and among the data engineering community. Built on top of the Spark SQL engine, the Structured Streaming APIs deliver an SQL-like interface for streaming data.

Initially, Apache Spark processed Structured Streaming queries in micro-batches, with a latency of about 100 milliseconds.

Last year, version 2.3 introduced low latency (1 millisecond) "continuous processing", which is further fueling adoption of Structured Streaming.

To work at the speed of Spark’s continuous processing, you need to augment it with a high-speed streaming database like Redis.

This open source in-memory database is known for its high speed and sub-millisecond latency. Redis 5.0 recently introduced a new data structure called Redis Streams, which enables Redis to consume, hold and distribute streaming data between multiple producers and consumers.

Now the question is, what’s the best way to deploy Redis Streams as your streaming database alongside your Apache Spark data processing engine?

The Spark-Redis library, written in Scala, integrates Apache Spark and Redis so you can:

  • Read and write data as RDDs in Redis
  • Read and write data as DataFrames in Redis (i.e., it allows mapping Spark SQL tables to Redis data structures)
  • Use Redis Streams as a source for Structured Streaming
  • Use Redis as a sink for Structured Streaming

In this article, I’ll present a real-world scenario and walk you through how to process streaming data in real-time using Redis and Apache Spark.

A hypothetical scenario: computing real-time clicks

Let’s assume we’re an advertising company that places display advertising on popular websites. We create dynamic memes based on popular images on social media and place them as ads. In order to maximize our profits, we have to identify the assets that go viral or get more clicks so that we can display them more often.

Most of our assets have a short shelf life, and processing clicks in real-time lets us capitalize on trending images quickly, which is critical to the business. Our ideal streaming data solution must record all ad clicks, process them in real-time and also compute real-time click counts for each asset. Here’s how we could design it:

Figure 1. Building blocks to compute click count in real-time

Input

For every click, our data ingest solution (block 1 in Figure 1) places the asset id and the cost of the advertisement in the Redis Stream as:

XADD clicks * asset [asset id] cost [actual cost]

For example:

XADD clicks * asset aksh1hf98qw7tt9q7 cost 29

Output

After the data processing by block 2 in Figure 1, our result is stored in a data store. The data querying solution (block 3 in Figure 1) provides an SQL interface to the data so we can query the top clicks in the last few minutes:

select asset, count from clicks order by count desc

asset                count
-----------------    -----
aksh1hf98qw7tt9q7    2392
i2dfb8fg023714ins    2010
jsg82t8jasvdh2389    1938

Architecting the solution

Now that we’ve defined the business requirements, let’s explore how we can use Redis 5.0 and Apache Spark 2.4 to build this solution. For the purposes of this article, I’m developing it in the Scala programming language, but you could use the Spark-Redis library with Java or Python as well.

Figure 2. Solution Architecture

This flow diagram looks quite straightforward: first the system ingests the data to a Redis Stream, then it consumes the data as a Spark process and aggregates results back to Redis, and finally it queries the results in Redis using the Spark-SQL interface.

  1. Data Ingest: I chose Redis Streams for data ingest because it’s a built-in data structure in Redis that can handle over a million read and write operations per second. In addition, it automatically orders data according to time, and supports consumer groups that streamline how data is read. The Spark-Redis library supports Redis Streams as a data source, so it perfectly fits our need for the streaming database to work with the Apache Spark Engine.
  2. Data Processing: The Structured Streaming API in Apache Spark is a great choice for our data processing, and the Spark-Redis library enables us to transform data arriving in Redis Streams into DataFrames. With Structured Streaming, we can run our queries either in micro-batches or in Spark’s continuous processing mode. We can also develop a custom ‘writer’ that lets us write the data to our preferred destination. As shown in Figure 2, we will write the output to Redis using a Hash data structure.
  3. Data Querying: The Spark-Redis library allows you to map native Redis data structures as DataFrames. We can declare a ‘temporary table’ that maps columns to specific keys of a Hash data structure, and since Redis is blazingly fast with sub-millisecond latency, we can use real-time querying capabilities with Spark-SQL.

Now, I’ll walk you through how to develop and run each component of our solution. But first, let’s initialize our development environment with appropriate tools.

Finding the right development tools

In our example, we will use the Homebrew package manager to download and install software on macOS, although you can choose other package managers depending on your operating system.

1. Redis 5.0 or higher: First, we need to download and install Redis 5.x in our environment. Previous versions of Redis do not support Redis Streams.

With Homebrew, we install and start Redis as:

$ brew install redis
$ brew services start redis

If we’re running an older version of Redis, we can upgrade Redis:

$ brew upgrade redis

2. Apache Spark 2.3 or higher: Next, let’s download and install Apache Spark from its website, or install it using Homebrew:

$ brew install apache-spark

3. Scala 2.12.8 or higher: We’ll do the same for Scala:

$ brew install scala

4. Apache Maven: And we’ll need Maven to build our Spark-Redis library.

$ brew install maven

5. JDK 1.8 or higher: We can download and install this JDK from Oracle’s website or from Homebrew, by running the command shown below. For the latest version of JDK, we’ll need to replace java8 with java.

$ brew cask install java8

6. Spark-Redis library: This is the core piece of our solution, so let’s download the library from GitHub and build the package as shown below:

$ git clone https://github.com/RedisLabs/spark-redis.git
$ cd spark-redis
$ mvn clean package -DskipTests

This will produce spark-redis-<version>-jar-with-dependencies.jar in the ./target/ directory. In my setup, the file is spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar.

7. SBT 1.2.8 or higher: SBT is a Scala build tool that simplifies organizing and building Scala files.

$ brew install sbt

8. Development Environment: Finally, let’s set up our folder structure and build file. For this example, we will code our programs under the directory, ‘scala’.

$ mkdir scala
$ cd ./scala

Create a new file, build.sbt, with the following content:

name := "RedisExample"

version := "1.0"

scalaVersion := "2.12.8" 

val sparkVersion = "2.4.0"

libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion,
        "org.apache.spark" %% "spark-sql" % sparkVersion,
        "org.apache.spark" %% "spark-catalyst" % sparkVersion
)

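Initialize the directory. The following commands create the source and library directories and run an initial build (mkdir needs -p here because src and src/main do not exist yet):

$ mkdir -p ./src/main/scala/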
$ mkdir ./lib
$ sbt package

Copy spark-redis-<version>-jar-with-dependencies.jar to the lib directory.

Building our click-counting solution

As described in the architecture section, our solution has three parts – the data ingest component, a data processor inside the Spark engine, and our data query interface. In this section, I’ll take a deep dive into each of these and show a working solution.

1. Ingest to a Redis Stream

Redis Streams is an append-only data structure. Assuming the data will be consumed by Apache Spark’s continuous processing unit, we can cap the number of messages at one million. Let’s slightly modify the command shown earlier to run:

XADD clicks MAXLEN ~ 1000000 * asset aksh1hf98qw7tt9q7 cost 29

Most popular Redis clients support Redis Streams, so depending on your programming language, you could choose redis-py for Python, Jedis or Lettuce for Java, node-redis for Node.js, and so on.
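
Whichever client you choose, the ingest component ends up sending the same command tokens. As a minimal sketch, the hypothetical helper below (ClickCommands and Click are names made up for this illustration, not part of any Redis client) only assembles the XADD argument list from a click event; actually sending it is left to your client's own API.

```scala
// Hypothetical helper for illustration: it builds XADD arguments as strings
// and does not talk to Redis itself.
object ClickCommands {
  final case class Click(asset: String, cost: Long)

  def xaddArgs(stream: String, click: Click, maxLen: Option[Long] = None): Seq[String] = {
    // "MAXLEN ~ n" asks Redis to trim the stream to roughly n entries.
    val cap = maxLen.toList.flatMap(n => List("MAXLEN", "~", n.toString))
    // "*" lets Redis generate the entry ID itself.
    Seq("XADD", stream) ++ cap ++ Seq("*", "asset", click.asset, "cost", click.cost.toString)
  }
}
```

Calling ClickCommands.xaddArgs("clicks", ClickCommands.Click("aksh1hf98qw7tt9q7", 29), Some(1000000L)) yields the same tokens as the capped XADD command shown above.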

Figure 3. Data Ingest

2. Process the data

I’ll divide this section into three subsections to fully cover this part of our solution:

  • A. Reading and processing data from Redis Streams
  • B. Storing the results in Redis
  • C. Running the program

Figure 4. Data Processing

A. Reading data from Redis Streams

To read data from Redis Streams in Spark, we need to establish how to connect to Redis, as well as the schema structure of the data in Redis Streams.

To connect to Redis, we must create a new SparkSession with connection parameters for Redis:

val spark = SparkSession
            .builder()
            .appName("redis-example")
            .master("local[*]")
            .config("spark.redis.host", "localhost")
            .config("spark.redis.port", "6379")
            .getOrCreate()

To set up the schema, we name our stream "clicks" and set the "stream.keys" option to "clicks". Since each of our stream elements contains an asset and the cost associated with it, we create a StructType from an Array of two StructFields, one for "asset" and the other for "cost", as shown below:

val clicks = spark
             .readStream
             .format("redis")
             .option("stream.keys","clicks")
             .schema(StructType(Array(
                     StructField("asset", StringType),
                     StructField("cost", LongType)
             )))
             .load()
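
To see what the schema is doing conceptually: a Redis Stream entry is a set of field-value pairs whose values are strings, and the schema gives them names and types. The plain-Scala sketch below models that mapping for our two fields. ClickSchema, ClickRow and toRow are hypothetical names, and dropping malformed entries is an assumption made for this sketch, not a description of how Spark-Redis handles them.

```scala
import scala.util.Try

// Conceptual model only: turns the string fields of one stream entry
// into a typed row matching the (asset: String, cost: Long) schema.
object ClickSchema {
  final case class ClickRow(asset: String, cost: Long)

  // Returns None when a field is missing or "cost" is not a whole number
  // (an assumption of this sketch).
  def toRow(fields: Map[String, String]): Option[ClickRow] =
    for {
      asset <- fields.get("asset")
      cost  <- fields.get("cost").flatMap(s => Try(s.toLong).toOption)
    } yield ClickRow(asset, cost)
}
```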

In our first program, we are interested in the number of clicks per asset. Therefore, let’s create a DataFrame that groups the data by asset and counts the rows in each group:

val byasset = clicks.groupBy("asset").count

Our last step is to start the query as a Structured Stream query:

val query = byasset
            .writeStream
            .outputMode("update")
            .foreach(clickWriter)
            .start()

Note that we are using our own ForeachWriter to write back the results to Redis. If you want the output to go to the console, you could write the query as:

val query = byasset
            .writeStream
            .outputMode("update")
            .format("console")
            .start()
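
Both queries use the "update" output mode, in which each trigger emits only the rows whose aggregate changed since the last trigger. The plain-Scala sketch below models that behavior for our per-asset count, without Spark. UpdateModeSketch and applyBatch are hypothetical names used only for this illustration.

```scala
// Conceptual model of a running count in "update" output mode.
object UpdateModeSketch {
  // Applies one micro-batch of clicked asset ids to the running counts.
  // Returns the new state and only the rows whose count changed.
  def applyBatch(state: Map[String, Long], batch: Seq[String]): (Map[String, Long], Map[String, Long]) = {
    val increments = batch.groupBy(identity).map { case (asset, hits) => asset -> hits.size.toLong }
    val updated = increments.map { case (asset, n) => asset -> (state.getOrElse(asset, 0L) + n) }
    (state ++ updated, updated)
  }
}
```

Because each emitted row carries the asset's latest total, a writer can simply overwrite the stored count for that asset rather than increment it.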

For continuous processing, we would add a trigger to our query as .trigger(Trigger.Continuous("1 second")). However, continuous processing supports only map-like operations such as projections and selections, not aggregations, so we cannot use it in our example.

Below is our complete program that reads new click data from Redis Streams and processes it using Spark’s Structured Streaming APIs. If you’d like to try this in your environment, save the program under src/main/scala as ClickAnalysis.scala. (If your Redis server is not running locally on port 6379, be sure to set the appropriate connection parameters.)

// Program: ClickAnalysis.scala
//
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import com.redislabs.provider.redis._

object ClickAnalysis {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession
                    .builder()
                    .appName("redis-example")
                    .master("local[*]")
                    .config("spark.redis.host", "localhost")
                    .config("spark.redis.port", "6379")
                    .getOrCreate()

        val clicks = spark
                     .readStream
                     .format("redis")
                     .option("stream.keys", "clicks")
                     .schema(StructType(Array(
                           StructField("asset", StringType),
                           StructField("cost", LongType)
                     )))
                     .load()

        val byasset = clicks.groupBy("asset").count

        val clickWriter: ClickForeachWriter =
            new ClickForeachWriter("localhost", "6379")

        val query = byasset
                    .writeStream
                    .outputMode("update")
                    .foreach(clickWriter)
                    .start()

        query.awaitTermination()

    } // End main
} // End object

B. Storing the results in Redis

To write the results back to Redis, we can develop a custom ForeachWriter called ClickForeachWriter. This extends ForeachWriter and connects to Redis using Jedis, the Java client for Redis. Here is the complete program, saved as ClickForeachWriter.scala:

// Program: ClickForeachWriter.scala
//
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
import redis.clients.jedis.Jedis

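class ClickForeachWriter(host: String, port: String) extends ForeachWriter[Row] {

    // Jedis is not serializable, so the connection is created lazily
    // on the executor instead of in the constructor.
    @transient private var jedis: Jedis = _

    private def connect(): Unit = {
        jedis = new Jedis(host, port.toInt)
    }

    // Returning true tells Spark to call process() for every row.
    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(record: Row): Unit = {
        val asset = record.getString(0)
        val count = record.getLong(1)
        if (jedis == null) {
            connect()
        }

        // One Hash per asset, keyed as click:<asset id>
        val key = "click:" + asset
        jedis.hset(key, "asset", asset)
        jedis.hset(key, "count", count.toString)
        jedis.expire(key, 300) // keep the key for five more minutes
    }

    // Release the Redis connection once Spark is done with this writer.
    override def close(errorOrNull: Throwable): Unit = {
        if (jedis != null) {
            jedis.close()
            jedis = null
        }
    }
}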

There is one important thing to note in this program: it stores the results in a Hash data structure, whose key follows the syntax, "click:<asset id>". I will transform this structure to be used as a DataFrame in the last section of the article. Another thing to point out is the key expiration, which is totally optional. Here I show how you could extend the life of the key by five minutes every time a click is recorded.
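
To make the key layout concrete, here is a small plain-Scala sketch of the Redis commands that correspond to one processed row. ClickHashPlan is a hypothetical name for this illustration; it only renders the commands as strings and does not use Jedis.

```scala
// Illustration only: lists the Redis commands for one (asset, count) row.
object ClickHashPlan {
  val TtlSeconds = 300 // five minutes, as in the writer above

  def key(asset: String): String = "click:" + asset

  def commands(asset: String, count: Long): Seq[Seq[String]] = Seq(
    Seq("HSET", key(asset), "asset", asset),
    Seq("HSET", key(asset), "count", count.toString),
    // EXPIRE resets the key's time-to-live on every update.
    Seq("EXPIRE", key(asset), TtlSeconds.toString)
  )
}
```

For the asset "test" with a count of 1, this gives HSET click:test asset test, HSET click:test count 1 and EXPIRE click:test 300.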

C. Running the program

Before we run the program, we first need to compile it. To do this, let’s go to our project directory (the directory where we have stored build.sbt) and run the command:

$ sbt package

Our programs should compile with no errors. If you find errors, fix them and re-run sbt package. Then, to start the program, stay in the same directory and run the following command:

$ spark-submit --class ClickAnalysis \
    --jars ./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar \
    --master local[*] ./target/scala-2.12/redisexample_2.12-1.0.jar

If you don’t like the debug messages, you can stop your program (ctrl-c) and edit log4j.properties under /usr/local/Cellar/apache-spark/2.4.0/libexec/conf/ (or wherever the log4j.properties file is stored in your environment) and change log4j.rootCategory to WARN as below:

log4j.rootCategory=WARN, console

This program will automatically pull messages from our Redis Stream. If there are no messages in the Redis Stream, it will listen for new messages asynchronously. We can start redis-cli in a different console and add a message to our Redis Stream to test whether the app is consuming messages properly:

$ redis-cli
redis-cli> XADD clicks * asset test cost 100

If everything has gone well, we should be able to read the results in the Hash data structure as:

redis-cli> hgetall click:test
1) "asset"
2) "test"
3) "count"
4) "1"

3. Query the data: Reading Redis data as DataFrames

This last component of our solution essentially provides an SQL interface to our Redis data. Reading the data via SQL commands is once again a two-step process: a. we define the SQL schema for Redis data, b. we run the SQL command.

Figure 5. Data Querying

But before all this, we need to run spark-sql on our console from our project directory as shown below:

$ spark-sql --jars \
    ./lib/spark-redis-2.3.1-SNAPSHOT-jar-with-dependencies.jar

This will take us to a spark-sql prompt as below:

spark-sql>

Now, we define the SQL schema for the data stored in the Redis Hash data structure. If you remember, we’re storing data for each asset in a Hash data structure denoted by the key, click:<asset id>. Each Hash has two fields, asset and count. The command to create the schema and map it to the Redis Hash data structure is:

spark-sql> CREATE TABLE IF NOT EXISTS clicks(asset STRING, count INT)
USING org.apache.spark.sql.redis OPTIONS (table 'click');

This command creates a new table called "clicks". It uses the directives specified in the Spark-Redis library to map the columns "asset" and "count" to the respective fields in the Hash. Now we can run the query as:

spark-sql> select * from clicks;
test    1
Time taken: 0.088 seconds, Fetched 1 row(s)
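
As a rough mental model of this mapping, and not a description of the Spark-Redis implementation, each Hash whose key starts with "click:" becomes one row, and its "asset" and "count" fields become columns. The plain-Scala sketch below works on an in-memory map standing in for Redis and also applies the "order by count desc" from our original requirement. ClickTableSketch and ClickCount are hypothetical names.

```scala
import scala.util.Try

// Conceptual model only: Redis is represented as key -> (field -> value).
object ClickTableSketch {
  final case class ClickCount(asset: String, count: Int)

  def rows(hashes: Map[String, Map[String, String]]): Seq[ClickCount] =
    hashes.toSeq
      .collect { case (key, fields) if key.startsWith("click:") =>
        for {
          asset <- fields.get("asset")
          count <- fields.get("count").flatMap(c => Try(c.toInt).toOption)
        } yield ClickCount(asset, count)
      }
      .flatten                // drop hashes with missing or non-numeric fields
      .sortBy(r => -r.count)  // highest click count first
}
```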

If you want to run our SQL queries programmatically, refer to the documentation provided by Apache Spark on how to connect to the Spark engine using ODBC/JDBC drivers.

What did we accomplish?

In this article, I demonstrated how to use Redis Streams as a data source for the Apache Spark engine, as well as how Redis Streams can power a Structured Streaming use case. I also showed how to read Redis data using the DataFrames API in Apache Spark, and put the independent, stand-alone Structured Streaming and DataFrames concepts together to show what you can achieve using the Spark-Redis library.

Redis Streams simplifies the task of collecting and distributing data at a high speed. By combining it with Structured Streaming in Apache Spark, you can power all kinds of solutions that require real-time computations for scenarios ranging from IoT, fraud detection, AI and machine learning, real-time analytics, and so on.

About the Author

Roshan Kumar is a senior product manager at Redis Labs, Inc. He has extensive experience in software development and product management in the technology sector. In the past, Kumar has worked at Hewlett-Packard, and a few successful Silicon Valley startups. He holds a bachelor’s degree in Computer Science, and an MBA from Santa Clara University, California, USA.

 

 

Community comments

  • Great tutorial !

    by Mariano Ruiz /

    Great tutorial , very practical

  • Scaling streams

    by Shivakumar Ramagopal /

    Roshan,

    Great write-up on using Redis and Spark for streaming and stream processing! I'm a heavy user of Redis and look forward to using the stream functionality in the near future.

    Question: How is scaling of readers/writers handled in Redis, especially in cluster mode?

    Thanks,
    Shiva

  • Re: Scaling streams

    by Roshan Kumar /

    Shiva,

    Thanks for your kind words. Feel elated :-).

    To answer your question, Stream in Redis is just another data structure (just like List, Set, Sorted Set, Hash, etc.). Therefore, scaling with Stream is same as scaling with the other data structures in Redis. With that said, you have a few options if you are running Redis in a cluster mode:

    1. You can control how to distribute your keys across multiple shards. Read more about that here: redis.io/topics/cluster-spec. You may be interested in reading "Keys hash tags" section of the article.

    2. If you are using Redis Enterprise, then it automates some of the steps that you would otherwise do manually. Read more about it here: redislabs.com/redis-enterprise/technology/linea...

    Let me know if that answered your question.

    Regards,
    Roshan
