Massive Scale Anomaly Detection Framework



Guy Gerson introduces an anomaly detection framework PayPal has developed and is using internally, focusing on flexibility to support different types of statistical and machine learning models, and inspired by the design of scikit-learn and Spark MLlib.


Guy Gerson is a software engineer on the core team of PayPal's next-generation stream processing platform. He is currently working on adapting statistical and machine learning methodologies for real-time data pipelines. Before PayPal, he was a researcher in the IBM Cloud and Data Technologies group, focusing on designing large-scale Internet of Things analytics architectures.

About the conference: a practical AI and machine learning conference that brings together software teams working on all aspects of AI and machine learning.


Gerson: Hi, everyone. Thank you for coming to this talk. $22.8 billion: that was the estimated global loss from card fraud alone in 2016. We expect this number to reach $43.8 billion by 2025, almost doubling. If we took $1 bills and laid them end to end, we could circle planet Earth and then do it again another 200 times. Just try to imagine this loss, and this is only from card fraud. If we know about vulnerabilities in our systems, we can write code to block them or set up some rules. But how do we deal with fraud or vulnerabilities that we are not aware of? One way is to use anomaly detection. Of course, anomaly detection is not just for fraud. We want to know about anything that goes wrong in our business: in our systems, our services and microservices, our hardware servers, pretty much anything. This is why anomaly detection is such a hot topic right now; there were even a few other talks about it at this conference. Finding anomalies and responding to them fast, in real time, matters not only for reducing losses in the short term; it also builds trust with our customers and partners. But finding anomalies is tricky. Anomalies are oddly shaped, sometimes even unique, and they are hidden inside mountains of normal, everyday data, yet they are extremely valuable to us.

Introduction to Anomalies

This picture was taken in Serra Pelada, a famous gold mine in Brazil that was active in the 1980s. Each worker received a small patch of land and was in charge of extracting all the gold from that patch. Obviously, this isn't very efficient, and this way of working doesn't scale well. At one point, there were over 100,000 workers in this mine, working in really bad conditions. I hope this talk will inspire you to use and build better tools for the problems you face, tools that make your work more productive and scale out much better.

Together with my team at PayPal, I build big data platforms and solutions. A recent project was building the next-generation stream processing platform for PayPal. As more and more use cases wanted to onboard the platform, we noticed that many of them wanted to use stream processing specifically for anomaly detection. So we started to focus on anomaly detection as well.

In this talk, I want to share what we learned about anomaly detection from the perspective of a software engineer. I won't focus on any specific methodology for detecting anomalies, but more generally on what we learned in this work. The work resulted in a stream processing anomaly detection framework called Yurita. This is also kind of an announcement talk, because a few weeks ago we open-sourced the project. In the first part of the talk, I will go over the challenges of, and the motivation for, doing anomaly detection in real time. Then I will go a bit more technical into Yurita itself and how we tackled these challenges.

What does Yurita do?

Internally, Yurita has proved very valuable. It has been successful for different use cases, such as fraud prevention, operational monitoring (seeing what happens in our services), and security. I want to show you some examples of why finding anomalies is so difficult. Looking at this data set, some of you might think this spike is an anomaly. Don't be fooled: when we look at the full picture, we see that this kind of peak is just the normal behavior of this data.

I want to show some more examples. Let's say we start a business together, offering some online service, and we want to see what's happening in it. We might set up a real-time monitor and get a data set like this one, which represents a daily business cycle. I'm sure many of you can spot what is abnormal in this data set, because our brains are pretty good at understanding patterns. The challenge is how to make a machine also know when something goes wrong and doesn't behave as it should. One way is to use statistics. For example, I applied a z-score function to this data set and got this result. Now it's much clearer where the anomaly happened.
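To make the z-score idea concrete, here is a minimal Python sketch that scores a new observation against a window of historical values. The cutoff of 3 and the sample counts are illustrative assumptions; the talk does not specify which z-score variant or threshold was used.

```python
import math
from statistics import mean, pstdev


def z_score(observation, history):
    """Number of standard deviations `observation` lies from the historical mean."""
    mu = mean(history)
    sigma = pstdev(history)
    if sigma == 0:
        # Flat history: any deviation at all is treated as infinitely surprising.
        return 0.0 if observation == mu else math.inf
    return (observation - mu) / sigma


def is_anomaly(observation, history, cutoff=3.0):
    """Flag the observation when |z| exceeds the (illustrative) cutoff."""
    return abs(z_score(observation, history)) > cutoff


# Hypothetical event counts for the same hour on previous days.
history = [100, 102, 98, 101, 99, 100, 103, 97, 100]
print(is_anomaly(400, history), is_anomaly(103, history))  # True False
```

Scoring against history rather than against a series that already contains the spike avoids the spike inflating its own standard deviation.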

Here is another example, and this time we look at a distribution. Our business might offer several entry points: people can use our services from desktop browsers, mobile, or other devices, and we want to know how they are using them right now. We got a distribution like this, and we might think that the left bar shows an anomaly. But if we know the normal behavior of our data, what we call the reference behavior, we see that the left bar is showing [inaudible 00:06:35] behavior. Again, I can use statistics to check which one is an anomaly. This time I used an entropy function, and it's much clearer that the middle bar shows an anomaly.
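The talk does not say exactly which entropy function was used. One common choice for comparing a current distribution with a reference distribution is relative entropy (Kullback-Leibler divergence); the sketch below assumes that choice, smooths zero counts with a small epsilon, and also returns each category's contribution to the score. Category names and counts are made up.

```python
import math


def distribution(counts, categories, eps=1e-9):
    """Turn a count map into probabilities over `categories`, smoothing zeros with eps."""
    total = sum(counts.get(c, 0) for c in categories) + eps * len(categories)
    return {c: (counts.get(c, 0) + eps) / total for c in categories}


def entropy_contributions(current, reference):
    """Per-category terms of the relative entropy D(current || reference)."""
    categories = sorted(set(current) | set(reference))
    p = distribution(current, categories)
    q = distribution(reference, categories)
    return {c: p[c] * math.log(p[c] / q[c]) for c in categories}


def relative_entropy(current, reference):
    """Sum of the per-category terms; 0 when the two distributions match."""
    return sum(entropy_contributions(current, reference).values())


reference = {"desktop": 600, "mobile": 350, "other": 50}
normal = {"desktop": 610, "mobile": 340, "other": 50}
shifted = {"desktop": 300, "mobile": 300, "other": 400}
```

The per-category terms are what make a report explainable: for `shifted`, almost the whole score comes from the `other` category.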

The last example I want to show uses clustering. We often use clustering when we want to find correlations between features of events. In this case, we look for outliers: points that don't belong to any cluster and might therefore be anomalies. Here I used density-based clustering, specifically DBSCAN, and got a clustering like this, where one point doesn't belong to any cluster and might be an anomaly.
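A minimal, dependency-free sketch of DBSCAN, where points labeled as noise are the outlier candidates. The `eps` and `min_pts` values and the sample points are illustrative assumptions; in practice a library implementation such as scikit-learn's `DBSCAN` would normally be used.

```python
import math

NOISE = -1


def region_query(points, i, eps):
    """Indices of all points within distance eps of points[i] (including itself)."""
    return [j for j, q in enumerate(points) if math.dist(points[i], q) <= eps]


def dbscan(points, eps, min_pts):
    """Return a cluster label per point; NOISE marks points in no dense region."""
    labels = [None] * len(points)  # None = not visited yet
    cluster = -1
    for i in range(len(points)):
        if labels[i] is not None:
            continue
        neighbors = region_query(points, i, eps)
        if len(neighbors) < min_pts:
            labels[i] = NOISE  # may later be claimed as a border point
            continue
        cluster += 1
        labels[i] = cluster
        seeds = list(neighbors)
        k = 0
        while k < len(seeds):
            j = seeds[k]
            k += 1
            if labels[j] == NOISE:
                labels[j] = cluster  # border point: joins but does not expand
            if labels[j] is not None:
                continue
            labels[j] = cluster
            j_neighbors = region_query(points, j, eps)
            if len(j_neighbors) >= min_pts:  # core point: keep expanding
                seeds.extend(j_neighbors)
    return labels


points = [(0, 0), (0, 1), (1, 0), (1, 1),
          (10, 10), (10, 11), (11, 10), (11, 11),
          (5, 20)]
labels = dbscan(points, eps=1.5, min_pts=3)
outliers = [p for p, label in zip(points, labels) if label == NOISE]
```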

Looking at these and many more use cases, we noticed that many of them can be described with three high-level questions. First, what is the current observation: what data am I seeing right now, in real time? Second, what is the normal behavior of my data: how does it usually behave? Third, are the two significantly different? If they are, it might be an anomaly. As a platform team, we looked at all these use cases, which at first don't seem to fit together, and tried to understand what they share and how we could make it generic.

Understanding the Algorithm

I like to think about the scalability of a platform as a cube with three axes. The first axis of this scalability cube is productivity. This is not the usual way to think about scale, but it's very important for an organization to focus on it as well. I'm sure many of you are familiar with the general stages of building a machine learning model, and they apply to anomaly detection too. The first phase is data preparation: data scientists work on acquiring the data, cleaning it, normalizing it, and maybe selecting some features. The next stage is building a model: we try to solve a specific problem by building a lab version of that model.

In the next stage, the data scientist teams up with a software engineer. They work together to create a production-grade model that is robust, scalable, and solves the problem. The next step is evaluation: we check whether the lab model and the production model give the same results. It turns out that many times they don't, so we need a few iterations to get it just right and get correct results in production. This process is very time-consuming, and it repeats for every model we want to build and deploy.

Usually, we build frameworks to speed up the productionization phase. We want the research phase and the production phase to be very similar from a logical perspective. Then we need fewer iterations, which makes development much more productive. It can reduce development time from a few months to just a few weeks.

For anomaly detection, we also wanted to focus on model flexibility. We wanted our data scientists to be able to choose from the methodologies most commonly used in the industry, and to think about that from the start. The next axis of the cube is data scalability, which is what you would usually think of. It's very important at PayPal, which currently has over 250 million active users, meaning users who make transactions. In 2017, we had 7.8 billion payment transactions. A lot runs under the hood, including compliance and fraud checks, so each transaction triggers many things. This produces over 400 billion Kafka messages every day, which we want to analyze. Being able to process huge amounts of data is very important for us.
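A back-of-the-envelope conversion of the quoted volumes into average rates, assuming traffic were spread evenly over time (real peaks are higher):

```latex
\frac{400 \times 10^{9}\ \text{messages/day}}{86\,400\ \text{s/day}} \approx 4.6 \times 10^{6}\ \text{messages/s},
\qquad
\frac{7.8 \times 10^{9}\ \text{payments/year}}{365 \times 86\,400\ \text{s/year}} \approx 247\ \text{payments/s}
```

Each payment fanning out into many downstream messages is why the message rate is four orders of magnitude above the payment rate.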

The last axis is latency. In real-time analytics, being able to reduce latency, and also to bound it, allows us to build reliable and scalable stream processing applications.

This holds for many of the examples, but take just the fraud example: most of the damage from fraud happens very early on, because after that, users start to notice, they can change passwords, and there are things we can do. So it's very important to stop fraud as fast as we can. We can realistically talk about real-time applications today thanks to recent advances in state-of-the-art data processing frameworks.

Today, we can run much more complex logic in stream processing than we could even a few years ago, because we have great stream processing projects: Apache Flink, Apache Beam, Spark Streaming and its evolution, Structured Streaming. But there is still a gap from an analytical perspective. Traditionally, statistical and machine learning models have been designed with the batch paradigm in mind, and they often carry underlying assumptions: all the data is available during processing, the data set is constant while you process it, and the data can be very big and stored on disk.

Also, processing can take hours, sometimes even more, and that's still okay. With stream processing, the picture looks different. Our data is always partial, and our data set is effectively infinite. The data passes through the system and is usually held in memory. Processing should take seconds, and sometimes we want latencies even lower than that.
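One way to reconcile batch-style statistics with these streaming constraints is to use one-pass algorithms whose memory does not grow with the data. As an illustration (not taken from the talk), Welford's algorithm maintains a running mean and variance in constant memory:

```python
import math


class RunningStats:
    """Welford's one-pass mean/variance: O(1) memory, events are never stored."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self._m2 = 0.0  # sum of squared deviations from the current mean

    def update(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self._m2 += delta * (x - self.mean)  # uses the updated mean

    @property
    def variance(self):
        """Population variance of everything seen so far."""
        return self._m2 / self.n if self.n else 0.0

    @property
    def stddev(self):
        return math.sqrt(self.variance)


stats = RunningStats()
for x in [2, 4, 4, 4, 5, 5, 7, 9]:
    stats.update(x)
```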

Applying the Algorithm on Yurita

Now that I've talked about the challenges of and motivation for doing anomaly detection in real time and at high scale, I want to show you how Yurita tackles each of these challenges. As I said, Yurita is now open source, and I'll talk about that a bit more at the end. The name Yurita comes from a traditional Japanese gold panning tool, which we thought was a nice analogy for anomaly detection.

The first axis was productivity. We wanted to find the common building blocks shared between our use cases. We also wanted to focus on flexibility, so we took a plug-in approach that makes the system very extensible: each component is completely replaceable and extensible. We came up with a pipeline-based design, inspired by other successful projects such as scikit-learn and Spark MLlib. I will go over this pipeline and explain the components we found to be common.

Imagine an event stream going from left to right. Let's say an event is a user who just logged in, and we know the gender and age of that user. First, we focus only on the metrics that are interesting for us. Also, because this is stream processing and the data set is infinite, we work in windows so that we have a logical way of grouping the data. We might define 20-minute windows for the gender metric and 30-minute windows for the age metric.
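A sketch of the windowing step in plain Python, assuming fixed (tumbling) windows aligned to the Unix epoch and the per-metric sizes from the example. The event fields and the `window_metrics` helper are hypothetical; fields not listed as metrics, such as `country` here, are simply dropped.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

# Hypothetical per-metric window sizes, taken from the example in the talk.
WINDOW_SIZES = {"gender": timedelta(minutes=20), "age": timedelta(minutes=30)}


def window_start(ts: datetime, size: timedelta) -> datetime:
    """Align a timestamp to the start of the fixed window that contains it."""
    return EPOCH + (ts - EPOCH) // size * size


def window_metrics(event: dict):
    """Keep only the metrics of interest, tagging each value with its window start."""
    return [(metric, window_start(event["time"], size), event[metric])
            for metric, size in WINDOW_SIZES.items()]


login = {"time": datetime(2019, 3, 30, 14, 47), "gender": "F", "age": 34, "country": "US"}
print(window_metrics(login))
```

Sliding windows, which the talk also mentions, would assign each event to several overlapping windows instead of exactly one.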

The next phase is data aggregation. An aggregation is a data structure, plus a transformation, that models the data seen in each window in real time. In the gender example, we use a basic count map, because we are interested in the distribution of values in that window. In the age case, we might just want some statistics of what we have seen in that time window.
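A sketch of the two aggregations described above: a count map for the categorical gender metric and a small numeric summary for age. The class names and the `AGGREGATIONS` mapping are hypothetical, not Yurita's API.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class CountMap:
    """Categorical aggregation: how often each value was seen in the window."""
    counts: Counter = field(default_factory=Counter)

    def add(self, value):
        self.counts[value] += 1


@dataclass
class NumericSummary:
    """Numeric aggregation: running count, sum, min and max for the window."""
    count: int = 0
    total: float = 0.0
    minimum: float = float("inf")
    maximum: float = float("-inf")

    def add(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    @property
    def mean(self):
        return self.total / self.count if self.count else 0.0


# Hypothetical mapping from metric name to aggregation type.
AGGREGATIONS = {"gender": CountMap, "age": NumericSummary}


def aggregate(windowed_values):
    """Fold (metric, window_start, value) tuples into one aggregation per (metric, window)."""
    result = {}
    for metric, window, value in windowed_values:
        key = (metric, window)
        if key not in result:
            result[key] = AGGREGATIONS[metric]()
        result[key].add(value)
    return result


aggs = aggregate([("gender", "w1", "F"), ("gender", "w1", "M"), ("gender", "w1", "F"),
                  ("age", "w1", 30), ("age", "w1", 40)])
```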

The next step is the anomaly model itself. It receives the real-time aggregation, and its job is to tell us whether this is an anomaly. An anomaly model can be implemented in many different ways. For the gender metric, I might use statistical entropy, and for the age metric, z-scoring.

Also important to us is that the output of these models is another stream, an analysis stream of reports, which is ready for post-processing. We can keep processing this stream: plug it into a dashboard, send alerts, or send it back to Kafka to be consumed by a different downstream application. Something else that is very important for anomaly detection is knowing why an anomaly happened. If we know what caused it, we can take real-time, even automatic, action to address the cause.

How to Distinguish an Anomaly

Something missing from the pipeline I've just shown is the expected observation: what is the normal behavior of my data? This is the question I raised before, and I want to show you how we approached it. Let's say I have a data stream like this one, with some seasonality. Right now, it's Saturday between 2:00 and 3:00, which happens to be a peak in this data. Traditionally, we might use some offline batch curve-fitting algorithm that tries to fit the shape of the data. Instead, we wanted something that needs less expertise and makes it much easier to get pretty good results. With it, we can answer the question: what is the normal behavior of my data between 2:00 and 3:00 on Saturdays? I might also limit myself to the last month, in case there is a trend in the data.

In Yurita, we let users define strategies. A strategy tells the model how to treat the historical data; I'll show how it fits in. In this example, we capture this fairly complex logic with one line of code that defines a recurring-duration strategy: look one hour back in time, slide back every week, and limit yourself to one month.
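A sketch of what a recurring-duration strategy computes, assuming one-hour windows, a one-week recurrence, and a one-month lookback approximated as 30 days. The function name and parameters are hypothetical; they only illustrate which historical time ranges such a strategy would select.

```python
from datetime import datetime, timedelta


def recurring_windows(window_start, duration=timedelta(hours=1),
                      recurrence=timedelta(weeks=1), lookback=timedelta(days=30)):
    """Historical (start, end) ranges that match the current window.

    Steps back by `recurrence` from the current window start and stops once a
    range would begin before `window_start - lookback`.
    """
    earliest = window_start - lookback
    ranges = []
    start = window_start - recurrence
    while start >= earliest:
        ranges.append((start, start + duration))
        start -= recurrence
    return ranges


# The current window: a Saturday from 2:00 to 3:00 (afternoon chosen arbitrarily).
current = datetime(2019, 3, 30, 14, 0)
for start, end in recurring_windows(current):
    print(start, "->", end)
```

With these assumptions the strategy selects the same Saturday hour on the four previous Saturdays inside the 30-day lookback.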

Looking back at the pipeline picture, we also have a historical aggregation pool. Every historical aggregation we have already evaluated for anomalies is kept around, either in memory or offloaded to an external key-value store.

Another part missing from the model is the reducer. The reducer takes the historical aggregations that were [inaudible 00:19:55] and reduces them to a reference aggregation that represents the normal behavior of my data. We found it important that the reference is modeled the same way as the real-time aggregation. Because they are modeled similarly, we can easily compare them during evaluation.
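A sketch of a historical aggregation pool and an averaging reducer. The in-memory dictionary stands in for the memory or key-value store mentioned above, and the class and function names are hypothetical. The reference it produces is a count map, the same shape as a real-time aggregation, so the two can be compared directly.

```python
from collections import Counter
from datetime import datetime


class HistoricalPool:
    """In-memory stand-in for the historical aggregation pool (could be a key-value store)."""

    def __init__(self):
        self._store = {}

    def put(self, window_start, counts):
        self._store[window_start] = Counter(counts)

    def get_many(self, window_starts):
        """Return the stored aggregations for the requested windows, skipping missing ones."""
        return [self._store[s] for s in window_starts if s in self._store]


def average_reducer(history):
    """Average historical count maps into one reference count map (missing counts = 0)."""
    if not history:
        return Counter()
    total = Counter()
    for counts in history:
        total.update(counts)
    return Counter({k: v / len(history) for k, v in total.items()})


pool = HistoricalPool()
pool.put(datetime(2019, 3, 23, 14), {"desktop": 2, "mobile": 4})
pool.put(datetime(2019, 3, 16, 14), {"desktop": 4})
starts = [datetime(2019, 3, 23, 14), datetime(2019, 3, 16, 14), datetime(2019, 3, 9, 14)]
reference = average_reducer(pool.get_many(starts))  # the 9 March window was never stored
```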

Back to the full pipeline. I want to show you in code what each of these components looks like. The first stage is windowing: we just specify which type of windowing we want, for example a sliding window or a fixed window of 20 minutes. The aggregation model is also described by an interface, and in this case we use the categorical aggregation, which is the count map. Users can plug in whatever aggregation they want through our interfaces, but we also provide built-in implementations to make getting started easier.

The anomaly model, in this case the categorical anomaly model, is described by three lambda functions. The first is the reducer, here the average reference. It tells the system how to work with the historical aggregations; in this example, it calculates their average, which represents the normal behavior.

The second function is the statistical function used for the evaluation; here we chose entropy. The last one is the threshold: we might set a threshold of 0.9, meaning that if the entropy is above it, we have found an anomaly. All of these come together in these five lines of code, which [inaudible 00:21:47] the pipeline we've just seen. This is not a toy example: this code can actually run in production at very high scale. That is something we're proud of, and I'll show how we made it run at this scale.
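Yurita itself builds on Spark Structured Streaming, and its actual API is not reproduced here. The plain-Python sketch below only mirrors the shape of the model described above: three pluggable functions for the reducer, the statistical score, and the threshold. Relative entropy stands in for the talk's entropy function, and all names are hypothetical.

```python
import math
from collections import Counter
from dataclasses import dataclass
from typing import Callable, Dict, List

Dist = Dict[str, float]


def relative_entropy(current: Dist, reference: Dist, eps: float = 1e-9) -> float:
    """KL divergence D(current || reference) over the union of categories (smoothed by eps)."""
    cats = sorted(set(current) | set(reference))
    cur_total = sum(current.get(c, 0) for c in cats) + eps * len(cats)
    ref_total = sum(reference.get(c, 0) for c in cats) + eps * len(cats)
    score = 0.0
    for c in cats:
        p = (current.get(c, 0) + eps) / cur_total
        q = (reference.get(c, 0) + eps) / ref_total
        score += p * math.log(p / q)
    return score


def average_reference(history: List[Dist]) -> Dist:
    """Reducer: average the historical count maps into one reference count map."""
    if not history:
        return {}
    total = Counter()
    for counts in history:
        total.update(counts)
    return {c: v / len(history) for c, v in total.items()}


@dataclass
class CategoricalAnomalyModel:
    reducer: Callable[[List[Dist]], Dist]   # history -> reference
    score: Callable[[Dist, Dist], float]    # (current, reference) -> score
    is_anomaly: Callable[[float], bool]     # score -> verdict

    def evaluate(self, current: Dist, history: List[Dist]) -> dict:
        """Produce a small report, keeping the inputs so the decision is explainable."""
        reference = self.reducer(history)
        value = self.score(current, reference)
        return {"anomaly": self.is_anomaly(value), "score": value,
                "current": dict(current), "reference": reference}


model = CategoricalAnomalyModel(
    reducer=average_reference,
    score=relative_entropy,
    is_anomaly=lambda s: s > 0.9,  # illustrative fixed threshold
)
history = [{"desktop": 600, "mobile": 350, "other": 50}] * 3
normal = model.evaluate({"desktop": 590, "mobile": 360, "other": 50}, history)
spike = model.evaluate({"desktop": 100, "mobile": 100, "other": 800}, history)
```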

Before that, I want to talk a bit about the flexibility of this design and why we like it. Let's say I want to change the model I've just shown to a forecasting anomaly detection model, maybe similar to the Prophet model. There is a talk about it tomorrow, if you're interested.

The only thing I need to change is the reducer that looks at the history: it becomes a forecasting reducer. It looks at the history and creates a forecast of what we expect to see in real time. Once we have a forecast of the expected observation, we can again calculate whether the real-time observation is very different from the forecast. So the only line of code we needed to change is the reducer, to use a forecasting reference. This shows how easy it is to extend. And because nothing else changed, we can evaluate the two models and see which one performs better for us.
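To illustrate swapping only the reducer, here is a hypothetical forecasting reducer that fits a least-squares line per category over the historical windows and extrapolates one step ahead. It is a deliberately simple stand-in, not Prophet; everything else in a model would stay unchanged.

```python
from collections import Counter


def average_reference(history):
    """Reducer: per-category mean over the historical windows (missing = 0)."""
    if not history:
        return Counter()
    total = Counter()
    for counts in history:
        total.update(counts)
    return Counter({c: v / len(history) for c, v in total.items()})


def linear_trend_reference(history):
    """Forecasting reducer: least-squares line per category, extrapolated one window ahead."""
    n = len(history)
    if n < 2:
        return average_reference(history)  # not enough points to fit a trend
    x_mean = (n - 1) / 2
    sxx = sum((x - x_mean) ** 2 for x in range(n))
    forecast = Counter()
    for c in set().union(*history):
        ys = [h.get(c, 0) for h in history]
        y_mean = sum(ys) / n
        slope = sum((x - x_mean) * (y - y_mean) for x, y in zip(range(n), ys)) / sxx
        # Predict at x = n (the next window); clamp at zero since counts cannot be negative.
        forecast[c] = max(0.0, y_mean + slope * (n - x_mean))
    return forecast


# Swapping models means swapping only this one function.
REDUCERS = {"average": average_reference, "linear_trend": linear_trend_reference}
history = [Counter(login=10), Counter(login=20), Counter(login=30)]
references = {name: reducer(history) for name, reducer in REDUCERS.items()}
```

On a steadily growing series the average lags behind (20 here), while the trend reducer expects the next value (40), which is the kind of difference a side-by-side evaluation would surface.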

The next axis of the cube was data scalability. I just showed you one pipeline [inaudible 00:23:29] many pipelines like this, and these are just definitions of the computation, not the computation itself. All of these pipelines get flattened into a single optimized execution flow, which is added to the Spark Structured Streaming computation graph. We get a single flow that processes many of these metrics and anomaly models at very high scale, with the scaling provided by the Spark framework.

Let me show you how it all comes together. Imagine we have a data frame, for example a streaming data frame loaded from Kafka. We define our pipelines and the workload, which is the flattened execution flow. Yurita extends the DataFrame API, so we just call a detect anomalies function and pass it the execution flow we built. We get an output stream of reports, which are the results of the anomaly detection process. I also want to point out this line, which says build with watermark. How many of you are familiar with the watermark concept in stream processing? I see a few hands, but not too many, so after showing what the result of this processing looks like, I'll give a high-level overview of watermarking in two slides.

This is a prettified print of a report. It tells us that an anomaly was not detected and what value the statistical function evaluated to. We also see the contribution of each category to this result, the current distribution that was evaluated, the reference distribution that the system created, and the time ranges the report covers. This observability, understanding what's happening in the model and why it made a decision, is very important for us for debugging, research, and development.


Now a few slides on watermarking. Let's say we have an event stream going from left to right.

A watermark is a trailing interval behind the most recent event we've seen. It tells the system that late events within this interval are still accepted. Imagine someone using an application on a mobile device who then enters a tunnel; only after they exit the tunnel is the message sent to our servers. This is what we call a late arrival.

As time moves forward, the watermark trails behind, and this is how we tell the system that it can still accept late events. Events that are too late are simply ignored. Combined with windowing, as I talked about before, we get a picture like this. What's interesting is that the window marked with the arrow can't change anymore, because we no longer accept late events for it. And because it can't change, we can offload it to a key-value store or clear it from memory.
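A simplified, single-process model of the watermark logic described above, assuming fixed windows and a watermark equal to the latest event time minus the allowed lateness. It is not Spark's exact implementation; the class is hypothetical, and the `finalized` dictionary stands in for a key-value store.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


class WatermarkedWindows:
    """Event-time fixed windows with a trailing watermark (simplified model)."""

    def __init__(self, window_size: timedelta, allowed_lateness: timedelta):
        self.size = window_size
        self.lateness = allowed_lateness
        self.max_event_time = None
        self.open = {}       # window_start -> Counter; may still receive events
        self.finalized = {}  # stand-in for an external key-value store

    def watermark(self):
        """Events older than this are considered too late."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.lateness

    def add(self, ts: datetime, value) -> bool:
        """Accept an event unless it is behind the watermark; return whether it was kept."""
        wm = self.watermark()
        if wm is not None and ts < wm:
            return False  # too late: ignored
        if self.max_event_time is None or ts > self.max_event_time:
            self.max_event_time = ts
        start = EPOCH + (ts - EPOCH) // self.size * self.size
        self.open.setdefault(start, Counter())[value] += 1
        self._evict()
        return True

    def _evict(self):
        """Windows ending at or before the watermark can no longer change: move them out."""
        wm = self.watermark()
        for start in sorted(self.open):
            if start + self.size <= wm:
                self.finalized[start] = self.open.pop(start)


def at(minute):
    return datetime(2019, 3, 30, 12, minute)


windows = WatermarkedWindows(timedelta(minutes=10), timedelta(minutes=5))
# "c" is late but within the allowed lateness; "e" arrives after the watermark passed it.
accepted = [windows.add(at(m), v) for m, v in [(1, "a"), (13, "b"), (9, "c"), (16, "d"), (5, "e")]]
```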

This way we can manage the memory state we keep while doing anomaly detection, which is relevant for the historical aggregation pool. So we took the concept of watermarking, which was designed for one purpose, and extended it to solve other problems in stream processing.

Importance of Rapidity

The last axis I want to show is latency. I'm sure many of you are familiar with this architecture. Events come in, and stream processing engines process them in real time. We also store these events in long-term storage like Hadoop, and once in a while run a batch process, maybe every few hours or every few days, that creates fresher models and stores them in a serving layer.

The stream processing then consumes the most recent model. The problem with this architecture, which I think was also touched on in the previous talk in this track, is that the models are sometimes not fresh enough. If we want to detect abrupt changes like anomalies, we need to react as fast as we can.

I think that in the next few years, we will see more and more logic moving from the batch flow to the real-time flow. To do that, we first need to change our mindset about how we build robust and scalable streaming applications. Three important factors are: bounding and reducing processing time, bounding and reducing the memory footprint, and pruning redundant computation with techniques like caching. We want to treat streaming-oriented building blocks as first-class citizens from day one, so that streaming data structures are not an afterthought when solving a specific problem but an important member of the framework from the start.

I want to show you some of our work on stream processing data structures. Again, we have an event stream going from left to right, and we window the events. The framework has built-in statistical functions, all lazily evaluated, so we never compute statistics that no model actually needs. We also use approximation: some statistics, like percentiles, are computed much more efficiently using approximation. And we have computation caching: any statistic we compute is cached. All of this happens under the hood; users don't even know it's cached. Only when the data becomes stale do we recompute the cached value.
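A sketch of lazily evaluated, cached window statistics, assuming a simple clear-on-write cache: nothing is computed until a model asks for it, repeated reads are served from the cache, and new data invalidates it. The class is hypothetical and keeps raw values for clarity; approximate percentile structures are out of scope here.

```python
import statistics


class LazyWindowStats:
    """Statistics computed on first access, cached, and invalidated when data changes."""

    def __init__(self):
        self._values = []
        self._cache = {}
        self.computations = 0  # how many times a statistic was actually computed

    def add(self, value):
        self._values.append(value)
        self._cache.clear()  # data changed, so every cached statistic is stale

    def _get(self, name, fn):
        if name not in self._cache:
            self.computations += 1
            self._cache[name] = fn(self._values)
        return self._cache[name]

    @property
    def mean(self):
        return self._get("mean", statistics.fmean)

    @property
    def stdev(self):
        return self._get("stdev", statistics.pstdev)

    @property
    def median(self):
        return self._get("median", statistics.median)


stats = LazyWindowStats()
for v in (1, 2, 3):
    stats.add(v)
first, second = stats.mean, stats.mean  # the second read is served from the cache
computed_before = stats.computations     # median and stdev were never requested
stats.add(4)                             # invalidates the cache
after_update = stats.mean
```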

For footprint reduction, we work with small computation units, managing events in blocks using aggregations, and we also use data sampling. Sampling an infinite data set is not trivial, so we integrated the standard algorithm for it, reservoir sampling, into the framework.
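A minimal version of reservoir sampling (the classic "Algorithm R"), which keeps a uniform random sample of fixed size `k` from a stream of unknown length in O(k) memory. The function signature is illustrative, not the framework's API.

```python
import random


def reservoir_sample(stream, k, rng=None):
    """Uniform random sample of k items from an iterable of unknown length."""
    rng = rng or random.Random()
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)   # fill the reservoir first
        else:
            j = rng.randint(0, i)    # inclusive bounds: j is uniform over 0..i
            if j < k:
                reservoir[j] = item  # keeps item i with probability k / (i + 1)
    return reservoir


sample = reservoir_sample(range(10_000), k=5, rng=random.Random(42))
```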


Many challenges remain. We're just getting started with the framework, and we'd like the community to join and help us work on these problems. Here is one example. Say my reference describes the behavior of my data from 1:00 to 2:00, but in real time I have seen only a partial view of that window's events. How do I work with that? If I wait for the whole window to end, I get a more correct answer, but I lose responsiveness. If I decide too early, I'm much more responsive, but less accurate, and the answer might change as more events arrive. Maybe we can compare against some fraction of the reference, and we have tried many ways to solve this. But it really depends on the model: what does the model favor in the trade-off between responsiveness and correctness?
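One possible heuristic for partial windows, sketched under the assumption that events arrive roughly uniformly within a window: scale the reference by the elapsed fraction of the window, and refuse to decide before a minimum fraction has elapsed. The function and its `min_fraction` knob are hypothetical ways to trade responsiveness for accuracy; the talk does not prescribe a solution.

```python
from datetime import datetime, timedelta


def partial_window_deviation(observed, reference_count, window_start, window_size,
                             now, min_fraction=0.25):
    """Relative deviation of a partial window's count from a pro-rated reference.

    Returns None while less than `min_fraction` of the window has elapsed (or when
    there is no reference), preferring to wait rather than decide too early.
    """
    elapsed = min(max(now - window_start, timedelta(0)), window_size)
    fraction = elapsed / window_size
    if fraction < min_fraction or reference_count == 0:
        return None
    expected = reference_count * fraction  # assumes uniform arrival within the window
    return (observed - expected) / expected


start, hour = datetime(2019, 3, 30, 13, 0), timedelta(hours=1)
# Reference: about 600 events between 1:00 and 2:00; 15 minutes in, only 30 arrived.
print(partial_window_deviation(30, 600, start, hour, now=start + timedelta(minutes=15)))
```

A lower `min_fraction` makes the check more responsive; a higher one makes it more accurate for bursty traffic, mirroring the trade-off described above.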

You now know the challenges and the motivation behind Yurita: how we focused on productivity for developers and data scientists, how we scaled to high data volumes, and how we tackled latency by treating stream processing building blocks as first-class citizens. I'd like to invite you to follow the link to GitHub, play with Yurita, and maybe even contribute and add models. Together, we might be able to create a really versatile toolbox for anomaly detection.

Thank you for coming to this talk, and I hope we can find gold together. Thank you. We have a few minutes for questions, and I'll also be hanging around afterwards.

Questions and Answers

Participant 1: I have a couple of questions about windows. Can you give us some examples of window sizes in terms of the number of events? A follow-up question: you mentioned storing windows in a key-value store. Were you referring to the windows themselves or to the events within the windows? And if you can give us some examples of the aggregations you applied and the kinds of irregularities you were able to identify within a window, that would be great. Thank you. Wonderful talk.

Gerson: There were three questions. The first was the size of the windows. It really depends on the model. Some models might say, "One hour is enough for me." Others might say, "Maybe just a few minutes." We usually don't see windows for really low latencies, like milliseconds or seconds. When we take a wide view of everything happening in our systems like this, we're more focused on throughput, and for many of these cases that's fine. We could go to lower latency; it's not a limitation of the system, but in most cases we've seen in production, a few minutes is fine. Also, keep in mind that how often we evaluate is separate from the window size. We might evaluate every 20 seconds whether a window is an anomaly, while the window still describes an hour. We have the trigger of the stream, which determines when we run the computation, and we have the window, which is the logical grouping of the events. So we still stay responsive at micro-batch, or even single-event, resolution. That's the first question. What was the second one?

Participant 1: Key-value store, where you're storing windows or events.

Gerson: We store the aggregation, not the events themselves: it's how the user modeled the data in that window. In some cases we do want the events, for example when doing clustering. Then we usually work with a sample, so the aggregation is just a sample of the events. Obviously, when you sample, you need to make sure you don't sample away the anomalies. That's why it works well for historical data, which isn't interesting from an anomaly perspective: we keep all the anomalies and store a sample of the rest of the data as historical values. And the last question was?

Participant 1: You answered this indirectly by answering the other questions.

Participant 2: I want to ask about your reducer. You need to specify the reducer, the threshold, and your loss. Shouldn't these three be coupled together, the way you train a model on a specific loss? Decoupling them doesn't make sense to me. Also, is Yurita applied to transactions? Most users don't have transactions in a given window; it's very dynamic, and you may not find any records. So if you have to specify a fixed window, isn't that ...?

Gerson: I'm not sure I understood the question. There is some coupling, but the reducer and the statistical function don't have to be coupled, right? During research, I might want to try different statistical functions to see which gives better detection. The threshold is a bit coupled to them, and we have been thinking about making threshold detection automatic. There have been some attempts in the industry at dynamic thresholding, but to start with, users can plug in how they want to do thresholding. If a fixed threshold works for them, that might be okay. We could also plug in a dynamic thresholding mechanism, but we haven't gotten to that yet. Did that answer your question?

Participant 2: Let's say the units of your threshold don't match the output of the reducer. If your score is on a 0-to-100 scale but the user thought, "That threshold should be on a zero-to-one scale," how does that fit? The user has to know what the reducer looks like.

Gerson: The framework is really focused on developers and data scientists. We don't tell them where the threshold should be; we let them do the research and define what works well for their specific problem. In the future, we might add something automatic as well.

Participant 3: What's your building block to reduce false alarm?

Gerson: The building block to reduce false alarms?

Participant 3: Yes, false anomalies. I'm pretty sure you have a lot of battlefield examples of false alarms causing people to lose confidence in the system. What are the building blocks? I think they're important for making better models that combat false alarms.

Gerson: This is a really good question, because false alarms are what always make anomaly detection challenging, and I don't think we've really solved it. It's still up to the developers and data scientists to tune their models. The framework lets them use any type of model they want, but we don't have any internal mechanism for handling false alarms. We just give developers a good tool for creating more robust models.




Recorded at:

May 22, 2019