BT

Your opinion matters! Please fill in the InfoQ Survey!

Video Stream Analytics Using OpenCV, Kafka and Spark Technologies

| Posted by Amit Baghel Follow 0 Followers , reviewed by Srini Penchikala Follow 15 Followers on Sep 02, 2017. Estimated reading time: 14 minutes | NOTICE: QCon.ai - Applied AI conference for Developers Apr 9-11, 2018, San Francisco. Join us!

A note to our readers: As per your request we have developed a set of features that allow you to reduce the noise, while not losing sight of anything that is important. Get email and web notifications by choosing the topics you are interested in.

Key Takeaways

  • For reliable handling and efficient processing of large scale video stream data, there is a need for a scalable, fault tolerant and loosely coupled distributed system.  
  • The sample application in this article uses open source technology - OpenCV, Kafka and Spark - to build such a system. Amazon S3 or HDFS can be used for storage.
  • The system comprises three major components - a Video Stream Collector, a Stream Data Buffer, and a Video Stream Processor.  
  • The Video Stream Collector works with a cluster of IP cameras which provide live feeds of streaming data of video content and uses the OpenCV video processing library to convert video stream into frames passing the data in JSON to the Kafka Broker used for the Stream Data Buffer component. 
  • The Video Stream Processor component is built on Apache Spark and again uses OpenCV for processing video stream data.

Technology has brought an unprecedented explosion in unstructured data. Sources like mobile devices, websites, social media, scientific apparatus, satellites, IoT devices, and surveillance cameras are generating a vast number of images and videos every second.

Managing and efficiently analysing this data is a challenge. Consider a city’s network of video-surveillance cameras. It is impractical and inefficient to monitor every camera’s video stream to discover any objects or events of interest. Instead, computer vision (CV) libraries process these video streams and provide intelligent video analytics and object detection.

Traditional CV systems have limitations, however. In traditional video analytics systems, the server with the CV library collects and processes data at the same time and so a server failure therefore loses the video streaming data. Detecting a node failure and switching the processing to another node may result in fragmented data.

Many tasks drive the use of big-data technologies in video stream analytics: parallel and on-demand processing of large-scale video streams, extracting a different set of information from a video frame, analysing the data with different machine learning libraries, piping the analysed data to different components of application for further processing, and outputting the processed data in different formats.

Video Stream Analytics - Motion Detection

To reliably handle and efficiently process large-scale video stream data requires a scalable, fault-tolerant, loosely coupled distributed system. The video stream analytics discussed in this article is designed on these principles.

Types of video stream analytics include:

  • object tracking,
  • motion detection,
  • face recognition,
  • gesture recognition,
  • augmented reality, and
  • image segmentation.

The use case of this article’s sample application is motion detection in a video stream.

Motion detection is the process of finding the change in position of an object (often a human) relative to its surroundings. It is used mostly in video-surveillance systems that continuously monitor a specific area. An algorithm provided by the CV libraries analyses the video feed sent by such a camera and looks for any motion. Detecting motion triggers an event that can send a message to an application or alert the user.

This article’s application for video stream analytics has three main components:

  • a video stream collector,
  • a stream data buffer, and
  • a video stream processor.

The video stream collector receives the video stream data from a cluster of IP cameras. This component serializes the video frames to stream data buffer, which is a fault-tolerant data queue for streaming video data. The video stream processor consumes the stream data from buffer and processes it. This component will apply video-processing algorithms to detect motion in the video-stream data. Finally, the processed data or image files will be stored in a S3 bucket or HDFS directory. This video-stream-processing system has been designed using OpenCV, Apache Kafka, and Apache Spark frameworks.

Brief Details of OpenCV, Kafka and Spark

Here are a few details on the relevant frameworks.

OpenCV

OpenCV (Open Source Computer Vision Library) is an open-source BSD-licensed library. This library is written in C++ but provides a Java API as well. OpenCV includes several hundred CV algorithms that can be used for processing and analysing image and video files. Please check this document for more details.

Kafka

Apache Kafka is a distributed streaming platform that provides a system for publishing and subscribing to streams of records. These records can be stored in fault-tolerant way and consumers can process the data. Please check the documentation.

Spark

Apache Spark is a fast, generalised cluster-computing system. It provides modules for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming. There’s more detail here.

System Architecture

The architecture diagram of video stream analytics system is illustrated in Figure 1 below.

Figure 1. Video Stream Analytics System Architecture Diagram

Design and Implementation

The code for this application example is available at GitHub.

The following sections provide design and implementation details of the video stream collector, stream data buffer, and video stream processor in the sample application.

