Key Takeaways
- GPUs have been widely used for accelerating deep learning, but not for data processing. As part of a major Spark initiative to better unify deep learning and data processing on Spark, GPUs are now a schedulable resource in Apache Spark 3.0.
- When combined with the RAPIDS Accelerator for Apache Spark, Spark can now accelerate SQL and DataFrame data processing on GPUs without code changes.
- The combination of Deep Java Library (DJL), Apache Spark 3.x, and NVIDIA GPU computing simplifies end-to-end GPU-accelerated big data processing pipelines, DL training, and inference.
Many large enterprises and AWS customers are interested in adopting deep learning, with business use cases ranging from customer service (including object detection from images and video streams and sentiment analysis) to fraud detection and collaboration. However, until recently, there were multiple difficulties with implementing deep learning in enterprise applications:
- The adoption learning curve was steep and required development of internal technical expertise in new programming languages (e.g., Python) and frameworks.
- Deep learning training and inference are compute intensive and typically performed on GPUs, while large-scale data engineering was typically programmed in Scala on multi-CPU distributed Apache Spark.
In this tutorial, we share how the combination of Deep Java Library, Apache Spark 3.x, and NVIDIA GPU computing simplifies deep learning pipelines while improving performance and reducing costs. You will learn about the following:
- Deep learning use cases with Apache Spark
- Speeding up end-to-end ETL, ML, DL pipelines with Apache Spark and NVIDIA GPU computing
- Deep Java Library (DJL), a deep learning framework implemented in Java that aims to make popular open source deep learning frameworks accessible to Java/Scala developers
- Creating a cluster of GPU machines and using Apache Spark with DJL on Amazon EMR to perform large-scale image classification in Scala.
Deep Learning on Apache Spark
Data processing and deep learning are often split into two pipelines, one for ETL processing, and one for model training. Enabling deep learning frameworks to integrate with ETL jobs allows for more streamlined ETL/DL pipelines.
Apache Spark has emerged as the standard framework for large-scale, distributed, data analytics processing. Apache Spark's popularity comes from the ease-of-use APIs and high-performance big data processing. Spark is integrated with high-level operators and libraries for SQL, stream processing, machine learning (ML), and graph processing.
Many developers are looking for an efficient and easy way to integrate their deep learning (DL) applications with Spark. However, there is no official support for DL in Spark. There are libraries that try to solve this problem, such as TensorFlowOnSpark, Elephas, and CERN's dist-keras, but most of them are engine-dependent. In addition, most deep learning frameworks (PyTorch, TensorFlow, Apache MXNet, etc.) do not have good support for the Java Virtual Machine (JVM), which Spark runs on.
Deep Learning use cases with Apache Spark
In this section, we’ll walk through several DL use cases for different industries using Scala.
Financial Institutions
Machine learning and deep learning have many applications in the financial industry. J.P. Morgan summarized six initiatives for their machine learning applications: Anomaly Detection, Intelligent Pricing, News Analytics, Quantitative Client Intelligence, Smart Documents, and Virtual Assistants. This indicates that deep learning has a place in many business areas of financial institutions. A good example comes from Monzo, a fast-growing UK-based "challenger bank" that reached 3 million customers in 2019. Monzo successfully automated 30% to 50% of potential user enquiries by applying Recurrent Neural Networks (RNNs) to their users' sequential event data.
Customer experience is an important topic for most financial institutions. Another example of applying deep learning to improve customer experience is Mastercard, a first-tier global payment solution company. Mastercard successfully built a deep learning-based customer propensity recommendation system with Apache Spark and their credit card transaction data. Such a recommender can offer better and more suitable goods and services to customers, potentially benefiting the customers, the merchants, and Mastercard. Before this project, Mastercard had built a Spark ML recommendation pipeline with traditional machine learning methods (i.e., matrix factorization with Alternating Least Squares, or ALS) on data consisting of over 1.4 billion transactions. To determine whether new deep learning methods could improve the performance of the existing recommender system, they benchmarked two deep learning methods: Neural Collaborative Filtering and the Wide and Deep model. Both achieved a significant improvement over the traditional ALS implementation.
Financial systems require very high levels of fault tolerance and security, and Java has been widely used in these companies to achieve better stability. Since financial systems also face huge amounts of data (1.4 billion transactions in the example above), big data frameworks like Apache Spark are a natural choice to process that data. The combination of Java/Scala with Apache Spark is therefore predominant in this field.
Big Data Analytics
As data continues to grow, a new type of company has emerged that mines and analyzes business data. These companies serve as third parties that help their clients extract valuable information from their data, typically system logs, anonymized non-sensitive customer information, and sales and transaction records. As an example, TalkingData is a data intelligence service provider that offers data products and services to give businesses insights on consumer behavior, preferences, and trends. One of TalkingData's core services is leveraging machine learning and deep learning models to predict consumer behaviors (e.g., the likelihood of a particular group buying a house or a car) and using these insights for targeted advertising. Currently, TalkingData uses a Scala-based big data pipeline to process hundreds of millions of records a day. They built a deep learning model and used it across a Spark cluster to run distributed inference tasks. Compared to single-machine inference, the Spark cluster reduced the total inference time from 8 hours to less than 3 hours. They chose DJL with Spark for the following reasons:
- DJL eliminates the need to maintain infrastructure beyond Apache Spark and lets TalkingData fully utilize Spark's computing power for inference. If Spark is already being used for big data processing, that compute can be reused for DL inference, which saves money and improves efficiency.
- Spark has a good fault-tolerance mechanism for handling failures in the middle of a job. Because DL inference on big data can last for hours or even tens of hours, TalkingData wanted an auto-recovery solution for these machines.
- DJL is framework-agnostic, which gives TalkingData the ability to deploy any deep learning model (e.g., TensorFlow, PyTorch, MXNet) without any deployment code change, reducing time to market for TalkingData's new products and services.
For the online retail industry, recommendations and ads are important for providing a better customer experience and driving revenue. The data sizes are usually enormous, and retailers need a big data pipeline to clean the data and extract the valuable information. Apache Spark is a natural fit for these tasks.
Today, more and more companies are taking a personalized approach to content and marketing. Amazon Retail used Apache Spark on Amazon EMR to achieve this goal. They created a multi-label classification model to understand customer action propensity across thousands of product categories and used these propensities to create a personalized experience for customers. Amazon Retail built a Scala-based big data pipeline to consume hundreds of millions of records and used DJL to run DL inference on their model.
Accelerating ML pipelines with Apache Spark and NVIDIA GPU Computing
As shown above, many companies and institutions use Apache Spark for their deep learning tasks. However, with the growing size and complexity of deep learning models, developers are turning to GPUs for training and inference jobs, and CPU-only compute on Apache Spark is often not sufficient for large models.
GPUs, with their massively parallel architecture, have driven the advancement of deep learning (DL) over the past several years. With GPUs, you can exploit data parallelism through columnar data processing instead of the traditional row-based processing originally designed for CPUs. This provides higher performance and cost savings.
Apache Spark 3.0 represents a key milestone in this advancement, combining GPU acceleration with large-scale distributed data processing and analytics. Spark 3.0 can now schedule GPU-accelerated ML and DL applications on Spark clusters with GPUs. Spark conveys these resource requests to the underlying cluster manager. Also, when combined with the RAPIDS Accelerator for Apache Spark, Spark can now accelerate SQL and DataFrame data processing with GPUs without code changes. Because this functionality allows you to run distributed ETL, DL training, and inference at scale, it helps accelerate big data pipelines to leverage DL applications.
In Spark 3.0, you can now have a single pipeline, from data ingestion to data preparation to model training on a GPU-powered cluster.
Before Apache Spark 3.0, using GPUs was difficult. Users had to manually assign NVIDIA GPU devices to a Spark job and hardcode all configurations for every executor/task to leverage different GPUs on a single machine. Because the Apache Hadoop 3.1 Yarn cluster manager allows GPU coordination among different machines, Apache Spark can now work alongside it to help pass the device arrangement to different tasks. Users can simply specify the number of GPUs to use and how those GPUs should be shared between tasks. Spark handles the assignment and coordination of the tasks.
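To make this concrete, here is a minimal sketch of the GPU scheduling properties involved. The property names come from Spark's resource scheduling configuration and also appear in the spark-submit command later in this tutorial; the values are illustrative only.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Each executor requests one GPU from the cluster manager (YARN in this tutorial).
  .set("spark.executor.resource.gpu.amount", "1")
  // Each task needs half a GPU, so two tasks share one GPU.
  .set("spark.task.resource.gpu.amount", "0.5")
  // Discovery script that reports the GPU addresses available on a worker.
  .set("spark.executor.resource.gpu.discoveryScript",
    "/usr/lib/spark/scripts/gpu/getGpusResources.sh")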
To get the most out of these capabilities, let's discuss the following two components:
RAPIDS: the NVIDIA GPU-powered Spark data pipeline
The RAPIDS Accelerator for Apache Spark combines the power of the RAPIDS library and the scale of the Spark distributed computing framework. In addition, RAPIDS integration with ML/DL frameworks enables the acceleration of model training and tuning. This allows data scientists and ML engineers to have a unified, GPU-accelerated pipeline for ETL and analytics, while ML and DL applications leverage the same GPU infrastructure, removing bottlenecks, increasing performance, and simplifying clusters.
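Enabling the RAPIDS Accelerator is a configuration change rather than a code change. As a rough sketch, assuming the rapids-4-spark and cudf jars are already available on the cluster (EMR 6.2.0 and later ship with the RAPIDS Accelerator, as described in the optional setup step below), registering the plugin looks like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("GPU accelerated ETL")
  // Register the RAPIDS Accelerator so supported SQL/DataFrame operators run on the GPU.
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // GPU SQL acceleration is on by default once the plugin is loaded; shown here for clarity.
  .config("spark.rapids.sql.enabled", "true")
  .getOrCreate()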
Apache Spark-accelerated end-to-end ML platform stack
NVIDIA worked with the Apache Spark community to add GPU acceleration on several leading platforms, including Google Cloud, Databricks, Cloudera, and Amazon EMR, making it easy and cost-effective to launch scalable, cloud-managed Apache Spark clusters with GPU acceleration.
Amazon EMR, Spark + NVIDIA RAPIDS Accelerator GPU vs. CPU Performance Comparison
For its experiments comparing CPU vs. GPU performance for Spark 3.0.1 on Amazon EMR, the NVIDIA RAPIDS Accelerator team used 10 TB of simulated data and queries designed to mimic large-scale ETL from a retail company (similar to TPC-DS). This comparison was run on both a CPU cluster and a GPU cluster with 3 TB of TPC-DS data stored on Amazon S3. The CPU cluster consisted of 8 m5d.2xlarge instances as workers and 1 m5d.xlarge instance as a master. The GPU cluster consisted of 8 g4dn.2xlarge instances as workers, each with one NVIDIA T4 GPU (among the most cost-effective GPU instances in the cloud for ML), and 1 m5d.xlarge instance as a master. The CPU cluster costs $3.91 per hour and the GPU cluster costs $6.24 per hour.
In this experiment, the RAPIDS Accelerator team used a query similar to TPC-DS query 97. Query 97 calculates counts of promotional sales and total sales, and their ratio, from the web channel for a particular item category and month, for customers in a given time zone. As you can see from the Spark physical plan and DAG for query 97 shown below, every line of the physical plan has a GPU prefix attached to it, meaning that every operation of the query runs entirely on the GPU.
Spark SQL query 97 DAG
With this query running almost completely on the GPU, processing time was sped up by a factor of up to 2.6x, with 39% cost savings compared to running the job on the Spark CPU cluster. Note that no tuning or code changes were required for this query.
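As a quick sanity check on those numbers: a job that takes one hour on the CPU cluster costs about $3.91, while the same job at a 2.6x speedup on the GPU cluster takes roughly 1/2.6 hours at $6.24 per hour, or about $2.40, which is approximately 39% cheaper.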
Improvements in query time and total costs.
In addition, the NVIDIA RAPIDS Accelerator team has run queries with Spark windowing operators on EMR and seen speedups of up to 30x on GPU versus CPU for large datasets.
DJL: Deep Learning Framework in Java
Deep Java Library (DJL) is a deep learning framework written in Java that supports both training and inference. DJL is built on top of modern deep learning engines (TensorFlow, PyTorch, MXNet, etc.). It provides a viable solution for users who work in Scala/Java or who want to integrate DL into their Scala-based big data pipelines. DJL aims to make open source deep learning tools accessible to developers and data engineers who primarily use Java/Scala, through familiar concepts and intuitive APIs. You can easily use DJL to train your own model or deploy a model trained in Python, using a variety of engines, without any additional conversion.
By combining Spark 3.x, the RAPIDS Accelerator for Spark, and DJL, users can now build an end-to-end, GPU-accelerated, Scala-based big data + DL pipeline on Apache Spark.
Deep Learning on Scala Example
Now let's walk through an example that uses Apache Spark 3.0 with GPUs for an image classification task. This example shows a common image classification workload on Apache Spark for online retail; it can be used for content filtering, such as eliminating inappropriate images uploaded by merchants. The full project is available in the DJL demo repository.
Step 1: Prepare Spark application
1.1 Setup
For full setup information, refer to the Gradle project setup. The following section highlights some key components you need to know.
First, we’ll import the Spark dependencies. Spark SQL and ML libraries are used to store and process the images.
configurations {
    exclusion
}
dependencies {
    implementation "org.apache.spark:spark-sql_2.12:3.0.1"
    implementation "org.apache.spark:spark-mllib_2.12:3.0.1"
    implementation "org.apache.hadoop:hadoop-hdfs:2.7.4"
    exclusion "org.apache.spark:spark-sql_2.12:3.0.1"
    exclusion "org.apache.spark:spark-mllib_2.12:3.0.1"
    exclusion "org.apache.hadoop:hadoop-hdfs:2.7.4"
}
jar {
    from {
        (configurations.runtimeClasspath - configurations.exclusion).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }
}
Next, we import the DJL-related dependencies. We use the DJL API and PyTorch packages, which provide the core DJL features and load the PyTorch engine for inference. We also use pytorch-native-cu101 to run on GPUs with CUDA 10.1.
implementation platform("ai.djl:bom:0.8.0")
implementation "ai.djl:api"
runtimeOnly "ai.djl.pytorch:pytorch-model-zoo"
runtimeOnly "ai.djl.pytorch:pytorch-native-cu101::linux-x86_64"
1.2 Load model
To load a model in DJL, we provide a URL (e.g., file://, hdfs://, s3://, https://) hosting the model. The model will be downloaded and imported from that URL.
def loadModel(device: Device): ZooModel[Row, Classifications] = {
  val modelUrl = "https://alpha-djl-demos.s3.amazonaws.com/model/djl-blockrunner/pytorch_resnet18.zip?model_name=traced_resnet18"
  val criteria = Criteria.builder
    .setTypes(classOf[Row], classOf[Classifications])
    .optModelUrls(modelUrl)
    .optTranslator(new MyTranslator())
    .optProgress(new ProgressBar)
    .optDevice(device)
    .build()
  ModelZoo.loadModel(criteria)
}
The input type here is a Row from Spark SQL, and the output type is a Classifications result. We also define a Translator named MyTranslator that handles the preprocessing and post-processing work; it is not shown in full in this document, but a rough sketch follows below. The model we load here is a pre-trained PyTorch ResNet18 model from torchvision.
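For illustration, here is a rough sketch of what such a translator might look like. It assumes the flattened Spark image schema columns (origin, height, width, nChannels, mode, data) produced by the image data source, the Translator API as of DJL 0.8.0, and a synset.txt label file packaged in the model archive; the actual implementation in the DJL demo repository differs in details (for example, it converts the image from BGR to RGB).

import java.util

import ai.djl.Model
import ai.djl.modality.Classifications
import ai.djl.modality.cv.transform.{Normalize, ToTensor}
import ai.djl.ndarray.types.{DataType, Shape}
import ai.djl.ndarray.{NDList, NDManager}
import ai.djl.translate.{Batchifier, Pipeline, Translator, TranslatorContext}
import ai.djl.util.Utils
import org.apache.spark.sql.Row

class MyTranslator extends Translator[Row, Classifications] {

  private var classes: util.List[String] = new util.ArrayList[String]()
  // Standard ImageNet preprocessing: scale to [0, 1] and normalize per channel.
  private val pipeline: Pipeline = new Pipeline()
    .add(new ToTensor())
    .add(new Normalize(Array(0.485f, 0.456f, 0.406f), Array(0.229f, 0.224f, 0.225f)))

  // Load the class labels (synset.txt) packaged with the model archive.
  override def prepare(manager: NDManager, model: Model): Unit = {
    classes = Utils.readLines(model.getArtifact("synset.txt").openStream())
  }

  // Turn a Spark image Row (height, width, nChannels, raw bytes) into the model input.
  override def processInput(ctx: TranslatorContext, row: Row): NDList = {
    val height = row.getInt(1)
    val width = row.getInt(2)
    val channels = row.getInt(3)
    val image = ctx.getNDManager
      .create(row.getAs[Array[Byte]](5), new Shape(height, width, channels))
      .toType(DataType.UINT8, false)
    pipeline.transform(new NDList(image))
  }

  // Convert the raw model output into ranked Classifications using the labels.
  override def processOutput(ctx: TranslatorContext, list: NDList): Classifications = {
    val probabilities = list.singletonOrThrow.softmax(0)
    new Classifications(classes, probabilities)
  }

  override def getBatchifier: Batchifier = Batchifier.STACK
}

A nice side effect of this design is that all framework-specific preprocessing and post-processing lives in the translator, so the Spark code in the next step stays free of engine-specific logic.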
1.3 Main logic
In the main function, we download the images and store them in HDFS. After that, we create a SparkSession and use the built-in Spark image data source to load all images into Spark SQL. After this step, we use mapPartitions to fetch the GPU information.
As shown in the following code, TaskContext.resources()("gpu") stores the GPU assigned to this partition. We can pass that GPU id when loading the model so the model is placed on that particular GPU, which ensures all GPUs on a single machine are properly used. To run inference, we call predictor.predict(row).
def main(args: Array[String]) {
  // download images
  val imagePath = downloadImages(new Path("hdfs:///images"))
  // Spark configuration
  val spark = SparkSession.builder()
    .appName("Image Classification")
    .config(new SparkConf())
    .getOrCreate()
  val df = spark.read.format("image").option("dropInvalid", true).load(imagePath)
  val result = df.select(col("image.*")).mapPartitions(partition => {
    val context = TaskContext.get()
    // get assigned GPU
    val gpuId = context.resources()("gpu").addresses(0)
    val model = loadModel(Device.gpu(gpuId.toInt))
    val predictor = model.newPredictor()
    partition.map(row => {
      predictor.predict(row).toString
    })
  })(Encoders.STRING)
  println(result.collect().mkString("\n"))
}
1.4 Wrap it up
Next, we run ./gradlew jar to bundle everything we need into a single jar and run it in a Spark cluster.
Optional: Setup Spark Cluster with NVIDIA GPU on AWS
With EMR release version 6.2.0 and later, you can quickly and easily create scalable and secure clusters with Apache Spark 3.x, the RAPIDS Accelerator, and NVIDIA GPU-powered Amazon EC2 instances. (To set up a cluster using the EMR console, follow the instructions in this article.)
To set up a Spark cluster using AWS CLI, create a GPU cluster with three instances using the command below. To run the command successfully, you’ll need to change myKey to your EC2 pem key name. The --region flag can also be removed if you have that preconfigured in your AWS CLI.
aws emr create-cluster \
--name "Spark cluster" \
--release-label emr-6.2.0 \
--region us-east-1 \
--ebs-root-volume-size 50 \
--applications Name=Hadoop Name=Spark \
--ec2-attributes KeyName=myKey \
--instance-type g3s.xlarge \
--instance-count 3 \
--use-default-roles \
--configurations https://raw.githubusercontent.com/aws-samples/djl-demo/master/aws/emr-distributed-inference/image-classification-gpu/configurations.json
We use the g3s.xlarge instance type for testing purposes. You can choose from a variety of GPU instances that are available in AWS. The total run time for the cluster setup is around 10 to 15 minutes.
Step 2: Execute the Spark job
Now, we can run the distributed inference job on Spark. You can choose to do it on the EMR console or from the command line.
The following command tells Spark to run on a YARN cluster and uses a discovery script to find the GPUs on each machine. The GPU amount per task is set to 0.5, which means that two tasks share one GPU. You may also need to set spark.task.cpus accordingly so the CPU and GPU limits match: for example, if you have an 8-core CPU and set spark.task.cpus to 2, four tasks can run in parallel on a single machine.
To achieve the best performance, you can set spark.task.resource.gpu.amount to 0.25, which allows four tasks to share the same GPU. This helps maximize performance because all cores of the GPU and CPU are used; without a balanced setup, some cores sit idle, which wastes resources.
spark-submit \
--master yarn \
--conf spark.executor.resource.gpu.discoveryScript=/usr/lib/spark/scripts/gpu/getGpusResources.sh \
--conf spark.worker.resource.gpu.discoveryScript=/usr/lib/spark/scripts/gpu/getGpusResources.sh \
--conf spark.task.resource.gpu.amount="0.5" \
--conf spark.task.cpus=2 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.worker.resource.gpu.amount=1 \
--class com.examples.ImageClassificationExample \
build/libs/image-classification-gpu-1.0-SNAPSHOT.jar
This script takes around 4 to 6 minutes to finish, and the inference results are printed as output.
Summary
DL on Spark is growing rapidly, with more applications and toolkits. Users can build their own DL pipelines with NVIDIA GPUs for better performance. Please check out the links below for more information about DJL and the RAPIDS Accelerator for Spark:
- DJL
- RAPIDS Accelerator for Apache Spark
- How Amazon retail systems run machine learning predictions with Apache Spark using Deep Java Library | Amazon Web Services
- How TalkingData uses AWS open source Deep Java Library with Apache Spark for machine learning inference at scale | Amazon Web Services
- Improving Apache Spark Performance and Reducing Costs with Amazon EMR and NVIDIA
- How Netflix uses Deep Java Library (DJL) for distributed deep learning inference in real-time | Amazon Web Services
- Adopting machine learning in your microservices with DJL (Deep Java Library) and Spring Boot | Amazon Web Services
About the Authors
Haoxuan Wang is a data scientist and software developer at Barclays, and a community member of DJL (djl.ai). He is keen on building advanced data solutions for the bank by applying innovative ideas. His main technical interests include natural language processing, graph neural networks, and distributed systems. He was awarded a master's degree (with distinction) in data science from University College London (UCL) in 2019.
Qing Lan is a Software Development Engineer who is passionate about efficient architectural design for modern software and application systems. He focuses on parallel computing and distributed system design, and currently works on deep learning acceleration and deep learning framework optimization.
Carol McDonald works in technical marketing focusing on Spark and data science. Carol has experience in many roles, including technical marketing, software architecture and development, training, technology evangelism, and developer outreach for companies including NVIDIA, Sun, and IBM. Carol writes industry architectures, best practices, patterns, prototypes, tutorials, demos, blog posts, whitepapers, and ebooks. She has traveled worldwide, speaking and giving hands-on labs, and has developed complex, mission-critical applications in the banking, health insurance, and telecom industries. Carol holds an MS in computer science from the University of Tennessee and a BS in geology from Vanderbilt University. Carol is fluent in English, French, and German.