
Building a Large Scale Real-Time Ad Events Processing System


Summary

Chao Chu provides insights and practical knowledge for building streaming pipelines for an ad platform.

Bio

Chao Chu is a back-end engineer at DoorDash. He works on the ads foundation team, focusing on the ad events pipeline and the ad exchange service. Previously, he worked at Morgan Stanley, where he helped build the Fixed Income Risk Infrastructure platform using Scala. He is passionate about building large-scale distributed systems.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Chu: I'm delighted to present to you our journey of building a large scale, real-time ad events processing system here at DoorDash. My name is Chao Chu. I'm a software engineer at DoorDash and one of the founding members of the DoorDash ads team. DoorDash is an online food delivery platform that connects consumers with local restaurants and stores and facilitates delivery to their desired destinations. Our ads system started about two years ago. Right now, it handles hundreds of billions of impressions and processes and attributes millions of ad clicks and orders on a daily basis.

Outline

I'd like to introduce what an ad events processing system is and how we started building it. Following that, I will give you a high-level overview of our DoorDash ads platform, giving you a sense of what we have built and its key functionalities. Next, we will zoom in to the main topic, where I will walk you through our end-to-end ad events pipeline, highlighting the various components involved. Then, we will dive deeper into how our ads attribution works, how we unify budget tracking for different types of ad campaigns, and our reporting infrastructure.

What Is an Ad Events Processing System?

What exactly is an ad events processing system? With every ad served, meaning an ad presented to a user, there will be user interaction events like impressions and clicks. The responsibility of an ad events processing system is to manage the flow of these events: handle and process the events generated with every ad served, do the data validation and aggregation, and attribute them to orders and other types of conversions. I mainly want to share our experience of developing such an ad events processing system, focusing on how we utilized the open source tech stack to satisfy the business requirements. So far, it's more like a 0 to 1 journey. It's far from perfect yet, but it satisfies all our business requirements, which continue growing very fast.

DoorDash Ads Platform

Our ads platform's mission is to enable merchants to reach their customers at the critical moments of decision throughout their shopping journey at DoorDash, whether it's deciding what food to buy or what laundry items to choose. Right now, we have three main ads products. The first is banner ads. A banner is a placement that promotes product or category offerings and delivers important announcements and platform updates to customers. It's the place that draws the most attention in our app. We also have sponsored listings, which are ads for stores on top of the DoorDash search and cuisine results. We also have sponsored products. It's the digital shelf space equivalent for consumer-packaged goods. Convenience stores like Albertsons or 7-Eleven use these kinds of campaigns to boost the sales of items.

After getting a sense of our ads products, let's see what our ads platform offers. Here is the typical ads lifecycle. It starts with campaign creation. Merchants and brands use our tools or APIs to create ad campaigns. Once the campaigns are created, the serving path retrieves the ad candidates and selects the relevant ones for the current user or consumer. After the ranking and the auction, the best candidates are picked through a fair competition. Together with the organic results, the winning ad is presented to the user. Based on the user interactions, we can measure the performance of the ad campaign. From this lifecycle, we can see how the data flows through our system. The write path is about campaign creation and indexing. Once campaigns are properly indexed, the serving path can retrieve them efficiently and present them to the user after running the auctions. The user then interacts with the ads served, and this is when the reverse path kicks in: we need a full-fledged ad events processing system. The conversion events are then further published to the billing and reporting paths as well.

Ad Events Pipeline

Here is the high-level ad events pipeline architecture with all the components included. I know it looks a bit complex, but let's group the components and explain them one by one. The first part is about ad view and click processing. The raw impressions go through a user sessionization process and produce the ad views, which are view events enriched with ad metadata. Similarly, the raw clicks go through the click processor component and produce the ad clicks, which are click events enriched with ads metadata. After the first part, we go to the ads transaction generators, where we have three different flows, because we have three different pricing models for different ads products: CPM, CPC, and CPA. You can imagine that all of these different flows generate what we call the ads transactions. The pricing models basically define how we generate a billable transaction record that we can charge our advertisers for. These transaction records, from the different flows and the different ad campaigns, go into the budget controller part, where a component writes the transaction records into a store. The store has CDC (change data capture) built in. The budget cap job enforces budget capping based on the campaign's budget configuration. We also have a spend generator component, which accumulates the budget spend and provides real-time spend information to the serving path.
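
To make the rest of the walkthrough easier to follow, here is a hypothetical shape for such an ads transaction record, shown as a small Java sketch. The field names are assumptions for illustration only; they are not the actual DoorDash schema, and the later sketches reuse these types.

    // Hypothetical shape of a billable ads transaction record; the field names are
    // illustrative, not the actual DoorDash schema.
    enum PricingModel { CPM, CPC, CPA }

    record AdTransaction(
            String transactionId,   // unique id, used for downstream deduplication
            String campaignId,      // key for partitioning and budget tracking
            PricingModel pricingModel,
            long amountMicros,      // billable amount, e.g. in micro-dollars
            long eventTimeMillis) {}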

Before we explain the detailed flows, let's take a step back to look at the tradeoffs we made when choosing how to implement this architecture and design. Here is a comparison of the main factors to consider between batch and streaming processing for implementing such a data pipeline. As we can see, batch excels on the cost and accuracy aspects, whereas streaming processing can achieve low latency. From the ads system's business requirements, why do we need streaming processing? Streaming processing enables timely ad delivery based on user preferences, as well as continuous campaign optimization. It also reacts fast to campaign updates. For example, we can choose to increase the budget as the spend grows, or we can improve our bidding strategy to achieve better performance. Real-time processing also enables timely attribution, which in turn provides us with accurate budget spend signals. With real-time budget signals, the serving path can implement a more accurate and effective pacing strategy, which helps us efficiently distribute the spend across the campaign's lifecycle to achieve the best marketing results. Modern stream processors like Apache Flink made the choice easier, because Flink is capable of computing consistent and accurate results with its event-time semantics. Also, its exactly-once state consistency guarantees make fault tolerance much easier to handle in case of failure. Autoscaling of the Flink jobs can also offset the cost impact compared with a traditional streaming processing approach.

Now let's go deep into the end-to-end events pipeline. The first part is the ads view and click ingestion. Within DoorDash, we have our own in-house events tracking system. We have full control over the events ingestion, which makes the data quality reliable by the time the events land in our reverse path. We also have real-time monitoring of the ads metadata for the ingested events, so we can make sure that whenever there is a new client-side release, there is no regression in the ads metadata quality. Once the events are ingested, based on the different types of campaigns I introduced earlier, they go into the different processing flows. The first is the CPM flow, cost per mille. The banner ads use this pricing model, meaning we charge a certain ads fee for every 1000 impressions. For example, we charge $3 for every 1000 impressions. The processor processes the raw impression events and then publishes them to the CPM count job, which accumulates the impression events. Whenever we see 1000 accumulated, we emit a CPM record representing the transaction record. What the user session management does is keep unique view events per user session. What is a user session? There are different definitions based on different requirements. What we use here is a 30-minute rolling window session, meaning that for the same user and the same placement, the next time they view the same placement, it won't be counted unless more than 30 minutes have passed.

In order to do this, Flink's building blocks help us easily implement such logic. There are two levels of dedupe. The first level is deduping on the event ID itself, because we are using Kafka's at-least-once delivery semantics: on the consumer side we might receive the exact same event more than once, so we use Flink MapState for each of the interaction events and dedupe based on the ID of the event. We also have a ValueState to record the last interaction event timestamp. Whenever the next event from the same user and the same placement arrives, if it's within 30 minutes, we just extend or refresh the recorded timestamp. Otherwise, if it's more than 30 minutes, we consider that a new session has started, generate a new record only in this case, and update the ValueState with the new event timestamp. Flink's timers help us implement this logic easily, and all of this Flink state and the timers are part of the checkpoint mechanism. For fault tolerance, even if the job crashes, when it's restored from the checkpoint, all of the Flink state and the timers are restored. When we count the impression events, we also use Flink's AsyncDataStream to enrich the event with campaign billing information, which requires external service calls. Because you can't expect an external dependency to be 100% available, we use side outputs for the failed records after exhausting the retries. We can add [inaudible 00:17:32] to a dead letter queue for data recovery, and we can also set up alerts to monitor the number of such failures.
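
As an illustration, here is a minimal sketch of that two-level dedupe and 30-minute sessionization using Flink's keyed state and timers. The ImpressionEvent and AdView types and all names are assumptions for illustration, not the actual DoorDash code.

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch: the stream is keyed by (userId, placement). Duplicate deliveries are dropped by
    // event id, and one ad view is emitted per 30-minute rolling session.
    public class SessionizeViews extends KeyedProcessFunction<String, ImpressionEvent, AdView> {

        private static final long SESSION_GAP_MS = 30 * 60 * 1000L;

        private transient MapState<String, Boolean> seenEventIds; // level 1: dedupe on event id
        private transient ValueState<Long> lastEventTs;           // level 2: last event time in the session

        @Override
        public void open(Configuration parameters) {
            seenEventIds = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seenEventIds", String.class, Boolean.class));
            lastEventTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastEventTs", Long.class));
        }

        @Override
        public void processElement(ImpressionEvent event, Context ctx, Collector<AdView> out) throws Exception {
            if (seenEventIds.contains(event.eventId())) {
                return; // same event delivered more than once by the at-least-once source
            }
            seenEventIds.put(event.eventId(), true);

            Long last = lastEventTs.value();
            if (last == null || event.timestamp() - last > SESSION_GAP_MS) {
                out.collect(AdView.from(event)); // more than 30 minutes since the last view: new session
            }
            lastEventTs.update(event.timestamp());

            // clean-up timer; state and timers are both part of Flink's checkpoints
            ctx.timerService().registerEventTimeTimer(event.timestamp() + SESSION_GAP_MS);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<AdView> out) throws Exception {
            Long last = lastEventTs.value();
            if (last != null && timestamp >= last + SESSION_GAP_MS) {
                seenEventIds.clear(); // session can no longer be extended, drop its state
                lastEventTs.clear();
            }
        }
    }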

This is the first non-trivial flow we implemented with Flink when we started building this system. Some of the lessons learned: a CPM record represents 1000 impressions, and for such an aggregation record, we should also store the raw IDs of the events that compose the aggregation record. That is very helpful for recovery, audit, or investigation purposes. We also need to choose the right identifier for the aggregation record itself. Initially, we just used the ID of the first impression event from the 1000-event batch. During one incident, we noticed there were quite a lot of duplicate batches after a checkpoint restore. With some in-depth investigation, we realized it was because we partition the records by the campaign ID using Flink's keyBy operation. The keyBy operation only maintains order for events with the same key coming from the same task, because Flink splits events into multiple different subtasks before we do the keyBy. Events from different subtasks can arrive in a different order. Whenever a checkpoint restore happens, the first impression of the same batch won't be the same one anymore. That's why we saw the duplicate batches. Once we knew that, the fix was simple: we switched to an auto-incrementing batch serial number representing the 1000-impression batch.
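
To make that concrete, here is a minimal sketch of a CPM count job along those lines. It reuses the hypothetical AdTransaction and PricingModel shapes from earlier; AdView with an eventId() accessor is also an assumed type, and the price lookup is a placeholder rather than the real enrichment call.

    // Sketch: keyed by campaignId. Emits one CPM transaction per 1000 ad views, identified by a
    // per-campaign batch serial number rather than the id of the first impression in the batch.
    public class CpmCounter extends KeyedProcessFunction<String, AdView, AdTransaction> {

        private transient ValueState<Long> viewCount;
        private transient ValueState<Long> batchSerial;
        private transient ListState<String> batchEventIds; // raw ids kept for audit and recovery

        @Override
        public void open(Configuration parameters) {
            viewCount = getRuntimeContext().getState(new ValueStateDescriptor<>("viewCount", Long.class));
            batchSerial = getRuntimeContext().getState(new ValueStateDescriptor<>("batchSerial", Long.class));
            batchEventIds = getRuntimeContext().getListState(new ListStateDescriptor<>("batchEventIds", String.class));
        }

        @Override
        public void processElement(AdView view, Context ctx, Collector<AdTransaction> out) throws Exception {
            long count = (viewCount.value() == null ? 0L : viewCount.value()) + 1;
            batchEventIds.add(view.eventId());

            if (count < 1000) {
                viewCount.update(count);
                return;
            }

            long serial = (batchSerial.value() == null ? 0L : batchSerial.value()) + 1;
            String transactionId = ctx.getCurrentKey() + "-cpm-" + serial; // stable across checkpoint restores
            long eventTime = ctx.timestamp() == null ? 0L : ctx.timestamp();
            out.collect(new AdTransaction(transactionId, ctx.getCurrentKey(), PricingModel.CPM,
                    3_000_000L /* placeholder: e.g. $3 per 1000 impressions, in micro-dollars */, eventTime));

            // the raw ids would be persisted alongside the record before clearing them here
            batchSerial.update(serial);
            viewCount.update(0L);
            batchEventIds.clear();
        }
    }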

The next one is the CPC flow. CPC stands for cost per click. For the sponsored products campaigns, [inaudible 00:20:17] is the brand. Brands mainly focus on the user engagement of the campaigns. The orders go to the convenience stores, not through the brand directly, so using this click-based pricing model is a better fit. It starts with processing the raw click events. The click processor's responsibility is to look up the auction data so that we know the price for each click. Then we have the CPC billing job to look up the campaign's billing info and generate the billable event record. It's very similar to the CPM flow, so we use the same patterns: we use the AsyncDataStream to enrich the record, and we use Flink state to do the dedupe. From the product perspective, and to avoid fraud, we don't actually count every single click. For example, sometimes a user might keep clicking the same thing until they take the next action. For clicks that happen within a very short time, we shouldn't count all of them. What we do here is only count a click if a certain amount of time, for example 5 minutes, has passed since the last counted click. To implement this, we just record the last event timestamp in the Flink state, and we refresh or renew the timestamp whenever a new record is generated.

Ads Attribution Deep Dive

Next, let's dive deeper into how our ads attribution works. For the sponsored listing campaigns, we use the CPA model. CPA is cost per action or cost per acquisition, which means we need to attribute the order to previous ads interactions. Here we choose to attribute the order to previous clicks from the same user or consumer. Our platform encompasses the full merchandising lifecycle and the ads lifecycle, so compared with other platforms, we have better user profiling and better conversion tracking. We also have the functionality to withhold the ads fee from the order payment. This helps small businesses using DoorDash's platform, because they only need to pay the ads fee when they see results; they don't need any out-of-pocket expense to utilize our ads platform. Here is the full CPA attribution flow. We start from the same click processing logic and do the auction data lookup to stamp a price on the click. After that, we store the click by consumer and time for later attribution. The CPA attribution job listens to the order stream. Whenever an order comes, it goes back and tries to match the clicks from the same user. Among the multiple eligible clicks, we try to find the highest-value click to attribute to that order. Whenever there's a match, we again publish the transaction to the ads transactions topic. There's one optimization we did based on the product requirements: we only consider the clicks between two orders at the ads level. For example, if there is a previously attributed order already, we only consider the clicks after that previous order and before the current order as the range of eligible clicks, so we don't always need to look back over the full seven-day attribution window; the lookback is the minimum of the time since the last attributed order and seven days.
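
A small helper can express the attribution rule just described: only clicks after the previously attributed order and within the seven-day lookback are eligible, and the highest-value eligible click wins. This is a sketch under assumptions; AdClick is a hypothetical type with clickTimeMillis() and priceMicros() accessors, and it uses only standard java.util and java.time classes.

    // Sketch of the attribution rule: eligible clicks fall after the previously attributed order
    // and inside the 7-day lookback, and the highest-value eligible click wins.
    static Optional<AdClick> pickAttributedClick(List<AdClick> clicks,
                                                 long orderTimeMillis,
                                                 long lastAttributedOrderTimeMillis) {
        long windowStart = Math.max(lastAttributedOrderTimeMillis,
                orderTimeMillis - Duration.ofDays(7).toMillis());
        return clicks.stream()
                .filter(c -> c.clickTimeMillis() > windowStart && c.clickTimeMillis() <= orderTimeMillis)
                .max(Comparator.comparingLong(AdClick::priceMicros));
    }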

You might notice there are some data races in this attribution process. There are data dependencies between the jobs: we first need to look up the auction data before writing the ad clicks, and we then need to look up the ad clicks when the order comes. Imagine the click processor didn't write the clicks in time: when the order comes, because we are doing real-time attribution and processing, we might not be able to find the clicks, and we would miss the chance to do the attribution online. How do we solve this? There were several iterations in this process. We initially had a 10-minute tumbling window, meaning we accumulate the orders for the last 10 minutes as a buffer. The hope is that within this 10-minute buffer, the related clicks that could potentially be attributed to the order have already arrived and been processed. We chose 10 minutes because it accounts for the p999 end-to-end latency of the ad clicks. Later on, we moved to unioning the ad clicks and the order stream and storing the clicks inside Flink state directly, so we don't need to write clicks to an external storage anymore. When the order comes, we just do the clicks lookup against Flink's state directly.
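
Here is a minimal sketch of that second approach, assuming the click and order streams are keyed by consumer ID and reusing the hypothetical types and the pickAttributedClick helper from the previous sketch; the real DoorDash job will differ in its details.

    // Sketch: clicks and orders are connected and keyed by consumerId; clicks are buffered in
    // Flink state so the attribution lookup never has to leave the job.
    public class CpaAttribution extends KeyedCoProcessFunction<String, AdClick, OrderEvent, AdTransaction> {

        private transient ListState<AdClick> bufferedClicks;
        private transient ValueState<Long> lastAttributedOrderTs;

        @Override
        public void open(Configuration parameters) {
            bufferedClicks = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("bufferedClicks", AdClick.class));
            lastAttributedOrderTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastAttributedOrderTs", Long.class));
        }

        @Override
        public void processElement1(AdClick click, Context ctx, Collector<AdTransaction> out) throws Exception {
            bufferedClicks.add(click); // a TTL or cleanup timer would bound this to the lookback window
        }

        @Override
        public void processElement2(OrderEvent order, Context ctx, Collector<AdTransaction> out) throws Exception {
            List<AdClick> clicks = new ArrayList<>();
            bufferedClicks.get().forEach(clicks::add);

            long lastTs = lastAttributedOrderTs.value() == null ? 0L : lastAttributedOrderTs.value();
            Optional<AdClick> match = pickAttributedClick(clicks, order.orderTimeMillis(), lastTs);
            if (match.isPresent()) {
                AdClick click = match.get();
                out.collect(new AdTransaction(order.orderId() + "-cpa", click.campaignId(),
                        PricingModel.CPA, click.priceMicros(), order.orderTimeMillis()));
                lastAttributedOrderTs.update(order.orderTimeMillis());
            }
        }
    }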

Since we use a seven-day lookback, in theory we only need to store seven days of clicks. To get some flexibility, we actually store the last 20 days of clicks data. That's less than 12 gigabytes, and the related checkpoint data is around 10 gigabytes. Right now, the end-to-end latency for a checkpoint is about 23 seconds, and when we need to restore from a checkpoint or savepoint, it's about 88 seconds. A Flink savepoint is similar to the checkpoint mechanism, but it has to be triggered by the user; it's not automatically handled by Flink itself. The reason is that savepoints are used more for when we need to change the job logic. When you have a stateful Flink job and you use a savepoint, you can't just update the code however you want. You can only make savepoint-compatible changes, which are like the schema of the job, but there is still some flexibility for you to evolve the job. That's the main use case of savepoints. We can also use savepoints to scale up the job.
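
For illustration, here is roughly how such a job could be wired up with stable operator uids, which is what makes savepoint-compatible evolution possible, plus incremental RocksDB checkpoints to keep checkpoint latency manageable for state of this size. The configuration values are placeholders, not DoorDash's actual settings.

    // Sketch: stable operator uids let a savepoint taken before a code change still be mapped
    // back to the same operators after the change; incremental checkpoints bound checkpoint size.
    static void wireCpaAttribution(StreamExecutionEnvironment env,
                                   DataStream<AdClick> clicks,
                                   DataStream<OrderEvent> orders) {
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental RocksDB checkpoints

        clicks.connect(orders)
                .keyBy(AdClick::consumerId, OrderEvent::consumerId)
                .process(new CpaAttribution())
                .uid("cpa-attribution")   // keep this stable so old savepoints still map to this operator
                .name("cpa-attribution");
    }

A savepoint can then be triggered manually, for example with Flink's savepoint CLI command, and used as the starting point when redeploying the changed job or rescaling it.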

Here are some of our key learnings and takeaways for designing the data pipeline. We should always design for failure. Idempotency is the key, and we need to make sure our state is recoverable. We should also avoid data races if possible, but that won't cover all the cases. For us, even with these two different approaches, there is still a chance we might miss an attribution. We also designed an offline attribution to come to the rescue in this scenario: at the end of the day, we can run the offline attribution to re-attribute the orders we missed in the real-time path. We also have some Flink gotchas we wish we knew from the beginning. For example, a stateful job needs to be written in a savepoint-compatible way. The state can be defined with a TTL; however, you need to decide whether a state should have a TTL or not at the beginning, because changing a state from no TTL config to an explicit TTL config is a non-compatible change. Instead, in that case, you can only define a new state with the TTL and convert the old state to the new state. The next one is, if you do need to store Flink state with a customized data type, for example different Google protobuf types, you have to register the serializers properly; otherwise, the default serializer won't be able to deserialize the data whenever you change the protobuf definition. You also need to tune your checkpoint config to achieve better checkpoint latency and recovery time based on the data size.
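
Two of those gotchas, sketched with Flink's APIs: defining a state TTL up front, and registering a protobuf serializer. The AdClick and AdClickProto types are assumptions, env is the StreamExecutionEnvironment from the earlier sketch, and chill-protobuf's ProtobufSerializer is shown as one common option rather than the serializer DoorDash necessarily uses.

    // Sketch: the TTL has to be decided when the state is introduced; adding a TTL to an existing
    // state later is not a savepoint-compatible change.
    StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.days(20))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();

    ListStateDescriptor<AdClick> clicksDescriptor =
            new ListStateDescriptor<>("bufferedClicks", AdClick.class);
    clicksDescriptor.enableTimeToLive(ttlConfig);

    // Sketch: registering a protobuf serializer (here via the chill-protobuf library) so state
    // written with an older protobuf schema can still be read after the schema evolves.
    env.getConfig().registerTypeWithKryoSerializer(AdClickProto.class, ProtobufSerializer.class);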

Unified Budget Tracking

Next, let's go into our unified budget tracking. Initially, we had a single job doing everything. Whenever we added a new ads product, we kept adding quick and dirty solutions to the existing job to cover more use cases. That single job reacted to campaign budget changes, accumulated spend, enforced the budget cap, and emitted billable records. It was very hard to extend to handle more use cases, reliability was a big concern, and it was hard to debug, reason about, and investigate. That's why we decided to revamp and redesign a new budget tracking architecture, evolving from this kind of monolithic job to single-responsibility jobs. Here is the new design. The transactions topic is what we showed earlier as the ads transactions; it helps track all the different campaign types, and all the different campaign types just publish their ads transaction records to this topic. We have this unified flow, starting from the spend tracker, which writes the initial spend allocated events to our event store. The idea is inspired by the so-called event sourcing pattern, where we don't directly update the state of an event. Instead, we write multiple different states for the same event. Every single ads transaction starts with a spend allocated record written into the store, and we rely on the CDC mechanism, which publishes the changes to the data written into the datastore. After the budget cap check, if we're still within the cap, we turn it into a spend verified event representing the same transaction and write that new record with the new state back into the store. Then the budget spend generator can consume these spend verified events and accumulate the campaign spend via either daily or monthly aggregation. That's what we use for the serving path to do the pacing. How do we enforce the cap, or how do we check the limit? This is abstracted out by the budget management service. Similarly, the billing event sync also listens to the spend verified events and generates the detailed transaction records for billing purposes.
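
A rough sketch of that event-sourcing idea follows, with hypothetical EventStore and BudgetService interfaces standing in for the CDC-enabled datastore and the budget management service; it reuses the AdTransaction shape from earlier and all names are assumptions.

    // Sketch of the event-sourcing flow: the same transaction is appended once per state rather
    // than updated in place, and downstream jobs react to the CDC stream of these records.
    enum SpendState { SPEND_ALLOCATED, SPEND_VERIFIED }

    record SpendEvent(String transactionId, String campaignId, SpendState state,
                      long amountMicros, long eventTimeMillis) {}

    interface EventStore { void append(SpendEvent event); }                             // datastore with CDC
    interface BudgetService { boolean withinCap(String campaignId, long amountMicros); } // budget management service

    class BudgetFlowSketch {
        private final EventStore eventStore;
        private final BudgetService budgetService;

        BudgetFlowSketch(EventStore eventStore, BudgetService budgetService) {
            this.eventStore = eventStore;
            this.budgetService = budgetService;
        }

        // spend tracker: every ads transaction starts life as a SPEND_ALLOCATED record
        void onAdsTransaction(AdTransaction tx) {
            eventStore.append(new SpendEvent(tx.transactionId(), tx.campaignId(),
                    SpendState.SPEND_ALLOCATED, tx.amountMicros(), tx.eventTimeMillis()));
        }

        // budget cap job: consumes the CDC stream of allocated records and, if the campaign is
        // still within its cap, appends a SPEND_VERIFIED record instead of mutating the old one
        void onSpendAllocated(SpendEvent allocated) {
            if (budgetService.withinCap(allocated.campaignId(), allocated.amountMicros())) {
                eventStore.append(new SpendEvent(allocated.transactionId(), allocated.campaignId(),
                        SpendState.SPEND_VERIFIED, allocated.amountMicros(), allocated.eventTimeMillis()));
            }
        }
    }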

The benefit of these single-responsibility components is that they are much easier to make idempotent, which is very important for fault tolerance. Also, isolating different responsibilities into separate jobs enhances fault isolation, and the different jobs are independently scalable as well. This also simplifies testing and debugging and improves the maintainability of the code. How do we manage fault tolerance across the whole pipeline? The simple but powerful idea is that once you make all the components at-least-once with deduplication steps, you achieve the exactly-once semantics. All our writes are idempotent: whenever a job processes the same event again, it either skips or upserts. With the event-store-like data model, we represent the full budget transaction history in the store, so we have the ability to replay and reprocess the data.
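
A tiny illustration of that idea, using a plain map to stand in for the event store; a real store would enforce the same thing with a unique key constraint.

    // Sketch: at-least-once delivery upstream plus an idempotent write gives exactly-once effects.
    // The (transactionId, state) pair acts as the unique key, so a redelivered event just hits the
    // existing row and becomes a no-op.
    static void appendIdempotent(Map<String, SpendEvent> store, SpendEvent event) {
        String key = event.transactionId() + ":" + event.state();
        store.putIfAbsent(key, event); // a second delivery of the same event changes nothing
    }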

Reporting

The next one is our reporting infra. Data reporting and analytics inside DoorDash are highly reliant on ETL and Snowflake. ETL is batch by nature, which can't give nice real-time insights into campaign data. When we initially launched our ads, we used the existing infrastructure to build the reporting solution. However, real-time reporting is very important, especially for the small businesses running on DoorDash's ads platform. Our enterprise customers don't usually use our system directly; we have analytics and ops teams who are responsible for generating sophisticated reporting solutions for them. In order to enable and support small businesses to use our self-serve platform to check the campaign reporting, we partnered with our DoorDash analytics engine team to build a solution by combining the power of Flink and Pinot. This is the high-level architecture with Pinot. The key idea is that we can leverage Pinot's hybrid table. When we ingest the data, we can define what is real-time data and what is offline data. When we do the queries, the Pinot broker abstracts this out for us. We just send the queries to the Pinot broker, and it routes the queries to different servers for real-time data and offline data separately. With this design, we define the past three days of data as real-time and the older data as offline data. They are stored and handled by isolated servers, but the queries are exactly the same. All these complex details are handled by Pinot's internal logic. We still send the same queries as we sent to Snowflake, and Pinot can achieve low-latency queries on all the key metrics we want to report. These real-time insights allow the small business owners to quickly update the budget or the targeting based on the current campaign performance they see, and they can also end the campaign any time they see a satisfying outcome.
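
As a hedged illustration of what the query side could look like, here is a sketch that sends a SQL query to the Pinot broker's standard HTTP query endpoint using the JDK's built-in java.net.http client. The broker host, port, and the table and column names are assumptions for illustration, not the actual DoorDash setup.

    // Sketch: the broker fans the query out to real-time and offline servers and merges the results,
    // so the caller never has to know where the three-day boundary sits.
    static String campaignSpendSince(long sinceMillis) throws Exception {
        String sql = "SELECT campaignId, SUM(spendMicros) AS spend FROM ad_events "
                + "WHERE eventTimeMillis > " + sinceMillis + " "
                + "GROUP BY campaignId ORDER BY spend DESC LIMIT 100";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://pinot-broker:8099/query/sql")) // assumed broker host and default port
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"sql\": \"" + sql + "\"}"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body(); // JSON rows, identical whether served from real-time or offline segments
    }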

 


 

Recorded at:

Mar 21, 2024