Video Stream Collector

The video stream collector works with a cluster of IP cameras that provide live video feeds. The component must read the feed from each camera and convert the video stream into a series of video frames. To distinguish each IP camera, the collector maintains the mapping of camera ID and URL with camera.url and camera.id properties in a stream-collector.properties file. These properties can have comma-separated lists of camera URLs and IDs. Different cameras may provide data with different specifications such as the codec, resolution, or frames per second. The  collector must retain these details while creating frames from the video stream.

The video stream collector uses the OpenCV video-processing library to convert a video stream into frames. Each frame is resized to the required processing resolution (e.g. 640x480). OpenCV stores each frame or image as a Mat object. Mat needs to be converted in serialise-able (byte-array) form by keeping intact the details of frame — i.e. rows, columns, and type. The video stream collector uses the following JSON message structure to store these details.

{"cameraId":"cam-01","timestamp":1488627991133,"rows":12,"cols":15,"type":16,"data":"asdfh"}

cameraId is the unique ID of the camera. timestamp is the time at which the frame was generated. rows, cols, and type are OpenCV Mat-specific details. data is a base-64 encoded string for the byte array of the frame.

The video stream collector uses the Gson library to convert the data to JSON messages, which are published  in the video-stream-event topic. It sends the JSON messages to the Kafka broker using the KafkaProducer client. KafkaProducer sends data into the same partition for each key and order of these messages is guaranteed.

JsonObject obj = new JsonObject();
obj.addProperty("cameraId",cameraId);
obj.addProperty("timestamp", timestamp);
obj.addProperty("rows", rows);
obj.addProperty("cols", cols);
obj.addProperty("type", type);
obj.addProperty("data", Base64.getEncoder().encodeToString(data));  
String json = gson.toJson(obj);
producer.send(new ProducerRecord<String, String>(topic,cameraId,json),new EventGeneratorCallback(cameraId));

Figure 2. Code snippet for sending images as JSON messages

Kafka is primarily designed for text messages of small sizes but a JSON message comprising the byte array of a video frame will be large (e.g. 1.5 MB), so Kafka will require configuration changes before it can process these larger messages. The following KafkaProducer properties need to be adjusted:

  • batch.size
  • max.request.size
  • compression.type

Please see the Producer Configs section of the Kafka documentation and the code and property files at the GitHub project for more details.

Stream Data Buffer

To process a huge amount of video stream data without loss, it is necessary to store the stream data in temporary storage. The Kafka broker works as a buffer queue for the data that the video stream collector produces. Kafka uses the file system to store the messages, and the length of time it retains these messages is configurable.

Keeping the data in storage before processing ensures its durability and improves the overall performance of the system as processors can process data at different times and at different speeds depending on the load. This improves the reliability of the system when the rate of data production exceeds the rate of data processing.

Kafka guarantees the order of messages in a single partition for a given topic. This is extremely helpful for processing data when the order of the data is important. To store large messages, the following configurations might need to be adjusted in the server.properties file of the Kafka server:

  • message.max.bytes
  • replica.fetch.max.bytes

Please see the Broker Configs section of the Kafka documentation for more details about these properties.

Video Stream Processor

The video stream processor performs three steps:

  1. Read the JSON messages from the Kafka broker in the form of a VideoEventData dataset.
  2. Group the VideoEventData dataset by camera ID and pass it to the video stream processor.
  3. Create a Mat object from the JSON data and process the video stream data.

The video stream processor is built on Apache Spark. Spark provides a Spark Streaming API, which uses a discretized stream or DStream, and a new Structured Streaming API based on a dataset. This application’s video stream processor uses the Structured Streaming API to consume and process JSON messages from Kafka. Please note this application processes structured data in the form of JSON messages and the unstructured video data is an attribute of these JSON messages that the video stream processor will process. The Spark documentation states "Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming." This is why the video stream processor is designed around Spark’s Structured Streaming. The Structured Streaming engine provides built-in support for structured text data and state management for aggregation queries. This engine also provides features like processing of non-aggregate queries and external state management of datasets (a new feature in Spark 2.2.0).

To process large messages, the following Kafka consumer configurations must be passed to the Spark engine:

  • max.partition.fetch.bytes
  • max.poll.records

Please see the Consumer Configs section of the Kafka documentation for more about these properties.

The main class for this component is VideoStreamProcessor. This class first creates a SparkSession object that is the entry point for working with the Spark SQL engine. The next step is to define a schema for incoming JSON messages so that Spark can use this schema to parse the string format of a message into JSON format. Spark’s bean encoder can transform this into Dataset<VideoEventData>. VideoEventData is a Java bean class that holds the data of JSON message.

