Conflict-Free Replicated Data Types

In this lecture, we'll define a particular class of replicated data structures known as state-based objects and formalize what it means for them to be eventually consistent and strongly eventually consistent. We'll then survey conflict-free replicated data types (CRDTs): a specific type of state-based object that is guaranteed to achieve strong eventual consistency.

State-Based Objects

In the first lecture, we looked at replicated key-value stores. In the second lecture, we looked at replicated registers. Key-value stores and registers are two specific examples of data structures that we can replicate. In this section, we'll generalize things and look at arbitrary replicated data structures known as state-based objects. We'll then introduce a way to visualize the state of a replicated state-based object over time.

A state-based object is an ordinary, plain Jane object—like the objects you've likely seen in Python or Java—with (1) some internal state, (2) a query method, (3) some number of update methods, and (4) a merge method. Understanding state-based objects is easiest with an example. Here's one written in Python:

class Average(object):
  def __init__(self):
    self.sum = 0
    self.cnt = 0

  def query(self):
    if self.cnt != 0:
      return self.sum / self.cnt
    else:
      return 0

  def update(self, x):
    self.sum += x
    self.cnt += 1

  def merge(self, avg):
    self.sum += avg.sum
    self.cnt += avg.cnt

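Average is a state-based object representing a running average. Its internal state is the pair sum and cnt. query returns the current average (or 0 if no values have been added yet), update(x) folds a new value x into the average, and merge(avg) combines another Average replica into this one by adding avg.sum and avg.cnt to its own sum and cnt.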
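As a quick usage sketch, the snippet below drives a single Average replica with update(1) and then update(3), the same two requests used in the single-server example that follows, and reads the result with query. The class is copied from above so the snippet runs on its own.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def query(self):
        # Average of all values seen so far; 0 if there are none yet.
        return self.sum / self.cnt if self.cnt != 0 else 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


a = Average()
a.update(1)  # sum=1, cnt=1
a.update(3)  # sum=4, cnt=2
print(a.sum, a.cnt, a.query())  # 4 2 2.0
```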

A replicated state-based object is simply a state-based object that is replicated across multiple computers. For example, if we replicate an Average state-based object across two servers a and b (as we do below), then both a and b have a copy of the state-based object. Clients (e.g. c and d below) send query and update requests to a server's copy of the state-based object. Meanwhile, every server periodically sends its copy of the state-based object to other servers to be merged with the merge method. That looks something like this:

Animated diagrams like these help us visualize how clients and servers communicate in a distributed system, but they make it difficult to keep track of the replicated state-based object as it is queried, updated, and merged over time. To more easily visualize the replicated state-based object, we'll introduce a new type of diagram. Here's an example:

In these diagrams, snapshots of a state-based object are drawn from left to right forwards through time. Here, we consider an Average state-based object on a single server a. The initial snapshot of the state-based object is labelled a0. Server a receives an update(1) request from some client which updates the state-based object to the next snapshot a1. Server a then receives an update(3) request which updates the state-based object to the snapshot a2.

Each update request is assigned a unique identifier which is drawn below the request. For example, the update(1) request is assigned an identifier of 0, and the update(3) request is assigned an identifier of 1.

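Beneath the diagram, we draw a table with one row for each snapshot of the state-based object. Each row lists the snapshot's causal history: the set of identifiers of the update requests that have contributed to that snapshot's state. In the example above, the causal history of a0 is {}, the causal history of a1 is {0}, and the causal history of a2 is {0,1}.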

Here's an example diagram which traces a state-based object replicated on two servers: a and b. Notice how the causal history of a1 is disjoint from the causal histories of b1 and b2. The update(2) and update(4) requests have not contributed to the state of a1, so their identifiers do not appear in its causal history. Conversely, the update(1) request has not contributed to the state of either b1 or b2, so it does not appear in either of their causal histories.

We can also use these diagrams to visualize merge requests. In the diagram below, server a receives an update(2) request which transitions a0 to a1. Similarly, server b receives an update(4) request which transitions b0 to b1. Then, server b sends a copy of b1 to server a. When server a receives the copy of b1, it merges b1 into a1, producing a2.

Also notice that the causal history of a2 (i.e. {0,1}) is the union of a1's causal history (i.e. {0}) and b1's causal history (i.e. {1}). This makes intuitive sense because both the update(2) request (id 0) and the update(4) request (id 1) contribute to the state of a2. In general, if z = x.merge(y), the causal history of z is the union of the causal histories of x and y.
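To make the bookkeeping concrete, here is a minimal sketch that tracks causal histories alongside the replica's state. The Traced wrapper and the global _ids counter are hypothetical helpers for illustration only: every update draws a fresh identifier, and every merge takes the union of the two histories. It replays the update(2) / update(4) / merge execution just described.

```python
import itertools

# Hypothetical global source of unique update identifiers: 0, 1, 2, ...
_ids = itertools.count()


class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


class Traced(object):
    """Hypothetical wrapper pairing a replica with its causal history."""

    def __init__(self):
        self.obj = Average()
        self.history = set()

    def update(self, x):
        self.obj.update(x)
        self.history.add(next(_ids))  # record this update's identifier

    def merge(self, other):
        self.obj.merge(other.obj)
        self.history |= other.history  # causal histories combine by union


a, b = Traced(), Traced()
a.update(2)  # id 0: a0 -> a1
b.update(4)  # id 1: b0 -> b1
a.merge(b)   # a1 merged with b1 -> a2
print(sorted(a.history), sorted(b.history))  # [0, 1] [1]
```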

Finally, recall that a server periodically sends its replica of the state-based object to other servers to be merged. Thus, our diagrams will look something like the following, where servers continually merge with one another.

(Strong) Eventual Consistency

Equipped with an understanding of replicated state-based objects and causal history, we're ready to define eventual consistency and strong eventual consistency! In this section, we'll also bolster our grasp of the two definitions by looking at four state-based objects (Average, NoMergeAverage, BMergeAverage, and MaxAverage) and deciding, for each one, whether it is eventually consistent, strongly eventually consistent, or neither.

We say a replicated state-based object is eventually consistent (EC) if whenever two replicas of the state-based object have the same causal history, they eventually (not necessarily immediately) converge to the same internal state. We say a replicated state-based object is strongly eventually consistent (SEC) if whenever two snapshots of the state-based object have the same causal history, they (immediately) have the same internal state. Note that strong eventual consistency implies eventual consistency.

Average

The definitions of eventual consistency and strong eventual consistency are best understood with examples. First, let's consider the Average state-based object we defined above and ask whether it is eventually consistent or strongly eventually consistent. Consider the following execution (note that we've abbreviated update as u and merge as m).

Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history {0,1}. This means that a's replica and b's replica have the same causal history. However, despite having the same causal history, the two replicas do not converge to the same internal state. In fact, they don't converge at all! As a and b periodically send merge requests to one another, the internal state of their replicas continues to change. Neither replica converges on a single value. Thus, Average is neither eventually consistent nor strongly eventually consistent.
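The sketch below reproduces this behaviour under a schedule of our own choosing (it need not match the diagram exactly): a receives update(2), b receives update(4), and then the two servers take turns merging each other's current replica. After the first exchange both replicas have causal history {0,1}, yet their internal states never stop growing. A compact copy of Average (query omitted) keeps the snippet self-contained.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


a, b = Average(), Average()
a.update(2)  # id 0, on server a
b.update(4)  # id 1, on server b

# Each round, a merges b's replica and then b merges a's replica.
# Merging the live object is equivalent to merging a copy of it here,
# because merge only reads the other replica's fields.
trace = []
for _ in range(3):
    a.merge(b)
    b.merge(a)
    trace.append(((a.sum, a.cnt), (b.sum, b.cnt)))

print(trace)
# [((6, 2), (10, 3)), ((16, 5), (26, 8)), ((42, 13), (68, 21))]
```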

NoMergeAverage

Next, let's look at the following NoMergeAverage state-based object which is exactly the same as Average except that its merge method does nothing at all.

class NoMergeAverage(Average):
  # __init__, query, and update
  # inherited from Average.

  def merge(self, avg):
    # Ignore merge requests!
    pass

Unlike Average, NoMergeAverage is convergent: each replica eventually settles on a fixed internal state. However (like Average), NoMergeAverage is neither eventually consistent nor strongly eventually consistent. To see why, let's take a look at the following execution:

Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history. Moreover, both a's replica and b's replica of the state-based object converge. However, a's replica converges to an internal state {sum=2,cnt=1} while b's replica converges to an internal state {sum=4,cnt=1}. Because the two replicas converge to different internal states, NoMergeAverage is neither eventually consistent nor strongly eventually consistent.
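Here is a minimal sketch of that divergence, assuming a receives update(2), b receives update(4), and the servers then exchange replicas a few times. Because merge is a no-op, each replica freezes at its own state even though both causal histories become {0,1}.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


class NoMergeAverage(Average):
    def merge(self, avg):
        pass  # ignore merge requests


a, b = NoMergeAverage(), NoMergeAverage()
a.update(2)  # id 0, on server a
b.update(4)  # id 1, on server b
for _ in range(3):
    a.merge(b)
    b.merge(a)

print((a.sum, a.cnt), (b.sum, b.cnt))  # (2, 1) (4, 1)
```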

BMergeAverage

Next, let's look at the BMergeAverage state-based object which is exactly the same as Average except for the merge method. When server b receives a merge request from server a, it throws away its replica of the state-based object and overwrites it with a's replica. On the other hand, when server a receives a merge request from server b, it ignores it.

class BMergeAverage(Average):
  # __init__, query, and update
  # inherited from Average.

  def merge(self, avg):
    # on_server_b() stands in for a check
    # of which server this replica is on.
    if on_server_b():
      self.sum = avg.sum
      self.cnt = avg.cnt
    else:
      # Server a ignores
      # merge requests!
      pass

BMergeAverage is eventually consistent, but not strongly eventually consistent. To see why, consider the following execution.

Snapshots b1, a1, b2, and a2 (highlighted red) all have the same causal history. Moreover, both a's replica and b's replica eventually converge to the same internal state {sum=0,cnt=0}. This is no accident: a's replica changes only when a receives an update, and every merge into b overwrites b's replica with a copy of a's. So once the two replicas share a causal history, b's next merge of a's replica leaves them with identical internal states, and this happens in every possible execution involving a BMergeAverage state-based object. Thus, BMergeAverage is eventually consistent.

However, let's take a closer look at snapshots b1 and a1. Both snapshots have the same causal history {0}, but the two snapshots have different internal states. b1's internal state is {sum=4,cnt=1}, while a1's internal state is {sum=0,cnt=0}. Thus, BMergeAverage is not strongly eventually consistent.
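A runnable version of this execution needs a concrete stand-in for on_server_b(). As an assumption, the hypothetical sketch below gives each replica a server attribute ("a" or "b") at construction and checks it inside merge. It replays the steps above: b receives update(4), a ignores b's replica, and b then overwrites its replica with a's.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1


class BMergeAverage(Average):
    def __init__(self, server):
        super().__init__()
        self.server = server  # hypothetical stand-in for on_server_b()

    def merge(self, avg):
        if self.server == "b":
            # b discards its state and adopts the incoming replica's state.
            self.sum = avg.sum
            self.cnt = avg.cnt
        # Server a ignores merge requests.


a, b = BMergeAverage("a"), BMergeAverage("b")
b.update(4)   # b0 -> b1, causal history {0}
a.merge(b)    # a0 -> a1: ignored, yet a1's causal history is also {0}
b1, a1 = (b.sum, b.cnt), (a.sum, a.cnt)
print(b1, a1)  # (4, 1) (0, 0)
b.merge(a)    # b1 -> b2: overwritten with a's state
print((b.sum, b.cnt), (a.sum, a.cnt))  # (0, 0) (0, 0)
```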

MaxAverage

Finally, let's look at the MaxAverage state-based object which is exactly the same as Average except that the merge method now takes the maximum of each field: the larger of the two sums and the larger of the two counts.

class MaxAverage(Average):
  # __init__, query, and update
  # inherited from Average.

  def merge(self, avg):
    self.sum = max(self.sum, avg.sum)
    self.cnt = max(self.cnt, avg.cnt)

MaxAverage is both eventually consistent and strongly eventually consistent! To see why, consider the following execution.

Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history. Better yet, they all have the same internal state too! This is no coincidence. For every execution involving a replicated MaxAverage state-based object, if two snapshots have the same causal history, they are guaranteed to have the same internal state. Thus, MaxAverage is strongly eventually consistent (and therefore eventually consistent too).

Unfortunately, even though MaxAverage is strongly eventually consistent, its semantics are a bit weird. For example, in the execution above, the effects of the update(2) request were completely overwritten by the update(4) request!
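The sketch below illustrates the problem, assuming an execution in which a receives update(2) and b receives update(4) before the two exchange replicas. Both replicas agree, as SEC requires, but they agree on an average of 4.0 rather than the true average of 2 and 4, which is 3.0.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def query(self):
        return self.sum / self.cnt if self.cnt != 0 else 0

    def update(self, x):
        self.sum += x
        self.cnt += 1


class MaxAverage(Average):
    def merge(self, avg):
        self.sum = max(self.sum, avg.sum)
        self.cnt = max(self.cnt, avg.cnt)


a, b = MaxAverage(), MaxAverage()
a.update(2)  # a: sum=2, cnt=1
b.update(4)  # b: sum=4, cnt=1
a.merge(b)
b.merge(a)
print((a.sum, a.cnt), (b.sum, b.cnt))  # (4, 1) (4, 1)
print(a.query(), b.query())  # 4.0 4.0
```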

Summary

In the following table, we summarize the consistency of the four state-based objects we've covered. C stands for convergent, EC stands for eventually consistent, and SEC stands for strongly eventually consistent.

Object            C?    EC?   SEC?
Average           no    no    no
NoMergeAverage    yes   no    no
BMergeAverage     yes   yes   no
MaxAverage        yes   yes   yes

CRDTs

The last section demonstrated that designing a strongly eventually consistent state-based object with intuitive semantics is challenging. In this section, we define a particular class of state-based objects known as conflict-free replicated data types (CRDTs) which are guaranteed to be strongly eventually consistent and which usually have reasonable semantics. We'll then present four CRDTs: G-Counters, PN-Counters, G-Sets, and 2P-Sets.

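A conflict-free replicated datatype (CRDT) is a state-based object that satisfies the following properties. Let merge(x, y) be the value of x after performing x.merge(y). Similarly, let update(x, ...) be the value of x after performing x.update(...). Then, for all states x, y, and z:

1. merge is associative: merge(merge(x, y), z) = merge(x, merge(y, z)).
2. merge is commutative: merge(x, y) = merge(y, x).
3. merge is idempotent: merge(x, x) = x.
4. update is increasing: merging a state with an updated copy of itself yields the updated copy, i.e. merge(x, update(x, ...)) = update(x, ...).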
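These four laws can be spot-checked mechanically. The hypothetical helper below (check_crdt_laws is our own name, not part of any library) takes merge and update as pure functions that return new states, tries every combination of a few sample states, and reports the first law that fails. Passing is evidence, not a proof. We try it on integers with max as merge and "add a nonnegative amount" as update, which is the idea behind IntMax, and on Average's (sum, cnt) state with its additive merge, which fails idempotence.

```python
import itertools


def check_crdt_laws(states, merge, update, args):
    """Return the name of the first violated CRDT law, or None.

    merge(x, y) and update(x, arg) must be pure functions returning new
    states. Only the given sample states and update arguments are tried.
    """
    for x, y, z in itertools.product(states, repeat=3):
        if merge(merge(x, y), z) != merge(x, merge(y, z)):
            return "associative"
    for x, y in itertools.product(states, repeat=2):
        if merge(x, y) != merge(y, x):
            return "commutative"
    for x in states:
        if merge(x, x) != x:
            return "idempotent"
        for arg in args:
            u = update(x, arg)
            if merge(x, u) != u:
                return "increasing"
    return None


def add_nonneg(x, d):
    return x + d


def avg_merge(p, q):
    # Average's merge on (sum, cnt) pairs.
    return (p[0] + q[0], p[1] + q[1])


def avg_update(p, x):
    # Average's update on (sum, cnt) pairs.
    return (p[0] + x, p[1] + 1)


print(check_crdt_laws(range(5), max, add_nonneg, [0, 1, 3]))  # None
print(check_crdt_laws([(0, 0), (2, 1), (6, 2)], avg_merge, avg_update, [1, 4]))  # idempotent
```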

To make things concrete, let's look at a very simple state-based object and prove it is a CRDT. The following IntMax state-based object wraps a single integer x where merge(a, b) computes the maximum of a.x and b.x.

class IntMax(object):
  def __init__(self):
    self.x = 0

  def query(self):
    return self.x

  def update(self, x):
    assert x >= 0
    self.x += x

  def merge(self, other):
    self.x = max(self.x, other.x)

We can easily verify that IntMax is a CRDT by proving that (a) merge is associative, commutative, and idempotent and that (b) update is increasing. The proofs of (a) are straightforward because the max function itself is associative, commutative, and idempotent, and (b) holds because update only ever adds a nonnegative number x, so max(a, a + x) = a + x. Note that we'll abuse notation a bit and say an IntMax object a is equal to the integer a.x it contains.
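Written out, with the same abuse of notation (an IntMax object a stands for the integer a.x) and with update(a, x) = a + x for x ≥ 0, the four proofs are one line each:

```latex
\begin{aligned}
\textbf{Associative:}\quad & \mathrm{merge}(\mathrm{merge}(a, b), c) = \max(\max(a, b), c) = \max(a, \max(b, c)) = \mathrm{merge}(a, \mathrm{merge}(b, c)) \\
\textbf{Commutative:}\quad & \mathrm{merge}(a, b) = \max(a, b) = \max(b, a) = \mathrm{merge}(b, a) \\
\textbf{Idempotent:}\quad & \mathrm{merge}(a, a) = \max(a, a) = a \\
\textbf{Increasing:}\quad & \mathrm{merge}(a, \mathrm{update}(a, x)) = \max(a, a + x) = a + x = \mathrm{update}(a, x) \qquad (x \ge 0)
\end{aligned}
```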

We'll now turn our attention to four more complex CRDTs: G-Counters (increment only counters); PN-Counters (increment/decrement counters); G-Sets (add only sets); and 2P-Sets (add/remove sets).

G-Counter

A G-Counter CRDT represents a replicated counter which can be added to but not subtracted from.

We can implement a G-Counter in Python as follows.

class GCounter(object):
  def __init__(self, i, n):
    self.i = i # server id
    self.n = n # number of servers
    self.xs = [0] * n

  def query(self):
    return sum(self.xs)

  def add(self, x):
    assert x >= 0
    self.xs[self.i] += x

  def merge(self, c):
    zipped = zip(self.xs, c.xs)
    self.xs = [max(x, y) for (x, y) in zipped]

If a G-Counter object is replicated on n servers, then we construct a GCounter(0, n) object on server 0, a GCounter(1, n) object on server 1, and so on. Let's take a look at an example execution in which a G-Counter is replicated on two servers where server a is server 0 and server b is server 1.
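The sketch below replays a small two-server execution of our own choosing (not necessarily the one in the diagram). Each server only ever increments its own entry of xs, so every other replica's copy of that entry is an older, smaller value; taking the entry-wise maximum during merge therefore never loses an increment, and merging again changes nothing.

```python
class GCounter(object):
    def __init__(self, i, n):
        self.i = i  # server id
        self.n = n  # number of servers
        self.xs = [0] * n  # xs[j]: total added at server j (as far as we know)

    def query(self):
        return sum(self.xs)

    def add(self, x):
        assert x >= 0
        self.xs[self.i] += x

    def merge(self, c):
        self.xs = [max(x, y) for (x, y) in zip(self.xs, c.xs)]


a = GCounter(0, 2)  # server a is server 0
b = GCounter(1, 2)  # server b is server 1
a.add(2)            # a.xs == [2, 0]
b.add(3)            # b.xs == [0, 3]
b.add(1)            # b.xs == [0, 4]
a.merge(b)          # entry-wise max: a.xs == [2, 4]
b.merge(a)          # b.xs == [2, 4]
print(a.query(), b.query())  # 6 6
a.merge(b)          # merging again is a no-op (idempotent)
print(a.xs)         # [2, 4]
```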

PN-Counter

A PN-Counter CRDT represents a replicated counter which can be added to and subtracted from.

We can implement a PN-Counter in Python as follows.

class PNCounter(object):
  def __init__(self, i, n):
    self.p = GCounter(i, n)
    self.n = GCounter(i, n)

  def query(self):
    return self.p.query() - self.n.query()

  def add(self, x):
    assert x >= 0
    self.p.add(x)

  def sub(self, x):
    assert x >= 0
    self.n.add(x)

  def merge(self, c):
    self.p.merge(c.p)
    self.n.merge(c.n)

As with G-Counters, if a PN-Counter object is replicated on n servers, then we construct a PNCounter(0, n) object on server 0, a PNCounter(1, n) object on server 1, and so on. Let's take a look at another example execution. For brevity, we've only listed a subset of a PN-Counter's internal state.
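Why two G-Counters instead of one counter that can go down? If a server decremented its own entry in a single G-Counter, the next merge would take the maximum with another replica's older, larger copy of that entry and silently undo the decrement. Keeping increments in p and decrements in n means both only ever grow. The sketch below replays a two-server execution of our own choosing.

```python
class GCounter(object):
    def __init__(self, i, n):
        self.i = i  # server id
        self.xs = [0] * n

    def query(self):
        return sum(self.xs)

    def add(self, x):
        assert x >= 0
        self.xs[self.i] += x

    def merge(self, c):
        self.xs = [max(x, y) for (x, y) in zip(self.xs, c.xs)]


class PNCounter(object):
    def __init__(self, i, n):
        self.p = GCounter(i, n)  # increments
        self.n = GCounter(i, n)  # decrements

    def query(self):
        return self.p.query() - self.n.query()

    def add(self, x):
        assert x >= 0
        self.p.add(x)

    def sub(self, x):
        assert x >= 0
        self.n.add(x)

    def merge(self, c):
        self.p.merge(c.p)
        self.n.merge(c.n)


a = PNCounter(0, 2)  # server a is server 0
b = PNCounter(1, 2)  # server b is server 1
a.add(5)             # a.p.xs == [5, 0]
b.sub(2)             # b.n.xs == [0, 2]
a.sub(1)             # a.n.xs == [1, 0]
a.merge(b)
b.merge(a)
print(a.query(), b.query())  # 2 2
print(a.p.xs, a.n.xs)        # [5, 0] [1, 2]
```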

G-Set

A G-Set CRDT represents a replicated set which can be added to but not removed from.

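Merging two G-Sets simply takes the union of their elements, and adding an element only ever grows the set. Pretty simple, huh? Not surprising, considering set union is commutative, associative, and idempotent. We can implement a G-Set in Python as follows.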

class GSet(object):
  def __init__(self):
    self.xs = set()

  def query(self):
    return self.xs

  def add(self, x):
    self.xs.add(x)

  def merge(self, s):
    self.xs = self.xs.union(s.xs)

Let's take a look at an example execution.
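A minimal sketch of such an execution, with values of our own choosing: two servers add elements independently, including the same element on both, and a single exchange of replicas leaves them with identical sets.

```python
class GSet(object):
    def __init__(self):
        self.xs = set()

    def query(self):
        return self.xs

    def add(self, x):
        self.xs.add(x)

    def merge(self, s):
        self.xs = self.xs.union(s.xs)


a, b = GSet(), GSet()
a.add("x")
b.add("y")
b.add("x")  # adding "x" on both servers is harmless
a.merge(b)
b.merge(a)
print(sorted(a.query()), sorted(b.query()))  # ['x', 'y'] ['x', 'y']
```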

2P-Set

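A 2P-Set (two-phase set) CRDT represents a replicated set which can be added to and removed from. The catch is that removal is permanent: once an element has been removed, it stays in the set of removed elements forever, so it can never be added back.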

We can implement a 2P-Set in Python as follows.

class TwoPSet(object):
  def __init__(self):
    self.a = GSet()
    self.r = GSet()

  def query(self):
    return self.a.query() - self.r.query()

  def add(self, x):
    self.a.add(x)

  def sub(self, x):
    self.r.add(x)

  def merge(self, s):
    self.a.merge(s.a)
    self.r.merge(s.r)

Let's look at an example execution.
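The sketch below, using an execution of our own choosing, shows both phases: b removes "x" after learning about it from a, the removal propagates back to a, and a's later attempt to re-add "x" has no visible effect because "x" is still in the removed set r.

```python
class GSet(object):
    def __init__(self):
        self.xs = set()

    def query(self):
        return self.xs

    def add(self, x):
        self.xs.add(x)

    def merge(self, s):
        self.xs = self.xs.union(s.xs)


class TwoPSet(object):
    def __init__(self):
        self.a = GSet()  # every element ever added
        self.r = GSet()  # every element ever removed

    def query(self):
        return self.a.query() - self.r.query()

    def add(self, x):
        self.a.add(x)

    def sub(self, x):
        self.r.add(x)

    def merge(self, s):
        self.a.merge(s.a)
        self.r.merge(s.r)


a, b = TwoPSet(), TwoPSet()
a.add("x")
a.add("y")
b.merge(a)   # b now sees {"x", "y"}
b.sub("x")   # b removes "x"
a.merge(b)   # the removal reaches a
print(sorted(a.query()))  # ['y']
a.add("x")   # re-adding "x" has no visible effect
print(sorted(a.query()), sorted(b.query()))  # ['y'] ['y']
```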