Traffic Data Monitoring Using IoT, Kafka and Spark Streaming

Key Takeaways

  • Introduction to IoT connected vehicles and Apache Spark
  • Technologies and tools used in this application
  • Building an application to generate IoT data events using Apache Kafka
  • Building an application to process IoT data streams using Spark Streaming: processing and transforming IoTData events into total traffic counts, window traffic counts and POI traffic details
  • Building an IoT traffic data monitoring dashboard using Spring Boot, WebSockets, jQuery, SockJS and Bootstrap

 

IoT and Connected Vehicles

The Internet of Things (IoT) is an emerging disruptive technology and a growing topic of interest among technology giants and business communities. IoT components are devices interconnected over a network and embedded with sensors, software and smart apps, so they can collect and exchange data with each other or with cloud/data centres. The data generated by IoT devices is large in volume and random in nature, and needs to be analysed by a big data analytics engine in order to extract critical information or to understand user behaviour patterns.

One of the areas in which IoT is paving its way is connected vehicles. According to Gartner, by 2020 there will be a quarter-billion connected vehicles on the road, offering more automation and new in-vehicle services such as enhanced navigation, real-time traffic updates, weather alerts, and integration with monitoring dashboards and smartphones. To process the data generated by IoT connected vehicles, the data is streamed to big data processors located in the cloud or in data centres. An IoT connected vehicle provides real-time information about the vehicle, such as its speed, fuel level, route name, and latitude and longitude. This information can be analysed, and the data can be extracted and transformed into a final result that is sent back to the vehicle or to a monitoring dashboard. For example, using the information collected from different vehicles, we can analyse and monitor the traffic on a particular route. In this article, we’ll use Apache Spark to analyse and process IoT connected vehicle data and send the processed data to a real-time traffic monitoring dashboard.

What is Spark

Spark is an open-source, fast, general-purpose big data processing system for cluster computing. Spark can run in standalone mode, on Hadoop YARN, or on Apache Mesos. To develop applications, Spark provides rich APIs in Scala, Java and Python. Apart from the Spark core engine, Spark comes with several libraries that provide APIs for parallel computing:

  • Spark SQL – API to run SQL-like queries on datasets.
  • Spark MLlib – Machine learning library.
  • Spark GraphX – API for graph computation.
  • Spark Streaming – API to process real-time streaming data.

The machine on which the Spark application (SparkContext) runs is called the driver node. The driver executes various parallel operations on the worker nodes of the cluster. Spark uses the concept of a Resilient Distributed Dataset (RDD), which represents a read-only collection of objects partitioned across a set of machines that can be rebuilt if a partition is lost. For more details, refer to the article "Big Data Processing with Apache Spark – Part 1: Introduction".
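As a minimal illustration of the RDD API (a standalone sketch, not part of the traffic application), the snippet below creates an RDD from a local collection and runs a parallel transformation and action on it.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RDDSketch {
    public static void main(String[] args) {
        // Run locally with two threads; a real deployment would point at a cluster master.
        SparkConf conf = new SparkConf().setAppName("rdd-sketch").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Distribute a local collection as an RDD partitioned across the workers.
        JavaRDD<Integer> numbers = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));

        // filter is a lazy transformation; the count action triggers execution.
        long evenCount = numbers.filter(n -> n % 2 == 0).count();
        System.out.println("Even numbers: " + evenCount);

        sc.stop();
    }
}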

Spark Streaming

Spark Streaming is an extension of Spark Core which provides fault-tolerant processing of live stream data. Spark Streaming divides the incoming stream into micro-batches of a specified interval and returns DStreams. A DStream represents a continuous stream of data ingested from sources like Kafka, Flume, Twitter, or HDFS. DStreams are processed and pushed out to filesystems, databases, and live dashboards. Refer to the article "Big Data Processing with Apache Spark – Part 3: Spark Streaming" for more details.

Kafka

Apache Kafka is a high-throughput distributed messaging system in which multiple producers send data to a Kafka cluster, which in turn serves it to consumers. It is a distributed, partitioned, replicated commit log service. Please refer to the article "Apache Kafka: Next Generation Distributed Messaging System" for more details.

Traffic Data Monitoring Application

The application we are going to develop is an IoT data processing and monitoring application using Spark Streaming. It will process real-time IoT data sent by connected vehicles and use that data to monitor the traffic on different routes. We will divide this application into the following three modules. These modules are standalone Maven applications written in Java and can be built and run independently.

IoT Data Producer: Connected vehicles generate IoT messages which are captured by a message broker and sent to the streaming application for processing. In our sample application, the IoT data producer is a simulator for connected vehicles and uses Apache Kafka to generate IoT data events.

IoT Data Processor: This is a Spark Streaming application which consumes IoT data streams and processes them for traffic data analysis. The IoT data processor provides the following metrics:

  • The total vehicle count for different types of vehicles on different routes, stored in a Cassandra database.
  • The vehicle count over a window of the last 30 seconds, for different types of vehicles on different routes, stored in the Cassandra database.
  • The details of vehicles within the radius of a given Point of Interest (POI), stored in the Cassandra database.

IoT Data Dashboard: This is a Spring Boot application which retrieves data from the Cassandra database and sends it to the web page. It uses WebSockets and jQuery to push data to the page at fixed intervals, so the data refreshes automatically. The dashboard displays the data in charts and tables, and uses bootstrap.js for responsive design, so it’s accessible on desktop as well as mobile devices.

The architecture of the traffic data monitoring application is illustrated in Figure 1 below.


Figure 1. Traffic Data Monitoring Application Architecture Diagram

Technologies and Tools

The following table shows the technologies and tools used in the traffic data monitoring application.

Tools and Technology | Version | Download URL
JDK | 1.8 | http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Maven | 3.3.9 | https://maven.apache.org/download.cgi
ZooKeeper | 3.4.8 | https://zookeeper.apache.org
Kafka | 2.10-0.10.0.0 | http://kafka.apache.org/downloads.html
Cassandra | 2.2.6 | http://cassandra.apache.org/download/
Spring Boot | 1.3.5 | https://mvnrepository.com/artifact/org.springframework.boot/spring-boot/1.3.5.RELEASE
Spark | 1.6.2 (pre-built for Hadoop 2.6) | http://spark.apache.org/downloads.html

Download and install the tools and set the environment variables as described in the installation documentation of the respective tools. Below are a few setup steps we need for this application.

  • Create a topic called “iot-data-event” for this application using the Kafka command below.

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic iot-data-event
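    To confirm the topic exists, you can optionally list the topics registered in ZooKeeper:

    bin/kafka-topics.sh --list --zookeeper localhost:2181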

     

  • Create tables in the Cassandra database using the commands below.

    CREATE KEYSPACE IF NOT EXISTS TrafficKeySpace
      WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};

    CREATE TABLE TrafficKeySpace.Total_Traffic (
      routeId text, vehicleType text, totalCount bigint,
      timeStamp timestamp, recordDate text,
      PRIMARY KEY (routeId, recordDate, vehicleType));

    CREATE TABLE TrafficKeySpace.Window_Traffic (
      routeId text, vehicleType text, totalCount bigint,
      timeStamp timestamp, recordDate text,
      PRIMARY KEY (routeId, recordDate, vehicleType));

    CREATE TABLE TrafficKeySpace.Poi_Traffic (
      vehicleId text, vehicleType text, distance bigint,
      timeStamp timestamp,
      PRIMARY KEY (vehicleId));

IoT Data Producer

IoT devices and connected vehicles generate huge amounts of data, which are extremely random and time-sensitive. This data is captured by a messaging system, then filtered, routed and ingested into stream processors. In this application we are using Kafka for the IoT data producer. This article shows only the relevant portions of the Java classes and configuration files; for the complete files, please check the GitHub project.

Let’s start with the Maven pom.xml file.

  <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.6</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.6</version>
  </dependency>
  <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.6.6</version>
  </dependency>

We use the IoTData class to define the attributes of a connected vehicle.

public class IoTData implements Serializable {
    private String vehicleId;
    private String vehicleType;
    private String routeId;
    private String longitude;
    private String latitude;
    // Remaining attributes sent by the producer (types assumed here):
    private Date timestamp;
    private double speed;
    private double fuelLevel;
    // Constructor, getters and setters omitted for brevity.
}

The IoTDataProducer application produces IoT data in JSON format. We need to write a custom Kafka serializer class to serialize IoTData objects, which means implementing the toBytes method of the kafka.serializer.Encoder interface.

public class IoTDataEncoder implements Encoder<IoTData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public byte[] toBytes(IoTData iotEvent) {
        try {
            String msg = objectMapper.writeValueAsString(iotEvent);
            return msg.getBytes();
        } catch (JsonProcessingException e) {
            // Log and drop events that fail to serialize.
        }
        return null;
    }
}

Now let’s write the IoTDataProducer class. We keep the configuration details for ZooKeeper and Kafka in iot-kafka.properties, inside the resources folder, which also contains the log4j.properties file. Remember that we created the iot-data-event topic while setting up the Kafka environment; IoTDataProducer produces messages on this topic. We set Kafka’s serializer.class property to IoTDataEncoder.

public class IoTDataProducer {
    // Relevant portion: configure and create the Kafka producer.
    Properties properties = new Properties();
    properties.put("zookeeper.connect", zookeeper);
    properties.put("metadata.broker.list", brokerList);
    properties.put("request.required.acks", "1");
    properties.put("serializer.class", "com.iot.app.kafka.util.IoTDataEncoder");
    Producer<String, IoTData> producer =
        new Producer<String, IoTData>(new ProducerConfig(properties));
}

In this application we have to define the routes and vehicle types. For the sake of simplicity we use three routes: “Route-37”, “Route-43” and “Route-82”. We have five types of IoT connected vehicles: "Large Truck", "Small Truck", "Private Car", "Bus" and "Taxi". Connected vehicles send events at regular intervals while moving along a particular route. For this article we assume that each vehicle sends five events, in random order, with a delay of one to three seconds between them. We use Java’s Random class to generate values for the attributes of the connected vehicles, as sketched after the code below.

private void generateIoTEvent(Producer<String, IoTData> producer, String topic) {
    IoTData event = new IoTData(vehicleId, vehicleType, routeId,
        latitude, longitude, timestamp, speed, fuelLevel);
    KeyedMessage<String, IoTData> data = new KeyedMessage<String, IoTData>(topic, event);
    producer.send(data);
}

The IoT producer application is ready. We can build and run it using the commands below.

mvn package

mvn exec:java -Dexec.mainClass="com.iot.app.kafka.producer.IoTDataProducer"

or

java -jar iot-kafka-producer-1.0.0.jar

You will see the IoT data events as shown in the snapshot below.


Figure 2. IoT Data events

IoT Data Processor

In this section, we will develop a Spark Streaming application which consumes real-time IoT data messages and processes them. This application is also available in the GitHub project.

Below is the pom.xml file for the IoT Data Processor application.

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.2</version>
  </dependency>
  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.2</version>
  </dependency>
  <dependency>
      <groupId>com.datastax.spark</groupId>
      <artifactId>spark-cassandra-connector_2.10</artifactId>
      <version>1.6.0</version>
  </dependency>

We need to write a custom deserializer class to turn the IoTData JSON string back into an IoTData object, implementing the fromBytes method of the kafka.serializer.Decoder interface.

public class IoTDataDecoder implements Decoder<IoTData> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public IoTData fromBytes(byte[] bytes) {
        try {
            return objectMapper.readValue(bytes, IoTData.class);
        } catch (Exception e) {
            // Log and drop messages that fail to deserialize.
        }
        return null;
    }
}
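As a quick sanity check (our own test sketch, not part of the project), the encoder and decoder should round-trip an event, assuming IoTData is Jackson-friendly (a default constructor or creator annotations):

IoTData original = new IoTData("vehicle-1", "Bus", "Route-37",
    "33.8", "-95.5", new Date(), 45.0, 20.0);
byte[] json = new IoTDataEncoder().toBytes(original);   // IoTData -> JSON bytes
IoTData copy = new IoTDataDecoder().fromBytes(json);    // JSON bytes -> IoTData
System.out.println(copy.getVehicleId());                // prints "vehicle-1"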

The resources folder holds the iot-spark.properties file, which has configuration key-value pairs for Kafka, Spark and Cassandra. We will write the IoTDataProcessor class using the Spark APIs, starting with a SparkConf object that sets the Spark and Cassandra properties.

SparkConf conf = new SparkConf()
    .setAppName(prop.getProperty("com.iot.app.spark.app.name"))
    .set("spark.cassandra.connection.host", prop.getProperty("com.iot.app.cassandra.host"));
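For reference, the keys used here and below might look like this in iot-spark.properties (the values are illustrative; check the project for the actual file):

com.iot.app.spark.app.name=Iot-Data-Processor
com.iot.app.spark.checkpoint.dir=/tmp/iot-streaming-data
com.iot.app.cassandra.host=127.0.0.1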

Our application collects streaming data in batches of five seconds. Create a JavaStreamingContext using the SparkConf object and a Duration value of five seconds, set the checkpoint directory on it, and read the IoT data stream using the KafkaUtils.createDirectStream API.

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint(prop.getProperty("com.iot.app.spark.checkpoint.dir"));
JavaPairInputDStream<String, IoTData> directKafkaStream = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        IoTData.class,
        StringDecoder.class,
        IoTDataDecoder.class,
        kafkaParams,
        topicsSet
);
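The kafkaParams and topicsSet arguments referenced above are plain Java collections; a minimal setup might be (the broker address is an example; the project reads it from iot-spark.properties):

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // Kafka broker(s) for the direct stream
Set<String> topicsSet = new HashSet<>(Arrays.asList("iot-data-event")); // topic created earlier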

The first transformation is a map operation to get a DStream of IoTData objects. Next we want a pair DStream in which the key is the vehicleId and the value is the IoTData object, so we call the mapToPair transformation. The current DStream can contain more than one event for the same vehicleId; since we want unique vehicles per incoming batch, we call the reduceByKey transformation.

JavaDStream<IoTData> nonFilteredIotDataStream = directKafkaStream.map(tuple -> tuple._2());
JavaPairDStream<String, IoTData> iotDataPairStream = nonFilteredIotDataStream
    .mapToPair(iot -> new Tuple2<String, IoTData>(iot.getVehicleId(), iot))
    .reduceByKey((a, b) -> a);

In order to count vehicles over time, we need to keep a record of the vehicles that have already been processed in previous DStreams. To achieve this, we use Spark’s stateful operations: the mapWithState operation, available on DStreams of key-value pairs, maintains state data per key via a StateSpec.function. Below, processedVehicleFunc is the StateSpec.function for this application.

private static final Function3<String, Optional<IoTData>, State<Boolean>,
        Tuple2<IoTData, Boolean>> processedVehicleFunc = (vehicleId, iot, state) -> {
    Tuple2<IoTData, Boolean> vehicle = new Tuple2<>(iot.get(), false);
    if (state.exists()) {
        vehicle = new Tuple2<>(iot.get(), true);
    } else {
        state.update(Boolean.TRUE);
    }
    return vehicle;
};

For stateful operations, Spark stores the RDD state in memory on machines across the cluster. For a long-running application, a large amount of data may end up stored in memory on the worker machines, so based on the type of application you should decide how long to maintain the state for a key. In this article we keep each vehicleId in memory for one hour. In the code below we apply a map operation and then a filter operation to get the vehicles that have not yet been processed.

JavaMapWithStateDStream<String, IoTData, Boolean, Tuple2<IoTData, Boolean>>
    iotDStreamWithStatePairs = iotDataPairStream.mapWithState(
        StateSpec.function(processedVehicleFunc).timeout(Durations.seconds(3600)));

JavaDStream<Tuple2<IoTData, Boolean>> filteredIotDStreams = iotDStreamWithStatePairs
    .map(tuple2 -> tuple2)
    .filter(tuple -> tuple._2.equals(Boolean.FALSE));

Total Traffic Processing

Total traffic data processing from the IoT data stream using Spark is illustrated in Figure 3 below.


Figure 3. Total Traffic Data Processing Using Spark

Below is the transformation that computes the total traffic count for each vehicle type on each route. We identify a unique combination by routeId and vehicleType, so we created an AggregateKey class with these two attributes. The AggregateKey object is the key in the pair DStream, and its value is the vehicle count. We use the mapToPair transformation to emit a count of one per event, and reduceByKey to combine the counts for the same AggregateKey. For the total vehicle count we then need to maintain state per AggregateKey.

JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
    .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
    .reduceByKey((a, b) -> a + b);

The function below maintains a running sum per AggregateKey over time.

long totalSum = currentSum.or(0L) + (state.exists() ? state.get() : 0);
Tuple2<AggregateKey, Long> total = new Tuple2<>(key, totalSum);
state.update(totalSum);
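Spelled out in full, following the same pattern as processedVehicleFunc above (our reconstruction; the project’s function may differ in naming), this becomes:

private static final Function3<AggregateKey, Optional<Long>, State<Long>,
        Tuple2<AggregateKey, Long>> totalSumFunc = (key, currentSum, state) -> {
    // Add this batch's count to the running total carried in the state.
    long totalSum = currentSum.or(0L) + (state.exists() ? state.get() : 0);
    Tuple2<AggregateKey, Long> total = new Tuple2<>(key, totalSum);
    state.update(totalSum);
    return total;
};

It is applied with countDStreamPair.mapWithState(StateSpec.function(totalSumFunc)).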

With the above series of transformations, we get the routeId, vehicleType and total count, which together make up the total traffic data. We store this data in the Cassandra database so it can be sent to the dashboard.

We need to transform the JavaDStream from the previous step into a JavaDStream&lt;TotalTrafficData&gt; so it can be persisted in the Cassandra database, so we write a new function for this transformation.

private static final Function<Tuple2<AggregateKey, Long>, TotalTrafficData>
        totalTrafficDataFunc = (tuple -> {
    TotalTrafficData trafficData = new TotalTrafficData();
    trafficData.setRouteId(tuple._1().getRouteId());
    trafficData.setVehicleType(tuple._1().getVehicleType());
    trafficData.setTotalCount(tuple._2());
    trafficData.setTimeStamp(new Date());
    trafficData.setRecordDate(new SimpleDateFormat("yyyy-MM-dd").format(new Date()));
    return trafficData;
});

To save data in the Cassandra database we use DataStax’s Spark Cassandra Connector library, which provides an API to save a DStream or RDD to Cassandra: provide the column mapping and call the saveToCassandra() method.

Map<String, String> columnNameMappings = new HashMap<String, String>();
columnNameMappings.put("routeId", "routeid");
columnNameMappings.put("vehicleType", "vehicletype");
columnNameMappings.put("totalCount", "totalcount");
columnNameMappings.put("timeStamp", "timestamp");
columnNameMappings.put("recordDate", "recorddate");

javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "total_traffic",
    CassandraJavaUtil.mapToRow(TotalTrafficData.class, columnNameMappings))
    .saveToCassandra();

Window Traffic Processing

The second objective of the IoT Data Processor is to calculate the traffic details for the last 30-second window. We use Spark’s window-based API for this, calling reduceByKeyAndWindow with a window duration of 30 seconds and a slide interval of 10 seconds, so every 10 seconds we get the counts for the preceding 30 seconds. We do not maintain any state for this count; we simply transform the DStream into the WindowTrafficData entity class for the Window_Traffic table and save it to the Cassandra database as we did for total traffic.

JavaPairDStream<AggregateKey, Long> countDStreamPair = filteredIotDataStream
    .mapToPair(iot -> new Tuple2<>(new AggregateKey(iot.getRouteId(), iot.getVehicleType()), 1L))
    .reduceByKeyAndWindow((a, b) -> a + b, Durations.seconds(30), Durations.seconds(10));

POI (Point-of-Interest) Traffic Processing

The third and final objective of the IoT Data Processor is to process information about vehicles within the radius of a defined point-of-interest (POI). Please see Figure 4 below.

Figure 4. Vehicles within POI radius

If you look at the IoTData messages generated by the IoT Data Producer, each one carries the latitude and longitude of the vehicle’s position. We compare these coordinates with the coordinates of the POI. In this application we monitor Route-37 for vehicles of type Truck (Small Truck or Large Truck) that are within a 30 km radius of the POI. Let’s define the coordinates and radius for the POI, and set these values in a broadcast variable.

POIData poiData = new POIData();
poiData.setLatitude(33.877495);
poiData.setLongitude(-95.50238);
poiData.setRadius(30);

Broadcast<Tuple3<POIData, String, String>> broadcastPOIValues =
    jssc.sparkContext().broadcast(new Tuple3<>(poiData, "Route-37", "Truck"));

We have to calculate the distance between a vehicle and the POI from their latitude and longitude. In this application we use the Haversine formula to calculate the great-circle distance between two points on Earth. For real-world applications you might want to use the Google Maps API or the OpenStreetMap API, but for our purposes the Haversine formula is sufficient, so we write a GeoDistanceCalculator class with a getDistance method.

public static double getDistance(double lat1, double lon1, double lat2, double lon2) {
    final int r = 6371; // Earth's radius in kilometres
    double latDistance = Math.toRadians(lat2 - lat1);
    double lonDistance = Math.toRadians(lon2 - lon1);
    double a = Math.sin(latDistance / 2) * Math.sin(latDistance / 2)
        + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
        * Math.sin(lonDistance / 2) * Math.sin(lonDistance / 2);
    double c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    return r * c;
}
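The isInPOIRadius helper used in the filter below is not shown in the article’s excerpts; a minimal version (our sketch) simply compares the Haversine distance against the radius:

public static boolean isInPOIRadius(double currentLat, double currentLon,
        double poiLat, double poiLon, double radius) {
    // The vehicle is inside the POI circle when the great-circle
    // distance does not exceed the radius (both in kilometres).
    return getDistance(currentLat, currentLon, poiLat, poiLon) <= radius;
}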

For the POI calculation we use the JavaDStream&lt;IoTData&gt; obtained from the first transformation on the data received from KafkaUtils. We apply a filter transformation to get a DStream of IoTData in which the vehicle’s routeId equals “Route-37”, the vehicleType contains “Truck”, and the vehicle is within the radius of the POI. We keep the filtered IoTData and the POIData object in a pair, which we will transform into the entity object (POITrafficData) for the Poi_Traffic table of the Cassandra database.

JavaDStream<IoTData> iotDataStreamFiltered = nonFilteredIotDataStream
    .filter(iot -> (iot.getRouteId().equals(broadcastPOIValues.value()._2())
        && iot.getVehicleType().contains(broadcastPOIValues.value()._3())
        && GeoDistanceCalculator.isInPOIRadius(
            Double.valueOf(iot.getLatitude()),
            Double.valueOf(iot.getLongitude()),
            broadcastPOIValues.value()._1().getLatitude(),
            broadcastPOIValues.value()._1().getLongitude(),
            broadcastPOIValues.value()._1().getRadius())));

JavaPairDStream<IoTData, POIData> poiDStreamPair = iotDataStreamFiltered
    .mapToPair(iot -> new Tuple2<>(iot, broadcastPOIValues.value()._1()));

Map the entity’s attributes to the table column names and call the saveToCassandra() method. Note that here we also invoke the withConstantTTL() method, which causes the rows written from this DStream to be stored for the specified time and then deleted from the table; we keep them for two minutes.

javaFunctions(trafficDStream).writerBuilder("traffickeyspace", "poi_traffic",
    CassandraJavaUtil.mapToRow(POITrafficData.class, columnNameMappings))
    .withConstantTTL(120)
    .saveToCassandra();

All three methods are complete, so we can invoke them from the IoTDataProcessor class and then start the Java streaming context.

IoTTrafficDataProcessor iotTrafficProcessor = new IoTTrafficDataProcessor();
iotTrafficProcessor.processTotalTrafficData(filteredIotDataStream);
iotTrafficProcessor.processWindowTrafficData(filteredIotDataStream);
iotTrafficProcessor.processPOIData(nonFilteredIotDataStream, broadcastPOIValues);
jssc.start();
jssc.awaitTermination();

The IoTDataProcessor application is ready; it’s time to build and run it. Execute the Maven package command to generate the jar file.

mvn package

You will get the iot-spark-processor-1.0.0.jar file. The Maven build includes all the required Spark libraries in this jar; if you prefer, you can use the &lt;provided&gt; scope for the Spark dependencies in pom.xml so those libraries are not bundled into the final uber jar, since Spark provides them at runtime. Invoke spark-submit as below.

spark-submit --class "com.iot.app.spark.processor.IoTDataProcessor" iot-spark-processor-1.0.0.jar

IoT Data Dashboard

We will use Spring Boot to develop our dashboard application. Spring Boot provides Spring Data support for Cassandra, so it is easy to develop the data access and entity classes. We are using the spring-boot-dependencies 1.3.5 release, which supports Cassandra 2.x through a library that uses the DataStax Java Driver (2.0.x). We will start with the pom.xml file; the code for the dashboard is available in the GitHub project.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-cassandra</artifactId>
</dependency>

We create entity classes for the three tables “Total_Traffic”, “Window_Traffic” and “Poi_Traffic” (an example entity is sketched after the snippet below), and DAO interfaces for all three entities which extend CassandraRepository. The code snippet below shows the DAO class for the TotalTrafficData entity, including a custom query for fetching data from the table.

@Repository
public interface TotalTrafficDataRepository extends CassandraRepository<TotalTrafficData> {
    @Query("SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING")
    Iterable<TotalTrafficData> findTrafficDataByDate(String date);
}
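The entity classes themselves are not reproduced here; one for the total_traffic table might look roughly like the sketch below, using Spring Data Cassandra 1.x mapping annotations (the project’s actual class may differ):

@Table("total_traffic")
public class TotalTrafficData {

    @PrimaryKeyColumn(name = "routeid", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String routeId;

    @PrimaryKeyColumn(name = "recorddate", ordinal = 1, type = PrimaryKeyType.CLUSTERED)
    private String recordDate;

    @PrimaryKeyColumn(name = "vehicletype", ordinal = 2, type = PrimaryKeyType.CLUSTERED)
    private String vehicleType;

    @Column("totalcount")
    private long totalCount;

    @Column("timestamp")
    private Date timeStamp;

    // Getters and setters omitted for brevity.
}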

We write a CassandraConfig class which connects to the Cassandra cluster and obtains the connection for database operations.

public class CassandraConfig extends AbstractCassandraConfiguration {
    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(environment.getProperty("com.iot.app.cassandra.host"));
        cluster.setPort(Integer.parseInt(environment.getProperty("com.iot.app.cassandra.port")));
        return cluster;
    }
}

We want the data on the dashboard to refresh automatically at a fixed interval, so we use WebSockets to push updated data to the UI.

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/stomp").withSockJS();
    }

    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
    }
}

Next we write the TrafficDataService class, which pulls data from the Cassandra tables using the repository interfaces and sends it over the WebSocket to the dashboard every five seconds. This class uses Spring’s SimpMessagingTemplate to send the Response object to the dashboard.

private SimpMessagingTemplate template;

@Scheduled(fixedRate = 5000)
public void trigger() {
    Response response = new Response();
    response.setTotalTraffic(totalTrafficList);
    response.setWindowTraffic(windowTrafficList);
    response.setPoiTraffic(poiTrafficList);
    this.template.convertAndSend("/topic/trafficData", response);
}
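The Response class referenced here is a simple holder for the three result lists (our sketch; the project’s class may carry more):

public class Response implements Serializable {
    private List<TotalTrafficData> totalTraffic;
    private List<WindowTrafficData> windowTraffic;
    private List<POITrafficData> poiTraffic;
    // Getters and setters omitted for brevity.
}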

Below is the IoTDataDashboard class, the Spring Boot application class, which runs on port 8080.

public class IoTDataDashboard {
    public static void main(String[] args) {
        SpringApplication.run(IoTDataDashboard.class, args);
    }
}

The IoT Data Dashboard UI is an HTML page in the resources/static folder. We need to add the jQuery, SockJS and Stomp JavaScript libraries so it can receive messages over the WebSocket.

<script type="text/javascript" src="js/jquery-1.12.4.min.js"></script>
<script type="text/javascript" src="js/sockjs-1.1.1.min.js"></script>
<script type="text/javascript" src="js/stomp.min.js"></script>

Add code to receive data from the WebSocket and parse the Response object as JSON data.

var totalTrafficList = jQuery("#total_traffic");
stompClient.connect({}, function(frame) {
    stompClient.subscribe("/topic/trafficData", function(data) {
        var dataList = data.body;
        var resp = jQuery.parseJSON(dataList);
        // Build the table markup from the response (elided) and render it.
        totalTrafficList.html(t_tabl_start + totalOutput + t_tabl_end);
    });
});

We want to display the traffic data on the dashboard using graphs and charts, for which we use the Chart.js library. Chart.js provides a rich API for different types of charts; please see the Chart.js documentation for details. We also want the dashboard to be responsive, so it can be viewed on devices of different screen sizes and resolutions, so we add the bootstrap.js library and bootstrap.css for a responsive web UI.

<script type="text/javascript" src="js/bootstrap.min.js"></script>
<script type="text/javascript" src="js/Chart.min.js"></script>
<head>
    <title>IoT Traffic Data Dashboard</title>
    <link rel="stylesheet" type="text/css" href="css/bootstrap.min.css">
    <link rel="stylesheet" type="text/css" href="css/style.css">
</head>

The IoT Data Dashboard application is ready. We can build and run it using the commands below.

mvn package

mvn exec:java -Dexec.mainClass="com.iot.app.springboot.dashboard.IoTDataDashboard"

or

java -jar iot-springboot-dashboard-1.0.0.jar

Now open a browser and go to http://localhost:8080. You will see the dashboard displaying the data in tables and charts, refreshing every five seconds. The IoT traffic data monitoring dashboard looks like the image below.


Figure 5. IoT Traffic Data Monitoring Dashboard

Summary

In this article we learned how real-time IoT data events coming from connected vehicles can be ingested into Spark through Kafka. Using the Spark Streaming API, we processed and analysed the IoT data events and transformed them into vehicle counts for different types of vehicles on different routes. We performed a series of stateless and stateful transformations on DStreams using the Spark Streaming API and persisted the results to Cassandra database tables. Finally, we developed a responsive traffic monitoring dashboard using Spring Boot, SockJS and Bootstrap, which queries data from the Cassandra database and pushes it to the UI over a WebSocket. These applications are available inside the Maven aggregator project "iot-traffic-monitor" on GitHub.

About The Author

Amit Baghel is a Software Architect with over 15 years of experience in the design and development of enterprise applications and products around the Java ecosystem. His current focus is on IoT, cloud computing, big data solutions, microservices, DevOps, and continuous integration and delivery. Amit can be reached via e-mail.


Community comments

    • Apache Flink

      by Luis Silveira,


      Congratulations on this great article, Amit. IMHO, for real-time traffic monitoring with subsecond latency requirements, I would suggest adopting Apache Flink instead of Apache Spark. Flink has a high-throughput, low-latency pure streaming engine, while Spark RDDs process streams of events from a micro-batch perspective.

    • Re: Apache Flink

      by Tomasz Gawęda,


      The author didn't mention any subsecond latency requirement. What's more, latency in sending the data is often bigger :) Such monitoring is often used to train and apply machine learning models; in Spark this is unified, especially in Structured Streaming: one class for batch and streaming data, used also by the ML library. Maybe that's a topic for a new article for the author? :)

      So - you are right about Flink usage in real-time systems, but not in the case of this article, where bigger latency is OK. In such cases Spark has many advantages.

      By the way, I agree that the article is very good! :)

    • Re: Apache Flink

      by Hareesh Jagannathan,


      I have implemented the processor using Apache Flink. The full code is available at the location below:
      (github.com/harsh86/iot-traffic-monitor-flink).

      I am a newbie to Flink and would appreciate feedback.

    • Thanks!!

      by Hareesh Jagannathan,


      Great article. Loved the completeness of the article and the author's ability to guide the reader through his thought process.

    • Insightful article

      by Abhishek Yada,


      Hello, excellent piece on an end-to-end Spark application.

    • IoT Dashboard Issue

      by Bala ss,


      Great article.

      The Kafka producer and IoT data processor are working fine.

      I'm facing an issue while running the IoT Dashboard project:

      Exception in thread "main" java.lang.reflect.InvocationTargetException
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:54)
      at java.lang.Thread.run(Unknown Source)
      Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'trafficDataService': Injection of autowired dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Could not autowire field: private com.iot.app.springboot.dao.TotalTrafficDataRepository com.iot.app.springboot.dashboard.TrafficDataService.totalRepository; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'totalTrafficDataRepository': Cannot resolve reference to bean 'cassandraTemplate' while setting bean property 'cassandraTemplate'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'cassandraTemplate' defined in class path resource [org/springframework/boot/autoconfigure/data/cassandra/CassandraDataAutoConfiguration.class]: Unsatisfied dependency expressed through constructor argument with index 0 of type [com.datastax.driver.core.Session]: Error creating bean with name 'session' defined in class path resource [com/iot/app/springboot/dao/CassandraConfig.class]: Invocation of init method failed; nested exception is com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces)); nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'session' defined in class path resource [com/iot/app/springboot/dao/CassandraConfig.class]: Invocation of init method failed; nested exception is com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces))

    • Re: IoT Dashboard Issue

      by Amit Baghel,


      From the error "unconfigured table schema_keyspaces" it looks like you are using Cassandra 3.x. Please check the version compatibility of Spring Boot, Spring-Data-Cassandra and Cassandra DB.

    • IoT application

      by Pethuru Chelliah,


      Well-written technical article!

    • Alternative to simplify

      by Craig Kauffman,


      IoT to Kafka, MemSQL Pipelines to Kafka parallelized, MemSQL Transform to Spark for Calc / Agg / Enrich (or Tensorflow etc), land in MemSQL (vs Cassandra, same speed on landing but much better access speeds, standard SQL and scans for dashboards etc). This would use two proprietary technologies (MemSQL Pipelines with Transforms to replace Spark Streaming complexity and MemSQL Db to replace Cassandra limitations). The trade would be open source to proprietary, gaining simplicity (vendor supports the interfaces) and pure speed and scale. MemSQL is at least as fast on landing (upserts at the same speed as a write), and much faster on data access with full ANSI SQL to access. Scale is to Pbs level. This should keep from passing from Cassandra to another warehouse for speed access and scans. I'm not trying to push a vendor on you, but 3rd gen Db solves a lot of the 2011 - 2015 architecture issues. Faster, better, cheaper is hard to beat. I am happy to show this alternative arch to anyone interested. Actually working on an IoT from Traffic sensor architecture right now for a non-USA entity.

    • Getting Logging Problem.

      by ajay beesam,


      Excellent article.

      Please help me to solve this problem: while running the "IoT Data Processor" I get an error that no logging class is found ("org/apache/spark/Logging"), and while trying to use the latest versions I get a "StateSpec.function()" error.

    • Re: IoT Dashboard Issue

      by ajay beesam,


      Would you please tell me how you solved the "org/apache/spark/Logging" problem while running the "IoT Data Processor"? It would be a great help to me.

    • Re: IoT Dashboard Issue

      by Amit Baghel,


      Spark version used in this article is 1.6.2. If you are using 2.x then you will have to update classes as per new APIs in 2.x. Please check stackoverflow.com for code or build related issues.

      stackoverflow.com/questions/40287289/java-lang-...

    • How can it recover after it's down?

      by han slam,


      It's a good article, but one issue is that totalcount restarts from 0 in the DB when iot-spark-processor restarts.

    • Re: How can it recover after it's down?

      by Amit Baghel,


      Thanks han. You would need to modify processor code and enable checkpointing to get JavaStreamingContext from checkpoint data. Please check document at spark.apache.org/docs/1.6.2/streaming-programmi...

    • Data Processor

      by SS Samant,


      Hi Amit, thanks for the great article. Most of the code is easy to understand; however, I got stuck with the data processing part in Spark Streaming. How do I extract a message from the IoTData that matches a specific pattern? Let's say the data producer sends an alert; now I need to find that specific alert in the data streams and send it to the dashboard straight away. A hint or steps to extract the alert from the stream would be much appreciated.

    • Re: Data Processor

      by Atul Kalra,


      While running the Data Processor, I am getting the following error:


      D:\jdk1.8.0_121\bin\java -cp "C:\Users\Dell-PC\Spark\sparkhome\bin\..\conf\;C:\Users\Dell-PC\Spark\sparkho
      me\jars\*" -Xmx1g org.apache.spark.deploy.SparkSubmit --class "com.iot.app.spark.processor.IoTDataProcesso
      r iot-spark-processor-1.0.0.jar > C:\Users\Dell-PC\AppData\Local\Temp\spark-class-launcher-output-27093.tx
      t" --usage-error
      The system cannot find the file C:\Users\Dell-PC\AppData\Local\Temp\spark-class-launcher-output-27093.txt.

      Could Not Find C:\Users\Dell-PC\AppData\Local\Temp\spark-class-launcher-output-27093.txt

      I have checked the permissions and they are granted to the folders.. Please support

    • kafka failure

      by Jasmine laurn,


      Hello! Can you help me please? I'm new to this.

      When I execute the command to run the whole project I get this error:

      ] ------------------------------------------------------------------------
      [INFO] Building IoT Kafka Producer 1.0.0
      [INFO] ------------------------------------------------------------------------
      [INFO]
      [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ iot-kafka-producer ---
      [WARNING]
      java.lang.ClassNotFoundException: com.iot.app.springboot.dashboard.IoTDataDashboard
      at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
      at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
      at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:246)
      at java.lang.Thread.run(Thread.java:748)
      [INFO] ------------------------------------------------------------------------
      [INFO] Reactor Summary:
      [INFO]
      [INFO] IoT Kafka Producer ................................. FAILURE [ 0.458 s]
      [INFO] IoT Spark Processor ................................ SKIPPED
      [INFO] IoT Spring Boot Dashboard .......................... SKIPPED
      [INFO] IoT Traffic Monitor ................................ SKIPPED
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD FAILURE
      [INFO] ------------------------------------------------------------------------
      [INFO] Total time: 0.895 s
      [INFO] Finished at: 2021-07-02T19:57:20+00:00
      [INFO] Final Memory: 13M/153M
      [INFO] ------------------------------------------------------------------------
      [ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:3.0.0:java (default-cli) on project iot-kafka-producer: An exception occured while executing the Java class. com.iot.app.springboot.dashboard.IoTDataDashboard -> [Help 1]
      [ERROR]
      [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
      [ERROR] Re-run Maven using the -X switch to enable full debug logging.
      [ERROR]
      [ERROR] For more information about the errors and possible solutions, please read the following articles:
      [ERROR] [Help 1] cwiki.apache.org/confluence/display/MAVEN/MojoE...

    • Cloud based processing

      by Lawrence Smith,


      What changes would be necessary to migrate to a GCP Cloud Run solution?
