
Fault Tolerance at Speed



Todd Montgomery discusses the techniques and lessons learned from implementing Aeron Cluster. His focus is on how Raft can be implemented on Aeron, minimizing the network round trip overhead, and comparing single process to a fully distributed cluster.


Todd Montgomery is a networking hacker who has researched, designed, and built numerous protocols, messaging-oriented middleware systems, and real-time data systems, done research for NASA, contributed to the IETF and IEEE, and co-founded two startups. He currently works as an independent consultant and is active in several open source projects.

About the conference

Software is changing the world. QCon empowers software development by facilitating the spread of knowledge and innovation in the developer community. A practitioner-driven conference, QCon is designed for technical team leads, architects, engineering directors, and project managers who influence innovation in their teams.


Montgomery: My name is Todd Montgomery and we'll be talking a little bit about some protocol design and a little bit about fault tolerance and how speed and efficiency is something that can be carried through a design. We'll do that by example.

A little bit about me. I actually started as a researcher in software quality at NASA, so, figuring out how to keep the shuttle from falling down, literally falling down through the atmosphere would be a good thing, keeping the International Space Station up there, keeping air in the International Space Station. It’s probably a very good idea if you want people there. If you don't, you could do that very easily. That set the tone for a lot of my career in terms of looking at software quality and a lot of things around quality. That has affected me even to this day, even though I stopped working with NASA in 99.

From that actually, I started working in high-performance messaging systems, communications, working with traders on Wall Street, foreign exchange all over the world. Various things like that. When we're talking about fault tolerance at speed, I wanted to talk about systems that I've seen for a while and systems that we, me and a fellow, have actually been putting out in open source.

We'll talk a little bit about different types of fault tolerance, we'll start out looking at some models of fault tolerance. We'll center on one particular type, which is the idea of clustered services and replicated state machines. We'll talk about an open-source project called Aeron, that myself and Martin Thompson have done, that actually supports this and uses it. Then, we'll talk about how designing it for making things very efficient and fast will speed it up. It's not only just speed, it's also efficiency. We want to actually think about how do you inject efficiency throughout the whole time that you're dealing with some piece of software.

The reason why we want to talk about efficiency is we're actually going to be seeing some of the highest data center usages in the world in 2020 and 2030. The projected energy consumption is extremely high. We may look at it and say, "Facebook and Google have all these initiatives on renewable energy." We'll start looking at them a little bit closer, certain things like you buy more energy, renewable energy, than actually can ever be produced right now. Is this real? Even though it's renewables, should we just assume that we don't care about things and we'll just let them be slow and inefficient?

A lot of times we seem to assume that efficiency, security, quality, all these other kinds of non-functional requirements are special characteristics we add later, if at all. That's actually not true. A lot of times these are things that we have to design in up front if we want to actually be able to take advantage of them.

Fault Tolerance of State

It's a lot like fault tolerance. Systems that don't have to be available, it's ok if they fail. Can they restart? A lot of times when you start to think about how you make a system fault-tolerant, you may start like this. You've got a service running in a client connected to it, or a set of clients, and your first thing to make this system fault-tolerant might be as simple as we're going to run it in the cloud. That's it. We've now made a fault-tolerant system, we can restart the system. It's now fault-tolerant. If it fails, we just restart it. That actually is a form of fault tolerance. It may not be the best form of fault tolerance but it is a form of fault tolerance.

We may go even further and say, "We'll just not run just one, we'll run a whole bunch of them." Now we have our client that has to figure out how to access which one, and that may be a very simple matter, it may be more complicated, but we don't just have one client, we have many. Now we have to figure out how to get to which one has the data that we want. These are all stateless so it doesn't matter. If you remember our keynote from this morning, a lot of times it does matter that you have some state that's associated with those.

That's where things get really complex because we have state that is basically between those services. How do we manage that? It used to say that you could just add another layer of indirection and you could solve that. You can, and you can also do it by just mirroring it again and kicking a can down the road because you can just push it off to something else to actually store your state for you. That'll work. We do it all the time. Databases, caches, stores. Everything. It looks like that again, because if you start now looking at more of the back end, you still have that same problem.

A lot of the problems that we have with fault tolerance are really the fault tolerance of state. How do we make state fault-tolerant so that if you lose part of it, you can still continue? There are lots of different things we can do. We can partition it so that you can basically load it out and you can replicate it, as well as partition it, and various things along this same continuum. You can make it so that you have the data in different places. You can do a lot of different things with this and we have systems that do this, that's a form of fault tolerance. That works pretty well most of the time.

There's something else we can do though too. You'll notice that with these systems, it's a lot of the input to the system and the generation of data that's coming into the state, that is really what we want to capture. The same set of inputs, if we're going to have a deterministic system, should generate the same state. If we look at it as a log, now we have things like CQRS and things like that, event stores, that kind of approach. These all work pretty fine, but there's an even more general thing that we can do: we can think of it as a continuous, immutable log with the ability to take snapshots in the log and replay from them.

This is just an example, here's a set of events. One, two, three, four, five, six, and then, onto X. If we look at the states associated with it, we have generated state at each of those events. If we look at the state, it is a conglomerate of all the states of all those events played in that order. Very deterministic. If we take a snapshot, at this point, then we can also say that that state can be reconstituted from the snapshot and replayed forward in that log. This basic concept is used all over the place. We use it in databases. We use it on spacecraft. Cassini does this. It takes a snapshot before it does anything dangerous. Or it did, it's not there anymore. That's not because of this, I swear, it's not because of that. It continued its mission, it fulfilled it, and it's done. It's the idea of saving your state, and then, using that as a periodic marker, and then, being able to, when you recover, use that as your first step to reconstitute, "What am I doing?"
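As an illustration of the snapshot-and-replay idea described above, here is a minimal sketch. The `Counter` service, the `replay` helper, and the event shape are hypothetical names invented for this example; they are not part of Aeron or of the speaker's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class Counter:
    """Toy deterministic service: state is a running total per key."""
    totals: dict = field(default_factory=dict)

    def apply(self, event):
        key, amount = event
        self.totals[key] = self.totals.get(key, 0) + amount

    def snapshot(self):
        # Copy so that later events cannot mutate the saved state.
        return dict(self.totals)

    @classmethod
    def from_snapshot(cls, snap):
        return cls(totals=dict(snap))


def replay(service, log, start=0):
    """Apply log[start:] in order and return the service."""
    for event in log[start:]:
        service.apply(event)
    return service


log = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)]

# Full replay from the beginning of the log.
full = replay(Counter(), log)

# Snapshot after the first three events, then recover from the snapshot
# and replay only the tail of the log.
partial = replay(Counter(), log[:3])
snap, snap_position = partial.snapshot(), 3
recovered = replay(Counter.from_snapshot(snap), log, start=snap_position)
```

Because the service is deterministic, `recovered` ends up with the same state as `full`, which is what makes the snapshot a valid starting point for recovery.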

Why would something like Cassini want to do that, a spacecraft? Imagine, if you will, you're a happy spacecraft, you're flying through the universe, you're thinking, "I'm going to get to Saturn." You start to rotate so that you can take advantage of the sunlight, and your orientation to charge your systems. What happens if you fail while you're pointed into the Sun, you've got some fairly delicate sensors? When you come back up, you better rotate out of that quickly or else you burn up. Why? Because these are very delicate sensors, they will basically become inoperable and now the spacecraft won't be able to know where it is.

What do you do? "I'm going to turn into the Sun. I snapshot. I then turn. If I have a fault, what was my last event? 'I'm going to turn into the Sun.'" You know that you might be in a dangerous state, that idea how much data do you need? Very little for your state, only your critical state, but you still do need it. That's a little bit more of a system that uses this but uses it in a very different way, but the idea is that you can reconstitute your state from a very small piece of data, that's key.

Clustered Services

We have this idea of this log and we have services that are attached that are reading this log and they're basically reading and consuming the archive of this log, in a sequential manner, all of them at the same time. This is what's called a replicated state machine. Each replicated service, or replicant, sees the exact same event log, it sees the same input ordering. The log is replicated locally so that, if something fails, it doesn't have to go and get the log, it's saving it locally. All recovery can be done locally.

This concept's not new, it's been around for a very long time. There's all kinds of different ways of doing this. You can do this with any system, and I've seen various messaging systems used to construct these exact types of systems. The idea of a checkpoint or a snapshot, if it's an event in the log and it takes and generates a piece of state, it's rolling up all the previous log events to get you to a particular state for the service. Everyone will generate the snapshot at the exact same time. If it is an event in the log, when did you take the snapshot? Did one take the snapshot at one point and one take it at another? No, it doesn't matter. If it's an event in the log that says, "Take a snapshot," guess what? Everyone takes a snapshot at the exact same point in the log. Snapshots would be the same.
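A small sketch of the point above, that a snapshot request carried as an event in the log is taken at the same log position by every replica regardless of when each one gets to it. The `SNAPSHOT` marker and the `Replica` class are assumptions made for illustration only.

```python
SNAPSHOT = "SNAPSHOT"  # hypothetical marker event in the log


class Replica:
    def __init__(self):
        self.total = 0
        self.snapshots = []  # (log position, state) pairs

    def consume(self, position, event):
        if event == SNAPSHOT:
            # The snapshot is tied to the log position, not to wall-clock time.
            self.snapshots.append((position, self.total))
        else:
            self.total += event


log = [5, 7, SNAPSHOT, 11, 13]
fast, slow = Replica(), Replica()

# The fast replica has consumed the whole log; the slow one is still behind.
for position, event in enumerate(log):
    fast.consume(position, event)
for position, event in enumerate(log[:3]):
    slow.consume(position, event)
```

Both replicas hold an identical snapshot even though they reached the snapshot event at different moments.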

Here's a real big question. When should a service consume a log event? We have a log, it's being generated, something's generating this log and it's being replicated around, but when do you consume it? That is a very good question, because if you consume it too early and you fail, you may have been the only one to act on that event. If you can know that there are others who have that log message and you know that they have recorded it to their own local archive, you know that it is replicated.

To give an idea, let's say we have three different services and they all have the same set of events but some of them have more of the events than others. The first one has only events one and two, the middle one has events one-two-three-four-five-six, and the last one has one-two-three-four-five-six-seven. For example, when you get an event, process it, if you are that last one and you get seven, you process it, and you die and that message never makes it to anyone else, have you affected the world? You might have. That may or may not, depending on the system, be a stable way to process that event. You could definitely process one and two because they all have that, but is that too late? Is the system latency-sensitive in some way so that, if you wait till everyone has it, then you can process it?

Also, what happens if you wait for everyone and you never get everyone? Like, one dies, you have to wait for it to recover, get everything before you can continue? Is that being fault-tolerant? Things to think about.

Here's one thing to also think about. Once processed, an event cannot be altered. You want to make sure that, if that event is processed that the log is not altered afterwards. You have to balance the effects of what happens if you have faults and you continue but you don't want to actually say that you process something and then go back and you say you didn't. That comes into Raft consensus.

Raft is a very simple consensus mechanism. It basically says, "An event must be recorded at the majority of Replicas before being consumed by any Replica." This has been around for quite a while, Raft went one step further and actually showed certain properties on this basic concept of consensus. You have a consensus on the actual message for the group. In this case, you have consensus that six has been received by a majority. At that point, you can go ahead and process it because you know that if you lose a minority, you can continue. In other words, if you lose one, you can continue. If you lose two, now all bets are off. You don't know what you did but you can now see that you can continue, even if there's loss, just by losing one member or losing a minority. In this case, if you were to lose the center one, you could still continue on with the one on the end there.
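The majority rule can be sketched as a small calculation over the positions each member has recorded. This is an illustration only: the function name is hypothetical, and a real Raft implementation also takes leadership terms into account before advancing the commit point.

```python
def commit_position(append_positions):
    """Highest log position recorded by a majority of cluster members."""
    if not append_positions:
        raise ValueError("cluster has no members")
    quorum = len(append_positions) // 2 + 1
    # Sorted descending, the quorum-th entry is a position that at least
    # `quorum` members have reached or passed.
    ranked = sorted(append_positions, reverse=True)
    return ranked[quorum - 1]


# The three members from the example: they have recorded up to
# event 2, event 6, and event 7 respectively.
positions = [2, 6, 7]
committed = commit_position(positions)
```

With these positions the result is 6: two of the three members have event six, so it can be processed, while event seven exists on only one member and cannot.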

Raft requires a strong leader, it's an elected member of the cluster. It orders input, so it's the one who actually says, "Here's your log." It's taking an input, it's taking in events, and it's ordering them by disseminating them back out. It's also disseminating what consensus is. In other words, as different events reach consensus, the leader is the one who's informing the followers basically of what is possible for them to process. Our picture changes a little bit. We have the service, we have a local archive, but we have this consensus mechanism in there that is controlling when an event can be consumed.

Before we go further, let's talk about what Raft is and what it's not. Raft is an algorithm with formal verification. It has certain properties, when implemented given how it's described, that follow along with the things that we've been talking about. It will say things like, "You can lose a minority of members and continue operation without pausing or anything else." You can continue.

What it is not is, it is not a full specification. You can go to the IETF and say, "Give me the TCP spec," but you can't go and say, "Give me the Raft spec," because there isn't really a specification, at least one that I know of. They may exist. Nor is it really a complete system. Raft has some great properties but it is not complete in and of itself, you have to add things to it to actually make it more useful. In the real world, when Raft gets implemented, a lot of times you add things to it. What we've done with our implementation here is we've actually added timestamps to events so that the concept of time is injected into the system, it's in the log. Time doesn't progress unless the log progresses. Raft is specified as a blocking RPC mechanism. By making it async, which is the thing we're really interested in with this that will make things go a lot faster, you have to break that synchrony but you can't break it in such a way that you're going to actually break all the fundamental properties that Raft has.

Last thing is you can also, since you have the concept of time, you can actually have also timers within the log. In our picture, one of these is going to be the leader, a strong leader, as Raft would say, and all the clients will communicate with it and it will disseminate the log to the followers.
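To make the idea of time living in the log concrete, here is a hedged sketch of timers that only fire as timestamped log events are consumed. The class and method names are invented for this example and do not reflect the Aeron Cluster API.

```python
import heapq


class LogClockTimers:
    """Timers driven by event timestamps rather than the wall clock."""

    def __init__(self):
        self.now = 0
        self._timers = []  # min-heap of (deadline, timer_id)

    def schedule(self, deadline, timer_id):
        heapq.heappush(self._timers, (deadline, timer_id))

    def on_event(self, timestamp):
        """Advance time to the event's timestamp; return expired timer ids."""
        # Time never goes backwards, and it only moves when the log moves.
        self.now = max(self.now, timestamp)
        fired = []
        while self._timers and self._timers[0][0] <= self.now:
            fired.append(heapq.heappop(self._timers)[1])
        return fired


timers = LogClockTimers()
timers.schedule(100, "session-timeout")
timers.schedule(50, "heartbeat")
```

Since every replica sees the same timestamps in the same order, every replica fires the same timers at the same point in the log, and a replay reproduces them exactly.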

A quick aside. The reason why this is important in a lot of the stuff that I do is that it's basically a very deterministic system. The log is immutable, it can be played, stopped, and replayed. Each event is time-stamped. The services are restarted from a snapshot and then they replay the log forward. What can you do with that? The simple thing is you can do distributed key/value stores, distributed timers, distributed locks pretty simply from it because you have a lot to base it on.

A lot of the entities that I work with are more interested in this, matching engines, order management, market surveillance for trading, profit and loss, risk, different models that are associated from that. Once you have a deterministic log that's immutable and those events come in, think of your stock ticker. Think of the old-style stock ticker that's just clattering away with events that are coming in. That's an immutable log. You can't go back and say, "I didn't mean that." Technically yes, you can, but that doesn't happen. Just think of it as your ticker. It turns out that a lot of things in finance that have to do with the markets, a lot of the time, look like this, they look like a continuous log of the things that you want to do as you consume it.

The reason why a lot of the systems that I've worked on have been in this category is, looking at it from this perspective, it's deterministic. Think of what a matching engine does. How many of you have heard the term "Matching engine" before? What the matching engine does is, it matches, "I want to sell something," with, "I want to buy something," and it figures out what that price is. That's what they do. It matches bids and asks, technically. It even gets more specific than that, but that's all it does. The idea of placing an order and saying, "I want to sell something," and then, also placing in another event that says, "I want to buy something," if these are deterministic, your logic for matching is really quite simple. I've just vastly overstated that, it's not simple at all, but it can be simpler in a fault-tolerant world when you think about these events coming in as a log.

It's not only that. Venue ticketing, reservations, auctions, a lot of these things where you have a very hotly-contended database. When you think about the events that are being operated on it, they tend to, a lot of times, be a better match for something like this and the logic behind it as opposed to storing it into other logic that has rollbacks and everything else attached to it. If you're going to build this system, you need a communication substrate, that's where Aeron comes in.

Why Aeron?

Aeron, if you're not familiar with it, is an efficient, reliable, UDP unicast, UDP multicast, and IPC message transport. It has various language bindings. It has a very fast archival and replay mechanism, and it's the efficient bit because it does things at very high speeds. It's not unusual to send IPC messages between processes at 100 million messages a second, you can get that pretty much out of the box.

All the communications, publications and subscriptions, archival and replay, shared counters, various things like that are what you need to build something like this. Consensus, the idea of that consensus point, if you think of it as a stream position – within a lot of communication systems, think of TCP, you have a stream position where you are in the stream. It's the byte offset within the stream. Aeron has the same concept, it's the byte offset within the stream. That means that the idea of, "What can I consume in the log?" is nothing more than an integer. I can consume up to byte 1024. Remember, it'll never move back. It only moves forward, so, it continues to move forward. As the leader is disseminating that consensus information, it's not saying, "This message is now stable." It's saying, "Everyone can consume up to position 2048," or "position 3192."
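The following sketch shows what it means for the consensus point to be nothing more than a byte offset that only moves forward. It is a simplification under stated assumptions: the `Follower` class is hypothetical and it ignores message framing, which a real transport such as Aeron handles.

```python
class Follower:
    """Consumes a locally recorded log only up to the commit position."""

    def __init__(self):
        self.log = bytearray()      # locally recorded log bytes
        self.commit_position = 0    # byte offset agreed by consensus
        self.consumed_position = 0  # byte offset handed to the service

    def on_append(self, data: bytes):
        self.log += data

    def on_commit(self, position: int):
        # The commit position never moves backwards.
        self.commit_position = max(self.commit_position, position)

    def poll(self) -> bytes:
        # Only bytes that are both committed and locally present are consumable.
        limit = min(self.commit_position, len(self.log))
        chunk = bytes(self.log[self.consumed_position:limit])
        self.consumed_position = limit
        return chunk


follower = Follower()
follower.on_append(b"abcdef")
follower.on_commit(4)
first = follower.poll()
follower.on_commit(2)   # a stale commit message is ignored
second = follower.poll()
follower.on_commit(10)  # commit can run ahead of what is stored locally
third = follower.poll()
```

The leader never has to say which individual messages are stable; one integer covers everything before it.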

You need efficient batching to be able to do these things too at speed. You can't get to 100 million messages a second, unless you understand how to do very efficient batching. For correct operation, you also have to have some concept of flow control. You can't let a sender just continue to send and blast out when the consumers cannot handle the rate. There has to be effective and efficient flow-control mechanisms within the system to backpressure everything up.
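A minimal sketch of how batching and backpressure fit together, assuming a simple bounded in-memory channel. `BoundedChannel` and its methods are hypothetical and are not how Aeron implements flow control; the point is only that a full buffer pushes back on the sender while the consumer drains in batches.

```python
from collections import deque


class BoundedChannel:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()

    def offer(self, message):
        """Return False instead of queueing when the consumer is behind."""
        if len(self.queue) >= self.capacity:
            return False  # backpressure: the sender must retry later
        self.queue.append(message)
        return True

    def poll(self, limit):
        """Drain up to `limit` messages in one batch."""
        batch = []
        while self.queue and len(batch) < limit:
            batch.append(self.queue.popleft())
        return batch


channel = BoundedChannel(capacity=2)
accepted = [channel.offer(m) for m in ("m1", "m2", "m3")]
batch = channel.poll(limit=10)
retry = channel.offer("m3")
```

The third offer is refused while the channel is full and succeeds once the consumer has drained a batch.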

Designing for efficiency – Aeron itself has a lot of different things that it's been designed to be incredibly efficient. When you put something like this clustering or consensus on top, you also have a lot of things to consider. You start with this stuff, you start looking at cache hit and miss ratios, branch prediction, allocation rates, garbage collection, inlining, optimizations; but not yet. When you start looking at a design and you start thinking about what the system is going to do, these may be in the back of your mind, if you've done them a lot, but they shouldn't be the first thing in your mind. They should be the last thing on your mind when you start.

As you start thinking about the design you may start thinking about the ownership, dependency, and coupling of objects in the system, or the complexity or layers of abstraction, which don't come for free, or resource management. You may start thinking about those, but again, closer but still not quite yet. Think about things at a higher level.

When we started to look at what we wanted to do with consensus, one of the first things that we thought of was Amdahl's law, which is basically saying, "The serialized part of your system will dominate your speed up. No matter how many threads you throw at it, if your system has 5% serialization," in other words, 5% of it is just inherently serialized, "you will never go any faster than 20 times, no matter how many threads you throw at it." It turns out that Amdahl was an optimist because work by Neil Gunther on the universal scalability law actually says that that was the best case, it gets even worse when you start throwing in coherence. The communication between entities so that they can communicate effectively, that communication aspect will limit your scaling even more. In fact, it makes it worse. What this graph is showing is the blue is Amdahl and the red is actually universal scalability law with a 250-ns penalty for coherence. That's fast; and that's just a little bit.

Notice, you get up to 32-64 processors, and then, everything starts to go really bad. How many of you have seen systems where you've actually thrown more threads at it and you think, "It's going to really go faster," and all of a sudden, it goes slower? That's what you're hitting. The concept here is, "Coherence is bad. You're going to have some of it so you have to think about what it is." You just can't throw threads at it, there's going to be, in every system, some part of it that is going to be serialized, so identify it.
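The two curves can be sketched with the standard formulas for Amdahl's law and Gunther's universal scalability law. The 5% serial fraction comes from the talk; the coherence coefficient below is an arbitrary value chosen for illustration and is not derived from the 250-ns figure on the slide.

```python
def amdahl_speedup(n, serial_fraction):
    """Amdahl's law: speedup on n processors with a fixed serial fraction."""
    return 1.0 / (serial_fraction + (1.0 - serial_fraction) / n)


def usl_capacity(n, contention, coherence):
    """Universal scalability law: contention plus a pairwise coherence cost."""
    return n / (1.0 + contention * (n - 1) + coherence * n * (n - 1))


SERIAL = 0.05       # 5% of the work is inherently serialized (from the talk)
COHERENCE = 0.0004  # assumed coefficient, for illustration only

for n in (1, 8, 32, 64, 256, 1024):
    print(n,
          round(amdahl_speedup(n, SERIAL), 2),
          round(usl_capacity(n, SERIAL, COHERENCE), 2))
```

With the coherence term set to zero the two formulas agree. Amdahl's curve flattens toward 20x, while the curve with a coherence cost peaks at a few dozen processors and then falls as more are added.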

Let's do that, let's break this down. Let's break down the interactions which I really quickly went through. There are fundamental sequential operations within Raft that have to happen for the system to continue. You ingress a message, a message comes into the leader. It sequences it and it disseminates it, so it takes it from the client, it sends it down to the followers. The follower can't consume it until it has the message. This is a sequential part.

It's even more than that because the followers have to instruct the leader what position they have, in other words that they've received a message. Then, the leader has to disseminate the consensus point, we call that a commit message. That sequence, sending the message down, appends coming back up and commits coming back down, that's a fundamental sequential operation, you're not going to go any faster than that. No matter what you do, you're going to be limited by that until we have sufficient qubits that we don't have to worry about it and quantum computing just fixes it. I'm sure that will involve blockchain at some point, in some way.

When you start thinking about it, what you can make faster, the only way that you make these faster is to not do some of them. When you start not doing them, you don't have fault tolerance, so this is a fundamental part of it. If you break it down into what are the things that can be pipelined, operations and batching, that's where even these things can get fairly efficient. The leader and the follower have an archive position that they have written to disk, that is locally stable. That activity, saving it locally, can be done asynchronously to the rest of the operation. You can have it as a background thread, you can have it as something else. It turns out, the operating systems are really good at doing that in the background for you. You just instruct it to keep flushing that to disk and figuring out where that is, and you feed that back. That's one thing which is an asynchronous pipelineable part, is that writing to disk.

Also, the act of sending the appends back. What are you doing at that point? You're just looking to see what's made it to disk and you're continuously sending that back. That's where batching can come in. Very simple, it's an integer. It should be pretty small, so you should be able to send them fast if you need to and they should conflate. You only need the last one, you don't need all of them, but they should compress real nicely too if you really wanted to think about it that way.
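Conflation of append positions can be sketched as a single slot that keeps only the most recent value. The `ConflatingPosition` class is a hypothetical illustration rather than anything taken from Aeron.

```python
class ConflatingPosition:
    """Holds only the latest append position waiting to be sent."""

    def __init__(self):
        self._latest = None

    def publish(self, position):
        # Older positions are superseded; only the highest one matters.
        if self._latest is None or position > self._latest:
            self._latest = position

    def take(self):
        """Return the pending position, if any, and clear the slot."""
        latest, self._latest = self._latest, None
        return latest


pending = ConflatingPosition()
for position in (128, 256, 512):
    pending.publish(position)  # three local writes complete

to_send = pending.take()       # one message to the leader covers all three
nothing_pending = pending.take()
```

Three updates collapse into one message carrying position 512, because a follower that has reached 512 has necessarily reached 128 and 256.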

Also, the leader is sending down a commit position. The leader doesn't have to immediately send a commit position when everything changes. It could do a little bit of batching, it could do some work and then say, "There's nothing right now for me to do, I'm just going to send a commit position down for that whole batch." There's lots of different things within that that can be effective and efficient just by batching them up. Those are things that you can start to play with.

Doesn't this complicate recovery? Now you've got what you've saved locally and the position of it is different than what you've informed the leader that you're on, and there's more data coming in, and there is a commit position coming down and all this. Isn't this complicating the recovery? How in the world do you know what state you're in? Guess what? It doesn't complicate it at all because, let me tell you, when you're saving something to disk, you have no idea what's actually on disk, what's actually made it to disk and what is stable. You have to handle this anyway. When systems fail, they never fail in a graceful way. Unless you've orchestrated it that way, they usually fail in very complicated ways that they're in all kinds of different states.

The first thing you have to do, when you start to elect and want to determine what the state of the world is, you still need to assert the state of a cluster and locally catch up. What's going to happen is, you've probably had a failure of the leader, the followers are in very disparate states. No matter how you've locked them together, they're still going to be all over the place. You have to figure out who has the most, who's seen the most, all that. Just because you have a system which is doing that in lockstep – this is where Raft is so good, Raft doesn't assume any of that. It doesn't assume that, "Everything's happened in lockstep, therefore everything's good." No, it's saying that you have to assert this behavior, you have to make sure that you can progress forward. You have to figure out who is the most electable leader because it's the one that has the most data.
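As a rough illustration of "who has the most data", the sketch below ranks members by the leadership term of their last log entry and then by log position, which is the comparison Raft uses to decide whether one log is more up to date than another. The member names and values are invented, and a real election also requires votes from a majority; this shows only the ranking.

```python
def most_electable(members):
    """Pick the member whose log is most up to date.

    `members` maps a member name to (last_term, log_position).
    """
    if not members:
        raise ValueError("no members to choose from")
    # Tuples compare element by element: term first, then position.
    return max(members, key=lambda name: members[name])


# Hypothetical state of the surviving followers after a leader failure.
survivors = {
    "node-a": (3, 1024),
    "node-b": (3, 4096),
    "node-c": (2, 8192),  # longer log, but from an older term
}
candidate = most_electable(survivors)
```

Here `node-b` is chosen: it shares the newest term with `node-a` but has recorded further into the log, while `node-c`'s extra bytes come from an older term.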

That's where things really start to shine and you can get really aggressive with batching, you can get really aggressive with pipelining, and you're just going to make it faster. You're not complicating your life at all because you have to, in a well-designed system, follow it anyway. That's a situation where, "Here's the limit. You know that you can't get any faster in certain things unless you find a different mechanism because there's some sequential part. Here's the part where you know that there are certain things that you can actually optimize really well and get some really nice advantages from." Even those have limits. There's limitations to this efficiency and it comes in the form of throughput and latency.

Let's break that operation down. Ingress message comes in, leader disseminates down, appends come back up, commits flow back down. That's a simple sequence. If we think about it from a round-trip time perspective, a client sends a message to the leader, the leader's sending that log event to the followers, the followers send append positions back, and then, the leader sends a commit position back down to the followers. Just written out differently from left to right instead.

There are services attached to the leader and the followers. One of the things that you can look at and think as an additional way to make things a little faster is, when a leader gets a message and sends that log event out and it gets an append position that it knows that the majority of the followers are at a particular position, that means that the service on the leader can now process it. That's nice, that's going to be faster than any other service because, at that point, the leader knows what the commit position is but it hasn't sent that message down to the followers. That's an optimization, you know you can do a little bit better right there for that service.

If we think about the network being a constant delay, in other words, you send a message, it takes exactly one second to get to the other side. No network works like this but it lets us think about things a little bit more. If we think about a round trip as being something like one second to one place, and then, another second back, then from the client to service A on the leader, if you had no consensus, would be pretty much half a round trip. It's just one-way delay. The service on one of the followers is actually one round trip because it has to get to the leader, the leader then has to send it to the follower.

That's without consensus. If we wait for the commit, in other words, we know when the consensus has been received getting to service A, we have 1.5 round trips. One to get there, we then forward it on, then the append comes back. At that point, you know that you have consensus, you can process the message on the leader. A follower has to wait for an additional half a round trip for that message to come down.

Those are some limits, we know we can't do any better than that. That gives us, from this standpoint, that we know that there is a little bit of an advantage. We know that the service that is running on the leader is going to be the fastest service by definition.

If we start to fill in a few things here, if we look at shared memory with round-trip times that are less than 100 ns, you start to get some pretty interesting kinds of numbers that you can think about. 150 nanoseconds from a client to the leader service with consensus, that's pretty good. And it's 200 ns to the followers. People do run like this, they run with multiple services on the same machine, all communicating with shared memory. Nothing else. Why would you want to run that way?

Your regulations may say that, as long as it's in a different memory region of the machine, it's considered stable. Read your regulations real carefully. It has to do with compliance, very few systems run like this, but they can, there are options too. Also, this might not be actual shared memory, it may be over PCIe, so it actually is basically another machine in the rack, it's just not using Ethernet. Or maybe it is.

If you're looking at data-center round-trip times less than 100 micros, your differences are still going to be in the 50-microsecond range but that can be very effective. If you're looking at within a rack with kernel bypass or limitations, and that's the bottom one there, you're looking at 15 micros from a client getting into a service on the leader, with 20 micros being into a service on the follower. This is giving you an idea that the round-trip time matters here and these are limits based on what your expected round-trip times might be within a certain range.
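The limits quoted above follow directly from the constant-delay model: 1.5 round trips to the service on the leader and 2 round trips to a service on a follower once consensus is required. The helper below is a sketch of that arithmetic; the function name is hypothetical, and the 10-microsecond rack figure is inferred from the 15 and 20 microsecond numbers rather than stated in the talk.

```python
def consensus_latency(rtt):
    """Best-case latency from client send to service processing, with consensus.

    Leader service: client -> leader (0.5), leader -> follower (0.5),
    append position back to the leader (0.5) = 1.5 round trips.
    Follower service: the same, plus the commit position coming back
    down to the follower (0.5) = 2 round trips.
    """
    return {"leader_service": 1.5 * rtt, "follower_service": 2.0 * rtt}


shared_memory = consensus_latency(100)  # nanoseconds
data_center = consensus_latency(100)    # microseconds
in_rack = consensus_latency(10)         # microseconds (inferred round trip)
```

In every case the gap between the leader's service and a follower's service is half a round trip, which is where the 50-microsecond difference in the data-center case comes from.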

How does this match reality? Here's a measured latency. It's a single client, bursts of 20 messages at a time. Each message is 200 bytes, it's a 3-node cluster, and all the services do is echo the payload back. You measure from the time that a client sends it until it comes through the whole cluster, all of them, and comes back to the client with three copies. These are older Xeons. This would be the hardware that you would see at probably a trading shop that's got it in the corner. It's ok, it might be running some pretty critical stuff that no one touches, but this isn't top-of-the-line hardware by any stretch but it's decent. This is a good starting point.

You can see that for your median, your 0.90 and your 0.99, and even up into the 0.9999, it's pretty flat. It doesn't really matter what your throughput was. It could be 1,000 messages a second, it could be 200,000 messages a second, this is aggregate rate. It's not something slow and it's sending at a fairly high rate. How many of you think that 200,000 messages a second is a high rate? It's not. This is a rate at which we don't expect it to queue. It should be able to keep up with this rate even with pretty iffy application logic. It gives you an idea that, at that rate, you can still get something pretty good. These are pretty consistent. This wasn't exhaustive, but this is courtesy of Mark Price, who I do a lot of work with. It's pretty much what we would expect, throughput isn't really affecting things that much. It goes pretty fast.

This is how it starts. You start thinking early, upfront, "What are the limitations that I'm going to be working under?" I do this with protocols, I look at the protocol itself because it doesn't matter what happens in the implementation: if the protocol says, "This happens, then this has to happen," that's the natural sequential part. You have to realize that that may be a bottleneck, sure, but you're not going to get around it unless you break it apart somehow. Then, start looking at which things you can break apart and which things you can start to do. Then, you start looking at the things like your caches, you start looking at branch prediction, you start looking at virtual memory, you start looking at all the other things after that.


Efficiency is part of design. When I think about it, I can't think of any system that I've worked on throughout my career where efficiency and speed haven't been some unspoken requirement. If a system doesn't operate fast enough and it seems like it's unavailable, it might as well not be there. Efficiency is one of those things that you can start really early. You're not over-optimizing by thinking upfront, "What am I limited by? Where are the places where later I can make them faster?" Even if that's all you do, and set up, and think about, "What are the areas later on that can be optimized?" it still is incredibly valuable. The power of a timestamped, replicated log. It's a real simple concept that can make really complicated systems a lot simpler. Replicated state machines are a great fault-tolerant mechanism.

I want to thank Adaptive, who has actually sponsored the development of clustering and has been using it. Just to show you how efficient you can be, that is three nodes of a cluster and a client running on Raspberry Pis. That is not where I got the benchmarks, remember. I had Xeons and I don't think Raspberry Pis come with Xeons. I'd be surprised. That would be great though, if they did that would change everything. These Pis were actually able to do some pretty interesting workloads.

Questions and Answers

Participant 1: On the recovery slide, when you showed the three different machines and the three different states of where they had confirmed where they had written - and I forget the other one. In that example, do you just look for a majority of the lowest level of confirm? What are the salient points there in Raft in terms of reconstituting the state?

Montgomery: We had to invent a few terms. Terminology is something that is real important to get right. We used ballot, canvass, and candidate. The idea is, first, when you enter an election, you don't know what the status of everyone is, you only know your own status. What do you do? You say, "I will be a leader, if I can," and you send out your position in the log. Where is your position that you have seen up to and what do you know as the last message that was the commit message? There are two: how far your log goes and where you have committed up to. Everyone else, hopefully, sees this. If someone is higher, they'll say, "No, I have more data and here's where I know that I've committed up to."

If they initially say, "I will be leader," and there's a better leader candidate, they stand down. That's how it works; now there's lots of different things around this, how long you canvass, the votes that come in, and the process of getting a nominee. The first step is you have to say "Here's where I am," and everyone has to say, "Here's where I am." The only way they can not say anything is if they know that someone else is a better choice, because they know they have less. That's effectively how you start that process. It does have the idea of determining the leader, and then, you have a process of getting everyone to agree and to start the process of using it.
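The "here's where I am" comparison can be sketched in a few lines. This is a simplified illustration under stated assumptions, not Aeron Cluster's election code: the `Position` type, the field names, and the tie-break on member id are hypothetical, and a full Raft election also compares terms, which is omitted here.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Position:
    member_id: int
    append_position: int  # how far this member's log goes
    commit_position: int  # last position it knows was committed

def better_candidate(a, b):
    """Return the member with more log; ties fall to the lower member id (arbitrary choice)."""
    key_a = (a.append_position, a.commit_position, -a.member_id)
    key_b = (b.append_position, b.commit_position, -b.member_id)
    return a if key_a >= key_b else b

def should_stand_down(me, canvass):
    """A would-be leader backs off if any position seen during the canvass beats its own."""
    return any(
        better_candidate(me, other) is other
        for other in canvass
        if other.member_id != me.member_id
    )

me = Position(member_id=0, append_position=1024, commit_position=1024)
peer = Position(member_id=1, append_position=1428, commit_position=1024)
print(should_stand_down(me, [peer]))  # the peer has more data, so member 0 backs off
```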

Trust me, that's one of the hardest things within these systems to get right, I've done it. Before Raft I worked on a system called a Reliable Multicast Protocol. I didn't make up the name, someone else did. Essentially, it had a reformation step that we proved actually ended and had certain properties. It took three years. It takes a long time to get these right. Raft is a much simpler version of that. It still takes a while to get it right.

Participant 2: The leader [inaudible 00:43:50]. The leader [inaudible 00:43:58] some of the followers, [inaudible 00:44:00] to come back and say, "I got the message," it's just not [inaudible 00:44:04]

Montgomery: Think of it as the leader is constantly sending out the log and the followers are, not only receiving that log, but they're writing it locally or archiving it. They're periodically sending back to the leader what position they have saved. In other words, they've saved up to 1024, now they've saved up to 1428, and all that's doing is that's what we call the append position. They've appended locally to the log, but that's all that they're saying, that, "We've received. We've received up to this point."
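One way to picture what the leader does with those reported append positions is the sketch below. It is an illustration only, assuming the leader counts its own append position toward the majority as in standard Raft; the function name is made up and this is not the Aeron Cluster implementation.

```python
def commit_position(append_positions):
    """Highest log position that a majority of members have appended.

    append_positions holds the latest reported append position of every
    member, the leader included (an assumption of this sketch).
    """
    ranked = sorted(append_positions, reverse=True)
    quorum = len(ranked) // 2 + 1
    # The quorum-th highest position has been reached by at least `quorum` members.
    return ranked[quorum - 1]

# Leader at 1428, one follower has saved up to 1428, the other only up to 1024.
print(commit_position([1428, 1428, 1024]))
# Leader at 1428, but both followers have only saved up to 1024.
print(commit_position([1428, 1024, 1024]))
```

With three members the result is the middle value, so a single slow follower does not hold the commit position back.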

Participant 2: The leader doesn't wait for them to [inaudible 00:44:46]? [inaudible 00:44:48] then the leader is going to write?

Montgomery: Perhaps. It depends. Let's take a situation where there is loss. Three members, leader sends message 10. Dies before it gets out on the wire. It never processed message 10 because it never had anyone else actually acknowledge that they received it, so it couldn't have processed it. That's if message 10 was never seen by the followers when they start an election. They're going to start at 9 or some other point that they have gotten to.

Now let's say that one of them received it. One of them received 10, and it was the only one to receive 10. Now you have to go and catch up because one of them didn't get 10, so it has to get 10, come up to the set, even with the new leader, and then continue. If it fails while we're doing that, let's say that there's a cascade: leader goes, 10 was received by one, still wasn't processed. The follower that got it had 10 but can't process it because it was never told to. If it dies at that point, the remaining members of the group have no knowledge of 10. But it was never processed, that's the key. I won't say it's a house of cards because it's not, it's built on a foundation and there's very subtle interactions with all distributed algorithms. Raft is no different in that. For that message to have been processed, it had to have been received by and appended to a majority of the followers, and the followers had to have communicated to the leader that they have appended it. It builds on that.

Participant 2: [inaudible 00:47:05] processed only when the follower says that [inaudible 00:47:08]?

Montgomery: When the leader has said that it's ok to commit this message, in other words, to process it.

Participant 3: One of the things I'm trying to understand with peer design is, by doing all this batching and not doing the back-and-forth and all that stuff, are you sacrificing, for the sake of performance, consistent reads and writes?

Montgomery: In a replicated system, it's not like a normal linearizable system with read/write operations because you've got the processing of a log. Let's say that the processing of a log is setting a key equal to a value. The best you can say at any one point within that is, at this point in the log the value within this key is X. That's all I can say. It's like the version that Pat [Helland] talked about this morning. All you can really say is, at a particular time or instance or version, that's what the value was. It doesn't mean "Everything previous to it," but it does mean "At a particular level."

Participant 3: True, but, in one of your previous graphs, like all those [inaudible 00:48:34] had different commit positions. To me, the value [inaudible 00:48:40] persisted but all of them could directly be reporting a different answer at that exact moment.

Montgomery: Right, and so, how do these systems typically get around that if they need that read-your-writes approach? You don't query the system through anything but the log, so you actually put a query in the log. It goes through the log, the service processes it and says, "Ok, at that position." Yes, it's trivial but it actually works. It does exactly what you would think, that's actually how a lot of the systems work: they send the query through the log. I don't know if it's clever enough but it works.
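Sending the query through the log can be illustrated with a toy replicated state machine. This is a minimal sketch with hypothetical names (`KeyValueService`, the `set`/`get` tuple entries); it only shows why every replica gives the same answer for a query at a given log position.

```python
class KeyValueService:
    """Toy state machine that applies log entries strictly in order."""

    def __init__(self):
        self.state = {}
        self.position = 0  # number of log entries applied so far

    def apply(self, entry):
        self.position += 1
        op = entry[0]
        if op == "set":
            _, key, value = entry
            self.state[key] = value
            return None
        if op == "get":
            _, key = entry
            # The answer is tied to the log position at which the query ran.
            return (self.position, self.state.get(key))
        raise ValueError(f"unknown operation: {op!r}")

def replay(log):
    """Apply a log to a fresh replica and collect the query answers."""
    service = KeyValueService()
    results = (service.apply(entry) for entry in log)
    return [result for result in results if result is not None]

log = [("set", "x", 1), ("get", "x"), ("set", "x", 2), ("get", "x")]
print(replay(log))  # [(2, 1), (4, 2)]
```

Because the query is just another entry, any replica that replays the same log reaches it at the same position and returns the same value, regardless of how far ahead or behind the other replicas are at that moment.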




Recorded at:

Jan 20, 2020