
High Performance Cooperative Distributed Systems in Adtech



Stan Rosenberg explores a set of core building blocks exhibited by Adtech platforms and applies them towards building a fraud detection platform. After addressing performance, he touches on the key attributes of system reliability and quality in an Adtech system. He concludes with many insights learned from building one of the leading fraud detection platforms from the ground up.


Stan Rosenberg currently heads up engineering at Forensiq - a leading fraud detection platform for online advertising. Prior to Forensiq, he built distributed platforms for startups.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Rosenberg: I'm going to jump into a performance graph and set up some context. It looks like a throughput graph. We're looking here at Prebid traffic. For those of you not familiar with ad tech, this is what happens in a real-time bidding environment: before an ad is actually displayed on the end-user device, a request enters one of these platforms and an auction takes place. This is the Prebid environment. What's particular about this graph, now that I've given you the context, is that data is continuously streaming in.

There's continuous traffic and it has periodic patterns. You can see that there's definitely bursty traffic, spikes in the upward direction. What's interesting here are the dips in the downward direction; those should almost never happen. That gives you some idea that something is perhaps wrong. It could theoretically happen that your major traffic source all of a sudden dropped out, or there was a network partition or whatever, but that wasn't the case.

In any case, this would be suspicious. If you're doing any sort of anomaly detection, it would have picked it up. Very quickly, we were able to correlate this with the following graph, which is probably familiar to a lot of folks. These are garbage collection pauses, from which you can instantly infer we're using a managed runtime. Yes, we are; this is the JVM. You can see these are very long pauses.

Moreover, if you look a little more carefully, they're happening every hour. They're happening almost at the same time every hour, but not exactly; the minute offset is slightly different. That gave us enough clues to start digging into all the metrics and trying to correlate this. After a while (and this was not easy, this was not trivial, believe me) the problem was actually narrowed down.

The root cause of this problem was a sub-system in our API which periodically reloads data from Cloud Storage into memory. The way this works is, there's an offline data pipeline that computes the new version of the data. When that data is available (which explains the irregularity in the minute offset) the servers detect there's a new version and reload it.

This was two weeks after we went live in production, so everything was running smoothly until that time, and all of a sudden this happened. The cause was that the data grew in size: the job producing this output actually produced a lot more output, which no longer fit into memory. The servers started to run out of memory, and because it's a distributed system, this happened asynchronously; some servers would fall over, new instances would come up. The result was a huge performance degradation.

This leads me to the next slide. We're dealing here with distributed systems, and my favorite definition of distributed systems is this quote from Ken Arnold. Basically, when you build a distributed system, your assumption is that everything can fail. "Failure happens all the time." The historical fallacies of distributed computing shown here are a subset (it's not the whole list) of what Peter Deutsch published, but we all know today that there's no free lunch. That's what it's alluding to.

The network is not reliable, latency is never zero, and bandwidth is never infinite, although it can get pretty close to that in a cloud environment, as I'll argue in a later slide. Transport cost is also not zero. There's a lot of overhead in all these components.

A bit about my past work: I did some work in academia. This is what I used to do: take specifications and code and prove programs correct. This was functional correctness, using logic, theorem provers, stuff like that. This is more of what I do today: look at stack traces like this and find root causes. This is a stack trace from Spark, rather obscure. Unfortunately, this happens quite frequently.

A slightly more formal intro: as I said, my dissertation was on proving correctness of a subset of Java. More recently, I've been building platforms, focusing on distributed platforms in the ad tech startup scene. At AppNexus, it was basically about serving ads faster. At PlaceIQ, we were using location data to optimize ads. Most recently, I've been building a platform at Forensiq, which is on the other end of the ad tech spectrum, you could argue: fighting fraud in ad tech.

This is not a talk about ad fraud. There were some great talks yesterday from Johnny [Xmas] and Jarrod [Overson] on security, and I'll definitely mention some intersections. This is a talk about building distributed systems, building high-performance systems, with some hints of fraud in ad tech.

Forensiq is the company I work for; here's just a very quick overview. We're basically a fraud detection and verification stack. It's a comprehensive stack: many different solutions, different APIs. In particular, we offer a Prebid API integration, and the majority of this talk is about that; in terms of performance, it's the most challenging one. We're MRC certified, and a lot of what we do is real-time scoring, although our capabilities also include offline scoring.

Fraud Examples

This is an example; I have two. There's a caveat here: I apologize ahead of time if you're on this site, I don't mean to offend anyone. I looked at our scoring system and I picked a site at random. Not quite completely at random; I was biased by the names of the domains, and this name drew my attention: RealClearScience, which is kind of an oxymoron in itself. I said, "Ok, let me look at this website and see what's going on here." You can see a screenshot of the website over there.

You can also see a plugin that shows all the requests being made to these advertising platforms and exchanges. This page has more than 10 of them. There are some well-known names: Rubicon, DFP from Google, Amazon, AppNexus, all the major players. Just looking at that, you can draw your own conclusions, obviously, without knowing the data. One would be that this website is really optimized for ads, not content.

Indeed, if you actually look at it, there's not much unique content on the website, other than links to other publications. This is data from our system. If we look at this domain, we can classify the types of invalid traffic that we see on it. SIVT and GIVT are designations that the MRC mandates; they stand for Sophisticated Invalid Traffic and General Invalid Traffic.

More interestingly, you can see it broken down by category. There's automated traffic, which is about 82% of the traffic that we see going to this domain. Automated traffic is basically just bots. Compare that to something like proxy traffic. If you went to Johnny's [Xmas] talk yesterday, he talked about these automated proxy solutions that can quickly rotate through IPs. They're not really hard to find; detection is usually some sort of detection system in conjunction with a list.

That is only 13%. Then there's IP reputation, which kind of falls into the same category: an IP is bad because it's being used by a lot of bots or hackers, anyone trying to pretend to be human. You can see that percentage is low, which means that whoever is supplying traffic to this domain has some level of sophistication, because most of it is automated traffic. Automated traffic is a lot harder to detect, but what it means is that it's just bots visiting this website.

This is not related to Forensiq, but there's a great website, SimilarWeb; it's a web analytics company. I thought this was interesting. This is the free version of it: you can type in a domain and it will analyze that domain in terms of everything referring to it, the incoming and outgoing edges. You can see that in the middle you've got RealClearScience, and the number one referrer to it is RealClearPolitics.

I saw some faces smiling. It is true, then, that politics cares about science. In reality, we looked at this other website too. It's not fraudulent; there is actually some invalid traffic, but it's a low percentage, less than 10%. People do go to RealClearPolitics, but to the best of our knowledge, RealClearScience is a fake one.

Call for Cooperation and Collaboration

A lot of the problem with fraud that we see is just data quality. We do our best at detecting these bots, but it comes down to the data being passed to us: the higher the quality of that data, the better our detection is going to be. I want to call out a few trends happening in the industry which are really making it easier for the bad guys to write these bots. One trend is server-side video ad serving, this idea of ad stitching, where you stitch the ads in on the server side, without actually executing code on the client side, by injecting ads at predetermined intervals in the video stream.

The problem is that we can't really run any detection on the end-user device, and we rely on that to get deep information: is this a legitimate device, for example? Is this a phone or is this an emulator? Whatever the heck this is. The information is passed to us from the server, and moreover, there's no standardized way of passing trusted information. The server will tell us, "Here is some IP address," and usually it's going to be the server's IP, which is bad because we expect this data to be coming from the client side. If we see that the server IP is inside a data center or on some known VPN network, then we would automatically blacklist it.

It's bad because we expect this to be a user on some mobile network. If they pass the client IP instead, that means anyone can spoof it. It's just another header, and headers can be spoofed; I can put whatever I want into one of these X-headers. It would be nice if there were a standard. We rolled our own using HMAC, a cryptographically secure way of passing this, but it's not standardized, which means not every platform works this way.
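To make the idea concrete, here's a minimal sketch of what HMAC-signing a client-IP value could look like on the JVM, using only the standard library. The class name, header usage, and key handling are hypothetical illustrations, not Forensiq's actual scheme; the point is just that the sender attaches a keyed MAC alongside the header, and the receiver recomputes it with the shared secret.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;

public class IpSigner {
    private final SecretKeySpec key;

    public IpSigner(byte[] sharedSecret) {
        this.key = new SecretKeySpec(sharedSecret, "HmacSHA256");
    }

    // Produces a hex-encoded MAC the sender would attach next to its
    // client-IP header (e.g. in a hypothetical X-Client-IP-Sig header).
    public String sign(String clientIp) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(key);
            byte[] tag = mac.doFinal(clientIp.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(tag);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // The receiver recomputes the MAC and compares in constant time;
    // a spoofed IP without the shared secret fails verification.
    public boolean verify(String clientIp, String receivedTag) {
        return MessageDigest.isEqual(
            sign(clientIp).getBytes(StandardCharsets.UTF_8),
            receivedTag.getBytes(StandardCharsets.UTF_8));
    }
}
```

With something like this, a forwarding server can only vouch for an IP if it holds the shared secret, which is exactly the trust property a plain X-header lacks.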

Another source of very inaccurate data (and I can tell you this from experience working at a location analytics company) is location. A lot of location information is useful for fraud detection: you want to be able to tell whether the location has been spoofed. But you can't really tell whether a location is coming from a GPS readout on the phone or is just a MaxMind mapping from the IP, and those can be at completely different levels of accuracy.

OpenRTB is working towards a standard way of doing this: there's a type field which can designate the type, and we also need a source field. Lastly, our last line of defense is being able to execute code on the end-user device to test the device for authenticity. Because you're in a sandboxed environment like JavaScript, it's quite easy for the bad guy to just block your script. That calls for openness from these advertising platforms to give us some information in aggregate. For example, you could tell us the number of requests you're expecting to see at a campaign or source level, which can then be correlated with tracking the execution of that code on the client side. That's how you would be alerted to an anomaly.

Performance Requirements (Prebid API)

I'm going to set the performance requirements for the rest of the talk. This is purely technical. We'll talk about performance, specifically our Prebid API. It is high throughput, low latency: when I say high throughput, I mean it must scale to 1 million requests per second and above. Low latency, in the context of this talk, means response time: the 99th-percentile response time is under 10 milliseconds.

This is very reflective of ad tech platforms. This data is maybe a few months old; these are stats from the major advertising platforms. You can see the order of magnitude here is in the tens if not hundreds of billions of requests daily. Basic arithmetic translates this, on average, to about a million requests per second (100 billion requests per day divided by 86,400 seconds is roughly 1.2 million per second). Some of these systems can handle multiple millions, and some are around that average.

Fraud detection and ad tech systems actually have a lot of commonalities; these are some of them, not an exhaustive list. What I'll show you in this slide is that if you look at the backend, they have similar performance characteristics. The backend has to be high throughput and low latency, and there is a key-value store. Whether it's in memory or not, we'll talk about later. In the ad tech case, the key-value store is mostly for looking up things like cookie data: a request comes in, it contains some pseudo-anonymized information about the user, and I want to match this up to some user profile. An example would be a user who's 25 or older, in a certain income range, or maybe something more specific, like a user who goes to Starbucks. You can plug in these different facets of information, and that allows you to do a quick lookup inside the RTB, inside a Prebid request.

In the fraud detection case, we want to be able to look up historical data. We can do some real-time scoring while the request is being serviced; in addition, we need to bring in historical activity data, which is then combined with the real-time results to form a response representing the score, the probability of that request being fraud.

That's the key-value store use case; we'll talk more about it very soon. Data ingest is common to both platforms. You just have a ton of data, and you need to be able to reliably store it. Not only that, I would argue you need to store it as quickly as possible so that your data processing team can start making good use of it, whatever that use might be.

There is usually an ETL process; I alluded to that. There's some sort of extraction and transformation. The formats coming in are not necessarily optimal; you want to translate them into a format that's more conducive to MapReduce, perhaps. Then the data pipelines are where you actually make use of the data. In the case of ad serving, enrichment (you may or may not have it) is the process where you take something like an IP address and add dimensions to it: use MaxMind to enrich it so you know it's coming from the state of California, a certain zip code, and so on. That's part of enrichment; you may have to do slightly more than that.

It's a similar idea for us: we're basically trying to enrich that information with anything that is useful downstream for scoring the request. There's budgeting: if you're running ad serving, you need to understand what your budget is. Each ad server only has local state, so it knows only part of its budget as it progresses through RTB; you need to be able to re-compute and reload those budgets. Then there's attribution: you want to be able to attribute some action, like a click, to a previous impression. And then there's reporting. Those are the common steps in the data pipeline. In fraud detection, you have scoring and reporting; reporting is obviously common to both. There's a lot of commonality.

Guiding Principles

In terms of our guiding principles, I wanted to summarize core design principles. Obviously, this is not the list, it's a list: suggestions of things you want to incorporate into a system with these performance requirements. Obviously, we're going to stick to non-blocking IO; it's going to give us the throughput and latency that we want. Avoid locks at all costs; we want to use compare-and-swap, which has become really cheap on modern architectures. It's not completely free (there's no free lunch), and compare-and-swap does have negative effects on out-of-order execution, but compared to locks, it's orders of magnitude cheaper.

Spatial and temporal locality: this gets to everything that's been said before. You want to make sure that the code is simple so you can take advantage of things like branch prediction, prefetching, and getting data into cache. That's spatial and temporal locality, and I have a few more things to say on that.

Minimize coupling; this is just keeping things simple. The more coupling you have, the more interdependencies between sub-systems, the more difficult it becomes; I would say the complexity of improving performance grows exponentially. You want to be really careful with that as early as possible in your design.

If you're on a managed runtime, you have to worry a lot about garbage collection. Any unexpected GC pressure is going to give you a spike in latency. That's really hard to control because it's mostly out of your hands; you have to structure your code in such a way that you can more or less guarantee it, although there's no absolute guarantee. We'll talk slightly more about that.

Warm up on startup; you may think this is obvious. If I've got my next version and I deploy it, I need to warm it up, because the JIT won't actually compile a function until it's been hit enough times. If you don't warm up your code, you're going to incur really high latency on the initial requests, which is bad. Warm-up is extremely important, so if you don't do it, you should. The best way to warm up your latest release is to have some production data that you can replay; this is very similar to executing a performance benchmark. That's exactly what we do in our continuous deployment pipeline: a new version rolls out, and we hit it. It also allows you to do any sort of last-minute sanity check. We verify that the number of errors coming from the responses is below a threshold, so if you missed something in your release pipeline, you can abort now. You can also verify that a certain performance baseline is there. Keep that in mind; this last step is really important.
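The warm-up-and-sanity-check step can be sketched roughly like this. This is a hypothetical illustration, not the actual deployment pipeline: it replays recorded requests through a handler enough times for the JIT to compile the hot paths, then gates on the observed error rate before the instance is allowed to take live traffic.

```java
import java.util.List;
import java.util.function.Function;

public class Warmup {
    /**
     * Replays recorded production requests through the handler so the JIT
     * sees each code path enough times to compile it, then sanity-checks
     * the error rate. Returns true if the instance may take live traffic.
     */
    public static <Req> boolean warmUp(Function<Req, Boolean> handler,
                                       List<Req> recorded,
                                       int iterations,
                                       double maxErrorRate) {
        long errors = 0, total = 0;
        for (int i = 0; i < iterations; i++) {
            for (Req req : recorded) {
                total++;
                if (!handler.apply(req)) errors++;  // false = error response
            }
        }
        return ((double) errors / total) <= maxErrorRate;
    }
}
```

In a real pipeline the replay corpus would be captured production traffic and the gate would also check a latency baseline, as described above.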

After warm-up, you can get to the point where at least 80% of your code has been JIT-compiled and optimized. You may still incur some latency hits initially, but they won't be orders of magnitude worse. The last two things are not directly related to performance, but if you don't do them, you're going to hurt yourself later on, so you might as well do them ahead of time. Use HdrHistogram. Mark [Price] mentioned this in his talk; it's a great resource. A lot of the metrics packages out there are broken; they're just not giving you real information. HdrHistogram is, so there's no reason not to use it.

Benchmark everything. JMH benchmarks, microbenchmarks, are a really gray area. We use them to get an idea of an order of magnitude. If you run a microbenchmark and it tells you that something takes 100 microseconds when you expected 10 microseconds, that's a signal. Don't take the good side of it; only treat it as negative feedback. Don't treat it as positive feedback telling you something is really great, because when you stitch things together in a pipeline, it's going to look completely different.

By itself, as a negative-feedback tool, it's a great tool. For everything that is supposed to be on a critical path, I would argue you have to have JMH benchmarks. Wrk2 is a great tool, written by the same author as HdrHistogram, Gil Tene. It incorporates HdrHistogram, and it's a really good load generator compared to JMeter, which we used years ago; it can push 10 times or more traffic from the same instance. It's really great at generating traffic and doing it well.

Cloud Is Fast (Enough)

We're in the cloud, and the point of this slide is to argue that the cloud is fast enough, whatever that means. It's definitely fast enough for a lot of things. As you probably heard in Mark's [Price] talk, it's not fast enough for everything; probably not for fintech, where you're looking at microsecond latency. For ad tech, I think it is definitely fast enough.

Using a modern hypervisor like KVM (we're on Google Cloud, by the way, which runs KVM), there's fairly negligible overhead. This idea of the "noisy neighbor" still comes up a lot in discussion today, but to me it's mostly a myth. There's been some recent work in academia showing less than 5% virtualization overhead. True, you might be running next to somebody doing Bitcoin mining, but they get a fair share of the resources, and so do you. You're still in a shared environment, so there's still going to be some unfairness, and because of that, some overhead. I think modern KVM does a really good job of keeping that overhead to a low percentage.

My experience is much the same. We've been running benchmarks on freshly deployed instances and consistently getting the same results, and the same goes for our production code. Networking is really good. The point of this slide is: if you want high throughput in the cloud, you're going to get it, and it's actually quite easy. In Google Cloud, for example, if you have a 16-core machine, you get up to 32 gigabits per second of throughput; that's 2 gigabits per core.

If you're on the JVM, like we are, it's really hard to saturate that. If you think about the work it takes to saturate 32 gigabits on the network, I have yet to see a JVM service capable of doing that; it's really hard. For local storage you have lots of options, and there's cloud storage. You can attach local SSDs, which we'll use for the key-value store later on. It's great hardware: you get two gigabytes per second of throughput and really high IOPS. You would get the same hardware elsewhere, I think; if you spend more money in your own data center, you can probably get slightly faster SSD drives, but it's not going to be an order of magnitude faster.

What you do get with cloud storage, which is really great, is high throughput and high availability. Cloud storage scales to virtually infinite throughput; as far as you're concerned, it's infinite. There's definitely an upper bound, but there are lots of resources. It is strongly consistent (not with all cloud providers, but Google Cloud Storage is). There's another really nice option which we'll make use of: it allows you to do parallel uploads, very fast and without a lot of complexity. There's a compose command which allows you to chunk a large file, upload the chunks concurrently, and then invoke compose, which basically stitches the blocks together. That stitch operation is atomic and really fast.

Mechanical Sympathy

Mechanical sympathy: I like this; I learned quite a bit from it. It's been taught at this conference many times, but I think it bears repeating. The idea is that if you're a pilot flying a modern jet, you want to appreciate its capabilities and its limits. If you're writing performant code, you need to appreciate the hardware and its limits.

There are great resources listed there, I welcome you to read through them. Lots of good stuff there.


The latency slide: I'm sure most of you in the audience have seen it. This goes back to Jeff Dean's presentation from years ago, capturing the orders of magnitude of different hardware operations. The next slide will give you some examples; you can read through this later. We know queueing theory; as a quick refresher, Little's Law tells us throughput is inversely proportional to latency under stable conditions. With that, let's say we do 1,000 references to main memory per request, which could be quite common: say you implement some sort of whitelist as a linked-list data structure. Every request comes in, and you're going to walk the linked list looking for something in that whitelist, maybe an IP address. A thousand references is roughly 100 microseconds.

A thousand doesn't seem like a lot, but when you do the math, that's a limit of 10,000 requests per second. That's the upper bound on the throughput of your system right there. It's important to understand that. The same 1,000 references to L2 cache, on the other hand, take only 7 microseconds, so there's a huge difference, more than an order of magnitude; that translates to over 100,000 requests per second.
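The arithmetic behind those numbers is worth writing out. Here's a small sketch (my own illustration, using the roughly 100 ns main-memory and 7 ns L2 figures from the latency slide) of the per-thread throughput ceiling implied by the memory access pattern:

```java
public class LatencyBudget {
    // Rough upper bound on single-threaded throughput if every request
    // performs `refs` dependent memory references costing `nsPerRef` each.
    static long maxRequestsPerSecond(int refs, double nsPerRef) {
        double latencyNs = refs * nsPerRef;          // time spent per request
        return (long) (1_000_000_000.0 / latencyNs); // requests per second
    }

    public static void main(String[] args) {
        // ~100 ns per main-memory reference vs ~7 ns per L2 hit
        System.out.println(maxRequestsPerSecond(1000, 100.0)); // prints 10000
        System.out.println(maxRequestsPerSecond(1000, 7.0));   // prints 142857
    }
}
```

Same data structure traversal, roughly 14 times the throughput ceiling, purely from where the data lives.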

Another point here: if you go back to your comp-sci fundamentals, it's something we've all been taught; sorting algorithms, for example, usually finish with insertion sort. No matter what you do in quicksort, at the end there's an insertion sort once the size of the sub-array reaches a certain threshold. At the time it didn't quite make sense to me: why do we do this? It goes back to the spatial locality principle: it's an array, contiguously stored, probably in cache, and things are a lot faster at that level. Insertion sort has that locality, whereas other sorts don't necessarily, so it's a lot more efficient once you get to smaller sizes.

Here's an example. We all think binary search is the fastest way to search a sorted array. It may not be so. If you have a small array, say 1,000 elements, linear search may be a lot faster. If you look at the example, you'll notice it has no branches other than the for-loop, which means the code can be vectorized; it can take advantage of modern vectorization, AVX intrinsics, which gives you a much higher speedup. This can actually beat binary search rather badly, so keep that in mind.
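The slide's code isn't reproduced in this transcript, but a branch-free linear search in that spirit looks something like this (my own sketch of the technique, not the speaker's exact code): instead of breaking out of the loop early, count how many elements are smaller than the key, so the loop body has no data-dependent branch for the JIT to stumble on.

```java
public class Search {
    // Branch-free linear search over a sorted array: count the elements
    // smaller than the key rather than exiting early. The comparison can
    // compile down to a conditional move, so the loop is vectorizable.
    static int linearSearch(int[] sorted, int key) {
        int idx = 0;
        for (int i = 0; i < sorted.length; i++) {
            idx += (sorted[i] < key) ? 1 : 0;  // no early exit, no branch miss
        }
        return (idx < sorted.length && sorted[idx] == key) ? idx : -1;
    }
}
```

It always scans the whole array, which sounds wasteful, but on a small cache-resident array the predictable, vectorizable loop can beat the unpredictable branches of binary search.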

When you're building your data structures, it's important to understand this. As has been said in other talks (I think Todd [Montgomery] mentioned it), linked lists are not a particularly great data structure if you're trying to take advantage of modern hardware. If it's possible, if the scale allows, you should consider storing things in an array instead.

Disruptor Pattern – Fast Event Processing

Make use of the Disruptor pattern. I won't spend a lot of time explaining it; it's a great pattern, and if you haven't seen it, I encourage you to study it. It's basically like Java's BlockingQueue but way faster. It's a ring buffer, and it uses compare-and-swap operations to drain the queue without locking, so it can do this very quickly. It looks like this: there's a ring buffer, and every consumer has its own offset, so it can consume messages at its own pace while the producer progresses.

The ring buffer is pre-allocated, because you want to reduce any sort of GC pressure; it's pre-allocated using some sort of message wrapper. As an example, you can see there's a wrapper class, and by the way, here's a common source of memory leaks: when you're consuming this data, you can consume a batch of messages, and you must make sure you set the consumed message reference to null. We actually had that memory leak in production, and it wasn't easy to find. If you don't null it out, you're going to wait until that slot gets overwritten; until then, a lot of memory cannot be collected, because something is still referring to it.

It's very compact, so you have to figure out what your throughput is and empirically adjust the size of your ring buffer, but even if you have half a million entries, that's only 14.5 megabytes. By today's standards, that's almost nothing in memory.
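The real Disruptor is the LMAX library, but the two ideas above, pre-allocated wrapper slots and nulling the payload after consumption, can be shown in a drastically simplified single-producer, single-consumer ring in plain Java (an illustration only; the Disruptor's actual claiming and batching machinery is far more sophisticated):

```java
import java.util.concurrent.atomic.AtomicLong;

public class SpscRing<T> {
    // Pre-allocated wrapper slots: only payloads are set and cleared,
    // so the ring itself generates no garbage in steady state.
    static final class Slot<T> { T payload; }

    private final Slot<T>[] slots;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to consume
    private final AtomicLong tail = new AtomicLong(); // next slot to publish

    @SuppressWarnings("unchecked")
    public SpscRing(int sizePowerOfTwo) {
        slots = new Slot[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
        for (int i = 0; i < sizePowerOfTwo; i++) slots[i] = new Slot<>();
    }

    public boolean offer(T value) {
        long t = tail.get();
        if (t - head.get() == slots.length) return false;   // ring full
        slots[(int) (t & mask)].payload = value;
        tail.set(t + 1);                                    // publish
        return true;
    }

    public T poll() {
        long h = head.get();
        if (h == tail.get()) return null;                   // ring empty
        Slot<T> slot = slots[(int) (h & mask)];
        T value = slot.payload;
        slot.payload = null;  // clear the reference; otherwise this slot pins
                              // the message until the producer laps the ring
        head.set(h + 1);
        return value;
    }
}
```

The `slot.payload = null` line is exactly the fix for the production leak described above: without it, consumed messages stay reachable until the producer wraps around and overwrites the slot.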

Data Ingest & ETL

Data ingest and ETL: I want to spend a little time on this. ETL is a big problem, and it's done in many different ways. One idea that we leveraged successfully, I believe, is to do ETL as early as possible; we actually do it inside the API. It really depends on your use case, but there are enough CPU cycles to do this. Unless you're running some sort of numerical computation on every single request and saturating your CPU, your CPU actually has a lot more spare cycles than you think; these are IO-bound systems, I would argue. Basically, a request comes in and we validate it. Then we apply some payload limits and so on. Usually the input is some inefficient format; in Mark's [Price] talk it was FIX, for us it's JSON. Neither is a great format. The first thing we do is translate the inefficiency away by converting to something like Avro. If you haven't heard of Avro, it's like Protobuf; they're comparable in terms of performance.

Then on top of that we slap on Snappy compression; I'll have a slide on Snappy very soon. It's a really fast compression algorithm that works at IO speed. It trades compression efficiency for speed, and it does this really well: you're not getting the best compression possible, but on this type of data you still get maybe 2 to 2.5x compression, at IO speed, without hurting your performance in any significant way.

Then we use the Disruptor to consume messages that have already been ETL'd into this nice, efficient, Snappy-compressed Avro format. The Disruptor allows us to do this really nicely in terms of getting the data uploaded into Cloud Storage.

In this case, it's a single thread consuming the records that are already in Avro format, and you want to batch them; to get the highest throughput on your upload, you need to batch enough records. We do this somewhat arbitrarily, but a 5-minute batch seems to work quite well. It's super cheap, because all you're doing is appending to a file on the local file system. Then a separate thread kicks off every few minutes and does a parallel upload to Google Cloud Storage.
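The append-then-rotate mechanics can be sketched like this. This is a hypothetical illustration, not the production code: the consumer thread appends serialized records to a local file, and a rotation hook (which in practice a ScheduledExecutorService would fire every batch interval) closes the batch and hands it to an upload callback, standing in here for the parallel Cloud Storage upload.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Consumer;

public class BatchingUploader {
    private final Path dir;
    private final Consumer<Path> upload; // e.g. parallel upload to cloud storage
    private Path current;
    private OutputStream out;

    public BatchingUploader(Path dir, Consumer<Path> upload) throws IOException {
        this.dir = dir;
        this.upload = upload;
        roll();
    }

    // Appending to a local file is cheap; the batch accumulates between rotations.
    public synchronized void append(byte[] record) throws IOException {
        out.write(record);
    }

    // Called from a scheduled thread every batch interval: close the current
    // file, hand it off for upload, and start a fresh batch file.
    public synchronized void rotate() throws IOException {
        out.close();
        Path finished = current;
        roll();
        upload.accept(finished);
    }

    private void roll() throws IOException {
        current = dir.resolve("batch-" + System.nanoTime() + ".avro");
        out = Files.newOutputStream(current,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Note there's no write-ahead log here, matching the design described below: a crash can lose or corrupt the tail of the current batch, and the downstream readers are expected to tolerate that.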

A couple of caveats. In a low-latency application, anything that touches the network, even an upload or a download that brings data into memory, should be throttled; that's something we learned later on. Even if you upload to cloud storage in a background thread, it's going to grab all of the available network throughput, and you're usually going to have some allocations along the way. That creates additional garbage-collection pressure in a managed environment, which hurts your latency. The same thing happens on download. We actually had to throttle both downloads and uploads to put a damper on latency.

As I mentioned, you can do this really easily without a complicated system. We don't use write-ahead logs, for example, which means that if a server goes down, the data on the local file system may be corrupted. But because Avro uses fairly small blocks, the worst we can lose is 32 kilobytes of records. We built our downstream systems to deal with corrupted Avro files. If you use systems like Hadoop or Spark, you know they're really bad at dealing with corrupted files; that's something you can easily fix by implementing a custom input format that just skips over bad blocks.

That gives you simplicity. We're not in the domain of financial transactions, where we have to worry about every single request. If we lose five records out of a million, it's not a big deal; fraud detection usually works on aggregates. It simplifies the design, and it also improves your performance, because write-ahead logs actually have a lot of latency associated with things like fsync to disk. It introduces delay into your pipeline.

A very quick JMH benchmark, pardon me for displaying those, but it's showing you that Snappy has very little overhead. "Encode" is serializing your data using Avro, and "encode and compress" adds Snappy on top. You can see that the number of ops on the encoding side is somewhat affected, but when you're actually decoding the data, the overhead of Snappy is almost insignificant. Facebook has developed Zstandard, which on benchmarks compresses much better than Snappy, so I would encourage you to look into it. It's not well integrated into the whole ecosystem yet, but it's definitely in our backlog to research more. Zstandard is kind of adaptive. You can tune it, and it can actually perform slightly better than Snappy in my experience.

We talked about data ingest and ETL. One added benefit of doing ETL early is that you don't have another pipeline to worry about. The data is readily available, usually within five minutes, because we're batching. We can see the data in cloud storage. It's already in an efficient format, so we can start processing it. That's a huge benefit to your data pipeline team.

There's not a huge delay between the incoming request and the ETL'd and scored response, and you can start looking into the data right away. We mentioned the Avro performance and throttling uploads and downloads. Another thing you may have to do is garbage collection tuning. Hopefully you don't have to, but if you do, watch out for humongous object allocation if you're using G1, something we definitely encountered. Again, this has to do with using a cloud storage library that uses very large buffer sizes.

In this case, the Google Cloud Storage library, I think, will by default use something like 8-megabyte buffers - very large buffers - to minimize the number of RPCs. You're basically minimizing network round trips at the expense of storing stuff in memory; there are fewer calls. Unfortunately, if you're using G1, humongous object allocation kicks in, and that can hurt your latency quite substantially. Make sure you know where those allocations are, and then you can definitely build a better data structure to limit that allocation, which is what we did.
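The fix is essentially to stream through a small reusable buffer instead of allocating one multi-megabyte buffer per request. A minimal sketch - the 64 KB size is an assumption, chosen to stay well below G1's half-region humongous threshold, and this is not the actual data structure they built:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** In G1, any single allocation at or above half a region size is
 *  "humongous" and handled on a slow path. Rather than one 8 MB buffer,
 *  copy through a small reusable buffer so no single allocation is large. */
public class ChunkedCopy {
    public static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[64 * 1024]; // small, well under any region size
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }
}
```

The trade-off is the one the talk describes: smaller buffers can mean more RPCs, so in practice you pick the largest buffer that still avoids the humongous path.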

Compose works well, too. Like I said, you can basically batch a large file, chunk it, upload the chunks in parallel, and compose them into one object, and you get really nice, very high throughput on that upload.

KV Store – Why Not Aerospike?

Let's move to the Key-Value Store next. I want to share our experience with it. Folks from the ad tech industry are probably familiar with Aerospike; it's the de facto solution in that space. We used Aerospike before, and we tried to use it here at Forensiq. It's a great system. The pros and cons are there; I'll just point out some of them. It definitely has really great performance, but it's a very complex system, because it has to support writes and it has to support replication. All we care about is actually read-only access, and I'll explain what I mean by that.

It has no bulk loading capability, so that's kind of a big problem. If you're continually producing new versions of your data, and you want to be able to load them into different regions or data centers, then you have almost a split-brain scenario. You're writing into your Aerospike cluster and it's replicating the data, and this is true of any replication system: at what point is that data consistent? This eventual consistency can actually be really hard to troubleshoot.

Bulk loading is something we would prefer, because we produce a new version of our dataset and then, using bulk loading, load it in as one atomic step. There are some other design considerations: the index file can actually be very large, and Aerospike takes a lot of memory. Then one thing we ran into is the global bin limit, which is silly. If you use HBase or Bigtable, a bin is like a column. Aerospike has a global limit of 32,000 bins, or columns, that you can allocate. Once you allocate past that, you're losing data; if you're writing into your Aerospike cluster, it can no longer store it. Moreover, if you want to get rid of these allocations, it's not that easy, because they're using reference counting and there's no garbage collection built in. What we had to do is write a tool that would walk through all the bins, set them to null, and then restart each server, one at a time. This was quite a laborious exercise, which I'm happy to never do again.

Low Latency KV – Voldemort

For that reason, we settled on Voldemort, which is LinkedIn's Key-Value Store, contributed to open source a while back. A few years ago they stopped development on Voldemort; they're working on Venice, which is its replacement. Voldemort is really nice because it's a very simple system - relatively simple if you're using it only for reads. It does support writes, but for us, as I said, we mostly care about read-only performance, because we're going to use bulk loading to load the data in.

It's written in Java and supports a simple Key-Value Store API, get/put. Bulk loading is in bold because that's what we care about. The simple part of the system comes from the fact that it allows you to basically take your data, split it into chunks, and then memory-map them. With memory-mapping, you want to make sure that your index is always memory-resident. When you can guarantee that, then you're only paying the latency of the local SSD drive to look up data.

It's using OS caching here; it's going to try to load as much data as your RAM allows, but everything outside of that will incur a disk lookup, which with SSD, as you know, is on the order of 100 microseconds. There's still a penalty for doing that, but it's relatively small. It's a great system. The way the bulk loading works is kind of like this. You produce your data set offline using a Hadoop job, or you could use Spark, for example - any system that can generate the chunk files that you can load. Then it issues an RPC to all the servers in the cluster, and they just pull down the data. When they're pulling the data down, they're pulling the chunk files into a new staging directory. Then the switch actually happens using a move operation in the file system. The switch is sort of atomic, not fully.
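The staging-then-move switch can be sketched with `java.nio`. This is a simplified illustration of the idea, not Voldemort's actual swap code (Voldemort manages per-store version directories and supports rollback); the directory naming is made up:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Bulk-load swap sketch: chunk files are downloaded into a staging
 *  directory, then the whole directory is moved into place so readers
 *  switch to the new data version in a single rename. */
public class VersionSwap {
    public static Path swapIn(Path staging, Path liveDir, String version)
            throws IOException {
        Path target = liveDir.resolve(version);
        // ATOMIC_MOVE is a rename, so it only works within one file system;
        // that's also why the switch is "sort of atomic, not fully" - each
        // server swaps atomically, but the cluster swaps one node at a time.
        return Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
    }
}
```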

Here's a slide on performance. This is the latency for Voldemort that we see in production - 99th percentile latency. For the most part it's ok, but you see these spikes, which happen quite regularly. The problem there is that when you're reloading the data, because we're using Java in this case, it's really hard to actually mlock the index file. When the data gets reloaded, for some time - a short duration, maybe a minute on average - the index file is not fully memory-resident. All the requests that come in during that window incur this additional latency.

We had to build a lot of custom stuff that just wasn't available there. The BloomFilter, for example, is going to cut down on the number of network calls we have to make. The BloomFilter is actually on the client side; we compute it offline and then load it into our backend to reduce the number of calls. We also added schema versioning and a union datastore, so you can basically union different types of Avro records together if you want to store them.
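A client-side Bloom filter like the one described is easy to sketch. This toy version uses an FNV-style hash and made-up sizing; a real deployment would size `m` (bits) and `k` (hash functions) from the key count and the target false-positive rate, and would build the bit set offline alongside the chunk files:

```java
import java.util.BitSet;

/** Bloom filter sketch: computed offline over all stored keys, shipped to
 *  the client, and consulted before making a network call. A negative
 *  answer is definite, so those lookups skip the KV store entirely. */
public class Bloom {
    private final BitSet bits;
    private final int m, k;

    public Bloom(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // FNV-1a over the key, perturbed per hash index i.
    private int hash(byte[] key, int i) {
        int h = 0x811C9DC5;
        for (byte b : key) { h ^= b; h *= 0x01000193; }
        h ^= i * 0x9E3779B9;
        h ^= (h >>> 16);
        return Math.floorMod(h, m);
    }

    public void add(byte[] key) {
        for (int i = 0; i < k; i++) bits.set(hash(key, i));
    }

    /** false => key is definitely absent: safe to skip the remote call. */
    public boolean mightContain(byte[] key) {
        for (int i = 0; i < k; i++) if (!bits.get(hash(key, i))) return false;
        return true;
    }
}
```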

Then it uses TCP. TCP connection pooling, in my opinion, is flawed. Every time you have some sort of network spike, or some condition that causes a connection in the pool to take longer than expected, it times out and gets removed from the pool, which means on subsequent calls you're going to incur the latency of a TCP handshake. That turns out to be pretty bad when it happens. Going back to Java, a ByteBuffer can only address two gigs, so that limits what you can memory-map, and so on. As a result, we decided that we could do a better job. In a cloud environment, we could replace TCP with UDP, because the network is really reliable when you're in the same physical region or data center. Instead of HDFS, which it uses by default, we could use Google Cloud Storage, and the backend can be done in C++ to eliminate this memory component, where you have much better control of the memory. That's currently in progress.

Putting Things Together

Putting things together, this is how the high-level architecture diagram looks. You've got these backend instances running in different locations in the world. They're making calls to Voldemort, which is the KV store there, and they're uploading the data to Google Cloud Storage, which ends up going, in this case, to us-east1. Then data processing happens from that point - Dataproc is a managed Hadoop cluster, and there's Bigtable and whatnot. The data pipeline comes out of that.

I'm going to wrap up here. I wanted to say a few things about reliability. You can't really do performance without reliability, so maybe I'll pause here for a few seconds. That's my favorite slide on Tech Debt, that's how it feels. I think it captures that pretty well in practice.

Legacy and Tech Debt are, I think, the top two diseases in our industry. Part of it is inherited, part of it is environmental. There's some good research. Big companies like Google rewrite code regularly; that helps.

Tests are important if you're doing performance-critical stuff. The more tests you can integrate - including micro benchmarks and performance tests - into a single pipeline, the faster the feedback you get. It helps that we do ETL very early, because then we can test from the client side, where JavaScript is executing on the end-user device using the Selenium framework, to the point where the request comes in on the backend, to the point where it comes out as Avro. This gives you a nice end-to-end test, which can tell you that the information you've computed is functionally correct to some degree. Having that feedback is really important. It helps you iterate very quickly.

I'm going to skip the design by contract. That's interesting, but probably deserves a separate talk. The managed infrastructure in Google or any other cloud provider has a lot of benefits to you, I would argue. Things like having distributed, highly available file system, that solves a lot of problems right there. Being able to store your data and reload it and upload it, and doing this reliably just eliminates a lot of concerns from your code.

Load balancing is another really interesting area. That's why I said earlier, low latency is much harder to get in a cloud environment. You can do latency-based load balancing at layer seven and at layer four, but they have different performance characteristics. If you need HTTPS, then you have to be at layer seven. In our case, the load balancer does a fairly good job, but there's latency associated with building that initial request.

Layer four performs slightly better. It's a great technology - you don't have to worry about load balancing yourself, but there's a penalty with that. It gives you other advantages, too. You can do rolling deploys, which are really tough to do without a load balancer.

Wrapping up: cloud tech, I think, is mostly mature - "mostly" because there's still a bunch of issues that need to be solved. These are just some bug reports and issues that we've submitted. Some of them are critical, some of them are improvements. The first one was a fairly critical blocker with Bigtable. The good news is that this is on the client side. The backend is actually a very mature technology - Bigtable has been around for over a decade and it works - but the client libraries still being built today are buggy and need to be improved.

Trust but verify. If you're using cloud, definitely trust your provider, but always verify. Certain things can creep up. There are features you may at first get to use freely, get to play with, and then all of a sudden a new pricing model makes them insanely expensive - something you would not be able to afford. Watch out for that.

Configuration matters, too, if you misconfigure something you're using as a black box. My favorite fiasco was DNS delegation, which basically resulted in a crazy amount of money. Because it was a black box, there's no logging; you assume that Cloud DNS works as expected. But we were using delegation - kind of an edge case where we're delegating some DNS records from one provider to another - and this interfered with IPv6, which is what a lot of mobile devices use by default. They would look up the IPv6 record, the AAAA record (because it's 128 bits), and the record wouldn't exist there. Then they would do another four lookups, because there are four name servers, for every single request. You can think of it as a denial-of-service attack, and it led to a lot of dollars.

In summary, I think Cloud plus OSS is a very powerful combination. High throughput in the cloud is relatively easy to achieve. Low latency is a different topic: it's doable, but it requires a lot more effort and dedication. There are some new trends in the field. We have everything as a service today. Infrastructure as a service has been around for a while; a more recently emerging trend is storage as a service. SSDs are getting APIs that you can use, which means in the future you could actually have custom code running on the SSD. They already have dedicated CPUs that can offload work and give you much better - and more importantly, custom - performance. Lastly, fraud is not going anywhere. I think you all know that. Cooperation and collaboration are really important if we want to solve that problem together.




Recorded at:

Oct 23, 2019