Partitioners in Apache Cassandra

Since Cassandra is a distributed storage system, it stores data on different nodes. But how does it determine which data should be stored by each node? That's the role of partitioners.

This post covers the concept of partitioners in Apache Cassandra. It's divided into 3 parts. The first presents the available built-in partitioners: RandomPartitioner, Murmur3Partitioner and ByteOrderedPartitioner. The second part shows the differences between these partitioners. In the last part we can see partitioners in action.

Partitioners in Cassandra

First of all, let's define the partitioner. A partitioner is in charge of deciding which node should store given data. The node resolution is made according to the first part of the primary key. By the way, that's the reason why this part is also called the partition key. Simply speaking, the partitioner may decide that a row with a key starting with "A" goes to a different node than a row with a key starting with "Z". The decision is always made by analyzing the partition key. But this analysis can be made in 2 modes: ordered and random. Each of them has its own partitioner classes.
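
To make the notion of partition key concrete, here is an illustrative schema sketch reusing the SESSION object from the test shown later (the players table and its columns are hypothetical, not part of the test code):

// In the compound primary key below, the first part (team_name) is the partition
// key analyzed by the partitioner to pick a node, while player_name is a
// clustering column that only orders rows inside a partition
SESSION.execute("CREATE TABLE players (team_name text, player_name text, goals int, " +
  "PRIMARY KEY ((team_name), player_name))");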

Ordered partitioners analyze the first characters of the partition key and, according to this analysis, place the row on the appropriate node. Rows are placed in order on the Cassandra ring. 'Ordered' means that a row with a partition key equal to 1 will always be placed before a row with a partition key equal to 2, 2 before 3, and so on. An example of this kind of partitioner is ByteOrderedPartitioner. It orders rows lexically by the hexadecimal representation of the leading characters of the partition key.
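
A minimal sketch of this ordering logic, assuming UTF-8 encoded keys (plain Java written for illustration, not Cassandra's internal code):

// Compares two partition keys the way an order-preserving partitioner sees them:
// byte by byte, as unsigned values (exactly like comparing their hexadecimal forms)
private static int compareKeys(String left, String right) {
  byte[] l = left.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  byte[] r = right.getBytes(java.nio.charset.StandardCharsets.UTF_8);
  for (int i = 0; i < Math.min(l.length, r.length); i++) {
    int diff = (l[i] & 0xFF) - (r[i] & 0xFF);
    if (diff != 0) {
      return diff;
    }
  }
  return l.length - r.length;
}
// compareKeys("Ajax", "Zamalek") < 0, so 'A...' rows precede 'Z...' rows on the ring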

Random partitioners work in a different way. Instead of computing static resolutions based on the partition keys themselves, they compute hashes of the partition keys. These hashes are then used to figure out where a given row should be stored. The two hash functions used are MD5 (for RandomPartitioner) and MurmurHash (for Murmur3Partitioner).
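
To illustrate the principle, the sketch below uses Guava's Murmur3 implementation (Guava is already on the classpath, since the test uses MoreObjects). Note that Cassandra's Murmur3Partitioner uses its own Murmur3 variant to produce a 64-bit token, so this only shows the idea, not the exact token computation:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

// The node is chosen from the hash of the partition key, not from the key itself,
// so lexically close keys ("Ajax...", "AC...") get unrelated tokens
long tokenLike = Hashing.murmur3_128()
  .hashString("Ajax Amsterdam", StandardCharsets.UTF_8)
  .asLong(); // take 64 bits of the 128-bit hash as a token-like value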

Partitioners differences

Random partitioners distribute rows in an unordered manner. That's the reason why some operations (pagination, range conditions on partition key columns) need CQL's token() function to return correct results.
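
For example, paginating over partitions requires comparing tokens rather than raw key values. A minimal sketch, reusing the SESSION object and the simple_team table from the test below (assuming teamName is its partition key):

// With a random partitioner, 'WHERE teamName > ...' on a partition key is rejected;
// the range has to be expressed on the computed tokens instead
ResultSet page = SESSION.execute(
  "SELECT * FROM partitionertest.simple_team " +
  "WHERE token(teamName) > token('Ajax Amsterdam') LIMIT 5");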

On the other side, ordered partitioners don't guarantee a well-balanced row distribution. Imagine that your data only has rows with partition keys beginning with 'A'. Without any supplementary work, they'll all go to a single node. They could be rebalanced later with an additional operation (for example by manually reassigning tokens with nodetool move). Nodes containing much more data than the others are called hotspots.

But with ordered partitioners it's easier to deal with data slices. Cassandra needs less effort to know where (= on which node) the requested rows are stored.
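
Since ByteOrderedPartitioner derives tokens from the raw key bytes, the token order matches the key order, so a token range corresponds to a meaningful key range. A hedged illustration against the same simple_team table:

// Under ByteOrderedPartitioner this token range really means "keys from 'A' to 'B'"
// and touches only the node(s) owning that slice of the ring; under Murmur3Partitioner
// the same query would return an arbitrary, hash-ordered subset
ResultSet slice = SESSION.execute(
  "SELECT * FROM partitionertest.simple_team " +
  "WHERE token(teamName) >= token('A') AND token(teamName) < token('B')");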

Cassandra partitioners in action

Partitioners are configured globally in the cassandra.yaml file, under the partitioner entry. By default, the value corresponds to Murmur3Partitioner.
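
The corresponding entry in cassandra.yaml takes the fully qualified partitioner class name:

partitioner: org.apache.cassandra.dht.Murmur3Partitioner

Let's begin a test. It consists of: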

  1. Create a cluster of 3 nodes with the Cassandra Cluster Manager (ccm) tool
  2. Insert rows with a deliberately unbalanced partition key distribution: 50% of the rows begin with 'A', 30% with 'D' and the remaining 20% (10% and 10%) with 'L' and 'Z'
  3. Check how the rows were distributed among the cluster nodes

"Cluster" can be created with (note explicitly set partitioner which will change according to test version):

ccm create partitioner -v 3.0.3 -n 3 --partitioner=ByteOrderedPartitioner -s
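
For the second run of the test, the cluster is presumably recreated the same way with --partitioner=Murmur3Partitioner (or with the flag simply omitted, since it's the default).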

Data is injected and checked through this test case:

private static final String[] TEAMS = {"Ajax Amsterdam", "ADO Den Hag", "Atletico Madrid", "AC Ajaccio", "Athletic Bilbao",
  "Dynamo Kiev", "Dynamo Lviev", "Dnipro", "Lille OSC", "Zamalek"};
private static final Collection<PartitionStats> INSERT_STATS = new ArrayList<>();

@BeforeClass
public static void prepareContext() throws IOException, URISyntaxException, InterruptedException {
  SESSION.execute("DROP KEYSPACE IF EXISTS partitionertest");
  SESSION.execute("CREATE KEYSPACE partitionertest WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1}");
  SESSION.execute("USE partitionertest");
  String tableListQuery = TestHelper.readFromFile("/queries/create_simple_team.cql");
  System.out.println("Executing query to create table: "+tableListQuery);
  SESSION.execute(tableListQuery);
  // let the schema changes propagate to all nodes before inserting
  Thread.sleep(5000);
  for (String team : TEAMS) {
    ResultSet result = insertTeam(team);
    INSERT_STATS.add(new PartitionStats(team, result.getExecutionInfo().getQueryTrace().getCoordinator(),
      getNodeHoldingData(result.getExecutionInfo())));
  }
}

@AfterClass
public static void destroyContext() {
  SESSION.execute("DROP KEYSPACE IF EXISTS partitionertest");
}

@Test
public void should_read_rows_from_the_same_node_as_during_insert() throws IOException, URISyntaxException, InterruptedException {
  for (String team : TEAMS) {
    Statement selectAllStm = QueryBuilder.select().from("partitionertest", "simple_team")
      .where(QueryBuilder.eq("teamName", team))
      .enableTracing();
    ResultSet result = SESSION.execute(selectAllStm);

    PartitionStats selectStats = new PartitionStats(team, result.getExecutionInfo().getQueryTrace().getCoordinator(),
      getNodeHoldingData(result.getExecutionInfo()));
    PartitionStats insertStats = findStatsByTeam(team);

    assertThat(insertStats.team).isEqualTo(selectStats.team);
    assertThat(insertStats.data).isEqualTo(selectStats.data);
  }

  System.out.println("> "+INSERT_STATS);
}

private static InetAddress getNodeHoldingData(ExecutionInfo executionInfo) {
  InetAddress coordinatorAddress = executionInfo.getQueryTrace().getCoordinator();
  // If the trace contains an event coming from another node, that node is the one
  // holding the data; otherwise the coordinator holds it itself
  Optional<QueryTrace.Event> otherNode = executionInfo.getQueryTrace().getEvents().stream()
    .filter(e -> !e.getSource().equals(coordinatorAddress))
    .findFirst();

  return otherNode.map(QueryTrace.Event::getSource).orElse(coordinatorAddress);
}

private PartitionStats findStatsByTeam(String team) {
  return INSERT_STATS.stream().filter(s -> s.team.equals(team)).findFirst().get();
}

private static ResultSet insertTeam(String teamName) {
  Statement insert = QueryBuilder.insertInto("partitionertest", "simple_team")
    .value("teamName", teamName)
    .enableTracing();
  return SESSION.execute(insert);
}

private static class PartitionStats {
  private final String team;
  private final InetAddress coordinator;
  private final InetAddress data;

  private PartitionStats(String team, InetAddress coordinator, InetAddress data) {
    this.team = team;
    this.coordinator = coordinator;
    this.data = data;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("team", team).add("coordinator node", coordinator)
      .add("data node", data).toString();
  }
}

The code executed against the ordered partitioner (ByteOrderedPartitioner) prints the following information:

> [
  PartitionStats{team=Ajax Amsterdam, coordinator node=/127.0.0.2, data node=/127.0.0.2}, 
  PartitionStats{team=ADO Den Hag, coordinator node=/127.0.0.2, data node=/127.0.0.2}, 
  PartitionStats{team=Atletico Madrid, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=AC Ajaccio, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Athletic Bilbao, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Dynamo Kiev, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Dynamo Lviev, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Dnipro, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Lille OSC, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Zamalek, coordinator node=/127.0.0.1, data node=/127.0.0.1}
]

As you can see, we made a hotspot on node 2. It holds 9 of the 10 inserted rows. Let's see what happens with the random partitioner (Murmur3Partitioner):

> [
  PartitionStats{team=Ajax Amsterdam, coordinator node=/127.0.0.2, data node=/127.0.0.1}, 
  PartitionStats{team=ADO Den Hag, coordinator node=/127.0.0.2, data node=/127.0.0.1}, 
  PartitionStats{team=Atletico Madrid, coordinator node=/127.0.0.1, data node=/127.0.0.1}, 
  PartitionStats{team=AC Ajaccio, coordinator node=/127.0.0.1, data node=/127.0.0.2}, 
  PartitionStats{team=Athletic Bilbao, coordinator node=/127.0.0.3, data node=/127.0.0.3}, 
  PartitionStats{team=Dynamo Kiev, coordinator node=/127.0.0.2, data node=/127.0.0.1}, 
  PartitionStats{team=Dynamo Lviev, coordinator node=/127.0.0.2, data node=/127.0.0.2}, 
  PartitionStats{team=Dnipro, coordinator node=/127.0.0.2, data node=/127.0.0.2}, 
  PartitionStats{team=Lille OSC, coordinator node=/127.0.0.1, data node=/127.0.0.1}, 
  PartitionStats{team=Zamalek, coordinator node=/127.0.0.3, data node=/127.0.0.1}
]

In the case of the random partitioner, the data is better balanced: node 1 stores 6 rows, node 2 stores 3 and node 3 only 1 row. It's still not perfectly balanced but it's more acceptable than the result generated by the ordered partitioner.

The article presents two types of partitioners: random and ordered. The first one is more automated, i.e. it doesn't need additional work to balance data equally; it does this fairly well by itself. On the other hand, ordered partitioners require rows to be rebalanced manually to avoid the creation of hotspots. Given these positive and negative points, random partitioners are usually preferred over ByteOrderedPartitioner.

