
Improve Feature Freshness in Large Scale ML Data Processing

Summary

Zhongliang Liang covers the impact of feature freshness on model performance, discussing various strategies and techniques that can be used to improve feature freshness.

Bio

Zhongliang Liang has over a decade of experience working in the domain of big data and large scale distributed systems. His most recent focus is on developing advanced data infrastructure for ML data processing at Meta, which powers the SOTA recommendation systems in the industry.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Liang: Our topic is: improve feature freshness in large scale ML data processing. I'm Zhongliang from Meta. First of all, let's talk about the scope of this presentation. We will be talking about feature freshness, or latency, but it has many aspects. The first one is training. When you train a model, there are different steps that can introduce latency. One is, how fresh is your training data? The others are, how frequently do you train your model, and how long does it take to train it? Then, when the model is trained, how fast can you publish and roll out your models? These are the things about training. Another dimension is inference, or what we call prediction. When your model is trained and is online, how fresh is your inference data? How fast can you fetch the data and do the inference? Out of all those different things, today we will just focus on the freshness of the inference data. We will not talk about the other parts.

Why Do We Want Fresher Features?

Before diving into how to make features fresh, let's ask the why question. Why do we want fresher features? First of all, here we are interested in a very specific type of ML task, which is recommendation, not just any ML task. In many other ML tasks, freshness of the data is not a requirement; it does not affect the performance of the model. Features used in recommendation are usually time-variant, since they represent the state of an object at a certain point in time. In those cases, model performance is a function of the features, and the features are functions of time. Here you can see a formula that gives you a rough intuition of this concept. Usually, our goal is to minimize the expectation of the loss between the prediction result and the label. As you can see, both elements in the prediction are time dependent: the model is trained on data at a certain time point, and the features are collected at a certain time point too. More practically speaking, in many application scenarios, the states of the objects are transient, for example, the interest of the user or the timeliness of the content. Both fresher training data and fresher inference data allow the model to capture trends in more recent events, so its prediction results are more relevant. This has been validated through many online A/B tests. In the ideal world, we would use current data to train the model and current data to do the inference, so everything would be instant. In reality, everything takes time, so there is latency. If you look at the graph, you'll notice we usually use older data to train the model, and relatively more recent data to do inference. If in some cases the inference data is more stale than the training data was, something is wrong.
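
As a rough sketch of the objective described here (a reconstruction with assumed notation, since the slide's exact formula is not in the transcript), it could be written as:

```latex
% Minimize the expected loss between the prediction and the label, where both
% the model and the features depend on time: \theta is trained on data up to
% t_train, and the features x are collected at t_feature. The gap between
% those times and "now" is the latency discussed in this talk.
\min_{\theta}\; \mathbb{E}\left[\, \mathcal{L}\Big( f_{\theta(t_{\mathrm{train}})}\big(x(t_{\mathrm{feature}})\big),\; y \Big) \right]
```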

Now we know a fresher feature may be helpful, and we want to have fresher features. Naturally, we ask: you want fresher features, but by how much? How much would it affect the model performance? Can you quantify it? First of all, the relationship between latency and performance is not a simple linear function. It's hard to draw the exact curve, but we have some observations that can serve as intuition. The chart here is for the idea only, it's not scientific. The x-axis is latency, going from higher latency on the left to lower latency on the right. The y-axis is model performance, lower at the bottom and higher at the top. If you look at the curve, we have observed something like a diminishing effect: if a feature set is already very slow, the impact of adding additional delay is not very obvious. However, when we start to move from left to right along the curve, once we reduce the latency into a certain range, the model performance starts to increase significantly. That means there is a certain range where the model is more sensitive to the feature latency. Then there might be the question: if we continue reducing the latency, will we see diminishing returns, or will performance keep increasing beyond a certain point? It's very hard to answer. When the latency is too low, the system becomes very complex, and it's very hard to set up a sophisticated experiment. With different experiment setups, we see different results. This is something you can only find out if you have the right experiment setup.

How are Features Produced and Stored?

Next, let's talk about how features are produced and stored, so we can understand where to do the optimization. First of all, what are the computations used in feature creation? There are some basic operations. Transform is a function with one input and one output. Some examples are string parsing and typecasting. Next one is filter. We use filter to filter out data points based on certain conditions. Next one is join. It brings data together from different sources when they are not in the same place, and is especially useful for enriching data. For example, the event data usually only contains object IDs. In order to get the full data, a join with a dimension table is needed to replace the IDs with the actual data. Next one is aggregation. We use it to aggregate data over certain dimensions, for example, aggregating data over a time window. The common aggregation functions we use are sum, average, count, and top-k. There are many of those. Beyond these, there are other, more complex computations that are becoming more popular today, such as model inference, where you can actually run a model and use the output as the feature. We won't cover too much of that here since the paradigm is slightly different; it might be a topic for another day. Now that we know the common computations, let's see how they are used to form a data processing flow. The chart here is the most basic flow for creating features. We can see, from left to right, we have the data source. Then we do some filter and transform. Then the data is fed into an aggregation function. After aggregation, the data is pushed to online storage. When the model actually needs to do the inference, it will fetch the data, sometimes do some online computation to process it, and then eventually feed it into the model to do the inference. In any of the steps along this flow, you can do joins with other data sources to bring more data, more dimensions, into the data flow. Join can happen anytime.
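
As a rough illustration of this flow (a sketch only, not a production pipeline; the table names, columns, and window size below are made up), a batch version in PySpark might look like this:

```python
# A minimal sketch of the transform -> filter -> join -> aggregate flow,
# assuming hypothetical tables `events` (event_time, user_id, video_id, action)
# and `videos` (video_id, category).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("feature_flow_sketch").getOrCreate()

events = spark.table("events")   # raw event data, containing object IDs only
videos = spark.table("videos")   # dimension table with the actual video data

features = (
    events
    # transform: one input column, one output column (e.g. typecasting)
    .withColumn("event_time", F.col("event_time").cast("timestamp"))
    # filter: drop data points that don't match a condition
    .filter(F.col("action") == "click")
    # join: enrich the events by bringing in dimension data for the video ID
    .join(videos, on="video_id", how="left")
    # aggregation: count clicks per user and category over a 1-day window
    .groupBy("user_id", "category", F.window("event_time", "1 day"))
    .agg(F.count("*").alias("click_count"))
)

# Push the aggregated features toward online storage (details are system-specific).
features.write.mode("overwrite").saveAsTable("online_feature_staging")
```

The join here is the enrichment step described above: it replaces the bare video ID with data from the dimension table before aggregation.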

Some people may ask how this is different from ETL. It looks very much like ETL. Let's zoom out a little bit to look at the bigger picture. This is the overall data flow of the entire system. On the left side, you have the applications. Applications have data. The data is logged and goes through ETL; it is put into offline storage, and some is put into a message bus. Then the data is fed into the system we previously mentioned, the join/transform/aggregation system. The data is processed there, then forwarded to feature storage, where it sits waiting for model inference to happen. This is usually the entire data flow. On the left side, the logging and ETL part, these are common preprocessing steps. The preprocessing results can be used by the ML applications we're talking about today, but they can also be used by other applications such as business intelligence and other places that need data. These are shared upstream data processing. Here, we're talking about something we call feature engineering, which focuses more on the complex last-mile data processing. The complex part is: we have more complex compute patterns, more specific data formats, more interactions between different data sources (meaning more joins), and more optimization at different stages of compute and storage. It's more complex and more ML specific. Each of those steps can introduce latency. We've looked at the compute. Now let's look at how the data is stored, how the features are stored logically. Features represent the states of an object. Feature data is usually indexed by a primary key, which is naturally the key of the object. For example, the object can be a user, a video, a post, or any other object. All features under the same primary key are usually stored together. Under the same object, each feature can also have a feature key.
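
As a minimal sketch of this logical layout (the objects and feature names are made up for illustration):

```python
# Features are indexed by a primary key (the object), and each feature under
# that object has its own feature key. A hypothetical in-memory view:
feature_store = {
    "user:12345": {                      # primary key: the object (a user)
        "clicks_last_24h": 17,           # feature key -> feature value
        "avg_watch_time_7d": 132.4,
        "preferred_category": "sports",
    },
    "video:98765": {                     # primary key: another object (a video)
        "views_last_1h": 5021,
        "like_ratio": 0.92,
    },
}

# A fetch for one object returns the features stored under that primary key.
user_features = feature_store["user:12345"]
```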

Different Data Infra and their Latencies

Let's talk about latencies. One dimension of getting the right latency for your features is to pick the right infra for the right task. First of all, let's look at the different infras. In the industry, there are different data infras that provide different latency expectations. There's batch processing, which is usually on the order of hours or days. The next one is streaming infra, or streaming processing, which does processing on the order of minutes or seconds. The last one is online services. You can actually do data processing with online services as well; it's quite common. The latency expectation of an online service is usually on the order of milliseconds or seconds.

Next, let's look at these one by one in detail. The first one is batch data processing. What features are most suitable to be processed by batch data systems? These features are usually ingested into data sources daily or hourly. The data scale is usually very large, or the computation is complex. They don't usually change, or the models are not very sensitive to those changes. Finally, they probably cannot be effectively computed in other infras with lower latencies; they are most efficiently computed in batch data processing systems. For example, Apache Spark is one of the most commonly used. Next one, streaming infra. What features should be streaming features? These are usually streamed time series events. They are usually very transient and sensitive to latency. They don't have a lot of joins between data sources, and they don't need very complex aggregations, because if they do, they will either come with very high performance penalties in streaming infras, or simply be too difficult to do in today's industry systems. The first step of getting the right latency is to pick the right infra.
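
As a rough sketch of what such a streaming feature might look like (the broker, topic, and column names are made up; Spark Structured Streaming is used here purely as an example of streaming infra):

```python
# A minimal sketch of a streaming feature: a windowed count over recent events,
# with no joins and only a simple aggregation.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("streaming_feature_sketch").getOrCreate()

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "click_events")
    .load()
    # the Kafka value is treated as the user ID for this sketch
    .selectExpr("CAST(value AS STRING) AS user_id", "timestamp AS event_time")
)

# Count clicks per user over a sliding 10-minute window, updated every minute.
clicks_per_window = (
    events
    .withWatermark("event_time", "5 minutes")
    .groupBy(F.col("user_id"), F.window("event_time", "10 minutes", "1 minute"))
    .agg(F.count("*").alias("click_count"))
)

# In a real system this would be written to the online feature store;
# here the results just go to the console for illustration.
query = clicks_per_window.writeStream.outputMode("update").format("console").start()
```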

Fine-Tuning the Infra for Better Freshness

Next, we've covered the basic setups; let's see how we can fine-tune the data infra to further improve freshness. Here, we are specifically talking about how to fine-tune streaming infra. This one is the basic setup, the very basic form of streaming aggregation and streaming data processing. This is the case where aggregation is completely done offline. If you look at the chart, from the left side, you have the data source, you do the transform, and then you do the aggregation. All of this happens offline. You can do joins offline too. Then you can see there's a dotted line, which is the separator between offline processing and online processing. After aggregation, the processed data is pushed into online storage, where it sits waiting for model inference. When model inference happens, the data is fetched online, some computation is done if necessary, and then the inference is done. This is the baseline.

Next one. This one is a variation of the basic setup. In this case, we're doing something called semi-online aggregation. We are splitting the offline aggregation into two steps: part of it is offline and part of it is online. You can see the separation line is now drawn through the online storage. Here is how this works. The transformed events from the left side are first pushed into online storage. Then the online storage does an infrequent self-aggregation of the events and produces a partial aggregation. The partial aggregation and the raw transformed events are stored together in the online storage. Then, when model inference happens, the online service triggers an on-demand online aggregation: it fetches the pre-aggregation result, the partial aggregation, along with the freshest transformed events, pulls both parts online, and combines them to do a full aggregation. With this design, the full aggregation is done online at a lower compute cost, because part of the aggregation is already done offline. It can both do the full aggregation at lower latency and capture the freshest events in the full aggregation. This way, we're able to provide a lower latency overall.
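
As a rough sketch of the idea (the data shapes and names are hypothetical, and this assumes the aggregation is mergeable, like a count or a sum):

```python
# A minimal sketch of semi-online aggregation: combine a partial aggregate,
# computed ahead of time, with the freshest raw events at inference time.
from dataclasses import dataclass

@dataclass
class PartialAggregate:
    click_count: int        # pre-aggregated from older events
    watch_time_sum: float

def full_aggregate(partial: PartialAggregate, fresh_events: list[dict]) -> dict:
    """Combine the partial aggregate with the freshest raw events online."""
    click_count = partial.click_count
    watch_time_sum = partial.watch_time_sum
    for event in fresh_events:            # only the few most recent events
        click_count += 1
        watch_time_sum += event["watch_time"]
    return {"click_count": click_count, "watch_time_sum": watch_time_sum}

# At inference time: fetch both parts from online storage and merge them.
partial = PartialAggregate(click_count=120, watch_time_sum=5400.0)
fresh = [{"watch_time": 30.0}, {"watch_time": 12.5}]   # events since the partial
features = full_aggregate(partial, fresh)
```

This works because counts and sums can be merged cheaply; aggregations that cannot be decomposed this way do not fit the pattern as easily.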

The next one. This approach is a little bit more forward thinking. Let's step back a little bit. Ultimately, what are we doing with feature engineering? Why do we do so much aggregation and data processing? We are basically applying human knowledge to preprocess data into a format that makes it easier for a model to perceive relevance. Basically, all we're doing here is baking human knowledge into the data. That's what feature engineering is all about.

Let's ask ourselves: what if? What if some of the traditional data aggregation could be replaced by the model itself? What if a model could learn the association between data points without the need for traditional aggregation at all? That's the basic idea: shifting feature engineering into the model, at least part of it. This is already the case in the computer vision and NLP fields today, because in computer vision and NLP you don't have to do a lot of data aggregation outside the model. Most of it is already done in the first few layers of the neural network. It was difficult for recommendation to do this a few years ago, when models were less capable, but a lot of things are changing very fast today. This idea is becoming more real recently.

Other Common Things to Consider

We covered the different data infras. We covered some fine-tuning of streaming infra. Next, let's look at some other common things to consider. Number one, perf tuning: pipeline performance tuning, especially Spark. We found perf tuning especially useful in many cases. You will usually get some low-hanging fruit if your dataset is large enough. We've seen pipeline runtimes going from tens of hours to a few hours in some cases. There are many Spark tuning techniques online. The general advice is, before adding more machines or upgrading your cloud service tiers, try perf tuning first; it might work better than you think. The next one, invest in data flow management. This one is often undervalued. Your data pipelines may frequently break, and when a pipeline is broken, your model will get stale data, or no data at all. Your maintenance overhead is proportional to the scale of your workload. In this case, a good management system becomes very important. First of all, you can't fix what you can't see, so try to build good observability into the system. Second, automated monitoring and alerting is quite basic but really hard to do right. Last, try to automate the fixes. Some fixes are basic; for example, just doing a retry may fix the problem. Codify the basic remediations and automate them so your engineers don't have to wait, investigate, and apply the fix. If you're able to do all of this to automate your management, you might reduce the latency that comes purely from breakage and the overhead of manual work.
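
As one small example of codifying a basic remediation (a generic sketch, not tied to any specific pipeline framework; `run_pipeline` is a hypothetical callable):

```python
# A minimal sketch of an automated fix: retry a failed pipeline run with backoff
# before escalating to a human.
import logging
import time

def run_with_retries(run_pipeline, max_attempts: int = 3, backoff_seconds: int = 60):
    for attempt in range(1, max_attempts + 1):
        try:
            return run_pipeline()
        except Exception:
            logging.exception("Pipeline run failed (attempt %d/%d)", attempt, max_attempts)
            if attempt == max_attempts:
                raise                     # escalate to on-call via normal alerting
            time.sleep(backoff_seconds * attempt)
```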

Next, let's look at sharding offline storage and the pipelines. When you have a few hundred features and a few hundred data sources, or even more, how to properly group them is a big problem. When storing your features in offline tables, for the same primary key you can choose to use a wide table to store all the columns together, or you can slice them vertically into multiple tables. Similarly, when computing features, you can choose to put more features into one pipeline, or divide them into multiple pipelines. Fewer tables or fewer pipelines are easier to manage, but the coupling means that if part of the system is slow, or the data is corrupted, the entire thing slows down. On the other hand, dividing them creates management overhead because you have more things to manage, but it reduces the interdependencies: breakage or failure in one part of the system will not propagate to other parts.
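
One way to keep this tradeoff flexible (purely illustrative; the group and feature names are made up) is to express the grouping as configuration rather than hard-coding it into the tables and pipelines:

```python
# A hypothetical configuration mapping feature groups to separate offline
# tables/pipelines. Changing the grouping changes the coupling and the blast
# radius of a failure, not the features themselves.
FEATURE_GROUPS = {
    "user_engagement": ["clicks_last_24h", "avg_watch_time_7d"],    # one table/pipeline
    "user_profile":    ["preferred_category", "account_age_days"],  # another table/pipeline
}
```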

Next, let's look at sharding online storage. This one is about feature grouping in online storage. When your features are computed, they will be stored in online storage for model inference. One object can have many features, and the features are indexed by the object ID as the primary key. Usually, we have more than one model; that's the common case. Not every model uses all the features. Some models use more features, some models use fewer features. When a model needs to fetch features, we will usually say, give me all features for model M under object X. This is the common query pattern for feature fetch. We first find the features under object X, then take the features used by model M, which is a subset of all features for X. There is a choice of how to partition the features under object X. We can either store them all together, or add a secondary key to shard the features into smaller groups. Consider this example: two models, model 1 and model 2. Model 1 uses a small number of features and model 2 uses a large number of features. There is some overlap, but one is not a subset of the other. If we store all features together under one key, under object X, model 1 may fetch more features than needed all the time, as you can see in the bottom left chart. There is network I/O overhead as a result of that. In the other case, if you look at the bottom right chart, if we shard the features into multiple groups, each with k features, then model 2 will always need to do a fanout. This adds latency too. What is the best sharding strategy? Should we use a single big group, or should we use multiple small groups? There is no right answer, because this depends on the client side, on how the models use features. What you can do is have the system open up the sharding strategy as an API, so the client side can configure it based on the query pattern.
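
As a minimal sketch of the two fetch patterns (the store, keys, and model-to-feature mapping below are made up):

```python
# Two hypothetical sharding strategies for the features under object X.
# Strategy A: one big group -> a single fetch, but small models over-fetch.
# Strategy B: shards of up to k features -> large models must fan out.

SINGLE_GROUP = {"X": {"f1": 1, "f2": 2, "f3": 3, "f4": 4, "f5": 5, "f6": 6}}

SHARDED = {            # a secondary key shards the features into smaller groups
    ("X", "shard_0"): {"f1": 1, "f2": 2},
    ("X", "shard_1"): {"f3": 3, "f4": 4},
    ("X", "shard_2"): {"f5": 5, "f6": 6},
}

def fetch_single_group(object_id, wanted):
    row = SINGLE_GROUP[object_id]                 # one round trip, may over-fetch
    return {k: v for k, v in row.items() if k in wanted}

def fetch_sharded(object_id, wanted, shard_of):
    shards = {shard_of[f] for f in wanted}        # fanout: one fetch per shard touched
    result = {}
    for shard in shards:
        row = SHARDED[(object_id, shard)]
        result.update({k: v for k, v in row.items() if k in wanted})
    return result

# Model 1 uses few features; model 2 uses many.
shard_of = {"f1": "shard_0", "f2": "shard_0", "f3": "shard_1",
            "f4": "shard_1", "f5": "shard_2", "f6": "shard_2"}
model_1 = fetch_single_group("X", {"f1", "f2"})                   # over-fetches f3..f6
model_2 = fetch_sharded("X", {"f1", "f3", "f4", "f5"}, shard_of)  # fans out to 3 shards
```

Which strategy wins depends on the query pattern, which is why exposing the sharding strategy as a configurable API, as suggested above, can be useful.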

This is the last tip; let's look at this chart again. Many times, we tend to look for things we want to see, and we tend to fix problems we understand better, or think we understand better. We tend to ignore things we care less about. We may say, ok, we understand the transform compute very well, and maybe we can upgrade the technology, make this part run a lot faster, and we'll have lower overall latency. Later on, we find that that's actually not the bottleneck; the bottleneck is in a different place, where uploading the data to online storage actually takes a longer time. There are many real-world examples. For example, Spark is something that's widely used in the industry. There are lots of open resources and lots of discussions shared online. A lot of engineers are well aware of those technologies, so they tend to look at that part more. Some parts of the entire flow are not using open source; they were built by one or two engineers years back. They are less maintained, fewer people have looked into them, and they are understood less. People tend to ignore them. Many times, we found it's not the big parts, the parts that catch attention, that are actually the bottleneck; it's the smaller parts that run there quietly, that nobody looks at, that are the slow parts. Those are the parts that actually need an upgrade. The actual upgrade is probably not that difficult; it's just that nobody has looked at it for a very long time. Something to remember.

Takeaways

Number one takeaway: the relationship between freshness and model gain is non-linear. Sometimes there is a sweet spot; finding the right optimization range will give you the best result. Number two: freshness is not free. It comes with costs in other forms, for example power consumption, system complexity, or other things. Freshness is a spectrum; not all features need to be at the same level of freshness, because feature importance, compute cost, and time sensitivity are all different. Remember to optimize for overall ROI, instead of just latency itself. Number three: look at things end-to-end, instead of just focusing on one single component. More powerful infra and newer technology may not always be the best answer. Sometimes going back to the fundamentals will actually yield better results.

 


 

Recorded at: Jan 25, 2024
