
Highly Distributed Computations Without Synchronization

Synchronization of data across systems is expensive and impractical, especially when running systems at the scale seen by institutions that deploy applications on mobile devices or provide Internet of Things (IoT) services. Not only does the cost increase as the number of clients increases, but it is also impossible to synchronize operations when clients have limited connectivity. This makes traditional approaches, such as Paxos or state-machine replication, unviable for coordinating replicated state.

High availability and fault tolerance of these systems are also major concerns, given that for most of these companies downtime is linked directly to revenue, as exemplified by Amazon in their work on Dynamo, where they popularized the concept of “eventual consistency” as one solution to the problem. However, there's a limit to just how much state can be reduced while still performing useful distributed computations.

Consider a large mobile gaming company that needs to share client state across user devices: for example, a shared virtual wallet across all devices owned by a particular user, or a shared list of items across all members of a team's devices.

In the ideal situation, we would like operations performed using this shared, replicated data to be able to succeed when clients are offline. However, allowing operations to be performed on shared data structures while avoiding synchronization is a recipe for incorrect programs. Therefore, we aim to create deterministic applications that, when operating over data structures that guarantee convergence in the event of concurrent operations, guarantee convergence of the applications themselves.

In the rest of this article, we explore the basic building blocks for crafting such applications.

Conflict-Free Replicated Data Types

Conflict-free Replicated Data Types (CRDTs) provide one solution to the “semantic resolution” problem described in Amazon's Dynamo paper. In the description of their highly-available shopping cart, a problem exists where concurrent additions and removals of items to a replicated shopping cart can result in divergence of the shopping cart: in this case, causality tracking mechanisms such as version vectors or vector clocks, which can be used to determine ordering of events in a system, can only determine that the operations occurred concurrently.

Dynamo addresses this by storing both copies of the divergent item and returning both copies to the user the next time they attempt to retrieve the key. (Note: Several of the Dynamo clones that surfaced after the publication of the original Dynamo paper also take this strategy, such as LinkedIn's Project Voldemort and Basho's Riak.)

At this point, the user is supposed to resolve these conflicting writes, and write back the resolved object. In the shopping cart example, the two shopping carts are joined using a set union operation to perform the resolution -- however, depending on how the items in the set are modeled, deleted items may be “resurrected” under this resolution logic.
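The resurrection anomaly can be reproduced in a few lines. The following is a minimal sketch, assuming the cart is modeled as a plain set of item names; the module name, the items, and the two replicas are hypothetical and are not taken from the Dynamo paper.

```erlang
-module(cart_union_sketch).
-export([demo/0]).

%% Two replicas start from the same cart and then diverge concurrently.
demo() ->
    Base = ordsets:from_list([book, pen]),
    %% Replica A removes the pen.
    ReplicaA = ordsets:del_element(pen, Base),
    %% Replica B concurrently adds a lamp; it still holds the pen.
    ReplicaB = ordsets:add_element(lamp, Base),
    %% Naive resolution: join the two divergent carts with a set union.
    Merged = ordsets:union(ReplicaA, ReplicaB),
    %% The removed pen is back, because a plain set carries no record
    %% of the removal.
    {Merged, ordsets:is_element(pen, Merged)}.
```

Calling cart_union_sketch:demo() returns the merged cart together with a flag showing that the pen, removed at replica A, is present again after the union.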

In “Conflict-free Replicated Data Types,” Shapiro et al. formulate a model of Strong Eventual Consistency (SEC), in which an object meets this criterion if it is both eventually consistent and has a strong convergence property. The strong convergence property states that correct replicas that have delivered the same updates have equivalent state.

Under this model, objects are no longer susceptible to these “concurrency anomalies”, because objects that observe Strong Eventual Consistency are designed to converge correctly under both concurrency and failure. This property makes these data types very powerful for ensuring correctness in distributed systems, especially distributed databases which use optimistic replication techniques.

These data types come in two flavors: state-based, which rely on the properties of semilattices, and operation-based, which are more space-efficient and rely on the commutativity of all operations. These data types take a principled approach to eventual consistency: the data structures by design encode information about the events used to create them and, through this metadata, can resolve concurrent operations deterministically. In this article, we are going to focus on state-based CRDTs.

So, how can we compose conflict-free replicated data types into programs, while ensuring that the strong convergence properties of individual CRDTs are preserved through this composition?

Distributed Deterministic Dataflow

To solve this problem, we turn to deterministic dataflow programming, a form of functional programming where a series of agents, or processes, synchronize on the binding of variables in a shared single-assignment variable store. The following figure shows an example of processes communicating with the shared constraint store. (For more detail on this model, see Chapter 4 of Concepts, Techniques, and Models of Computer Programming.)

In this model, σ represents a shared variable store, and P1 and P2 represent processes. Our store provides two primitive operations: read and bind. The read operation is a blocking operation against the store: it blocks until the variable being read is bound. The bind operation allows assignment of a variable in the store to a particular value, or to the value of another variable.
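To make the two primitives concrete, here is a minimal sketch of a single variable from such a store, modeled as an Erlang process. It is not Lasp's implementation: the module name, message formats, and the already_bound error are assumptions made for illustration, and binding a variable to the value of another variable is left out.

```erlang
-module(ivar_sketch).
-export([new/0, bind/2, read/1]).

%% Spawn a process that holds one single-assignment variable.
new() ->
    spawn(fun() -> unbound([]) end).

%% Bind the variable; binding it again to a different value is an error.
bind(Var, Value) ->
    Var ! {bind, self(), Value},
    receive {Var, Reply} -> Reply end.

%% Blocking read: returns only once the variable has been bound.
read(Var) ->
    Var ! {read, self()},
    receive {Var, {value, Value}} -> Value end.

%% Unbound state: remember readers until a value arrives.
unbound(Waiting) ->
    receive
        {bind, From, Value} ->
            From ! {self(), ok},
            Me = self(),
            lists:foreach(fun(W) -> W ! {Me, {value, Value}} end, Waiting),
            bound(Value);
        {read, From} ->
            unbound([From | Waiting])
    end.

%% Bound state: reads answer immediately; conflicting binds are rejected.
bound(Value) ->
    receive
        {bind, From, Value} ->
            From ! {self(), ok},
            bound(Value);
        {bind, From, _Other} ->
            From ! {self(), {error, already_bound}},
            bound(Value);
        {read, From} ->
            From ! {self(), {value, Value}},
            bound(Value)
    end.
```

A process that calls ivar_sketch:read/1 before any bind simply waits; it is released as soon as another process calls ivar_sketch:bind/2 on the same variable.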

What's a join-semilattice?

We can extend this model to state-based conflict-free replicated data types, as previously discussed. Recall that state-based conflict-free replicated data types rely on the monotonicity properties of join-semilattices.

So, what's a join-semilattice? A join-semilattice is a partially ordered set that has a binary operation called the join. This join operation is associative, commutative, and idempotent and computes a least upper bound with respect to the partial order.

To give an example, the natural numbers form a lattice where the join operation is the max operation.
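The properties of the join can be checked directly for this example. The sketch below assumes the usual ordering on natural numbers; the module and function names are hypothetical.

```erlang
-module(max_lattice_sketch).
-export([join/2, leq/2, laws_hold/3]).

%% The join of two natural numbers is their maximum.
join(A, B) -> max(A, B).

%% The partial order can be recovered from the join:
%% A is below B exactly when joining them yields B.
leq(A, B) -> join(A, B) =:= B.

%% Check the join laws for three sample values.
laws_hold(A, B, C) ->
    Associative = join(join(A, B), C) =:= join(A, join(B, C)),
    Commutative = join(A, B) =:= join(B, A),
    Idempotent  = join(A, A) =:= A,
    %% The join is an upper bound of both of its arguments.
    UpperBound  = leq(A, join(A, B)) andalso leq(B, join(A, B)),
    Associative andalso Commutative andalso Idempotent andalso UpperBound.
```

Checking a handful of values does not prove the laws, but it shows what each property means in practice: the order, grouping, and repetition of joins do not affect the result.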

Generalizing to join-semilattices

We start by looking at the single-assignment case as a lattice. Consider the following:

In this example, for simplicity, we assume that the single-assignment version of our dataflow language allows variables to be bound to natural numbers. Here, we represent the unbound state as ⊥, while we represent the error state as ⊤. This lattice serves as a state chart: it shows in which directions the variable’s state is allowed to travel. In this case, we can change an unbound variable to have the value 1 (or 2, or 3, and so on); however, once the value is bound, if we attempt to change its value again, we move to the error state.

If we generalize this model, we can allow variables to be re-bound as long as each update is an inflation: a bind that moves the variable to a new state that is higher in the lattice order. Let's walk through an example to see how this works.

Consider the following natural number lattice which computes the maximum observed value:

In this example, subsequent bind operations compute the join between the argument passed to the operation and the current value; the result of this join is then used as the value of the variable. Similar to before, think of this as a state chart: as long as the number keeps increasing we can continue to change the value, whereas before we triggered an error on a subsequent change.
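The two state charts can be compared side by side as bind functions. This is a minimal sketch under stated assumptions: the atoms bottom and top stand in for ⊥ and ⊤, and the module and function names are hypothetical rather than part of Lasp.

```erlang
-module(bind_lattices_sketch).
-export([bind_single/2, bind_max/2]).

%% Single-assignment lattice: bottom (unbound) is below every bound
%% value, and every bound value is below top (the error state).
bind_single(bottom, New) -> New;
bind_single(Current, bottom) -> Current;
%% Re-binding to the same value leaves the state unchanged.
bind_single(Same, Same) -> Same;
%% Any other change of a bound value moves to the error state.
bind_single(_Current, _Different) -> top.

%% Max lattice over natural numbers: a bind joins the argument with
%% the current value, so the value can only stay the same or grow.
bind_max(bottom, New) -> New;
bind_max(Current, bottom) -> Current;
bind_max(Current, New) when is_integer(Current), is_integer(New) ->
    max(Current, New).
```

With bind_single/2, binding 2 over 1 yields top; with bind_max/2, the same bind yields 2, and binding 1 over 2 leaves the value at 2.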

Additionally, we extend our model with one more primitive, similar to the threshold read operation described by Kuper in “LVars: lattice-based data structures for deterministic parallelism.” This additional read primitive takes an activation value, which prevents the read operation from completing until the value of the variable being read is equal to or strictly higher than the activation value in the semilattice order.
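A threshold read over the max lattice can be sketched as a process that parks readers until the value is high enough. This is an illustration only, not Lasp's implementation: the module name and message formats are assumptions, the variable starts at 0 instead of ⊥, and returning the value observed when the read is released is a simplification.

```erlang
-module(threshold_sketch).
-export([new/0, bind/2, read/2]).

%% Spawn a variable over the max lattice, starting at 0.
new() ->
    spawn(fun() -> loop(0, []) end).

%% Bind joins the argument with the current value (max).
bind(Var, N) ->
    Var ! {bind, N},
    ok.

%% Threshold read: blocks until the value is at least Threshold.
read(Var, Threshold) ->
    Ref = make_ref(),
    Var ! {read, self(), Ref, Threshold},
    receive {Ref, Value} -> Value end.

loop(Value, Waiting) ->
    receive
        {bind, N} ->
            NewValue = max(Value, N),
            %% Release every parked reader whose threshold is now met.
            {Ready, Still} =
                lists:partition(fun({_, _, T}) -> T =< NewValue end, Waiting),
            lists:foreach(fun({From, Ref, _}) -> From ! {Ref, NewValue} end,
                          Ready),
            loop(NewValue, Still);
        {read, From, Ref, T} when T =< Value ->
            %% Threshold already met: answer immediately.
            From ! {Ref, Value},
            loop(Value, Waiting);
        {read, From, Ref, T} ->
            %% Threshold not met yet: park the reader.
            loop(Value, [{From, Ref, T} | Waiting])
    end.
```

A reader waiting on threshold_sketch:read(Var, 5) stays blocked after a bind of 3 and is released by a later bind of 6.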


Distribution is also important for both high availability and fault tolerance. In our model, we assume either replication of each variable in the data store or replication of entire applications.


Our model assumes Dynamo-style partitioning and replication of data. In this model, we use hash-space partitioning and consistent hashing to break up the hash space into a group of disjoint replication sets, each of which has a group of replicas responsible for full replication of the data within that set. This is shown in Figure a.
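The mapping from a variable to its replication set can be sketched as follows. This is a deliberately simplified illustration: it hashes a key onto a fixed list of disjoint replica sets and does not implement a consistent hashing ring, so changing the number of sets would remap most keys. The module name and the node names used in the usage note are hypothetical.

```erlang
-module(partition_sketch).
-export([replica_set/2]).

%% Map a key onto one of the disjoint replica sets.
%% ReplicaSets is a non-empty list of lists of node names.
replica_set(Key, ReplicaSets) ->
    %% erlang:phash2/2 returns an integer in 0..Range-1.
    Index = erlang:phash2(Key, length(ReplicaSets)),
    %% lists:nth/2 is 1-based.
    lists:nth(Index + 1, ReplicaSets).
```

For example, partition_sketch:replica_set(ad_counter_1, [[n1, n2, n3], [n4, n5, n6]]) always returns the same one of the two sets for a given key, and every replica in that set holds a full copy of the variable.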

Replication of variables

When partitioning and replicating variables, we assume the client application will run outside of the cluster, or spread across a series of nodes internal or external to the cluster. Each operation, such as bind or read, is turned into a request and sent across the network to the cluster responsible for managing the constraint store, and either succeeds or fails based on whether a quorum of replicas can be contacted. This is shown in Figure b.
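The success condition for a request can be sketched as a count of acknowledgements. The article does not state the quorum size, so the majority quorum used here is an assumption, as are the module name and the shape of the replies.

```erlang
-module(quorum_sketch).
-export([quorum/1, outcome/2]).

%% Assumption: a quorum is a strict majority of the N replicas.
quorum(N) when is_integer(N), N > 0 ->
    N div 2 + 1.

%% Replies holds one entry per replica that was contacted:
%% ok for an acknowledgement, {error, Reason} otherwise.
outcome(Replies, N) ->
    %% The generator pattern keeps only the ok replies.
    Acks = length([ok || ok <- Replies]),
    case Acks >= quorum(N) of
        true  -> ok;
        false -> {error, no_quorum}
    end.
```

With three replicas, two acknowledgements are enough for the request to succeed, while a single acknowledgement causes it to fail.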

Replication of applications

We also provide the ability to run an entire application replicated, by introducing two new primitive operations: register, to remotely load a program, and execute, to remotely execute the program.

Consider the case where a program is going to operate on data stored in one replica set: rather than run the entire application remotely and perform round-trip quorum operations against a replica set, we can push the entire application to the replica set. To execute the application and get the results, we can simply select the result from one of the replicas to return to the user. This is shown in Figure c.

What are the applications?

Let's look at an example of an application that requires communication between a series of clients and servers: an eventually consistent advertisement counter.

We’re going to look at Erlang code written using a library called Lasp, which implements the programming model we’ve been discussing.

Advertisement counter

Here's an example of an advertisement counter written in our prototype programming language, called Lasp, which supports the programming model discussed in this article. It is made up of two sets of coordinating processes:

  • Servers: Responsible for tracking advertisement impressions for all clients.
  • Clients: Responsible for incrementing the advertisement impressions.

In this example, we use a grow-only counter, which we will refer to as a G-Counter. A grow-only counter is a counter that can handle concurrent increment operations in a safe and convergent manner, but cannot track decrements.
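To see why concurrent increments converge, here is a minimal sketch of a state-based G-Counter. It uses a common construction, one count per actor merged with a per-actor maximum; it is not Lasp's internal representation, and the module and function names are hypothetical.

```erlang
-module(gcounter_sketch).
-export([new/0, increment/2, value/1, merge/2]).

%% The state maps each actor to the number of increments it has made.
new() ->
    #{}.

%% An actor only ever increments its own entry.
increment(Actor, Counter) ->
    maps:update_with(Actor, fun(N) -> N + 1 end, 1, Counter).

%% The counter's value is the sum over all actors.
value(Counter) ->
    lists:sum(maps:values(Counter)).

%% The join takes the per-actor maximum, which makes the merge
%% associative, commutative, and idempotent.
merge(A, B) ->
    maps:fold(fun(Actor, N, Acc) ->
                      maps:update_with(Actor,
                                       fun(M) -> max(M, N) end,
                                       N, Acc)
              end, A, B).
```

If one client increments twice and another increments once while disconnected, merging their states in either order, or merging the same state repeatedly, gives a counter with value 3.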

%% @doc Client process; standard recursive looping server.
client(Id, Ads) ->
    %% Receive messages from server processes.
    receive
        %% Respond to the view advertisement message.
        view_ad ->
            %% Choose an advertisement to display; we simply choose
            %% the first item in a list.
            Ad = hd(Ads),

            %% Update ad by incrementing value; issue an update
            %% to increment the counter.
            {ok, _} = lasp:update(Ad, increment, Id),

            %% Move the displayed advertisement to the back of the list.
            client(Id, tl(Ads) ++ [Ad]);

        {remove_ad, Ad} ->
            %% Remove ad.
            client(Id, Ads -- [Ad])
    end.

In this snippet, we initialize a series of clients, each of which is given the list of advertisements they are responsible for displaying to the user. These clients represent processes running at the client near the end user.

In the code shown, each client process handles two messages: a request to view an advertisement and a request to remove one. We use a simple recursive process that blocks on receiving messages to perform each of these operations.

When a request to view an advertisement arrives, we choose an advertisement to display and increment the counter for this particular advertisement.

This bind operation succeeds because in this case, the value we are pushing back to the constraint store is an inflation of the lattice; the counter is only ever going to grow.

Next, we initialize one server process per advertisement. Here’s what that code looks like:

%% @doc Server functions for the advertisement counter.
server(Ad, Clients) ->
    %% Perform a blocking read, which will only unblock 
    %% once the counter reaches at least 5.
    {ok, _, _} = lasp:read(Ad, 5),
    %% For each client, send a message telling the client
    %% to disable the advertisement from being displayed again.
    lists:foreach(fun(Client) ->
                %% Tell clients to remove the advertisement.
                Client ! {remove_ad, Ad}
        end, Clients),

    %% Print a message to the screen that advertisement 
    %% limit has been reached.
    io:format("Advertisement ~p reached display limit!~n", [Ad]).

Each of these server processes performs a threshold read against the counter for the advertisement it’s tracking; this threshold read operation will block, thereby suspending execution of the server process until the counter has reached at least five impressions.

Once the threshold has been reached, the server process will unblock and notify all clients to stop displaying the advertisement.

Where do we go from here?

Our programming model for eventually consistent computations is still at a very early stage of development; it remains ongoing research driven by the requirements of our industry partners and feedback from our reference implementation.

In terms of features, we have identified several lines of work that we plan to explore over the next year of development on the programming model. Some examples of this work include the following:

Causal+ consistency

What changes are needed to both the programming model and distribution model to support causal+ consistency? Is it possible for the programming model to detect when causal+ consistency is required and when a weaker consistency model will suffice given program requirements?

Different distribution models

Can we rewrite applications that operate on a particular set of data into smaller applications operating on disjoint subsets that can be executed in a parallel, fault-tolerant manner? Is it possible to break programs up between hierarchical sets of clients transparently in the programming model, in order to support offline, and correct, operation?


We would love to hear your feedback, given a large part of our evaluation is based on whether or not the programming model makes it easier to reason about program behavior and correctness.

About the Author

Christopher Meiklejohn is a Senior Software Engineer with Basho Technologies, Inc. and a contributing member of the European research project, SyncFree. Christopher also frequently blogs about distributed systems on his blog.
