Introducing Spring XD, a Runtime Environment for Big Data Applications
Spring XD (eXtreme Data) is Pivotal’s Big Data play. It joins Spring Boot and Grails as part of the execution portion of the Spring IO platform. Whilst Spring XD makes use of a number of existing Spring projects it is a runtime environment rather than a library or framework, comprising a bin directory with servers that you start up and interact with via a shell. The runtime can run on a developer machine, on a client's own servers, on AWS EC2, and on Cloud Foundry.
The key components in Spring XD are the Admin and Container servers. Using a DSL, you post the description of the required processing tasks to the Admin server over HTTP. The Admin server then maps the processing tasks into processing modules: a module is a unit of execution and is implemented as a Spring Application Context.
The product has two modes of operation - single and multi-node. The first is a single process that is responsible for all processing and administration. This is useful for getting started, as well as quick development and testing of an application. All the examples in this article are designed to work in single node. The second is a distributed mode. The Distributed Integration Runtime (DIRT) distributes processing tasks across multiple nodes. Aside from having these nodes as VMs or physical servers, Spring XD allows you to run them on a Hadoop YARN cluster.
The XD Admin server breaks up a processing task into individual module definitions and assigns each module to a container instance using Apache ZooKeeper. Each container listens for module definitions to which it has been assigned and deploys the module, creating a Spring Application Context to run it. It is worth observing that, at the time of writing, Spring XD does not ship with ZooKeeper. The compatible version is 3.4.6, which is available from the Apache ZooKeeper download page.
Modules share data by passing messages using a configured messaging middleware. The transport layer is pluggable, with support for two other Pivotal projects - Redis and RabbitMQ - as well as an in-memory option out of the box.
The following diagram provides a high-level overview of Spring XD:
The Spring XD team have identified ingestion, real-time analytics, workflow orchestration, and export as being four major use cases common to creating Big Data solutions:
Data ingestion provides the ability to accept data from a variety of input sources and transfer it into a Big Data store such as HDFS (the Hadoop file system), Splunk, or an MPP database. As well as files, data sources might include events from mobile devices, sensors that support the MQ Telemetry Transport (MQTT) protocol, and social streams such as Twitter.
Ingestion occurs through stream processing for event driven data, and batch jobs (MR, PIG, Hive, Cascading, SQL and so on) for other types of data. These two worlds of streams and jobs are distinct, but Spring XD tries to blur the boundaries between the two using a channel abstraction, so that, for example, a stream can trigger a batch job, and a batch job can send events and, in turn, trigger other streams.
For streams, some real-time analytics, such as gathering metrics and counting values, is supported through an abstraction called "Taps". Conceptually, a tap allows you to tap into a stream, perform real-time analytics, and optionally populate an external system such as GemFire, Redis or another in-memory data grid.
Once you have data in your Big Data store you need some sort of workflow tool to orchestrate the processing. Orchestration is necessary because the scripts or map-reduce jobs that you write will often be long running and take the form of a chain of events with multiple steps. Ideally you want the ability to re-start from a given step in the event of a failure, for example, rather than having to start all over again.
Finally, there's a need for an export step to take the data into a system that is more suitable for presentation and perhaps further analysis, for example from HDFS to an RDBMS where you can use more conventional business intelligence tools.
Spring XD is intended to provide a unified, distributed and extensible service to meet these use cases. Rather than starting from scratch, it makes use of a number of pre-existing Spring technologies. For example, Spring Batch is used to support the workflow orchestration and export use cases, and Spring Integration supports stream processing and the various Enterprise Application Integration patterns. Other key Spring products include Spring Data for NoSQL/Hadoop work, and Reactor, which provides a simplified API for writing asynchronous applications, in particular those using the LMAX Disruptor.
Installing Spring XD
In the next sections we'll take a look at each of the use cases in more detail. You may want to try some of the examples yourself. Getting started is straightforward.
First, make sure your system has Java JDK 6 or newer installed. Java JDK 7 is recommended.
OS X users should install Homebrew if they haven’t already, and then run:
brew tap pivotal/tap
brew install springxd
This will install to /usr/local/Cellar/springxd/1.0.0.M7/libexec (depending on the Spring XD version).
Note: if you subsequently want to install a later version then brew upgrade springxd should do the trick.
Red Hat/CentOS users can install using Yum.
Windows users should download the current .zip file, unzip it, cd into that directory, and set the environment variable XD_HOME to the installation directory.
You can start up Spring XD in single node by typing:
Open a second terminal and start the shell by typing:
You'll see something like this:
To check it works create a quick stream:
stream create --definition "time | log" --name ticktock --deploy
In the console window where you started Spring XD you should see something like:
You can delete the stream using the stream destroy command from the shell:
stream destroy --name ticktock
In Spring XD, a basic stream defines the ingestion of event driven data from a source to a sink that passes through any number of processors.
The Spring XD shell supports a DSL for stream definitions with a pipes and filters syntax - source | processor | sink.
For example a command such as stream create --name filetest --definition "file | log" --deploy will log the content of a file.
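To make the pipes and filters idea concrete, here is a minimal Python sketch of how a definition such as "http --port=9010 | file" could be broken into a source, optional processors and a sink. It is an illustration only, not Spring XD's actual parser: the Module class and parse_stream function are hypothetical names, and the naive split on "|" would not cope with expressions that themselves contain a pipe character.

```python
import shlex
from dataclasses import dataclass, field


@dataclass
class Module:
    """Hypothetical stand-in for a module definition."""
    name: str
    role: str  # "source", "processor" or "sink"
    options: dict = field(default_factory=dict)


def parse_stream(definition: str) -> list:
    """Split a 'source | processor | sink' definition into modules."""
    segments = [segment.strip() for segment in definition.split("|")]
    if len(segments) < 2:
        raise ValueError("a stream needs at least a source and a sink")

    modules = []
    last = len(segments) - 1
    for index, segment in enumerate(segments):
        tokens = shlex.split(segment)  # note: shell-style quotes are stripped
        if not tokens:
            raise ValueError("empty module in definition")
        options = {}
        for token in tokens[1:]:
            # "--port=9010" becomes {"port": "9010"}
            key, _, value = token.lstrip("-").partition("=")
            options[key] = value
        if index == 0:
            role = "source"
        elif index == last:
            role = "sink"
        else:
            role = "processor"
        modules.append(Module(tokens[0], role, options))
    return modules


for module in parse_stream("http --port=9010 | file"):
    print(module.role, module.name, module.options)
```

The first module is always treated as the source and the last as the sink, which mirrors the source | processor | sink shape of the DSL.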
As well as working with files Spring XD supports a number of other sources including:
The command HTTP POST /streams/myStream "http | file --deploy" - says "consume my stream from HTTP and go to a file". This will default to port 9000. You can override the default port using the --port option. This is the only option for HTTP.
For example (from the XD shell):
xd:> stream create --name infoqhttptest9010 --definition "http --port=9010 | file" --deploy
You can test this by posting some data to the new port:
xd:> http post --target http://localhost:9010 --data "hello world"
You should see the following text in the console window:
> POST (text/plain;Charset=UTF-8) http://localhost:9010 hello world
> 200 OK
Open another terminal instance and type:
$ cd /tmp/xd/output
$ tail -f infoqhttptest9010.out
You should see the words "hello world" in the output.
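If you would rather post test data from a script than from the XD shell, something along the following lines should work. This is a sketch using only the Python standard library; build_post and send are hypothetical helper names, and the port assumes the infoqhttptest9010 stream above is still deployed.

```python
import urllib.request


def build_post(port: int, payload: str) -> urllib.request.Request:
    """Build a plain-text POST aimed at an XD http source on localhost."""
    return urllib.request.Request(
        url=f"http://localhost:{port}",
        data=payload.encode("utf-8"),
        headers={"Content-Type": "text/plain; charset=UTF-8"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 5.0) -> int:
    """Send the request and return the HTTP status code.

    Only call this while the stream is deployed, otherwise the
    connection will be refused.
    """
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


request = build_post(9010, "hello world")
print(request.get_method(), request.full_url)
```

Calling send(request) while the stream is running should append "hello world" to the same output file that you are tailing.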
To send binary data, set the Content-Type header to application/octet-stream:
$ curl --data-binary @foo.zip -H'Content-Type: application/octet-stream' http://localhost:9000
Type stream destroy infoqhttptest9010 to clean up.
Mail is the source module for receiving emails. It can work by polling or receive emails as they become available depending on the protocol used.
xd:> stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=firstname.lastname@example.org --password=secret --delete=false | file" --deploy
Note: the delete option is important here since the default for Spring XD is to delete emails once they are consumed. Spring XD also has a markAsRead option but this defaults to false. The Spring Integration documentation provides a detailed explanation for this, but the main issue is that the POP3 protocol only knows what's been read within a single session. As a consequence when the POP3 mail adapter is running, emails are successfully consumed as they become available during each poll and no single email message will be delivered more than once. However, as soon as you restart your adapter and begin a new session, all the mail messages that might have been retrieved in the previous session will be retrieved again.
If you see an error in the console log such as:
WARN task-scheduler-1 org.springframework.integration.mail.ImapIdleChannelAdapter:230 - error occurred in idle task
javax.mail.AuthenticationFailedException: failed to connect, no password specified?
Try replacing the @ sign in your URL with the URL encoded version: %40:
stream create --name infoqmailstream --definition "mail --host=imap.gmail.com --username=charles%40c4media.com --password=secret --delete=false | file" --deploy
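If you are generating stream definitions from a script, the percent-encoding can be done programmatically rather than by hand. A small sketch using Python's standard library (encode_credential is a hypothetical helper name):

```python
from urllib.parse import quote


def encode_credential(value: str) -> str:
    """Percent-encode every reserved character, including '@'."""
    return quote(value, safe="")


print(encode_credential("charles@c4media.com"))  # charles%40c4media.com
```

Passing safe="" matters here, because by default quote leaves "/" unencoded.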
Open another terminal instance and type:
$ cd /tmp/xd/output
$ tail -f infoqmailstream.out
Send yourself an email to see it appear in the log file.
Spring XD can work with both the Twitter Search API (twittersearch) and data from Twitter's Streaming API.
For example this:
xd:> stream create --name twittersearchinfoq --definition "twittersearch --outputType=application/json --fixedDelay=1000 --consumerKey=afes2uqo6JAuFljdJFhqA --consumerSecret=0top8crpmd1MXGEbbgzAwVJSAODMcbeAbhwHXLnsg --query='infoq' | file" --deploy
uses twittersearch to query Twitter for the term 'infoq' every 1000 milliseconds, with the results formatted as JSON. To run this you need a consumer key (an application consumer key issued by Twitter) and its corresponding secret.
The results are piped to a file sink, which defaults to /tmp/xd/output/[streamName].out
Open another terminal instance and type:
$ cd /tmp/xd/output
$ tail -f twittersearchinfoq.out
You'll find after a while that you exceed the limit for Twitter API searches and will start seeing messages like this in the console window where you started XD in single node:
11:27:01,468 WARN task-scheduler-1 client.RestTemplate:581 - GET request for "https://api.twitter.com/1.1/search/tweets.json?q=infoq&count=20&since_id=478845525597237248" resulted in 429 (Client Error (429)); invoking error handler
11:27:01,471 ERROR task-scheduler-1 handler.LoggingHandler:145 - org.springframework.social.RateLimitExceededException: The rate limit has been exceeded.
Type stream destroy twittersearchinfoq to clean up.
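The rate limit error follows from simple arithmetic on the fixedDelay value. The sketch below assumes a limit of 180 search requests per 15-minute window, which is the figure Twitter published for user-authenticated search at the time; treat that number as an assumption and check the current Twitter documentation. The function names are hypothetical.

```python
import math

WINDOW_SECONDS = 15 * 60  # assumed length of Twitter's rate limit window


def requests_per_window(fixed_delay_ms: int, window_s: int = WINDOW_SECONDS) -> int:
    """Upper bound on polls issued in one window for a given fixedDelay."""
    return (window_s * 1000) // fixed_delay_ms


def min_fixed_delay_ms(limit: int, window_s: int = WINDOW_SECONDS) -> int:
    """Smallest fixedDelay that keeps polling within the limit."""
    return math.ceil(window_s * 1000 / limit)


print(requests_per_window(1000))  # 900 polls per window at --fixedDelay=1000
print(min_fixed_delay_ms(180))    # 5000 ms under the assumed limit of 180
```

Under that assumption, a fixedDelay of 1000 attempts roughly five times as many searches as the window allows, so raising it to 5000 or more should avoid the 429 responses.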
Other Input Streams
GemFire: configures a cache and replicated region in the XD container process along with a Spring Integration GemFire inbound channel adapter, backed by a CacheListener that outputs messages triggered by an external entry event on the region. Continuous query is also supported, which allows client applications to create a GemFire query using Object Query Language (OQL) and register a CQ listener which subscribes to the query and is notified every time the query's result set changes.
Reactor IP: acts as a server and allows a remote party to connect to XD and submit data over a raw TCP or UDP socket. The reactor-ip source differs from the standard tcp source in that it is based on the Reactor Project and can be configured to use the LMAX Disruptor RingBuffer library allowing for extremely high ingestion rates, quoted at around 1M/sec.
Syslog: Three syslog sources are provided: reactor-syslog, syslog-udp, and syslog-tcp. The reactor-syslog adapter uses TCP, builds upon the functionality available in the Reactor project, and provides improved throughput over the syslog-tcp adapter.
TCP: acts as a server and allows a remote party to connect to XD and submit data over a raw TCP socket.
MQTT: connects to an MQTT server and receives telemetry messages.
At any point in a stream you can insert a tap - based on the "wire tap" pattern from the Enterprise Integration Patterns book by Gregor Hohpe and Bobby Woolf.
Conceptually you insert a simple recipient list into the channel that publishes each incoming message to the main channel and a secondary channel. A stream is unaware of any taps on its pipeline. As a consequence deleting the stream will not automatically delete the taps - they have to be deleted separately. However if the tapped stream is re-created, the existing tap will continue to function.
A tap can be inserted at any point along a stream (or in multiple places).
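The wire tap idea can be sketched in a few lines of Python. This is a conceptual illustration rather than Spring XD's or Spring Integration's implementation; the Channel class and its method names are hypothetical.

```python
class Channel:
    """A channel with one main consumer and any number of taps."""

    def __init__(self):
        self._consumer = None
        self._taps = []

    def subscribe(self, consumer):
        self._consumer = consumer

    def add_tap(self, tap):
        self._taps.append(tap)

    def remove_tap(self, tap):
        self._taps.remove(tap)

    def send(self, message):
        # Each tap gets its own delivery of the message on a secondary path.
        for tap in self._taps:
            tap(message)
        # The main flow carries on exactly as it would without any taps.
        if self._consumer is not None:
            self._consumer(message)


main_sink, tap_sink = [], []
channel = Channel()
channel.subscribe(main_sink.append)
channel.add_tap(tap_sink.append)
channel.send("hello world")
print(main_sink, tap_sink)
```

The main consumer never references the tap, which is why a tap has to be removed explicitly rather than disappearing with the stream it observes.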
The data in a stream can be processed in a variety of ways.
Filters: Can be used to determine whether a message should be passed to the output channel. At its simplest, a filter is just an SpEL boolean expression that returns true or false. For example:
xd:> stream create --name filtertest --definition "http | filter --expression=payload=='good' | log" --deploy
will log anything with the payload "good". Filters can be fairly sophisticated however. Spring XD supports both JSONPath evaluation and custom Groovy scripts.
Transform: Used to convert a message's content or structure. It supports both simple SpEL and Groovy scripts for more complex transformations.
Splitter: Similar to the splitter concept in Spring Integration, the splitter uses an SpEL expression which evaluates to an array or a collection to split a single message into several distinct messages. You can use JSONPath expressions with these, but not custom Groovy scripts.
Aggregator: Opposite of a splitter, this combines multiple messages into a single payload.
Finally Script can be used to invoke a specific Groovy script as a processor step.
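The four processor types compose naturally, which a short Python sketch using generators can illustrate. None of this is Spring XD code: the function names are hypothetical, Python callables stand in for SpEL expressions and Groovy scripts, and the fixed-size aggregator is a deliberate simplification.

```python
def filter_step(messages, predicate):
    """Pass on only the messages for which the predicate is true."""
    return (message for message in messages if predicate(message))


def transform_step(messages, convert):
    """Convert each message's content."""
    return (convert(message) for message in messages)


def split_step(messages, splitter):
    """Turn each message into several distinct messages."""
    for message in messages:
        yield from splitter(message)


def aggregate_step(messages, size):
    """Combine messages into lists of up to `size` items."""
    batch = []
    for message in messages:
        batch.append(message)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush whatever is left at the end


incoming = ["good day", "bad", "good work"]
pipeline = aggregate_step(
    split_step(
        transform_step(
            filter_step(incoming, lambda m: m.startswith("good")),
            str.upper,
        ),
        str.split,
    ),
    size=2,
)
result = list(pipeline)
print(result)  # [['GOOD', 'DAY'], ['GOOD', 'WORK']]
```

Each step only sees the output of the step before it, in the same way that modules in a stream only see the messages passed to them.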
The simplest sinks are logs and files. Other supported options include Hadoop (HDFS), JDBC, TCP, Mail, RabbitMQ, GemFire server, Splunk server, and MQTT. There is also a dynamic router option which allows for the routing of Spring XD messages to named channels based on the evaluation of either an SpEL expression or Groovy scripts. One slightly surprising omission is the lack of a general purpose JMS sink, though it is possible to build a custom sink module as described here.
Spring XD provides support for the real-time evaluation of various machine learning scoring algorithms as well as simple real-time data analytics using various types of counters and gauges. The analytics functionality is provided via modules that can be added to a stream. In that sense, real-time analytics is accomplished via exactly the same model as data ingestion.
Whilst it is possible for the primary role of a stream to be performing real-time analytics, it's quite common to add a tap to initiate a secondary stream where analytics, e.g. a field-value-counter, are applied to the same data being ingested through a primary stream.
Out of the box Spring XD provides a few simple analytics tools implemented as an Abstract API with implementations for in-memory and Redis, as follows:
- Simple Counter
- Field Value Counter: Counts the occurrences of each distinct value of a named field.
- Aggregate Counter: Popular in tools like Mongo and Redis, this allows you to timeslice data by, for example, minute, hour, month, year and so on.
- Gauge: Last value
- Rich Gauge: Last value, running average, min/max
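To show what these metrics track, here is an in-memory Python sketch of three of them. It is not Spring XD's API: the class and method names are hypothetical, and the aggregate counter buckets by minute and rolls up to hours only, as an example of time slicing.

```python
from collections import Counter
from datetime import datetime


class FieldValueCounter:
    """Counts how often each value of one named field occurs."""

    def __init__(self, field_name):
        self.field_name = field_name
        self.counts = Counter()

    def handle(self, message):
        value = message.get(self.field_name)
        if value is not None:
            self.counts[value] += 1


class AggregateCounter:
    """Keeps totals in per-minute buckets that can be rolled up."""

    def __init__(self):
        self.per_minute = Counter()

    def handle(self, timestamp, amount=1):
        self.per_minute[timestamp.replace(second=0, microsecond=0)] += amount

    def per_hour(self):
        hours = Counter()
        for minute, total in self.per_minute.items():
            hours[minute.replace(minute=0)] += total
        return hours


class RichGauge:
    """Tracks the last value plus running average, minimum and maximum."""

    def __init__(self):
        self.value = None
        self.count = 0
        self.total = 0.0
        self.minimum = None
        self.maximum = None

    def set(self, value):
        self.value = value
        self.count += 1
        self.total += value
        self.minimum = value if self.minimum is None else min(self.minimum, value)
        self.maximum = value if self.maximum is None else max(self.maximum, value)

    @property
    def average(self):
        return self.total / self.count if self.count else None


gauge = RichGauge()
for reading in (10, 20, 30):
    gauge.set(reading)
print(gauge.value, gauge.average, gauge.minimum, gauge.maximum)
```

A simple counter is the degenerate case of the field value counter, and a plain gauge is a rich gauge that keeps only the last value.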
For predictive analysis, Spring XD includes an extensible class library upon which implementations can be built. An example PMML Module is available in GitHub, which integrates with the JPMML-Evaluator library that provides support for a wide range of model types and is interoperable with models exported from R, Rattle, KNIME, and RapidMiner.
The product also includes some abstractions for implementing analytical models in stream processing applications. At the time of writing only Predictive Model Markup Language (PMML) is supported, but Pivotal told InfoQ:
We are working on an internal project to provide comprehensive analytics solutions that targets on use cases surrounding "fraud detection", "cyber security" etc. We also have design spikes on OSS library integration such as "stream-lib" and "graphlab".
Pivotal also made it clear that this is an area that they expect to see evolve over time, with additional support for predictive modeling planned.
Batch Jobs, Workflow Orchestration and Export
In addition to streams, Spring XD includes the ability to launch and monitor batch jobs based on Spring Batch, and Spring Batch is also used to support the workflow orchestration and export use cases.
The concept of workflow translates to a batch job, which can be thought of as a directed graph in which each node is a processing step:
Steps can be executed sequentially or in parallel, depending on the configuration. They can copy or process data from files, databases, MapReduce, Pig, Hive or Cascading jobs, and are persisted with checkpoints allowing a restart. Like streams, jobs support single node or can be distributed with data partitioning.
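The value of checkpointing is easiest to see in a toy example. The Python sketch below runs steps sequentially and records each completed step, so that a second run picks up at the step that failed. It is a conceptual illustration, not Spring Batch: the Job class is hypothetical, and a plain list stands in for a persistent job repository.

```python
class Job:
    """Runs named steps in order, remembering which ones have finished."""

    def __init__(self, steps):
        self.steps = steps    # list of (name, callable) pairs
        self.completed = []   # stand-in for a persistent job repository

    def run(self):
        for name, action in self.steps:
            if name in self.completed:
                continue      # finished in an earlier run, so skip it
            action()          # an exception here leaves the checkpoint as-is
            self.completed.append(name)


log = []
attempts = {"export": 0}


def export_step():
    attempts["export"] += 1
    if attempts["export"] == 1:
        raise RuntimeError("simulated failure on the first attempt")
    log.append("export")


job = Job([("import", lambda: log.append("import")), ("export", export_step)])

try:
    job.run()                 # fails part-way through
except RuntimeError:
    pass
checkpoint_after_failure = list(job.completed)

job.run()                     # restarts at the failed step
print(checkpoint_after_failure, log)
```

The import step runs only once across both attempts, which is the behaviour you want when a step is long running.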
Spring XD ships with a small number of predefined jobs for moving data into and out of the Hadoop file system, HDFS. These cover FTP to HDFS, HDFS to JDBC, HDFS to MongoDB and JDBC to HDFS. There's also a job for exporting a file to JDBC. They can be found in the /libexec/xd/modules/job folder.
Spring XD provides a fairly basic browser-based GUI which currently allows you to perform batch job related tasks. Upon starting Spring XD, the admin UI is available here:
As can be seen in the screen-shot above, the admin UI currently comprises four tabs:
- Modules: lists the available batch job modules and more details (such as the job module options and the module XML configuration file).
- Definitions: lists the XD batch job definitions and provides actions to deploy or un-deploy those jobs.
- Deployments: lists all the deployed jobs and provides an option to launch the deployed job. Once the job is deployed, it can be launched through the admin UI as well.
- Executions: lists the batch job executions and provides an option to restart if the batch job is restartable and stopped/failed.
Spring XD is currently in development. The first milestone release shipped in June 2013 and the GA release is expected in July of this year (2014). It is licensed under Apache version 2. Source code and examples are available on GitHub. You can also find the Sonar code metrics on-line.
The product may be new, but as we've seen, it builds on mature foundations - Spring Batch, Spring Integration and Spring Data, as well as the Reactor project, LMAX Disruptor and Apache Hadoop - and provides a lightweight runtime environment that is easily configured and assembled via a DSL with little or no code. Spring XD provides a convenient way for developers to get started building a Big Data application, providing a "one stop shop" for building and deploying such applications.
About the Author
Charles Humble took over as head of the editorial team at InfoQ.com in March 2014, guiding our content creation including news, articles, books, video presentations and interviews. Prior to taking on the full-time role at InfoQ, Charles led our Java coverage, and was CTO for PRPi Consulting, a remuneration research firm that was acquired by PwC in July 2012. He has worked in enterprise software for around 20 years as a developer, architect and development manager. In his spare time he writes music as 1/3 of London-based ambient techno group Twofish.