Example of data consistency in Apache Cassandra

Previously we presented the theory of data consistency in Cassandra. Now is a good moment to show some examples of consistency levels.

This article is composed of 4 examples, each preceded by instructions that prepare the cluster state and followed by test cases illustrating the expected behavior. The first part shows the TWO consistency level on a 2-node cluster when both nodes are up. The second part shows the case when one of them is down. The 3rd and 4th parts illustrate the same idea (working and not working) for the QUORUM consistency level on a 3-node cluster.

For the tests we use the Cassandra Cluster Manager (CCM) tool. A simple cluster of 2 or 3 nodes can be started with one of the following commands:

ccm create test_two -v 3.0.3 -n 2 -s # for TWO consistency test
ccm create test_quorum -v 3.0.3 -n 3 -s # for QUORUM consistency test

TWO consistency level for 2 alive nodes

Before launching the tests for this case, we must be sure that both nodes are running. We can do it by calling ccm status. The output should look like:

Cluster: 'test_two'
----------------------
node1: UP
node2: UP

Now, the tests:

private static final Statement INSERT_TWO_CONSISTENCY = QueryBuilder.insertInto("consistencytest", "players_consistent")
        .value("name", "Player_1").value("teamName", "Team1")
        .setConsistencyLevel(ConsistencyLevel.TWO)
        .enableTracing();

private static final Statement SELECT_TWO_CONSISTENCY = QueryBuilder.select().from("consistencytest", "players_consistent")
        .where(QueryBuilder.eq("name", "Fixed player")).setConsistencyLevel(ConsistencyLevel.TWO)
        .enableTracing();

@Test
public void should_correctly_operate_on_data_with_TWO_consistency_level() {
  ResultSet result = SESSION.execute(INSERT_TWO_CONSISTENCY);

  assertThat(result.getExecutionInfo().getQueryTrace().getEvents()).extracting("name")
    .contains("Determining replicas for mutation", "Sending MUTATION message", "MUTATION message received");
}


@Test
public void should_correctly_read_data_with_TWO_consistency_level_when_2_nodes_are_running() {
  ResultSet result = SESSION.execute(SELECT_TWO_CONSISTENCY);


  List<String> expected = Lists.newArrayList("Merging data from memtables and 0 sstables", 
    "Acquiring sstable references");
  List<QueryTrace.Event> events = result.getExecutionInfo().getQueryTrace().getEvents().stream()
    .filter(event -> expected.contains(event.getDescription())).collect(Collectors.toList());
  assertThat(events).hasSize(4);
  assertThat(events.stream().map(e -> e.getSource().toString())).containsOnly("/127.0.0.1", "/127.0.0.2");
}

As the trace logs show, Cassandra executes the SELECT and INSERT queries on both available nodes, in line with the replication_factor specified during keyspace creation.
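The replication factor mentioned above is set when the keyspace is created. The article doesn't show that statement, so the sketch below is only an assumption about how it could look: the KeyspaceCql helper is hypothetical, and SimpleStrategy with a replication factor of 2 is a guess that matches the 2-node cluster. It only builds the CQL text; executing it is left to the session used in the tests.

```java
// Hypothetical helper, not part of the original tests.
// It builds the CQL text that creates a keyspace with a given replication factor.
final class KeyspaceCql {

    private KeyspaceCql() {
    }

    // SimpleStrategy is an assumption; it is the usual choice for a single local test cluster.
    static String createKeyspace(String keyspace, int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be >= 1");
        }
        return String.format(
            "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = "
                + "{'class': 'SimpleStrategy', 'replication_factor': %d}",
            keyspace, replicationFactor);
    }

    public static void main(String[] args) {
        // Keyspace name taken from the test statements; the factor 2 is assumed.
        System.out.println(createKeyspace("consistencytest", 2));
    }
}
```

With a replication factor of 2 on a 2-node cluster every row lives on both nodes, which is why both addresses appear in the trace events.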

TWO consistency level for 1 of 2 nodes alive

Now, we should stop one node. It can be done with this command:

ccm node2 stop

Once again, before launching these tests, it's good to check with ccm status that the 2nd node is really down. The tests look like this:

@Test
public void should_fail_on_inserting_data_with_consistency_TWO_when_one_node_is_down() throws InterruptedException {
  System.out.println("Before launching the test, please stop one of 2 running nodes...");
  Thread.sleep(10000);

  try {
    SESSION.execute(INSERT_TWO_CONSISTENCY);
    fail("Should not execute INSERT with consistency TWO when only one replica is running");
  } catch (NoHostAvailableException exception) {
    assertThat(exception.getMessage()).contains("Not enough replicas available for query at consistency TWO");
  }
}

@Test
public void should_fail_read_when_only_1_node_is_running_while_TWO_consistency_is_set() throws InterruptedException {
  System.out.println("Before launching the test, please stop one of 2 running nodes...");
  Thread.sleep(10000);

  try {
    SESSION.execute(SELECT_TWO_CONSISTENCY);
    fail("Should not execute SELECT with consistency TWO when only one replica is running");
  }  catch (NoHostAvailableException exception) {
    assertThat(exception.getMessage()).contains("Not enough replicas available for query at consistency TWO");
  }
}

As you can observe, each time the error message says that the query can't be satisfied because the required consistency level isn't met.
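The tests above pause for 10 seconds so that a node can be stopped by hand. As an alternative, the CCM commands could be launched from the test code itself. The sketch below is a hypothetical helper (the CcmCommands class and its method names are not part of the original tests) and it assumes that the ccm executable is on the PATH and that the wanted cluster is the current one.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the original tests.
final class CcmCommands {

    private CcmCommands() {
    }

    // Builds the same command as the one typed manually: ccm <node> stop
    static List<String> stopNode(String node) {
        return Arrays.asList("ccm", node, "stop");
    }

    // Builds the command restarting a node: ccm <node> start
    static List<String> startNode(String node) {
        return Arrays.asList("ccm", node, "start");
    }

    // Runs the command, forwards its output to the console and returns the exit code.
    static int run(List<String> command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).inheritIO().start();
        return process.waitFor();
    }
}
```

A test could then call CcmCommands.run(CcmCommands.stopNode("node2")) in its setup and restart the node afterwards. The driver may still need a moment to notice that the node is down, so a short wait can remain necessary.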

QUORUM consistency with all nodes alive

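For the next 2 tests we'll run a cluster composed of 3 nodes. It can be created with CCM using the second command shown at the beginning:

ccm create test_quorum -v 3.0.3 -n 3 -s

If all nodes are up, we can execute the following tests. The INSERT_QUORUM_CONSISTENCY and SELECT_QUORUM_CONSISTENCY statements aren't shown here; presumably they are defined like their TWO counterparts, with ConsistencyLevel.QUORUM instead. The second test also shows that a read still succeeds after one of the 3 nodes is stopped, because the 2 remaining replicas are enough to form a quorum: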

@Test
public void should_correctly_insert_data_when_all_nodes_are_up() {
  ResultSet result = SESSION.execute(INSERT_QUORUM_CONSISTENCY);

  assertThat(result.getExecutionInfo().getQueryTrace().getEvents()).extracting("name")
    .contains("Determining replicas for mutation",
        "Sending MUTATION message to /127.0.0.2", "Sending MUTATION message to /127.0.0.1");
}

@Test
public void should_correctly_read_data_when_accepted_1_node_is_down_in_quorum() throws InterruptedException {
  System.out.println("Before launching the test, please stop one of 3 running nodes...");
  Thread.sleep(10000);

  ResultSet result = SESSION.execute(SELECT_QUORUM_CONSISTENCY);

  List<String> expected = Lists.newArrayList("Merging data from memtables and 0 sstables", "Acquiring sstable references");
  List<QueryTrace.Event> events = result.getExecutionInfo().getQueryTrace().getEvents().stream()
          .filter(event -> expected.contains(event.getDescription())).collect(Collectors.toList());
  assertThat(events).hasSize(4);
  assertThat(events.stream().map(e -> e.getSource().toString())).containsOnly("/127.0.0.1", "/127.0.0.2");
}

QUORUM consistency with only 1 node alive

As you remember from the article about data consistency in Cassandra, the QUORUM level needs floor(RF/2) + 1 replicas alive (RF = replication factor), which gives 2 replicas for RF = 3. To see what happens when that's not the case, let's shut down 2 of the 3 running nodes, for example with these commands:

ccm node2 stop
ccm node3 stop

The cluster state should look like this:

Cluster: 'test_quorum'
----------------------
node1: UP
node2: DOWN
node3: DOWN

And the corresponding tests look like this:

@Test
public void should_fail_on_reading_data_when_2_nodes_are_down_in_quorum() throws InterruptedException {
  System.out.println("Before launching the test, please stop 2 of 3 running nodes...");
  Thread.sleep(10000);

  try {
    SESSION.execute(SELECT_QUORUM_CONSISTENCY);
    fail("Should not be able to SELECT data when QUORUM consistency is not respected");
  } catch (NoHostAvailableException e) {
    assertThat(e.getMessage()).contains(
      "Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive)");
  }
}

@Test
public void should_fail_on_inserting_data_when_2_nodes_are_down_in_quorum() throws InterruptedException {
  System.out.println("Before launching the test, please stop 2 of 3 running nodes...");
  Thread.sleep(10000);

  try {
    SESSION.execute(INSERT_QUORUM_CONSISTENCY);
    fail("Should not be able to INSERT data when QUORUM consistency is not respected");
  } catch (NoHostAvailableException e) {
    assertThat(e.getMessage()).contains(
      "Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive)");
  }
}

Compared to the tests of the TWO consistency level, the message is different. But globally it says the same thing: the consistency level can't be satisfied, so the query can't be executed.

The article shows how Cassandra behaves in two situations: when the consistency level can be satisfied and when it can't. We can see that queries aren't executed at all when too few replicas are up. We can also see that consistency is specified on the client side, and more precisely, at the query definition level.
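The arithmetic behind these messages can be sketched in a few lines. The code below is only an illustrative model, not Cassandra's or the driver's implementation: the ReplicaMath class is hypothetical, and the message format is copied from the QUORUM assertion above. Reusing the same format for TWO is an assumption, since the TWO tests only check the beginning of the message.

```java
// Illustrative model only, not Cassandra's or the driver's code.
final class ReplicaMath {

    private ReplicaMath() {
    }

    // QUORUM needs floor(RF / 2) + 1 replicas; integer division already rounds down.
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    // Returns null when enough replicas are alive, otherwise a message in the
    // format asserted by the QUORUM tests.
    static String unavailableMessage(String level, int required, int alive) {
        if (alive >= required) {
            return null;
        }
        return String.format(
            "Not enough replicas available for query at consistency %s "
                + "(%d required but only %d alive)",
            level, required, alive);
    }

    public static void main(String[] args) {
        // 3-node cluster with RF = 3 and only node1 up.
        System.out.println(unavailableMessage("QUORUM", quorum(3), 1));
        // 2-node cluster with node2 stopped; TWO always requires 2 replicas.
        System.out.println(unavailableMessage("TWO", 2, 1));
    }
}
```

In both failing scenarios the rule is the same: the number of alive replicas is lower than the number required by the consistency level, so the query is rejected before any data is read or written.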

