Previously we presented the theory of data consistency in Cassandra. Now is a good moment to show some examples of consistency levels.
This article is composed of 4 examples, each preceded by instructions preparing the cluster state and followed by test cases illustrating the expected behavior. The first part shows the TWO consistency level on a 2-node cluster when both nodes are up. The second part shows the case when one of them is down. The 3rd and 4th parts illustrate the same concept (working and not working) for the QUORUM consistency level.
For the tests we use the Cassandra Cluster Manager (CCM) tool. A simple cluster of 2 or 3 nodes can be started with one of the following commands:
ccm create test_two -v 3.0.3 -n 2 -s # for TWO consistency test
ccm create test_quorum -v 3.0.3 -n 3 -s # for QUORUM consistency test
TWO consistency level for 2 alive nodes
Before launching the tests for this case, we must be sure that both nodes are running. We can do it by calling ccm status. The output should look like:
Cluster: 'test_two'
----------------------
node1: UP
node2: UP
Now, the tests:
private static final Statement INSERT_TWO_CONSISTENCY = QueryBuilder.insertInto("consistencytest", "players_consistent")
    .value("name", "Player_1").value("teamName", "Team1")
    .setConsistencyLevel(ConsistencyLevel.TWO)
    .enableTracing();

private static final Statement SELECT_TWO_CONSISTENCY = QueryBuilder.select().from("consistencytest", "players_consistent")
    .where(QueryBuilder.eq("name", "Fixed player")).setConsistencyLevel(ConsistencyLevel.TWO)
    .enableTracing();

@Test
public void should_correctly_operate_on_data_with_TWO_consistency_level() {
    ResultSet result = SESSION.execute(INSERT_TWO_CONSISTENCY);

    assertThat(result.getExecutionInfo().getQueryTrace().getEvents()).extracting("name")
        .contains("Determining replicas for mutation", "Sending MUTATION message", "MUTATION message received");
}

@Test
public void should_correctly_read_data_with_TWO_consistency_level_when_2_nodes_are_running() {
    ResultSet result = SESSION.execute(SELECT_TWO_CONSISTENCY);

    List<String> expected = Lists.newArrayList("Merging data from memtables and 0 sstables",
        "Acquiring sstable references");
    List<QueryTrace.Event> events = result.getExecutionInfo().getQueryTrace().getEvents().stream()
        .filter(event -> expected.contains(event.getDescription())).collect(Collectors.toList());
    assertThat(events).hasSize(4);
    assertThat(events.stream().map(e -> e.getSource().toString())).containsOnly("/127.0.0.1", "/127.0.0.2");
}
As the trace logs show, Cassandra executes the SELECT and INSERT queries on both available nodes, according to the replication_factor specified during keyspace creation.
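The read test above expects 4 trace events because each of the 2 nodes is supposed to emit both of the expected descriptions. The sketch below reproduces that filtering logic without the driver. `TraceSources`, `TraceEvent` and the sample trace are hypothetical stand-ins invented for illustration, not the driver's QueryTrace API and not a real trace.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class TraceSources {
    /** Hypothetical, simplified stand-in for the driver's QueryTrace.Event. */
    static final class TraceEvent {
        final String description;
        final String source;

        TraceEvent(String description, String source) {
            this.description = description;
            this.source = source;
        }
    }

    /** Keeps only the events whose description is one of the expected ones. */
    static List<TraceEvent> matching(List<TraceEvent> events, List<String> expected) {
        return events.stream()
            .filter(event -> expected.contains(event.description))
            .collect(Collectors.toList());
    }

    /** Distinct nodes that emitted the given events. */
    static Set<String> sources(List<TraceEvent> events) {
        return events.stream().map(event -> event.source).collect(Collectors.toSet());
    }

    /** Made-up trace (assumption): both nodes emit both read-related events. */
    static List<TraceEvent> sampleTrace() {
        return Arrays.asList(
            new TraceEvent("Acquiring sstable references", "/127.0.0.1"),
            new TraceEvent("Merging data from memtables and 0 sstables", "/127.0.0.1"),
            new TraceEvent("Acquiring sstable references", "/127.0.0.2"),
            new TraceEvent("Merging data from memtables and 0 sstables", "/127.0.0.2"),
            new TraceEvent("Unrelated event", "/127.0.0.1"));
    }
}
```

With this sample, filtering keeps 2 events per node, and the set of sources contains exactly the 2 node addresses, which is what the assertions of the real test check.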
TWO consistency level for 1 of 2 nodes alive
Now, we should stop one node. It can be done with this command:
ccm node2 stop
Once again, before launching these tests, it's good to check with ccm status that the 2nd node is really down. The tests look like this:
@Test
public void should_fail_on_inserting_data_with_consistency_TWO_when_one_node_is_down() throws InterruptedException {
    System.out.println("Before launching the test, please stop one of 2 running nodes...");
    Thread.sleep(10000);
    try {
        SESSION.execute(INSERT_TWO_CONSISTENCY);
        fail("Should not execute INSERT with consistency TWO when only one replica is running");
    } catch (NoHostAvailableException exception) {
        assertThat(exception.getMessage()).contains("Not enough replicas available for query at consistency TWO");
    }
}

@Test
public void should_fail_read_when_only_1_node_is_running_while_TWO_consistency_is_set() throws InterruptedException {
    System.out.println("Before launching the test, please stop one of 2 running nodes...");
    Thread.sleep(10000);
    try {
        SESSION.execute(SELECT_TWO_CONSISTENCY);
        fail("Should not execute SELECT with consistency TWO when only one replica is running");
    } catch (NoHostAvailableException exception) {
        assertThat(exception.getMessage()).contains("Not enough replicas available for query at consistency TWO");
    }
}
As you can observe, the error message says every time that the query can't be satisfied because the consistency expectations are not met.
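These failures come down to a simple comparison: a level such as TWO needs a fixed number of live replicas, and the query is rejected when fewer are alive. Here is a minimal sketch of that rule, assuming a hypothetical `FixedLevelCheck` helper. It is neither Cassandra's nor the driver's code, only an illustration of the comparison.

```java
final class FixedLevelCheck {
    /** Consistency levels that require a fixed number of live replicas. */
    enum Level {
        ONE(1), TWO(2), THREE(3);

        final int requiredReplicas;

        Level(int requiredReplicas) {
            this.requiredReplicas = requiredReplicas;
        }
    }

    private FixedLevelCheck() {
    }

    /** True when enough replicas are alive to satisfy the given level. */
    static boolean canExecute(Level level, int aliveReplicas) {
        return aliveReplicas >= level.requiredReplicas;
    }

    public static void main(String[] args) {
        System.out.println(canExecute(Level.TWO, 2)); // true: both nodes are up
        System.out.println(canExecute(Level.TWO, 1)); // false: node2 was stopped
    }
}
```

The second call corresponds to the situation of the tests above: one replica alive while TWO requires two.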
QUORUM consistency with all nodes alive
For the next 2 tests we'll run a cluster composed of 3 nodes. It can be created with CCM using the second command shown at the beginning:
ccm create test_quorum -v 3.0.3 -n 3 -s
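If all nodes are up, we can execute the first of the following tests. The second one expects one of the 3 nodes to be stopped before the read, which QUORUM still accepts: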
@Test
public void should_correctly_insert_data_when_all_nodes_are_up() {
    ResultSet result = SESSION.execute(INSERT_QUORUM_CONSISTENCY);

    assertThat(result.getExecutionInfo().getQueryTrace().getEvents()).extracting("name")
        .contains("Determining replicas for mutation", "Sending MUTATION message to /127.0.0.2",
            "Sending MUTATION message to /127.0.0.1");
}

@Test
public void should_correctly_read_data_when_accepted_1_node_is_down_in_quorum() throws InterruptedException {
    System.out.println("Before launching the test, please stop one of 3 running nodes...");
    Thread.sleep(10000);
    ResultSet result = SESSION.execute(SELECT_QUORUM_CONSISTENCY);

    List<String> expected = Lists.newArrayList("Merging data from memtables and 0 sstables",
        "Acquiring sstable references");
    List<QueryTrace.Event> events = result.getExecutionInfo().getQueryTrace().getEvents().stream()
        .filter(event -> expected.contains(event.getDescription())).collect(Collectors.toList());
    assertThat(events).hasSize(4);
    assertThat(events.stream().map(e -> e.getSource().toString())).containsOnly("/127.0.0.1", "/127.0.0.2");
}
QUORUM consistency with only 1 node alive
As you remember from the article about data consistency in Cassandra, the QUORUM level needs RF/2 + 1 replica nodes alive (RF being the replication factor, with the division rounded down). To see what happens when it's not the case, let's shut down 2 of the 3 running nodes, for example with these commands:
ccm node2 stop
ccm node3 stop
The cluster state should look like this:
Cluster: 'test_quorum'
----------------------
node1: UP
node2: DOWN
node3: DOWN
And the corresponding tests look like this:
@Test
public void should_fail_on_reading_data_when_2_nodes_are_down_in_quorum() throws InterruptedException {
    System.out.println("Before launching the test, please stop 2 of 3 running nodes...");
    Thread.sleep(10000);
    try {
        SESSION.execute(SELECT_QUORUM_CONSISTENCY);
        fail("Should not be able to SELECT data when QUORUM consistency is not respected");
    } catch (NoHostAvailableException e) {
        assertThat(e.getMessage()).contains(
            "Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive)");
    }
}

@Test
public void should_fail_on_inserting_data_when_2_nodes_are_down_in_quorum() throws InterruptedException {
    System.out.println("Before launching the test, please stop 2 of 3 running nodes...");
    Thread.sleep(10000);
    try {
        SESSION.execute(INSERT_QUORUM_CONSISTENCY);
        fail("Should not be able to INSERT data when QUORUM consistency is not respected");
    } catch (NoHostAvailableException e) {
        assertThat(e.getMessage()).contains(
            "Not enough replicas available for query at consistency QUORUM (2 required but only 1 alive)");
    }
}
Compared to the tests of the TWO consistency level, the message is slightly different. But globally, it tells the same thing - the consistency level can't be respected, so the query can't be executed correctly.
The article shows how Cassandra behaves in two situations: when the consistency level can be respected and when it can't. We can see that queries aren't executed at all when some mandatory nodes are down. We can also see that consistency is specified at the client level, and more precisely, at the query definition level.
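The "2 required" part of the QUORUM message follows from the RF/2 + 1 formula. The sketch below works through that arithmetic. `QuorumMath` is a hypothetical helper written for this article, and the keyspace's replication factor is an assumption, since the keyspace definition isn't shown here. Both RF = 2 and RF = 3 give a quorum of 2.

```java
final class QuorumMath {
    private QuorumMath() {
    }

    /** Replicas that must be alive for QUORUM: floor(RF / 2) + 1. */
    static int quorum(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("Replication factor must be at least 1");
        }
        // Integer division rounds down, as the formula requires
        return replicationFactor / 2 + 1;
    }

    /** Replicas that may be down while QUORUM can still be satisfied. */
    static int toleratedFailures(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        for (int rf = 1; rf <= 5; rf++) {
            System.out.printf("RF=%d quorum=%d tolerated failures=%d%n",
                rf, quorum(rf), toleratedFailures(rf));
        }
    }
}
```

With RF = 3, one replica can be down and QUORUM queries still work, as in the previous section. With two replicas down, only 1 of the 2 required replicas is alive, hence the error.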