Robust Foundation for Data Pipelines at Scale - Lessons from Netflix


Summary

Jun He and Harrington Joseph share their experiences of building and operating the orchestration platform for Netflix’s big data ecosystem.

Bio

Jun He is a Sr. software engineer on the Big Data Orchestration Team @Netflix. Harrington Joseph is a Sr. software engineer on the Data Platform Orchestration Team @Netflix.

About the conference

QCon Plus is a virtual conference for senior software engineers and architects that covers the trends, best practices, and solutions leveraged by the world's most innovative software organizations.

Transcript

He: My name is Jun He. I'm a software engineer at Netflix. We are going to talk about our workflow scheduler, a robust foundation for large scale data pipelines. We will share our experiences and the lessons we learned, including the patterns we developed. Netflix is a data-driven company. Every decision at Netflix is driven by data insights, from the color used on the landing page to upcoming original content. Data scientists, engineers, and even content producers all run their data pipelines to get the insights they need. Thus, the scalability and stability of the data platform have become more important. Over the past eight years, we have developed a robust foundation to provide users a consistent way to manage and automate their data pipelines.

The BDP Scheduler - What Is It?

This foundation is called the BDP Scheduler. The BDP Scheduler provides workflow as a service to thousands of Netflix internal users. It is scalable and reliable, and it includes multiple components, for example, the workflow engine, UI, alerting, and abstraction layers such as DSLs and templates. It satisfies the needs of all of our users, including engineers and non-engineers.

Architecture

Here is the high level architecture design. Our users use UI and CLI clients to interact with the scheduler APIs over the gateway, which provides a public abstraction and hides the internal details. The gateway is also helpful while dealing with API incidents. For example, we can shut down the gateway to disable the APIs but still keep the workflow engine running. This greatly reduces the impact of incidents. The workflow engine is the core, which manages workflow versioning, the metadata, and the status of workflow executions. It supports scheduling with trigger components such as a time-based cron trigger and an event trigger. It runs business logic in Docker containers over the execution engine. The core itself is loosely coupled with downstream services by sending data changes to an event stream.

Use Cases

This foundation has worked quite well to support a wide variety of use cases. Users use it to automate their normal ETL pipelines, train their machine learning models, and so on. Some services also use the scheduler to periodically run jobs. Here, we tried to abstract the common functions or reusable patterns from all use cases and add them to the scheduler in a loosely coupled way. We also found that a human readable DSL is very helpful and plays an important role in supporting heterogeneous use cases. At Netflix, users can write their workflow definitions using DSLs in YAML, Java, or Python. The YAML DSL is actually the most popular one due to its simplicity and ease of understanding.

Scale

Here is our current scale. The growth rate is quite high, as you can see from this chart. We observed about a 300% increase per year. One best practice for operating a service at this scale is to build good monitoring and logging around it. For example, in the past year, those metrics let us find out that the scheduler would have scaling problems in the near future. We then started the engineering work before the problems actually happened. Now that the scaling problems are starting to appear, the engineering work is close to complete. One lesson we learned is that at this scale, we may experience many unexpected problems. For example, we found that there is a large load spike at midnight every day. Thousands of workflows are scheduled to run exactly at midnight. We talked with the workflow owners, and many of them were ok with starting execution at a different time. The reason they use midnight is just that they copied the example in our scheduler doc, which uses midnight in its cron trigger. The next lesson is that it is hard to ask thousands of users to change tens of thousands of their workflows, even if it is a tiny change. The communication and coordination effort would be much more than the effort of improving the system, which is the path that we took to handle this peak traffic.

Workflow Engine - Workflow Patterns

Next, I will focus on the core component, the workflow engine, and talk about its design and the technical decisions we made to better serve our users. The first item is about whether to build a barebones engine or an engine that includes useful patterns. If we only supported the core features, we could reduce the complexity and keep the core well optimized, but it would push the work to the user side and ask users to repeatedly write the same logic or find a workaround. This might lead to thousands of ways to solve the same problem. It is much better to support users with common patterns than to ask them to do it by themselves.

I will use foreach as an example. Foreach is a very popular pattern. Users use it to repeatedly run the same jobs with different parameters, for example to backfill data or to tune parameters. We could ask users to explicitly define each iteration in the definition, but that is tedious and time consuming for large iterations, think about tens of thousands of them. They would also need to create new workflows if the foreach range changes. With direct engine support, users just provide an array of parameter values for the foreach and the jobs to run. They can change parameter values without recreating the workflow.
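To make the foreach idea concrete, here is a minimal sketch of what such a definition could look like in a YAML DSL. It is illustrative only: the field names (type, params, jobs) and the ${...} parameter reference syntax are assumptions, not the actual BDP Scheduler schema.

    jobs:
      - id: backfill_loop
        type: foreach
        params:
          run_date: [20210101, 20210102, 20210103]    # the array of parameter values to iterate over
        jobs:
          - id: backfill_one_day
            type: Spark
            script: SELECT * FROM raw.play_events WHERE dt = ${run_date}   # assumed parameter reference syntax

Changing the list of dates then only means updating the parameter values, not recreating the workflow.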

Similarly, we added support for subworkflows and conditional branching. Subworkflow support allows a workflow job to run another workflow, and conditional branching allows running some jobs only if a condition is true. Directly supporting them in the engine enables us to optimize those features and provide a consistent way to realize those patterns. Additionally, DSL support helps reduce the complexity of defining them.
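A similarly hedged sketch of these two patterns, again with assumed field names (subworkflow_id, condition) rather than the real schema:

    jobs:
      - id: load_dimensions
        type: Subworkflow
        subworkflow_id: dimension_table_loader            # run another workflow as a job
      - id: notify_on_failure
        type: Notebook
        notebook: s3://my-team/notebooks/alert.ipynb
        condition: ${load_dimensions.status} == 'FAILED'  # conditional branching: run only if the condition is true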

Parameterized Workflow

Another related item is static versus dynamic workflows. A static workflow is simple and easy to use, but it is quite limited. Oftentimes, users have to duplicate the same workflow many times just because of very small changes. A completely dynamic workflow would be hard to manage and support, and difficult to debug or troubleshoot. Therefore, we created something called parameterized workflows, which are initialized step by step at runtime based on user defined parameters. Users love it. It is easy to manage and troubleshoot, and also powerful enough for them to write workflows with rich functions. To further enhance it, we support two execution modes: runnable mode, which does not return any data, and callable mode, which returns output parameter values from the execution back to the engine. Here is an example of a backfill workflow with three steps. In step one, the user computes the backfill date ranges, and the dates are returned back to the engine. Next, the foreach step uses the dates from step one to create the foreach jobs. Finally, each of the backfill jobs gets a date from the foreach and backfills the data based on it.
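Here is a rough sketch of how that three-step backfill workflow might be expressed. Only the shape (a callable step feeding a foreach) comes from the talk; every field name and the ${...} output reference are assumptions.

    workflow:
      id: backfill_example
      jobs:
        - id: compute_dates
          type: Notebook
          notebook: s3://my-team/notebooks/compute_backfill_dates.ipynb
          mode: callable                        # callable mode: returns output parameters (the dates) to the engine
        - id: backfill_loop
          type: foreach
          params:
            run_date: ${compute_dates.dates}    # assumed reference to the output of the callable step
          jobs:
            - id: backfill_one_day
              type: Spark
              script: INSERT OVERWRITE warehouse.play_summary SELECT * FROM raw.play_events WHERE dt = ${run_date}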

Parameter

As we just showed, parameters play an important role in our scheduler. We had to decide whether to support dynamic parameters with code injection or not. Code injection is super useful and powerful, and our users love it. Together with other features, it makes workflows very dynamic. But with code injection, security is a big concern. Also, users might write buggy code. For example, in the past, a user unintentionally wrote an infinite loop to create an array, which eventually crashed the scheduler JVM. We could ask users to put the injected code inside their business logic instead of in the workflow definition, but that would put a lot of work on the user's side and also couple the user's business logic with the workflow. Eventually, we developed our own customized expression language to support code injection. During the abstract syntax tree parsing, we put limits in place to secure and protect the system. Here is an example to show how a user writes a piece of code in the workflow definition to validate the results.
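The slide itself is not reproduced here, so the following is only an indicative sketch of what validation code embedded in a definition might look like. The expression syntax shown is assumed, not Netflix's actual expression language.

    jobs:
      - id: daily_rollup
        type: Spark
        script: SELECT count(*) AS row_count FROM warehouse.daily_summary WHERE dt = ${run_date}
      - id: validate_results
        type: Notebook
        notebook: s3://my-team/notebooks/validate.ipynb
        params:
          is_valid: |                # injected code, evaluated by the scheduler's sandboxed expression language
            rowCount = daily_rollup.get('row_count');
            return rowCount > 0 && rowCount < 1000000000;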

Execution Control

Next, I will talk about execution control. What we found is that simple concurrency control on a single entity is not enough. We and our users want to control how many workflows to run, how many jobs within one workflow to run, how many jobs across all workflows to run, and so on. This requires multi-dimensional concurrency control. Also, users want the scheduler to automate their data pipelines, but they also want enough control over the execution order. We provided both low-level controls and some predefined high-level run strategies. It turns out the predefined ones are good enough, and almost no one used the low-level ones. They actually caused more confusion for users, so later we removed the low-level ones.

Here are the predefined ones. Sequential ensures only one run at a time, in first-in first-out order. Strict sequential handles unexpected job failures. In this case, users usually want to pause the execution but still enqueue the new runs. For example, here, when run 5 failed, runs 6 to 12 will be enqueued but there will be no new execution. After the underlying issue is fixed, the user restarts the failed one to resume the execution. First only is to achieve idempotence: if there is an existing running one, the new one is skipped. Last only allows a new run to cancel the currently running one. There is also parallel, with a concurrency limit.
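As an illustration of how a predefined run strategy might be selected in a definition (the run_strategy field and the strategy identifiers below are assumptions derived from the names used in the talk):

    workflow:
      id: daily_aggregation
      run_strategy: STRICT_SEQUENTIAL   # other described strategies: SEQUENTIAL, FIRST_ONLY, LAST_ONLY, PARALLEL
      concurrency: 3                    # assumed knob, relevant only for a parallel strategy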

Debug and Troubleshoot

It is also important to provide debuggability and surface errors for users to troubleshoot. Those improve the user experience and are worth the time and effort to implement. For debuggability, we support breakpoints to allow users to pause the jobs in a workflow. We also include error information, execution timelines, milestones, and artifacts to help users troubleshoot failures. With all those learnings, will we always prefer adding more functions to the workflow engine? The answer is not always. In some cases, we move functions to other services or build dedicated services. This helps separate the concerns and makes the whole platform more scalable and robust. If any of those services is offline, the workflow engine can still run normally.

Domain Specific Language (DSL)

Joseph: I'm Harrington Joseph. I'm a senior software engineer on the big data orchestration team. I work together with Jun building the next generation of scheduling tools. Let's talk about the DSL. DSL stands for domain specific language. This was the first user-facing component that we built, so that users would have a unique, simple, and unified way of defining workflows. For this, we picked YAML. We picked YAML because we wanted a static definition that anyone could use and read. Here's an example. The top section shows a trigger block that is completely optional, but it allows users to define how often they want their workflow to trigger automatically. In this case, it's defining a cron trigger that says this workflow should run on a daily basis. The time zone is U.S. Pacific. This means that this workflow is going to get activated at midnight in U.S. Pacific time. You can use complex cron expressions and any of the time zones that are considered valid.

Next, you have the workflow section. A workflow is defined initially by providing an ID, which is a unique identifier across the platform. Then you need to provide a list of jobs, or units of work, that your pipeline is going to execute. In this case, we have two jobs. Job one is a Spark job that, just for presentation purposes, runs a SELECT 1. Of course you can do something much more complex than this, or provide a SQL file or a JAR file. Job number two is a NotebookJob. A NotebookJob allows you to execute Jupyter Notebooks as part of your pipeline. All the user needs to do is provide a URL or a path to the notebook so we can find that notebook at execution time. This gives users a common way to define units of work and to define their workflows. From now on, all the workflows follow this structure. It's fairly simple to understand what a workflow looks like, where the jobs are, what parameters they are using, and so on.
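The example being described is not reproduced on this page, so here is a hedged reconstruction. Only its shape comes from the talk (an optional daily cron trigger in U.S. Pacific time, a workflow ID, a Spark job running SELECT 1, and a NotebookJob pointing at a notebook path); the concrete field names are assumptions.

    trigger:
      cron:
        expression: 0 0 * * *           # daily at midnight
        timezone: US/Pacific
    workflow:
      id: my_team.sample_workflow       # unique identifier across the platform
      jobs:
        - id: job_1
          type: Spark
          script: SELECT 1
        - id: job_2
          type: NotebookJob
          notebook: s3://my-team/notebooks/report.ipynb   # path resolved at execution time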

It also complies with the following three principles. Readability: anyone can read this. There is no need to read nested or recursive code in order to understand what the workflow should look like. All you need is a text editor and you just read the YAML. Next, reproducibility: this is very reproducible because anyone can go find the YAML in your repo, copy it, maybe change the ID portion, and you should get the same behavior. Then you can actually iterate over it, add new jobs, tweak some parameters. It's also very useful for users to learn from each other. Debuggability: because it's readable and reproducible, and people can iterate over it, it is very easy to debug.

The Command-Line Interface (CLI)

Let's talk about the CLI. The CLI is a command-line interface that we built for users to interact with the scheduler. It is installable anywhere, meaning that users can install it on their laptops, servers, dev environments, Docker containers, you name it. It's integrated with the platform. This means that it understands security. When a workflow is created, the workflow is shipped together with the user's identity. When the workflow is executed, it executes as that user, meaning that it has access to the same resources. It also provides asset management. If you need to ship, say, a JAR or a SQL file, and encrypt a file that you need to decrypt at execution time, this tool also allows you to do that. It takes care of it in a seamless manner.

Here are some examples. The way you access the tool is by typing the scheduler command. To create a workflow, you just type scheduler push and provide the YAML file. From that point, you can run, activate, and deactivate the workflow simply by having the ID of the workflow, so you can just execute one of these commands. The last command, info, is actually very useful because it provides a way to fetch information about a workflow, and all you need is the ID. You can get information about what the workflow status is, what the latest version is, what the current active version is, and even a link to go to the UI.

We also provide a dynamic DSL, and it looks like this. It's fairly similar to the Airflow DSL; in this case it's the Python one. As you can see, it's using the builder pattern. Instead of operators, we provide what we call job types. This is exactly the same as the YAML definition. We also have a Java version, pretty much the same, just a little bit more verbose, but it's doing exactly the same thing.

You might be wondering, where did all the principles go? If you look at the workflow creation, once you have the YAML, the YAML gets pushed to the scheduler API by using the scheduler CLI. The Python DSL and the Java DSL talk straight to the scheduler API. The scheduler API has the ability to compute a YAML for every newly created workflow, and then stores it on S3. This computed YAML becomes the source of truth for any given workflow.

At that point, if a user pings me and says, "I need help with my workflow. I don't understand what's happening. The workflow doesn't look how I want it," or anything like that, I can just go and find the computed YAML, take a look, and understand. Then I can even iterate over that YAML without having to read the Python or Java code, which might be a little bit trickier for me, because I may not have the same libraries. I may not have the same dev environment. I may not understand what the user is trying to do in their code. This is actually much better for us. It complies with the principles that I mentioned. It's readable, reproducible, and debuggable.

Execution

Let's talk about executions. Similarly to Airflow, which provides operators, we provide what we call job types. Our job types are built on top of Jupyter Notebooks that we use as templates. Then at execution time, we basically just inject the parameters that have been provided by the user into the template, for the template to use during the execution. We provide templates for Presto jobs, Spark jobs, and Transport jobs, meaning that you can move data in and out of RDS, Elasticsearch, Hive, and Iceberg, as well as reporting tools and data auditing tools. Notebooks too: as I said, bring your own notebook and we'll run it as part of your pipeline. We can do the same thing with your custom scripts. You can bring a Bash script or a Python script, and we can also run it as part of your pipeline. You can even bring your own Docker image and specify a custom entry point.

This is very good for our users because it provides a level of abstraction. Users who want to run a Presto job only have to provide the SQL statement that they want to execute. They don't really need to know how the Presto job actually runs. They don't need to know the internals. They don't need to know which cluster we're talking to, for example. The next benefit is standardization. All the Presto jobs, all the Spark jobs, and all the Transport jobs run through the same templates. Therefore, if we find a bug, we can easily apply the fix, and the fix is applied to every single job that we have in the platform.

Our execution is done in a way that runs every single job in its own container. This means that we get a clean and isolated environment for each particular run. We default our runs to use the BigDataImage. The BigDataImage is a Docker image that has been built and curated by the big data infrastructure team. It contains all the dependencies that we may need for any of the jobs that I mentioned before to run. Our default entry point is Papermill.

Papermill is a tool that allows you to run notebooks in a headless manner. This means that you don't really need the notebook UI in order to run the notebook. Here's how it looks. We basically call Papermill. We provide the path to the template that we want to execute, say, the Spark template. Then what you see in pink is the output notebook, where we want the execution to be recorded. We want the execution to read like a log, but in notebook format. Then we just have to provide the parameters that we want to inject into this notebook. In this case, we're defining a parameter called spark that has a key called script and a value of SELECT 1.

Here on the right is the notebook output. At the top, there is a cell called parameters. At the bottom of that cell, there is spark equals a dictionary containing script equals SELECT 1. This is the parameter that we just injected when we made the Papermill call. This is very nice, because users can actually see all the parameters that are getting injected into the notebook execution. It's easier for them to understand what's happening. They can also see the rest of the execution, which is the code that gets executed in order to run, in this case, a Spark job. It's definitely readable; they can see everything here. Of course, we talked about abstraction: this is a way of diving into the details if you want to. It's also reproducible, because anyone can download this notebook and run it locally. It is debuggable because after you download the notebook and run it locally, you can tweak the parameters and even tweak the internals of the notebook in order to achieve what you want. Then you can go back to your workflow definition and update your parameters accordingly.
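For reference, the injected parameters in this example amount to the following structure. The talk describes passing them inline on the Papermill command line; Papermill can also read an equivalent YAML parameters file (its --parameters_file option), which is how it is sketched here.

    # params.yaml - hypothetical parameters file equivalent to the inline parameters described above
    spark:
      script: SELECT 1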

Event-Driven Triggering

On top of manual and time-based triggering, we also provide event-driven triggering. It looks like this. If you look at the trigger section, now we have what we call signals. In this case, we have two groups of signals. The first group defines two signals that say, I want to run this workflow when Table A is updated and when Table B has a partition updated. We're able to do this because signals in the scheduler ecosystem are the interpretation of events. Our platform is fully aware of events in the Netflix ecosystem. We know when a table is updated, when a partition has landed, and so on. In addition, we also allow users to define custom signals. For example, I want to run this workflow when an application called my app has announced a signal called abc. This is available because applications are able to emit events, and once those events get placed into our event stream, we can turn them into signals.
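A sketch of what such a trigger section could look like, with both a table-event group and a custom application signal. The grouping and the table/partition signal ideas come from the talk; the field names and nesting are assumptions.

    trigger:
      signals:
        - group:                              # group 1: both signals must arrive
            - table: warehouse.table_a
              event: TABLE_UPDATED
            - table: warehouse.table_b
              event: PARTITION_UPDATED
        - group:                              # group 2: a custom signal announced by an application
            - app: my_app
              signal: abc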

This is more efficient, because there's no reason to waste resources checking whether you're ready to run. It's also more accurate, because you move away from executing when you think you have to; instead of running hourly or daily, and so on, you execute when you actually have to. This also applies to what we call job dependencies, or input signals. The workflow can execute until it gets to a job that has input signals. At that point it needs to check whether those signals have been satisfied. If they have not, then the workflow is going to wait for those signals to be satisfied in order to be able to run this job. This is very similar to Airflow sensors, with the main difference that this is 100% event driven. There's no polling mechanism behind it and no resource waste in this case. In the same way, we have output signals: jobs can emit signals that can be consumed downstream, either to trigger a workflow or to release a job that is waiting for a signal. This is a very nice way to orchestrate jobs and workflows in a distributed manner.
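And a hedged sketch of the job-level counterpart, with input signals gating a job and an output signal emitted for downstream consumers; again, the field names are assumptions.

    jobs:
      - id: aggregate_daily
        type: Spark
        script: SELECT 1
        input_signals:                        # the job waits, event-driven, until this signal arrives
          - table: warehouse.upstream_facts
            event: PARTITION_UPDATED
        output_signals:                       # emitted on success; can trigger workflows or release waiting jobs
          - signal: daily_aggregate_ready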

Summary

I would like you to understand that, from definition to execution, our platform is composed of multiple layers. All these layers have been defined with a clear separation of concerns. This has allowed us to provide a solid foundation to support multiple use cases, and for applications to build on top of our platform as well.

Questions and Answers

Nardon: Your strategy of using Jupyter Notebooks as the pipeline source code is very clever, because you can do experimentation in Jupyter and then use the same code to actually execute the pipeline. When you are in the experimentation phase, you probably want to experiment using samples of data, or things like that. How do you organize this? Do you have sample data for every dataset that you can use later in production? How does this work in practice?

Joseph: It's something that everyone struggles with, because replicating the same data for test and prod is quite challenging. We do have test representations of some tables and datasets, but of course it's not 100%; it doesn't represent absolutely everything. Users are able to fetch some data from a real table, maybe just a smaller set because the full data probably doesn't fit in memory for whatever they're trying to do. Then they can output the result. Say they're doing some ETL; they can output that to a test table so they can run it over and over. Or they can even output to a prod table that is under their own database, so it shouldn't be propagated or have any impact on live tables. The advantage is also that users are able to upload to their own databases that are available in production for other people to consume as well. You can see a lot of use cases where users have different ways of inputting data into their tables, either in the experimentation phase or in the production phase, and then they gather that at execution time in their workflow. Machine learning is one of those examples where people are constantly tweaking their data and then taking it live.

He: All the workflows are parameterized. Basically, instead of the source with the actual production data, users can run even multiple workflows by passing different parameters for the output path or the input path, and then just run their tests without writing the data back to the production dataset.

Nardon: There are several questions around how you compare with Airflow, and why you are using your own solution instead of Airflow or other schedulers. Can you elaborate a little bit on that?

He: We actually evaluated Airflow before we started this. There are multiple things. Firstly, definitely scale. At Netflix's scale, we have lots of use cases, and users do not always use Python; they may like to use Java, or they may not even know how to write code, or they just want to write some SQL to query some data. So first, we provide DSLs in YAML, Python, and Java, and users with all kinds of backgrounds can use them. Also, in terms of scale, because our data is all in one place, the whole org is actually there. It's not the case that, say, the media team only uses media data; the whole org can access all the data. So we don't want to have multiple schedulers and clusters. When we took a look at Airflow, it definitely could not support this scale. First, a single workflow can contain a huge number of jobs. There are also tens of thousands of workflows. Airflow cannot manage that in one single cluster, and our scheduler can. There is also our automation and user experience. The last one is the additional features that we provide: code injection, event-driven triggering, and also the common patterns like foreach in the workflows, and all those things.

Joseph: I want to add two more things. One, it's been around four years since we actually started building this, and all these decisions predate that. Airflow wasn't quite mature at that time for the scale that we're talking about. That was one of the decision factors.

Then the other decision factor is that every data platform is composed of tons of services, but for the user experience it's very hard to say, "I have to run a workflow, I go to this service. I want to check my table, I go to the other service." The philosophy in our platform is to provide a single and unified experience. From the user's perspective, it looks like a single entity. That was the other thing that we were looking at: how can we build a scheduler that we can integrate into the platform so that it feels like a single thing that is part of the platform and not an extra piece of software? That's why we're able to react to events, because we're fully aware of what's happening in the platform, and getting that with something external was actually fairly challenging.

Nardon: Is the workflow orchestration engine Conductor? Can you also explain, what is Conductor?

He: Conductor is a service orchestrator for orchestrating microservices. In the current version of our BDP Scheduler, we didn't use that engine; instead we used Meson. For the future version, we are considering Conductor due to its ease of horizontal scaling.

Nardon: Is there an example, or is it available on GitHub? Is it open source?

Joseph: No, at the moment it's not open source. It's fairly challenging to open source it the way it is, because it's very integrated with Netflix. We are exploring different options to see what we can give back to the community.

Nardon: There's another question around YAML. Don't you find defining workflows in YAML limiting for complex, advanced workflows, as compared to the [inaudible 00:34:36] code? If you want to elaborate a little bit more.

Joseph: When we proposed YAML in the beginning, we had a lot of people bring that up to us. Like, for me, it's much easier to define things in code, or I just want to keep my code together, many different reasons. People wanted to dynamically generate very large workflows and things like that. The initial idea was to provide something very simple for everyone to use. That's one of the main reasons for using YAML. Also, we wanted to have something that was readable, and YAML was the choice at that time. Then we basically built the other two DSLs to support mainly applications, because applications do need to generate their own workflows. We wanted to provide them a way of doing this without having to generate a YAML and then ship it. As we did this, we basically put it in front of users as well. Some users do actually use the Python or the Java DSL to generate their workflows. We always compute the YAML at the end, for us to be able to understand the workflow, because I don't want to have to set up a Java environment with the specific dependencies just to understand what a user is trying to do.

Nardon: Do you have any plans to support or align with serverlessworkflow.io? It's an open source project as well.

Could you cover workflow versioning?

Joseph: When you create a workflow, you basically claim or register that workflow ID. The next time you make modifications, you push a new workflow definition under that ID. The definitions basically get grouped into the same unit, where they each get a version, and then you can switch between versions, either to roll back or to have two different implementations of the same workflow.

He: The scheduler manages the versioning, which is a [inaudible 00:37:14].

 


 

Recorded at: Dec 16, 2021