Dataset<VideoEventData> ds = spark.readStream().format("kafka")
.option("kafka.bootstrap.servers",prop.getProperty("kafka.bootstrap.servers"))
.option("subscribe",prop.getProperty("kafka.topic"))
.option("kafka.max.partition.fetch.bytes",prop.getProperty("kafka.max.partition.fetch.bytes"))
.option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
.load().selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*").as(Encoders.bean(VideoEventData.class));   

Figure 3. Code snippet for processing of kafka messages by spark streaming

Next, groupByKey groups the dataset by camera ID to get KeyValueGroupedDataset<String, VideoEventData>.  It uses a mapGroupsWithState transformation to work on a group of VideoEventData (Iterator<VideoEventData>) for the current batch of video frames that are grouped by camera ID. This transformation first checks that the last processed VideoEventData (video frame) is present and passes that to the video processor for next step of processing. After video processing, the last processed VideoEventData (video frame) is returned from the video processor and the state updates. To start the streaming application, the writeStream method is called on the dataset with console sink and update output mode.

The video stream processor uses the OpenCV library to process video stream data. Our application is meant to detect motion; VideoMotionDetector is the class with the logic for detecting motion in a series of frames. The first step in this process is to sort the list of VideoEventData (Iterator<VideoEventData>) by timestamp for a given camera ID to compare video frames in order. The next step is to iterate the sorted list of VideoEventData objects and convert them to an OpenCV Mat object. If the last processed video frame is available, then it uses that as the first video frame for processing the current series of frames. VideoMotionDetector compares two consecutive frames and detects the differences using an API provided by the OpenCV library. If it finds differences that exceed defined criteria, these are considered to be motion. VideoMotionDetector will save this detected motion in form of image file to a preconfigured S3 bucket or HDFS directory. This image file can undergo further processing by another application or VideoMotionDetector can trigger an event to notify a user or application it has detected motion.

Please read the property file and code at GitHub for more details.

Technologies and Tools

The following table shows the technologies and tools used for this video stream analytics system.

Tools and Technology

Version

Download URL

JDK

1.8

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

Maven

3.3.9

https://maven.apache.org/download.cgi

ZooKeeper

3.4.8

https://zookeeper.apache.org/releases.html

Kafka

2.11-0.10.2.0

http://kafka.apache.org/downloads.html

Spark

2.2.0

http://spark.apache.org/downloads.html

OpenCV

3.2.0

http://opencv.org/releases.html

Please refer to the documentation for installing and configuring these tools. Kafka documentation and Spark documentation provide the details about how to set up and run applications in standalone mode or in cluster mode. To install OpenCV, refer to the OpenCV documentation. OpenCV can also be installed from Anaconda.

Build and Deploy

This section details how to build and run the video stream collector and video stream processor components of the sample application. This application can be used to process both offline video files and live camera feeds but here is configured to analyse an offline sample video file. Please follow these steps to build and run this application:

1. Download and install the tools listed in the table above. Make sure ZooKeeper and Kafka servers are up and running.

2. This application uses OpenCV native libraries (.dll or .so) and loads them using the System.loadLibrary()method. Set the directory path for these native libraries in the system environment variable or pass this path as a command line argument. For example, for a 64-bit Windows machine, the path of the native library file (opencv_java320.dll and opencv_ffmpeg320_64.dll) will be {OpenCV Installation Directory} \build\java\x64.

3. The stream-collector.properties file has the Kafka topic as video-stream-event. Create this topic and partitions in Kafka. Use the kafka-topic command to create the topic and partitions.

kafka-topics.sh --create --zookeeper localhost:2181 --topic video-stream-event --replication-factor 1 --partitions 3

4. The stream-processor.properties file has a processed.output.dir property, which is the directory path for saving processed images. Create and set the directory path for this property.

5. The stream-collector.properties file has a camera.url property that holds the path or URL of a video file or video source. Make sure the path or URL is correct.

6. Check log4j.properties files for both VideoStreamCollector and VideoStreamProcessor components and set the directory path for stream-collector.log and stream-processor.log files. Check these log files for the log messages that the application generates, which can help in case of errors while running the application.

7. This application uses OpenCV APIs from the OpenCV JAR file, but the OpenCV JAR file is not available at the Maven central repository. This application is bundled with the OpenCV JAR file that can be installed to a local Maven repository. In the pom.xml file, maven-install-plugin has been configured and associated with the clean phase for installing this JAR file. To install the OpenCV JAR in a local Maven repository go to the video-stream-processor folder and execute this command.

mvn clean

