Range query algorithm in Apache Cassandra

Versions: Apache Cassandra 3.0.0

While learning about secondary indexes in Cassandra, I came across a mention of a special algorithm that Cassandra uses for range and secondary index queries. Having spent some time exploring the secondary index mechanism, now is a good moment to look at the algorithm that makes it work.

In this post we'll go through the details of Cassandra's range query algorithm. The first part describes the general idea and explains when the algorithm is used. The second part shows the exact steps and the code used to implement the range query method. The last section shows how the algorithm behaves during query execution.

General idea

The range query algorithm, as its name indicates, is responsible for range scans in Apache Cassandra. It's also involved in queries executed against a secondary index (you can learn more about it in the post about Secondary index in NoSQL data stores). The algorithm is an alternative to the naive way of reading a secondary index, which is to query all nodes, and it avoids two problems of that approach. Firstly, if all nodes in a large cluster (thousands of nodes) were queried, it would take a lot of time and would probably lead to network bandwidth issues. Secondly, querying a large number of nodes (primary replicas) would also add overhead on the coordinator. For instance, if the query limits the number of results to 10, the coordinator would need to fetch rows from all nodes and apply the limit clause only afterwards. The range query algorithm makes that naive approach a little smarter.

The main idea behind the algorithm is to query primary replicas in rounds. In every round, as long as the rows fetched so far don't satisfy the user's request, a different node (or set of nodes) is asked to return data. The number of contacted nodes is determined by an attribute called the concurrency factor. Its value depends on the relation between the number of rows and the token ranges (detailed in the next section).

Token ranges

The general idea behind token ranges is similar to the idea of tokens per node described in the post about Dynamo paper and consistent hashing. As described there, each node in the cluster is responsible for a given range of values (tokens). It's the same in Cassandra, where each node is in charge of a specific range of tokens.
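To make the idea of token ownership more concrete, here is a minimal sketch of a ring lookup. It is only an illustration: the class TokenRingSketch, its methods, the node names and the token values are all hypothetical and are not Cassandra's own classes. It assumes each node owns the tokens up to and including its upper bound.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; these are not Cassandra's classes.
class TokenRingSketch {
    // Maps the inclusive upper bound of each token range to its owning node.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String node, long upperToken) {
        ring.put(upperToken, node);
    }

    // Returns the owner of the first range whose upper bound is >= token,
    // wrapping around to the first node when the token is past the last bound.
    String ownerOf(long token) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(token);
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }

    public static void main(String[] args) {
        TokenRingSketch ring = new TokenRingSketch();
        ring.addNode("node-A", 100L);
        ring.addNode("node-B", 200L);
        ring.addNode("node-C", 300L);
        System.out.println(ring.ownerOf(150L)); // falls in node-B's range
        System.out.println(ring.ownerOf(350L)); // wraps around to node-A
    }
}
```

A range scan then amounts to visiting these ranges in ring order instead of looking up a single token.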

One of the requirements for this algorithm to perform well is an even data distribution. The range query algorithm assumes that every node holds roughly the same number of rows.

Range query implementation

In the first step, the algorithm estimates the number of returned rows. The estimate is computed in one of 2 ways, depending on the query:

  1. No secondary index is used in the query - the estimation is based on the estimated number of keys in the given SSTable.
  2. One or more secondary indexes are present in the query - Cassandra chooses the most appropriate index for the query, i.e. the most restrictive one. First, if the query contains filters on some of the defined secondary indexes, it keeps only the indexes to which a filter expression can be applied. Next, the final estimate is chosen based on the estimated number of rows in each of the remaining indexes.

The estimated number of rows is treated as the maximum number of results that the query can return. This number is then divided by the number of tokens of the given node. The number of tokens directly influences the amount of data stored on a node: the more tokens it holds, the more data is sent to that node in the ring. The result is in turn divided by the replication factor of the given keyspace (if you don't remember what it is, you can read the post about Data part in Apache Cassandra). This estimation step is expressed in the code:

private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
  ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
  Index index = command.getIndex(cfs);
  float maxExpectedResults = index == null
    ? command.limits().estimateTotalResults(cfs)
    : index.getEstimatedResultRows();

  // adjust maxExpectedResults by the number of tokens this 
  // node has and the replication factor for this ks
  return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
}
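To get a feel for the numbers, the same arithmetic can be reproduced outside Cassandra. The sketch below is only a worked example: the class name and the input values (10 000 estimated rows, 256 tokens per node, replication factor of 3) are assumptions chosen for illustration, not values read from a real cluster.

```java
// Hypothetical worked example of the arithmetic in estimateResultsPerRange.
class ResultsPerRangeSketch {
    // Same two divisions as the method above, with plain numbers as inputs.
    static float resultsPerRange(float maxExpectedResults, int numTokens, int replicationFactor) {
        return (maxExpectedResults / numTokens) / replicationFactor;
    }

    public static void main(String[] args) {
        // Assumed inputs: 10 000 estimated rows, 256 tokens, replication factor 3.
        float estimate = resultsPerRange(10_000f, 256, 3);
        System.out.println(estimate); // roughly 13 rows expected per range
    }
}
```

With these assumed inputs each range is expected to contribute about 13 rows, so a query with a small limit needs only a handful of ranges.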

The next step deliberately underestimates the number of rows by subtracting a 10% margin: estim_rows - (estim_rows * 0.1). Assuming fewer rows per range means more ranges are queried, which increases the likelihood of getting all needed rows in the first round. Later, the concurrency factor (= number of token ranges, and thus nodes, to contact in a given round) is computed as:
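As a minimal sketch, assuming the concurrency factor is the number of ranges needed to reach the query limit, clamped between 1 and the total number of ranges, the computation could look as follows. The class, method and parameter names are hypothetical; only the 10% margin comes from the text above.

```java
// Hypothetical sketch; names are illustrative and not Cassandra's API.
class ConcurrencyFactorSketch {
    // The 10% underestimation margin described in the text.
    static final float MARGIN = 0.10f;

    static int concurrencyFactor(float resultsPerRange, int limit, int rangeCount) {
        // Underestimate the rows returned by each range.
        float adjusted = resultsPerRange - resultsPerRange * MARGIN;
        if (adjusted == 0.0f) {
            // No estimate available: start with a single range (assumption).
            return 1;
        }
        // Ranges needed to reach the limit, never below 1 and never above
        // the number of ranges that exist.
        int needed = (int) Math.ceil(limit / adjusted);
        return Math.max(1, Math.min(rangeCount, needed));
    }

    public static void main(String[] args) {
        // Assumed inputs: 50 rows per range, LIMIT 10, 768 ranges.
        System.out.println(concurrencyFactor(50f, 10, 768));
        // Assumed inputs: 1 row per range, LIMIT 100000, only 20 ranges.
        System.out.println(concurrencyFactor(1f, 100_000, 20));
    }
}
```

In the first call a single range is expected to satisfy the limit, so only one range is queried. In the second call the limit far exceeds what the cluster is expected to hold, so the factor is capped at the 20 available ranges.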

Range query execution

After the first execution, when not all results are fetched, 2 situations can happen:
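The sketch below illustrates one plausible way the concurrency factor could be adjusted between rounds. It assumes the 2 situations are: no rows came back at all, or some rows came back but fewer than the limit. The class and method names are hypothetical and the numbers are made up for the example.

```java
// Hypothetical sketch of a per-round adjustment; not Cassandra's API.
class ConcurrencyUpdateSketch {
    static int nextConcurrencyFactor(int limit, int rowsReturned, int rangesQueried, int totalRanges) {
        int remainingRanges = totalRanges - rangesQueried;
        if (rowsReturned == 0) {
            // Assumed situation 1: nothing found so far, so query all
            // remaining ranges at once.
            return remainingRanges;
        }
        // Assumed situation 2: use the observed rows per range to guess how
        // many more ranges are needed to collect the missing rows.
        int remainingRows = limit - rowsReturned;
        float rowsPerRange = (float) rowsReturned / rangesQueried;
        int needed = Math.round(remainingRows / rowsPerRange);
        return Math.max(1, Math.min(remainingRanges, needed));
    }

    public static void main(String[] args) {
        // LIMIT 100, first round returned 40 rows from 8 of 768 ranges.
        System.out.println(nextConcurrencyFactor(100, 40, 8, 768));
        // LIMIT 100, first round returned nothing from 8 of 768 ranges.
        System.out.println(nextConcurrencyFactor(100, 0, 8, 768));
    }
}
```

In the first call the observed density is 5 rows per range, so 12 more ranges should cover the 60 missing rows. In the second call there is no density to extrapolate from, so all 760 remaining ranges are queried.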

The range query algorithm is implemented in Apache Cassandra to deal with range and secondary index queries. As shown in this post, the algorithm computes an attribute called the concurrency factor in order to limit the number of nodes queried to retrieve the results. However, it relies on a strong assumption of even data distribution, so some performance issues can be explained by an uneven distribution.