
Patterns of Streaming Applications


Summary

Stream processing engines have evolved into machinery capable of complex data processing, with a familiar Dataflow-based programming model. Monal Daxini presents a blueprint for streaming data architectures and a review of desirable features of a streaming engine. He also talks about streaming application patterns and anti-patterns, and use cases and concrete examples using Apache Flink.

Bio

Monal Daxini is the Tech Lead for Stream Processing platform for business insights at Netflix. He helped build the petabyte scale Keystone pipeline running on the Flink powered platform. He introduced Flink to Netflix, and also helped define the vision for this platform. He has over 17 years of experience building scalable distributed systems.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

How many of you here actually are doing any kind of stream processing in your company? Oh, that's great. Anyone using Flink? Spark? Any of the other famous, popular, favorite engines? A few hands. Ok. I think as any domain matures over time, patterns emerge. And we all know how useful patterns are. And I think with stream processing, it's the same thing. The domain has been maturing, we have been building different applications, and patterns have come out of it. Internally at Netflix, we've been using names for some of these patterns. For the others, I'm trying to give them names, and naming is really hard. So if you don't like them, well, I may not be able to appease everyone with the names, but I have to start somewhere. In this session, I want to begin the conversation with the community to discuss these patterns so that we can have a meaningful conversation about it.

I've been building stream processing applications and the platform for the past four years at Netflix, and I've helped drive the technical vision, roadmap, and implementation of it. Today, I'm going to distill my experience over those four years to talk to you about eight patterns, and why we think stream processing is useful for us. I'll also provide some background material so that if you're new to this space, the patterns and things that we'll discuss tie together and you have enough context for them.

In the spirit of any movie or documentary that's based on true events, this talk is inspired by true events. I have to provide a disclaimer. Unfortunately, I don't have a baritone voice, and I don't work for an ad company that sells pharmaceutical drugs, so I don't have a really cool audio for you, but imagine there's one. I did change some code and some identifying information so that I can avoid privacy issues for the people who coded the applications involved.

What is Stream Processing?

What is stream processing? At the very essence of it, you're just processing events as they arrive or as they're produced. Basically, it's nothing but processing data in motion. Let's take a real-world example. This is a home page, you might have visited it on Netflix, and the user browses around and then picks a movie to watch. A whole bunch of events were generated behind the scenes that you didn't know about so that we can provide users a really rich personalized experience. And we have over 130 million members, so we want to provide every one of them a unique experience when they come to the site.

And if you look at one of the home pages, you'll see this row called Trending Now. Now, this provides you member-specific personalization of what's happening right now on Netflix. It's not just the most recent titles across all members; it's personalized to your account and profile. If you have a Netflix account, you'd know that you can have up to five different profiles. So each profile is personalized, and this row is personalized for each user. Getting lower-latency analytics and metrics is useful, and that's one of the applications of stream processing.

The next one is if you're trying to understand user sessions. Maybe they're searching or maybe they're creating certain experiences, and we want to know what happened. In the batch world, you usually have fixed batches and you're trying to identify sessions that happened during those batches. A problem with that is that a session might fall close to a batch boundary, and you end up with its two events as two separate sessions. You may say, "Well, I can fix this. I can go in and scan the next batch and figure out where the session joins in." But it's cumbersome work, and the tools don't really support it nicely out of the box. What you actually want is for these two events to be part of the same session.

In stream processing, as we are processing events, we are able to aggregate them into sessions. And so, when you're processing unbounded streams and trying to sessionize them, stream processing is the more natural, intuitive model for stitching these sessions together.

Why Stream Processing?

A couple more reasons why stream processing: at a high level, as the data arrives from the website and from the devices that you're playing on, there are tons of metrics and events being generated. We want to clean the data, we may want to enhance it, and then we want to send this data to Elasticsearch, Hive, or other stream processing applications downstream. We want to ETL the data as it arrives so that we don't have to land it, read it again, massage it, and clean it up again.

The other area where you can leverage stream processing is not just ad-hoc analytics. You can actually use it for building event-driven applications, because at the core of it, if you think about it, both of them are processing events. And we'll look at an actual example where we have used it for event-driven applications as well. The other interesting thing about stream processing is that it reverses the database paradigm. In the database world, you take all your data, you shove it in, and then you run queries and other analysis on it. In stream processing, you run queries as the data is flowing in, and then you generate meaningful results and store them in the database. So it slightly reverses what's happening in a database.

Now, I'm going to provide some background and set the stage so that we have enough information to support the discussion on patterns in streaming applications. We'll look at the high-level architecture of what most streaming applications look like. It's pretty simple. At the 10,000-foot view, you have a source, you do some meaningful processing, and then you ship the data to a sink. A little more interesting, you might have multiple sources that you're joining across, and you may be fanning out to multiple sinks. In addition, as the events are flowing through, you may be enriching them with data from other services or other data lookups.

So, for example, if you get a click event, you may want to find out which geo region this was clicked in. Maybe there's a service you call into. You want to find out what additional movie metadata was there so that you can denormalize the event and then send it to your data warehouse, so that you have an event that's prepared and ready for analysis. We'll also look at an example and a pattern related to side inputs and what they entail.

Why Flink?

Why did we actually choose Flink for our stream processing engine? This section will give you a high-level overview and, if you're new to Flink, also a primer on how things work. Let's start with code; it's pretty straightforward. When you look at the architecture, we can map the different areas here. If you look here, we have the source, and here's the code underlined for the source. And then we have a couple of transformations at the bottom of it. If you look here, we're just cleaning the line up, parsing the line, and then we do three simple operations. It's kind of like doing a GroupBy, and we're doing windowing, that's grouping into buckets, because we want to do some meaningful metrics or analytics on it, like a Z-score, a deviation, or something as simple as a count.

And then we want to send those results to some sink to be useful. There's a source, there's a transformation, and there's a sink. So, in effect, it's a streaming dataflow: streams and a bunch of transformation operators combined together to process this data. And if you're new to windowing, it's nothing but grouping or batching a set of events so that you can run meaningful metrics or computations on it.
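
As a rough illustration (not the actual code from the slide), a minimal Flink job with that source, transform, window, and sink shape might look like the following in the Java DataStream API; the socket source and the ten-second window are just placeholders.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)                           // source
            .flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (line, out) -> {
                for (String word : line.toLowerCase().split("\\W+")) {    // clean and parse the line
                    if (!word.isEmpty()) {
                        out.collect(Tuple2.of(word, 1));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT))                // type hint needed for lambdas
            .keyBy(t -> t.f0)                                             // like a GroupBy
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))   // bucket events into 10s windows
            .sum(1)                                                       // a metric as simple as a count
            .print();                                                     // sink

        env.execute("windowed count sketch");
    }
}
```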

What Flink does is it takes that code and splits it up into these operators, and it creates a DAG out of it. Now, it does support some kinds of iterations and cycles, but we'll not get into that. For now, let's just assume it's an acyclic graph of operators. So, in this case, we have a source. We have a map where we took the line and parsed the line, and then we did a bunch of operations like keyBy, window, and apply. Now, if you look at keyBy, window, and apply, they're effectively grouped together into one execution unit in a single process. The reason Flink does that is it tries to avoid network hops when it knows that operations can be chained together. It's doing some smart chaining here for us without us having to tell it.

Then, the way this actually gets deployed on a set of processes is that Flink has a very similar model to Spark Streaming, where you have a coordinator process and a bunch of slave processes. The coordinator process decides what task runs where. It's responsible for fault tolerance and for making sure that when a TaskManager goes away and a replacement comes, it becomes part of the whole job again. And here, if you look at the TaskManager process, we have the source and map operators, and then we have a keyBy and a sink. Each task slot hosts an operator that needs to be run, and each task slot is a single thread. Flink gives you this out-of-the-box mechanism of taking very simple code, making a graph of operators out of it, deploying it onto a set of processes, and then giving you a fault-tolerant processing mechanism out of the box.

In this case, we have a sink parallelism of one, so you'll only see one sink operator. With the other operators, we said we want a parallelism of two, so there'll be two instances of each running. It also lets you control the parallelism and the concurrency of the different tasks that are running, and all it takes is a couple lines of code to tweak that if you need to. You can take this set of processes, and it's agnostic, so you could deploy it on bare metal. We do development on our laptops. We take the same thing, build a Docker container out of it, and then we deploy it onto a runtime. We could do the same thing and deploy it onto EC2. You could deploy it onto VMs. So there's a very flexible model out of the box to deploy onto different target runtimes.
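
For example, controlling parallelism really is just a couple of lines. A minimal sketch (inside a main method like the one above; the values are purely illustrative):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);                               // default parallelism for every operator

env.fromElements("a", "b", "c")
    .map(String::toUpperCase).setParallelism(2)      // two instances of the map operator
    .print().setParallelism(1);                      // but only a single sink instance

env.execute("parallelism sketch");
```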

The other notion within Flink is the support for stateful and stateless processing. When we talk about stateless processing, what we're saying is that, as those events flow through the system, the application is just processing one event at a time, and it's not storing any state between events. For example, if you're just taking an event and either enriching it or cleaning it and then moving it forward.

Now, stateful is very interesting. Let's take a look at a quick animation to see what's happening, and then I'll add a little more color to it. On the left, we have these source producers, or producers that actually produce events, and they end up going into a Kafka topic. The reason we use Kafka is that we want a persistent buffer for a short period of time, so that if our services are down, or if our stream processing jobs are down, we still have the messages available and can process them, so that we can at least guarantee processing these messages at least once. That is, we try our best effort so that every event that we get, we're going to process at least once. Kafka helps us do that. Then we have a streaming application, which is the three processes I showed you; it could be many more slaves. And then we'll look at what's happening to state as the events get processed.

We have an event that gets into Kafka, and then it's pulled into the TaskManager. One of the slots there processed the event, made some transformations on it, and then the result automatically got stored into local state. Flink out of the box provides mechanisms to store state locally, and that's what is being used here. This state is either kept in memory, you can configure it to do so, or you can have it backed by local disk, which uses a RocksDB-backed storage mechanism. It gives you flexibility in how you want to actually store the state, and it's just a configuration option. You don't have to worry about it.

Now, Flink also gives you something called checkpointing. Periodically, it takes asynchronous checkpoints so that it can save the state of the application itself. You can say, "Every 10 seconds, capture my state and back it up to secondary storage." We actually store it on S3 by default, and HDFS is also supported. When there's a failure, it'll actually rewind the state back to the last known good checkpoint. And because the checkpoint also saves the Kafka offset, and an offset is nothing but a pointer in Kafka which tells you which was the last message you read, you can actually resurrect your job and begin where it left off.
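
As a configuration sketch, using the older, still-available Flink 1.x state backend API (the S3 path is a placeholder, not Netflix's actual setup):

```java
// inside the job's main method
// imports: org.apache.flink.streaming.api.CheckpointingMode,
//          org.apache.flink.runtime.state.filesystem.FsStateBackend
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// "every 10 seconds, capture my state and back it up to secondary storage"
env.enableCheckpointing(10_000, CheckpointingMode.EXACTLY_ONCE);

// durable checkpoint storage (S3, HDFS, ...); swap in RocksDBStateBackend from
// flink-statebackend-rocksdb if you want disk-backed local state
env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints"));
```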

This is pretty powerful because it lets you provide exactly-once semantics within the streaming application. So, for example, if you're doing counts and you have four messages, and you're adding two, then adding two again, adding two again, and then adding five, you'd expect the answer to be 11 regardless of failures within the realm of the application. That's what Flink guarantees. You'll actually get 11 regardless of how many failures you have, because it checkpointed the state.

Another cool feature is that you can explicitly take savepoints. This is a consistent snapshot of the state, and you can take that state and run another version of your application on it, as long as it's compatible with the state DAG, or the job DAG, that was built. And this lets you do some really cool things. The closest thing you can think of is Git branches. When you branch off, you can do something with that state, and your original state is safe. Savepoints let you do a lot of that. It also lets you take a savepoint and scale up your job so that you can scale it up to a much larger state, or you can do A/B testing on it, or you can try out a new algorithm on the state and see how it's going to behave before you deploy it and run canaries on it. There are a lot of possibilities in the way you can use savepoints and leverage them.

From the API perspective, Flink offers different levels of abstraction. At the easiest level, on top, there's SQL. The SQL is streaming SQL, and it's built on top of Apache Calcite. The challenge on the SQL side is actually productionizing it. If a user gives us SQL, and we are operating a platform that lets them run it, how do we determine how many resources it's going to take? Is it going to misbehave? What kind of joins is it going to do? It's a great mechanism, but there are lots of challenges in offering that as a platform. And then there are the lower-level APIs. We took a look at the DataStream API today. And if you go even lower level, you can actually control how the state is being [inaudible 00:15:58] RocksDB, the SPI provider interfaces, and so on. It gives you a whole gamut of options.

Configurable Router

Let's jump into the patterns now. The way I'm going to describe them is I'm going to take a use case, a real use case that we encountered, how we solved it, and how a pattern emerged out of it. I'll show you a small code snippet and then how that code snippet actually gets deployed. And I'll mention any related patterns if there are any.

They're divided into functional and nonfunctional patterns, so we'll start with the functional ones. The first one is called the configurable router. Back to our popular screen again. When the user clicks on this, as I was mentioning, there's a whole bunch of traffic that gets generated on the back end. These are the mesh of microservices that actually front what you're seeing on the UI, and they generate tons of events. These could be user clicks, impressions, playbacks, health of the client, whether you downloaded a movie offline, the status of that, and a lot of other such cool things. And even internally within the system, the microservices generate tracing events, error log events, and other system events that are of interest to us. Now, these are in addition to our whole operational metrics system, which is called Atlas, which is different. This generates a whole ton of events back into our system. Now, we want to take these events, clean them up, prep them, and get them either to the data warehouse, to Elasticsearch, or to other interesting places.

What we actually want to do is allow the users to create ingest pipelines, because I could have multiple streams of events. I might be interested in only generating events for my application, and I want that to be separate from another user's. Maybe I want to create a personal ingest pipeline and then specify which sinks it's going to go to. And we want to do it with at-least-once semantics. Users also want to do some kind of basic filtering and projection, and we'll look at what those are.

In essence, what we wanted was a serverless product that's ready to use out of the box, where the user does not have to write any code. They don't have to manage it. They don't have to operate it. They just come to the UI and say, "I want a new stream. And I want to take that stream, and on one path I want to filter it and then send it to Hive. Then I want to just send the raw events to Elasticsearch as they are, and I also want to send them to a Kafka topic." The filter that we looked at before is right here on the top, and the user specifies a declarative syntax right here. Think about filtering like the WHERE clause in a database when you write SQL; it filters out certain rows.

The projection, on the other hand, is like selecting columns in a SQL query when you're running it against a database. The user goes to the UI and just specifies these two things. On the back end, we actually create and provision a topic, and then we create and run a stream processing job. We call it the configurable router job because this is streaming application code that's written once, and then each time, at deployment time, we can configure what filter it's going to run, what projection it's going to run, and what connectors it's going to use.

In this scenario, where the user had specified three different paths, we ended up creating three different jobs. But the interesting thing to look at is that they all read from the same Kafka topic, so it's a fan-out of three. The same topic is being consumed thrice. You may wonder why. Why do we use up so many resources and do the fan-out? The reason is isolation. If Hive is down, you don't want the other two paths to be impacted. Or if Elasticsearch is down, you don't want the other two paths to be impacted. And the offsets are tracked so that if there's a failure, we can rewind back and start off where we left off. This provides us an at-least-once guarantee of processing these events. We'll be using the R henceforth to refer to this routing job as we use it in other patterns.

This whole Keystone pipeline currently is processing up to a trillion new events every day. It ends up being about 4 petabytes of data being moved around in the system, and there are 2,000 routing jobs. We looked at three; there are 2,000 of those, and they run on 10,000 different containers. It's a massive-scale pipeline, and so this pattern has been vetted at scale for a couple of years. This is what the derived pattern ends up being: you have a producer, you put the events into a system like Kafka that acts as a temporary persistent buffer, and then you pass them through a routing job, which allows you to specify declarative processing like filtering and projection. And, voila, you get something interesting into your sink.

This is the code snippet, pretty straightforward. We have the source on the top, right here, and then we build a sink. And if you see here, the sinks are configurable, so depending on deployment time, it will pick one of the sinks. Then this filter function and projection function actually use the configuration that was passed in, the declarative filter syntax or the projection syntax. And the user did not have to write any code. We didn't have to write any code for every new pipeline that was deployed; it was automatically deployed by our automation. We generate metrics, dashboards, logs, everything for us to automate and operate this. To the user, it's serverless and a 24/7 running system, and they don't actually have to ever worry about it. We take care of that for them.
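
A sketch of how such a router might be wired up; RouterConfig, ConfigurableFilter, ConfigurableProjection, SinkFactory, Event and EventDeserializer are hypothetical stand-ins, since the actual Keystone code isn't public.

```java
// inside the job's main method; kafkaProps holds the usual Kafka consumer properties
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RouterConfig config = RouterConfig.fromDeployment();         // filter expr, projection, sink type

DataStream<Event> events = env.addSource(
        new FlinkKafkaConsumer<>(config.sourceTopic(), new EventDeserializer(), kafkaProps));

events.filter(new ConfigurableFilter(config.filterExpression()))        // WHERE-like filter
      .map(new ConfigurableProjection(config.projectedFields()))        // SELECT-like projection
      .addSink(SinkFactory.create(config.sinkType()));                  // Hive / ES / Kafka, chosen at deploy time

env.execute("configurable router");
```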

The next variation of the configurable router pattern is driven by the fact that we have a high fan-out factor. You might have a really popular stream; we have our click event stream, which is really popular. It has a very high fan-out factor. That means we have to keep scaling up our Kafka cluster so that it can handle the fan-out load. And after a while, it becomes very expensive. It's prone to failures, and you cannot scale a single Kafka cluster beyond a certain point.

What do we do for that? In this scenario, there are two routing paths that are going to the same cluster, but to different topics. We leverage this fact because they are in the same fault domain; it's the same cluster. If something goes wrong, it affects the whole cluster anyway. And so, we trade off isolation for efficiency, but we only do it within the domain of one cluster. What we do is merge the processing that was happening across two routers into one. It's still configurable; you can still specify what filters and projections it needs to do. But now, the same job has reduced the fan-out factor from three to two. So it's an efficiency optimization pattern that builds on what the router pattern starts off with.

The code snippet for this one is also pretty straightforward. Flink allows you to take a stream and then do parallel mapping on it, so it just splits out and does the processing for you. You didn't have to do much to come up with this pattern and implement it. Again, no user code needs to be deployed for this. It's, again, configurable, and based on the sinks that you've configured, it'll merge the topics together.
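
A sketch of the merged variant, reusing the hypothetical helpers from the previous snippet: the topic is consumed once, and each configured path gets its own filter, projection, and sink.

```java
// inside the job's main method; RoutePath is another hypothetical config type
DataStream<Event> shared = env.addSource(
        new FlinkKafkaConsumer<>(config.sourceTopic(), new EventDeserializer(), kafkaProps));

for (RoutePath path : config.paths()) {                      // one branch per configured path
    shared.filter(new ConfigurableFilter(path.filterExpression()))
          .map(new ConfigurableProjection(path.projectedFields()))
          .addSink(SinkFactory.create(path.sinkType()));
}
```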

Script UDF Component

The next pattern is the script UDF. What if the configurable router's projection and filtering are not enough? You want a little bit more, right? But the user still does not want to manage code and deploy it. Then they can plug in their business logic using a scripting engine. One of them is Nashorn, so users can provide a JavaScript snippet to run. And so, the pattern that emerges out of it is this component we created; it's actually still in POC mode, it's not fully out yet, but it's a usable component that lets a script run. And the script could be downloaded automatically from your dynamic runtime configuration system, so you might process 100 events, you change your script, and the 101st event will use the new script.

Comparing this to the UDFs in SQL: the UDFs in SQL need to be written in Java or Scala in Flink, and you have to package the library, so there's a little bit of ceremony for the user to go through and manage the code. This is actually complementary, because you can drop this component in as one of the UDFs in SQL, so now you have scripting engine support. The downside is all the cons of using a scripting engine: you're going to introduce latencies, and people can write any code, so you need to have the right kind of isolation on it. And because we run in containers and every job has its own, that provides some degree of isolation there.

This is the code snippet. Again, straightforward. It's just a map function. In the map function, we use a scripting engine, and this is the scripting engine functionality that the JDK provides out of the box. It's using the Nashorn engine from the JDK on the right, and on the left, it's just using that engine to run the script.
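
A sketch of such a script-backed map function, using the JDK's javax.script API with Nashorn; how the script text gets fetched from dynamic configuration is left out, and the placeholder script is illustrative.

```java
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ScriptUdfMapFunction extends RichMapFunction<String, String> {
    // in practice this would be refreshed from a dynamic runtime configuration system
    private String script = "JSON.stringify(JSON.parse(event))";
    private transient ScriptEngine engine;

    @Override
    public void open(Configuration parameters) {
        // Nashorn ships with JDK 8-14; newer JDKs need a standalone Nashorn dependency
        engine = new ScriptEngineManager().getEngineByName("nashorn");
    }

    @Override
    public String map(String event) throws ScriptException {
        engine.put("event", event);                   // expose the event to the user's script
        return String.valueOf(engine.eval(script));   // run the user-provided snippet
    }
}
```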

The Enricher

The next one is the enricher. All the patterns that we've looked at so far haven't required any code deployment. The next three patterns that we're going to look at are patterns based on code that the user needs to write. That means they need to deploy it. We have tooling built that lets them deploy their job; it also automatically syncs the configuration changes they might have made in the code and makes it very easy to override them, so that they can take the same code through test and production. We also generate a metrics dashboard for them automatically, searchable application logs, and some other niceties. But that's the deployment model we are going to use.

This third use case was where we wanted to identify interesting plays that were happening for a user so that we can provide better personalization. What this job ended up doing was using a stream from the Keystone pipeline, one of the router streams here, and it looks up additional information about this event from the playback history. When you make a play, we want to look up additional information about it, so we enrich the event, and then we also look up data from the video metadata. Now, there's a rate limiter built in because we don't want to bombard the service. There are different ways of rate limiting; that itself is a big topic, so I won't get into it. But here, we are limiting the rate in a very dumb way, by just launching fewer resources on the streaming job side.

The pattern that emerges out of this is the enricher pattern. What it says is you can connect to a side input, and the side input could be a service call, a lookup from a data store, or data from a data store that's cached locally but refreshed frequently and periodically. And so, the data can be either pulled or pushed, or gathered in a sync or async fashion. The code looks pretty similar. Again, the magic is happening in the map function, where it's using the client libraries to get this data, and it's using another library to locally cache the data, refreshed on a 30-second interval.
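
A sketch of what such an enricher map function could look like; PlayEvent, EnrichedPlayEvent, VideoMetadata, VideoMetadataClient, PlaybackHistoryClient and PlaybackHistory are hypothetical stand-ins, and the 30-second refresh uses a Guava cache as one possible caching library.

```java
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnricherMapFunction extends RichMapFunction<PlayEvent, EnrichedPlayEvent> {
    private transient LoadingCache<String, VideoMetadata> metadataCache;
    private transient PlaybackHistoryClient historyClient;

    @Override
    public void open(Configuration parameters) {
        historyClient = new PlaybackHistoryClient();
        metadataCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(30, TimeUnit.SECONDS)             // locally cached, refreshed periodically
                .build(CacheLoader.from(VideoMetadataClient::fetch));
    }

    @Override
    public EnrichedPlayEvent map(PlayEvent event) throws Exception {
        VideoMetadata meta = metadataCache.get(event.videoId());             // cached side input
        PlaybackHistory history = historyClient.lookup(event.profileId());   // service-call side input
        return EnrichedPlayEvent.of(event, meta, history);                   // denormalized, enriched event
    }
}
```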

The Co-Process Joiner

This is one of the more complex use cases that we've recently been working on. It's called the co-process joiner. When you are on the website, you're browsing up and down, and then you click on something. Every time you browse, you're generating impressions, and when you click on something, you're generating a play. We want to be able to understand what the conversion rate, or take rate, is. We want to understand the number of impressions before a user play happens, and we want to find out additional attributes around it.

There are about 10 billion impressions generated and 2.5 billion play events. What we realized is that to build this job, we'll have to keep, at any point in time, about two terabytes of state. We looked at how stateful processing works in Flink; the state held across this job ends up being approximately two terabytes.

Here's a quick animation to show you what the problem actually is. We have two different topics where we are getting events from, impressions and plays, and we are interested in joining these events together to make some meaningful conversion rate analytics on top of it. We get an event from impressions, we get another event, and then we get one more event, and another event. If you notice, the events in a single stream are coming out of order, which is completely natural, because you may be offline on your device. When you come back online, it might send events that take different paths through our services and then through the network. We need to be able to handle this out-of-orderness.

The next thing is that we want to actually process this by event time. What I mean by event time is when the event was actually generated. If I am on my mobile device, and I watch a movie, and I get on a plane, then when I come back online again, it's going to send an event. I want to know when that play actually happened, not when I received it, because when I received it is the wrong signal. You want to do it based on event time.

To quickly understand event time, here's a quick graphic. Imagine an event that came in between 12:00 and 1:00, right here, but it was actually created somewhere between 11:00 and 12:00. When you're processing based on event time, you'd actually put it in the right bucket. If you're doing it by processing time, you'd put it in the 12:00 to 1:00 bucket, because that's when you received it. This little graphic helps you understand what processing by event time actually means.

There's another animation that shows you exactly what happens. We have an event that comes through the system. There's a GroupBy, there's some processing on it, and then we save it. This is keyed by a key called K, which you can see on top of the I1 event, and then we get another event. We can coalesce these events because we are able to; it's a kind of reduction. We have multiple events with the same key, so we can start reducing them, and it reduces those two events into one event. And now we get an event from the plays, and they actually match because they have the same key, and we found a match across the states. It merges those two events and then emits the interesting result.

What it's doing here is that, within the same process, it has access to both states that are accumulating across both streams. And so, we keep in memory as much as we can, we start reducing, and then after a while, when we have a join, we actually join the events and send the result through. This co-process mechanism comes out of the box with Flink. We didn't have to write anything. The state is managed, it's fully fault tolerant, and we don't have to worry about concurrency. All that is taken care of.

This is the pattern that emerges out of it. We process and coalesce the events for each stream, grouped by the key; we join if there's a match, and then we emit the results. Then, based on a timer, we evict events from these states. The timer is another piece of functionality that Flink provides out of the box, where you can register timers and they'll fire after a certain period of time.

The code looks pretty simple here if you look at it. When setting up the sources, you're saying, "Use the same key, and do the processing by event time." What you see grayed out here is something called watermarking. If you have a wall clock, you know the clock is ticking. But if you're basing it on event time, how do you advance the clock? You could say there are events from 9:00, 10:00, 11:00, but what's my current clock? Watermarking is the concept that's used to advance the time tick when you're using event time; it says how much time has passed as I'm processing my events. Flink offers an out-of-the-box mechanism for this. All we're saying is, whatever the latest event is, let's say my event came at 9:00, then my watermark, or the time tick, is at 8:50. It just trails behind by a fixed amount.
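
In the newer Flink API (1.11+), that "watermark trails the latest event by a fixed bound" setup looks roughly like this; ImpressionEvent, ImpressionDeserializer and the eventTime() accessor are hypothetical, and the ten-second bound is illustrative.

```java
// inside the job's main method
// imports: java.time.Duration, org.apache.flink.api.common.eventtime.WatermarkStrategy
DataStream<ImpressionEvent> impressions = env
        .addSource(new FlinkKafkaConsumer<>("impressions", new ImpressionDeserializer(), kafkaProps))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<ImpressionEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, previousTimestamp) -> event.eventTime()));
```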

Then we just use this connect function that Flink provides out of the box to do everything that we need to do. The join actually happens in the co-process function, and the co-process function simply has two APIs, processElement1 and processElement2, corresponding to each stream. Independently for each stream, you update and reduce the state, and then you look in the other stream's state. If a match is available, you join it and clean the state out. So this whole complex pattern was made very simple with the functionality that's available.
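
Here is a sketch of such a co-process join as a KeyedCoProcessFunction; ImpressionEvent, PlayEvent, TakeRateResult and the merge/join helpers are hypothetical, and the four-hour state eviction is just an example timeout. It would be wired up with something like impressions.keyBy(...).connect(plays.keyBy(...)).process(new ImpressionPlayJoiner()).

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ImpressionPlayJoiner
        extends KeyedCoProcessFunction<String, ImpressionEvent, PlayEvent, TakeRateResult> {

    private static final long STATE_TTL_MS = 4 * 60 * 60 * 1000L;   // example eviction timeout

    private transient ValueState<ImpressionEvent> impressionState;
    private transient ValueState<PlayEvent> playState;

    @Override
    public void open(Configuration parameters) {
        impressionState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("impressions", ImpressionEvent.class));
        playState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("plays", PlayEvent.class));
    }

    @Override
    public void processElement1(ImpressionEvent impression, Context ctx,
                                Collector<TakeRateResult> out) throws Exception {
        // reduce impressions for this key as they arrive
        impressionState.update(ImpressionEvent.merge(impressionState.value(), impression));
        tryJoin(ctx, out);
    }

    @Override
    public void processElement2(PlayEvent play, Context ctx,
                                Collector<TakeRateResult> out) throws Exception {
        playState.update(play);
        tryJoin(ctx, out);
    }

    private void tryJoin(Context ctx, Collector<TakeRateResult> out) throws Exception {
        if (impressionState.value() != null && playState.value() != null) {
            // found a match across the two states: join, emit, and clean out the state
            out.collect(TakeRateResult.join(impressionState.value(), playState.value()));
            impressionState.clear();
            playState.clear();
        } else {
            // no match yet: schedule eviction so unmatched state doesn't grow forever
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + STATE_TTL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<TakeRateResult> out) {
        impressionState.clear();                      // evict whatever never matched
        playState.clear();
    }
}
```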

Event-Sourced Materialized View

The next pattern is called the event-sourced materialized view. This is where we are leveraging Flink for an event-driven application, not to do ad-hoc analytics. As a user, when you go to the site, you're presented with all these nice videos. But they need to be linked to actual file assets so that you can watch the videos; we need the links to those file assets. They come from a service which is backed by our cache called EVCache, which is Memcached-based. And how does this cache actually get its data? It comes through a stream processing application.

When we upload all our assets to our CDN- everybody familiar with content delivery networks? Okay. Cool. We upload all these assets to our content edge servers, which are much closer to the users and ISPs, so we can serve a bit faster. As soon as an asset gets uploaded, it generates an event to a service on our back end, saying either that the asset was added, deleted, or moved. These events get generated into Kafka, and then we put a streaming job there. The streaming job accumulates these state changes into a materialized view in its state. So, again, using Flink's stateful processing functionality, it creates this map saying, "Asset1 is on edge devices 1 and 2, and asset2 is on edge devices 1 and 3." And then it publishes that information to EVCache on a batch basis. So every X number of seconds, it batches these updates and then flushes out the new delta changes.

Occasionally, if you want to publish the whole snapshot that's in memory, to make sure no updates are lost and we get a fresher view, what we do is we have a pseudo source which generates trigger events into this streaming job, and it tells it to just flush out all the IDs that it's keeping. It's just saying, "You have a map of 10 assets. Here's asset 1, 2, 3, 4, …" It goes through all of them and just sends a marker event. And the streaming job dumps all those entries into EVCache, so we get a consistent snapshot view of it. That's the materialized view in the pattern that we're talking about.

This is what the pattern ends up looking like. You have an event publisher publishing events; it's like event sourcing. You're aggregating those events into a view, and then you're flushing it out to a sink, either on a buffered basis or immediately. Then, optionally, you can inject markers so that you can force it to flush. And this optional trigger could be based on your dynamic configuration, or however you want to configure it.

This is the code. We set up the sources. For the trigger source, we say parallelism of one, because we only want one instance of the source to trigger everything; we don't want multiples of it. We're saying create only one source for the trigger, and then create another source that actually reads the events coming from the CDN servers. Then we do the union of the two sources and the regular functionality: we do a keyBy, we process the events, we update the in-memory state here, and after that, we publish it to EVCache.
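
A sketch of that view-building job; AssetEvent, TriggerSource, AssetLocations, AssetEventDeserializer and EVCachePublisher are hypothetical stand-ins for the real sources, types, and sinks.

```java
// inside the job's main method
// imports: org.apache.flink.streaming.api.functions.KeyedProcessFunction,
//          org.apache.flink.api.common.state.MapState, MapStateDescriptor
DataStream<AssetEvent> triggers = env
        .addSource(new TriggerSource())
        .setParallelism(1);                               // exactly one trigger source instance

DataStream<AssetEvent> assetChanges = env
        .addSource(new FlinkKafkaConsumer<>("cdn-asset-events", new AssetEventDeserializer(), kafkaProps));

triggers.union(assetChanges)
        .keyBy(AssetEvent::assetId)
        .process(new KeyedProcessFunction<String, AssetEvent, AssetLocations>() {
            private transient MapState<String, Boolean> edgeServers;   // view: asset -> edge servers

            @Override
            public void open(Configuration parameters) {
                edgeServers = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("edges", String.class, Boolean.class));
            }

            @Override
            public void processElement(AssetEvent e, Context ctx, Collector<AssetLocations> out)
                    throws Exception {
                if (e.isTrigger()) {
                    // marker event: flush the current view for this asset
                    out.collect(AssetLocations.of(e.assetId(), edgeServers.keys()));
                    return;
                }
                if (e.isAdded()) {
                    edgeServers.put(e.edgeServerId(), true);
                } else {                                   // deleted or moved away
                    edgeServers.remove(e.edgeServerId());
                }
                out.collect(AssetLocations.of(e.assetId(), edgeServers.keys()));
            }
        })
        .addSink(new EVCachePublisher());                  // batches deltas out to EVCache
```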

Elastic Dev Interface

That ends our functional patterns. Now, we have three more nonfunctional patterns. The first one is the development interfaces. When you're building a platform and stream processing functionality, what we've observed is that it's useful to have a flexible set of APIs. What we've found is that ease of use increases going from bottom to top, and capabilities increase as you go from top to bottom. If you look at the point-and-click UI with the UDFs, it's extremely easy for anybody to use. It takes five minutes to use it and get all the benefits, but it's limited in what the user can do with it.

Next would be SQL with your own UDFs. It provides a little more functionality, but then the onus is on the users to maintain and scale it, and there's a bit of operational overhead. We don't have that functionality yet; we have some POCs for the SQL stuff running, just because of the operational complexity and challenges with it. The next one is using annotation-based APIs. We have a use case internally that's using certain application-specific annotations. When a user uses an interface and specifies those application-specific DSL annotations, they take that and generate Flink code using JavaPoet and a few other libraries, and then they use the deployment automation that I showed you, which has an API, to deploy it.

At the lowest level are the DataStream APIs, which is what all the code for the patterns we looked at uses. We mix that with reusable components, like the script UDF, a data hygiene component that we can use, rules that come from a different service, and the script transformer engines we looked at. So those are the four variations of APIs you can have, based on ease of use, capability, and flexibility of the platform.

Stream Processing Platform

As we've been building this stream processing platform at Netflix, the platform itself has become a pattern within Netflix. When we say we provide a Stream Processing as a Service platform, or SpaaS, people expect this out of it. What it actually is: our whole platform runs on Amazon EC2. On top of that, we have a container runtime that another team manages, and then we have the stream processing platform. It's the streaming engine plus a whole bunch of config management and integration into the Netflix ecosystem for metrics, monitoring, and our service discovery. On top of that are reusable components, like sources and sinks, and the different kinds of connectors that you looked at for writing to Hive, S3, Elasticsearch, and Kafka. We provide that out of the box.

Then on top of that, we build the routers; the whole Keystone pipeline is built on that. At the same level, people can write custom streaming jobs using code deployments. Now, vertically, a piece of functionality that's leveraged across the whole stack is the management service and UI, which you took a look at, and then we have functionality for making it extremely easy for users to create brand new jobs. You may be familiar with the Maven archetype. Out of the box, it takes users five minutes to take a concept and create a stream processing job out of it. Like an archetype, you have a command-line tool that creates a Git repo, creates a Jenkins job, and also creates local IntelliJ IDEA files so you can actually run it locally.

It also sends your configuration to our deployment tool so that when you're ready to deploy, the containers are available, and you can take the configuration and override it. All this functionality would usually take more than a week to build; now, it takes five minutes for anybody to get started. We also provide some metrics and monitoring for these jobs out of the box, and we send all the logs from these streaming jobs and index them so that they're searchable across the board. It's like a full-package solution for anybody who wants to do stream processing at Netflix. So, internally, when we talk about SpaaS, people actually know what we mean, so I thought including this would be useful as a pattern, because it's emerging as one internally.

Rewind & Restatement

The last pattern for today is rewind and restatement. Now, what happens when there are bugs in the code, or you want to reprocess some data? In batch, you go and reprocess the whole batch; if the batch fails, you start again, run for eight hours, and then you process the batch again. But in streaming, you've been streaming the data and processing it continuously. How do we do it?

Let's say at point X, or X plus 1, we figure out that there was actually an outage, because there was a bug in our code or something went wrong, and some corrupted events got into our system. But if you recall, we've been taking consistent checkpoints, and we've taken three checkpoints so far. So what we do is we roll back our state and the processing. Basically, if you look at this here, Kafka was at the second message, and if you look at the next one, it went back to this earlier message, because when the checkpoint was taken at that point in time, it was processing this message. The state got rolled back too. So we rolled back the state, we rolled back the pointer in Kafka, and we went back to a well-known good checkpoint.

Now, you can actually go and deploy the new code you want, start off exactly at this point, and fix things going forward. The whole space of restatement and rewind, being able to correct data from stream processing jobs, is quite complex. There are lots of other variations we are working on, and as we work on them, more patterns emerge out of it.

Here's a quick summary of the eight patterns we looked at: the configurable router, the script UDF component, the enricher, the co-process joiner, the event-sourced materialized view, the elastic dev interface, the stream processing platform, and rewind and restatement. If you like what you saw, or you have patterns of your own, or you didn't like something and have better names, feel free to reach out. I'd be happy to have a chat or a discussion with you. This is just the beginning of cataloging the different patterns that are occurring, so a conversation is welcome. Thank you.

Questions and Answers

Participant 1: Thank you very much for your talk. I wanted to ask, how do you scale a job? Do you have any functionality to identify a burst from your source and maybe auto-scale somehow, or do you just redeploy the whole Flink job?

Daxini: Well, that's a great question. Proactive auto-scaling is not built in yet. There's a proposal out in the Flink community to add auto-scaling that we're working on. What we do internally is we have external processes that monitor, and we have metrics that report lag in the system. So when there's a lag in the system, we have some automation that'll boost the capacity and it'll redeploy that job. Because the state is fault tolerant, we can just stop the job, increase the capacity, and then relaunch it. It's running on containers which come up really fast, so the downtime is pretty small. So at this point in time, that's how we scale until we get to auto-scaling it.

Participant 2: You're talking about joining multiple stream processes to create the batch before you commit to a destination. How do you know when the events have finished for a given join?

Daxini: How do we know when the events are joined? There are a few possibilities for knowing it's finished. One depends on the domain. For our use case, we've got impressions and plays. We know that the impressions that are shown, the page that's shown to you, become invalid after X number of hours. So we know that after X number of hours, we're not going to get any more impressions for that.

Whatever plays are left over, we can either put them out to a different output channel for later analysis of why we didn't find a match, or we can just capture a counter metric and drop them. In other scenarios, you know because you found a join; for example, you're just looking for one join and then you're done, so you know. It's either application-logic specific, timeout specific, or resource specific.

 


 

Recorded at:

Feb 19, 2019
