Transcript
Thomas: My name is Sherin. I'm a Software Engineer at Lyft. Today I will be talking about how we incorporated streaming into our machine learning platform, why we needed it, and all the hoops we had to jump through to get it to work properly for us. How many of you have used or have heard of Lyft before? How many of you have used any other ridesharing service, like Uber? For those who are not familiar, Lyft is a ridesharing service that connects passengers with drivers through our platform. When you want to go somewhere, you open the Lyft app and enter your destination, and Lyft gives you a few options to get there. You can take a shared ride or a luxury car, and it gives you an ETA, a price estimate, and so on. When you pick an option, the Lyft platform matches you with a nearby driver who takes you to your destination.
As you can imagine, in this whole interaction in the world of ride sharing, there are a lot of decisions that depend on the most recent state of the world. For example, the ETA calculation depends on traffic; a road closure or an accident may impact the ETA. Pricing depends on supply and demand. Demand can vary a lot based on the time of day and on things like weather or traffic. If there's a train cancellation, there may suddenly be higher demand for Lyft, for example.
To solve all these problems, we use machine learning to make predictions about what the ETA should be, what pricing should be, and so on. Machine learning algorithms are only as good as the data used to score them. That is where streaming comes in. Apart from that, there are a lot of incidents, like fraud, that we want to fight, where real-time information is super critical.
Stopping a Phishing Attack
Talking about fraud, I want to tell you a short story, a story about Alex and Tracy. Alex is a driver on Lyft. He got a ride request from someone, and he is on his way to pick that person up. While he is driving, he gets a call from Tracy. Tracy says, "Alex, you are an amazing driver on our platform. We want to reward you. We want to offer you $200." Alex is obviously really happy, and they carry on the conversation. Tracy says, "I see that you are in the middle of a ride. You're going to pick someone up. Let me go ahead and cancel your ride from the back-end, so that you can pull to the side of the road and we can continue with this conversation." At this point, Alex is thinking, "Tracy has to be from Lyft if she can cancel my ride from the back-end." He trusts her. They continue with the conversation. Then Tracy says, "Wait, before I give you the $200, I want to verify your identity. I will be sending you a verification code. Can you repeat that to me?" At this point, you all know where this is going. Tracy is not really a support agent. She's an imposter. Our poor, gullible Alex gives her the verification code. Now she has access to his driver account, and she can do quite a bit of damage with that. She has access to his bank details. She can take his earnings, and so on. We want to combat these phishing attacks in real-time. The thing with fraudsters is that they keep changing their schemes, and we want to evolve and keep up with that.
To solve this problem, research scientists at Lyft figured out that there is a pattern to this, a pattern to all these fraudsters. If we can fingerprint these patterns in their behavior, we may be able to predict when a fraud is going to happen and combat it in real-time.
For example, in this case, Tracy was actually a passenger. She requested a ride. Then she made driver contact. In the Lyft app, once you are matched with a driver you can call the driver through the app. She did that. Then she canceled the ride. Taken together, these indicate a weird sequence of events, which doesn't occur normally. This is a fingerprint that can tell you a little bit about whether there is potential for fraud. Our research scientists decided to turn to deep learning. They wanted to use convolutional neural networks to find these patterns and act on them.
This is the input that they wanted to give to the convolutional neural network. They wanted to get the last x actions a person took on the platform. Historic context was also important, because a one-off action doesn't tell you as much. They wanted to look at what the person has been doing in the past. They needed event time processing: all these actions had to be interpreted in the context of when they actually happened. This is an example of a situation where real-time features are super important.
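To make the shape of that input a little more concrete, here is a minimal Java sketch of a time-ordered action sequence per user. The record fields and names are assumptions for illustration, not Lyft's actual schema.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

public class ActionSequence {
    // Hypothetical shape of one platform action with its event-time timestamp.
    public record UserAction(String userId, String action, Instant eventTime) {}

    // Keep only the most recent `n` actions, ordered by event time (when they happened).
    public static List<UserAction> lastN(List<UserAction> history, int n) {
        return history.stream()
                .sorted(Comparator.comparing(UserAction::eventTime))
                .skip(Math.max(0, history.size() - n))
                .toList();
    }

    public static void main(String[] args) {
        var history = List.of(
                new UserAction("u1", "ride_requested", Instant.parse("2020-01-01T12:00:00Z")),
                new UserAction("u1", "driver_contacted", Instant.parse("2020-01-01T12:01:00Z")),
                new UserAction("u1", "ride_canceled", Instant.parse("2020-01-01T12:03:00Z")));
        System.out.println(lastN(history, 2)); // the two most recent actions
    }
}
```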
For this and many other such use cases, we wanted to make it really easy for our research scientists to be effective at their work, and to make the generation of streaming features super easy. When we started thinking about building such a platform, the first step was, what would we use as a stream processing engine? What would be the underlying engine? For that, we turned towards Flink.
Apache Flink
Apache Flink is an open-source framework. It provides low-latency, stateful computations on streaming data. You can achieve latencies on the order of milliseconds with it. Event time processing is supported, which was really important for us. What event time processing gives us is, first of all, replayability: you get the same results no matter how many times you run the algorithm on the same set of data. It also gives us exactly-once processing for correctness, so no matter how many times data is retried, it is processed exactly once. Failure recovery was also important. In a distributed system, anything can happen. Things go down. We want our system to gracefully recover from that. The other thing was ease of use. Flink provides a SQL API. Stateful computations can be specified in terms of SQL, which was really desirable for us.
At Lyft, whenever any action happens on the platform, whenever a user takes any action, events are generated. I was listening to Benjamin's talk earlier, and he described events in a really excellent way. Events are something that happens. In this case, for example, someone took a ride at time t0. That's an event. These events are generated on the platform. Our event ingestion pipeline ingests all of them. They are streamed into multiple Kinesis streams, and also persisted to S3 for offline computation and for historic reasons.
The other important thing I want to talk about is why event time processing is important and why it's tricky, and why, in a streaming world, it's tricky to understand the difference between processing time and event time. Processing time is the time when your system observes an event. This time is maintained by the processor itself: when a processor looks at the event, it looks at the clock, and that's the processing time. Event time is the time at which an event actually happened. This is supplied by the event itself. Your events come with a timestamp in them.
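Here is a minimal Java sketch of that distinction, assuming a hypothetical event type that carries its own timestamp:

```java
import java.time.Instant;

public class TimeSemantics {
    // Hypothetical event carrying its own event-time timestamp.
    record Event(String name, Instant eventTime) {}

    public static void main(String[] args) {
        Event e = new Event("ride_requested", Instant.parse("2020-01-01T12:01:00Z"));

        // Event time: when the event actually happened, read from the event itself.
        Instant eventTime = e.eventTime();

        // Processing time: when this process observes the event, read from the local clock.
        Instant processingTime = Instant.now();

        System.out.println("event time      = " + eventTime);
        System.out.println("processing time = " + processingTime);
        // The gap between the two is the event's latency; it can be minutes,
        // and events can arrive out of order with respect to event time.
    }
}
```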
In a perfect world, when there are no latencies, everything is great. Our events would be somewhere on this axis; there is no latency between event time and processing time. In reality, it will be something more like this 9 over there. This 9 has an event time of 12:01, but a processing time of 12:08. Our system saw it seven minutes later. Then things can be completely out of order. Complete madness, all out of order. Star Wars episodes 4, 5, and 6 came first, and then 1, 2, and 3. What do you do in such a world where things are out of order and there are unpredictable latencies?
Integer Sum over 2-Minute Window
Suppose we want to calculate an integer sum over 2-minute windows; suppose that's the problem you're trying to solve. In a batch world, say you're running a MapReduce job, you run it on the complete dataset. Before you start the program, your data is fully available. In that situation, it's easy. Our MapReduce program starts looking at the data, slots everything into windows, and produces the results once it has seen all the data. You're sure to receive correct results. In a streaming world, data is never complete. It's an infinite, constantly arriving set of events. How do you know how long to wait, or when to finish processing and commit these windows?
For this, the Dataflow model, which was introduced at Google, came up with this thing called watermarks. Apache Flink is actually based on the Dataflow model. The simplest way to understand a watermark is that a watermark is the way time flows through a streaming system. Watermarks are markers in your stream that flow through along with your data. When the system sees a watermark, what it tells the system is, you can stop waiting for events having a timestamp smaller than mine. When the system sees this watermark, 12:02, it knows that it should have received everything having a lower timestamp, and if there are any windows that need to be committed, it can go ahead and do that.
Our system is now in the streaming world. Whenever the system sees a watermark, it commits results. We have low latency; the system doesn't have to wait endlessly. As a developer, you can choose what your watermarking strategy should be. If you know that your p99 latency is 5 minutes, you can account for that when generating watermarks. Now we get low latency; we get results as quickly as possible. Still, data can arrive late, if your watermark strategy is not conservative enough, or by some off chance when there are backfills happening and data is coming in late. It can still get missed, like that 9 over there. Flink has a remedy for this too. In Flink, you can re-trigger windows when late data arrives. You can trigger a window once when the system sees the watermark, and then, when a late event arrives, it can re-trigger it. You have low latency with eventual correctness. This was really desirable for us, because we wanted eventually correct data, but we also wanted our system to get results out as quickly as possible, even if they might be revised later.
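Putting the pieces together, here is a minimal Flink sketch of the 2-minute integer sum with event-time windows, a bounded-out-of-orderness watermark, and allowed lateness for re-triggering. The input data, the 5-minute watermark bound, and the 10-minute lateness are illustrative assumptions, not Lyft's settings.

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class TwoMinuteSum {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (key, value, eventTimeMillis) -- a real job would read these from Kinesis.
        var input = env.fromElements(
                Tuple3.of("rides", 3, 1_600_000_000_000L),
                Tuple3.of("rides", 9, 1_600_000_030_000L),
                Tuple3.of("rides", 5, 1_600_000_150_000L)); // lands in the next 2-minute window

        input
            // Watermarks trail the largest seen event time by 5 minutes (assumed p99 lateness).
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                    .withTimestampAssigner((event, previousTs) -> event.f2))
            .keyBy(event -> event.f0)
            // Slot each element into a 2-minute event-time window and sum the integer field.
            .window(TumblingEventTimeWindows.of(Time.minutes(2)))
            // Re-trigger an already-fired window if data arrives up to 10 minutes after the watermark.
            .allowedLateness(Time.minutes(10))
            .sum(1)
            .print();

        env.execute("two-minute-integer-sum");
    }
}
```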
Usability
With the engine taken care of, the second thing we started thinking about was usability. Our users are data scientists and research scientists. We started thinking about what they care about most. They care most about model development, feature engineering, and data quality, and probably least about figuring out compute resources and dealing with all the things related to distributed systems. Often that becomes a bigger chunk of the whole problem. They end up spending a lot of time on data collection, data discovery, schematization, and compute resources. We wanted to abstract all that out for them. Then we also started thinking about what a typical machine learning workflow looks like. A big part of the machine learning workflow is the data prep: finding out what sources are available, cleaning them up, and generating features, and only after that come the modeling, training, and evaluation. The data input and data prep are often the most complicated and time-consuming aspects of doing anything in machine learning.
Dryft - Self-Service Streaming Framework
With that in mind, what came about was something that we called Dryft, because every project at Lyft has to have a Y in its name. This is our self-service streaming framework. The user plane, which is the only thing the user interacts with, is a simple UI that accepts a declarative configuration as input. The control plane does all the heavy lifting: it does data discovery, it does the query analysis, and it figures out how many resources are needed to run the streaming program. Then finally, when features are materialized, they are written to several different places: to Kafka, to DynamoDB, which is our main feature storage, to Druid for analytics, to Hive, and to Elasticsearch.
The user only needs to give us this. It's as simple as a one-page declarative configuration. Users can specify their logic in terms of SQL, which was great, because the Flink SQL API is really powerful. Almost 90% of our use cases can be expressed in terms of Flink SQL. Then they give us a simple job config, which contains metadata about the job: what the retention policy should be, where the features need to go, any versioning information, and so on.
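As a rough idea of the kind of SQL a feature author might write, here is a sketch using Flink's Table API from Java. The table, columns, connector, and windowing below are hypothetical stand-ins; Dryft's actual configuration format and the real sources are not shown here.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FeatureSqlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical source table; in Dryft the platform would wire up the real
        // connector and schema for the user. 'datagen' just produces dummy rows here.
        tEnv.executeSql(
            "CREATE TABLE ride_events (" +
            "  user_id STRING, " +
            "  action_type STRING, " +
            "  event_time TIMESTAMP(3), " +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' MINUTE" +
            ") WITH ('connector' = 'datagen')");

        // Hypothetical feature: rides per user over a sliding 30-day window, refreshed
        // hourly -- the kind of logic users can express purely in SQL. Results are
        // emitted as windows close, based on the watermark.
        tEnv.executeSql(
            "SELECT user_id, COUNT(*) AS ride_count, " +
            "  HOP_END(event_time, INTERVAL '1' HOUR, INTERVAL '30' DAY) AS window_end " +
            "FROM ride_events " +
            "GROUP BY user_id, HOP(event_time, INTERVAL '1' HOUR, INTERVAL '30' DAY)")
            .print();
    }
}
```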
This is what the general ecosystem of our streaming service looks like. The blue box on top represents a user's application. When the user gives us the declarative configuration and hits submit on the UI, the user application starts. It starts consuming data from the data sources, applies the logic, and writes to a Kinesis stream. Then a separate feature fanout job reads data from this Kinesis stream and materializes it to all the sinks where it needs to go. The reason why we have this two-step process is so that user applications can run completely agnostic of where the data needs to go. That way, our sinks can evolve irrespective of what's happening with the user app. The feature fanout job does the work of any transformation and manages throughput to the sinks. It does everything that's needed to materialize the data.
When we were building this, we also decided to eat our own dog food. The feature fanout jobs are also written using Dryft's declarative configuration and run on Flink. This makes things really easy for us, because now, even if we get paged in the middle of the night with, "You're writing too much to DynamoDB," our SREs can go and quickly change a config, lower the write rate, fix any small parameters, and quickly update the job, with one touch. It's really easy, no coding required. Any transformation can also be expressed in terms of SQL. Even though these feature fanout jobs are stateless, we can still use SQL for them.
Let's talk about deployment. This is where things got really interesting. We went through several roadblocks to figure out what worked for us. Until about a year ago, our platform ran on AWS EC2 instances using a custom deployment. There was basically one big cluster. There was one job manager cluster, and the job manager's only job is to submit jobs, or rather, create tasks and submit them to another cluster, the task manager cluster. All our jobs, and there were 60 jobs running on the platform at that time, were running on the same cluster. This is what it looked like: all the jobs in one cluster. If one job takes down the whole cluster, all the jobs are down. It was multi-tenancy hell.
Kubernetes Based Deployment
At this time, Kubernetes came to our rescue. With Kubernetes, we now deploy each application in its own separate cluster. It has its own dedicated job manager pods and task manager pods. There is separation of concerns, and the problems of the multi-tenancy architecture are removed. Before, because everything was running on one cluster, compute resources were calculated manually. We would anticipate that by next week we might have 60 jobs; if each job needs a parallelism of 10, maybe we need 600 instances. We would do these manual calculations, and when we deployed we used to ask for 600 instances, which is obviously not a great use of our time or a good model.
Flink-K8s-Operator
To solve all these problems, we built something called the Flink Kubernetes Operator at Lyft, which is available in open source. You can think of the Flink Kubernetes Operator as a cron job. You provide it a custom resource descriptor. The operator is constantly polling for custom resource descriptors. It looks for any new custom resource descriptor or any changes, and it materializes whatever desired state you write in that custom resource descriptor into a cluster.
A custom resource descriptor looks something like this. In one page, you can tell the Flink Kubernetes Operator what your cluster should look like. I can say that my cluster will be running a Flink application. I can give it an image name, and my Docker image will contain all the dependencies. This is my task manager configuration: I want all my task managers to have 15 gigs of memory, four CPUs, and so on. The Flink Kubernetes Operator will read that and just make it happen. If I go back and update this over here, say change the CPU count to one, I don't need to do anything else, because the Flink Kubernetes Operator is constantly listening for these changes. It will pick them up and update the cluster.
This is what our entire deployment flow looks like. The user writes a declarative configuration. Our control plane validates this configuration and does query analysis. Based on the query analysis, it figures out how much memory is required, how much CPU, and what the parallelism should be, and it generates the Kubernetes CRD. From there, the Flink Kubernetes Operator takes over, it deploys the application, and everything works. All of that is completely isolated from the user. With Flink on Kubernetes, each Flink application runs on a separate cluster. We were able to scale to hundreds of Flink applications, and automatic updates are supported as well. The best thing is that resource calculation and allocation is done on a per-job basis.
Bootstrapping
Let's talk about bootstrapping. This was another big problem that we spent a lot of time trying to solve. First, let's understand what bootstrapping is. Let's look at this simple program. Here we want to count the number of rides a person took over a 30-day period. The point to note here is that this is a streaming application, reading streaming data, and it wants to count the number of rides over 30 days. Suppose I start the program today, and it starts accumulating data into state. For this job to correctly answer the query, it needs to run for at least 30 days to have enough data. That's definitely not a good solution, because what if after 30 days we realize that there was something wrong with the logic and we need to update it? It's not a scalable solution.
What we can do instead is bootstrap the initial state by looking back at historic information, and then keep updating the state as new data comes in through our streams, which is great. In theory, it works really well. The problem arises when the sources that you read from do not have 30 days' worth of data, which is often the case. Kinesis has a maximum retention, I think, of seven days if I'm not wrong. With Kafka, also, it's really expensive to have a really long retention policy. Unless you have a source with infinite retention, it's not possible to get all that data from one source, which was the case with us.
What we did was decide to read from two sources. We read real-time data from Kinesis, which has a retention policy of seven days, and everything else from S3, which is our historic source. We retain data in S3 forever. In order to remove any duplicates when unioning these two streams, we use a target time as a bridge. This works great. This is what we currently use. This is our bootstrapping strategy.
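Here is a minimal Flink sketch of that idea, assuming a cutoff timestamp that separates what comes from the historic S3 source from what comes from Kinesis. The event type, the stand-in sources, and the exact dedup rule are illustrative assumptions, not Dryft's implementation.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BootstrapUnion {
    // Hypothetical event with an event-time timestamp in epoch millis.
    public static class FeatureEvent {
        public String userId;
        public long eventTimeMillis;
    }

    public static DataStream<FeatureEvent> bootstrapAndUnion(
            DataStream<FeatureEvent> historicFromS3,
            DataStream<FeatureEvent> realtimeFromKinesis,
            long cutoffMillis) {
        // Everything strictly before the cutoff comes from the historic S3 source...
        DataStream<FeatureEvent> historic =
                historicFromS3.filter(e -> e.eventTimeMillis < cutoffMillis);
        // ...and everything at or after it comes from Kinesis, so the overlapping
        // window (Kinesis retains about 7 days) is not double counted.
        DataStream<FeatureEvent> realtime =
                realtimeFromKinesis.filter(e -> e.eventTimeMillis >= cutoffMillis);
        return historic.union(realtime);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Stand-ins for the real S3 and Kinesis sources.
        DataStream<FeatureEvent> s3 = env.fromElements(event("u1", 1_000L), event("u1", 5_000L));
        DataStream<FeatureEvent> kinesis = env.fromElements(event("u1", 5_000L), event("u1", 9_000L));
        bootstrapAndUnion(s3, kinesis, 5_000L).print();
        env.execute("bootstrap-union");
    }

    private static FeatureEvent event(String userId, long ts) {
        FeatureEvent e = new FeatureEvent();
        e.userId = userId;
        e.eventTimeMillis = ts;
        return e;
    }
}
```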
One thing to note here is that when the job starts, when it is bootstrapping, it is reading 30 days' worth of data all at once from S3. We see this huge spike in the number of records being processed. Once the bootstrapping is over and it's just processing real-time data, it has caught up. In real time, obviously, data is not being read at that rate, all at once. The problem is that, as you can clearly see, during bootstrapping the job needs a lot more resources to do its work, and bootstrapping time can vary based on the job. Either we could provision the job for the maximum required during bootstrapping, which is not great, which is a really costly option, or we had to deal with it in some other way.
What we did is start the job with really high parallelism, so that it has enough resources to go through this huge backlog of historic data. Then our system automatically detects when bootstrapping is over. We do this based on watermarks. Remember the watermarks that I talked about earlier? Our system keeps listening to the watermarks to see how time is progressing within the streaming system. When the watermark approaches within a reasonable limit of real time, we know that bootstrapping should be over. At that time, we update our CRD with fewer resources so that the job update takes place. Then our job can proceed with fewer resources, so we save on cost.
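A simplified sketch of that catch-up check, written as a pass-through Flink operator with a hypothetical 10-minute threshold. In practice this signal would presumably be observed by the control plane, which then shrinks the CRD so the operator redeploys the job with fewer resources; this just illustrates the watermark-versus-wall-clock comparison itself.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Flags when the job's watermark has caught up to within a threshold of wall-clock
// time, i.e. when bootstrapping is likely over. Usage: stream.process(new BootstrapCompletionDetector()).
public class BootstrapCompletionDetector extends ProcessFunction<String, String> {
    private static final long CATCH_UP_THRESHOLD_MS = 10 * 60 * 1000; // assumed threshold
    private boolean caughtUp = false;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        long watermark = ctx.timerService().currentWatermark();
        long lag = System.currentTimeMillis() - watermark;
        if (!caughtUp && lag < CATCH_UP_THRESHOLD_MS) {
            caughtUp = true;
            // In a real setup this would notify the control plane rather than just log.
            System.out.println("Watermark within " + CATCH_UP_THRESHOLD_MS
                    + " ms of wall clock; bootstrap likely complete");
        }
        out.collect(value); // pass records through unchanged
    }
}
```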
Another thing we had to deal with was the output volume spike. Because during bootstrapping our job is reading a lot more data, it's understandably also producing a lot more data. How do you manage throughput in that situation? Do you over-provision your sinks to handle that huge spike during bootstrapping? That can be a strategy, but it is again really costly, over-provisioning for a short bootstrapping period.
What we did was realize that, for us, features need to be fresh, but eventual completeness is OK. Instead of over-provisioning our sinks to handle that really high, spiky throughput during the bootstrap phase, we decided to write the data produced during bootstrap into a separate Kinesis stream. Then the feature fanout job that reads from that Kinesis stream smooths it out and writes it over a longer period of time. Now our sinks do not have to be over-provisioned to handle that throughput, because they are just receiving results at a steady rate. The results produced during steady state go through a high-priority Kinesis stream, and they are written to an idempotent sink. It's important for the sink to be idempotent because there are two different sources now writing data to it, and you need a way to make sure the sink can handle that.
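One plausible way to do that split inside the job is with a Flink side output: results computed for historic event times go to a low-priority bootstrap stream, while fresh results stay on the main, high-priority path. The record type, the cutoff rule, and the tag name below are assumptions for illustration, not the actual Dryft routing logic.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class OutputSplitter {
    // Hypothetical feature result carrying the event time it was computed for.
    public static class FeatureResult {
        public String key;
        public double value;
        public long eventTimeMillis;
    }

    public static final OutputTag<FeatureResult> BOOTSTRAP_OUTPUT =
            new OutputTag<FeatureResult>("bootstrap-results") {};

    public static SingleOutputStreamOperator<FeatureResult> split(
            DataStream<FeatureResult> results, long jobStartMillis) {
        return results.process(new ProcessFunction<FeatureResult, FeatureResult>() {
            @Override
            public void processElement(FeatureResult r, Context ctx, Collector<FeatureResult> out) {
                if (r.eventTimeMillis < jobStartMillis) {
                    // Backfilled result: send to the low-priority stream, which the
                    // fanout job drains slowly so sinks aren't over-provisioned.
                    ctx.output(BOOTSTRAP_OUTPUT, r);
                } else {
                    // Fresh result: keep on the high-priority main output.
                    out.collect(r);
                }
            }
        });
    }
}
```

The main output and `getSideOutput(BOOTSTRAP_OUTPUT)` would then each be written to their own Kinesis sink, and the idempotent feature store reconciles writes arriving from both paths.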
That is all well and good. Two obstacles down; the third thing we had to deal with was time skew. That was a really interesting problem that took us a while to figure out. Let's think about what is happening during bootstrapping. Our program is reading from two sources at the same time. We are getting really old data from S3 and we are getting fresh, brand new data from Kinesis. Because we union them, all this data goes on to update windowed state. Suppose we have 2-minute windows: all the older data is updating older 2-minute windows, while new data is coming in and updating new 2-minute windows. The challenge arises because of the way watermarks flow through the system.
The watermark is the way time moves through a streaming platform, and it is based on whatever the oldest data being processed right now is. If your system is still processing old data, the watermark will not proceed. Time will not move forward until it is done with that. Because the watermark is not proceeding, all these windows are left waiting open, because they are only triggered based on watermarks. What we saw was a state size explosion. Our memory usage just went up because of all these open windows waiting for the watermark to proceed.
To solve this problem, we built source synchronization into our sources. The way source synchronization works is that each source task reading from either Kinesis or S3 publishes its watermark, its view of time in the system, to a global state that is accessible to all the consumer tasks. Each consumer task, when it reads data from a source, puts it in a priority queue. Then it looks at the global watermark and makes a decision. It looks at the data that it has and asks itself, based on this watermark, is this data going to be used soon enough? Is my data too new, or is the watermark too far behind? If the global watermark is too old, it means that if the task forwards this data, the state will just sit there and not be used. So the consumer task pauses. It keeps polling the global watermark state and waits for time to advance in the system. When time has advanced beyond a certain point, it proceeds with that data and starts reading from the sources again. Because of this waiting that we introduced, our sources now wait before producing new data. Time moves forward, and the memory issue we saw was resolved.
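A simplified, plain-Java sketch of the source-synchronization idea: a reader pauses when its next record's event time is too far ahead of the slowest source's watermark, so windowed state for "future" data doesn't pile up. The shared watermark handle, the threshold, and the polling loop are illustrative assumptions, not Lyft's actual implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SynchronizedReader {
    // Global view of time: the minimum watermark published by all source tasks.
    private final AtomicLong globalWatermarkMillis;
    // How far ahead of the global watermark a record may be before we pause.
    private final long maxLookaheadMillis;

    public SynchronizedReader(AtomicLong globalWatermarkMillis, long maxLookaheadMillis) {
        this.globalWatermarkMillis = globalWatermarkMillis;
        this.maxLookaheadMillis = maxLookaheadMillis;
    }

    /** Blocks until emitting a record with this event time would not run too far ahead. */
    public void awaitEmit(long recordEventTimeMillis) throws InterruptedException {
        while (recordEventTimeMillis > globalWatermarkMillis.get() + maxLookaheadMillis) {
            // Too new relative to the slowest source (e.g. the S3 backfill): wait for the
            // global watermark to advance instead of buffering the record in window state.
            Thread.sleep(100);
        }
    }
}
```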
With all these improvements, we were able to scale the system, and now we have 120-plus features being produced by it. We write features to DynamoDB and several other sinks. The time to write, test, and deploy a streaming application has been reduced considerably. Our research scientists can write, test, prototype, and start streaming jobs in half a day. Our p99 latencies are 5 seconds. Ninety percent of our use cases are solved by Flink SQL, but not everything is, so we are looking into Python support as well. For that, we are currently looking into Beam, so that may be coming up.
Questions and Answers
Participant 1: At the beginning, you said there was a pattern you wanted to find for fraud. Did you find this pattern through machine learning? What's the role of machine learning in this case?
Thomas: We produce batch features through Presto and batch queries for training the machine learning algorithms. The streaming features are actually used for scoring. The way it works is, once our machine learning algorithms are trained and deployed in production, we have something called a model executor. The model executor knows what the model is. Then it needs features to score, as input to the model, to make a decision or a prediction. That is what the streaming features are used for, and for that we needed really instant, real-time information. This particular feature would look at the last 2,000 or so actions, with timestamps. In this case, I'm not completely sure what the actual weights that find this pattern are. This is the data the model would use as input in production to make the prediction.
Participant 2: When you're doing the historical backfill, and then you're going to reconfigure the job and start it again, do you ever run into issues where your job stores a lot of state, and that savepoint might take a long time, or have issues? Are there any challenges around that transition?
Thomas: There were two issues that we saw with that update. One thing is that with Flink, whenever the job graph changes in any way, suppose a new operator is added, something like that, savepoints become incompatible. That was one reason why we kept the job graph the same. Even though in steady state the S3 operator is not doing anything, it's not reading any S3 data, we keep the operator around just so we can keep the job graph the same. That was one of the issues.
Related to state, not really. Our main issue with checkpoint size was the time skew. Because of that time skew, our checkpoint sizes used to be really high, and then checkpoint alignment would take a long time. We saw a lot of issues around that, especially for really high-volume events.
Participant 3: How do you deal with schema changes? What if someone actually changes the SQL query they are executing?
Thomas: Suppose there is a schema change that is actually a breaking change; it makes savepoints and checkpoints incompatible, and the job cannot proceed. The way our features are stored in DynamoDB, they are namespaced under a feature name and a version. Suppose I have a feature that's counting the number of rides. Suppose version 1 is running with schema 1, and I know that the schema change is going to break it. The way we do it is we re-bootstrap a version 2, which is written under a completely different namespace. Then we automatically cut over the clients. Once it has fully bootstrapped, caught up, and picked up all the updates, we just cut over to the new version.
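A tiny sketch of the versioned-namespace idea described above: feature values are stored under a key that includes the feature name and version, so v2 can be re-bootstrapped alongside v1 and clients cut over once it has caught up. The key format and names here are assumptions for illustration, not the actual DynamoDB schema.

```java
public class VersionedFeatureKey {
    // Build a storage key that namespaces a value by feature name and version.
    public static String storageKey(String featureName, int version, String entityId) {
        return featureName + ":" + version + ":" + entityId; // e.g. "ride_count_30d:2:user_42"
    }

    public static void main(String[] args) {
        // v1 keeps serving reads while v2 bootstraps under its own namespace.
        System.out.println(storageKey("ride_count_30d", 1, "user_42"));
        System.out.println(storageKey("ride_count_30d", 2, "user_42"));
        // Once v2 has caught up, clients switch to reading the version 2 keys.
    }
}
```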