





Modern Data Pipelines in AdTech—Life in the Trenches


Summary

Roksolana Diachuk discusses how to use modern data pipelines for reporting and analytics as well as the case of historical data reprocessing in AdTech.

Bio

Roksolana Diachuk works as a Big Data Engineer at Captify. She is a speaker at technical conferences and meetups, one of the Women Who Code Kyiv leads. She is passionate about Big Data, Scala, and Kubernetes.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.

Transcript

Diachuk: I'm Roksolana. I work as a big data developer at Captify, and I'm also the Diversity & Inclusion Ambassador there. I've been a lead of the Women Who Code Kyiv community in Ukraine. I also often speak at conferences and various meetups.

Outline

I'm going to talk about AdTech as a domain, to give a better understanding of the concepts and the things I've been working on. Then I'll move specifically to data pipelines in the context of AdTech. I'll give lots of practical examples, mostly at the architectural level, of which pipelines are used in AdTech and how they are built. There'll be a whole section about historical data reprocessing. Then we'll draw some conclusions out of it.

AdTech

First of all, talking about AdTech. AdTech, or advertising technology, is like advertising on steroids, as people call it. In traditional advertising, we want to sell something, and we want to find the right consumers or customers to sell it to. In AdTech, a specific set of technologies is used to deliver the right content at the right time to the right consumers. With the help of these technologies, which gather context and a better understanding of the users, it's possible to build better pipelines in terms of performance.

What Captify Does

What does Captify do in terms of AdTech? Captify is the largest independent holder of user search data outside of Google. We collect and categorize search events from users in real time. We have billions of consumers. This data feeds our campaigns and helps to build advertising campaigns in a more efficient manner. We have a main product called SENSE, a so-called search intent platform. This platform allows users to manage their campaigns, figure out the performance and efficiency of their campaigns, and also use the concept of audiences. Being able to gather search data, we categorize users into the various audiences they belong to. Therefore, advertisers are able to sell the product in a more targeted manner to the specific users they're interested in.

Data Pipelines in AdTech

Talking about the data pipelines, we have various pipelines to serve the needs of various teams. For example, there is a reporting pipeline. In any company, there are reports at the end of the month, quarter, or year. Such reports are built on top of specific volumes of data, and therefore, most cases require the latest technologies behind them. Another pipeline is around insights. Captify has this concept of insights, which give a better understanding of what's happening in the market currently, in terms of the current trends from user searches, like what users are searching for more this month. These insights, tailored for each of the customers, help to build more efficient advertising campaigns. The classification of the data, gathering the insights, and delivering them to the teams is also built using big data technologies. The data costs attribution pipeline is a whole topic of its own. In general, it's one of the financial pipelines. It's an important one in terms of the way we use our data and the way we figure out whether our data is efficient or not. Another pipeline is around building user audiences; there is a pipeline which classifies users into these various audiences. There are also separate sets of pipelines that push data to our customers. Just in general, all kinds of data processing, collection, and storage are also built using modern data pipelines.

Reporting

Talking about reporting first, I'm not going to cover all of the pipelines I mentioned, only a subset, because some of them just require more time to look into and are more complex. You're going to discover some challenges around those pipelines, the way they work right now, and possible future improvements. As a concept, reporting is quite simple. We have a data provider that shares data with us. We need to ingest this data, transform it in some way, and upload it to some storage system. A storage system can be very different: it can be a query engine, or something like HDFS or S3, depending on the needs. What it looks like at Captify, in a simplified, bird's-eye view: we have Amazon S3 storage, where our data providers push the data in various formats; CSV, parquet, and protobuf are the most common ones. We process these formats using Spark. There are some details that happen behind the scenes, so we're going to look into them in more detail. First of all, reading the data from Amazon S3: we don't just read the data with Spark, we use our own library built on top of the Amazon S3 client, called S3 Lister, which lists the files and filters out the paths we don't need to read, usually in terms of time. For example, you will read the data over a month or a week, but in most cases you won't have to read the data for the whole year.
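
To make the listing step concrete, here is a minimal Scala sketch of date-based path filtering. S3 Lister itself is an internal Captify library, so the date-partitioned bucket layout, the helper names, and the use of the Hadoop FileSystem API below are illustrative assumptions, not the actual implementation.

    import java.time.LocalDate
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object DateFilteredListing {
      // Hypothetical layout: s3a://bucket/feed/date=YYYY-MM-DD/...
      def pathsForRange(bucket: String, feed: String, from: LocalDate, to: LocalDate): Seq[String] =
        Iterator.iterate(from)(_.plusDays(1))
          .takeWhile(!_.isAfter(to))
          .map(day => s"s3a://$bucket/$feed/date=$day")
          .toSeq

      // Keep only prefixes that actually exist, so Spark does not fail on missing days.
      def existingPaths(paths: Seq[String], conf: Configuration): Seq[String] =
        paths.filter { p =>
          val path = new Path(p)
          path.getFileSystem(conf).exists(path)
        }
    }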

Another detail is around actually reading the data with Spark. We need to enforce a schema, because for reporting, expectations are set for us by the consumer teams regarding what the data is going to look like. There is another important part, which is around parsing the dates. The data is sent in very different formats, but we apply partitioning of the data by date at the end of the Spark pipeline. Therefore, we rely a lot on the timestamp information, first of all when we want to list the data and filter it out. As you can imagine, these algorithms have to be adjusted to different data formats, and also for partitioning of the data at the end, so we have some generalized format that we want to see, which is usually the date, time, and hour, and we can partition by these dates. The issue is that sometimes we get data where we can read these timestamps within the files, which is the usual expected way, like a timestamp column in the file. Sometimes we get data where the dates are hidden in the path names or some directory names, which can be challenging at times, because we have to adjust each of the Spark jobs to read the data and partition it the way we would expect.
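
As a rough sketch of what such a Spark job can look like, here is a minimal Scala example that enforces an agreed schema and partitions the output by date and hour. The schema fields, paths, and formats are hypothetical and only illustrate the approach described above.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, hour, to_date}
    import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, TimestampType}

    object ReportingIngest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("reporting-ingest").getOrCreate()

        // The expected schema comes from the consumer team; these fields are made up for the example.
        val schema = StructType(Seq(
          StructField("event_ts", TimestampType),
          StructField("campaign_id", StringType),
          StructField("cost", DoubleType)
        ))

        val raw = spark.read
          .schema(schema)                       // enforce the agreed schema instead of inferring it
          .option("header", "true")
          .csv("s3a://provider-bucket/feed/")   // hypothetical input location

        raw
          .withColumn("date", to_date(col("event_ts")))
          .withColumn("hour", hour(col("event_ts")))
          .write
          .partitionBy("date", "hour")          // the generalized partitioning format mentioned above
          .mode("append")
          .parquet("hdfs:///data/reporting/feed/")
      }
    }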

Another part of the pipeline is the actual data loading. As soon as we run this Spark job, where we just read the files and set up a format, we need to upload the data to Impala, which is a query engine built on top of HDFS. What happens behind the scenes here? Again, we enforce the schema that is expected by the analytics team, which provides this definition. There are some caveats around uploading the data to Impala. First of all, when we have a fresh data provider and we need to make the first upload, a fresh upload, we may have very different volumes of data. For example, the partner started to push the data a few months ago, so we already have two or three months, or it's very fresh, like only a few days. The thing is that a job running a fresh upload and a scheduled one are a bit different, especially in terms of resources. The scheduled uploads can run a few times a day, for example, and you would gather a few hours of data or one day of data at once. With a fresh upload, you need to run it over a month of data, or sometimes even a year. Therefore, these uploads differ in the resources used, and they can differ in the approach to filtering out the time periods. In some jobs, we have mechanisms to figure out whether we have already uploaded this data or not. With a fresh upload, there are some steps to figure out that there is no data yet, and therefore we need to upload it from the beginning of time.
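
A hedged sketch of how the fresh-versus-scheduled distinction could work: look at which date partitions already exist and either continue from the last loaded day or fall back to the beginning of the feed. The partition layout and helper names are assumptions for illustration, not the actual job.

    import java.time.LocalDate
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object UploadWindow {
      // Hypothetical layout: hdfs:///data/reporting/<feed>/date=YYYY-MM-DD
      def startDate(feedRoot: String, feedStart: LocalDate, conf: Configuration): LocalDate = {
        val root = new Path(feedRoot)
        val fs = root.getFileSystem(conf)

        val loadedDates =
          if (!fs.exists(root)) Seq.empty[LocalDate]
          else fs.listStatus(root).toSeq
            .map(_.getPath.getName)             // e.g. "date=2022-03-01"
            .collect { case s if s.startsWith("date=") => LocalDate.parse(s.stripPrefix("date=")) }

        if (loadedDates.isEmpty) feedStart                   // fresh upload: go back to the beginning of the feed
        else loadedDates.maxBy(_.toEpochDay).plusDays(1)     // scheduled upload: continue after the last loaded day
      }
    }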

Another thing around Impala is how older data is handled. As you noticed, I put both HDFS and S3 behind Impala here, although Impala is built on top of HDFS by default, because we use a so-called hot-cold storage mechanism for some of the data feeds. Usually, a data feed is kept in our storage system for 30 months for reporting needs, and typically you would query one month of the data; very rarely would you query the whole year. Therefore, we have this concept of hot and cold data. We read the hot data from HDFS; it can be a few days or a month at most. S3 stores the so-called cold data, which is much older than one month. In such a way, queries that check recent data run fast on HDFS, and that is the most recent data. In terms of storage capacity and cost, it's more effective to store less data on HDFS. Queries that run over longer periods of time are more expensive, but they are rare, and therefore it's OK to have this data moved to S3. New feeds are also pushed to S3, simply for the reason of moving to a safer storage system.
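
A minimal sketch of the hot-cold routing idea, assuming a simple age threshold and made-up storage roots; the real mechanism is certainly more involved.

    import java.time.LocalDate
    import java.time.temporal.ChronoUnit

    object HotColdStorage {
      // Hypothetical roots; the one-month boundary is an assumption for illustration.
      val hotRoot  = "hdfs:///data/reporting"
      val coldRoot = "s3a://cold-storage-bucket/reporting"

      def rootFor(partitionDate: LocalDate, today: LocalDate): String =
        if (ChronoUnit.DAYS.between(partitionDate, today) <= 31) hotRoot   // recent data stays on HDFS
        else coldRoot                                                      // older data lives on S3
    }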

Reporting Challenges

Talking about challenges. The first is around diverse data types. There are different approaches to reading different files. For example, while CSV and parquet are not very different, protobuf files are a completely different story, with different formats of the fields, against the expectation of a more or less unified schema. Therefore, sometimes there are challenges around setting up specific columns or even specific files. Another interesting challenge is around time dependency. As I said, we need to parse dates, and we need to partition the data by date as well. Therefore, there is a lot of dependency on data of a time-based nature. There can be issues with parsing the dates out of the file names, digging them out of the files, changing the format from a Unix timestamp to a date, and all of that.
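
To illustrate the time-dependency challenge, here is a small Scala sketch of parsing dates that arrive either as Unix timestamps inside the files or hidden in path names. The formats and the path layout are examples, not the actual provider formats.

    import java.time.format.DateTimeFormatter
    import java.time.{Instant, LocalDate, ZoneOffset}

    object DateParsing {
      // The actual feeds use provider-specific formats; these two are just examples.
      private val isoDate = DateTimeFormatter.ofPattern("yyyy-MM-dd")

      // Unix timestamp in seconds, e.g. from a field inside the file.
      def fromUnixSeconds(ts: Long): LocalDate =
        Instant.ofEpochSecond(ts).atZone(ZoneOffset.UTC).toLocalDate

      // Date hidden in a path name, e.g. ".../2022-03-02/part-0001.parquet" (hypothetical layout).
      def fromPath(path: String): Option[LocalDate] =
        "\\d{4}-\\d{2}-\\d{2}".r.findFirstIn(path).map(LocalDate.parse(_, isoDate))
    }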

Another challenge is around external data storage. I mentioned this mechanism of hot and cold data. The reason it can cause issues with metadata is that S3 tables for Impala are not native; therefore, it's necessary to create them as external tables. How it works is that Impala can discover the data as its own, like, this is the data in my table, but Impala won't be able to delete this data or do anything else of a more destructive nature. It's safer: if you accidentally delete something in Impala, the data is still safe on S3, or even if you did it on purpose, you can rediscover some of the partitions. Also, there can be issues with discovery of the partitions. From time to time we get errors where Impala suddenly loses some of the partitions, although they are physically on S3. Sometimes the mechanism of external tables in Impala specifically is not completely clear. Another challenge is around being able to connect with the end users. Another team sets the expectations around what the data is going to look like. They can also have expectations in terms of performance of the queries, how often they want to run these queries, and how long they want these queries to run. Therefore, we as a team should support this need and be in a constant feedback loop with them, so each time we upload some new data, we have to get feedback from them that the data is in the right format and that everything looks as expected.

Data Costs Attribution

Another interesting pipeline is called data costs attribution. It's more on the financial side of things, but it's a very important pipeline because it's part of this advertising nature, so we are going to dig deeper into it. From an engineering standpoint, we have log-level data, the data about user events, as we call it. A user event is the reaction of the user to the ad. The reaction can be different: it can be an impression, a click, or a conversion, as we call them. Therefore, we have a log of those events: how each of the users reacted to this ad, the specific time when it happened, and all the information around it, like what exactly the user saw, the specific ad, whether it's part of a campaign, and all of that. The next part is the ingestor module, which actually reads this data. We actually have three pipelines for the three data providers of the attribution pipeline, and therefore we ingest the data differently for each of the providers, similarly to the way I described for reporting.

With this data, there is another module which maps the data to the expected format. The expected format is, again, provided by the analytics team. This module is important because we have way too many fields in the initial log-level data. It's actually a log, and therefore we can have something like 300 columns there, and we don't need all of that. A mapper helps to filter out all of this data and set up the format that we actually expect to see. Another module is the transformer. The transformer is the core of attribution; this is where the attribution itself happens. Another module is the data costs calculator. It's a financial pipeline, and therefore the data costs are calculated after some transformations, and the result is saved to the database. What it looks like from very far away: we have Amazon S3 with files in it, we run Spark jobs, and we save the information to HDFS. In the process, we refer to some PostgreSQL tables. This is what the attribution looks like. Obviously, as I mentioned, there is a set of those modules; they are all parts of various Spark jobs, and some of them are part of only one Spark job. Therefore, a lot of things are happening behind the scenes.

First of all, in order to understand how attribution works, let's look into the data sources. We have Amazon S3 with two kinds of data sources, or feeds. One of them is called the standard feed, and the other is the segment feed. The standard feed is the actual log-level data that we get from the user events. The standard feed is an established concept in advertising, so it's not something that we invented. The segment feed is a feed of the audiences that we classified. We store them on S3 as well, so we can refer to this data; it's more internal in nature, generated by some other teams. What happens is that we have information about user events in the standard feed, and we have information about audiences in the segment feed. We need to find a way to join those two feeds to figure out that the users who clicked on the ad, for example, are actually the same users that we targeted from the segments. We have some campaign running, and, for example, we want to advertise some toys for pets. Therefore, we will target the segments of dog lovers or cat lovers; those are the people who usually search for stuff around pets. We need to figure out that we are actually targeting those users in the first place, whether they got into our log of events or not. That's the core of attribution. Why is it important? Because each of these events is not free for us. It's provided by the data providers, and therefore we need to pay for the events that actually work for us, the events that we expected to see from the users from specific segments.

The attribution data source, in terms of structure, looks like this: we have impressions, clicks, and conversions. It's in this order because the attribution itself is based on impressions being the initial events; clicks and conversions usually happen only after impressions. Therefore, even in terms of processing, we transform impressions first and generate results for impressions first. Only then can we run attribution for both clicks and conversions. What happens in the pipeline? We have Amazon S3. Again, we use the S3 Lister client, which helps to list only the specific files for the period. There is the ingestion module, which is a Spark job. This module reads different kinds of files and sets up a more or less unified schema, and that's it. Then we get to the mapping. We filter all the columns down to only the ones that we expect to see in the final result. There is a part where we refer to our PostgreSQL database. It's important because, in order to draw conclusions about the kinds of data that we get from the data providers, we need to match it with our internal data about campaigns and users. Basically, for the same campaign, we can have two kinds of IDs: the IDs provided by the data provider, the way they refer to this campaign, and our internal IDs, which we call Captify IDs. That's why we need to build mappings between external IDs and internal IDs, and add some additional information about the campaigns from the database that we have, from our SENSE tool basically. You can see that everything is connected here.

The next part is around transformation. That's exactly where the join of the feeds happens, the segment feed and the standard feed. The join happens on the inclusion rule. Basically, during the mapping stage, we gathered some information about the segments that is stored in our database, and those segments are assigned to a specific campaign. We can figure out that this is in fact a segment that we targeted, then look into the segment feed and see that those are the same, so the join happens there. Then there is the data costs calculator. It's just a lot of financial formulas for the data costs over different periods of time; it's also a Spark job. The result is saved on HDFS.
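
A hedged Spark sketch of the join described above: the standard feed of user events is joined with the segment feed, restricted to the segments targeted by our campaigns. Column names such as user_id, segment_id, and event_type are assumptions for the example; the real feeds have many more fields and a more involved inclusion rule.

    import org.apache.spark.sql.DataFrame

    object AttributionJoin {
      def attributeImpressions(standardFeed: DataFrame, segmentFeed: DataFrame,
                               campaignSegments: DataFrame): DataFrame = {
        // Keep only the segments that are actually targeted by our campaigns
        // (the mapping comes from the internal PostgreSQL database).
        val targetedSegments = segmentFeed.join(campaignSegments, Seq("segment_id"))

        // Inclusion rule (simplified): an impression is attributed if the user who saw
        // the ad belongs to one of the targeted segments.
        standardFeed
          .filter(standardFeed("event_type") === "impression")
          .join(targetedSegments, Seq("user_id"))
      }
    }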

Data Costs Attribution Challenges

Talking about challenges. First of all, the attribution pipeline is heavy in terms of resources. It generates large volumes of data. You can imagine processing log-level data where we can have information about the users every millisecond. Therefore, there's a lot of data, and we also need to join it with the segment feed. The result can be quite huge with all the calculated formulas, so it's complicated in terms of performance. It's also hard to pinpoint failures in this structure, because there are business rules built into the way some of the mappings work, for example. Therefore, sometimes you need to dig deeper to figure out what actually triggered a result, for example, some data costs not being calculated the way they should be, or not getting data for some of the campaigns. Another challenge is around historical data reprocessing. It's usually applied to this pipeline because it's critical in terms of the business side of things, and it's an interesting use case. It also has large data volumes, and therefore it can be a challenge to reprocess even small periods of time.

Historical Data Reprocessing

Now let's move to historical data reprocessing. Usually, when you think of reprocessing historical data, you can imagine that we have data that is stored over, for example, a year or months, whatever the period is. In terms of business use cases, why would you need to reprocess the same data that you already processed? Let's imagine the case where the business teams figure out that there is something off with the data over a specific period of time. For example, at the end of March, they figure out that there was some issue that happened at the beginning of March, and therefore all of the data over this period of time, which is a month, is not correct. It's not fully corrupted, but it's not correct, and therefore it's bad for their reporting. They want to present proper numbers, and they don't want to recalculate them manually. Therefore, for these external business teams it's important to see the correct data and see the issue fixed, either for that period of time or in general for the future. Changing business rules is always a complicated process, because it can influence a lot of things, different pipelines that may not seem connected in the beginning. It can also trigger issues in the future if you didn't research this change properly.

We have the attribution pipeline that I discussed previously, and we write the data from the Spark job to HDFS, usually in parquet format. We need to reprocess this data, so what is that going to look like? There is a mechanism built into attribution for reprocessing. Usually, when we just process the data for the regular attribution run, we set up a config called the feed period: we want to process the data over some period of time. Usually, it's a small period, like a few days, three or five days. We also set up a period for the segment feed, to merge them only over some period of time, like the current days, for example. In reprocessing, it's a bit different: you also would like to set up a start date, just for safety reasons. For example, on the 29th of March, I figure out that there is an issue from the 2nd of March. Obviously, you won't be able to fix the issue on the 29th of March; you will still need some time to fix the issue, to test it, to deploy it, and to talk to the business teams for them to confirm it. Therefore, it's better to set up a start date, just in case you'll have to run it over a bigger or smaller period of time. Then the period itself is chosen in such a way that you just take the current day minus the feed period, which is 30 days. The files are filtered by these dates, starting from the minimum time. With such a mechanism, it's possible to go back in time, take some period of time, and rerun the new business rules on top of that.
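
A small sketch of how such a reprocessing window could be computed, assuming the start date and feed period are plain config values. The exact precedence rule in the real job is not spelled out in the talk; here the earlier of the two candidate dates is taken, following the "starting from the minimum time" description.

    import java.time.LocalDate

    object ReprocessingWindow {
      // feedPeriodDays is 30 in the example above; startDate is the safety config.
      def window(today: LocalDate, feedPeriodDays: Int, startDate: LocalDate): (LocalDate, LocalDate) = {
        val periodStart = today.minusDays(feedPeriodDays.toLong)
        // Assumption: files are filtered from the earliest of the two candidates up to the current day.
        val from = if (startDate.isBefore(periodStart)) startDate else periodStart
        (from, today)
      }
    }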

Talking about safety in terms of different versions of the code. How it's usually achieved is by setting a so-called fixed version. We have the production version; it's stable. Yes, it produces incorrect data, but for now it's stable. The data is usually only partially incorrect, so there is at least something for the end users; they need this data every day. There is another version of the same pipeline, the reprocessing one. The production version takes the latest stable version that we had and runs it. The reprocessing version is going to run the new set of business rules that we set up. Both of them save the data to HDFS, but to different directories. The structure of the directories looks like this: impressions, clicks, and conversions. First of all, we process impressions, and therefore we can then either process just impressions and trigger the pipeline to rerun the clicks and conversions, or we process both of them, but usually impressions take the longest.

How it works is that we run our production version, and it's still running. We run our reprocessing version. Then we just replace the directories, obviously with a backup, in such a way that the new directories move to the old place. We can run the reprocessed version only on impressions, and in that case we will just drop the clicks and conversions from the production version and set up the impressions for the needed period of time. In this case, it's very important not to mix something up and to set the proper period of time for the directories we are going to reprocess. They are processed on an hourly basis; that's why sometimes it's better to take a bigger period to replace the directory than a smaller one. There is a reporting pipeline that runs on top of attribution. I didn't mention it before, because it's just the same as all the other reporting pipelines: the data is taken from HDFS, a Spark job runs and saves it to Impala, and then we have some tables. With the Impala tables, we would like to, in the same manner as with the directories, just drop the partitions that are incorrect and rerun the pipeline to get us the result, the reprocessed tables. Reprocessed here means that it's just a different result with the new business rule. With Impala, it's not very complicated. Again, because of this external storage, it will be necessary to drop the data on S3 as well.
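
A minimal sketch of the directory swap with a backup, using the Hadoop FileSystem API; the paths and the helper are hypothetical, not the actual scripts used in the pipeline.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object SwapDirectories {
      // Hypothetical layout: .../impressions is the production output,
      // .../impressions_reprocessed is the output of the reprocessed version.
      def swapWithBackup(fs: FileSystem, production: Path, reprocessed: Path, backup: Path): Unit = {
        require(fs.exists(reprocessed), s"Reprocessed output is missing: $reprocessed")

        // Keep the old production data around in case something was mixed up.
        if (fs.exists(production) && !fs.rename(production, backup))
          sys.error(s"Could not back up $production to $backup")

        // Promote the reprocessed directories to the production location.
        if (!fs.rename(reprocessed, production))
          sys.error(s"Could not promote $reprocessed to $production")
      }
    }

    // Usage (sketch):
    //   val fs = FileSystem.get(new Configuration())
    //   SwapDirectories.swapWithBackup(fs,
    //     new Path("hdfs:///data/attribution/impressions"),
    //     new Path("hdfs:///data/attribution/impressions_reprocessed"),
    //     new Path("hdfs:///data/attribution/impressions_backup_2022-03"))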

There is also a way to check the reporting before actually running it in production. Same as before, I've shown the various versions that we switch between. Here, we can still run the production version and save the result for March, for example, the full month or only a few days, just to have this check running to another table. Then the team from the business side can check out this data and say, yes, the business rule is correct, you got it right, you can run it in production. That's also an option, depending on how critical the business rule change is.

Future with Delta Lake

Talking about the future: this is one of the ways we do it, having various scripts. Some of the steps are not really automated, and therefore there can be a lot of challenges around automating them and doing the right thing, especially if it's something very critical and important that should be done, like [inaudible 00:32:06]. One of the things that we considered is using Delta Lake. There are quite a lot of features from Delta Lake that can be used for various Spark job pipelines. I chose three of those that would help this specific pipeline. The first is schema enforcement, simply because the schema is important for all the reporting pipelines. It's important for end users, and therefore it's good that something like Delta Lake allows us to enforce the schema and, more importantly, keep the changes of the schema and see how it changes over time. As I mentioned, especially with reprocessing, the data can change over time because of some business rule change, and it's great to be able to see these changes in the Delta log, as Delta Lake allows us to do. Another feature which would be useful is time travel: again, just looking back into the data, figuring out at which point, for example, something went wrong, maybe some issue crept in, or just being able to analyze the changes and figure out what worked in the past and what didn't.

We're already on that path. What we were able to do is convert some parquet files to Delta files, as well as the Spark tables we use. Some of the pipelines are already writing data in the Delta format, and they already have a log which keeps track of the changes. Therefore, the plan is to move as many pipelines as possible in the same manner, to be able to track those changes and leverage the nature of Delta Lake, which allows us to do so. Another challenge in the future for us is being able to actually reprocess the data using Delta Lake, for example with the vacuum mechanism. In the case of seeing that some data is not working for us, or some data is corrupted or has issues, it would make sense to be able to just remove it, in the same way as we did with reprocessing, see how it works when rerunning the pipeline and generating new data, and again be able to see the versions of the data.
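
For illustration, here is a minimal Scala sketch of the Delta Lake features mentioned: converting an existing parquet directory, reading an older version via time travel, and vacuuming files that are no longer referenced. The table path and partition column are assumptions for the example.

    import io.delta.tables.DeltaTable
    import org.apache.spark.sql.SparkSession

    object DeltaMigration {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("delta-migration").getOrCreate()
        val path = "hdfs:///data/attribution/impressions"   // hypothetical table location

        // One-off conversion of an existing parquet directory (partitioned by date) to Delta.
        DeltaTable.convertToDelta(spark, s"parquet.`$path`", "date DATE")

        // Time travel: read the table as it was a few versions ago to compare results.
        val previous = spark.read.format("delta").option("versionAsOf", "3").load(path)
        previous.show(5)

        // Vacuum: physically remove files no longer referenced by the log (retention in hours; 168 = 7 days).
        DeltaTable.forPath(spark, path).vacuum(168)
      }
    }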

Challenges around Reprocessing

Talking about general challenges around reprocessing, the one that I didn't mention before is computing resources. I already talked about huge data volumes for the attribution pipeline. Therefore, we usually need quite a lot of computing resources when we want to reprocess some period of time. As an example, a week of data, or less than a week, can be 2 or 3 terabytes for one of the pipelines, which is a lot if you want to reprocess a month. Therefore, a lot of resources are needed for these reprocessing pipelines, and it takes time. If it's critical for the business, the speed is also critical, as they would expect to see the result as fast as possible.

Another challenge is around the complexity of the pipeline itself. There are different steps that should be done very precisely. An error in one of the steps of the pipeline can affect the way you would solve the issue. Precise steps mean that if you fail on one of those, the cost can be high: because of all of the above-mentioned challenges, you will need to reprocess the data again. I had a case like that, for example, which was not nice, especially because of the critical nature of the pipeline and the expectations of other teams to get the result as fast as possible.

Conclusions

First of all, I think that AdTech is an exciting domain for the big data stack; there are a lot of things that can be done and improved in the domain, and a lot of interesting tasks in general. Secondly, there are various approaches to the way we usually work with data and the way we build data pipelines. Therefore, the ones I've shown are not necessarily an example of the way you would do things in your company. Obviously, it differs from one company to another. It's more an example of how we do it at Captify, and the way we are trying to improve it in the future. Another important conclusion is that there is always room for improvement and new things to learn. It's not like your pipelines, once stable, should just stay the same way; you can always improve them by adding some new tech, analyzing in general what doesn't work or works for you, refactoring the code, changing something, and all of that.

Questions and Answers

Polak: People asked specifically about the attribution pipeline: how does this implementation influence other teams?

Diachuk: The current implementation mostly influences other teams in terms of the delivery of the results of the pipeline. In my case, it's mostly the analytics team that runs queries on top of the data, and they need to understand when they will get the results for some period of time, because the pipeline is quite heavy. Therefore, it's mostly about their estimates and expectations. Also, the current pipeline architecture allows different teams to access the data at various points of the pipeline, like accessing the results as parquet files or within the query engine to actually query the tables. In such a way, they distribute the load a little bit, and they can look at the data in the most convenient way for them.

Polak: How are you handling duplicated data sent from a data provider, if that's something that you're handling?

Diachuk: Here we have some caching mechanisms for different data feeds. Usually, the mechanism first needs to identify the uniqueness criteria. In most log-level data cases, when we have user events, we can rely on something like the timestamp of the event, because it has millisecond granularity, and the IDs of the users or the IDs of the actual event. Such fields help to deduplicate the data. We also have more complex feeds which depend on one another. Talking about the attribution pipeline, one of the data providers supplies four separate feeds that we then merge, in a way where each event follows from another; there is a specific direction. The deduplication there happens in multiple stages. The first one is joining one feed with another, for example, events from the videos. Another one is, after we've joined video events with impressions, we go on to the clicks and conversions that I've shown, and there we can have additional deduplication for the events which happened after that.
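
As a tiny illustration of the uniqueness criteria described above, a hedged Spark sketch of deduplication on event ID, user ID, and event timestamp; the column names are assumptions, not the actual feed schema.

    import org.apache.spark.sql.DataFrame

    object Deduplication {
      // Example uniqueness criteria: event ID, user ID, and the millisecond-granularity timestamp.
      def dedupeEvents(events: DataFrame): DataFrame =
        events.dropDuplicates(Seq("event_id", "user_id", "event_timestamp"))
    }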

Polak: Data duplication is known to always be one of the challenges in the data space.

You mentioned working with Delta Lake specifically, and using it for multiple scenarios such as schema validation. Do you see any other improvement possibilities in the tech stack aside from Delta Lake?

Diachuk: Yes. Currently, there is a general direction within my company to move slightly towards the Databricks platform, so there are going to be some changes within the whole pipeline in terms of maybe using some Databricks services. Another big push is around implementing the data mesh concept. We are currently working on identifying different criteria for the data, like gold standard, silver, and all those levels. Therefore, we are currently working on pushing the data through different stages so that we would have raw data, more aggregated transformed data, and clean data. We're implementing that with the help of both Delta Lake and our internal tools. We are also looking into different query engines to get better performance out of those, because Impala is no longer serving our needs and users usually run queries that are too heavy. Therefore, we're looking into other options as well.

Polak: How do you monitor the health of all your cloud ingestion pipelines?

Diachuk: We did build our own monitoring tools using Prometheus and Grafana, so we have dashboards. In terms of the metrics, I can share that we collect those at different stages of the pipeline: first, figuring out that we actually get the data from our data provider and that it's not late in terms of time, because usually there is a schedule, and then figuring out whether we actually processed the data and the issue is not in the final application. We usually rely on these Prometheus alerts and the dashboards that we build in Grafana for that.

Polak: When you think about current or future challenges in the business domain that you work in, what are the challenges for data products in AdTech?

Diachuk: The biggest challenge is around data privacy. With the banning of cookies in multiple browsers, multiple companies in AdTech are now trying to build their own cookieless solutions. My company is doing that as well. The main challenge here is to identify the users without relying on cookies and without knowing more about the users, relying instead on the context of these users, like what they're interested in, gathering information about them without actually knowing who they are. It's the biggest challenge both for big data, because the pipelines and storage systems have to be built, and for data scientists as well, because they need to actually build the algorithms to identify the context of the users, and figure out that these are at least actually the users we were interested in from the beginning.

Polak: It reminds me of when GDPR compliance just started. There were a lot of changes that companies needed to make in their architecture in order to enable GDPR. Even today, some companies are still going through that journey, because there's no one solution for these things. It's really interesting to see how the world is going to adapt, and how you are going to adapt the architecture and the processes to enable a cookieless world.

 


 

Recorded at: Nov 11, 2022
