Conflict-Free Replicated Data Types - flags, graphs and maps

Versions: Scala 2.12.3, Akka Distributed Data 2.5.11

The previous post about Conflict-free Replicated Data Types presented some of the basic structures of this type. This one describes some of the types not covered yet: flags, graphs and maps.

The post is divided into 3 sections. The first one describes the flags in the context of CRDTs. The next one is about the graphs, which can also benefit from the CRDT convergence properties. The final part describes the maps. Each section contains some code examples for the described data type.

CRDT: flags

Before talking about CRDTs and flags, let's briefly describe what a flag is. A flag is a data structure holding a boolean value that represents an enabled or a disabled state. In the context of CRDTs, the flag operations are not commutative since (enable, disable) and (disable, enable) don't lead to the same final state.

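Because of this non-commutative character, we distinguish 2 types of flags:

  1. Enable-wins flag - when an enable and a disable are concurrent, the enable takes precedence
  2. Disable-wins flag - when an enable and a disable are concurrent, the disable takes precedence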

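Akka Distributed Data implements a Flag initialized to false that can only be switched to true, so true always wins in a merge. Illustrating it is quite straightforward: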

import akka.cluster.ddata.Flag
import org.scalatest.{FlatSpec, Matchers}

class FlagCrdtTest extends FlatSpec with Matchers {

  "flag" should "always be merged to true" in {
    val trueFlagNode1 = Flag.empty.switchOn
    val falseFlagNode2 = Flag.empty

    val mergedFlag = falseFlagNode2.merge(trueFlagNode1)

    mergedFlag.enabled shouldEqual true
  }

}
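Akka's Flag can't be switched back to false, so it doesn't show how the 2 types of flags resolve a concurrent enable and disable. The following minimal sketch, which isn't part of Akka, assumes a tombstone-based design in the spirit of an OR-Set: every enable or disable creates a unique token (replica id plus a local counter) and cancels the opposite tokens the replica has already observed. TokenFlag, Token, enableWins and disableWins are hypothetical names; the 2 read methods apply the enable-wins and disable-wins rules to the same merged state.

```scala
// Hypothetical token-based flag (not part of Akka): every operation creates a unique token,
// identified by the replica id and a local sequence number
final case class Token(replica: String, seq: Int)

final case class TokenFlag(
    replica: String,
    seq: Int = 0,
    enables: Set[Token] = Set.empty,
    disables: Set[Token] = Set.empty,
    tombstones: Set[Token] = Set.empty) {

  // Tokens that haven't been cancelled by an operation that observed them
  private def liveEnables: Set[Token] = enables -- tombstones
  private def liveDisables: Set[Token] = disables -- tombstones

  // Enabling cancels every disable token this replica has already seen
  def enable: TokenFlag =
    copy(seq = seq + 1, enables = enables + Token(replica, seq + 1), tombstones = tombstones ++ liveDisables)

  // Disabling cancels every enable token this replica has already seen
  def disable: TokenFlag =
    copy(seq = seq + 1, disables = disables + Token(replica, seq + 1), tombstones = tombstones ++ liveEnables)

  // Enable-wins read: an enable that no disable has observed keeps the flag on
  def enableWins: Boolean = liveEnables.nonEmpty

  // Disable-wins read: any disable that no enable has observed forces the flag off
  def disableWins: Boolean = liveEnables.nonEmpty && liveDisables.isEmpty

  // Union of the grow-only sets; the local replica id and counter stay unchanged
  def merge(that: TokenFlag): TokenFlag =
    copy(
      enables = enables ++ that.enables,
      disables = disables ++ that.disables,
      tombstones = tombstones ++ that.tombstones)
}
```

For instance, if both replicas start from an enabled flag, and replica A disables it while replica B concurrently enables it again, the merged state reads true with enableWins and false with disableWins. The price of supporting both operations is a set of tombstones that only grows.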

CRDT: graphs

CRDTs can also be applied to graphs. As a reminder, a graph is composed of vertices and of edges connecting pairs of vertices. Because of this relationship, the operations on vertices and edges can't be independent: an edge can be added only if both of its vertices exist, and a vertex can be removed only if it doesn't have any edges. When these 2 operations (adding an edge and removing one of its vertices) are called concurrently, 3 resolution strategies are possible:

  1. Giving precedence to removals - all the edges of the removed vertex are removed as a side effect
  2. Giving precedence to adds - if a vertex of the added edge was removed, it's restored
  3. Delaying the removal until all concurrent add operations are executed (drawback: synchronization is required)

To deal with these problems more easily, the CRDT specifications propose 3 graph-like data structures:

Graphs are not available in Akka Distributed Data. That's why we'll focus here on simple custom implementations of RGA (Replicated Growable Array) and 2P2P-Graph:

import org.scalatest.{FlatSpec, Matchers}

import scala.collection.mutable

/**
  * CRDT graph tests. The implementation code was inspired by the specifications defined in
  * "A comprehensive study of Convergent and Commutative Replicated Data Types"
  * that can be found here https://hal.inria.fr/file/index/docid/555588/filename/techreport.pdf
  */
class GraphsCrdtTest extends FlatSpec with Matchers {

  "RGA" should "create the correct graph even if the events arrive out of order" in {
    val rga = new ReplicatedGrowableArray()
    rga.addRight(RgaId("sentinel_left", -1L), RgaId("A", 1L))
    rga.addRight(RgaId("A", 1L), RgaId("C", 3L))
    rga.addRight(RgaId("A", 1L), RgaId("B", 2L))

    val mappedRga = rga.getGraph.map(rgaId => rgaId.id)

    mappedRga shouldEqual Seq("sentinel_left", "A", "C", "B", "sentinel_right")
  }

  "RGA" should "create the correct graph when the events arrive in order" in {
    val rga = new ReplicatedGrowableArray()
    rga.addRight(RgaId("sentinel_left", -1L), RgaId("A", 1L))
    rga.addRight(RgaId("A", 1L), RgaId("B", 2L))
    rga.addRight(RgaId("A", 1L), RgaId("C", 3L))

    val mappedRga = rga.getGraph.map(rgaId => rgaId.id)

    mappedRga shouldEqual Seq("sentinel_left", "A", "C", "B", "sentinel_right")
  }

  "the 2P2P Graph" should "only mark elements as removed and keep them in the map" in {
    val graph2P2P = new TwoPhaseTwoPhaseGraph
    graph2P2P.addVertex("A")
    graph2P2P.addVertex("B")
    graph2P2P.addVertex("C")
    graph2P2P.addEdge("A", "B")
    graph2P2P.addEdge("B", "C")
    graph2P2P.removeEdge("A", "B")

    val graphEdges = graph2P2P.getEdges

    graphEdges should have size 1
    graphEdges should contain only(("B", "C"))
    graph2P2P.getAddedEdges should have size 2
    graph2P2P.getAddedEdges should contain allOf(("A", "B"), ("B", "C"))
    graph2P2P.getRemovedEdges should have size 1
    graph2P2P.getRemovedEdges should contain only(("A", "B"))
  }

}

class TwoPhaseTwoPhaseGraph {

  // Normally we'd use a 2P-Set instead of these Sets but it doesn't exist in Akka Distributed Data,
  // and since the code is only for illustration purposes, it's easier to use simple mutable Sets
  private val verticesAdded = mutable.Set[String]()
  private val verticesRemoved = mutable.Set[String]()
  private val edgesAdded = mutable.Set[(String, String)]()
  private val edgesRemoved = mutable.Set[(String, String)]()

  def getEdges: mutable.Set[(String, String)] = {
    edgesAdded.filter(edge => !edgesRemoved.contains(edge))
  }

  def getAddedEdges: mutable.Set[(String, String)] = edgesAdded

  def getRemovedEdges: mutable.Set[(String, String)] = edgesRemoved

  def addVertex(vertex: String) = verticesAdded.add(vertex)

  def addEdge(vertexSource: String, vertexTarget: String): Unit = {
    assert(checkIfVertexExists(vertexSource), "Source vertex must exist in the graph")
    assert(checkIfVertexExists(vertexTarget), "Target vertex must exist in the graph")

    edgesAdded.add((vertexSource, vertexTarget))
  }

  private def checkIfVertexExists(vertex: String): Boolean = {
    verticesAdded.contains(vertex) && !verticesRemoved.contains(vertex)
  }

  def removeEdge(vertexSource: String, vertexTarget: String): Unit = {
    val edgeToRemove = (vertexSource, vertexTarget)
    assert(edgesAdded.contains(edgeToRemove), "The removed edge must exist")
    edgesRemoved.add(edgeToRemove)
  }
}

class ReplicatedGrowableArray {

  private val graph = mutable.ListBuffer[RgaId]()
  graph.append(RgaId("sentinel_left",  -1L))
  graph.append(RgaId("sentinel_right", 0L))

  def getGraph: mutable.ListBuffer[RgaId] = graph

  def addRight(sourceVertex: RgaId, newVertex: RgaId): Unit = {
    // As it's only for illustration purposes, it's a simplified version where no downstream operation
    // is propagated, i.e. we assume a single node.
    val sourceIndex = graph.indexOf(sourceVertex)
    assert(sourceIndex >= 0, "The source vertex must exist in the graph")
    // The new vertex is inserted before the first successor having a smaller (older) timestamp, so that
    // the vertices added after the same source end up ordered by descending timestamp
    val insertIndex = graph.indexWhere(successor => newVertex.timestamp > successor.timestamp, sourceIndex + 1)
    if (insertIndex >= 0) graph.insert(insertIndex, newVertex)
  }

}

case class RgaId(id: String, timestamp: Long)
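The TwoPhaseTwoPhaseGraph above only works locally: it has no merge method and no vertex removal. Below is a minimal immutable sketch adding both, assuming that each of the 4 sets is grow-only and that replicas converge by merging them with a set union; the TwoPTwoPGraph name is hypothetical. Since an edge is only visible when both of its vertices are still present, a concurrent addEdge and removeVertex resolve in favour of the removal, i.e. the first strategy listed earlier.

```scala
// Hypothetical immutable 2P2P-Graph: vertices and edges are each tracked by a 2P-Set,
// i.e. a grow-only "added" set plus a grow-only "removed" set (tombstones)
final case class TwoPTwoPGraph(
    verticesAdded: Set[String] = Set.empty,
    verticesRemoved: Set[String] = Set.empty,
    edgesAdded: Set[(String, String)] = Set.empty,
    edgesRemoved: Set[(String, String)] = Set.empty) {

  def vertices: Set[String] = verticesAdded -- verticesRemoved

  // An edge is visible only if it wasn't removed and both of its vertices are still present
  def edges: Set[(String, String)] = {
    val liveVertices = vertices
    (edgesAdded -- edgesRemoved).filter { case (source, target) => liveVertices(source) && liveVertices(target) }
  }

  def addVertex(vertex: String): TwoPTwoPGraph = copy(verticesAdded = verticesAdded + vertex)

  // Precondition checked locally: the vertex must exist and have no visible edge
  def removeVertex(vertex: String): TwoPTwoPGraph = {
    require(vertices(vertex), s"Vertex $vertex must exist in the graph")
    require(!edges.exists { case (source, target) => source == vertex || target == vertex },
      s"Vertex $vertex still has edges")
    copy(verticesRemoved = verticesRemoved + vertex)
  }

  def addEdge(source: String, target: String): TwoPTwoPGraph = {
    require(vertices(source) && vertices(target), "Both vertices must exist in the graph")
    copy(edgesAdded = edgesAdded + ((source, target)))
  }

  def removeEdge(source: String, target: String): TwoPTwoPGraph = {
    require(edges((source, target)), "The removed edge must exist")
    copy(edgesRemoved = edgesRemoved + ((source, target)))
  }

  // State-based merge: union of the 4 grow-only sets, hence commutative, associative and idempotent
  def merge(that: TwoPTwoPGraph): TwoPTwoPGraph = TwoPTwoPGraph(
    verticesAdded ++ that.verticesAdded,
    verticesRemoved ++ that.verticesRemoved,
    edgesAdded ++ that.edgesAdded,
    edgesRemoved ++ that.edgesRemoved)
}
```

For instance, if replica 1 adds the edge (A, B) while replica 2 concurrently removes A (which has no edge from its point of view), the merged graph contains only the vertex B and no edge, whatever the merge order.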

CRDT: maps

A map keeps its usual characteristics in the context of CRDTs: as in standalone applications, it's composed of key-value pairs. Among the CRDT maps we can distinguish:

Below you can find some test cases for CRDT maps written with the Akka Distributed Data module:

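import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.{GCounter, LWWMap, ORMap}
import com.typesafe.config.ConfigFactory
import org.scalatest.{FlatSpec, Matchers}

class MapCrdtTest extends FlatSpec with Matchers {

  // Cluster(system) requires the cluster actor provider; port 0 binds remoting to a random free port
  implicit val system = ActorSystem("responses-learner-service",
    ConfigFactory.parseString("akka.actor.provider = cluster\nakka.remote.netty.tcp.port = 0"))
  // Cluster(system) returns the same extension instance every time, so Node1 and Node2
  // actually represent the same node (same selfUniqueAddress)
  private val Node1 = Cluster(system)
  private val Node2 = Cluster(system)

  "LWW Map" should "take the last written value for a key" in {
    val key = "was_read"
    val lwwMapNode1 = LWWMap.empty[String, Boolean].put(Node1, key, true)
    // The default clock is based on System.currentTimeMillis. Waiting ensures a strictly greater
    // timestamp for the 2nd write; with equal timestamps the node address would break the tie,
    // which can't distinguish the writes here since they come from the same node
    Thread.sleep(10)
    val lwwMapNode2 = LWWMap.empty[String, Boolean].put(Node2, key, false)

    val mergedLwwMap = lwwMapNode1.merge(lwwMapNode2)

    // false is written after true, so it's the most recent one
    mergedLwwMap.get(key).get shouldBe false
  }

  "OR Map" should "merge concurrent updates" in {
    val orMapNode1 = ORMap.empty[String, GCounter].put(Node1, "visits_count", GCounter.empty.increment(Node1, 10))
    val orMapNode2 = ORMap.empty[String, GCounter].put(Node2, "visits_count", GCounter.empty.increment(Node2, 5))

    val mergedOrMap = orMapNode1.merge(orMapNode2)

    // The ORMap value type is ReplicatedData so the result of the merge relies on the merge
    // method of the underlying type. GCounter keeps the maximum count per node and since Node1
    // and Node2 are the same node here, the result is max(10, 5) = 10 (and not 10 + 5)
    mergedOrMap.get("visits_count").get.getValue.intValue() shouldEqual 10
  }

}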
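Akka hides the metadata driving these merges. The following stdlib-only sketch (hypothetical names, not the Akka API) makes it explicit. LwwMap keeps, per key, the value with the highest timestamp and breaks ties by comparing node ids, an arbitrary but deterministic rule that doesn't necessarily match Akka's. CounterMap merges the values of a key with their own merge, here a G-Counter keeping the maximum per node; it ignores key removals, so it's closer to a grow-only map than to an ORMap.

```scala
// Hypothetical LWW-Map: each value carries the timestamp and the id of the node that wrote it
final case class Stamped[V](value: V, timestamp: Long, node: String)

final case class LwwMap[K, V](entries: Map[K, Stamped[V]] = Map.empty[K, Stamped[V]]) {
  def put(node: String, timestamp: Long, key: K, value: V): LwwMap[K, V] =
    copy(entries = entries.updated(key, Stamped(value, timestamp, node)))

  def get(key: K): Option[V] = entries.get(key).map(_.value)

  // Per key, keep the entry with the highest (timestamp, node) pair
  def merge(that: LwwMap[K, V]): LwwMap[K, V] =
    LwwMap((entries.keySet ++ that.entries.keySet).map { key =>
      val candidates = entries.get(key).toList ++ that.entries.get(key).toList
      key -> candidates.maxBy(stamped => (stamped.timestamp, stamped.node))
    }.toMap)
}

// Hypothetical G-Counter: one slot per node, merged with a per-node maximum
final case class Counter(counts: Map[String, Long] = Map.empty[String, Long]) {
  def increment(node: String, n: Long): Counter =
    copy(counts = counts.updated(node, counts.getOrElse(node, 0L) + n))

  def value: Long = counts.values.sum

  def merge(that: Counter): Counter =
    Counter((counts.keySet ++ that.counts.keySet).map { node =>
      node -> math.max(counts.getOrElse(node, 0L), that.counts.getOrElse(node, 0L))
    }.toMap)
}

// Hypothetical map whose values are merged with their own merge (key removals are not supported)
final case class CounterMap(entries: Map[String, Counter] = Map.empty[String, Counter]) {
  def put(key: String, counter: Counter): CounterMap = copy(entries = entries.updated(key, counter))

  def merge(that: CounterMap): CounterMap =
    CounterMap((entries.keySet ++ that.entries.keySet).map { key =>
      key -> (entries.get(key).toList ++ that.entries.get(key).toList).reduce(_ merge _)
    }.toMap)
}
```

In MapCrdtTest, Cluster(system) returns the same extension twice, so Node1 and Node2 are a single node and the merged counter is max(10, 5) = 10. With CounterMap, incrementing the counter on "node1" and "node2" instead gives 10 + 5 = 15.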

CRDTs are not limited to simple data types. As shown in this post, they also apply to more complex structures such as graphs and maps. However, in some cases the set of supported operations is restricted. That's the case of the monotonic DAG, which doesn't support removals. Other graph structures solve this problem but, on the other hand, their tombstone-based solution can lead to space problems (tombstones accumulate, and purging them would require a synchronization mechanism). It's much simpler for flags such as Akka's Flag, where true always takes precedence. A similar strategy is used by the LWW Map, where the last written value is always kept. Other maps, such as the GMap, follow the same principle as the monotonic DAG and don't support removals.