
Ray: the Next Generation Compute Runtime for ML Applications



Zhe Zhang introduces the basic API and architectural concepts of Ray, and dives deeper into some of its innovative ML use cases.


Zhe Zhang is currently Head of Open Source Engineering (Ray project) at Anyscale. Before Anyscale, Zhe spent 4.5 years at LinkedIn, where he managed the Hadoop/Spark infra team. He has been working on open source for about 10 years; he's a committer and PMC member of the Apache Hadoop project, and a member of the Apache Software Foundation.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.


Zhang: My name is Zhe. I lead the open source Ray team at Anyscale. The first question I'd like to ask is: how do you currently manage your compute resources for machine learning? I imagine many of us are facing different challenges. Today's talk will introduce Ray as a possible solution to these challenges. Ray's value proposition is that it provides a simple and flexible API for distributed computing, so that you can achieve scalability and unification, and run your workloads on heterogeneous hardware.

Simple and Flexible API for Distributed Computing

The first part I want to introduce is the simple and flexible API for distributed computing. To put it simply, Ray has a truly minimal set of APIs that allows general-purpose distributed computing. Later, I will provide more examples, but this table has the gist. At the beginning of any Ray program, you simply run ray.init, and your Ray context is initialized. Then you decorate your normal functions and classes with a simple @ray.remote decorator, and these functions and classes become units that can be scheduled on a cluster. When calling these functions and classes, you simply add .remote when triggering them; they then behave like Python and Java futures for asynchronous compute. In addition to functions and classes, Ray also allows cluster-wide object sharing through ray.put and ray.get. Lastly, there's the ray.wait API, which serves as a synchronization point, similar to other forms of asynchronous compute.

Let's step into some more details. Ray turns functions (think about a regular Python function) into tasks, the basic unit of stateless computation. Similar to an RPC, you have an input, the stateless computation unit, and the output. Ray turns classes into actors, the basic unit of stateful computation: the actor maintains internal state, which can be updated with each new input and output. When functions and classes are actually scheduled on a cluster, they become a form of distributed futures; that's how Ray enables parallelism. In addition to pure computation, Ray also provides an in-memory object store, so that when invoking functions and classes you can pass arguments and results by reference instead of by the actual data, which may be much larger. Ray's design principle is to extend existing programming languages rather than design a new one; the purpose of this principle is to lower the learning burden for developers. Right now, we have first-class support for Python, and beta support for C++ and Java.


Let's dive into more detailed examples. I'm showing a very simple Python function; you cannot imagine a simpler form of code. We're defining a function called read_array, which opens a file, reads an ndarray into a variable a, and returns a as the result. The second function is also very simple: it takes two arrays, adds them together, and returns a new ndarray. In the main part of the program, we use the read_array function twice on two different files, file1 and file2, then add the results together to get the combined ndarray as the sum. It's a very simple form of numerical computation. On the right-hand side, I have the simplest form of a class. The class maintains one internal attribute called self.value, initially set to 0. Each call to increase increments the value and returns the current value; it's just a simple counter. In the main part of the program, we create an instance of this class and call the increase function several times.
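Reconstructed in plain Python, the code described above looks roughly like this (the file paths and the .npy array format are assumptions; the talk does not show the exact source):

```python
import numpy as np

def read_array(file):
    # open a file and read its contents into an ndarray
    return np.load(file)

def add(a, b):
    # take two arrays and return their elementwise sum as a new ndarray
    return np.add(a, b)

class Counter:
    def __init__(self):
        self.value = 0

    def increase(self):
        # increment the counter and return the current value
        self.value += 1
        return self.value

# main part (file paths are hypothetical):
# a1 = read_array("file1.npy")
# a2 = read_array("file2.npy")
# total = add(a1, a2)

counter = Counter()
for _ in range(5):
    current = counter.increase()
```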

Let's look at how Ray turns this function and this class into a cluster setting. The first step is to add the decorators. If you want the read_array function to be executable on different nodes of the cluster, you simply put one line, @ray.remote, before the function definition; the same goes for the add function and the Counter class. Then, how do you use them? When calling these functions, for example read_array, you simply say id1 = read_array.remote(file1), because you need a handle to the task as an asynchronous unit of computation. When you call the remote function, or task, of add, you get another handle called id. Then our sum is basically ray.get(id), meaning that you now create a synchronization point for these futures. Similarly for the class example: you decorate the class with @ray.remote, and when creating an instance of this class, you use the .remote keyword. Again, you get a handle for each unit of asynchronous compute.

Let's visualize the task example further. After decorating the read_array and add functions, they can now be executed on different nodes of a cluster. Here the assumption is that file1 exists on node1 and file2 exists on node2. Ray's scheduler will make sure to schedule the tasks in an optimal way, optimizing for load balancing and data locality. When we call id1 = read_array.remote(file1), at this point we're only constructing the compute graph; the task is not necessarily executed. We're basically saying that id1 now represents this unit of compute. You can start using id1 to further construct the compute graph before the actual read_array function finishes, or even starts, executing. Similarly for id2: it now represents this unit of compute. Let's now construct the dynamic task graph based on these handles. As we can see, the add function, or task, uses these two handles and creates a new handle called id. This example is simple, but you can imagine constructing more sophisticated compute graphs with different handles. Only at the point where we do ray.get do we know that we need the result to be computed. Then the actual computations are triggered: node1 starts executing read_array on file1 and generating id1, and so on for id2, and then for id. Ray.get is a synchronous, blocking call; it will block until the results become available. The task graph is now fully executed to compute the sum. Later we'll talk about different aspects of finishing this computation, including scheduling and fault tolerance.

Flexible Distributed Computing Patterns

Now that we have gone through the simple examples, I want to introduce the flexible distributed computing patterns this simple set of APIs enables. This chart illustrates the different patterns of parallel compute this API allows. If you think about different kinds of compute on a larger set of data, there are different possibilities. One is the standard data-parallel pattern: you divide the original data into partitions and apply the same function to each data slice. This is how people do, for example, standard UDFs on different shards of data, such as applying the same filter to different shards and creating a unified filtered view of the larger dataset. Sometimes this is called same data, same function. There are also more flexible, non-standard patterns. For example, you may want to apply different functions to different shards of data depending on a condition of the data; you can apply very flexible check conditions before applying the functions. For example, you may want to apply one filter for your website users under the age of 12, and a different filter for the age range of 12 to 18. Maybe you want to apply different functions to different data. Maybe you want nested parallelism, for example to enable graph computation. All of these become possible on this standard compute substrate of simple and flexible distributed computing APIs: just tasks, actors, and objects.

In the next slide, I want to introduce a more sophisticated, advanced use case of this distributed computing API. This is the Alpa project, a collaboration between researchers at UC Berkeley, CMU, and Google; you can check their blog post for more details. Alpa tries to map a computational graph onto a cluster of different devices (CPUs of different specs, GPUs of different specs, and TPUs) so that it can optimize both inter-operator and intra-operator parallelism, and keep optimizing the mapping between computational units and devices. This is pretty difficult to do with existing infrastructure, because it requires a very high level of flexibility depending on the exact unit of computation and the nature of the hardware device, and it needs to consider the nested nature of inter-operator and intra-operator parallelism. Ray serves as the compute substrate for this highly flexible compute pattern and highly innovative project.


I hope the first part of the presentation has given you an overview of the minimal set of APIs you can start using on Ray to do cluster computing. Now I want to dive deeper into the different benefits these simple and flexible APIs provide. What do people care about when they do cluster or distributed computing? The first element many people have in mind is probably scalability. After all, people want to distribute their computation onto a cluster because they want to leverage a large number of compute nodes and a large amount of hardware resources: memory, CPU. Let's talk about scalability. I want to start with a simple statement: what's the challenge of scalability at the infra layer? The mission of the compute infra, to achieve scalability, is to keep removing the non-linear factors when you increase resources to deliver higher performance. Ideally, you put in twice the amount of resources and you double the performance, or fulfill 200% of your compute workload; you always want to match performance and workload with your resources. The mission of infra is to remove the non-linear factors that prevent you from doing that. How does Ray serve the purpose of removing the non-linear factors in the system? First, I want to share a statement from Ray Summit. This is from Greg Brockman, the president of OpenAI, and his note is very powerful: they're using Ray to train their largest models. OpenAI itself is training the largest models in the entire world.

To do that, they looked at a half dozen distributed computing projects, and Ray was by far the winner. Why? In the later slides, I will give more details. To put it simply, quoting Greg, Ray owns a whole layer of distributed computing, and not in an opaque way. Many aspects of distributed computing come for free for OpenAI: they no longer have to develop these functionalities and handle these corner cases by themselves. Meanwhile, they have adequate control over the things they do care about.

To further illustrate these points, I want to dive a little deeper into two typical non-linear factors in cluster computing: one is fault tolerance, the other is scheduling. Fault tolerance is a non-linear factor because as soon as you have a failure scenario on one of the nodes in a cluster, you need to provide flexible handling based on the application requirements; otherwise, the entire application could crash or stall. Scheduling can also be a non-linear factor, because certain nodes in the cluster can easily become hotspots and the workload might not be evenly distributed. I'll start with fault tolerance. To understand why Ray provides flexible fault tolerance, let's start by looking at the Ray cluster architecture. A Ray cluster consists of a collection of nodes, which we call Ray nodes. Every single Ray node in the cluster is equivalent, homogeneous in this sense, with one exception: the head node. There's one special component on the head node called the Global Control Store, GCS. Similar to other cluster computing frameworks like Hadoop YARN and Kubernetes, the GCS handles certain global information and makes certain global decisions. Improving the scalability and fault tolerance of the GCS is a very important item on our future roadmap. Other than that, every single node in the Ray cluster is pretty much replaceable.

Let's dive deeper into the different components on each Ray node. On each Ray node, we first have a long-running daemon that we call the Raylet. This component is in charge of monitoring and managing the compute resources on this single node, and it's also responsible for spinning up worker processes. In the previous section of the presentation, we talked about Ray treating tasks and actors as the basic units of stateless and stateful compute. Basically, we handle each actor as a worker process, and we handle each task as a function call in a worker process. The Raylet is responsible for monitoring resources, and uses its scheduler component to spin up and manage these worker processes. This is actually a unique aspect of Ray, because the Raylet can make local scheduling decisions in addition to the global scheduling decisions from the GCS. Another very special component of Ray is the object store component in each Raylet. Each Ray node carves out part of its local physical memory to be managed by this object store component. Ray also manages a layer of global metadata, so that we can provide the illusion that you can Put and Get anywhere in the cluster. The object store basically manages local memory and local objects to provide global object Put and Get.

To make it more concrete, let's look at the end-to-end flow of mapping a piece of Python code all the way to physical execution. On the left-hand side is a simplified version of the task example we gave. We define two functions, a and b, where a calls into b and generates an object called z. Then, in the main part of the program, we first put an object called x, and then we trigger a with x as the input and y as the output. In Ray, to handle failures for fault tolerance, a very important concept is ownership. Basically, every component that creates a task, actor, or object becomes the owner of that task, actor, or object, and is in charge of managing the lifecycle of that downstream component. Now let's look at the middle part of this graph, the task graph. How do we map the code to the task graph? Naturally, the main part of the program, which is the driver of this Ray job, is the owner of x, because it created x by calling ray.put. It's also the owner of a, because we started the a function in the main part of the program, and it's the owner of y, because we generated y in the driver part of the code. The more interesting part is that the task a is the owner of b and z, because it started b and created its result z. How does this map to the actual physical execution? The power and beauty of Ray is that it can schedule these components onto different parts of a cluster in an optimal way, based on all kinds of conditions, including fault tolerance, locality, and load balancing. In this example, the driver and the tasks a and b can be scheduled on three nodes. The first node is the owner of task a, and the first worker node is the owner of task b. The object store manages objects x, y, and z; again, they can be located on different nodes or the same set of physical nodes. The owner plays an important role in the fault tolerance of these tasks, actors, and objects.

Let's dive a little deeper into the different fault tolerance models for tasks, actors, and objects. Tasks are stateless: there's an input, a certain amount of computation, and an output. We basically rely on retries for tasks, with the assumption that the input is still available when doing the retry. By default, we retry every task three times, but this can be customized, as I'm showing in this code snippet: you can set the maximum retries to once instead of three times. If the task fails before generating the output, we'll retry once, rereading the file and regenerating the array. Actors are slightly more complicated because they maintain state. We allow the developer of an actor to customize how the actor's state can be recovered: you specify in the class constructor how you want the state to be restored. For example, you can periodically checkpoint the state into either the object store or external storage, and in the constructor, bring back the most recent checkpoint. Another difference is that by default, we do not restart actors, exactly because of the customizable nature of the state; you can configure the maximum number of restarts. For objects, we rely on lineage-based reconstruction, which is more transparent to developers. If a piece of an object is lost, we always try to reconstruct the object through its chain of construction. If we read a certain file into an object, we'll try to reread the file. If we read the file, did a simple operation, and then lost the resulting object, we'll try to redo the operation, assuming the more upstream object is still available.

Another very common non-linear factor in cluster computing is scheduling. Ray has to, on one hand, have full knowledge of all the hardware resources in the cluster, and on the other hand, have full knowledge of all the required workloads, and make sure that when the amount of workload increases, it can acquire more hardware resources to fulfill the requirements. A very critical component in Ray that meets this challenge is called the autoscaler. I'm showing the logical sequence of how the autoscaler makes a decision. On each Ray node, the distributed scheduler, the Raylet, continuously monitors the available compute resources on the node and notifies the GCS. The GCS has the global view of the available resources in the cluster, and it tries to calculate the match between workload and resources. If the cluster needs to be expanded with more resources, or shrunk, it works with the underlying infrastructure layer, be it cloud virtual machines or Kubernetes, to actually make the changes. In this example, we want to add more nodes to the cluster. The cloud provider, for example AWS EC2 or Kubernetes, will, in the ideal case, provide more nodes; we add them to the cluster and enter the loop again. Of course, there will be corner conditions where the cloud provider is unable to add more nodes. The autoscaler has logic to handle this case and set expectations for applications.

To give a more concrete example of how Ray's global distributed scheduling helps with scalability and maximizing hardware utilization, I'm showing an example from a Ray Summit talk by the Instacart team. This use case is very exciting; it's one of the most critical use cases at Instacart, which is fulfillment. After receiving a customer order, a machine learning pipeline needs to analyze the data and generate the optimal fulfillment. First, I'm showing the legacy setup before Ray. It's basically sharding the data into different zones and using a message broker, in this case Celery, based on hard partitioning, sending each partition to a different computation. The basic bottleneck of this setup is that the hard partitioning can cause hotspots and cannot easily handle the fluctuating nature of the data. During certain times of the day, there might be more data arriving in a certain zone or partition of the entire pool of data, and the hard partitioning might not be able to handle that. The new architecture uses a global Ray cluster and global scheduling. Based on global distributed scheduling, the Ray cluster has full knowledge of all the available nodes, all the available data, and the workloads. It can easily scale the cluster up and down and handle varying degrees of workload. As a result, you can see that with the legacy setup, the Instacart team constantly needed to overprovision the cluster to handle peak workloads; utilization moved between 8% and something like 16%, depending on the day and the hour of the day. With the Ray setup, when the workload increases, they can easily achieve over 80% hardware utilization.
Hopefully, the previous section gave a good overview of why Ray architecturally allows good scalability and hardware utilization on a cluster, by providing strong, flexible fault tolerance and globally optimized scheduling.

Unification - One Substrate for ML Compute

The second aspect is of a different nature, but again enabled by Ray's simple, flexible API. It's something developers care about: the unification of tooling in the machine learning ecosystem, so they can easily leverage this very active ecosystem. I'll talk about unification, and how Ray serves as one substrate for machine learning compute. The best framework on Ray in this regard so far is Ray AI Runtime (AIR), which we released as part of Ray 2.0 at Ray Summit. Ray AI Runtime is a framework that enables the different stages of the machine learning pipeline: from the very beginning, handling the dataset, to the different ways of training your model, tuning hyperparameters, scoring the models after training, and serving the model, as well as more specialized machine learning, including reinforcement learning. This layer, or framework, interacts with the more stable layer of Ray Core and storage, and serves as the compute foundation for machine learning engineers in their day-to-day work of constructing these pipelines. To make it more concrete: today's machine learning ecosystem is very active. New libraries, new versions of libraries, and new functionalities in those libraries come up constantly, on a weekly basis, at different stages. The design principle of Ray AIR is to allow pluggability, to allow enough flexibility at each stage of the machine learning pipeline. On the data side, we provide native support for Apache Arrow, Parquet, Snowflake, and pandas, and we're adding more integrations, such as Delta Lake. You can see that there's easy pluggability when constructing a dataset: this is where you can customize different formats. You can read Parquet, you can read Snowflake, you can read pandas. Then you simply split the dataset into train and validate, and you can do simple preprocessing, for example scaling a feature column, before training. Training itself is a very active ecosystem.

If you wind back the clock maybe a few months, TensorFlow and PyTorch were very dominant as trainers, along with XGBoost. In the example we construct an XGBoost trainer with very simple configurations. If you look at the landscape today [inaudible 00:32:16], it's quite different: many more people are starting to use PyTorch Lightning, Hugging Face, and JAX. Within the Ray AIR framework, you can change that by simply constructing a different trainer with the same Python type, so that as long as we implement a new framework as a Ray trainer, it can integrate with Ray Data in the same way. It allows this interaction between different stages in a much more scalable way. The same goes for tuning: you can construct different tuners with things like MLflow, Weights & Biases, and Optuna. Again, once a tuner is plugged in, the framework can very easily interact with the different kinds of trainers and different kinds of data sources. The same goes for serving: you can integrate with your favorite serving framework, like FastAPI. The distributed computing part is handled by the Ray framework.

I want to put it more concretely: how are people leveraging this unification power? The example I want to provide here is a Ray Summit talk from Spotify's machine learning platform team. The reason the Spotify ML platform team wanted to evaluate Ray is that they're seeing the need for the ML platform to cover a few quite different personas. There are backend engineers and data engineers, who prefer a more stable pipeline that's repeatable and predictable on a daily basis. There are also more exploratory personas, like research scientists, who want to leverage the latest tooling and keep improving model performance to generate revenue lift. Ideally, the ML platform serves these different personas to maximize its value proposition. The reason they're choosing Ray is that Ray provides this framework, this unification power, so that they can now install Hugging Face as a trainer and plug in different data sources like pandas and NumPy. They can serve the different preferences of these different personas, allowing them to use familiar tooling, with faster project start times, faster prototyping, and faster experimentation, without sacrificing the stability of the repeatable pipelines. To summarize: because of this unification power, at this year's Ray Summit we're seeing very exciting signals that leading technical companies are using Ray to build their machine learning platforms. I gave the examples of Instacart, OpenAI, and Spotify, but there are many more. At the end, I'll add the calls to action; one of them is that I encourage you to check out the entire Ray Summit program.

Heterogeneous Hardware

The last aspect, but definitely not the least, is the power to leverage heterogeneous hardware, which I think is quickly becoming an inevitable trend. If you think about the different hardware vendors (Intel, NVIDIA, Google, AMD), it's quickly becoming clear that a single type of hardware will not serve all compute purposes. So why, exactly, are these leading tech companies choosing Ray to leverage heterogeneous hardware? I can give a very quick example of how Ray allows that. Again, it comes down to the flexible API: Ray is the simplest tooling that allows developers to specify their preference and pass it all the way down to the cluster level. Here I'm giving the example of an actor. When starting a Ray actor, you can simply specify the amount of CPU and GPU resources you want it to use, and you can even specify a fraction of a CPU or GPU: for example, two CPUs and half a GPU. Ray takes this global knowledge of all the hardware requirements and tries to match it to the hardware in the cluster. To put it simply, other compute frameworks define these requirements at a more collective level: for this entire job, you want to leverage a certain type of hardware. That will increasingly fail to meet the need for more fine-grained control. In Ray, you can specify that within a single job, one actor should use this hardware profile while another actor uses a different hardware profile. I'll give concrete examples to illustrate this value proposition.

Actually, I have two examples I'm personally very excited about. The first one is the deep learning cluster at Uber. At Uber, a flagship use case of machine learning is called DeepETA. As the name suggests, it's about using deep learning to estimate arrival times. As you can imagine, with the amount of data and the high requirements on accuracy, this is a very challenging problem. It involves a pipeline: loading the data, shuffling to maximize the randomness of the data each worker sees, and simple last-mile preprocessing. All this data loading and manipulation is most suitable for CPUs. Meanwhile, the actual deep learning training that follows is most suitable for GPUs; they want to train TensorFlow, PyTorch, or PyTorch Lightning deep learning models. That creates a unique challenge: how do we construct a heterogeneous cluster with different kinds of computation on different kinds of hardware? The Uber machine learning platform team identified Ray as the most suitable solution for specifying these hybrid, heterogeneous requirements. Note that these requirements are also evolving: they want to keep optimizing the mapping between compute units and hardware. Ray allows this fast iteration of specifying your hardware preferences and accurately fulfilling those requirements. By using Ray, they can manage this heterogeneous cluster, and this pipeline of CPU and GPU work, to achieve the optimal global outcome.

A more forward-looking example, and a really exciting use case, is Ray plus Qiskit: combining the power of quantum computing and classical computing. How is this done? I couldn't have done a better visualization than the IBM team did. The power of Ray here is exactly that it allows the developer to quickly construct, and quickly iterate on, the preferred logic on a laptop. It's as if the developer suddenly has the magic power of these different kinds of classical computing SKUs, plus quantum computing units, on a laptop: they can specify that this part of the code should execute on CPUs, classical computing in the data center, and that another part of the code should execute on quantum hardware, then combine the results of the two parts into the final result, to present certain scientific discoveries, as an example. As the visualization shows, the user's machine can orchestrate and dispatch the different parts of the compute to different kinds of data centers (a third-party data center, IBM Cloud, and the IBM Quantum Center) and then combine the results back on the laptop. I think this is probably one of the most exciting demonstrations of Ray's power.


In summary, Ray has this set of simple and flexible APIs to provide easy access to scalable, unified distributed computing on heterogeneous hardware. The takeaways: please get started with the simple Hello World examples at our documentation site. After that, dive deeper into our GitHub project and ask questions on our forum. Then check the Ray Summit program for the use case that best matches your scenario. And stay tuned: we have a regular monthly meetup series coming up, covering different topics.

Nodes Where Data Resides

Is there a way to specify the exact node where you want the data to reside?

We don't expose control over where the data should reside, but for scheduling compute tasks and actors, we have a concept called placement groups that lets you specify the scheduling policy.

Questions and Answers

Luu: Is there any exciting stuff coming up for Ray and Ray AIR in 2023 that you guys are planning?

Zhang: Two things. One is, in the near term, the Ray 2.2 release is coming up, with many exciting features, including automatically splitting blocks: to handle large files, 2.2 will support automatically sharding the files, along with automatic out-of-memory control. Both of these features will make sure that even if the data is larger than the cluster's collective memory, we can still handle it. For next year, the most exciting thing is that we're planning next year's Ray Summit. We also have two Ray-related books coming up.

Luu: Is anyone using Ray for non-ML projects that you are aware of?

Zhang: Amazon is using Ray for large-scale data processing. They call it data compaction: basically, for GDPR and other purposes, every day they need to run through all their large tables and do things like deleting certain records. To do that at extremely big scale, with cost efficiency, they want fine-grained control; they're using Ray Core and Ray Data to achieve that purpose. Another very good example is Ant Group in China, the largest financial institution in China. They're using Ray to build online applications, what they call function as a service, and also graph computing: many non-ML projects. Another notable example is OpenAI. Even though it's about building and training large models, the actual technical use case is not really machine learning specific; they're using the core of Ray to launch a very large number of computational tasks. Another thing I want to mention, pretty recent, from the Amazon re:Invent conference: there was a key announcement of the AWS Glue project supporting Ray as their third option for data processing. Previously, they supported native Python and Spark; with this new release, they're supporting Ray as the third option. Here's the O'Reilly book.

Luu: That's the second book about Ray, as far as I know, from O'Reilly.

Zhang: Yes, the first book actually is already on Amazon. It's by Holden, called, "Scaling Python with Ray."




Recorded at:

Oct 05, 2023