
# Analytics Zoo: Unified Analytics + AI Platform for Distributed TensorFlow and BigDL on Apache Spark

### Key Takeaways

• Today, applying deep learning to big data pipelines often requires manually “stitching together” many separate components (e.g., TensorFlow, Apache Spark, Apache HDFS, etc.), which can be complex and error-prone.
• Analytics Zoo streamlines this process by providing a unified analytics + AI platform for distributed TensorFlow, Keras and BigDL on Apache Spark.
• It seamlessly unites Spark, TensorFlow, Keras and BigDL programs into an integrated pipeline, which can transparently scale out to large Apache Hadoop/Spark clusters for distributed training or inference.
• Early users (such as World Bank, Cray, Talroo, Baosight, Midea/KUKA, etc.) have built analytics + AI applications on top of Analytics Zoo for a wide range of workloads (including transfer learning based image classification, sequence-to-sequence prediction for precipitation nowcasting, neural collaborative filtering for job recommendations, and unsupervised time-series anomaly detection).
• This article provides several specific tutorials on how to implement distributed TensorFlow pipelines on Apache Spark using Analytics Zoo, and end-to-end pipelines for text classification in real use cases using Analytics Zoo.

Continued advancements in artificial intelligence applications have brought deep learning to the forefront of a new generation of data analytics development. In particular, we are seeing increasing demand from organizations to apply deep learning technologies (such as computer vision, natural language processing, generative adversarial neural networks, etc.) to their big data platforms and pipelines.  Today this often requires manually “stitching together” many separate components (e.g., Apache Spark, TensorFlow, Caffe, Apache Hadoop Distributed File System (HDFS), Apache Storm/Kafka, and others), in what can be a complex and error-prone process.

At Intel, we have been working extensively with open source community users and several partners & customers including JD.com, UCSF, Mastercard, and many others to build deep learning (DL) and AI applications on Apache Spark. To streamline end-to-end development and deployment, we have developed Analytics Zoo, a unified analytics + AI platform that seamlessly unites Spark, TensorFlow, Keras and BigDL programs into an integrated pipeline, which can transparently scale out to large Apache Hadoop/Spark clusters for distributed training or inference.

Early users such as World Bank, Cray, Talroo, Baosight, Midea/KUKA, and others have built analytics + AI applications on top of Analytics Zoo for a wide range of workloads. These include transfer learning based image classification, sequence-to-sequence prediction for precipitation nowcasting, neural collaborative filtering for job recommendations, and unsupervised time-series anomaly detection, among other examples.

In this article, we will provide several specific tutorials on how to implement distributed TensorFlow pipelines on Apache Spark using Analytics Zoo, and end-to-end pipelines for text classification in real use cases using Analytics Zoo.

## Distributed TensorFlow on Apache Spark

Using Analytics Zoo, users can easily build an end-to-end deep learning pipeline on a large-scale cluster using Spark and TensorFlow, as follows.

### Data wrangling and analysis using PySpark

For instance, to process the training data for an object detection pipeline in a distributed fashion, one can simply read the raw image data into an RDD (Resilient Distributed Dataset), an immutable collection of records partitioned across a cluster, using PySpark, and then apply a few transformations to decode images, and extract bounding boxes and class labels, as illustrated below.

train_rdd = sc.parallelize(examples_list) \
    .map(lambda image: decode_to_ndarrays(image))


Each record in the resulting RDD (train_rdd) consists of a list of NumPy ndarrays (namely, image, bounding boxes, classes, and number of detected boxes), which can be used directly in TensorFlow models for distributed training on Analytics Zoo. This is accomplished by creating a TFDataset from the resulting RDD (as shown below).

dataset = TFDataset.from_rdd(train_rdd,
                             names=["images", "bbox", "classes", "num_detections"],
                             shapes=[[300, 300, 3], [None, 4], [None], [1]],
                             types=[tf.float32, tf.float32, tf.int32, tf.int32],
                             batch_size=BATCH_SIZE,
                             hard_code_batch_size=True)
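To make the role of the `shapes` argument concrete, the plain-Python sketch below checks the tensor shapes of one record against a declared specification. It is not part of the Analytics Zoo API: the helpers `matches_spec` and `record_is_valid` and the sample shapes are hypothetical, and the sketch assumes that `None` stands for a dimension whose size may differ from record to record.

```python
from typing import Optional, Sequence

# Declared per-record shapes, mirroring the TFDataset call above.
# Assumption: None marks a dimension whose size may vary between records.
SPEC = [[300, 300, 3], [None, 4], [None], [1]]


def matches_spec(shape: Sequence[int], spec: Sequence[Optional[int]]) -> bool:
    """Return True if a concrete shape is compatible with a declared spec."""
    if len(shape) != len(spec):
        return False
    return all(s is None or s == d for d, s in zip(shape, spec))


def record_is_valid(shapes: Sequence[Sequence[int]]) -> bool:
    """Check every tensor shape in one record against SPEC."""
    return len(shapes) == len(SPEC) and all(
        matches_spec(shape, spec) for shape, spec in zip(shapes, SPEC)
    )


# One image with 5 boxes: image, bbox, classes, num_detections.
good = [(300, 300, 3), (5, 4), (5,), (1,)]
# Same record, but each box has 5 coordinates instead of 4.
bad = [(300, 300, 3), (5, 5), (5,), (1,)]

print(record_is_valid(good), record_is_valid(bad))  # True False
```

A check of this kind can be handy when debugging a preprocessing function before handing its output to a distributed job.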


### Deep learning model development using TensorFlow

In Analytics Zoo, TFDataset represents a distributed set of elements, in which each element contains one or more TensorFlow Tensor objects. We can then directly use these Tensors (as inputs) to build TensorFlow models; for instance, we can use the TensorFlow Object Detection API to construct an SSDLite+MobileNet V2 model (as illustrated below):

# Use the TensorFlow Object Detection API to construct the model
# https://github.com/tensorflow/models/tree/master/research/object_detection
import tensorflow as tf
from object_detection.builders import model_builder

images, bbox, classes, num_detections = dataset.tensors

detection_model = model_builder.build(model_config, is_training=True)
resized_images, true_image_shapes = detection_model.preprocess(images)
detection_model.provide_groundtruth(bbox, classes)
prediction_dict = detection_model.predict(resized_images, true_image_shapes)
losses = detection_model.loss(prediction_dict, true_image_shapes)
# tf.add_n expects a list of tensors, so materialize the dict values
total_loss = tf.add_n(list(losses.values()))

### Distributed training/inference on Spark and BigDL

After the model is constructed, we can train it in a distributed fashion directly on top of Spark (leveraging the BigDL framework). For instance, in the code snippet below, we apply transfer learning to train a TensorFlow model that has been pretrained on the MS COCO dataset.

with tf.Session() as sess:
    init_from_checkpoint(sess, CHECKPOINT_PATH)
    optimizer = TFOptimizer(total_loss, RMSprop(LR), sess)
    optimizer.optimize(end_trigger=MaxEpoch(20))
    save_to_new_checkpoint(sess, NEW_CHECKPOINT_PATH)


Under the hood, the input data are read from disk and preprocessed using PySpark to generate an RDD of TensorFlow Tensors; the TensorFlow model is then trained in a distributed fashion on top of BigDL and Spark (as described in the BigDL Technical Report). The entire training pipeline can automatically scale out from a single node to a large Xeon-based Hadoop/Spark cluster (without code modifications or manual configurations).
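The general idea of data-parallel training can be sketched in plain Python. This is not BigDL's implementation. It assumes synchronous mini-batch SGD, in which every partition computes a gradient on its own records and the averaged gradient updates a shared weight. The one-parameter linear model, the `partitions` list, and the learning rate are all hypothetical.

```python
from typing import List, Tuple

Record = Tuple[float, float]  # (x, y) pairs for a toy model y = w * x


def local_gradient(w: float, part: List[Record]) -> float:
    """Gradient of the mean squared error on one partition's records."""
    return sum(2.0 * (w * x - y) * x for x, y in part) / len(part)


def train(partitions: List[List[Record]], lr: float = 0.05, steps: int = 200) -> float:
    """Synchronous data-parallel SGD: average per-partition gradients each step."""
    w = 0.0
    for _ in range(steps):
        # On a cluster, each gradient would be computed by a separate task.
        grads = [local_gradient(w, part) for part in partitions]
        w -= lr * sum(grads) / len(grads)
    return w


# Toy data generated from y = 3x, split across two "workers".
partitions = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
w = train(partitions)
print(round(w, 3))  # 3.0
```

Because the only coordination point is the gradient average, adding partitions changes where the work runs but not the update rule, which is why the same program can run on one node or many.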

Once the model is trained, we can also perform large-scale, distributed evaluation/inference on Analytics Zoo using PySpark, TensorFlow and BigDL (similar to the training pipeline above). Alternatively, we may also deploy the model for low latency, online serving (in, for instance, web services, Apache Storm, Apache Flink, etc.) using the POJO-style serving API provided by Analytics Zoo, as illustrated below.

AbstractInferenceModel model = new AbstractInferenceModel(){};
model.loadTF(modelPath, 0, 0, false);
List<List<JTensor>> output = model.predict(inputs);

The figure below shows the entire workflow (including training, evaluation/inference and online serving) for the distributed TensorFlow on Apache Spark pipelines in Analytics Zoo.

## End-to-end analytics and AI pipelines

Analytics Zoo also provides the users a rich set of analytics and AI support for the end-to-end pipeline, including:

• Easy-to-use abstractions, such as Spark Dataframe and ML pipeline support, transfer learning support, Keras-style API, POJO-style model serving API, and more.
• Common feature engineering operations for images, text, and 3D images.
• Built-in deep learning models, such as text classification, recommendation, object detection, image classification, and more.
• Reference use cases, such as time-series anomaly detection, fraud detection, image similarity search, and more.

Using this high-level pipeline support, users can easily build complex data analytics and deep learning applications in just a few lines, as illustrated below.

### Load images into Spark DataFrames using NNImageReader

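from zoo.common.nncontext import *
from zoo.pipeline.nnframes import *
sc = init_nncontext()
# image_path is a placeholder for the location of the raw images
imageDF = NNImageReader.readImages(image_path, sc)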


### Process loaded data using DataFrames transformations

from pyspark.sql.functions import col, udf

getName = udf(lambda row: ...)
getLabel = udf(lambda name: ...)
df = imageDF.withColumn("name", getName(col("image"))) \
    .withColumn("label", getLabel(col("name")))


### Process images using built-in feature engineering operations

from zoo.feature.common import ChainedPreprocessing
from zoo.feature.image import *
transformer = ChainedPreprocessing(
    [RowToImageFeature(), ImageChannelNormalize(123.0, 117.0, 104.0),
     ImageMatToTensor(), ImageFeatureToTensor()])
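As a rough illustration of what a per-channel normalization step such as `ImageChannelNormalize(123.0, 117.0, 104.0)` does, the sketch below subtracts a mean from each channel of every pixel. It is a plain-Python stand-in, not the Analytics Zoo implementation. It assumes the three arguments are per-channel means applied in the order given and that the standard deviations default to 1. The function name `channel_normalize` and the sample image are hypothetical.

```python
from typing import List, Sequence

Pixel = List[float]


def channel_normalize(
    image: List[List[Pixel]],
    means: Sequence[float],
    stds: Sequence[float] = (1.0, 1.0, 1.0),
) -> List[List[Pixel]]:
    """Subtract a per-channel mean and divide by a per-channel std."""
    return [
        [[(v - m) / s for v, m, s in zip(pixel, means, stds)] for pixel in row]
        for row in image
    ]


# A 1x2 image with three channels per pixel.
image = [[[123.0, 117.0, 104.0], [223.0, 17.0, 104.0]]]
normalized = channel_normalize(image, means=(123.0, 117.0, 104.0))
print(normalized)  # [[[0.0, 0.0, 0.0], [100.0, -100.0, 0.0]]]
```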

### Use the transfer learning API to load an existing Caffe model, remove the last few layers, freeze the first few layers, and append a few new layers (using the Keras-style API)

from zoo.pipeline.api.net import *
from zoo.pipeline.api.keras.layers import Dense, Flatten, Input
from zoo.pipeline.api.keras.models import Model

full_model = Net.load_caffe(def_path, model_path)
# Remove layers after pool5
model = full_model.new_graph(outputs=["pool5"])
# Freeze layers from input to res4f inclusive
model.freeze_up_to(["res4f"])
# Append a few layers
image = Input(name="input", shape=(3, 224, 224))
resnet = model.to_keras()(image)
flatten = Flatten()(resnet)
logits = Dense(2)(flatten)
newModel = Model(image, logits)

### Train model using Spark ML Pipelines

from bigdl.nn.criterion import CrossEntropyCriterion

estimator = NNEstimator(newModel, CrossEntropyCriterion(), transformer) \
    .setLearningRate(0.003).setBatchSize(40).setMaxEpoch(2) \
    .setFeaturesCol("image").setCachingSample(False)
nnModel = estimator.fit(df)

## Real world AI use cases on Analytics Zoo

As mentioned above, many early users have built real-world applications on top of Analytics Zoo. In this section, we describe in more detail how an end-to-end text classification pipeline was built by Microsoft Azure using NLP technologies on Analytics Zoo.

### Text Classification Overview

Text classification is a common type of natural language processing task, whose purpose is to classify an input text into one or more categories. For example, spam email detection classifies the content of an email into spam or non-spam categories.

In general, training a text classification model involves the following steps: collecting and preparing the training and validation datasets, cleaning and preprocessing the data, training the model, validating and evaluating the model, and tuning the model (which includes, but is not limited to, adding data, adjusting hyperparameters, and adjusting the model).

There are several pre-defined text classifiers in Analytics Zoo that can be used out of the box (namely, CNN, LSTM, and GRU). We chose CNN as a starting point. We use the Python API in the following text to illustrate the training process.

from zoo.models.textclassification import TextClassifier

text_classifier = TextClassifier(class_num, embedding_file,
                                 sequence_length=500, encoder="cnn",
                                 encoder_output_dim=256)

In the above API, class_num is the number of categories in this problem, embedding_file is the path to the pretrained word embedding file (only GloVe is supported at this moment), sequence_length is the number of words each text record contains, encoder is the type of word encoder (which can be cnn, lstm or gru), and encoder_output_dim is the output dimension of this encoder. The model accepts a sequence of word indices as input and outputs a label.

### Data collection and preprocessing

Each record in the training dataset contains two fields, a dialogue and a label. We collected thousands of such records and labeled them both manually and semi-automatically. We then cleaned the original texts by removing meaningless tags and garbled parts, and converted them into a text RDD in which each record is a (text, label) pair. Next, we preprocessed the text RDD into the form that our model accepts. Make sure that data cleaning and preprocessing are identical for training and prediction!

(How to get invoice …, 1) (Can you send invoice to me …, 1) (Remote service connection failure …, 2) (How to buy …, 3) …

Illustration of text RDD records after data cleaning (each record is a pair of text and label)

We can use the TextSet provided by Analytics Zoo to read the text data in a distributed fashion as follows.

from zoo.feature.text import TextSet
from zoo.common.nncontext import init_nncontext

sc = init_nncontext("Text Classification")
text_set = TextSet.read(data_path, sc)

1. Tokenization

We then break the sentences into words, which converts each input text into an array of tokens (words), and normalize the tokens (e.g., removing unknown characters and converting to lower case).


text_set = text_set.tokenize() \
    .normalize()
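For intuition, the two steps above can be mimicked on a single string in plain Python. This is only a sketch of the idea, not the TextSet implementation. It assumes whitespace tokenization, and it assumes that normalization lower-cases each token and keeps only the letters a to z. The function names are hypothetical.

```python
import re
from typing import List


def tokenize(text: str) -> List[str]:
    """Split raw text into tokens on whitespace."""
    return text.split()


def normalize(tokens: List[str]) -> List[str]:
    """Lower-case each token, drop characters outside a-z, discard empty results."""
    cleaned = (re.sub(r"[^a-z]", "", tok.lower()) for tok in tokens)
    return [tok for tok in cleaned if tok]


tokens = normalize(tokenize("How to get the Invoice #42 ?"))
print(tokens)  # ['how', 'to', 'get', 'the', 'invoice']
```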

2. Sequence Aligning

Different texts may generate token arrays of different sizes, but a text classification model needs a fixed input size for all records. Thus we have to align the token arrays to the same size (specified by the sequence_length parameter of the text classifier). If a token array is larger than the required size, we strip words from the beginning or the end; otherwise, we pad the end of the array with meaningless words (e.g., “##”).

text_set = text_set.shape_sequence(sequence_length)
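The truncate-or-pad rule described above can be written out for one token list as follows. This is a plain-Python sketch rather than the library code. The parameter names `trunc_mode` and `pad` and the values "pre" and "post" are hypothetical, chosen only to show both truncation directions.

```python
from typing import List


def shape_sequence(tokens: List[str], length: int,
                   trunc_mode: str = "pre", pad: str = "##") -> List[str]:
    """Truncate or pad a token list to exactly `length` tokens."""
    if len(tokens) > length:
        if trunc_mode == "pre":
            # Drop tokens from the beginning, keep the last `length` tokens.
            return tokens[len(tokens) - length:]
        # Drop tokens from the end, keep the first `length` tokens.
        return tokens[:length]
    # Short inputs are padded at the end with a meaningless word.
    return tokens + [pad] * (length - len(tokens))


print(shape_sequence(["a", "b", "c", "d"], 3))          # ['b', 'c', 'd']
print(shape_sequence(["a", "b", "c", "d"], 3, "post"))  # ['a', 'b', 'c']
print(shape_sequence(["a"], 3))                         # ['a', '##', '##']
```

Whichever direction is chosen, the same setting has to be used at prediction time so that the model sees inputs shaped the same way as during training.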

3. Word to Index

After the token array size is aligned, we need to convert each token (word) into an index, which can be used to look up its embedding later (in the TextClassifier model). During the word-to-index step, we also remove stop words (that is, words that appear frequently in text but do not help semantic understanding, such as “the”, “of”, etc.) by removing the top N words with the highest frequencies in the text.

text_set = text_set.word2idx(remove_topN=10, max_words_num=max_words_num)
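The frequency-based indexing described above can be sketched as follows. This is a plain-Python illustration, not the library implementation. It assumes that indices start at 1 and are assigned in descending frequency order, that ties are broken alphabetically, and that words outside the map are dropped. The helper names and the tiny corpus are hypothetical.

```python
from collections import Counter
from typing import Dict, List


def build_word_index(corpus: List[List[str]], remove_top_n: int = 0,
                     max_words_num: int = -1) -> Dict[str, int]:
    """Map words to 1-based indices by descending frequency, skipping the top N."""
    counts = Counter(tok for tokens in corpus for tok in tokens)
    # Sort by frequency (descending), then alphabetically so ties are deterministic.
    ranked = sorted(counts, key=lambda w: (-counts[w], w))
    kept = ranked[remove_top_n:]
    if max_words_num > 0:
        kept = kept[:max_words_num]
    return {word: i for i, word in enumerate(kept, start=1)}


def to_indices(tokens: List[str], index: Dict[str, int]) -> List[int]:
    """Replace each token by its index; words outside the map are dropped."""
    return [index[tok] for tok in tokens if tok in index]


corpus = [["how", "to", "get", "the", "invoice"],
          ["send", "the", "invoice", "to", "me"],
          ["the", "service", "failed"]]
index = build_word_index(corpus, remove_top_n=1)
print(to_indices(corpus[0], index))  # [5, 2, 4, 1]
```

Here the most frequent word, "the", is treated as a stop word and disappears from the first record.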

4. Conversion to Sample

After all the above steps, each text becomes a tensor with shape (sequence_length, 1). We then construct one BigDL Sample from each record, with the generated tensor as the feature and the label integer as the label.

text_set = text_set.generate_sample()

### Model training, testing, evaluation and tuning

After preparing the training dataset (train_set) and the validation dataset (val_set) in the same way as above, we instantiated a new TextClassifier model (text_classifier), and then created an optimizer to train the model in a distributed fashion. We used sparse categorical cross entropy as the loss function.

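train_set, val_set = text_set.random_split(
    [training_split, 1 - training_split])

# Assumed: an Adagrad optimizer and a learning_rate variable; adjust to your setup.
model.compile(optimizer=Adagrad(learningrate=learning_rate,
                                learningrate_decay=0.001),
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

model.fit(train_set, batch_size=int(options.batch_size),
          nb_epoch=max_epoch, validation_data=val_set)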
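For reference, sparse categorical cross entropy takes the label as an integer class id rather than a one-hot vector. For a batch of $N$ records it is $-\frac{1}{N}\sum_{i}\log p_i[y_i]$, where $p_i$ is the predicted class distribution and $y_i$ the true class id. The short sketch below evaluates it on made-up numbers. It assumes zero-based class ids and already-normalized probabilities, and it is not the BigDL implementation.

```python
import math
from typing import List


def sparse_categorical_crossentropy(probs: List[List[float]], labels: List[int]) -> float:
    """Mean negative log-probability assigned to the true class of each record."""
    losses = [-math.log(p[y]) for p, y in zip(probs, labels)]
    return sum(losses) / len(losses)


# Two records, three classes; labels are zero-based class ids (assumption).
probs = [[0.7, 0.2, 0.1], [0.1, 0.1, 0.8]]
labels = [0, 2]
loss = sparse_categorical_crossentropy(probs, labels)
print(round(loss, 4))  # 0.2899
```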

The tunable parameters for training include the number of epochs, the batch size, the learning rate, and so on. You can specify validation options to output metrics such as accuracy on the validation set as training progresses, in order to detect overfitting or underfitting.

If the result on the validation dataset is not good, we have to tune the model. This is generally a repeated process of adjusting the hyperparameters, data, or model, then training and validating, until the result is good enough. Our accuracy score improved remarkably after we tuned the learning rate, added new data, and augmented the stop-word dictionary.

You may refer to these documents for more details on the text processing and classification support in Analytics Zoo.

## About the Author

Jason Dai is a senior principal engineer and CTO of Big Data Technologies at Intel, responsible for leading the global engineering teams (in both Silicon Valley and Shanghai) on the development of advanced big data analytics and machine learning. He is a committer and PMC member of Apache Spark, a mentor of Apache MXNet, a co-chair of Strata Data Conference Beijing, and the creator of BigDL (a distributed deep learning framework for Apache Spark).
