Epidemic protocols

A lot of names in IT come from real-world phenomena. One of them is "epidemic protocols", also known as gossip protocols, covered in this post.

The post is composed of 3 sections. The first one presents epidemic protocols from a general point of view. The next part describes the 2 epidemic types, simple and complex, also known as the SI Model and the SIR Model. The last section shows epidemic protocols in action with the Apache Gossip incubator project.

Why epidemic?

First and foremost, why such an enigmatic name to describe this kind of protocol? An epidemic disease is nothing good because it propagates very quickly, often from a single point to multiple points, as illustrated in the following image:

What is bad in the real world is not necessarily bad in the IT world. In a distributed architecture, quick propagation of information, or simply quick communication, is really important. For instance, nobody wants their fault-tolerant storage to asynchronously replicate persisted data with a delay of minutes. After all, the nodes storing this information can fail before they successfully replicate it.

Such responsiveness is not the only raison d'être of epidemic protocols, so let's see more of them in the next section.

Epidemic protocols explained

The idea behind epidemic protocols is the same as for epidemic diseases. The "epidemic" communication starts when one node contacts X randomly chosen nodes. These nodes, in their turn, contact X other randomly chosen nodes that do the same, and so forth. As you can deduce, to make it work all nodes must be peers. Unlike in a master-slave architecture, there is no node more important than the others. The communication between nodes is repetitive and endless, but it may be stopped once some expected state is reached.
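
To make this more concrete, here is a minimal, purely illustrative sketch of a single push round in Java. The Node interface, its send() method and the FANOUT constant are assumptions made for this example, not the API of any real library:

import java.util.Collections;
import java.util.List;

public class GossipRound {

  // X: the number of randomly chosen peers contacted in every round
  private static final int FANOUT = 3;

  // Hypothetical peer abstraction used only for this sketch
  interface Node {
    void send(String update);
  }

  // Forwards the update to FANOUT randomly picked peers; each receiver
  // is expected to repeat exactly the same step in the next round
  static void gossipOnce(String update, List<Node> peers) {
    Collections.shuffle(peers); // peers is assumed to be a mutable list
    peers.stream()
        .limit(FANOUT)
        .forEach(peer -> peer.send(update));
  }
}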

What are the use cases of epidemic protocols? Mainly they were invented to disseminate information, i.e. to send some message to most of the nodes without requiring their immediate feedback. It's here that the concept of eventually consistent storage appeared, since the information sent that way must often be reconciled because of potential inconsistencies. This use case will be detailed in the further part of the post. Among the other use cases we can find: failure detection, data aggregation, and distributed topology construction and maintenance. The last point is widely used in Apache Cassandra and will be explained in one of the next posts.
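
As a small illustration of the data aggregation use case (a toy example written for this post, not taken from any of the cited projects), the snippet below simulates nodes that repeatedly average their value with a randomly chosen peer. Since every exchange preserves the global sum, all nodes converge towards the cluster-wide average without any central coordinator:

import java.util.Arrays;
import java.util.Random;

public class GossipAveraging {

  public static void main(String[] args) {
    // each entry stands for the local value of one node; the global average is 45
    double[] values = {10, 20, 30, 40, 50, 60, 70, 80};
    Random random = new Random();

    for (int exchange = 0; exchange < 200; exchange++) {
      int a = random.nextInt(values.length);
      int b = random.nextInt(values.length);
      double average = (values[a] + values[b]) / 2;
      values[a] = average; // both "nodes" now hold the pairwise average,
      values[b] = average; // so the global sum, and thus the mean, is preserved
    }
    // after enough exchanges every entry is close to the global average
    System.out.println(Arrays.toString(values));
  }
}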

Except "epidemic" communication, what are other characteristics of these protocols ? If we should make a short list, it would probably look like:

Besides Apache Cassandra, also Riak, ScyllaDB and S3 use epidemic protocols, especially in cluster topology management.

Epidemic types

A node in epidemic protocols can be in one of the following states: susceptible (it hasn't received the update yet), infective (it holds the update and actively spreads it) or removed (it holds the update but no longer spreads it).

These 3 states correspond to epidemiology compartments. Thus, the models based on them are called compartmental. They are mathematical frameworks that help to understand what happens in the systems implementing epidemic protocols. Among these models we distinguish the SI Model (aka anti-entropy, simple epidemics) and the SIR Model (aka rumor mongering, complex epidemics). But before going into their details, let's focus on the common points: in both models the communication proceeds in rounds, in every round a node exchanges its state with a set of randomly chosen peers, and the exchange can be done by pushing the updates, pulling them, or both (push-pull).

The SI Model works exactly as stated in the above points. In push mode, only one node is initially "infected" with an update. It randomly chooses nodes to infect, and those are updated in their turn. These nodes choose another subset of random nodes to infect, and it continues until all nodes eventually converge. It's important to notice that epidemic protocols don't guarantee that all nodes will be infected after a given number of rounds because of the non-deterministic, random choice of nodes. Because of that, we may still end up with some nodes not being aware of the update.
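
The exponential spread, as well as the stragglers that may stay uninfected for a while, can be observed with a rough, self-contained simulation of the push-based SI Model. The node count, the fanout and the number of rounds are arbitrary values picked for this example:

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class SiPushSimulation {

  public static void main(String[] args) {
    int nodes = 1_000;
    int fanout = 2; // the X from the previous section
    Random random = new Random();

    Set<Integer> infected = new HashSet<>();
    infected.add(0); // a single node initially holds the update

    for (int round = 1; round <= 15; round++) {
      Set<Integer> targets = new HashSet<>();
      for (int ignored : infected) {
        for (int i = 0; i < fanout; i++) {
          targets.add(random.nextInt(nodes)); // non-deterministic, random peer choice
        }
      }
      infected.addAll(targets);
      System.out.println("Round " + round + ": " + infected.size() + "/" + nodes + " infected nodes");
    }
  }
}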

The SIR Model introduces an extra state - Removed. A node is considered "removed" when it loses interest in the update. It may happen at every round (blind) or after learning that the selected nodes have already applied the update (feedback). The switch to the removed state is done either with a specified counter decremented at each "loss of interest" (count) or randomly, with a probability of 1/k where k is the number of "contacts" with the node (coin).
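
Below is a sketch of the SIR Model in its "feedback + coin" variant. For simplicity it treats k as a fixed parameter instead of counting the contacts per node, so it's an approximation rather than a faithful implementation; it still shows the residue of nodes that may never hear the rumor:

import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class SirRumorMongering {

  public static void main(String[] args) {
    int nodes = 1_000;
    double k = 2.0; // "coin" parameter: lose interest with probability 1/k
    Random random = new Random();

    Set<Integer> informed = new HashSet<>();  // infective + removed nodes
    Set<Integer> spreading = new HashSet<>(); // infective nodes only
    informed.add(0);
    spreading.add(0);

    while (!spreading.isEmpty()) {
      Set<Integer> nextSpreading = new HashSet<>();
      for (int node : spreading) {
        int peer = random.nextInt(nodes);
        if (informed.add(peer)) {
          // the peer was susceptible: it becomes infective and the sender keeps gossiping
          nextSpreading.add(peer);
          nextSpreading.add(node);
        } else if (random.nextDouble() >= 1.0 / k) {
          // feedback + coin: the peer already knew the rumor,
          // the sender keeps spreading with probability 1 - 1/k...
          nextSpreading.add(node);
        }
        // ...otherwise the sender switches to the Removed state
      }
      spreading = nextSpreading;
    }
    // some nodes typically remain unaware of the rumor - the residue of the SIR Model
    System.out.println("Informed nodes: " + informed.size() + "/" + nodes);
  }
}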

Apache Gossip example

To see epidemic protocols in action we'll use an Apache incubating project, Apache Gossip. Although it's not actively maintained, it's a good illustration of message propagation in epidemic protocols. Let's start by proving that the nodes are aware of cluster topology changes:

private static final String CLUSTER_NAME = "test_cluster";


// Builds the list of seed ("startup") members, bound to localhost UDP ports 50001 and following
private static List<Member> getStartupMembers(int membersCount) throws URISyntaxException {
  List<Member> startupMembers = new ArrayList<>();
  for (int i = 0; i < membersCount; i++){
      String memberId = "id#"+i;
      URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i + 1));
      startupMembers.add(new RemoteMember(CLUSTER_NAME, uri, memberId));
  }
  return startupMembers;
}

// Builds and starts the gossip clients for the indexes [from, to); when a startup member
// exists for a given index, its definition is reused
private static List<GossipManager> getClients(int from, int to, List<Member> startupMembers) throws URISyntaxException {
  List<GossipManager> clients = new ArrayList<>();
  GossipSettings gossipSettings = new GossipSettings();
  for (int i = from; i < to; i++) {
      String memberId = "id#"+i;
      URI memberUri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i + 1));
      Member member = new RemoteMember(CLUSTER_NAME, memberUri, memberId);
      if (startupMembers.size() > i) {
          member = startupMembers.get(i);
      }
      GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(CLUSTER_NAME).uri(member.getUri())
              .id(member.getId()).gossipMembers(startupMembers).gossipSettings(gossipSettings).build();
      clients.add(gossipService);
      gossipService.init();
  }
  return clients;
}



@Test
public void should_correctly_observe_leave_and_join_events() throws URISyntaxException, InterruptedException {
  List<Member> startupMembers = getStartupMembers(4);
  List<GossipManager> clients = getClients(0, 7, startupMembers);

  // Give some time to discover other members
  Thread.sleep(4500);
  List<Integer> liveMembers = clients.stream().map(client -> client.getLiveMembers().size()).collect(Collectors.toList());
  List<Integer> deadMembers = clients.stream().map(client -> client.getDeadMembers().size()).collect(Collectors.toList());
  assertThat(liveMembers).containsOnly(6);
  assertThat(deadMembers).containsOnly(0);

  // Now let one node leave the cluster and check whether all other nodes become aware of that
  clients.get(0).shutdown();
  // Give some time to update network topology
  Thread.sleep(4500);
  List<Integer> liveMembersAfterShutdown = clients.stream().map(client -> client.getLiveMembers().size()).collect(Collectors.toList());
  List<Integer> deadMembersAfterShutdown = clients.stream().map(client -> client.getDeadMembers().size()).collect(Collectors.toList());

  assertThat(liveMembersAfterShutdown).containsExactlyInAnyOrder(0, 5, 5, 5, 5, 5, 5);
  assertThat(deadMembersAfterShutdown).containsExactlyInAnyOrder(6, 1, 1, 1, 1, 1, 1);

  // Add new member to the cluster
  List<GossipManager> newClients = getClients(7, 8, startupMembers);
  clients.addAll(newClients);
  Thread.sleep(4500);
  List<Integer> liveMembersAfterJoin = clients.stream().map(client -> client.getLiveMembers().size()).collect(Collectors.toList());
  List<Integer> deadMembersAfterJoin = clients.stream().map(client -> client.getDeadMembers().size()).collect(Collectors.toList());

  assertThat(liveMembersAfterJoin).containsExactlyInAnyOrder(0, 6, 6, 6, 6, 6, 6, 6);
  assertThat(deadMembersAfterJoin).containsExactlyInAnyOrder(6, 1, 1, 1, 1, 1, 1, 1);

  clients.forEach(client -> client.shutdown());
}

The cluster nodes correctly noticed the changes in the cluster - both when a node left it and when a new node joined it. It's important to notice the time needed for the update to propagate. The test uses 4.5 seconds because it was the time needed on my local machine. However, if the random choice algorithm performs poorly, this time may not be enough and the tests can fail.

As presented earlier in the post, epidemic protocols are also good for message propagation:

@Test
public void should_propagate_data_to_other_nodes() throws InterruptedException, URISyntaxException {
  List<Member> startupMembers = getStartupMembers(4);
  List<GossipManager> clients = getClients(0, 7, startupMembers);

  // Give some time to discover other members
  Thread.sleep(4500);

  String key = "message#1";
  SharedDataMessage sharedData = new SharedDataMessage();
  sharedData.setKey(key);
  sharedData.setPayload(2);
  sharedData.setExpireAt(Long.MAX_VALUE);
  sharedData.setTimestamp(System.currentTimeMillis());
  clients.get(0).gossipSharedData(sharedData);

  // Just after sending, the message is not replicated yet
  Optional<GossipManager> clientAwareOfUpdate = clients.subList(1, clients.size()).stream()
          .filter(client -> client.findSharedGossipData(key) != null)
          .findFirst();
  assertThat(clientAwareOfUpdate).isEmpty();
  // Now, give some time to replicate
  Thread.sleep(4500);
  Optional<GossipManager> clientUnawareOfUpdate = clients.stream()
          .filter(client -> client.findSharedGossipData(key) == null)
          .findFirst();
  assertThat(clientUnawareOfUpdate).isEmpty();

  clients.forEach(client -> client.shutdown());
}

Here you can observe that a message is first sent to one node and only some time later propagated to the other nodes in the cluster. The note about the 4.5 seconds is the same as for the first test case. To show that the propagation really takes some time, let's do a simple setup with 4 startup nodes and let 10 other nodes discover the topology:

@Test
public void should_show_topology_discover_in_progress() throws URISyntaxException, InterruptedException {
  List<Member> startupMembers = getStartupMembers(4);
  List<GossipManager> clients = getClients(0, 14, startupMembers);

  // Give some time to discover other members, but not too much to show the discovery in progress
  Thread.sleep(500);
  List<Integer> liveMembers = clients.stream().map(client -> client.getLiveMembers().size()).collect(Collectors.toList());

  // a sample output: [11, 3, 3, 4, 9, 9, 3, 9, 0, 3, 4, 0, 2, 5]
  // with 14 clients a fully discovered topology means 13 live members, since a client doesn't count itself
  assertThat(liveMembers).doesNotContain(13);
  Optional<Integer> discoveredMembers = liveMembers.stream().filter(liveMembersCount -> liveMembersCount > 0).findFirst();
  assertThat(discoveredMembers).isNotEmpty();
  
  clients.forEach(client -> client.shutdown());
}

Epidemic protocols are a powerful solution that benefits from the fact that all nodes are peers. They scale well and are pretty elastic compared to other possible solutions. However, as discussed in the first 2 sections, they're based on a random and non-deterministic choice of the nodes to communicate with. This can sometimes slow down message propagation or leave the cluster only eventually consistent. Despite that, epidemic protocols are still used in distributed computing, and Apache Cassandra and ScyllaDB are only a few proofs of that.