Akka and Distributed Data module

on waitingforcode.com

Recently we explored several topics related to eventual consistency (ACID 2.0, CRDT, CALM...). Some of those posts included code examples based on the Akka Distributed Data module, which is the subject of this post.

The post is divided into 3 main parts. The first one explains the context of this module. The second part focuses on the API details. The last section shows some simple use cases of the Akka Distributed Data module. The goal is to give a pragmatic view of CRDTs rather than to repeat the specifics of Conflict-free Replicated Data Types (CRDT) presented in previous posts.

The distributed data and Akka

The Akka Distributed Data module is a replicated in-memory data store providing common CRDT implementations such as counters, maps, sets and registers. The data kept in the store is shared between Akka Cluster nodes and may be replicated asynchronously through a gossip protocol or through direct replication.

The module benefits from the advantages provided by CRDTs. Thanks to the monotonic merge function, any conflicting state, caused for instance by a network partition, can be resolved. Because of this link with CRDTs, Distributed Data is also tied to eventual consistency. Thus, it provides both high read and write availability and low latency. In return, it may sometimes return an out-of-date value, which will nevertheless be updated once the system regains responsiveness.
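
To make the conflict-resolution claim more concrete, here is a minimal sketch in plain Scala of how a grow-only counter could merge two states that diverged during a partition. It does not use Akka's own GCounter class; the names GCounterSketch, State, increment, merge and value are assumptions made only for this illustration.

```scala
// Plain-Scala sketch of a grow-only counter; this is NOT Akka's GCounter.
object GCounterSketch {
  // One slot per node: each node only ever increases its own entry.
  type State = Map[String, Long]

  def increment(state: State, node: String): State =
    state.updated(node, state.getOrElse(node, 0L) + 1L)

  // The merge keeps the per-node maximum, so the order in which
  // replicas exchange their states does not change the result.
  def merge(a: State, b: State): State =
    (a.keySet ++ b.keySet).map { node =>
      node -> math.max(a.getOrElse(node, 0L), b.getOrElse(node, 0L))
    }.toMap

  def value(state: State): Long = state.values.sum

  def main(args: Array[String]): Unit = {
    val shared: State = increment(Map.empty, "node1")           // replicated before the partition
    val left = increment(shared, "node1")                       // node1 side: node1 -> 2
    val right = increment(increment(shared, "node2"), "node2")  // node2 side: node1 -> 1, node2 -> 2
    println(value(merge(left, right)))                          // prints 4
  }
}
```

Both sides of the partition kept accepting increments, and merging in either order gives the same total, which is the property the module relies on.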

API details

As mentioned, the Distributed Data module already provides some common CRDT types. However, we aren't limited to them. To extend the list of available options we can define our own CRDT type by implementing the akka.cluster.ddata.ReplicatedData trait. The implementation must provide the logic for merge(that: T): T, the function used to resolve the value of a given entry when replicas hold different states. It's important to emphasize that this function must be monotonic (in the mathematical sense, either entirely nonincreasing or entirely nondecreasing), so that a merge never undoes an update that was already applied. Another important requirement is immutability: every modification should return a new instance. An example of immutability is PNCounter's copy method, called by the increment and decrement operations:

private def copy(increments: GCounter = this.increments, decrements: GCounter = this.decrements): PNCounter =
    new PNCounter(increments, decrements)
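
As an illustration of both requirements (monotonic merge and immutability), the following sketch defines a hypothetical custom type that remembers the highest value ever observed. To keep it runnable without Akka on the classpath it extends a local stand-in trait, ReplicatedDataLike, which only mimics the shape described above (a type member T and a merge method). In a real project the class would extend akka.cluster.ddata.ReplicatedData instead. MaxRegister and observe are invented names.

```scala
object CustomCrdtSketch {
  // Local stand-in mimicking the shape of akka.cluster.ddata.ReplicatedData.
  // It is an assumption of this sketch, declared only to avoid the Akka dependency.
  trait ReplicatedDataLike {
    type T <: ReplicatedDataLike
    def merge(that: T): T
  }

  // Hypothetical custom type: remembers the highest value ever observed.
  final case class MaxRegister(value: Long = Long.MinValue) extends ReplicatedDataLike {
    type T = MaxRegister

    // Immutable update: returns a new instance instead of mutating this one.
    def observe(candidate: Long): MaxRegister =
      if (candidate > value) copy(value = candidate) else this

    // Monotonic merge: the result is never lower than either input.
    override def merge(that: MaxRegister): MaxRegister =
      if (that.value > value) that else this
  }
}
```

With this definition, MaxRegister().observe(10L).merge(MaxRegister().observe(42L)) holds 42 regardless of which replica initiates the merge.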

There is also a special kind of CRDT called delta-CRDT, represented by the implementations of the akka.cluster.ddata.DeltaReplicatedData trait. Its specificity is that it replicates only the modified elements. For instance, if 2 new values are added to a set, only the information about these 2 additions is dispatched to the replicas instead of the whole set.
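
The set example can be sketched in plain Scala as follows. This is not Akka's implementation: DeltaGSet is a hypothetical grow-only set, and the member names delta, mergeDelta and resetDelta are chosen for this illustration, loosely inspired by the idea behind DeltaReplicatedData.

```scala
object DeltaSketch {
  // Hypothetical grow-only set that also tracks what changed
  // since the last replication round.
  final case class DeltaGSet(elements: Set[String] = Set.empty,
                             delta: Set[String] = Set.empty) {

    // Adding a new element records it both in the full state and in the delta.
    def add(element: String): DeltaGSet =
      if (elements.contains(element)) this
      else copy(elements = elements + element, delta = delta + element)

    // A replica applies an incoming delta with the same union
    // it would use for a full-state merge.
    def mergeDelta(incoming: Set[String]): DeltaGSet =
      copy(elements = elements ++ incoming)

    // Called once the delta has been shipped to the replicas.
    def resetDelta: DeltaGSet = copy(delta = Set.empty)
  }
}
```

If a node holding Set("a", "b", "c") adds "d" and "e", only Set("d", "e") needs to travel over the network, and a replica that applies it with mergeDelta ends up with the same 5 elements.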

But the CRDT type alone isn't enough. It must be integrated with Akka Cluster. The interaction with the distributed data goes through the akka.cluster.ddata.Replicator actor. It must be started on each node of the cluster or on the group of nodes having a specific role. This actor can receive one of the following types of messages:

  • Replicator.Update - performs an update for specific data. It contains the information about:
    • key to the updated data - the Distributed Data is an in-memory key-value data store where each key represents one CRDT. For instance, a key called "usersMap" can represent an Observed Remove Map structure and another key called "onlineUsers" can be linked to a Grow-only Counter.
    • write consistency level - defines how the updated data is replicated in the cluster. Among the available levels we can distinguish:
      • local - the write is applied immediately on the local replica and is later dispatched asynchronously to the other replicas by the gossip protocol. It's pretty fast (low latency) but doesn't protect against stale data (= updates from other nodes are not visible at the moment of writing because of the asynchronous replication; this matters more on the read side, described in the Replicator.Get item)
      • n - the modified data is immediately written to n replicas, including the local node
      • majority - the write is applied on at least N/2 + 1 nodes, where N is the number of nodes in the cluster
      • all - the new value must be immediately written to all nodes
      The consistency level impacts the write latency: local is much quicker than all. It also influences the reply sent back by the replicator. A successful write is acknowledged with Replicator.UpdateSuccess. If an error occurred, Replicator.UpdateFailure is sent. An intermediate state between both of them is Replicator.UpdateTimeout. It can be returned when the value was written locally, and probably on some of the replicas, but not on all the replicas required by the consistency level.
    • an optional request - a context value that is passed back in the reply
    • the function doing the update - it's important to notice that it must be a pure function, i.e. use only the data parameter and stable fields from the enclosing scope
  • Replicator.Get - retrieves the data value for a given key. As above, it defines the key and the optional request. It also specifies the read consistency level, with the same semantics as for writes: local, n, majority and all. When the value is read only from the local replica, the values stored on other nodes are not fetched, which means that the monotonic merge function is not called. Here too, reply messages are sent back to signal a success (Replicator.GetSuccess), a failure (Replicator.GetFailure, e.g. when the consistency requirement was not met) or a missing key (Replicator.NotFound). Note also that the order of replies is not guaranteed when an Update message is followed by a Get (or inversely), i.e. the reply to the later message can arrive before the reply to the earlier one.
  • Replicator.Delete - this message is similar to Update, except that instead of modifying the data for a given key it simply deletes it. It also takes the same consistency levels and an optional request.
  • Replicator.Subscribe - with this message a subscriber can be notified about any change made to the data belonging to a given key.
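
The arithmetic behind the majority level can be checked with a few lines of plain Scala. The sketch below is independent of Akka; majority and readSeesWrite are hypothetical helper names. It uses integer division for N/2 + 1 and the usual quorum rule that a read is certain to reach at least one node holding the latest write when the number of written nodes plus the number of read nodes exceeds the cluster size.

```scala
object ConsistencySketch {
  // Smallest number of nodes forming a majority in a cluster of n nodes
  // (integer division, so 3 -> 2, 4 -> 3, 5 -> 3).
  def majority(n: Int): Int = n / 2 + 1

  // True when every possible set of read nodes must intersect
  // every possible set of written nodes.
  def readSeesWrite(writeNodes: Int, readNodes: Int, clusterSize: Int): Boolean =
    writeNodes + readNodes > clusterSize
}
```

For a 5-node cluster, writing to majority(5) = 3 nodes and reading from 3 nodes always overlaps on at least one node, whereas a local write followed by a local read on another node (1 + 1 out of 3) gives no such guarantee. This is why the local level is fast but can return stale data.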

Akka Distributed Data example

To see how the Distributed Data module works, we'll take the example of the Grow-only Counter which, as a quick reminder, is a counter accepting only the increment operation:

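    // Imports assumed for this snippet; the enclosing ScalaTest spec class is not shown
    import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
    import akka.cluster.Cluster
    import akka.cluster.ddata.{DistributedData, GCounter, GCounterKey}
    import akka.cluster.ddata.Replicator._
    import com.typesafe.config.{Config, ConfigFactory}
    import scala.collection.mutable
    import scala.concurrent.duration._

    private def getCommonConfig(clusterHostname: String, ports: Seq[Int]): Config = ConfigFactory.parseString(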
      s"""
        akka {
          actor.provider = cluster
          remote.artery.enabled = true
          remote.artery.canonical.hostname = 127.0.0.1
          cluster.seed-nodes = [ "akka://${clusterHostname}@127.0.0.1:${ports(0)}",
                                  "akka://${clusterHostname}@127.0.0.1:${ports(1)}",
                                  "akka://${clusterHostname}@127.0.0.1:${ports(2)}"]
          cluster.jmx.multi-mbeans-in-same-jvm = on
          log-dead-letters = on
          cluster.failure-detector.acceptable-heartbeat-pause=30s
        }
      """)
    
    private def getPortConfig(port: Int) = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $port")
    
    behavior of "Akka Distributed Data"
    
    it should "correctly increment counter for known key" in {
      val clusterHostname = "cluster"
      val clusterCommonConfig = getCommonConfig(clusterHostname, Seq(25520, 25521, 25522))
      val node1 = ActorSystem(clusterHostname, getPortConfig(25520).withFallback(clusterCommonConfig))
      val node2 = ActorSystem(clusterHostname, getPortConfig(25521).withFallback(clusterCommonConfig))
      val node3 = ActorSystem(clusterHostname, getPortConfig(25522).withFallback(clusterCommonConfig))
    
      val CounterKey = "counter1"
      // Give some time to cluster to correctly compose
      Thread.sleep(10000)
    
      val actor1 = node1.actorOf(Props(new DistributedDataHandler()), name = "node1Actor")
      val actor2 = node2.actorOf(Props(new DistributedDataHandler()), name = "node2Actor")
      val actor3 = node3.actorOf(Props(new DistributedDataHandler()), name = "node3Actor")
      actor1 ! Increment(CounterKey)
      actor2 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
    
      // Give some time to apply the updates before asking the result
      Thread.sleep(2000)
      actor3 ! CounterValueGetter(CounterKey)
      Thread.sleep(2000)
    
      CounterStore.Data(CounterKey) shouldEqual 5
    }
    
    it should "fail because of unknown key" in {
      val clusterHostname = "cluster"
      val clusterCommonConfig = getCommonConfig(clusterHostname, Seq(35520, 35521, 35522))
      val node1 = ActorSystem(clusterHostname, getPortConfig(35520).withFallback(clusterCommonConfig))
      val node2 = ActorSystem(clusterHostname, getPortConfig(35521).withFallback(clusterCommonConfig))
      val node3 = ActorSystem(clusterHostname, getPortConfig(35522).withFallback(clusterCommonConfig))
    
      val CounterKey = "counter2"
      // Give some time to cluster to correctly compose
      Thread.sleep(10000)
    
      val actor1 = node1.actorOf(Props(new DistributedDataHandler()), name = "node1Actor")
      val actor2 = node2.actorOf(Props(new DistributedDataHandler()), name = "node2Actor")
      val actor3 = node3.actorOf(Props(new DistributedDataHandler()), name = "node3Actor")
      actor1 ! Increment(CounterKey)
      actor2 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
    
      // Give some time to apply the updates before asking the result
      Thread.sleep(2000)
      val notExistentCounterKey = s"${CounterKey}_not_existent"
      actor3 ! CounterValueGetter(notExistentCounterKey)
      Thread.sleep(2000)
    
      // Data is a Map, so the key has to be checked explicitly
      CounterStore.Data should not contain key (CounterKey)
      CounterStore.NotFoundCounters(notExistentCounterKey) shouldBe true
    }
    
    it should "fail because of 1-node cluster" in {
      val clusterHostname = "cluster3"
      val clusterCommonConfig = getCommonConfig(clusterHostname, Seq(45520, 45521, 45522))
      // Here, unlike in the first test case, we don't wait for the cluster to finish forming.
      // As we'll see in the assertion, it leads to a situation where the counter value is read
      // from only 1 node
      val node1 = ActorSystem(clusterHostname, getPortConfig(45520).withFallback(clusterCommonConfig))
      val node2 = ActorSystem(clusterHostname, getPortConfig(45521).withFallback(clusterCommonConfig))
      val node3 = ActorSystem(clusterHostname, getPortConfig(45522).withFallback(clusterCommonConfig))
    
      val CounterKey = "counter3"
    
      val actor1 = node1.actorOf(Props(new DistributedDataHandler()), name = "node1Actor")
      val actor2 = node2.actorOf(Props(new DistributedDataHandler()), name = "node2Actor")
      val actor3 = node3.actorOf(Props(new DistributedDataHandler()), name = "node3Actor")
      actor1 ! Increment(CounterKey)
      actor2 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
      actor3 ! Increment(CounterKey)
    
      // Give some time to apply the updates before asking the result
      Thread.sleep(2000)
      actor3 ! CounterValueGetter(CounterKey)
      Thread.sleep(2000)
    
      // As you can see, since the cluster wasn't composed on time (= before counting),
      // the result will be taken from 1 node
      CounterStore.Data(CounterKey) shouldEqual 3
    }
    
    object CounterStore {
      val Data: mutable.Map[String, Long] = new mutable.HashMap[String, Long]()
    
      val NotFoundCounters: mutable.Map[String, Boolean] = new mutable.HashMap[String, Boolean]()
    }
    
    case class Increment(counterKey: String)
    
    case class CounterValueGetter(counterKey: String)
    
    class DistributedDataHandler() extends Actor with ActorLogging {
      implicit val cluster = Cluster(context.system)
    
      val replicator = DistributedData(context.system).replicator
    
      override def receive: Receive = {
        case incrementMsg: Increment => {
          replicator ! Update(GCounterKey(incrementMsg.counterKey), GCounter(), WriteLocal)(number => {
            number + 1
          })
        }
        case counterGetterMsg: CounterValueGetter => {
          replicator ! Get(GCounterKey(counterGetterMsg.counterKey), ReadAll(3.seconds), Some(counterGetterMsg.counterKey))
        }
        case success @ GetSuccess(key: GCounterKey, request) => {
          val growOnlyCounter = success.get(key)
          CounterStore.Data.put(success.request.get.toString, growOnlyCounter.getValue.longValue())
        }
        case msg @ UpdateSuccess(key, request) => {
          log.info(s"Got successful update from ${msg}")
        }
        case notFound @ NotFound(key, request) => {
          CounterStore.NotFoundCounters.put(notFound.key.id.toString, true)
        }
      }
    }
    

Akka's Distributed Data module implements some Conflict-free Replicated Data Types. Among the implemented data structures we can find maps, sets, registers, flags and counters. This post started by introducing the module in general terms. Its second part focused on the API and showed that, besides the provided data types, it's still possible to create custom ones. The only requirement is to implement the ReplicatedData trait and its merge method, used to compute the final value of the type. The last section showed a simple use of the Distributed Data module with GCounter.

Read also about Akka and the Distributed Data module here: Akka Distributed Data, Tour of Akka Cluster – Akka Distributed Data.
