
How to Build a Reliable Kafka Data Processing Pipeline, Focusing on Contention, Uptime and Latency


Summary

Lily Mara shares how OneSignal improved the performance and maintainability of its highest-throughput HTTP endpoints (backed by a Kafka consumer in Rust) by making it an asynchronous system. She shares how metrics changed when the system went from sync to async, and what unique sharding strategies were used to maximize concurrency and performance, while maintaining consistency for Kafka consumers.

Bio

Lily Mara is an Engineering Manager at OneSignal in San Mateo, CA. She manages the Infrastructure Services team, which is responsible for in-house services used by other OneSignal engineering teams. Lily is the author of Refactoring to Rust, a book by Manning Publications about improving the performance of existing software systems through the gradual addition of Rust code.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.

Transcript

Mara: My name is Lily Mara. I'm an engineering manager at a company called OneSignal in San Francisco. We write customer messaging software that helps millions of app developers connect with billions of users. We send around 13 billion push notifications every single day. I have been writing Rust since around 2016, and writing it professionally at OneSignal since 2019. I'm one of the authors of the book, "Refactoring to Rust." That's available now in early access at manning.com.

Once Upon a Time

I'm going to talk a little bit about how we took one of our highest throughput endpoints and made it stop bringing the website down every day. We're going to be talking about how we took a synchronous endpoint that was putting a ton of load on the database from our HTTP edge layer, and moved that into an asynchronous workload whose internal impact we could control much more tightly. Since this is a retrospective, we're going to be jumping back in time a little bit, by a couple of years. At this time, we were sending around 8 billion push notifications every day. We had a team of around 10 engineers on what was then called the backend team at OneSignal. We had maybe 25 engineers, total. When you're operating at what we considered to be fairly large scale with that small a team, you have to make a lot of simplifying assumptions about your problem space. This is going to come into play later on as we get into the architecture of the system. It's important to remember that a lot of the tradeoffs that we made, we made because we had specific performance goals that we were trying to serve. We had other things that we didn't quite care so much about.

The problem that we're trying to solve is backing the API that allows our customers to update the data that's associated with their subscriptions. At OneSignal at the time, our data model was centered on subscriptions, which we defined as a single installation of an application on a single device. One human person might have many OneSignal subscriptions associated with them for different applications and different devices. Each subscription can have a number of key-value properties associated with it. Customers might send us HTTP requests like this one that allow them to update those properties. For example, you can see, in this instance, someone is sending us an HTTP PUT request, so that they can update the First Name and Last Name fields to Jon Smith. This request also gives us the subscription ID, which correlates with the row ID in the database, as well as the app ID, which is more or less a customer dataset ID. You can see in this instance we have one human person and she has a mobile device. She has a web browser that's on a laptop or desktop. The mobile device has two subscriptions that are associated with it, it has an SMS subscription and it has a mobile push subscription for a mobile app. The web browser also has a web push subscription that is associated with it. There are multiple subscriptions across multiple devices, but they all correspond with the same human person. At this time, we didn't really have a data model that allowed for the unification of these subscription records to one human person, like we do now. You can see each of these subscriptions has an account type property associated with it in the database. This might be something that a customer uses to do substitution at the time they deliver a notification. They might do some templating that allows them to send a slightly different message body to people with different account types. They can also just do segmentation that allows them to say, send this message only to people who have VIP account levels, or send this message to people who have user account types. This system is fairly flexible, and our customers do like it, but somebody has to maintain the infrastructure that allows you to store arbitrary amounts of data.
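To make the shape of these requests concrete, a minimal sketch of such an update might look like the following. The path and field names here are illustrative, not OneSignal's actual API surface:

    PUT /apps/{app_id}/subscriptions/{subscription_id} HTTP/1.1
    Content-Type: application/json

    {"properties": {"first_name": "Jon", "last_name": "Smith"}}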

The original format of this was an HTTP API that performed synchronous Postgres writes. Our customer makes an HTTP request, either from their own servers or from our SDKs on their users' devices. We block the HTTP request in our web servers until the Postgres write completes, and then we send a 200 back to the device. This works for a time. It certainly works at 100 users. It probably works at 10,000 users. Maybe it even works at several million users. Once we started getting into the billion range, where we have billions of users and we're sending billions of notifications a day, things changed, because a notification send was, generally speaking, fairly well correlated with these property update requests. After a notification sends, maybe a user takes some action, and a customer wants to update the subscription's properties as a result of this. Eventually, we grow to such a scale that we get hammered by traffic incessantly, generally speaking, at the hour and half hour marks, because people love to schedule their messages to go out on the hour, on the half hour. Nobody is scheduling their push notifications to be delivered at 2:12 in the morning. People are scheduling their messages to be delivered at 9:00 in the morning, because it's just easier. It makes more sense as a marketer, and it comes more naturally. Everybody is doing that, meaning at that hour mark, we are getting tons of traffic at this endpoint. It would frequently lead to issues with Postgres, as it really couldn't keep up with the volume of writes that we were sending it. There's an enormous bottleneck here. Very frequently, this would just cause our entire HTTP layer to go down, because all of our Postgres connections would just be completely saturated, running these synchronous writes.

The Queuing Layer

What did we do? We took this synchronous system, and we made it asynchronous by introducing a layer of queuing. What is a queue? A queue is a very fundamental data structure that is talked about in computer science classes. Queues work in a very similar way to the queue you would wait in at the café. Items are enqueued at one side. They sit on the queue for a period of time. Then, in the end, items are dequeued from the other side, and they're handed off to some processor that does something with them. We can control the rate at which messages are dequeued. We could try to speed up our processor, so that messages sit in the queue for as little time as possible, or we could slow our processor down a little bit, so that we don't overwhelm the thing at the end, because remember we're running Postgres writes here. If there's a huge spike in the number of requests that come in, really all that happens is more messages sit in the queue, and our Postgres servers don't get overloaded because they're processing a more or less constant rate of messages per second, or at least a rate of messages that has some known upper bound on it. We are introducing an additional metric here. In our previous world, really the only metrics that we had were maybe the number of requests that we're processing at one time, and the CPU and memory limits on our web workers and on our Postgres workers. Now we have this new thing that we need to measure. It's going to turn out to be really important for us to be measuring this. It's the number of messages that are sitting in that queue waiting to be processed. This is the metric that we call the lag of our queue. We have four messages right now that are sitting in this particular queue. We have a red, blue, orange, and green message that are sitting in the queue waiting to be processed. We have a purple message that has been removed from the queue so it's no longer waiting. We would say that we have four messages in lag for this queue that's represented on this slide.

Apache Kafka

A queue, like I said earlier, is just an abstract data structure. In the real world, we don't use abstract data structures, we use concrete implementations of data structures. In our case, we use Apache Kafka. Apache Kafka is a very nice, high-performance message log system. It has a lot of neat features that you don't get with just the basic abstract queue. It has a ton of use cases beyond what we're going to discuss in this talk. We're going to talk about how we use it to back our Postgres writes. How does Kafka work? How are these things structured? With a basic queue that you might import from your programming language's standard library, you might have a list of messages that sits in memory. You might remove things from that list of messages as they're being processed as you dequeue them. That's not what we get in Apache Kafka. We have a number of data structures that are represented on this slide, so let's talk about them one at a time. The first thing that's on here is the producer. This is the thing that enqueues messages to Apache Kafka. In our case, this was our HTTP server that's taking messages from the HTTP API, and it's adding them to our Kafka queue by calling the send method on Kafka. It's adding them to Kafka in something called a topic. This is a logical grouping of messages, where messages have the same purpose. Each message has an auto-incrementing numerical ID called an offset, which starts at 0 and gets bigger over time.
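As a rough sketch, not OneSignal's actual code, the producing side might look something like this in Rust with the rdkafka crate. The topic name, key choice, and config values are illustrative:

    use std::time::Duration;

    use rdkafka::producer::{FutureProducer, FutureRecord};
    use rdkafka::ClientConfig;

    fn build_producer(brokers: &str) -> FutureProducer {
        ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .create()
            .expect("producer creation failed")
    }

    async fn enqueue_update(producer: &FutureProducer, subscription_id: &str, payload: &[u8]) {
        // The message is appended to the "subscription_updates" topic and gets
        // the next offset in whichever partition it lands on.
        let record = FutureRecord::to("subscription_updates")
            .key(subscription_id)
            .payload(payload);
        if let Err((err, _msg)) = producer.send(record, Duration::from_secs(0)).await {
            eprintln!("enqueue failed: {err}");
        }
    }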

The thing that pulls messages off of Kafka, that dequeues messages is called a consumer. In this case, we have a single consumer, it's called A, and it has pulled the first message off of the log of the queue. The thing that's interesting here is, you'll notice that that message is still on the queue. It hasn't been removed, it's still on the topic. The thing that's different about Kafka from the typical queue implementation is that the messages don't actually get deleted from Kafka's data structures, they stay there, and the thing that changes is just the pointer inside the topic to which message has been read. This is going to give us a little bit of flexibility in terms of how this is implemented. It's important to note that, really, what we're changing is we're just moving this pointer around in there. You notice that our consumer A has read message 0. It processes message 0. Then once it's done, it sends Kafka a message called a commit. That advances this pointer, so it moves the pointer from 0 to 1. The next time we ask Kafka for a message, it's going to give us the next message in the log back.
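The consuming side might look roughly like this, again with rdkafka and illustrative names. Auto-commit is disabled so the pointer only advances when we explicitly commit a message we have finished processing:

    use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
    use rdkafka::message::Message;
    use rdkafka::ClientConfig;

    async fn run_consumer(brokers: &str) -> Result<(), rdkafka::error::KafkaError> {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", brokers)
            .set("group.id", "subscription-updaters")
            .set("enable.auto.commit", "false")
            .create()?;
        consumer.subscribe(&["subscription_updates"])?;

        loop {
            let msg = consumer.recv().await?;
            // ... apply the update to Postgres here ...
            println!("processed partition {} offset {}", msg.partition(), msg.offset());
            // Tell Kafka to advance the committed pointer past this message.
            consumer.commit_message(&msg, CommitMode::Async)?;
        }
    }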

One of the nice flexible things that you can do with Kafka because it has this moving pointer thing is you can actually have multiple consumers, which read from the same exact topic, and every consumer is actually going to be able to see every single message that's on the topic. This is beneficial if you wanted to have a single Kafka topic that has just a ton of data on it that might be useful in broad contexts, and maybe one application reads 90% of the data, and another application reads 30% of the data. Obviously, those add up to more than 100%: there could be overlaps. Because each consumer is going to see every message that's on there, you get a lot of flexibility in how you want to utilize that data. In this instance, we can see that there are two consumers that are both reading from the same topic, and they're at different points in the topic. As your user base grows, you probably want to be able to process more than one message at a time. Both of these consumers are only looking at a single message at a time, just different messages right now, and each of them is going to want to process every message that's on that queue.

Kafka Partition

What do we do about this? It turns out that Kafka has a feature that's built in for scaling things horizontally, that is called a partition. I know that I presented each topic as an independent queue of messages, previously. It turns out that within a topic, there are numbered partitions. These are the actual logs of messages that have independent numbering schemes for offsets. They can be consumed independently by either single instances of a consumer or, indeed, multiple instances of a consumer. In this instance, on here we have a topic that has two partitions. We could have two instances of the consumer that were running in two different Kubernetes pods, or on two physical servers that were separated and had dedicated resources, or they could simply be running in two threads of the same consumer instance.

We talked previously about lag. Notice that these two partitions have independent lag from each other, because they are two different queues with two different positions in the queue. Partition 1 actually doesn't have any lag, because it's read up to the end of the queue. Partition 0 has 3 messages in lag, because there are still messages 2, 3, and 4 that are sitting on the queue and waiting to be processed by the consumer. You might say that the topic as a whole has three messages in lag, because it's just those three messages from partition 0. You could subdivide that down into the two partitions. Indeed, in most cases, if you have a resource that's sitting back there behind just one of the partitions, you will very often find that one of your partitions, or one grouping of your partitions, starts to lag. It's, generally speaking, not smeared across the whole range.
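As a rough sketch, per-partition lag is just the distance between the end of that partition's log and the consumer's position in it, and the topic's lag is the sum over its partitions. The names below are illustrative:

    // next_offset_to_produce is one past the newest message in the partition;
    // next_offset_to_consume is the first message the consumer has not yet processed.
    fn partition_lag(next_offset_to_produce: i64, next_offset_to_consume: i64) -> i64 {
        next_offset_to_produce - next_offset_to_consume
    }

    // Total lag for a topic, given (produce, consume) offsets for each partition.
    fn topic_lag(partitions: &[(i64, i64)]) -> i64 {
        partitions.iter().map(|(p, c)| partition_lag(*p, *c)).sum()
    }

In the example on the slide, partition 0 would report 5 - 2 = 3, partition 1 would report 0, and the topic as a whole would report 3.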

We have our multiple partitions, but when we have a message that we want to add to our Kafka topic, what do we do with it? Which of those partitions does it go to? There's a couple of different ways that we can distribute messages to our different partitions inside the Kafka topic. The first one is fairly intuitive.

If we don't tell Kafka which partition we want a message to go to, we just get round-robin partitioning. In this instance, there are three partitions. Each time we add a message to our Kafka topic, Kafka is just going to pick one of those partitions, and just assign the message to it. This might be a perfectly acceptable partitioning strategy for many use cases. In our case, this was not really going to work for us. Because, remember, we were doing Postgres writes. Each of these messages was a subscription update that had a customer dataset ID and a row ID on it. We have a cluster of Postgres servers that are partitioned by customer dataset ID. We don't have one Postgres server per customer, but we do have a Postgres server that is responsible for a range of customer dataset IDs. This meant that at the time we were adding our message to Kafka, we already knew which Postgres server the message was going to be written to eventually. At the time we generated our message and wrote it into Kafka, we would actually assign it explicitly to a partition that was tied to that particular Postgres server. Now, again, this, as is shown on here, gives us some amount of separation between the Postgres servers. You may notice that within each Postgres server, there's really not any concurrency, there's a single partition that's responsible for each Postgres server. We are not going to have the ability to do concurrent writes on our Postgres server, so we're going to be fairly slow. The way that we would get around this is by having multiple Kafka partitions that would be responsible for each Postgres server. In this case, we have four partitions, and two Postgres servers. Each Postgres server has two Kafka partitions that are associated with it. We can process those two partitions concurrently, meaning we can perform two concurrent writes to our Postgres servers.
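A minimal sketch of that explicit assignment might look like the following, assuming a fixed number of partitions per Postgres server and a hash of the row ID to spread messages across that server's partitions. The names and exact mapping are illustrative, not OneSignal's actual scheme:

    use rdkafka::producer::FutureRecord;

    // Illustrative layout: each Postgres server owns a contiguous block of Kafka
    // partitions, e.g. server 0 -> partitions 0-1, server 1 -> partitions 2-3.
    const PARTITIONS_PER_PG_SERVER: i32 = 2;

    fn pick_partition(pg_server_index: i32, subscription_hash: u64) -> i32 {
        let base = pg_server_index * PARTITIONS_PER_PG_SERVER;
        base + (subscription_hash % PARTITIONS_PER_PG_SERVER as u64) as i32
    }

    fn record_for<'a>(key: &'a str, payload: &'a [u8], partition: i32) -> FutureRecord<'a, str, [u8]> {
        // Pinning the partition explicitly overrides Kafka's default round-robin.
        FutureRecord::to("subscription_updates")
            .key(key)
            .payload(payload)
            .partition(partition)
    }

Pinning the partition this way keeps every update bound for one Postgres server on a predictable set of partitions, instead of letting Kafka spread them arbitrarily.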

Issues

This strategy will get you fairly far, but we did run into some issues with it. This system isn't super flexible. What happens in a world where we want to change that concurrency number? There's a lot of reasons why we might want to do this. Maybe we're able to get some hardware updates, and we have better CPUs that are available for our Postgres servers, and so we want to increase the concurrency of our writes. Or maybe we decide that we actually want to prioritize reads, and we're ok with slightly higher message latency, so we want to reduce the concurrency of our Postgres writes. In this instance, we would have to do what's called repartitioning our Kafka topic. This entails some rather interesting gymnastics. If we had a topic that had four partitions, and we wanted to go to a topic that had six partitions, so we wanted to be able to change our write concurrency from 2 to 3, we would have to create a new empty topic that had 6 partitions, and we would have to rewrite all of the data from the old topic to the new topic. We would have to manage doing that at the same time as we were shifting new writes onto the new topic. If you want to maintain message ordering while doing something like this, it's really quite a juggling act. This is something that we wanted to engineer our way out of having to do every time that we wanted to change our Postgres concurrency level.

What do we do instead? How do we get around this kind of issue with the strategy? The solution that we developed here was to concurrently process Kafka messages in memory within each Kafka partition. This is something that we called subpartition processing. This is not something that, generally speaking, folks like Confluent consultants will recommend that you do, but we have found it to be very useful and extremely effective for meeting our particular needs. What does this look like? Inside of our consumer instance, as mentioned, we have a number of partitions that are sitting there in memory and processing concurrently. Within each one of those partitions, Kafka has a number of messages that it knows about. Inside of our consumer instance, at the partition level, we have a number of workers; these are more or less threads that are sitting there, and they're able to process messages concurrently within the partition. Instead of asking Kafka for the next message, we would ask it for the next four messages. We would process those concurrently, and send our writes to Postgres concurrently. If it were that easy, this would be an extremely boring and short talk. Once you start doing this, things start to get a lot more complicated. We'll run into a lot of issues along the way. The first of these, which becomes extremely obvious when you try to achieve this, is that it becomes a lot more difficult to manage commits. Let's talk about why that is.
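Before getting into the commit problem, here is a rough sketch of that in-partition fan-out: pull a small batch of messages and run their handlers concurrently. The batch type and handler are stand-ins, not OneSignal's actual code:

    use futures::future::join_all;

    // Stand-in for the real work: the Postgres write for one update message.
    async fn apply_update(_payload: Vec<u8>) {
        // ... issue the UPDATE for this subscription row ...
    }

    // Process a small batch from one partition concurrently instead of one at a time.
    async fn process_batch(batch: Vec<Vec<u8>>) {
        join_all(batch.into_iter().map(apply_update)).await;
    }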

If you're processing messages concurrently that are linearly ordered in Kafka, eventually you're going to get to a point where you process messages out of order. Let's imagine that we have messages 0, 1, 2, and 3 that are all sitting there in memory, they're being processed concurrently, and we just so happen to finish processing message 0 and message 3 at the same time, while 1 and 2 are still processing. What do we do in that situation? If we follow the standard Kafka rules, what we would do is we would send Kafka a commit for message 0, telling Kafka, I finished the processing on the message with offset 0, and you can mark it as done. If I ask you for more messages in the future, you don't have to give me message 0 because it's done, it's over. Once you've done this, remember we've also finished processing message 3 concurrently, so we would also send Kafka a commit for the message with offset 3. Kafka says, "Ok, that's great. Message 3 is finished, so I will never show you message 3, or any of the messages before message 3, again." It turns out that Kafka does not store the completion status of every single message, it only stores the greatest offset that has been committed, because it assumes that you read the queue from the beginning to the end and you don't skip any messages, like we're doing. This is a problem. This isn't going to evict messages 1 and 2 from the memory of our worker, but it is going to prevent us from replaying those messages. If we get into a situation where, say, our consumer instance restarts, or crashes, or something like that, while messages 1 and 2 are still being processed, and the consumer instance comes back up and asks Kafka for the messages again, it's never going to get messages 1 and 2 replayed, because Kafka thinks that they've been committed and finished processing, so it doesn't need to show them again. For our purposes, we considered this to be unacceptable from a data integrity perspective. We wanted to make sure that every update was being processed by the database.

What is the solution here? How do we get out of this problem? What do we do with this message 3, given that we can't send its commit to Kafka? The solution that we came up with was to create a data structure in the memory of our Kafka consumer, which we called a commit buffer. Instead of sending commits to Kafka as soon as messages are completed, we write the completions into this in-memory commit buffer. It has one slot for each of the messages that are in memory, and each slot just tracks: completed, or not completed. Right now, it has two completed messages in it, 0 and 3. You can see it has empty slots there for 1 and 2. We have the messages written to the buffer. Then what we're going to do is we're going to start at the beginning, and scan for a contiguous block of completed messages. If we do that, we can see that there's a single completed message at the beginning, message 0. We can take that completed message, and we can send it as a commit to Kafka. Then after that, we need to do nothing for a little bit. Because if we look at the beginning, there's not a completed message at the beginning of that commit buffer, so it's not safe for us to send Kafka any more commits at this point. We need to sit around, and we need to wait for messages 1 and 2 to be completed by our Kafka consumer, at which point, we can scan again from the beginning for the longest block of contiguous completed messages. We find that 1, 2, and 3 are completed, and we can send Kafka a commit for the message with offset 3, and it will know that 3 and everything before it have been completed and never need to be replayed.
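A minimal sketch of such a commit buffer, tracking completions for one partition and only ever surfacing a contiguous prefix as safe to commit. This is an illustration of the idea, not OneSignal's actual implementation:

    use std::collections::BTreeSet;

    struct CommitBuffer {
        // Lowest offset in this partition that is not yet known to be complete.
        next_pending: i64,
        // Offsets that finished out of order and are waiting for the gap to fill.
        completed_out_of_order: BTreeSet<i64>,
    }

    impl CommitBuffer {
        fn new(start_offset: i64) -> Self {
            Self { next_pending: start_offset, completed_out_of_order: BTreeSet::new() }
        }

        // Record a finished offset. Returns the highest offset that is now safe to
        // commit to Kafka (everything at or below it is done), if that advanced.
        fn mark_done(&mut self, offset: i64) -> Option<i64> {
            self.completed_out_of_order.insert(offset);
            let mut advanced = false;
            while self.completed_out_of_order.remove(&self.next_pending) {
                self.next_pending += 1;
                advanced = true;
            }
            advanced.then(|| self.next_pending - 1)
        }
    }

Because committing offset N tells Kafka that everything at or below N is done, the buffer only ever hands back an offset once the entire prefix below it has completed.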

Model Concession

There is a concession of this model. It's a pretty important one that you definitely need to design around. This model uses something called at-least-once delivery. There are a couple of different delivery models for Kafka consumers, and for asynchronous processors like this generally. At-least-once means that messages are going to be replayed. It means you will definitely get to a point where a Kafka consumer reads the same exact message more than one time. It will happen. You need to design your system to account for this possibility. There's a lot of different ways you can do this. You might have a Redis server that sits there and tracks an idempotency key at the time that your message is completed. We actually took the Kafka offset from the message and we wrote it into Postgres, and we used that as a comparison at the time we were doing our write, to make sure we didn't apply the same update to a row more than once if we had already applied it. This is something that you have to design into your system, or you will get inconsistent data. Imagine you're ticking a counter: if you apply the same counter tick more than once, you're going to get bad data, so you've got to design around it. There are other models of delivery. There's at-most-once delivery, which is where you know you're not going to see something twice, but it might not show up at all. There's also exactly-once delivery, which is a very large bucket that you pour money into, and at the end, it doesn't work.
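A sketch of that offset-comparison idea with tokio_postgres is below. The table and column names are hypothetical, and it relies on all updates for a given row landing on the same partition so that their offsets are comparable:

    // Applies the update only if this Kafka offset is newer than the last one
    // recorded on the row, so a replayed message becomes a harmless no-op.
    async fn apply_if_newer(
        client: &tokio_postgres::Client,
        subscription_id: &str,
        offset: i64,
        first_name: &str,
    ) -> Result<bool, tokio_postgres::Error> {
        let rows = client
            .execute(
                "UPDATE subscriptions
                    SET properties = properties || jsonb_build_object('first_name', $3::text),
                        last_kafka_offset = $2
                  WHERE id = $1 AND last_kafka_offset < $2",
                &[&subscription_id, &offset, &first_name],
            )
            .await?;
        Ok(rows > 0)
    }

A replayed Kafka message then matches zero rows instead of double-applying the update.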

Review

Let's review all the pieces that we've talked about here. Some of this is standard Kafka, and some of it is specific to what we were trying to do at OneSignal. We have a Kafka topic that contains a number of partitions, which are independent queues of messages. Each message has an auto-incrementing numerical offset, that's sort of like its ID. We have producers that enqueue messages into Kafka, and we have consumers which dequeue messages out of Kafka. We can control the concurrency of our consumers via Kafka partitioning, the number of partitions we create, and this subpartitioning thing that is very specific to what we're doing at OneSignal. The really nice thing about this subpartitioning scheme is since our workers are really just threads in the memory of our consumer, we can control the number of those extremely easily. Right now they're essentially numbers in a configuration file of our Kafka Consumer. We can change those by redeploying the Kafka Consumer. In theory, we could design a way to change those while a process was running without having any downtime whatsoever. We don't do that, because we don't feel that that level of liveliness is necessary. We can be very flexible up to the point that we think is useful.

Postgres Writes

It's also very important to remember that these Kafka Consumers are performing Postgres writes. Let's talk about these Postgres writes. Let's talk about what they're actually doing, because the fact that we're doing Postgres writes is actually quite important and quite relevant, and it's going to have a lot of impact on how we manage concurrency in this system. We're doing Postgres writes. Postgres has a bunch of rows in it. All these Kafka updates are fundamentally saying, take this Postgres row and update some property on it. Let's imagine for a moment that we receive two Kafka updates at the same time, or around the same time, and we process those two concurrently. They're both trying to update the same exact row in Postgres, one of them is setting a property called a to 10, and another one is setting the property a to 20. Imagine we got the one setting a to 10 first, and the one setting a to 20 second. I don't know why a customer might send us two conflicting property updates in quick succession like this. It's really not my concern. The thing that is my concern is making sure that we apply our customer's data in exactly the order that they sent it to us. If we're processing these two things concurrently, we might just so happen to process the message setting a to 20 first, meaning in the database we get 20. Then, at a later time, we finish the message setting a to 10, meaning we get 10 in the database. That might be confusing from a customer's perspective, because they sent us set it to 10, then set it to 20, so it would seem like the property should be set to 20. This is not an acceptable state of the world for us. We need to design around this problem.

What are we trying to do? We need to maximize concurrency. We want to make sure that we are saturating our Postgres connections, and getting as much out of those as we possibly can. We need to minimize contention. If there's a single row that's getting a bunch of updates sent to it, we can never process those updates concurrently, because they might complete out of order, and that might lead to an inconsistent state of the customer's data. We don't want to do that. What do we do? We created some more queues, because it's queues all the way down. We love queues. Recall that we had a number of processors for messages in memory, those subpartition processors. Instead of having them just grab messages randomly off the main partition queue, what we did is we took the subscription ID, which is the row ID, and we hashed it. We took that hash modulo the number of workers that we had in memory, and then we assigned the message to a queue that was tied one-to-one to those processors. You can see, in this case, we have the real queue that represents every message that's in the Kafka partition.

We have a blue queue that represents the portion of the messages that happen to hash to it, and a red queue for the messages that happen to hash to that one. Because these hashes are based on the subscription ID, which is the row ID, we know that updates which are bound for the same row will never be processed concurrently, because they will be in the same queue, which has only a single processor associated with it. However, since we assume these row IDs are distributed basically fairly, these queues are each going to have a bunch of messages in them, and we'll be able to process lots of things concurrently.
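A rough sketch of that routing step is below, hashing the row ID so that every update for a given subscription always lands on the same in-memory worker queue. The channel setup and message type are illustrative:

    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    use tokio::sync::mpsc;

    // Illustrative message type: one property update bound for one Postgres row.
    struct Update {
        subscription_id: String,
        payload: Vec<u8>,
    }

    // All updates for a given subscription hash to the same worker, so they are
    // applied in order; different subscriptions spread across all the workers.
    async fn dispatch(update: Update, workers: &[mpsc::Sender<Update>]) {
        let mut hasher = DefaultHasher::new();
        update.subscription_id.hash(&mut hasher);
        let idx = (hasher.finish() % workers.len() as u64) as usize;
        workers[idx].send(update).await.expect("worker queue closed");
    }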

Unifying View of the Kafka Consumer

There's been a lot of data structures that we've talked about, let's try to put them all together into one grand unifying view of what our Kafka Consumers look like. At the far end, we have a producer that enqueues messages into our Kafka topic, and it tells Kafka, please send this to this particular partition, because it's going to wind up in this Postgres database. That Kafka topic has its messages dequeued by a consumer, which is consuming multiple partitions of messages concurrently. Within those partitions, we take the row ID, we hash it, and we use that to assign a message to a particular subpartition queue, which is tied to a particular subpartition processor. Once messages are finished being processed by those subpartition processors, we add them to a commit buffer, and eventually commit them to Kafka. These processors are all sending a bunch of writes to some Postgres servers.

This got us pretty far, but at a certain point, we started running into some more issues, because there are always more issues than you think there are going to be. It's always so simple in theory. I thought it was so beautiful, but it wasn't this simple. We ran into some more issues. We're adding additional layers of in-memory queuing, so we're asking Kafka to give us lots of messages. When there's not a lot of Kafka lag, this works just fine, because you can ask Kafka for all the messages, and there's not very many of them. At a certain point, you're going to lag your Kafka queue, because you're going to have some operational issues that cause your messages to stop processing. There's going to be a bunch of messages that build up and lag. This means that your consumers are going to ask Kafka for too many messages, and you're going to overload the memory of your consumer processes, which is going to make them fall over, which means they're not going to process messages, which means the lag is going to get bigger, which means your problem is not going to go away on its own. How did we solve this one? It was fairly simple. We added a cap on the number of messages that each consumer instance was allowed to hold in memory. Once we got to a certain number of messages in memory, it just stopped asking Kafka for messages. It would check back at a later time to see if some messages had cleared out, and some memory space had opened up. Once we did this, to our shock and awe, everything was fine for an amount of time. At the end of that amount of time, things were no longer fine. This not-fineness had the following characteristics. We started getting paged intermittently on high amounts of lag on our Kafka topic. There was a very clear demarcation line between the fine and the not fine metrics.
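Before getting into what that not-fineness looked like, the in-memory cap itself can be sketched as a semaphore: the consumer loop stops pulling from Kafka once the configured number of messages is in flight, and each slot is released only when a message finishes. The limit, setup, and handler below are illustrative, not OneSignal's actual code:

    use std::sync::Arc;

    use rdkafka::consumer::StreamConsumer;
    use rdkafka::message::OwnedMessage;
    use tokio::sync::Semaphore;

    // Illustrative cap on how many messages one consumer instance may hold in memory.
    const MAX_IN_FLIGHT: usize = 10_000;

    async fn handle(_msg: OwnedMessage) {
        // ... route to a subpartition worker, write to Postgres, record completion ...
    }

    async fn pump(consumer: StreamConsumer) {
        let slots = Arc::new(Semaphore::new(MAX_IN_FLIGHT));
        loop {
            // Blocks here instead of fetching more once memory is "full".
            let permit = slots.clone().acquire_owned().await.expect("semaphore closed");
            let msg = consumer.recv().await.expect("kafka error").detach();
            tokio::spawn(async move {
                handle(msg).await;
                drop(permit); // frees one in-memory slot
            });
        }
    }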

Everything was just ticking along at a very low baseline level, and then all of a sudden, out of nowhere, line goes up in a bad way. We delved into the stats on our consumer instances with some expectations that were very soundly busted. We expected that our consumer instances would have had their CPU jump way up, because there's a bunch more messages to process, so they should be working really hard to process those messages. We expected CPU to go way up, and we expected the number of idle connections to go way down. In reality, we saw the exact opposite happen, we saw the CPU usage go way down and the number of idle connections jump way up, almost to the maximum. This confused us to no end. We didn't really have a lot more observability options at this point. At this time, really, we were just dealing with metrics. We had some unstructured logs that were sitting on the boxes themselves, not in the Kubernetes config store. At this time, I insisted on getting us to a point where we had centralized logging. It was something that I wanted when I first joined the company. It was not prioritized at that time, but I used this as a jumping-off point to be like, "No, we need centralized logging. This is the point." We added some centralized logging, so all of our workers started sending data to a central location. It wasn't a lot of data. It was relatively simple. We sent in the app ID, which was the customer dataset ID. We sent in the subscription ID, which is the row ID, the SQL statement that was being executed, and the consumer instance. This allowed us to do some stuff like group our logs by which customer dataset ID was sending us the most updates. What we found was surprising. We found that almost all of the updates that were coming through were coming from a single customer, which we're going to call clothes.ly. It was something on the order of, if clothes.ly was doing maybe 500 updates per second, our next largest customer was doing 30. Every other customer on that same pod was doing maybe a combined rate of 100. It was really dominating the logs. Since we had it, we also grouped by the subscription ID, the row ID. We actually found that there was a single row ID that was dominating our logs. A single row ID was getting all the updates.
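The structured log line only needed those few fields. As a rough sketch with the tracing crate (field names are illustrative):

    // Emits one structured event per applied update so the logs can be grouped
    // by customer dataset, row, or consumer instance downstream.
    fn log_update(app_id: &str, subscription_id: &str, sql: &str, consumer_instance: &str) {
        tracing::info!(
            app_id = %app_id,
            subscription_id = %subscription_id,
            sql = %sql,
            consumer_instance = %consumer_instance,
            "applying subscription update"
        );
    }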

What was happening here? It's important to remember that it wasn't just the case that we were getting a lot of updates for a single row. It was also the case that we stopped doing everything else. Remember, the number of idle connections jumped way up, and CPU jumped way down. There was really nothing going on except the processing of these single-subscription updates. What was happening? We wanted to see what was in those updates. It was a bunch of updates to single fields at a time. Basically, every single one of those updates was completely incompatible with the last. There were constant updates happening to the location field, so it was jumping from San Francisco, to New York, to Tokyo, back to San Francisco, to Boise. It was just all over the place. It made really no sense to us. We looked in Postgres and we started just refreshing this particular subscription. Location is constantly changing, key-value pairs are constantly changing, but there were a couple of fields that were staying very consistent. The device type was always email. That was never changing. The identifier, which is what we use to message a particular subscription, might be an email address if it's an email subscription, or it might be a push token for an iOS or an Android device. It just depends on the subscription type. The identifier was never changing. It was always set to admin@clothes.ly.

This raised alarm bells for us. Let's talk about why. OneSignal started as a push notification company. We started branching out from that a couple years ago, we started moving into omnichannel messaging that let our customers reach their user base across multiple platforms. The first way that we did that was by adding a method to our SDKs called setEmail. What this does, as implied, if you have a push SDK that has a subscription in it, and you call the setEmail method, it will create a new subscription that has that email address as its delivery method. It will set the parent of that push record to the new email record. It will store that email record in the SDK. Anytime you update a property from that SDK, it will duplicate the update to that email record. We use this parent property to check and see how many records were linked to this admin@clothes.ly record. Clothes.ly had around 5 million users total, and about 4.8 million of those users had the same exact parent record. Almost every single one of those users had had setEmail admin@clothes.ly called in its SDK. Every time one of those 4.8 million users had any of its properties updated, we had an identical update mirrored to this one central record. Our Kafka queue was subtly weighted with too many updates for this one subscription record.

Why is this a problem? Why is this such a big deal if there's just a little bit more load for one record, just a thumb on the scale? Why is this so bad? Remember that our subpartition queues are not totally independent queues. These are views onto a single real queue, the Kafka partition. In a normal situation, where there's a basically fair distribution of messages, we have three queues here. When the green processor finishes processing its message, it can grab the next green message off queue 2, and the next one, and the messages are spaced out about the same. There's always going to be more messages to run. If we put a thumb on the scale and weight this a little bit unfairly, when the green processor finishes its message, that's the only green message that's in memory right now. Remember, there's a limit on how many messages we can pull into memory. There's a real Kafka partition that's representing this queue of messages here, so we can't advance the queue to look for more green messages until we process some of these red messages. What's going to happen is the green queue, and then later on the blue queue, queue 1, are going to drain their messages, and there's not going to be any work for them to do. Unfortunately, because we're sending lots of updates to one row, Postgres is actually going to get slower, because Postgres is not super great at having one row get hammered with updates. This is a worst-case situation for us. In reality, it was slightly worse than I'm discussing here, because I'm zoomed in to the partition level, but because our limit on messages was at the consumer level, at the application level, it was actually affecting every single partition that was assigned to a single consumer instance. It was smeared across a number of partitions, not just the one partition that had clothes.ly's app on it.

What did we do in this situation? The first thing that we did, which very quickly resolved this particular problem, was to skip updates that were going to the clothes.ly admin record. We fixed the limiting so that it was scoped to the partition level, so one partition going haywire like this couldn't blow up the stats for the whole consumer. We also put limits in place that stopped customers from linking so many records together using our SDKs, because that was a fairly obvious misuse case. The customer saw setEmail, and they assumed that they were supposed to set it to the email of the account owner.

Lessons Learned

What did we learn during this talk? What did we learn as a result of this very inventive customer? We learned how we can take intensive API write workloads, and shift them from the API layer down into asynchronous workers to reduce the operational burden of those writes. We learned how we could do subpartition queuing to increase the concurrency of our Kafka consumers in a configurable and flexible way. We also learned some of the struggles that you might face while you're trying to do subpartition queuing. We learned how centralized observability was valuable to us in tracking down this particular issue. We also learned that no matter how creative your engineering and design and product teams may be, your customers are almost certainly more creative.

 


 

Recorded at: Jan 02, 2024
