Conflict-Free Replicated Data Types
In this lecture, we'll define a particular class of replicated data structures known as state-based objects and formalize what it means for them to be eventually consistent and strongly eventually consistent. We'll then survey conflict-free replicated data types (CRDTs): a specific type of state-based object that is guaranteed to achieve strong eventual consistency.
State-Based Objects
In the first lecture, we looked at replicated key-value stores. In the second lecture, we looked at replicated registers. Key-value stores and registers are two specific examples of data structures that we can replicate. In this section, we'll generalize things and look at arbitrary replicated data structures known as state-based objects. We'll then introduce a way to visualize the state of a replicated state-based object over time.
A state-based object is an ordinary, plain Jane object—like the objects you've likely seen in Python or Java—with (1) some internal state, (2) a query method, (3) some number of update methods, and (4) a merge method. Understanding state-based objects is easiest with an example. Here's one written in Python:
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0
    def query(self):
        if self.cnt != 0:
            return self.sum / self.cnt
        else:
            return 0
    def update(self, x):
        self.sum += x
        self.cnt += 1
    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt
Average is a state-based object representing a running average. Note the following:
- Average's internal state consists of the two variables self.sum and self.cnt, representing the sum and count of all the values being averaged.
- Average's query method returns the average. Note that Average's query method does not modify any internal state. This is true for every state-based object.
- Average's single update method updates the average with a new value x. In general, a state-based object can have multiple update methods.
- Average's merge method merges one Average instance into another.
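As a quick illustration of the four parts listed above, the following sketch exercises one Average instance and then merges a second one into it. The class is repeated (in a slightly condensed form) so the snippet runs on its own; the variable names a and b and the sample values are illustrative assumptions, not part of the lecture.

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def query(self):
        return self.sum / self.cnt if self.cnt != 0 else 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


a = Average()
a.update(1)
a.update(3)
print(a.query())  # (1 + 3) / 2 -> 2.0

b = Average()
b.update(5)
a.merge(b)        # fold b's sum and count into a; b itself is unchanged
print(a.query())  # (1 + 3 + 5) / 3 -> 3.0
```

Note that query never changes the state, while update and merge both do.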
A replicated state-based object is simply a state-based object that is replicated across multiple computers. For example, if we replicate an Average state-based object across two servers a and b (as we do below), then both a and b have a copy of the state-based object. Clients (e.g. c and d below) send query and update requests to a server's copy of the state-based object. Meanwhile, every server periodically sends its copy of the state-based object to other servers to be merged with the merge method. That looks something like this:
Animated diagrams like these help us visualize how clients and servers communicate in a distributed system, but they make it difficult to keep track of the replicated state-based object as it is queried, updated, and merged over time. To more easily visualize the replicated state-based object, we'll introduce a new type of diagram. Here's an example:
In these diagrams, snapshots of a state-based object are drawn from left to right forwards through time. Here, we consider an Average state-based object on a single server a. The initial snapshot of the state-based object is labelled a0. Server a receives an update(1) request from some client which updates the state-based object to the next snapshot a1. Server a then receives an update(3) request which updates the state-based object to the snapshot a2.
Each update request is assigned a unique identifier which is drawn below the request. For example, the update(1) request is assigned an identifier of 0, and the update(3) request is assigned an identifier of 1.
Beneath the diagram, we draw a table with one row for each snapshot of the state-based object.
- The first column is the internal state of the object. For example, a0 has a sum of 0 and a count of 0 (as specified by Average's constructor). Similarly, a1 has a sum of 1 and a count of 1.
- The second column is the result of calling the snapshot's query() method. For example, a1.query() returns 1 because a1.sum / a1.cnt is equal to 1. Similarly, a2.query() returns 2 because a2.sum / a2.cnt is equal to 2.
- The third (and most complex) column is the causal history of the snapshot: the set of identifiers of every request that has contributed to the snapshot's internal state. For example, a0 hasn't been affected by any requests, so its causal history is the empty set {}. a1 has been affected by the update(1) request (id 0), so a1's causal history is the set {0}. Finally, a2 has been affected by the update(1) request (id 0) and the update(3) request (id 1), so a2's causal history is the set {0,1}. In general, if y = x.update() where the update has identifier i, then the causal history of y is the causal history of x unioned with the singleton set {i}.
Here's an example diagram which traces a state-based object replicated on two servers: a and b. Notice how the causal history of a1 is disjoint from the causal histories of b1 and b2. The update(2) and update(4) requests have not contributed to the state of a1, so their identifiers do not appear in its causal history. Conversely, the update(1) request has not contributed to the state of either b1 or b2, so it does not appear in either of their causal histories.
We can also use these diagrams to visualize merge requests. In the diagram below, server a receives an update(2) request which transitions a0 to a1. Similarly, server b receives an update(4) request which transitions b0 to b1. Then, server b sends a copy of b1 to server a. When server a receives the copy of b1, it merges b1 into a1 which creates a2.
Also notice that the causal history of a2 (i.e. {0,1}) is the union of a1's causal history (i.e. {0}) and b1's causal history (i.e. {1}). This makes intuitive sense because both the update(2) request (id 0) and the update(4) request (id 1) contribute to the state of a2. In general, if z = x.merge(y), the causal history of z is the union of the causal histories of x and y.
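The two causal-history rules (an update adds its identifier, a merge takes a union) can be sketched by carrying the history alongside the state. TrackedAverage, its history attribute, and the request_id parameter are hypothetical names introduced only for this illustration; a real system would not necessarily store causal histories at all.

```python
import copy


class TrackedAverage(object):
    """Average plus the causal history of its current state (illustrative)."""

    def __init__(self):
        self.sum = 0
        self.cnt = 0
        self.history = set()  # identifiers of contributing update requests

    def query(self):
        return self.sum / self.cnt if self.cnt != 0 else 0

    def update(self, x, request_id):
        self.sum += x
        self.cnt += 1
        # Rule for updates: old history unioned with the singleton {id}.
        self.history = self.history | {request_id}

    def merge(self, other):
        self.sum += other.sum
        self.cnt += other.cnt
        # Rule for merges: union of both causal histories.
        self.history = self.history | other.history


a = TrackedAverage()
b = TrackedAverage()
a.update(2, request_id=0)   # a0 -> a1
b.update(4, request_id=1)   # b0 -> b1
a.merge(copy.deepcopy(b))   # b ships a copy of b1; a1 -> a2
print(a.sum, a.cnt, a.query(), sorted(a.history))  # 6 2 3.0 [0, 1]
print(sorted(b.history))                           # [1]
```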
Finally, recall that a server periodically sends its replica of the state-based object to other servers to be merged. Thus, our diagrams will look something like the following, where servers continually merge with one another.
(Strong) Eventual Consistency
Equipped with an understanding of replicated state-based objects and causal history, we're ready to define eventual consistency and strong eventual consistency! In this section, we'll also bolster our grasp on the two definitions by looking at four state-based objects (Average, NoMergeAverage, BMergeAverage, and MaxAverage) and deciding whether they are eventually consistent, strongly eventually consistent, neither, or both.
We say a replicated state-based object is eventually consistent (EC) if whenever two replicas of the state-based object have the same causal history, they eventually (not necessarily immediately) converge to the same internal state. We say a replicated state-based object is strongly eventually consistent (SEC) if whenever two snapshots of the state-based object have the same causal history, they (immediately) have the same internal state. Note that strong eventual consistency implies eventual consistency.
Average
The definitions of eventual consistency and strong eventual consistency are best understood with examples. First, let's consider the Average state-based object we defined above and ask ourselves whether or not it is eventually consistent or strongly eventually consistent. Consider the following execution (note that we've abbreviated update as u and merge as m).
Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history {0,1}. This means that a's replica and b's replica have the same causal history. However, despite having the same causal history, the two replicas do not converge to the same internal state. In fact, they don't converge at all! As a and b periodically send merge requests to one another, the internal state of their replicas continues to change. Neither replica converges on a single value. Thus, Average is neither eventually consistent nor strongly eventually consistent.
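The lack of convergence is easy to reproduce. The sketch below assumes one particular execution (a receives update(2), b receives update(4), then the servers exchange copies three times); it is not necessarily the execution drawn in the diagram, and the trace list is only a hypothetical helper for recording states.

```python
import copy


class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


a, b = Average(), Average()
a.update(2)
b.update(4)

trace = []
for _ in range(3):
    a.merge(copy.deepcopy(b))  # b sends a copy of its replica to a
    b.merge(copy.deepcopy(a))  # a sends a copy of its replica to b
    trace.append(((a.sum, a.cnt), (b.sum, b.cnt)))

for states in trace:
    print(states)
# ((6, 2), (10, 3))
# ((16, 5), (26, 8))
# ((42, 13), (68, 21))
```

No new update requests arrive after the first two, yet every merge keeps changing both replicas because each merge re-adds values that were already counted.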
NoMergeAverage
Next, let's look at the following NoMergeAverage state-based object which is exactly the same as Average except that its merge method does nothing at all.
class NoMergeAverage(Average):
    # __init__, query, and update
    # inherited from Average.
    def merge(self, avg):
        # Ignore merge requests!
        pass
Unlike Average, NoMergeAverage is convergent. However (like Average), NoMergeAverage is neither eventually consistent nor strongly eventually consistent. To see why, let's take a look at the following execution:
Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history. Moreover, both a's replica and b's replica of the state-based object converge. However, a's replica converges to an internal state {sum=2,cnt=1} while b's replica converges to an internal state {sum=4,cnt=1}. Because the two replicas converge to different internal states, NoMergeAverage is neither eventually consistent nor strongly eventually consistent.
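A minimal sketch of this behaviour, assuming a receives update(2) and b receives update(4) (which yields the two internal states quoted above) followed by a few rounds of merging:

```python
class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


class NoMergeAverage(Average):
    def merge(self, avg):
        # Ignore merge requests.
        pass


a, b = NoMergeAverage(), NoMergeAverage()
a.update(2)
b.update(4)
for _ in range(3):
    a.merge(b)  # no effect
    b.merge(a)  # no effect

print((a.sum, a.cnt), (b.sum, b.cnt))  # (2, 1) (4, 1)
```

Each replica is stable, but the two replicas are stable at different states.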
BMergeAverage
Next, let's look at the BMergeAverage state-based object which is exactly the same as Average except for the merge function. When server b receives a merge request from server a, it throws away its replica of the state-based object and overwrites it with a's replica. On the other hand, when server a receives a merge request from server b, it ignores it.
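class BMergeAverage(Average):
    # __init__, query, and update
    # inherited from Average.
    def merge(self, avg):
        # on_server_b() is a placeholder that is not defined here; it is
        # assumed to return True only when this code runs on server b.
        if on_server_b():
            self.sum = avg.sum
            self.cnt = avg.cnt
        else:
            # Server a ignores
            # merge requests!
            pass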
BMergeAverage is eventually consistent, but not strongly eventually consistent. To see why, consider the following execution.
Snapshots b1, a1, b2, and a2 (highlighted red) all have the same causal history. Moreover, both a's replica and b's replica eventually converge to the same internal state {sum=0,cnt=0}. In fact, it's not difficult to convince yourself that this will happen in every possible execution involving a BMergeAverage state-based object. a's replica and b's replica will always eventually converge to the same internal state! Thus, BMergeAverage is eventually consistent.
However, let's take a closer look at snapshot b1 and a1. Both snapshots have the same causal history {0}, but the two snapshots have different internal states. b1's internal state is {sum=4,cnt=1}, while a1's internal state is {sum=0,cnt=0}. Thus, BMergeAverage is not strongly eventually consistent.
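The snapshots b1 and a1 described above can be reproduced with a runnable variant of BMergeAverage. Since on_server_b() is not defined in the lecture, this sketch makes the assumption that each replica is told at construction time whether it lives on server b; the on_server_b constructor argument and the snapshot variables are hypothetical.

```python
import copy


class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1


class BMergeAverage(Average):
    def __init__(self, on_server_b):
        super().__init__()
        # Assumption: a flag stands in for the undefined on_server_b().
        self.on_server_b = on_server_b

    def merge(self, avg):
        if self.on_server_b:
            # Server b overwrites its replica with the incoming one.
            self.sum = avg.sum
            self.cnt = avg.cnt
        # Server a ignores merge requests.


a = BMergeAverage(on_server_b=False)
b = BMergeAverage(on_server_b=True)

b.update(4)                 # request id 0
b1 = (b.sum, b.cnt)
a.merge(copy.deepcopy(b))   # a ignores b1, but its causal history is now {0}
a1 = (a.sum, a.cnt)
b.merge(copy.deepcopy(a))   # b overwrites itself with a1
b2 = (b.sum, b.cnt)

print(b1, a1, b2)  # (4, 1) (0, 0) (0, 0)
```

b1 and a1 share the causal history {0} yet differ, while the replicas end up agreeing on {sum=0,cnt=0}.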
MaxAverage
Finally, let's look at the MaxAverage state-based object which is exactly the same as Average except that the merge function now performs a pairwise maximum of sum and cnt.
class MaxAverage(Average):
    # __init__, query, and update
    # inherited from Average.
    def merge(self, avg):
        self.sum = max(self.sum, avg.sum)
        self.cnt = max(self.cnt, avg.cnt)
MaxAverage is both eventually consistent and strongly eventually consistent! To see why, consider the following execution.
Snapshots a2, b2, a3, and b3 (highlighted red) all have the same causal history. Better yet, they all have the same internal state too! This is no coincidence. For every execution involving a replicated MaxAverage state-based object, if two snapshots have the same causal history, they are guaranteed to have the same internal state. Thus, MaxAverage is strongly eventually consistent (and therefore eventually consistent too).
Unfortunately, even though MaxAverage is strongly eventually consistent, its semantics are a bit weird. For example, in the execution above, the effects of the update(2) request were completely overwritten by the update(4) request!
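The odd semantics show up in a few lines. This sketch assumes an execution in which a receives update(2), b receives update(4), and the two servers then merge with each other; it may differ in detail from the diagram.

```python
import copy


class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def query(self):
        return self.sum / self.cnt if self.cnt != 0 else 0

    def update(self, x):
        self.sum += x
        self.cnt += 1


class MaxAverage(Average):
    def merge(self, avg):
        self.sum = max(self.sum, avg.sum)
        self.cnt = max(self.cnt, avg.cnt)


a, b = MaxAverage(), MaxAverage()
a.update(2)
b.update(4)
a.merge(copy.deepcopy(b))
b.merge(copy.deepcopy(a))

print((a.sum, a.cnt), (b.sum, b.cnt))  # (4, 1) (4, 1)
print(a.query())                       # 4.0, not the true average 3.0
```

Both replicas agree immediately, but the value 2 no longer influences the result.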
Summary
In the following table, we summarize the consistency of the four state-based objects we've covered. C stands for convergent, EC stands for eventually consistent, and SEC stands for strongly eventually consistent.
Object | C? | EC? | SEC? |
---|---|---|---|
Average | no | no | no |
NoMergeAverage | yes | no | no |
BMergeAverage | yes | yes | no |
MaxAverage | yes | yes | yes |
CRDTs
The last section demonstrated that designing a strongly eventually consistent state-based object with intuitive semantics is challenging. In this section, we define a particular class of state-based objects known as conflict-free replicated data types (CRDTs) which are guaranteed to be strongly eventually consistent and which usually have reasonable semantics. We'll then present four CRDTs: G-Counters, PN-Counters, G-Sets and 2P-Sets.
A conflict-free replicated data type (CRDT) is a state-based object that satisfies the following properties. Let merge(x, y) be the value of x after performing x.merge(y). Similarly, let update(x, ...) be the value of x after performing x.update(...).
- The merge method is associative. This means that for any three state-based objects x, y, and z, merge(merge(x, y), z) is equal to merge(x, merge(y, z)).
- The merge method is commutative. This means that for any two state-based objects, x and y, merge(x, y) is equal to merge(y, x).
- The merge method is idempotent. This means that for any state-based object x, merge(x, x) is equal to x.
- Every update method is increasing. Let x be an arbitrary state-based object and let y = update(x, ...) be the result of applying an arbitrary update method update to x. The update method is increasing if merge(x, y) is equal to y.
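One way to get a feel for these four properties is to test them on a handful of sample objects. The sketch below does this for the Average object from earlier (query omitted for brevity). The helpers merge, update, same, and check_properties are hypothetical names written for this illustration; passing such a check on a few samples does not prove a property, but a failure is a genuine counterexample.

```python
import copy
import itertools


class Average(object):
    def __init__(self):
        self.sum = 0
        self.cnt = 0

    def update(self, x):
        self.sum += x
        self.cnt += 1

    def merge(self, avg):
        self.sum += avg.sum
        self.cnt += avg.cnt


def merge(x, y):
    """Value of x after x.merge(y); works on a copy so x is unchanged."""
    z = copy.deepcopy(x)
    z.merge(y)
    return z


def update(x, *args):
    """Value of x after x.update(*args); works on a copy."""
    z = copy.deepcopy(x)
    z.update(*args)
    return z


def same(x, y):
    # Two objects count as equal here when all their attributes match.
    return vars(x) == vars(y)


def check_properties(samples, update_args):
    triples = itertools.product(samples, repeat=3)
    pairs = itertools.product(samples, repeat=2)
    return {
        "associative": all(same(merge(merge(x, y), z), merge(x, merge(y, z)))
                           for x, y, z in triples),
        "commutative": all(same(merge(x, y), merge(y, x)) for x, y in pairs),
        "idempotent": all(same(merge(x, x), x) for x in samples),
        "increasing": all(same(merge(x, update(x, v)), update(x, v))
                          for x in samples for v in update_args),
    }


samples = [Average(), update(Average(), 2), update(update(Average(), 2), 4)]
results = check_properties(samples, update_args=[1, 3])
print(results)
# {'associative': True, 'commutative': True, 'idempotent': False, 'increasing': False}
```

Average's additive merge fails idempotence (merging {sum=2,cnt=1} with itself gives {sum=4,cnt=2}), so Average is not a CRDT.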
To make things concrete, let's look at a very simple state-based object and prove it is a CRDT. The following IntMax state-based object wraps a single integer x where merge(a, b) computes the maximum of a.x and b.x.
class IntMax(object):
    def __init__(self):
        self.x = 0
    def query(self):
        return self.x
    def update(self, x):
        assert x >= 0
        self.x += x
    def merge(self, other):
        self.x = max(self.x, other.x)
We can easily verify that IntMax is a CRDT by proving that (a) merge is associative, commutative, and idempotent and that (b) update is increasing. The proofs are straightforward because the max function itself is associative, commutative, and idempotent! Note that we'll abuse notation a bit and say an IntMax object a is equal to the integer a.x it contains.
- First, we prove that merge is associative.
  merge(merge(a, b), c) = max(max(a.x, b.x), c.x) = max(a.x, max(b.x, c.x)) = merge(a, merge(b, c))
- Next, we prove that merge is commutative.
  merge(a, b) = max(a.x, b.x) = max(b.x, a.x) = merge(b, a)
- Then, we prove that merge is idempotent.
  merge(a, a) = max(a.x, a.x) = a.x = a
- Finally, we prove that update is increasing. The middle step uses the fact that update only accepts x >= 0.
  merge(a, update(a, x)) = max(a.x, a.x + x) = a.x + x = update(a, x)
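As a sanity check on the four proofs, the equalities can also be tested exhaustively over a small range of values. This is only a sketch: the helpers make, merge, and update and the range 0..3 are assumptions chosen for illustration, and a finite check complements the proofs rather than replacing them.

```python
import copy
import itertools


class IntMax(object):
    def __init__(self):
        self.x = 0

    def query(self):
        return self.x

    def update(self, x):
        assert x >= 0
        self.x += x

    def merge(self, other):
        self.x = max(self.x, other.x)


def make(v):
    """Build an IntMax holding the value v."""
    obj = IntMax()
    obj.update(v)
    return obj


def merge(a, b):
    """Value of a after a.merge(b), computed on a copy."""
    c = copy.copy(a)
    c.merge(b)
    return c


def update(a, x):
    """Value of a after a.update(x), computed on a copy."""
    c = copy.copy(a)
    c.update(x)
    return c


checked = 0
for p, q, r in itertools.product(range(4), repeat=3):
    a, b, c = make(p), make(q), make(r)
    assert merge(merge(a, b), c).x == merge(a, merge(b, c)).x  # associative
    assert merge(a, b).x == merge(b, a).x                      # commutative
    assert merge(a, a).x == a.x                                # idempotent
    assert merge(a, update(a, r)).x == update(a, r).x          # increasing
    checked += 1

print(checked)  # 64 combinations, none violating a property
```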
We'll now turn our attention to four more complex CRDTs: G-Counters (increment-only counters); PN-Counters (increment/decrement counters); G-Sets (add-only sets); and 2P-Sets (add/remove sets).
G-Counter
A G-Counter CRDT represents a replicated counter which can be added to but not subtracted from.
- The internal state of a G-Counter replicated on n machines is an n-length array of non-negative integers.
- The query method returns the sum of every element in the n-length array.
- The add(x) update method, when invoked on the ith server, increments the ith entry of the n-length array by x. For example, server 0 will increment the 0th entry of the array, server 1 will increment the 1st entry of the array, and so on.
- The merge method performs a pairwise maximum of the two arrays.
We can implement a G-Counter in Python as follows.
class GCounter(object):
    def __init__(self, i, n):
        self.i = i # server id
        self.n = n # number of servers
        self.xs = [0] * n
    def query(self):
        return sum(self.xs)
    def add(self, x):
        assert x >= 0
        self.xs[self.i] += x
    def merge(self, c):
        zipped = zip(self.xs, c.xs)
        self.xs = [max(x, y) for (x, y) in zipped]
If a G-Counter object is replicated on n servers, then we construct a GCounter(0, n) object on server 0, a GCounter(1, n) object on server 1, and so on.
Let's take a look at an example execution in which a G-Counter is replicated on two servers where server a is server 0 and server b is server 1.
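In code, one such two-server execution might look like the following sketch. The specific add amounts are assumptions chosen for illustration and need not match the diagram.

```python
import copy


class GCounter(object):
    def __init__(self, i, n):
        self.i = i  # server id
        self.n = n  # number of servers
        self.xs = [0] * n

    def query(self):
        return sum(self.xs)

    def add(self, x):
        assert x >= 0
        self.xs[self.i] += x

    def merge(self, c):
        self.xs = [max(x, y) for (x, y) in zip(self.xs, c.xs)]


a = GCounter(0, 2)  # server a is server 0
b = GCounter(1, 2)  # server b is server 1

a.add(1)
b.add(2)
a.add(3)
print(a.xs, b.xs)          # [4, 0] [0, 2]

a.merge(copy.deepcopy(b))  # b sends a copy of its replica to a
b.merge(copy.deepcopy(a))  # a sends a copy of its replica to b
print(a.xs, b.xs)          # [4, 2] [4, 2]
print(a.query())           # 6

a.merge(copy.deepcopy(b))  # merging again changes nothing
print(a.xs)                # [4, 2]
```

Because each server only ever increments its own entry, the pairwise maximum never discards an add, and repeated merges are harmless.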
PN-Counter
A PN-Counter CRDT represents a replicated counter which can be added to and subtracted from.
- The internal state of a PN-Counter is a pair of two G-Counters named p and n. p represents the total value added to the PN-Counter while n represents the total value subtracted from the PN-Counter.
- The query method returns the difference p.query() - n.query().
- The add(x) method (the first of the two update methods) invokes p.add(x).
- The sub(x) method (the second of the two update methods) invokes n.add(x).
- The merge method performs a pairwise merge of p and n.
We can implement a PN-Counter in Python as follows.
class PNCounter(object):
    def __init__(self, i, n):
        self.p = GCounter(i, n)
        self.n = GCounter(i, n)
    def query(self):
        return self.p.query() - self.n.query()
    def add(self, x):
        assert x >= 0
        self.p.add(x)
    def sub(self, x):
        assert x >= 0
        self.n.add(x)
    def merge(self, c):
        self.p.merge(c.p)
        self.n.merge(c.n)
As with G-Counters, if a PN-Counter object is replicated on n servers, then we construct a PNCounter(0, n) object on server 0, a PNCounter(1, n) object on server 1, and so on. Let's take a look at another example execution. For brevity, we've only listed a subset of a PN-Counter's internal state.
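A small two-server run, sketched under assumed add and sub amounts (not taken from the diagram), shows how the two underlying G-Counters combine:

```python
import copy


class GCounter(object):
    def __init__(self, i, n):
        self.i = i
        self.n = n
        self.xs = [0] * n

    def query(self):
        return sum(self.xs)

    def add(self, x):
        assert x >= 0
        self.xs[self.i] += x

    def merge(self, c):
        self.xs = [max(x, y) for (x, y) in zip(self.xs, c.xs)]


class PNCounter(object):
    def __init__(self, i, n):
        self.p = GCounter(i, n)  # total added
        self.n = GCounter(i, n)  # total subtracted

    def query(self):
        return self.p.query() - self.n.query()

    def add(self, x):
        assert x >= 0
        self.p.add(x)

    def sub(self, x):
        assert x >= 0
        self.n.add(x)

    def merge(self, c):
        self.p.merge(c.p)
        self.n.merge(c.n)


a = PNCounter(0, 2)
b = PNCounter(1, 2)
a.add(5)
b.sub(2)
a.sub(1)

a.merge(copy.deepcopy(b))
b.merge(copy.deepcopy(a))
print(a.p.xs, a.n.xs)        # [5, 0] [1, 2]
print(a.query(), b.query())  # 2 2
```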
G-Set
A G-Set CRDT represents a replicated set which can be added to but not removed from.
- The internal state of a G-Set is just a set!
- The query method returns the set.
- The add(x) update method adds x to the set.
- The merge method performs a set union.
Pretty simple, huh? Not surprising considering set union is commutative, associative, and idempotent. We can implement a G-Set in Python as follows.
class GSet(object):
    def __init__(self):
        self.xs = set()
    def query(self):
        return self.xs
    def add(self, x):
        self.xs.add(x)
    def merge(self, s):
        self.xs = self.xs.union(s.xs)
Let's take a look at an example execution.
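As a sketch of one possible execution, with illustrative element names that are assumptions rather than the diagram's values:

```python
import copy


class GSet(object):
    def __init__(self):
        self.xs = set()

    def query(self):
        return self.xs

    def add(self, x):
        self.xs.add(x)

    def merge(self, s):
        self.xs = self.xs.union(s.xs)


a, b = GSet(), GSet()
a.add("x")
b.add("y")
a.add("z")

a.merge(copy.deepcopy(b))  # b sends a copy of its replica to a
b.merge(copy.deepcopy(a))  # a sends a copy of its replica to b
print(sorted(a.query()))   # ['x', 'y', 'z']
print(sorted(b.query()))   # ['x', 'y', 'z']
```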
2P-Set
A 2P-Set CRDT represents a replicated set which can be added to and removed from.
- The internal state of a 2P-Set is a pair of two G-Sets named a and r. a represents the set of values added to the 2P-Set while r represents the set of values removed from the 2P-Set.
- The query method returns the set difference a.query() - r.query().
- The add(x) method (the first of the two update methods) invokes a.add(x).
- The sub(x) method (the second of the two update methods) invokes r.add(x).
- The merge method performs a pairwise merge of a and r.
We can implement a 2P-Set in Python as follows.
class TwoPSet(object):
    def __init__(self):
        self.a = GSet()
        self.r = GSet()
    def query(self):
        return self.a.query() - self.r.query()
    def add(self, x):
        self.a.add(x)
    def sub(self, x):
        self.r.add(x)
    def merge(self, s):
        self.a.merge(s.a)
        self.r.merge(s.r)
Let's look at an example execution.
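The following sketch walks through one assumed execution (the element names and the replica names s and t are illustrative). It also shows a consequence of the definition: because r only grows, an element that has been removed stays removed even if it is added again.

```python
import copy


class GSet(object):
    def __init__(self):
        self.xs = set()

    def query(self):
        return self.xs

    def add(self, x):
        self.xs.add(x)

    def merge(self, s):
        self.xs = self.xs.union(s.xs)


class TwoPSet(object):
    def __init__(self):
        self.a = GSet()  # values ever added
        self.r = GSet()  # values ever removed

    def query(self):
        return self.a.query() - self.r.query()

    def add(self, x):
        self.a.add(x)

    def sub(self, x):
        self.r.add(x)

    def merge(self, s):
        self.a.merge(s.a)
        self.r.merge(s.r)


s, t = TwoPSet(), TwoPSet()
s.add("x")
s.add("y")
t.merge(copy.deepcopy(s))  # t learns about x and y
t.sub("x")                 # t removes x
s.merge(copy.deepcopy(t))  # s learns about the removal
print(sorted(s.query()))   # ['y']

s.add("x")                 # x is still in r, so it stays hidden
print(sorted(s.query()))   # ['y']
```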