Peloton - Uber's Webscale Unified Scheduler on Mesos & Kubernetes



Mayank Bansal and Apoorva Jindal present Peloton, a unified resource scheduler for co-locating heterogeneous workloads in shared Mesos clusters. Its goal is to manage compute resources more efficiently while providing hierarchical max-min fairness guarantees for different teams. It schedules large-scale batch jobs with millions of tasks and supports distributed TensorFlow jobs with thousands of GPUs.


Mayank Bansal is currently a Staff Engineer on the data infrastructure team at Uber and a co-author of Peloton. Apoorva Jindal is a Senior Software Engineer at Uber Technologies working on microservice deployment systems and cluster management.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Bansal: I am Mayank Bansal from the data infra team at Uber, and here is Apoorva, who is from the compute team at Uber. We've been working on Uber's open-source unified resource scheduler; we're calling it Peloton. Let's look at what the cluster management story at Uber is today. Currently, we have thousands of microservices running in production. We have thousands of builds per day in production and tens of thousands of instances deployed per day. We have 100K-plus instances running per cluster. We have more than 10 million batch job containers running per day. We have thousands of GPUs per cluster and 25-plus clusters like that.

Right now, all the microservices, which we call stateless jobs, run on their own clusters. Stateless services run on Mesos and Aurora. Batch jobs such as Hadoop, Spark, and TensorFlow all run on Hadoop YARN. We have Cassandra, Redis, MySQL, and other stateful services, which run on bare metal on their own clusters. Then you have daemon jobs, which run on all of these clusters, and there is no resource accounting for those. The vision for Peloton is to combine all these workloads onto the same big cluster. We wanted to do that to improve cluster utilization. We run at a scale of thousands and thousands of machines, and if we co-locate all these workloads, we envision a lot of resource efficiencies, which will translate into millions of dollars. The other reason we can co-locate all these jobs is that their profiles are very different. The online services, or microservices, are latency-sensitive jobs that cannot be preempted, because if you preempt them, you are impacting the business. When you open the Uber app and request a ride, and we preempt a service there, the driver is not going to show up. The batch jobs, which include offline training, distributed training, machine learning jobs, spot jobs, and all the analytics, can be preempted.

What we are thinking is, we will co-locate them in the same cluster. That way, we can preempt one profile if the other profile spikes. We don't need to set aside dedicated DR capacity. If we preempt the batch jobs, we can use that capacity for higher priority jobs. We don't need to buy extra capacity, which we do currently, because we need to overprovision the services cluster as well as the batch cluster for spikes and for DR reasons. Because the resource needs of the different workloads are complementary, we can get better cluster utilization.

Existing Cluster Management Solutions

We looked at the existing cluster management solutions back in the day, and we found there was no silver bullet. We looked primarily at four solutions: Borg, YARN, Mesos, and Kubernetes. Borg is Google's cluster manager, which runs all the application workloads through controller jobs; job/task lifecycle management, placement, preemption, and allocation happen at the Borgmaster, and the Borglet does the task execution and orchestration.

YARN does it a little differently. Job/task lifecycle management is done through the application master, the rest happens at the resource manager, and execution happens at the node manager. Mesos is not really a scheduler; it's a resource manager, so orchestration happens at the agent and resource allocation at the Mesos master, and the rest of the scheduler primitives are implemented by the frameworks on top of Mesos. Kubernetes follows the same model: task execution happens at the kubelet, scheduling happens at the kube-scheduler, and job/task lifecycle management happens at the controllers.

We looked at all four of these schedulers at that time, and we found we liked Borg, because Borg is the only scheduler we know of, right now at least, that co-locates all the different workloads together, but we cannot use it because it's not open source. YARN is a batch scheduler, which is good for batch jobs but not for stateless services. Kubernetes is not good for batch; it does not have elastic resource sharing or elastic resource management, and it cannot scale to the throughput and high churn rate of batch jobs. None of them operates at Uber's scale, which needs 10,000-plus nodes.

Peloton Architecture

Let's look at the Peloton architecture. Peloton is a resource scheduler for co-locating mixed workloads. It runs on top of Mesos. It's integrated with Apache Spark, TensorFlow, YARN, and uDeploy; uDeploy is the orchestration engine on top of our microservices. It can run on-premises and in the cloud. Overall, how does it work? Mesos just does the resource management piece and gives the resources to Peloton. Peloton manages all those resources, and all the scheduler primitives are implemented on the Peloton side. All the orchestration engines, like stateless services, uDeploy, Cassandra, MySQL, Spark, Hadoop, YARN, and TensorFlow, are integrated through the Peloton APIs.

If we compare it against the other schedulers we looked at, Peloton works pretty much like Borg, except that resource allocation and task execution happen at the Mesos agent level. This is our overall Peloton architecture. We have the Peloton UI, the Peloton CLI, and Peloton extensions, which talk to Peloton through the Peloton API, which is gRPC backed by protobuf. We have four major components that make up Peloton. One is the job manager. These are stateless daemons that run as containers in the cluster and manage the lifecycle of all the jobs. We have the resource manager, which does all the elastic resource sharing, preemption, admission control, etc.

The placement engines are also stateless daemons, and they are different for different types of workloads, because you want different placement logic for different workloads. There is a batch placement engine, a stateless placement engine, and a stateful placement engine. With the batch placement engines, you can run many of them together to increase throughput, and that's where we scale horizontally on Peloton's side. The host manager is the abstraction layer on top of Mesos. We use Cassandra for state management and ZooKeeper for leader election of all the daemons.

This is the workflow of Peloton. A client talks to the job manager to submit a job, the job manager puts the state into Cassandra, the resource manager does the admission control, and the placement engine places the tasks, which are launched on Mesos through the host manager. Now, I'll welcome Apoorva [Jindal] to talk about Peloton's other cool features.

Elastic Resource Management

Jindal: I'm Apoorva [Jindal], and I'm one of the engineers working on the compute team at Uber on Peloton. I would like to thank you all for taking the time to listen to us. Today, I'm going to talk about some of the interesting features which we have implemented in Peloton for some of the unique use cases we have at Uber. I'm also going to talk about some of the integrations which we have done with some of the open source batch platforms.

Let's first start with batch workloads at Uber. The first feature I would like to showcase is elastic resource management. Let's try to understand why elastic resource management is needed. Let's say we have a cluster with a finite set of resources, and we want to divide these resources between different organizations. One way to go about it is to let each organization submit its jobs to the cluster, first come, first served. Whoever submits first gets to run first. Then once they are done, the next organization gets to run, and so on.

The issue we saw with this approach was that if one organization submits a large number of jobs, it pretty much takes up the entire cluster, and jobs from other organizations get starved out, so that didn't work out for us. Next, we tried static quota management. We said, "Each organization has to go and figure out how many resources it needs based on some expected demand." Each organization came back and told us, "This is the demand I'm going to have," and we gave them a static quota. That became a guaranteed quota for them that would always remain available. After we implemented that, we observed that there were many times when the cluster was mostly idle, with very little running on it, while at other times it had very high demand. That happens because batch jobs are very short-lived, and the request patterns from the different organizations are very dynamic.

It could be because some organizations are distributed geographically over time zones, and also there are some use cases which different organizations have which caused them to create workloads at a certain time. For example, in the cluster which serves our autonomous vehicles group, what we saw was that there was a group which used to process data being collected by the cars being driven on the road. The cars would go out in the morning, they would come back in the evening, and all the jobs would get created at 5:00 p.m. from that organization.

Then there is another organization which submits a lot of jobs to simulate the different vehicle conditions which can happen on the road. All those happen during the daytime when the engineers are in the office, so there is heavy demand from one organization during the daytime, and very high demand from another organization during the nighttime. What's the solution for this? One way to do it is, use the priority-based quota model, which Borg has. To be able to do this, what we need to do is we need to take every workload which a user can submit, and assign it a global priority across the entire company. Then what we do next is we divide the cluster capacity into different quotas, production and non-production, and hand it out to different teams.

The issue we found is that if a team has production quota but is not using it and has a lot of non-production jobs, then the resource cannot be interchanged between the two quotas. A more structured way to do this is to take the cluster and divide those cluster resources into the different organizations as a reservation or as a guarantee, that if you have demand, then you are guaranteed to get this many resources. Then we ask each organization to divide the workloads into production workloads and non-production workloads. If they submit a production workload, which is essentially a non-preemptible workload, then we place them in their guaranteed reservation. If after placing them there is still some capacity available, then we take the lower priority non-guaranteed workloads and try to place them in the same reservation.

Now, let's say that for a given organization, after summing up the demand from production and non-production workloads, there is still unallocated capacity in that reservation. We take that unallocated capacity, divide it, and give it to other organizations to run their non-production workloads on. If one organization has very high demand and another has no demand, then we take the resources from the second organization and give them to the first.

The way we implement it is using the concept of hierarchical resource pools. The root resource pool is essentially our cluster that gets subdivided into multiple resource pools, one for each organization, and then each organization has the liberty to further subdivide into different teams. Each resource pool has a reservation, which we talked about, then each resource pool also has allocation. If a team has a demand less than its reservation, then it gets an allocation equal to its demand. If it is more, then it gets at least equal to its reservation and then any extra capacity gets subdivided.

Let me illustrate this idea using an example. Let's say I have three resource pools and a total capacity of 100. There is very low demand in resource pool one, and resource pool two and resource pool three have very high demand. Resource pool one gets an allocation equal to its demand, because that is less than its reservation, and resource pool two and resource pool three each get their reservation, plus any extra capacity in the cluster, divided between them equally. If the demand for resource pool one increases, then we do the same calculation again and take the resources back from resource pool two and resource pool three to give them back to resource pool one. Since the non-production workloads are preemptible, we go and preempt them from the cluster to release the capacity to give back to resource pool one.
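As a rough illustration of that calculation, here is a minimal Python sketch for one level of the pool hierarchy. The reservation and demand numbers, the `Pool` type, and the `compute_allocations` function are assumptions made up for the example (the talk only gives the total capacity of 100), and reservations are assumed to add up to no more than the capacity. It is not Peloton's actual implementation.

```python
from dataclasses import dataclass


@dataclass
class Pool:
    name: str
    reservation: float
    demand: float


def compute_allocations(capacity, pools):
    """Return {pool name: allocation} for one level of the hierarchy."""
    # Step 1: each pool gets its demand, capped at its reservation.
    alloc = {p.name: min(p.demand, p.reservation) for p in pools}
    spare = capacity - sum(alloc.values())
    # Step 2: split the spare capacity equally among pools with unmet demand.
    hungry = [p for p in pools if p.demand > alloc[p.name]]
    while spare > 1e-9 and hungry:
        share = spare / len(hungry)
        for p in hungry:
            grant = min(share, p.demand - alloc[p.name])
            alloc[p.name] += grant
            spare -= grant
        hungry = [p for p in hungry if p.demand - alloc[p.name] > 1e-9]
    return alloc


# Hypothetical numbers: pool one has low demand, pools two and three are busy.
pools = [Pool("rp1", 30, 10), Pool("rp2", 30, 100), Pool("rp3", 40, 100)]
low = compute_allocations(100, pools)

# Demand in pool one rises, so the spare capacity is taken back.
pools[0].demand = 50
high = compute_allocations(100, pools)
```

With these made-up numbers, pools two and three first share the 20 units that pool one is not using, and lose them again once pool one's demand exceeds its reservation.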

Priority Preemption

The second feature I want to highlight is priority preemption. At Uber, we use YARN for most of the batch workloads, and one of the issues we now face is that when we rolled out YARN, we didn't assign a priority to each workload. Since there is no priority, low priority workloads get submitted and take up the capacity, and higher priority workloads that get submitted later get starved out. Low priority workloads keep running and higher priority workloads have to wait longer, and if it's a critical workload, it requires manual intervention from our SRE team to go and preempt some of the low priority workloads to give the resources to the high priority workloads.

What we do in Peloton is as follows. The job manager enqueues each task with a given priority to the resource manager. All the preemption, entitlement, and allocation calculations happen in the resource manager, based on the capacity it gets from the host manager. The job manager submits tasks to a pending queue, which is a multi-priority queue. The resource manager takes the highest priority task from the pending queue, looks at the allocation, and sees if the task can be admitted. If it can be admitted, great, it goes ahead.

Let's say there is no resource available to place that high priority task. Then the resource manager checks whether there are lower priority tasks in the same resource pool taking up the pool's resources. If those resources are being consumed by tasks whose priority is greater than or equal to the priority of the current task, then there's nothing we need to do. If they are taken up by a lower priority task, then we go ahead and preempt it. This removes the manual intervention that used to be needed to allow critical tasks to run.
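The admission and preemption flow described above can be sketched in a few lines of Python. This is only an illustration under simple assumptions: a single resource pool with one scalar resource, a hypothetical `ResourcePool` class, and a policy of preempting the lowest priority tasks first. None of these names come from Peloton itself.

```python
import heapq
import itertools


class ResourcePool:
    def __init__(self, capacity):
        self.capacity = capacity
        self.running = []              # list of (priority, name, size)
        self._pending = []             # max-heap via negated priority
        self._seq = itertools.count()  # tie-breaker keeps FIFO order

    def enqueue(self, name, priority, size):
        heapq.heappush(self._pending, (-priority, next(self._seq), name, size))

    def admit_next(self):
        """Try to admit the highest priority pending task.

        Returns (admitted task name or None, names of preempted tasks)."""
        if not self._pending:
            return None, []
        neg_priority, _, name, size = self._pending[0]
        priority = -neg_priority
        free = self.capacity - sum(s for _, _, s in self.running)
        # Only strictly lower priority tasks may be preempted, lowest first.
        victims = sorted(t for t in self.running if t[0] < priority)
        chosen = []
        for victim in victims:
            if free >= size:
                break
            chosen.append(victim)
            free += victim[2]
        if free < size:
            return None, []            # task stays in the pending queue
        heapq.heappop(self._pending)
        for victim in chosen:
            self.running.remove(victim)
        self.running.append((priority, name, size))
        return name, [v[1] for v in chosen]


pool = ResourcePool(capacity=10)
pool.enqueue("low-a", priority=1, size=6)
pool.enqueue("low-b", priority=1, size=4)
first = pool.admit_next()
second = pool.admit_next()
pool.enqueue("critical", priority=5, size=5)
third = pool.admit_next()    # preempts a low priority task
pool.enqueue("low-c", priority=1, size=3)
fourth = pool.admit_next()   # nothing of lower priority to preempt
```

In this run the critical task is admitted by preempting one low priority task, while the last low priority task simply waits.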

The final requirement, which is very unique to us, is the requirement of large jobs. This is a requirement where we get jobs with tens of thousands and even hundreds of thousands of tasks, and the kicker is that each task has a different configuration. Why is that? The use case comes when we have to process extremely large data sets. For example, our autonomous vehicle maps group had a use case where they broke a street into small segments and processed each segment as a task, so one street is a job, and each segment on the street is a task. Of course, if the street is very big, then it results in a lot of tasks.

What are the potential issues we see in a scheduler? Firstly, the job configuration becomes huge. If you have 100,000 tasks and each task has around 10 KB of configuration, then the overall job configuration becomes about a gigabyte. Another issue is that if we launch all the tasks in parallel, they can kill some of the subsystems the job relies on. For example, in most cases we have jobs which go to a storage subsystem to read and write the data they process. If we launch 100,000 tasks which all talk to the same storage subsystem, it can bring down that subsystem.

How do we solve these problems? One thing we do is always compress configs. What we figured out is that the latency to compress and decompress configuration is always less than the latency to write and read configuration from any storage system, so always compressing configs is fine. Then, we do asynchronous task creation: a user comes in and submits one of these large jobs, we write it to storage and return from the API call, and then our goal state engine picks up that job and starts creating the tasks asynchronously. While it's creating those tasks, it also controls the number of tasks running in parallel. If a user configures the number of parallel running tasks to be 3,000, because that's the number the storage subsystem can handle, then we make sure that no more than 3,000 are running, and as soon as one task completes, we launch another one.
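To make the running-task limit concrete, here is a small Python sketch of the idea: tasks are launched only while the number of running tasks is below a configured limit, and each completion triggers another launch. The `ThrottledJob` class and its method names are hypothetical and only model the bookkeeping, not the real goal state engine.

```python
from collections import deque


class ThrottledJob:
    def __init__(self, task_ids, max_running):
        self.pending = deque(task_ids)
        self.running = set()
        self.done = set()
        self.max_running = max_running

    def reconcile(self):
        """Launch pending tasks until the running limit is reached."""
        launched = []
        while self.pending and len(self.running) < self.max_running:
            task = self.pending.popleft()
            self.running.add(task)
            launched.append(task)
        return launched

    def complete(self, task):
        """Record a finished task and backfill the freed slot."""
        self.running.remove(task)
        self.done.add(task)
        return self.reconcile()


job = ThrottledJob(range(10), max_running=3)
initial = job.reconcile()      # fills the three available slots
backfill = job.complete(1)     # one slot freed, one task launched
```

A real job would use a limit like the 3,000 mentioned above; the small numbers here just keep the example easy to follow.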

Another issue we saw was that the data we are storing is huge, and if we store everything in memory, which is what a lot of open source schedulers do to get scale, then our memory blows up and we keep running out of memory. So we built a cache that stores only what we use very often rather than everything.
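The talk does not say which eviction policy that cache uses. As one plausible illustration, the sketch below uses a least-recently-used (LRU) policy, which is an assumption, with a hypothetical `load` callback standing in for a read from the storage system.

```python
from collections import OrderedDict


class LRUCache:
    def __init__(self, max_entries, load):
        self.max_entries = max_entries
        self.load = load                 # fallback loader, e.g. a storage read
        self.entries = OrderedDict()

    def get(self, key):
        if key in self.entries:
            self.entries.move_to_end(key)      # mark as most recently used
            return self.entries[key]
        value = self.load(key)
        self.entries[key] = value
        if len(self.entries) > self.max_entries:
            self.entries.popitem(last=False)   # evict least recently used
        return value


loads = []


def read_from_storage(key):
    # Stand-in for a storage read; records each miss.
    loads.append(key)
    return f"config-for-{key}"


cache = LRUCache(max_entries=2, load=read_from_storage)
for key in ["a", "b", "a", "c", "b"]:
    cache.get(key)
```

Only the misses reach storage, and memory stays bounded by `max_entries` regardless of how many tasks the job has.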

Most of the batch workloads at Uber which are submitted to Peloton come via Spark. Why Spark on Peloton? There are multiple ways to run Spark. One way is to use YARN. The version of YARN we have at Uber doesn't have Docker support or broader container support. Another way is to use Mesos, but Mesos doesn't have any quota management or elastic resource sharing, and there is also a big scale issue, because every Spark job registers as a framework, and Mesos can run at most 100 frameworks at a time, maybe 200.

Another option is to use Kubernetes, but it has no dynamic resource allocation or elastic resource sharing. Another way to do it is to say, "Anything that requires Docker, let's run it on Kubernetes or Mesos, and anything that doesn't require Docker, let's run it on YARN." Then we run into the resource partitioning issue: how do we dynamically share resources between YARN and Kubernetes? Plus, to be able to do that, we run into the same problem that we now need to define global priorities across workloads, which is nearly impossible to do at Uber. How does Spark integrate with Peloton? Spark submits a job to Peloton, which launches the driver task, and the driver task then talks to Peloton to launch the executor tasks and uses the Peloton APIs to monitor them as well.

GPUs & Deep Learning

Let's talk a little bit about deep learning use cases and GPUs. At Uber, we have a very large number of deep learning use cases, and that's why, as Mayank [Bansal] mentioned when describing our scale, we have very large GPU clusters. The use cases range from autonomous vehicles to trip forecasting for predicting ETAs, as well as fraud detection and many more. What are the challenges distributed TensorFlow faces? It needs elastic GPU resource management, it requires locality of placement, it needs each task to be able to discover every other task, and it requires gang scheduling and corresponding failure handling.

What is gang scheduling? A subset of tasks in a job can be specified as a gang for scheduling purposes. Note that each task is an independent execution unit and hence runs in its own container, and can complete or fail independently. From a scheduling perspective, all the tasks in a gang are admitted, placed, preempted, and killed as a group, so if one of the tasks fails, the entire gang fails. The way distributed TensorFlow integrates with Peloton is that the deep learning service creates the job and creates the tasks in that job as a gang on Peloton. Peloton makes sure that all these tasks get admitted and placed as one unit. Then, once the launch succeeds, the individual deep learning containers talk back to Peloton to discover every other task in that job, or in that gang rather.
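The all-or-nothing placement of a gang can be illustrated with a short Python sketch. It assumes a single resource (GPUs per host) and a simple largest-first, first-fit heuristic, so it can miss a placement that an exhaustive search would find. The `place_gang` function and the host and task names are hypothetical.

```python
def place_gang(tasks, free_gpus):
    """tasks: {task name: GPUs needed}; free_gpus: {host: free GPUs}.

    Returns {task: host} if every task fits, otherwise None. On failure
    nothing is placed and free_gpus is left untouched."""
    remaining = dict(free_gpus)   # work on a copy until the whole gang fits
    placement = {}
    # Place the largest tasks first, each on the first host with room.
    for task, need in sorted(tasks.items(), key=lambda kv: -kv[1]):
        host = next((h for h, free in remaining.items() if free >= need), None)
        if host is None:
            return None
        remaining[host] -= need
        placement[task] = host
    free_gpus.update(remaining)   # commit only on full success
    return placement


hosts = {"h1": 4, "h2": 4}
ok = place_gang({"worker-0": 4, "worker-1": 4}, hosts)

other_hosts = {"h1": 4, "h2": 4}
too_big = place_gang({"w0": 2, "w1": 2, "w2": 8}, other_hosts)
```

The second gang is rejected as a whole because one task cannot fit anywhere, even though its two smaller tasks could have been placed on their own.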

Horovod is an open-source distributed TensorFlow implementation which integrates with Peloton, and which is what we run at Uber. We compared distributed TensorFlow and Horovod running on Peloton. For standard benchmarks, we see huge gains with Horovod. Let me spend a few minutes on some of the unique stateless features we have in Peloton as well. I'm not going to talk about how our federation system interacts with Peloton. We have a Spinnaker-like federation system called uDeploy, which integrates with Peloton, but it's closed source.

Stateless Jobs

What's a stateless workload? Stateless workloads are long-running services which handle user-facing, latency-sensitive requests and do not need any persistent state on local disk. All the microservices which serve Uber requests, like the microservices which serve Uber riders, drivers, payments, and so on, are essentially stateless jobs running on Peloton.

One of the very important requirements of these workloads is that they have strict SLA requirements. We cannot bring down a large number of instances without a user-visible disruption. If you bring down a large number of instances, Uber goes down.

Let's talk about the first feature, which is rolling upgrade. An upgrade is a change to the configuration of a service. A service has multiple instances deployed with some configuration; you want to change it, and the most common configuration change is to change the code which is running for that service. If we bring down all instances to bring up the new configuration, then of course we violate the SLA. What we do is first bring down a small subset of instances and then bring them back up on the same hosts they were running on before. If you're familiar with some of the other open source schedulers, you will see some subtle differences here. The first difference is that we don't do red-black deployment; we don't bring up a new instance first and then bring the old instance down. The reason is that we run at such a high allocation in our clusters that we don't have spare capacity to bring another instance up.

The second interesting thing you will see is that we bring the instance back up on the same host. We try very hard to do that, and the reason is as follows. One of the unique problems at Uber is that the image sizes for our microservices tend to be really huge, so if you look at our deployment time, most of it is spent downloading the image. Anything we can do to improve the image download time reduces the deployment time significantly.

Being able to come up on the same host allows us to do the following. When the deployment starts, we know up front which hosts the instances are going to come up on, so while the first batch of instances is being upgraded, we go and refresh the images on every other host. When the second set of instances goes down and comes back up, the image is already present, so they come up really fast.
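A small Python sketch can show how such an upgrade plan might be laid out: instances are restarted in place in fixed-size batches, and the image prefetch on the remaining hosts is scheduled alongside the first batch. The `plan_rolling_upgrade` function, the step dictionary keys, and the fixed batch size are assumptions for illustration only.

```python
def plan_rolling_upgrade(instance_hosts, batch_size):
    """instance_hosts: {instance id: host}. Returns a list of upgrade steps."""
    ids = sorted(instance_hosts)
    batches = [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]
    first_hosts = {instance_hosts[i] for i in batches[0]} if batches else set()
    steps = []
    for n, batch in enumerate(batches):
        prefetch = []
        if n == 0:
            # While the first batch restarts, pull the new image everywhere else.
            later = {instance_hosts[i] for b in batches[1:] for i in b}
            prefetch = sorted(later - first_hosts)
        steps.append({
            # Each instance comes back on the host it was already running on.
            "restart_in_place": [(i, instance_hosts[i]) for i in batch],
            "prefetch_image_on": prefetch,
        })
    return steps


steps = plan_rolling_upgrade({0: "a", 1: "b", 2: "c", 3: "d"}, batch_size=2)
```

Because every instance keeps its host, the set of hosts that need the new image is known before the first instance is touched, which is what makes the prefetch possible.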

Another thing it helps with is emergency rollbacks. An emergency rollback is a scenario where a user has already deployed a new configuration or a new build to every instance, and through some monitoring or alerting they figure out that the build is causing problems. Now they want to roll everything back to the old build, so that's an emergency rollback. If you roll back on the same hosts, then since the old image is already present on the host, the emergency rollback goes really fast.


The next feature we have is what we refer to as oversubscription. What's oversubscription? Let's take the example of a host. Let's say there are two tasks running on that host with some allocation. In most cases, the utilization of a task is much lower than its allocation. The difference between allocation and utilization is what we call slack. We have a resource estimator which measures the slack available on that host, and then Peloton offers these slack resources as non-guaranteed resources, or spot instances, for non-production batch workloads. Then, if we detect that a host is running hot, we have a QoS controller which can kill these non-production workloads to reduce the resource contention on the host. As you can guess, and as Mayank [Bansal] will talk about later as well, this is one of the most important features which allows us to co-locate batch and stateless workloads in the same cluster.
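As a sketch of the two pieces described here, the Python below computes slack as allocation minus utilization and picks spot tasks to kill when a host runs hot. The 90% hot threshold, the largest-first victim order, and the function names are all assumptions; the talk does not describe the actual policies of the resource estimator or the QoS controller.

```python
def host_slack(tasks):
    """tasks: list of (allocated_cpus, used_cpus) for guaranteed tasks."""
    # Slack is the gap between what was allocated and what is actually used.
    return sum(max(alloc - used, 0) for alloc, used in tasks)


def qos_victims(capacity, guaranteed_used, spot_tasks, hot_threshold=0.9):
    """spot_tasks: {name: used_cpus}.

    Returns the spot tasks to kill, biggest first, until utilization is
    back at or below hot_threshold * capacity."""
    total = guaranteed_used + sum(spot_tasks.values())
    victims = []
    for name, used in sorted(spot_tasks.items(), key=lambda kv: -kv[1]):
        if total <= hot_threshold * capacity:
            break
        victims.append(name)
        total -= used
    return victims


# Two tasks, each allocated 8 CPUs but using only 3 and 2.
slack = host_slack([(8, 3), (8, 2)])

# A 16-CPU host where guaranteed tasks spike to 10 CPUs of real usage.
hot = qos_victims(16, 10, {"spot-1": 3, "spot-2": 2.5})
calm = qos_victims(16, 5, {"spot-1": 3})
```

Only non-guaranteed tasks are ever candidates for killing, which matches the idea that slack is offered as preemptible spot capacity.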

The final feature I want to talk about is SLA awareness. As I mentioned, the SLA is something very critical to our users, and it's defined as the maximum number of unavailable instances. Why can an instance become unavailable? There may be an ongoing update, or there may be some maintenance operation in the cluster, like a kernel upgrade or a Docker daemon upgrade, or we may just be doing some task relocations, for example, defragmenting the cluster, or we ingested a bunch of new capacity and want to spread the tasks onto new hosts. Task relocation is also used to reduce host utilization.

What we have built in Peloton is a central SLA evaluator. Any operation which can bring down an instance has to go through that central SLA evaluator, and it can reject the operation if the SLA would be violated. Every workflow we've built which can cause an instance to be unavailable has to account for the fact that its request can get rejected, and has to handle that by either retrying or finding some other instance on that host to relocate, and so on. Now, I'll hand back to Mayank [Bansal] to talk about what lies ahead for us.
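A central evaluator of this kind can be sketched as a small Python class that tracks unavailable instances per job and rejects any request that would exceed the job's limit. The `SLAEvaluator` class, its method names, and the job name are hypothetical, and real workflows would add retries and timeouts around it.

```python
class SLAEvaluator:
    def __init__(self, max_unavailable):
        self.max_unavailable = max_unavailable   # {job: max unavailable instances}
        self.unavailable = {}                    # {job: set of instance ids}

    def request_disruption(self, job, instance):
        """Return True if the instance may be taken down, else False."""
        down = self.unavailable.setdefault(job, set())
        if instance in down:
            return True                          # already accounted for
        if len(down) + 1 > self.max_unavailable[job]:
            return False                         # would violate the SLA
        down.add(instance)
        return True

    def mark_available(self, job, instance):
        self.unavailable.get(job, set()).discard(instance)


sla = SLAEvaluator({"rides": 1})
upgrade_ok = sla.request_disruption("rides", 0)     # update takes instance 0
relocate_ok = sla.request_disruption("rides", 1)    # maintenance must wait
sla.mark_available("rides", 0)
retry_ok = sla.request_disruption("rides", 1)       # retry now succeeds
```

Routing updates, maintenance, and relocations through the same check is what keeps their combined effect within the limit.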

Peloton Collocation

Bansal: We have built so many cool features, so now let's talk about what we are building right now. We have all the batch clusters with all the batch jobs running, and we have all the stateless clusters with all the stateless microservices running. The vision we started Peloton with was to co-locate all kinds of workloads, so this is what we are building right now. Why do we want to do co-location? It's simple: we want efficiency gains in our clusters, and by running some numbers, we've figured out that if we do co-location, we can save at least 20% to 25% of compute resources, which will translate to millions of dollars.

What are the challenges right now with co-location? If we are trying to co-locate on the same machine, there are four challenges. The first is disk I/O. The stateless services are very CPU-heavy but write little data to disk, whereas the batch jobs, which are very high throughput, write all their data to disk, which increases disk contention on the machines. Once we start writing so much data on the machine, the stateless services' latencies get impacted. We need to come up with a very good disk I/O isolation story. To be frank, this is a very hard problem, and I don't think anybody has solved it yet.

The second problem we're facing with same-machine co-location is the network on the machine. Batch jobs are very network-heavy because they need to read and write a lot of data over the network. However, stateless services are latency-sensitive jobs which rely on the network to serve traffic. If you are running batch jobs on a machine where critical stateless services are running, you are going to impact the stateless latencies. Batch jobs are also by nature very memory-heavy, and because they use a lot of memory they dirty the kernel caches and the CPU caches, which also impacts the stateless latencies, because the CPU cannot cache instructions effectively.

Finally, once you're running stateless and batch instances together, you need some kind of memory oversubscription, because if you don't do that, you cannot drive up utilization in the cluster, since you would not have many instances running on the same machine. Looking at all these challenges, we thought maybe running on the same machine was too far-fetched for now. So what we are doing right now is co-locating on the same cluster. We are building something called dynamic partition co-location, where we partition the same cluster into multiple virtual partitions.

We create a virtual partition of stateless jobs, we're creating the virtual partitions of the batch jobs into the same cluster, and they are dynamic. You can move machines into different virtual partitions. You oversubscribe the physical resources, you say that I have more resources than what I actually have there in the machines, so you can say, "Instead of 10 CPUs, I have 16 CPUs." Then you can pack more services or more jobs on the same machine, and you move machines if some workloads get spikes.

This is what it will look like. You will have the same cluster where the stateless hosts and the batch hosts are running, and all the services run in the same virtual partition; however, they are packed on top of each other, because we oversubscribe. Right now at Uber, we generally don't oversubscribe the machines or pack instances on top of each other, and the reason is that this leaves those instances more slack in the case of spikes or failovers. However, in this case, what we are trying to do is pack them on top of each other, and we are going to preempt jobs from the batch partition if there are spikes in the stateless partition.

As soon as we see that the services are going from the green to the orange to the red zone, we are going to take machines from the batch virtual partition and place those services on those machines. This is what it will look like. We will monitor the utilization level and load level of every host in the cluster, and the latencies of each service instance running on these hosts. As soon as we see that latencies are impacted, that the P99 and P95 are getting impacted, we are going to proactively take machines from batch. We preempt the batch hosts and give them to the stateless virtual partition.

This is how we're going to make it work. Every host will run a host agent, and there is a central daemon called the host advisor. Each host agent will monitor its host and provide all the necessary utilization levels for CPU and memory and the P99 latencies for each instance. We then calculate everything at the host advisor level, and that tells Peloton that these hosts or these services are running hot and need more machines. Then Peloton will go ahead and move machines across virtual partitions.
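To illustrate the kind of decision the host advisor makes, here is a deliberately simple Python sketch. It flags a stateless host as hot when CPU utilization or P99 latency crosses a threshold and proposes moving one batch host per hot host. The thresholds, the one-for-one rule, and the `advise` function are all assumptions; the design described in the talk was still being built.

```python
def advise(stateless_metrics, batch_hosts, p99_slo_ms, cpu_limit=0.8):
    """stateless_metrics: {host: {"cpu": 0..1 utilization, "p99_ms": latency}}.

    Returns (hot stateless hosts, batch hosts to preempt and move over)."""
    hot = sorted(
        host for host, m in stateless_metrics.items()
        if m["cpu"] > cpu_limit or m["p99_ms"] > p99_slo_ms
    )
    # Assumed policy: hand over one batch host for every hot stateless host.
    moves = batch_hosts[:len(hot)]
    return hot, moves


metrics = {
    "s1": {"cpu": 0.50, "p99_ms": 40},
    "s2": {"cpu": 0.95, "p99_ms": 35},    # CPU is running hot
    "s3": {"cpu": 0.60, "p99_ms": 120},   # latency exceeds the SLO
}
hot, moves = advise(metrics, ["b1", "b2", "b3"], p99_slo_ms=100)
```

In this example two stateless hosts are flagged, so two batch hosts would be preempted and moved into the stateless virtual partition.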

Peloton & Kubernetes

The second cool thing we are working on is Kubernetes and Peloton integration. Kubernetes is a widely adopted container orchestration system. Peloton is an intelligent scheduler built for web-scale workloads. Before we did the integration, we wanted to do some benchmarking on Kubernetes and Peloton and see if it makes sense to even integrate them, or whether we should just use plain vanilla Kubernetes. We have something called virtual cluster built on top of Peloton, which is open source, so you can run any cluster on top of Peloton. You can run a Kubernetes cluster on top of Peloton, a Peloton cluster on top of Peloton, or a YARN cluster on top of Peloton. We did that because we wanted to do all these benchmarks. Getting this many machines is very hard, so you can build a virtual cluster on top of a Peloton cluster and then do all this benchmarking.

We did this for cluster sizes of 1,000 nodes to 10,000 nodes, and we ran jobs of 10,000 containers to 100,000 containers. We measured two things - firstly, how much time it takes to complete the job, and secondly, how much time it takes to upgrade the job. These are the numbers for 10,000 containers on 1,000 machines up to 10,000 machines, and the Kubernetes numbers are way off from the Peloton numbers. Kubernetes took at least 500 seconds, and Peloton was scaling based on the number of nodes. As we increase the number of nodes, the Peloton numbers come down. We also tested with 50,000 containers per job, and at that scale, Kubernetes couldn't scale; it was not able to finish the job.

Similarly, we tested batch jobs on 2,000 nodes with 10,000 containers, 20,000 containers, and 50,000 containers, where Kubernetes also couldn't scale and even 50,000 jobs could not run on the Kubernetes cluster. Similarly, for stateless deployments on 2,000 nodes, Kubernetes was taking way more time than Peloton for the 50,000-Pod deployment, and it was the same case for the rolling upgrade.

Why do we want to do Peloton and Kubernetes integration? Kubernetes is the current technology, and there is a very big community building around it, so we wanted to leverage Kubernetes and integrate Peloton with it. It will also enable Uber to leverage open source. Why Peloton? Because it meets our scale and customization needs. It also gives us a clear path to move our workloads from Mesos to Kubernetes in the future.

This is how we are going to integrate Peloton on top of Kubernetes. We're going to have our Peloton scheduler, which will be integrated through the API server using a scheduler extension. We'll build our stateless job controller, batch job controller, and stateful job controller, which will expose the same Peloton APIs we have right now, and the orchestration engines will run on top of those controllers. We'll use the same Kubernetes model to plug in our scheduler. Based on our needs, with the Peloton stateful engine, resource manager, and host manager, we already have all the elastic resource sharing, the higher throughput for batch jobs, and the rest of the cool features we have in Peloton, and we can combine both worlds.

Peloton in Production

Let's talk about what the status of Peloton in production is right now. We have been running in production for more than one and a half years now. All our autonomous vehicle map workloads run on top of Peloton. We have integrated with Spark, and we are moving all the Spark jobs to Peloton. We have distributed TensorFlow deep learning jobs all running on Peloton. We have already started to move our stateless services on top of Peloton. Once we have all the co-location projects done, we will start co-locating them together. As for scale, we are running more than 8,000 nodes right now, and we are running more than 2,500 GPUs. We're running 3 million plus jobs monthly, with 36 million plus containers monthly, on Peloton in production.

This is an open source project, so this is a blog post and open source repo. Please look into it, give us feedback, and we're very happy to take contributions also.

Questions and Answers

Participant 1: You're talking about migrating to Kubernetes, and from what it looks like, you've already invested heavily in Mesos, so what would be the actual technical reasons to do this besides the community? Are there actual reasons to migrate from existing Vulcan Infrastructure? Are there some projects that may just be more hyped in open source?

Bansal: There are two reasons. One is the community, definitely, because if you look at the Mesos community right now and the Kubernetes community right now, there is a very big gap. The features coming out of the Mesos community are fewer and reducing every day. However, there is a big ecosystem around Kubernetes, and we wanted to leverage that. Secondly, we're starting to hit the scale limits of Mesos; one example is the event acknowledgement that Mesos does. We have started to hit scale, so now we wanted to invest in something which would be more long-term, which people are now investing in. These are a couple of reasons.

Jindal: I can add this. One of the specific features that is causing us to move away from Mesos is Pod support. We want to run multiple containers in the same Pod and Mesos doesn't have that, while Kubelet just provides it out of the box. There are a couple of other ecosystem-based features; for example, one is SPIRE-based integration. There is already an integration between the API server and SPIRE for identity-based authentication. Our identity system at Uber is going to be SPIRE as well. We get some of these ecosystem-based features for free. These are the two main drivers right now to move away from Mesos to Kubernetes.

Bansal: We can add these features to Mesos as well, but the point is, do you want to invest where you cannot get more out of the community, or do you want to invest somewhere where you get a lot of support and a lot of new features? These are the tradeoffs you need to make.

Participant 2: The deployment time comparison that you showed between Peloton and Kubernetes, is it the same as the differences when the image is not available on the same Pod, when the new version is coming up?

Bansal: No. We haven't added that feature right now in Peloton to do the pre-fetching.

Jindal: No, that's not the reason. What we did in those benchmarks was to eliminate that. What we did is we just did a sleep zero or just an echo batch job which didn't require any image download. The difference we're trying to measure is the scheduler throughput, and not external factors like trying to initialize the Sandbox or trying to download the image or anything else.
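
As a rough illustration of the metric being compared here, scheduler throughput for a no-op job is just the number of containers divided by the time the job takes to complete. The function name below is hypothetical; the inputs are the figures quoted earlier in the talk (a 10,000-container job, for which Kubernetes took at least 500 seconds).

```python
def scheduler_throughput(containers: int, elapsed_seconds: float) -> float:
    """Containers scheduled per second for a job of no-op tasks."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return containers / elapsed_seconds


# 10,000 containers in at least 500 seconds means at most 20 containers per second.
kubernetes_upper_bound = scheduler_throughput(10_000, 500.0)
print(kubernetes_upper_bound)  # 20.0
```

Because the tasks do no work and need no image download, the elapsed time is dominated by scheduling and launch overhead, which is what the benchmark was meant to isolate.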

Participant 3: You mentioned that Peloton supports multiple cluster technologies. How hard is it going to be to onboard a new cluster technology, like Service Fabric?

Jindal: If you look at the architecture, there is one component which abstracts the cluster APIs from us. We require some very basic functionality from the cluster APIs, like being able to launch a Pod, being able to kill a Pod, and being able to get all the hosts in the cluster and the available resources of each host. It started with CPUs, 128 gig memory. For anything that provides those APIs, we can write a very simple abstraction. There's a very simple daemon which abstracts everything out, so you just need to replace that daemon to integrate with any new cluster technology.
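
The small surface described in this answer (launch a Pod, kill a Pod, list hosts with their available resources) could be sketched as an interface like the one below. This is an assumed shape for illustration only, not Peloton's actual host manager API: `ClusterAdapter`, `HostResources`, and `InMemoryCluster` are hypothetical names, and the in-memory backend stands in for a real Mesos or Kubernetes integration.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class HostResources:
    host: str
    cpus: float
    memory_gb: float


class ClusterAdapter(ABC):
    """Hypothetical minimal contract a cluster backend must satisfy."""

    @abstractmethod
    def launch_pod(self, pod_id: str, host: str, cpus: float, memory_gb: float) -> None:
        ...

    @abstractmethod
    def kill_pod(self, pod_id: str) -> None:
        ...

    @abstractmethod
    def list_hosts(self) -> List[HostResources]:
        """Return every host with its currently available resources."""


class InMemoryCluster(ClusterAdapter):
    """Stand-in backend used only to show the contract in action."""

    def __init__(self, hosts: List[HostResources]) -> None:
        self._capacity: Dict[str, HostResources] = {h.host: h for h in hosts}
        self._pods: Dict[str, Tuple[str, float, float]] = {}  # pod_id -> (host, cpus, memory_gb)

    def _available(self, host: str) -> HostResources:
        cap = self._capacity[host]
        used_cpus = sum(c for h, c, _ in self._pods.values() if h == host)
        used_mem = sum(m for h, _, m in self._pods.values() if h == host)
        return HostResources(host, cap.cpus - used_cpus, cap.memory_gb - used_mem)

    def launch_pod(self, pod_id: str, host: str, cpus: float, memory_gb: float) -> None:
        free = self._available(host)
        if cpus > free.cpus or memory_gb > free.memory_gb:
            raise RuntimeError(f"not enough resources on {host}")
        self._pods[pod_id] = (host, cpus, memory_gb)

    def kill_pod(self, pod_id: str) -> None:
        self._pods.pop(pod_id, None)

    def list_hosts(self) -> List[HostResources]:
        return [self._available(h) for h in self._capacity]


# The 32-CPU figure is an assumption for the example; 128 GB echoes the talk.
cluster = InMemoryCluster([HostResources("host-1", 32, 128)])
cluster.launch_pod("pod-1", "host-1", cpus=4, memory_gb=16)
print(cluster.list_hosts())
```

Under this kind of design, supporting a new cluster technology means writing one more `ClusterAdapter` subclass, while the scheduling logic above it stays unchanged.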

Bansal: There are two dimensions to it. One is the resource management piece - you can add Mesos, you can add Kubernetes, and then you can add any other resource manager underneath Peloton. Then there is another dimension, which is how easy it is for any orchestration engine or any workload to integrate with Peloton. We provide the APIs, so many of the orchestration engines go through those APIs to run different workloads on top of these cluster resource managers.

Participant 4: If you're running only stateless microservices in a cluster and using Kubernetes, would there be any reason why I should consider Peloton?

Bansal: There are two reasons. One is the scale - if you're running very small-scale microservices on top of Kubernetes, yes, you might be better off not using Peloton right now. However, there are multiple features which we have in Peloton: elastic resource sharing, the scale we have, and the throughput we have compared to Kubernetes. In the long run, we also want to co-locate batch jobs with the stateless services. If you need features like elastic resource sharing, priority preemption, and all these scheduling primitives which are not present in Kubernetes right now, then you might want to consider Peloton at scale. In the long run, if you want to co-locate all these workloads together, then I would probably encourage using Peloton.

Jindal: If you are running at low scale in the cloud, then I don't think there's a better solution than Kubernetes. But if you are running on-prem at scale, we found that we ran into multiple challenges with Kubernetes. Some of them we already discussed, and Mayank [Bansal] also mentioned them. We can discuss more offline about the specific use case you have and whether Peloton makes sense for that use case or not.

Participant 5: You mentioned that Peloton can help an organization to move from Mesos to Kubernetes. I want to ask if we can run Mesos and Kubernetes at the same time, with Peloton on top of that, scheduling jobs into both clusters?

Bansal: One direction which we haven't shown yet, and which we are actually working on, is at the host manager level. If you look at our architecture right here, the host manager is the abstraction on top of Mesos, so you can actually plug in lots of other Mesos clusters as well. In addition, you can also add multiple Kubernetes clusters. We just need to write a different host manager for that Kubernetes cluster, and then you can run it on the same cluster. That's what we are actually working on right now, along with this architecture, to seamlessly migrate workloads from Mesos to Kubelets. You can call it Peloton 1.5, and then Kubernetes 1, Peloton 2.0.



Recorded at:

Sep 24, 2019