8. To keep the application logic simple, VideoStreamProcessor processes only new messages. The VideoStreamProcessor component should be up and running before starting the VideoStreamCollector component. To run VideoStreamProcessor using Maven, go to the video-stream-processor folder and execute this command.

mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.spark.processor.VideoStreamProcessor"

9. Once VideoStreamProcessor has started, start the VideoStreamCollector component. Go to the video-stream-collector folder and execute this command.

mvn clean package exec:java -Dexec.mainClass="com.iot.video.app.kafka.collector.VideoStreamCollector" -Dexec.cleanupDaemonThreads=false

The GitHub project is bundled with a sample.mp4 video file. The URL and ID of this video file have been configured as camera.url and camera.id properties of the stream-collector.properties file. After processing the video file, the images will be saved in pre-configured directory (step 4). Figure 4 shows the sample output of this application.

Figure 4. Sample output for motion detection

This application can configure and process multiple video sources (offline and live feed). For example, to add webcam feeds along with sample.mp4, edit the stream-collector.properties file and add integers (0 for first webcam, 1 for second webcam, and so on ) separated by commas in the camera.url property and add the corresponding camera IDs (cam-01, cam-02, and so on) separated by commas in the camera.id property. Here’s an example:

camera.url=../sample-video/sample.mp4,0  

camera.id=vid-01,cam-01

Conclusions

Large-scale video analytics of video streams requires a robust system backed by big-data technologies. Open-source technologies like OpenCV, Kafka, and Spark can be used to build a fault-tolerant and distributed system for video stream analytics. We used OpenCV and Kafka to build a video stream collector component that receives video streams from different sources and sends them to a stream data buffer component. Kafka serves as the stream data buffer component that provides durable storage of streaming data. The video stream processor component is developed using OpenCV and Spark’s Structured Streaming. This component receives streaming data from the stream data buffer and analyses that data. The processed files are stored in a preconfigured HDFS or S3 bucket. We used motion detection as a use case to demonstrate video stream analytics with a sample application.

References

About the Author

Amit Baghel is a Software Architect with over 16 years of experience in design and development of enterprise applications and products around Java ecosystem. His current focus is on IoT, Cloud Computing, Big Data Solutions, Micro Services, DevOps and Continuous Integration and Delivery. Amit Baghel can be reached via e-mail.

Rate this Article

Adoption Stage
Style

Hello stranger!

You need to Register an InfoQ account or or login to post comments. But there's so much more behind being registered.

Get the most out of the InfoQ experience.

Tell us what you think

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

Great Work by Dilip Mudireddy

Wonderful post, thank you so much.

Error while opening the video file by Dilip Mudireddy

Hi Amit,

I am getting error with opening the file, though I have specified correct location.

----------------------------------------------------------------------------------------------------
//check camera working
if (!camera.isOpened()) {
Thread.sleep(5000);
if (!camera.isOpened()) {
throw new Exception("Error opening cameraId "+cameraId+" with
url="+url+".Set correct file path or url in camera.url key of property file.");
}
}
----------------------------------------------------------------------------------------------------

2017-10-31 18:09:43 ERROR VideoEventGenerator:52 - Error opening cameraId vid-01 with url=C:/sample-video/sample.mp4.Set correct file path or url in camera.url key of property file.

Please find my stream-processor.properties file below.
=======================================================
# Kafka properties
kafka.bootstrap.servers=localhost:9092
kafka.acks=all
kafka.retries=1
# 20 MB
kafka.batch.size=20971520
kafka.linger.ms=5
kafka.compression.type=gzip
# 2 MB
kafka.max.request.size=2097152
kafka.topic=video-stream-event
# Comma separated list of camera url. Example : /tmp/sample.mp4,0
camera.url=C:/sample-video/sample.mp4
# Comma separated list of camera url. Example : vid-01,cam-01
camera.id=vid-01
========================================================

Thanks,
Dilip M

Re: Error while opening the video file by Dilip Mudireddy

Issue got resolved by copying opencv_ffmpeg320_64.dll to C:\opencv\build\java\x64 from C:\opencv\build\bin\.

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

Allowed html: a,b,br,blockquote,i,li,pre,u,ul,p

Email me replies to any of my messages in this thread

3 Discuss

Login to InfoQ to interact with what matters most to you.


Recover your password...

Follow

Follow your favorite topics and editors

Quick overview of most important highlights in the industry and on the site.

Like

More signal, less noise

Build your own feed by choosing topics you want to read about and editors you want to hear from.

Notifications

Stay up-to-date

Set up your notifications and don't miss out on content that matters to you

BT