Pessimistic replication requires synchronous communication between the main node writing the data and its replicas. In some cases, however, optimistic replication can be more efficient and still guarantee the same final result. One family of solutions in this category is conflict-free replicated data types.
This post focuses on Conflict-free Replicated Data Types (CRDT), implemented, among others, in Redis and Riak. The article begins with the difference between pessimistic and optimistic replication and explains why CRDTs fit the latter. The next part describes this family of data structures in more detail by listing some of the popular types (counters, registers and sets). The last section exercises them in some learning tests written with the help of Akka's Distributed Data library.
Conflict-free replicated data type and optimistic replication
In distributed systems, replication is one of the possible sources of data inconsistency. When a network partition occurs, one or more cluster nodes become unreachable. During this time they don't receive any updates and the data they store can diverge from the data stored on reachable nodes. Even though the system is globally available, its overall state becomes inconsistent. One of the strategies to deal with this kind of issue is pessimistic replication. It means that a given state change is confirmed only when all required replicas acknowledge the write of the modified data. As you can imagine, it has a big impact on latency since the operation is not considered successful as long as some nodes haven't acknowledged it.
Another approach to deal with replication issues is optimistic replication. Here the replication happens asynchronously, i.e. the main writer doesn't need to wait for all replicas before confirming the operation. Inconsistencies are quite naturally "allowed" and, if they exist, they're resolved later by a merge operation. An example of such a situation can be a boolean flag set to true when one specific event happened at least once in the past. When this flag is false on some replicas and true on the others, the conflict can be easily resolved by setting the false flags to true, since an event that already happened can't be discarded.
One of the techniques supporting optimistic replication is Conflict-free Replicated Data Types. As you can deduce from the name, it's a data structure that can be safely (without conflicts) replicated across different nodes in the cluster. The safety is guaranteed by the fact that all conflicts can be mathematically merged or resolved. One of the properties guaranteeing this is commutativity, i.e. the arguments of an operation are order-insensitive: f(a, b) = f(b, a). We'll see this in more detail in the next section where specific data structures are covered.
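A minimal Scala sketch (the names are illustrative, not taken from any library) of the boolean flag merge described above: the merge is a logical OR, which is commutative, so replicas can merge in any order and still converge to the same value.

object FlagMerge {
  // merge of the "event happened at least once" flag: a logical OR
  def merge(replica1: Boolean, replica2: Boolean): Boolean = replica1 || replica2

  def main(args: Array[String]): Unit = {
    assert(merge(true, false) == merge(false, true)) // commutative: f(a, b) = f(b, a)
    assert(merge(true, true))                        // re-merging the same value changes nothing
  }
}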
Conflict-free Replicated Data Types examples
Two different approaches define the CRDTs. The first one is state-based and produces Convergent Replicated Data Types (CvRDT). It represents the situations where the whole changed object is sent to the replicas. In the execution flow, the first step consists of updating the local state. After that, the new object is sent to the remaining replicas, which merge their local state with the one coming from the remote call.
The second approach is operation-based and produces Commutative Replicated Data Types (CmRDT). The name comes from the method used to propagate the updates. Instead of sending the whole new object, this approach sends the operation constructing it. The replicas receiving this message execute the operation and thereby build the updated object. However, as you can imagine, because of network issues the operations can arrive at different replicas in a different order. For instance, replica#1 can receive 2 sample operations in the order (op1, op2) while replica#2 receives them in reverse order. Because of that, this approach can be applied only to commutative updates.
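A hand-rolled sketch (illustrative types and names, not Akka's API) contrasting both propagation styles on a simple counter:

object PropagationStyles {
  // State-based (CvRDT): the whole per-node state travels between replicas
  // and is merged entry by entry (here with a max, as in the G-Counter below)
  final case class StateBasedCounter(state: Map[String, Long] = Map.empty) {
    def increment(node: String): StateBasedCounter =
      copy(state.updated(node, state.getOrElse(node, 0L) + 1L))
    def merge(that: StateBasedCounter): StateBasedCounter =
      StateBasedCounter((state.keySet ++ that.state.keySet).map { node =>
        node -> math.max(state.getOrElse(node, 0L), that.state.getOrElse(node, 0L))
      }.toMap)
    def value: Long = state.values.sum
  }

  // Operation-based (CmRDT): only the operations travel; they must commute so
  // that replicas receiving (op1, op2) or (op2, op1) end up with the same value
  sealed trait CounterOp
  case object Increment extends CounterOp
  case object Decrement extends CounterOp

  def applyOp(value: Long, op: CounterOp): Long = op match {
    case Increment => value + 1
    case Decrement => value - 1
  }

  def main(args: Array[String]): Unit = {
    val ops = Seq(Increment, Decrement, Increment)
    // the delivery order of commutative operations doesn't matter
    assert(ops.foldLeft(0L)((v, op) => applyOp(v, op)) == ops.reverse.foldLeft(0L)((v, op) => applyOp(v, op)))
  }
}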
At a more fine-grained level we can distinguish the following data types:
- Counters - a counter is a data structure supporting 3 types of operations: increment, decrement and get value. Several specific counters exist in the CRDT family:
- operation-based counter - it's an example of a CmRDT because concurrent increment and decrement executions commute: 2 increments (or 2 decrements) are interchangeable, and a decrement followed by an increment has the same net effect as an increment followed by a decrement.
- Grow-only Counter (G-Counter) (aka state-based increment-only counter) - it's inspired by the technique coming from vector clocks and it supports only the increment operation (hence grow-only). Each node in the cluster can be identified by an integer value from 0 to cluster size - 1 (let's call the cluster size n). Next, an array of n elements is created on every node, with all entries initially set to 0. Now, when the increment operation is executed, the replica executing it retrieves its slot in the array and increments that number locally. The merge operation takes the max value for each array slot, and the get value operation returns the sum of all merged entries.
Concretely speaking, for 3 nodes each of them will have an initial array: {node#1: [0, 0, 0], node#2: [0, 0, 0], node#3: [0, 0, 0]}. First, let's say node#1 is called. It increments the first entry of its array. After the update the array will be [1, 0, 0]. If it happens again on the same node, the array will be [2, 0, 0]. If an increment now happens on node#3, its local array will be [0, 0, 1]. When the get value operation is called, the merge will construct the [2, 0, 1] array, giving the sum (value) of 3.
- State-based PN Counter - the previous counter is called increment-only not by mistake. Since the merge always takes the max value, a decrement operation couldn't have any effect. To handle this issue, the PN Counter is proposed. It uses 2 G-Counters: one for the increments and another one for the decrements (a sketch reproducing this computation follows the list). Each increment and decrement operation is executed locally, as in the case of the G-Counter. The difference is in the value computation, where:
- during the merge, the increment and decrement counters always take the max per node from the compared states (same as the G-Counter)
- later, the get value operation subtracts the sum of decrements from the sum of increments. For instance, if after the merge we get 2 arrays: [0, 1, 5] (increments) and [1, 2, 2] (decrements), then the value of the counter will be 6 - 5 = 1
- Registers - globally, a register is a cell storing an object. Basically it supports 2 operations: assign (updates the value) and get (retrieves the value). It's based on sequential semantics where later writes replace the earlier ones. This detail means that registers work perfectly in a non-concurrent environment. However, when several different writers exist, it becomes trickier. That's why CRDT also includes some safe registers:
- Last-Writer-Wins Register (LWW-Register) - in this data structure the last write always wins. That said, if 2 data sets diverge, the data set with the most recent values is always kept (a minimal merge sketch follows this list). For instance, let's suppose we send a value "A" to replica#1 and, 2 seconds later, the value "B" to replica#2. Because of some network problems, "B" is not replicated to the replica#1 node. When this divergence is detected, the LWW strategy will automatically assign the "B" value to replica#1. More specifically, we can divide the LWW-Register into 2 subcategories:
- State-based LWW-Register - this register works almost as in the above example. When the assign operation is called, the new state is saved to the register. The state's freshness is tracked by a timestamp representing the state's generation time at the replica node's level. If a conflict is detected, the value with the greater timestamp is kept.
- Operation-based LWW-Register - here the logic is still similar. The difference comes from the fact that the generation timestamp is not defined inside the register but at the value source's level. It means that the source sends a pair of (value, generation_timestamp) and the register overrides its value only when the sent timestamp is greater than the locally stored one.
- Multi-Value Register (MV-Register) - it's another approach to registers. Here the timestamps are replaced by (value, versionVector) pairs. Also, this register type lets the client side deal with the conflicts. This translates into 2 things:
- at the moment of writing, the versionVector is increased to "dominate" the previously generated one
- the merge operation takes all elements from both input sets. If some conflict occurs (an item defined in both data sets), only the dominating values are kept. For instance, for these 2 conflicting registers {("A", 1), ("B", 1)} and {("B", 2)}, the merge operation will produce {("A", 1), ("B", 2)}
- Sets - by default, sets are not commutative since the (add(x), remove(x)) and (remove(x), add(x)) sequences won't lead to the same result. However, some safer set data structures exist in CRDT:
- Grow-Only Set (G-Set) - from the example above we can figure out that the set isn't a commutative data structure because of the remove(x) operation. Thus the simplest remedy to this problem is to not have the remove() operation at all. The G-Set is the structure allowing only the addition of new elements to the set.
- Two-Phase Set (2P-Set) - it's a combination of 2 G-Sets where one stores the added items and the other the removed ones (the latter called the tombstone set). In a 2P-Set the items can be added and removed. However, a given element can be added only once, and only when it was not removed before (aka remove-wins semantics; a minimal sketch follows this list):
- State-based - for the add and remove operations we can distinguish 2 different requirements. For the add, the set checks if the element wasn't already added to the set of added items. If not, the operation is allowed. For the removal, the operation begins by checking if the item was added but not yet removed. If it's the case, the remove is executed. The removal always takes precedence over the addition, so even if both operations arrive out of order, the item will always end up considered as removed.
- Operation-based - here the assumption is that concurrent operations on the same element commute and that the remove operation always happens after the corresponding add. This shows that this data type is still a CRDT.
- Last-Write-Wins-Element-Set (LWW-element-Set) - it's a variant of the 2P-Set where 2 sets still exist (one with adds, the other with removals). The differences are the format of the stored data and the lack of remove-wins semantics. It means that an element can be added as many times as wanted. Each element is accompanied by a timestamp used to define the precedence. For instance, if the operations (remove(3, x), add(1, x)) are sent out of order, x will be considered as removed since the remove event happens after the add (timestamp 3 > timestamp 1). Thus, an element is considered to be in the set if it's present in the set of adds and is absent from the set of removals with a higher timestamp.
- PN-Set - similar to the above one except that the timestamp is replaced by a counter. At the beginning, the counter of a given set item is 0. Each add operation increments the counter. On the other side, the remove operation decrements the counter. At the end, a given element is considered to be in the set only when its counter is strictly positive.
- Observed-Removed Set (OR-Set) - here multiple add and remove operations are still allowed and their order is defined by the causal history, i.e. the add always happens before the remove. It uses a principle similar to the LWW-element-Set. The difference is that it assigns unique tags instead of timestamps to each add and remove operation. Each of these operations has a separate set storing the generated tags. An element is considered to be in the OR-Set when the set of adds less the set of removals is not empty. In broad strokes, the algorithm looks like this:
- add(e) - generate a unique tag ("tag1") and add it to the adds set: (e, "tag1"). Propagate that to the replicas
- remove(e) - take the tag generated for add ("tag1") and add it to the removal set: (e, "tag1")
- at this moment the subtraction of both sets gives an empty set, so the e element doesn't exist
- but now let's suppose that the state diverged and we end up with the adds set of node#1 equal to (e, "tag1") and the removal set of node#2 equal to (e, "tag3"). The merge operation is expressed as a union of 2 differences: the adds set of node#1 minus the removal set of node#2, and the adds set of node#2 minus the removal set of node#1. In our case this leads to the situation where the element e is in the merged set ("tag1" is different from "tag3", and the removal set of node#1 and the adds set of node#2 are both empty). Since in the case of conflicting adds and removals (= operations on 2 different nodes) the add always wins, we consider here that the element e is in the set.
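As announced in the counters part, here is a minimal hand-rolled sketch (illustrative names, not a library API) of the G-Counter/PN Counter arithmetic; it reproduces the worked examples: the merged G-Counter [2, 0, 1] gives 3, and the PN Counter with increments [0, 1, 5] and decrements [1, 2, 2] gives 6 - 5 = 1.

object PnCounterSketch {
  // state-based G-Counter: one slot per node, merge takes the max per slot,
  // the value is the sum of all slots
  final case class GCounterState(slots: Vector[Long]) {
    def increment(node: Int): GCounterState = copy(slots.updated(node, slots(node) + 1))
    def merge(that: GCounterState): GCounterState =
      GCounterState(slots.zip(that.slots).map { case (a, b) => math.max(a, b) })
    def value: Long = slots.sum
  }

  // PN Counter: two G-Counters, value = sum of increments - sum of decrements
  final case class PnCounterState(increments: GCounterState, decrements: GCounterState) {
    def merge(that: PnCounterState): PnCounterState =
      PnCounterState(increments.merge(that.increments), decrements.merge(that.decrements))
    def value: Long = increments.value - decrements.value
  }

  def main(args: Array[String]): Unit = {
    // worked G-Counter example: [2, 0, 0] merged with [0, 0, 1] gives [2, 0, 1] => 3
    val node1 = GCounterState(Vector(0L, 0L, 0L)).increment(0).increment(0)
    val node3 = GCounterState(Vector(0L, 0L, 0L)).increment(2)
    assert(node1.merge(node3).value == 3)

    // worked PN Counter example: increments [0, 1, 5], decrements [1, 2, 2] => 6 - 5 = 1
    val pn = PnCounterState(GCounterState(Vector(0L, 1L, 5L)), GCounterState(Vector(1L, 2L, 2L)))
    assert(pn.value == 1)
  }
}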
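Here is also the minimal LWW-Register merge sketch announced above (again with illustrative names): the register keeps a (value, timestamp) pair and the merge simply keeps the pair with the greater timestamp.

object LwwRegisterSketch {
  // state-based LWW-Register: the state is the value plus its generation timestamp
  final case class LwwRegisterState[A](value: A, timestamp: Long) {
    def assign(newValue: A, newTimestamp: Long): LwwRegisterState[A] =
      LwwRegisterState(newValue, newTimestamp)
    // the merge keeps the most recently written value
    def merge(that: LwwRegisterState[A]): LwwRegisterState[A] =
      if (that.timestamp > timestamp) that else this
  }

  def main(args: Array[String]): Unit = {
    val replica1 = LwwRegisterState("A", timestamp = 100L)
    val replica2 = LwwRegisterState("B", timestamp = 105L) // written later
    assert(replica1.merge(replica2).value == "B") // the last write wins on both replicas
    assert(replica2.merge(replica1).value == "B")
  }
}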
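Finally, the 2P-Set sketch announced above: two G-Sets, one for the adds and one for the tombstones; the merge is a per-set union and the lookup succeeds only for elements that were added but never removed (remove-wins). The names are, again, only illustrative.

object TwoPhaseSetSketch {
  // 2P-Set: a grow-only set of added elements plus a grow-only tombstone set
  final case class TwoPhaseSet[A](adds: Set[A] = Set.empty[A], tombstones: Set[A] = Set.empty[A]) {
    def add(element: A): TwoPhaseSet[A] = copy(adds = adds + element)
    // a removal only makes sense for an element that was observed as added
    def remove(element: A): TwoPhaseSet[A] =
      if (adds.contains(element)) copy(tombstones = tombstones + element) else this
    def contains(element: A): Boolean = adds.contains(element) && !tombstones.contains(element)
    // both underlying G-Sets merge with a union, so a removal seen anywhere wins
    def merge(that: TwoPhaseSet[A]): TwoPhaseSet[A] =
      TwoPhaseSet(adds ++ that.adds, tombstones ++ that.tombstones)
  }

  def main(args: Array[String]): Unit = {
    val replica1 = TwoPhaseSet[String]().add("a").add("b")
    val replica2 = replica1.remove("a")
    val merged = replica1.merge(replica2)
    assert(!merged.contains("a")) // removed on one replica => removed everywhere after merge
    assert(merged.contains("b"))
  }
}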
CRDT examples
To stay in the blog's spirit, we'll test the above descriptions with the Akka Distributed Data library that provides some of the presented CRDTs:
"PN Counter" should "take the max value during the merge" in { val countNode1 = PNCounter.empty.increment(Node1, 1) val countNode1Decremented = countNode1.decrement(Node1, 3) val countNode2 = PNCounter.empty.increment(Node2, 10) val countNode2Decremented = countNode2.decrement(Node1, 2) val finalCounter = countNode1Decremented.merge(countNode2Decremented) // Since it always takes the max for modified nodes finalCounter.getValue.intValue() shouldEqual(7) } "G Counter" should "take the max value from 2 nodes" in { val countNode1 = GCounter.empty.increment(Node1, 10) val countNode2 = GCounter.empty.increment(Node2, 5) val finalCounter = countNode1.merge(countNode2) finalCounter.getValue.intValue() shouldEqual(10) } "G Set" should "allow only addition operations" in { val gSetNode1 = GSet.create() + "a" + "b" + "c" val gSetNode2 = GSet.create() + "a" + "b" + "d" val mergedGSet = gSetNode1.merge(gSetNode2) mergedGSet.getElements() should contain allOf("a", "b", "c", "d") } "LWW register" should "make the last writes win" in { val clockNode1Value = new akka.cluster.ddata.LWWRegister.Clock[String] { override def apply(current: Long, value: String): Long = 100 } val clockNode2Value = new akka.cluster.ddata.LWWRegister.Clock[String] { override def apply(current: Long, value: String): Long = 105 } val lwwRegister1 = LWWRegister("A")(Node1, clockNode1Value) val lwwRegister2 = LWWRegister("B")(Node2, clockNode2Value) // Greater value returned by the clock, better val mergedRegister = lwwRegister1.merge(lwwRegister2) mergedRegister.getValue() shouldEqual("B") } "OR Set" should "show add-wins semantic" in { var oRSetNode1 = ORSet.create[String]() oRSetNode1 = oRSetNode1.add(Node1, "a") oRSetNode1 = oRSetNode1.add(Node1, "a") oRSetNode1 = oRSetNode1.remove(Node1, "a") oRSetNode1 = oRSetNode1.remove(Node1, "a") val oRSetNode2= oRSetNode1.add(Node2, "a") oRSetNode1 = oRSetNode1.remove(Node1, "a") val mergedSet = oRSetNode1.merge(oRSetNode2) mergedSet.getElements() should have size 1 mergedSet.getElements().iterator().next() shouldEqual("a") }
Synchronous replication reduces the write performance but, in return, it guarantees strong data consistency. However, in some cases strong consistency is not required and, thanks to CRDTs, the divergences can be easily merged without a big loss of consistency. The first part explained this aspect in more detail while the second one showed a lot of the available CRDT types. Among them we could meet some simple structures, such as counters, and more complex ones, such as sets or registers. All of them are based on different concepts. Some of them use a last-write-wins-like strategy where the last operation wins. Others simplify the problem by, for instance, accepting only one specific type of operation (e.g. only incrementing or only adding new elements). Some examples showing most of these data structures were included in the last section with the help of the Akka Distributed Data library.