
Scaling Emerging AI Applications with Ray



Peter Schafhalter discusses his work with Ray, a distributed execution framework for emerging AI applications, and with Tune and Modin.


Peter Schafhalter is a researcher in UC Berkeley's RISELab working in distributed systems and machine learning. He has worked with Ray, RLlib, Tune, and Modin for 1.5 years. He is interested in building high-performance scalable systems that enable AI applications.

About the conference: a practical AI and machine learning conference bringing together software teams working on all aspects of AI and machine learning.


Schafhalter: I'm very excited to be here at QCon. Today I'll be talking about "Scaling Emerging AI Applications with Ray". Ray is a distributed execution framework targeted at emerging AI applications.

A little bit about myself: I'm a researcher at UC Berkeley's RISELab. For those of you who aren't familiar with RISELab, it is the successor to AMPLab, which produced technologies and systems such as Apache Spark. In the fall, I'll be starting my PhD there, and I'm really excited about that. Broadly speaking, I'm interested in AI systems. My belief is that work on AI and work on systems are two sides of the same coin: advances in AI motivate new systems, and new systems enable new advances in AI. I've been working on Ray and its libraries for 1.5 years, during which I've seen it grow from a small research project into a high-performance distributed system with a large number of users. In addition to Ray, I also do research on systems for autonomous vehicles. If you're interested in that, I'm happy to talk to you afterwards.

The Machine Learning Ecosystem

To motivate Ray, let's briefly examine the machine learning ecosystem. Modern scalable AI applications need support for distributed training, distributed reinforcement learning, model serving, hyperparameter search, data processing, and streaming. Right now, each of these problems is siloed in its own specialized distributed system. These systems don't talk to each other directly, so when you compose them into a machine learning pipeline, you pay overheads for switching compute contexts and for composing the systems together. Gluing them together in code is also painful, so you pay a software-engineering overhead as well. This makes it difficult to develop an ML pipeline and run an end-to-end machine learning workflow.

What is Ray?

This is where Ray comes in. Instead of having a separate distributed computing framework for each specific part of the machine learning lifecycle, we created Ray, a high-performance distributed computing system, and built libraries on top of it to support all these types of workflows. With this architecture, we can avoid those overheads and get the performance benefits of building on a single system.

In order to implement these libraries, we need to expose the right abstractions to developers. At the top level, we have tasks and actors. Traditionally, people write programs using functions and classes, and Ray exposes these as tasks, which you work with like functions, and actors, which you work with like classes. This model is expressive enough to support high-performance implementations of these machine learning workflows, and it's also easy to reason about and familiar to developers. The Ray backend, which is written in C++, handles the scheduling of these tasks and actors.

Ray is open source, and it's used in industry in several of these AI contexts, including hyperparameter tuning, reinforcement learning, and distributed data processing. Here are some examples of organizations using Ray for AI applications. As you can see, it's used in industry as well as in academic organizations for research.


Let's briefly go over the Ray API and see how easy it is to distribute a program using Ray. In this example, we'll compute the dot product of two arrays. You start off with ordinary functions, and to turn them into remote tasks, you simply add the @ray.remote decorator. When you call them, Ray schedules them in the backend. Calling a task is just like calling a function, except that you add .remote. This explicit suffix matters: when developing an application, it's really important to understand which code runs locally and which code runs remotely, for example in the cloud. This helps developers avoid bugs and understand the code quickly. Calling a task immediately returns an object ID while the task executes in the background. The object ID acts as a future for the result of the remote task.

Let's create another array. Both of these tasks may run in parallel, even though they are called one after the other. Generating an array of zeros is fast, but for long-running tasks involving more complicated operations, this parallelism can improve your performance. We can also pass object IDs to remote tasks without any modification, just like Python objects. Under the hood, Ray manages the data transfer and schedules the task, passing the results of id1 and id2 to the dot function. Finally, we call ray.get on id3 to retrieve the result. ray.get is a blocking operation, so the program waits until the dot-product function finishes running, and then it returns a Python object with the result.

Now let's move on to actors. Actors are written in Ray just like Python classes: we again add the @ray.remote decorator, this time to a Python class, to turn it into an actor. Actors are like remote Python classes that live in a different process or on a different node. Their advantage over tasks is that they maintain state; for example, self.value in this counter can be updated by calling actor methods such as increment.

In this case, the actor requests one GPU; you can also request resources for a task. This is useful if your application has resource requirements, as many machine learning applications do. Many machine learning tasks, for example, need GPUs to train or run models quickly. Under the hood, Ray will provision the resource and schedule the actor or task so that it runs once the resource is available.

We instantiate the actor just like a class. As with tasks, we add .remote to make it clear to developers that this is not running locally. Calling an actor method works just like calling a task: actor methods immediately return an object ID while the method executes in the background. If we increment the counter again, its internal state, self.value, becomes 2. However, this does not affect id4, which still points to the result of the first increment, 1; id5, on the other hand, resolves to 2. Note that here we pass a list of object IDs to ray.get. This returns a list of Python objects and works just like calling ray.get on each individual object ID.

Before moving on, I'd like to mention a brief caveat about the Ray API. The API is simple, but it won't make all your systems-level problems go away. The speedup from parallelism is still limited by the number of CPUs you have available. In a complicated, performance-critical application, you should make careful design decisions about how many tasks or actors to run and which objects to transfer across nodes.

I'll briefly go over some of the more advanced Ray features. Ray provides a simple API for writing parallel and distributed programs, and the same code can scale from a single node all the way to the cloud. For running Ray in the cloud, we also provide scripts for cluster setup and autoscaling on AWS and GCP. Ray also provides fault tolerance: if a node goes down and a task fails, Ray will try to relaunch it. We're also constantly working on new features such as AsyncIO integration and streaming. The best thing about Ray, in my opinion, is the libraries built on top of it.


Let's go into these libraries. The first of them is Tune. Tune takes advantage of Ray's performance and simple API to provide a library for distributed hyperparameter search. Tune uses Ray's task and actor APIs, so it seamlessly distributes the hyperparameter search. Before we dive into the details of Tune, let's take a step back and ask, "What are hyperparameters?" Here we have an example of a convolutional neural network. CNNs are a very powerful type of network and are credited with many of the recent advances in computer vision. One thing that gets less attention, though, is the time and effort spent on designing these networks. For example, the shape of the neural network, the size of each layer, and the parameters that dictate the learning dynamics, such as the learning rate, can make a huge difference in performance. All these knobs are what we call hyperparameters.

The performance of machine learning algorithms can depend a lot on the hyperparameters you set. Here's a study that changes the size of a neural network on a deep learning benchmark. Simply increasing the size of some layers can completely change the performance. In this case higher is better, and you can see quite a difference between the red at the bottom and the blue at the top. The takeaway is that finding good hyperparameters is super important for building successful AI applications. Better hyperparameters result in better models, which translate into real-world benefits, such as safer self-driving cars or more relevant recommendations and advertisements.

Why do we need a framework for tuning hyperparameters? Without an automated tool like Tune, you have to adjust hyperparameters manually over the course of many training runs to arrive at good values. Hyperparameter tuning algorithms make the process of finding the best hyperparameter settings easier and less tedious. In addition, resources are expensive, and model training is becoming more and more computationally intensive. Since 2012, the amount of compute used by the largest AI training runs has been increasing exponentially, with a 3.5-month doubling time.
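To put the 3.5-month doubling time in perspective, it can be converted into an annual growth factor (simple arithmetic on the figure quoted above):

```latex
\text{annual growth factor} = 2^{12/3.5} \approx 2^{3.43} \approx 10.8
```

In other words, the compute used by the largest training runs grows roughly tenfold every year at that rate.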

Finally, training a model to state-of-the-art performance can take days, if not weeks. In summary, we want a system that maximizes model performance and uses our resources efficiently, at minimal time and cost. Tune is designed with these goals in mind.

As you might know, deep learning has taken off across the field. Tune is one of the few hyperparameter tuning frameworks built with deep learning as a priority. This means that Tune supports multiplexing training across multiple GPUs on multiple nodes. For example, with Ray as the backend, you can train very efficiently in certain situations, such as sharing one GPU across four models when the models aren't large. Since GPUs are expensive, this has the potential to save a lot of money.

Tune also supports seamless distributed execution. On a single machine, Tune parallelizes the search across cores and GPUs, and you can scale from a single machine to a cluster without changing a single line of your code. Tune also offers many algorithms to help guide your search for hyperparameters. If you're not satisfied with the algorithms we offer, Tune provides a simple API for adding new search algorithms. Best of all, Tune is compatible with any deep learning or machine learning framework, such as TensorFlow, Keras, or PyTorch.

How do we use Tune? Tune is designed with ease of use in mind. Unlike many other hyperparameter search libraries, you don't need to restructure your entire code base to use it. You can use one of two interfaces: a function-based API, or a class-based API that enables additional features such as checkpointing. In my experience, modifying a code base to use Tune typically takes fewer than 10 lines of code.

Let's talk about what Tune offers for hyperparameter optimization. Tune offers search algorithms, which are algorithm-based techniques for finding good hyperparameters more quickly. These are what you would traditionally find in hyperparameter optimization, and they include things such as HyperOpt and random search. Most hyperparameter optimization algorithms or services can be adapted to Tune's search algorithm interface with just a few lines of code. In yellow, we also offer trial schedulers, which are execution-based techniques for hyperparameter optimization. These techniques allow Tune to be very resource-efficient. For example, HyperBand is an efficient algorithm that kills low-performing trials. Ray's actor API is a great fit for these algorithms because it enables features such as modifying the model during training and early termination.
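HyperBand is built from successive halving, which it runs several times with different trade-offs between the number of trials and the training budget per trial. Here is a minimal plain-Python sketch of successive halving, with random search proposing the candidates. It illustrates the idea only and is not Tune's implementation. The `quality` and `evaluate` functions are hypothetical stand-ins for real training:

```python
import math
import random


def quality(config):
    """Hypothetical 'true' quality of a config: best when lr is near 0.1 (log scale)."""
    return 1.0 - abs(math.log10(config["lr"]) + 1.0) / 4.0


def evaluate(config, steps):
    """Toy stand-in for training `steps` epochs and measuring a validation score."""
    return quality(config) * steps / (steps + 1)


def successive_halving(configs, min_steps=1, eta=2):
    """Train all configs briefly, keep the best 1/eta, give survivors eta times more steps."""
    trials = [{"config": c, "steps": 0, "score": None} for c in configs]
    budget = min_steps
    while len(trials) > 1:
        for trial in trials:
            # A real scheduler would resume each trial from its checkpoint.
            trial["steps"] = budget
            trial["score"] = evaluate(trial["config"], budget)
        trials.sort(key=lambda t: t["score"], reverse=True)
        # Early-stop the low performers.
        trials = trials[: max(1, len(trials) // eta)]
        budget *= eta
    return trials[0]


# Random search proposes candidates; successive halving decides which keep running.
rng = random.Random(0)
configs = [{"lr": 10 ** rng.uniform(-4, 0)} for _ in range(16)]
best = successive_halving(configs)
print(best["config"], best["steps"])
```

With 16 candidates and `eta=2`, the field shrinks 16 → 8 → 4 → 2 → 1. Most trials stop after one or two steps, and only the finalists receive the full budget.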

Another thing that would be hard to do without Tune is combining these execution-based techniques in yellow, such as HyperBand, with the algorithm-based techniques in blue, such as Bayesian Optimization. Recent research has shown that certain combinations of these can accelerate your search. Often, these two families of algorithms are implemented as standalone libraries and take extra engineering effort to make compatible with each other. With Tune, you can combine them with two lines of Python.

Tune API Demo

Let's quickly walk through an example. This example uses the function-based API to search across hyperparameters for training a model; there are many more examples on our GitHub page if you're interested. Tune is included as part of Ray, although Tune can actually run on other execution engines. We'll use Ray for this example, so we start by importing Ray and Tune and initializing Ray. We take a regular Python function that you might use to train a model and make two minor modifications. First, we add a reporter argument, which is a Tune object. Second, we call this reporter with keyword arguments to report intermediate metrics, which Tune uses to monitor progress and stop bad evaluations early. These metrics also show up in TensorBoard.

Next, we register this function with Tune and call run_experiments, and Tune executes training runs of this function. You also specify your search space along with stopping criteria, as shown here. Note that this is just a simple example, and you can check out our documentation for more complicated search spaces. In this example, we search over the learning rate and the momentum with a three-by-three grid_search, and we stop once mean_accuracy reaches 99. Evaluations of these different hyperparameter configurations are executed in parallel, either on your local machine or on your cluster. Meanwhile, Tune automatically takes care of logging all of your results and visualizing them in TensorBoard, where you'll see graphs showing which models are training well and which aren't.
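Tune's API has changed across releases, so the sketch below does not reproduce Tune calls. Instead, it mimics in plain Python what the demo sets up: a trainable that reports metrics through a `reporter` callback, a three-by-three grid over learning rate and momentum, and a stop criterion of `mean_accuracy >= 99`. The toy learning curve, the grid values, and the convention that the reporter returns a stop flag are all hypothetical. Tune handles stopping internally and runs trials in parallel, while this sketch runs them sequentially:

```python
import itertools


def train_model(config, reporter):
    """Hypothetical trainable with a toy learning curve instead of a real model."""
    accuracy = 0.0
    for epoch in range(1, 101):
        # Toy progress per epoch: larger lr and momentum converge faster (illustrative only).
        accuracy = min(100.0, accuracy + 100 * config["lr"] * (1 + config["momentum"]))
        # Report intermediate metrics as keyword arguments, as in the talk.
        if reporter(timesteps_total=epoch, mean_accuracy=accuracy):
            return  # the driver asked this trial to stop


def run_grid_search(trainable, space, stop):
    """Evaluate every combination in `space`; stop a trial once all `stop` thresholds are met."""
    keys = sorted(space)
    results = []
    for values in itertools.product(*(space[k] for k in keys)):
        config = dict(zip(keys, values))
        history = []

        def reporter(**metrics):
            history.append(metrics)
            # Returning True tells the trainable to stop early.
            return all(metrics[name] >= target for name, target in stop.items())

        trainable(config, reporter)
        results.append({"config": config, "epochs": len(history), "final": history[-1]})
    return results


# 3 x 3 grid over learning rate and momentum, stopping at mean_accuracy >= 99.
space = {"lr": [0.001, 0.01, 0.1], "momentum": [0.0, 0.5, 0.9]}
results = run_grid_search(train_model, space, stop={"mean_accuracy": 99})
best = max(results, key=lambda r: r["final"]["mean_accuracy"])
print(best["config"], best["epochs"])
```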

For more complicated use cases, such as running on a large cluster, you can specify the resources needed by each evaluation with the trial_resources argument. Here, trial_resources requests one CPU and zero GPUs for each hyperparameter configuration. You can also continuously sync your results to S3 using another argument, and you can specify your preferred hyperparameter search scheduler with yet another argument.
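Per-trial resource requests determine how many trials can run at once. Here is a minimal plain-Python sketch of that capacity calculation, including the fractional-GPU case mentioned earlier (four trials sharing one GPU). The function names are hypothetical, and threads stand in for Ray workers. This does not show how Ray or Tune actually place trials:

```python
import os
from concurrent.futures import ThreadPoolExecutor


def max_concurrent_trials(total_cpus, total_gpus, trial_cpus, trial_gpus):
    """How many trials fit at once if each trial reserves the given resources."""
    limits = [int(total_cpus // trial_cpus)]
    if trial_gpus > 0:
        # Fractional requests (e.g. 0.25) let several trials share one GPU.
        limits.append(int(total_gpus // trial_gpus))
    return min(limits)


def run_trials(trainable, configs, trial_cpus=1, trial_gpus=0, total_gpus=0):
    """Run trials concurrently, capped by the per-trial resource request."""
    total_cpus = os.cpu_count() or 1
    workers = max(1, max_concurrent_trials(total_cpus, total_gpus, trial_cpus, trial_gpus))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map preserves the order of `configs` in its results.
        return list(pool.map(trainable, configs))


# One CPU and zero GPUs per trial, as in the talk's example:
print(max_concurrent_trials(total_cpus=8, total_gpus=0, trial_cpus=1, trial_gpus=0))  # 8
# A quarter of a GPU per trial on a 1-GPU, 8-CPU machine:
print(max_concurrent_trials(total_cpus=8, total_gpus=1, trial_cpus=1, trial_gpus=0.25))  # 4
```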

In summary, Tune's goal is to be the best library for hyperparameter tuning. Tune is developed in the open, so you can check the priorities on our GitHub project board. We're focusing on extending Tune with new algorithms and schedulers, as well as on improving performance. The code is available in the Ray repository, you can visit the documentation, and Tune can be installed via pip.


Moving on from Tune, we arrive at RLlib, a scalable and unified library for reinforcement learning. RLlib is built on top of Ray and also uses Ray's task and actor APIs to distribute reinforcement learning. Let's briefly go over what reinforcement learning is. By show of hands, how many of you are familiar with the concept? Just a few of you, so let's go into this in more detail.

Reinforcement learning is an AI technique for learning which actions are best to take in an environment; it's also known as learning from experience. In this example, you have an agent that takes an action based on the state. For example, I might be at a campfire and decide to put my hand in the fire. The action changes the environment: my hand is now in a new location, and heat travels to it, making it very hot. The agent observes this new state of the environment and also receives a reward. For example, the agent might observe that its hand is in the fire and very hot, and it might get a very low reward because of the pain, indicating that it took a poor action. The agent uses this reward and action to update its policy; for example, it might learn not to put its hand in the fire. Typically, an agent uses a deep model to learn policies. This might remind you of Tune and the possibility of hyperparameter tuning, but we'll get to that later.
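The campfire example comes down to an agent that tries actions, receives rewards, and updates its estimates. Here is a minimal plain-Python sketch under strong simplifying assumptions: a single state (so this is really a two-armed bandit), a hypothetical reward of -10 for the fire and +1 otherwise, and epsilon-greedy action selection. It is not how RLlib works, since RLlib typically learns policies with deep models:

```python
import random

ACTIONS = ["hand_in_fire", "hand_back"]


def campfire_reward(action):
    """Hypothetical environment: putting a hand in the fire hurts."""
    return -10.0 if action == "hand_in_fire" else 1.0


def learn_action_values(episodes=500, epsilon=0.1, seed=0):
    rng = random.Random(seed)
    value = {a: 0.0 for a in ACTIONS}  # estimated reward for each action
    counts = {a: 0 for a in ACTIONS}
    for _ in range(episodes):
        # Epsilon-greedy: usually exploit the best estimate, occasionally explore.
        if rng.random() < epsilon:
            action = rng.choice(ACTIONS)
        else:
            action = max(ACTIONS, key=value.get)
        reward = campfire_reward(action)
        counts[action] += 1
        # Incremental mean: move the estimate toward the observed reward.
        value[action] += (reward - value[action]) / counts[action]
    return value


value = learn_action_values()
policy = max(ACTIONS, key=value.get)
print(policy, value)
```

After one painful try, the estimate for `hand_in_fire` drops below the alternative. From then on the greedy choice is to keep the hand back, and the fire is revisited only during exploration.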

Let's quickly go over some applications of reinforcement learning. One example is AlphaGo. If you haven't heard of AlphaGo, it's a computer program that plays Go. To learn the game, AlphaGo first learned moves from expert players using supervised learning to initialize the model. Once it reached a certain level of mastery, researchers used reinforcement learning to improve it further. In 2016, AlphaGo became the first computer program to defeat a world champion in Go. In Go, the observations might be the board state, the actions might be where to place your next stone, and the rewards might be whether you're winning or losing the game.

Another application, which is currently an active research area, is antenna tilt control. Here, the observations might be the positions of users or the users' signal strength, and the actions might change the tilt of the antenna to improve network throughput. We can also pose database query optimization as an RL problem: for example, the actions might be which relations to join, and the reward might be based on the cost of the query.

Now that we've gone over some applications, let's go over the anatomy of an RL application. First, you have the environment. The environment contains the problem you're trying to solve, exposes actions, and provides feedback such as observations and rewards. Typically, in an industry context, a lot of engineering effort goes into this environment. When the agent interacts with the environment, it observes the environment's state and takes actions, as we went over before, and using the reward, the agent should learn to take better actions in the future. To learn a policy, the agent has a model, usually some sort of deep network that predicts actions from observations. Choosing a good model can make a big difference in how well an agent performs, so a lot of work also goes into designing these models.
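The environment/agent split above can be sketched with a tiny hypothetical environment that exposes `reset` and `step`. The interface is loosely modeled on the classic OpenAI Gym convention, where `step` returns an observation, a reward, a done flag, and an info dict. The corridor task and the fixed policies are illustrative assumptions; a real agent would replace the policy with a learned model:

```python
class CorridorEnv:
    """Hypothetical toy environment: start at position 0, reach position `length`.

    Every step that doesn't reach the goal costs -1, so shorter paths earn more reward.
    """

    def __init__(self, length=5):
        self.length = length
        self.pos = 0

    def reset(self):
        self.pos = 0
        return self.pos  # initial observation

    def step(self, action):
        # Action 1 moves right; anything else moves left (bounded at 0).
        self.pos = max(0, self.pos + (1 if action == 1 else -1))
        done = self.pos >= self.length
        reward = 0.0 if done else -1.0
        return self.pos, reward, done, {}


def rollout(env, policy, max_steps=100):
    """Run one episode: observe, act, collect reward, until done or out of steps."""
    obs = env.reset()
    total = 0.0
    for _ in range(max_steps):
        action = policy(obs)  # the "model" maps observations to actions
        obs, reward, done, _ = env.step(action)
        total += reward
        if done:
            break
    return total


print(rollout(CorridorEnv(5), policy=lambda obs: 1))  # -4.0: always move right
```

An RL algorithm's job is to adjust the policy so that episodes like this earn as much reward as possible.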

Around the model, we have the algorithm, which determines how the policy is learned. The algorithm uses prior observations, actions, and rewards to train the model. Because the algorithm affects how the model is trained, choosing the right algorithm and configuring it correctly can make a big difference in performance.

Now we arrive at RLlib. Here's a brief overview of the design. At the top, we have APIs that make RL accessible to a variety of applications. For example, offline data support allows you to save the experiences an agent encountered in a training run and use them to train new models offline. We also have a collection of scalable reference algorithms based on state-of-the-art research. Underneath those, we provide abstractions in case you'd like to implement your own algorithms.

The API integrates with Tune, so let's focus on how to run RL experiments. In industrial settings, once again, the heavy lifting may already have been done in developing the environment and designing the model. Using the Tune API, we tie everything together by simply specifying the algorithm and the environment as two arguments. Here, the environment is CartPole-v0 from OpenAI Gym, and the algorithm that optimizes the model is PPO, Proximal Policy Optimization. We also stop once the mean episode reward reaches 200. Each trial uses zero GPUs and one CPU, and we use one worker.

We also use Tune to search over the sgd_stepsize hyperparameter, which affects how fast the neural network learns. You can also pass additional arguments, for example for the model, but I'll leave those for you to find in the documentation. When we run this Python script from the command line, RLlib prints useful information, such as which hyperparameter configurations are running, how many resources are being used, and how the trials are performing. Graphs of the trials and results are also available in TensorBoard, so you can watch how your agents are performing in real time.

A brief look at performance: using Ray, RLlib outperforms competing implementations, such as those built on OpenMPI. Here lower is better, and Ray trains to a similar level much faster. Thanks to this performance and its simple API, RLlib is now also part of the AWS stack in Amazon SageMaker.

Let's go over the RLlib project status. The goal is to be the best library for RL applications and RL research. It's developed in the open and is available in the Ray project repository. We're excited about the new directions and features we have planned, such as new algorithms, cross-cutting features such as environment modeling and AutoRL, and, of course, better performance. Documentation is available on the Ray website, and you can pip install RLlib as well.


Finally, we reach Modin, a distributed DataFrame system that lets you accelerate your Pandas workflows by changing a single line of code. Just like Tune and RLlib, Modin is built on Ray and uses the task and actor abstractions. The motivation for Modin is the following: right now in industry, we have tools that perform really well on small datasets, on the order of a kilobyte or a megabyte, and tools that work really well on large datasets at the terabyte scale. Nothing bridges this gap by working well on both, and Modin aims to do that.

Modin is a DataFrame library that copies the Pandas API. Pandas is a popular Python library for DataFrames, so any program using Pandas can also use Modin to scale its data processing. To switch from Pandas to Modin, simply change one line of code: replace "import pandas as pd" with "import modin.pandas as pd". You can install Modin via pip. By replacing the Pandas import with Modin, you should, in theory, get faster performance on larger datasets.
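Here is a minimal sketch of the one-line switch. The `try`/`except` fallback to plain Pandas is an addition for this example, not something the talk describes. It lets the same script run whether or not Modin is installed. The DataFrame contents are hypothetical:

```python
# The one-line change from the talk, with a fallback so the script runs either way.
try:
    import modin.pandas as pd  # distributed, Pandas-compatible DataFrames
except ImportError:
    import pandas as pd  # plain Pandas if Modin isn't installed

# Everything below is ordinary Pandas code and is unchanged by the import swap.
df = pd.DataFrame({"city": ["a", "b", "a", "c"], "sales": [10, 20, 30, 40]})
totals = df.groupby("city")["sales"].sum()
print(totals.to_dict())
```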

Modin has the following advantages. First, it's a faster Pandas, even on your laptop: if your laptop has four physical cores, you can get up to a four-times speedup. We also have experimental cluster support, which we aim to improve in the future; that will let you scale your Pandas code from a single node all the way to a cluster. Modin also supports multiple backends, so you can run it not only on Ray but also on Dask. Overall, it aims to bridge the gap between small-scale and large-scale datasets.

A brief look at the design: Modin implements the Pandas API, so users simply call Pandas methods on Modin DataFrames. Underneath, Modin compiles these queries, translates them into calls for distributed data processing, and performs some optimizations as well. The partition manager handles partition sizing, serialization, and data distribution, and it sends the optimized queries to the data. Each partition holds part of the dataset, and all of this executes on Ray by default.
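The partitioned execution described above follows a familiar pattern: split the data into partitions, run the operation on each partition independently, and combine the partial results. Here is a minimal sketch using plain Pandas and NumPy. It splits by rows only, for simplicity, and the helper names are hypothetical, so it should not be read as Modin's internals. In Modin, the per-partition work would run as Ray tasks:

```python
import numpy as np
import pandas as pd


def split_into_partitions(df, n_partitions):
    """Split a DataFrame into contiguous row blocks."""
    bounds = np.linspace(0, len(df), n_partitions + 1, dtype=int)
    return [df.iloc[start:stop] for start, stop in zip(bounds[:-1], bounds[1:])]


def partitioned_column_sum(df, column, n_partitions=4):
    # "Map": each partition computes a partial result on its own.
    partials = [part[column].sum() for part in split_into_partitions(df, n_partitions)]
    # "Reduce": combine the partial results into the final answer.
    return int(sum(partials))


df = pd.DataFrame({"x": range(10)})
print(partitioned_column_sum(df, "x"))  # 45
```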

We can compare Modin's performance against Pandas, for example, in this graph of the time it takes to read CSV files of different sizes. Less time is better here, and this is run on four cores. As you can see, Modin can read a CSV file an order of magnitude faster than Pandas due to parallelization. Modin's goal is to scale Pandas workflows by changing one line of code. Modin is also developed in the open, so you can follow development and contribute on the project page.

We're really excited about the new features we're working on. For example, we recently added a distributed Series to Modin to further improve performance. We're also adding additional backends, such as Apache Arrow, and we're always working on better performance. Documentation is available on the Modin webpage, modin.readthedocs.io, and you can install Modin using pip.

In conclusion, Ray is a general-purpose system for parallel and distributed Python aimed at emerging AI applications. It includes libraries targeting the machine learning ecosystem, such as Tune for distributed hyperparameter tuning, RLlib for distributed reinforcement learning, and Modin for scalable data processing. It's open source, it's available in the Ray repository on GitHub, and it's easy to install using pip.

Questions and Answers

Participant 1: I'm curious about the data storage part. It seems that Ray itself is a distributed system, so how will it be integrated with existing systems like HDFS and others?

Schafhalter: Integrated with systems such as ...?

Participant 1: HDFS, storage.

Schafhalter: Ray is different from systems like HDFS, so they work together. If you have HDFS across the cluster, for example, you can read from HDFS just as you would with normal libraries.

Participant 1: You mean it's just a computation layer, and you can read from anywhere?

Schafhalter: Right now, it's more of a computation layer, yes, but we are looking at data ingestion, for example from HDFS. This is future work; it currently isn't implemented.

Participant 1: One more question. We already have Spark, which does a lot of data processing, but I see there's a difference here. Can you elaborate? What's the difference in purpose between Ray and Spark?

Schafhalter: The question is, "What's the difference between Ray and Spark?" Spark is more focused on large-scale data processing, while Ray is focused on machine learning applications in particular, which have somewhat different systems-level requirements. For example, machine learning applications might spawn many tasks, so there's a big requirement for running many tasks quickly, with high throughput for scheduling and executing them. Also, compared to Spark, Ray is more focused on keeping the API simple and accessible to Python users. Spark has PySpark, but you usually need more of a distributed systems background to run on Spark.

Participant 1: So if I need to use both Spark and Ray, do I need to set up two different clusters, do some processing in Spark, and then bring the data to Ray for more computation?

Schafhalter: Yes, that's one way you could do it. Another way is to run Ray within a Spark cluster: use Spark to bring Ray nodes online, and then use Python to run Ray on the Spark cluster.

Participant 1: You mean that when we run a Spark job inside the [inaudible 00:31:04], we would run Ray there?

Schafhalter: Yes, you could do that.

Participant 2: Thank you. I totally get Tune and RLlib. For the scalable data processing use case, I get it for one terabyte and over, but for small to medium data, Dask has been working really well for me. So why would I switch from Dask and Dask DataFrame to Ray and Modin?

Schafhalter: Why switch from Dask and Dask DataFrame to Modin? If Dask is working well for you, maybe you shouldn't switch. But if you are using Pandas and working with the Pandas API, as many data scientists are, replacing Pandas with Modin is just a one-line change, and you'll see performance improvements immediately as the size of your dataset grows.

Participant 2: You're saying the API differences in the DataFrame are even smaller than with Dask?

Schafhalter: Yes. Modin implements the Pandas API, so it's completely compatible. Changing from Pandas to Modin is just changing one line of code, one import statement.

Participant 3: A question about the differences between PySpark and Ray. In my experience working with PySpark, there's a lot of pain because Python is not really a first-class citizen in that ecosystem, which also makes serialization and memory management very unclear. Does Ray treat Python differently in this case?

Schafhalter: Yes. Ray treats Python differently in the sense that to run a Python script on Spark, you usually use a separate application called PySpark. With Ray, you simply write your Ray script, call ray.init, and run it with Python. This makes it a little easier to debug, and there are some debugging utilities as well. There's a web UI where you can see how tasks are running on your cluster and how long they take, so you can analyze performance. Also, if nodes go down or programs crash, it's easier to read the exceptions, since they get printed to the command line.

Participant 4: Actually, I'm going to follow up on the question the other person asked. For divide-and-conquer algorithms, where you have to divide up the data you're going to read, how do you set that up in Ray? Say you want to spin up 50 parallel Ray workers to do some task, and you have to divide up which worker gets which piece of the data space. If one worker fails, the job needs to keep going; MapReduce and Spark deal with this. How do you deal with it in Ray?

Schafhalter: For this, you would want to partition the dataset yourself first. For example, if you have a large NumPy array, you'd want to pass a portion of it to each Ray task, instead of serializing the entire dataset and passing that object ID to every worker. The issue with the latter is that the whole object then gets transferred back and forth between nodes. Generally, the better solution is to serialize individual chunks of the data.
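Here is a minimal sketch of partitioning before dispatching work, assuming NumPy is available. With Ray, each chunk would go to its own `@ray.remote` task (for example as a separate `ray.put(chunk)` reference), rather than one reference to the whole array being sent to every task. In this sketch, the standard library's `ThreadPoolExecutor` stands in for Ray workers so it runs without a cluster, and the sum-of-squares job is a hypothetical workload:

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def chunk_sum_of_squares(chunk):
    # Each worker sees only its own chunk, never the whole dataset.
    return float(np.sum(chunk.astype(np.float64) ** 2))


def parallel_sum_of_squares(data, n_chunks=4):
    # Partition first, then dispatch one chunk per unit of work.
    chunks = np.array_split(data, n_chunks)
    with ThreadPoolExecutor(max_workers=n_chunks) as pool:
        partials = list(pool.map(chunk_sum_of_squares, chunks))
    # Combine the per-chunk results, MapReduce-style.
    return sum(partials)


print(parallel_sum_of_squares(np.arange(1_000)))  # 332833500.0
```

Retrying a failed chunk is also the application's job here. As the exchange below notes, Ray sits a level below MapReduce, so this kind of bookkeeping is something you design yourself.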

Participant 4: In the MapReduce world, the framework would take care of this for people. Workers would just read locally from the HDFS nodes where those data ranges lived. In the end, you want to pass data range keys around, not the actual data, and then have each worker read locally.

Schafhalter: Yes. Another way to think about Ray compared to Spark and MapReduce is that Ray's API is actually a level below MapReduce, so you can implement MapReduce on top of Ray. You need to think about the systems-level challenges yourself; Ray is the tool that enables something like MapReduce.

Participant 4: Basically, you're saying Ray is not something application developers should use directly. They should use the libraries already built on top of it, like Tune and RLlib.

Schafhalter: Ideally, yes, it's better to use these libraries. But we do aim to keep the API simple enough that you can work with Ray code directly as well.




Recorded at: Jun 17, 2019