
Netflix Builds a Custom High-Throughput Priority Queue Backed by Redis, Kafka and Elasticsearch

Netflix recently described how it built Timestone, a custom high-throughput, low-latency priority queueing system. Netflix built the queueing system from open-source components such as Redis, Apache Kafka, Apache Flink and Elasticsearch. Engineers state that they built Timestone because they could not find an off-the-shelf solution that met all of their requirements.

One of these requirements is the ability to mark some work items as non-parallelizable without requiring any locking or coordination on the consumer side. In other words, Timestone must hold back certain messages until earlier items belonging to the same work set have completed. Timestone introduces the concept of "Exclusive Queues" to support this.

Kostas Christidis, a software engineer at Netflix, explains how Exclusive Queues work:

When an exclusive queue is created, it is associated with a user-defined exclusivity key — for example, "project". All messages posted to that queue must carry this key in their metadata. For instance, a message with "project=foo" will be accepted into the queue; a message without the project key will not be. In this example, we call "foo", the value that corresponds to the exclusivity key, the message's exclusivity value. The contract for exclusive queues is that at any point in time, there can be only up to one consumer per exclusivity value. Therefore, if the "project"-based exclusive queue in our example has two messages with the key-value pair "project=foo" in it, and one of them is already leased out to a worker, the other one is not dequeueable.

The figure below depicts this example.

When worker_2 issues a dequeue call, it leases msg_2 instead of msg_1, even though msg_1 has a higher priority
Source: https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f
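To make the exclusivity contract concrete, here is a minimal in-memory sketch in Python. It is not Timestone's implementation (which runs on Redis); it only assumes that a larger number means higher priority and that a lease is released when the worker completes the message. The class and method names (`ExclusiveQueue`, `complete`) and the message IDs and priorities are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Message:
    msg_id: str
    priority: int          # assumption: larger number = higher priority
    metadata: dict


class ExclusiveQueue:
    """In-memory sketch: at most one leased message per exclusivity value."""

    def __init__(self, exclusivity_key: str) -> None:
        self.exclusivity_key = exclusivity_key
        self.pending: list[Message] = []
        self.leased_values: set[str] = set()  # exclusivity values currently leased out

    def enqueue(self, msg: Message) -> None:
        # Messages must carry the exclusivity key in their metadata.
        if self.exclusivity_key not in msg.metadata:
            raise ValueError(f"{msg.msg_id} lacks key {self.exclusivity_key!r}")
        self.pending.append(msg)

    def dequeue(self) -> Message | None:
        # Walk pending messages from highest to lowest priority and skip any
        # whose exclusivity value is already leased to another worker.
        for msg in sorted(self.pending, key=lambda m: m.priority, reverse=True):
            value = msg.metadata[self.exclusivity_key]
            if value not in self.leased_values:
                self.pending.remove(msg)
                self.leased_values.add(value)
                return msg
        return None

    def complete(self, msg: Message) -> None:
        # Releasing the lease makes the next message with this value dequeueable.
        self.leased_values.discard(msg.metadata[self.exclusivity_key])


# Scenario similar to the figure (message contents are hypothetical).
q = ExclusiveQueue("project")
q.enqueue(Message("msg_0", 9, {"project": "foo"}))
q.enqueue(Message("msg_1", 8, {"project": "foo"}))
q.enqueue(Message("msg_2", 5, {"project": "bar"}))
first = q.dequeue()   # worker_1 leases msg_0, so "foo" is now taken
second = q.dequeue()  # worker_2 skips msg_1 (project=foo) and leases msg_2
```

A linear scan keeps the sketch short; a production queue would need an index that avoids re-sorting on every dequeue.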

Another requirement is that a message can be assigned to only one worker at any given time. This matters because work in Cosmos, Netflix's media encoding platform, tends to be resource-intensive and can fan out to thousands of actions, and one of the goals was to reduce resource waste. This requirement rules out eventually consistent solutions; Netflix engineers wanted linearizable consistency at the queue level.

Netflix engineers met this requirement by tracking a state for every message. When a producer enqueues a message, Timestone sets it to the "Pending" or "Invisible" state, depending on the message's optional invisibility timeout. When a consumer dequeues a pending message, it acquires an exclusive lease on that message, and Timestone moves the message to the "Running" state. At this stage, the producer can mark the message as "Completed" or "Canceled". Each message can be dequeued only a finite number of times; once its attempts are exhausted, Timestone moves it to the "Errored" state. The following diagram illustrates all possible state transitions.


Source: https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f
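The lifecycle above can be sketched as a small transition table that rejects illegal moves. This Python sketch covers only the transitions the article names, plus two edges that are assumptions and are marked as such in the code: an invisible message becoming pending once its timeout elapses, and an expired lease returning a running message to pending until the attempt limit is reached. The class and method names are hypothetical; the authoritative diagram is the one in the Netflix post.

```python
from __future__ import annotations

from enum import Enum


class State(Enum):
    INVISIBLE = "Invisible"
    PENDING = "Pending"
    RUNNING = "Running"
    COMPLETED = "Completed"
    CANCELED = "Canceled"
    ERRORED = "Errored"


# Simplified transition table. Edges marked ASSUMPTION are not spelled out in the article.
TRANSITIONS = {
    State.INVISIBLE: {State.PENDING},           # ASSUMPTION: invisibility timeout elapses
    State.PENDING: {State.RUNNING},             # a consumer dequeues and takes a lease
    State.RUNNING: {State.COMPLETED, State.CANCELED,
                    State.PENDING,              # ASSUMPTION: lease expires, retry allowed
                    State.ERRORED},             # attempt limit reached
    State.COMPLETED: set(),
    State.CANCELED: set(),
    State.ERRORED: set(),
}


class TrackedMessage:
    def __init__(self, msg_id: str, max_attempts: int,
                 invisibility_timeout: float | None = None) -> None:
        self.msg_id = msg_id
        self.max_attempts = max_attempts
        self.attempts = 0
        # The initial state depends on the optional (positive) invisibility timeout.
        self.state = State.INVISIBLE if invisibility_timeout else State.PENDING

    def _move(self, new_state: State) -> None:
        if new_state not in TRANSITIONS[self.state]:
            raise ValueError(f"illegal transition {self.state.value} -> {new_state.value}")
        self.state = new_state

    def become_visible(self) -> None:
        self._move(State.PENDING)

    def dequeue(self) -> None:
        self._move(State.RUNNING)
        self.attempts += 1

    def complete(self) -> None:
        self._move(State.COMPLETED)

    def cancel(self) -> None:
        self._move(State.CANCELED)

    def lease_expired(self) -> None:
        # ASSUMPTION: an expired attempt goes back to Pending until
        # max_attempts is exhausted, after which the message is Errored.
        self._move(State.ERRORED if self.attempts >= self.max_attempts else State.PENDING)


m = TrackedMessage("msg_1", max_attempts=2)
m.dequeue()
m.lease_expired()   # first attempt lapses: back to Pending
m.dequeue()
m.lease_expired()   # second attempt lapses: attempts exhausted, now Errored
```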

The Timestone server exposes a gRPC-based interface. All API operations are queue-scoped, and all operations that modify state are idempotent. The system of record is a durable Redis cluster: Redis persists each write request to a transaction log before it sends a response back to the Timestone server. Inside Redis, each queue is represented as a sorted set ordered by priority, while messages and queue configurations are stored as hashes.
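The following Python sketch is a hypothetical in-memory stand-in for that layout: one "sorted set" per queue mapping message IDs to priority scores, and one "hash" per message. It also illustrates one common way to make a mutating call idempotent, keying it on a caller-supplied message ID so that a retried enqueue is a no-op. The article does not say how Timestone implements idempotency, so that mechanism and the key naming scheme are assumptions. In real Redis, commands such as `HSETNX` and `ZADD` with the `NX` flag offer similar insert-if-absent semantics.

```python
from __future__ import annotations


class RedisLikeStore:
    """Hypothetical in-memory model: a sorted set per queue, a hash per message."""

    def __init__(self) -> None:
        self.sorted_sets: dict[str, dict[str, float]] = {}  # queue -> {msg_id: priority}
        self.hashes: dict[str, dict[str, str]] = {}         # message key -> fields

    def enqueue(self, queue: str, msg_id: str, priority: float, payload: str) -> bool:
        """Idempotent enqueue keyed on msg_id; returns True only on the first insert."""
        msg_key = f"{queue}:msg:{msg_id}"  # hypothetical key naming scheme
        if msg_key in self.hashes:
            return False                   # retry of an already-applied write: no-op
        self.hashes[msg_key] = {"payload": payload, "state": "Pending"}
        self.sorted_sets.setdefault(queue, {})[msg_id] = priority
        return True

    def peek_highest(self, queue: str) -> str | None:
        """Return the ID of the highest-priority message, or None if the queue is empty."""
        members = self.sorted_sets.get(queue, {})
        return max(members, key=members.get) if members else None


store = RedisLikeStore()
store.enqueue("encode", "m1", 5, "job-a")
store.enqueue("encode", "m1", 5, "job-a")   # client retry: ignored
store.enqueue("encode", "m2", 9, "job-b")
```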

Christidis comments on how Netflix engineers achieved atomicity with Redis:

Almost all of the interactions between Timestone and Redis [...] are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.


Source: https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f
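Timestone relies on Redis executing each Lua script atomically. The Python sketch below is not Lua and not Timestone code; it emulates the same all-or-nothing property with a single lock so that a dequeue updates three structures together (the priority "sorted set", the message "hash", and a hypothetical lease map). Eight concurrent workers race for five messages, and each message ends up leased exactly once.

```python
from __future__ import annotations

import threading


class AtomicQueue:
    """Sketch: a lock stands in for Redis running one script at a time."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.by_priority: dict[str, float] = {}        # queue "sorted set"
        self.messages: dict[str, dict[str, str]] = {}  # per-message "hashes"
        self.leases: dict[str, str] = {}               # msg_id -> worker (hypothetical)

    def enqueue(self, msg_id: str, priority: float) -> None:
        with self._lock:
            self.by_priority[msg_id] = priority
            self.messages[msg_id] = {"state": "Pending"}

    def dequeue(self, worker: str) -> str | None:
        with self._lock:
            if not self.by_priority:
                return None
            msg_id = max(self.by_priority, key=self.by_priority.get)
            # All three updates land together: no caller can observe a message
            # that has left the sorted set but is not yet marked Running.
            del self.by_priority[msg_id]
            self.messages[msg_id]["state"] = "Running"
            self.leases[msg_id] = worker
            return msg_id


q = AtomicQueue()
for i in range(5):
    q.enqueue(f"msg_{i}", priority=i)

results: list[str | None] = []
results_lock = threading.Lock()


def worker(name: str) -> None:
    got = q.dequeue(name)
    with results_lock:
        results.append(got)


threads = [threading.Thread(target=worker, args=(f"worker_{n}",)) for n in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

A lock only serializes callers inside one process; the appeal of Redis scripts is that the server provides the same guarantee to every client of the cluster.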

For observability, Timestone records incoming messages and their state transitions in two secondary indexes maintained in Elasticsearch. When the Timestone server receives a write response from Redis, it converts it into an event and sends it to a Kafka cluster. Two Flink jobs, one for each type of index Timestone maintains, consume the events from the corresponding Kafka topics and update the indexes in Elasticsearch.
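The data flow can be sketched in plain Python, with lists standing in for Kafka topics and dictionaries standing in for Elasticsearch indexes. Several details are assumptions, not facts from the article: the topic names, the event fields, that every event is published to both topics, and that one index keeps each message's latest state while the other keeps its full transition history.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class StateChangeEvent:
    queue: str
    msg_id: str
    new_state: str
    ts: int  # hypothetical event timestamp, e.g. epoch millis


# Hypothetical topic names; the article only says each index has its own topic.
topics: dict[str, list[StateChangeEvent]] = {"messages": [], "transitions": []}


def publish(event: StateChangeEvent) -> None:
    """Server side: after Redis acknowledges a write, emit the event (assumed fan-out)."""
    for topic in topics.values():
        topic.append(event)


def build_message_index(events: list[StateChangeEvent]) -> dict[str, dict]:
    """Stand-in for one Flink job: keep the latest known state per message."""
    index: dict[str, dict] = {}
    for e in events:
        doc = index.setdefault(e.msg_id, {"queue": e.queue})
        if e.ts >= doc.get("ts", -1):  # ignore stale, out-of-order events
            doc.update(state=e.new_state, ts=e.ts)
    return index


def build_transition_index(events: list[StateChangeEvent]) -> dict[str, list[tuple[int, str]]]:
    """Stand-in for the other Flink job: append-only history per message."""
    index: dict[str, list[tuple[int, str]]] = {}
    for e in events:
        index.setdefault(e.msg_id, []).append((e.ts, e.new_state))
    return index


publish(StateChangeEvent("encode", "msg_1", "Pending", 1))
publish(StateChangeEvent("encode", "msg_1", "Running", 2))
publish(StateChangeEvent("encode", "msg_1", "Completed", 3))
latest = build_message_index(topics["messages"])
history = build_transition_index(topics["transitions"])
```

Keeping indexing off the write path in this way means a slow or unavailable Elasticsearch cluster does not add latency to enqueue and dequeue calls.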

Netflix built Timestone to support the needs of its media encoding platform, Cosmos. Timestone also backs Conductor, Netflix's general-purpose workflow orchestration engine, and it acts as the scheduler for large-scale data pipelines.
