When I was learning about secondary indexes in Cassandra, I came across a mention of a special algorithm that Cassandra uses for range and secondary index queries. Having spent some time exploring the secondary index mechanism, it's a good moment to look at the algorithm that makes it work.
In this post we'll go through the details of Cassandra's range query algorithm. The first part describes the general idea and explains when the algorithm is used. The second part shows the exact steps and the code implementing the range query method. The last section shows how the algorithm behaves during query execution.
General idea
The range query algorithm, as its name indicates, is responsible for range scans in Apache Cassandra. It's also involved in the queries executed against a secondary index (you can learn more about it in the post about Secondary index in NoSQL data stores). The algorithm is an alternative to the naive way of reading a secondary index, which is to query all the nodes, and it avoids 2 problems of that approach. Firstly, if all nodes in a large cluster (thousands of nodes) were queried, it would take a lot of time and would probably lead to network bandwidth issues. Secondly, querying a big number of nodes (primary replicas) would also add an overhead to the coordinator. For instance, if the query limits the number of results to 10, the coordinator would need to get the rows from all nodes and apply the LIMIT clause only afterwards. The range query algorithm makes that naive approach a little bit smarter.
The main idea behind the algorithm is to query the primary replicas in rounds. In every round, as long as the rows returned so far don't satisfy the user's request, a different set of nodes is asked to return data. The number of token ranges (and so of nodes) contacted in a round is determined by an attribute called the concurrency factor. Its value depends on the relation between the number of rows and the number of token ranges (it will be detailed in the next section).
Token ranges
The general idea behind token ranges is similar to the idea of tokens per node described in the post about Dynamo paper and consistent hashing. As described there, each node in the cluster is responsible for a given range of values (tokens). It's the same in Cassandra, where each node is in charge of a specific range of tokens.
One of the requirements for this algorithm to perform well is an even data distribution. The range query algorithm assumes that every node holds roughly the same number of rows.
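To make the notion of token ownership more concrete, here is a minimal sketch of a ring lookup. It's not Cassandra's code: the TokenRingSketch class and its methods are hypothetical names, and the sketch assumes a single token per node, no virtual nodes and no replication. Each node owns the range going from the previous node's token (exclusive) to its own token (inclusive), and the ring wraps around after the highest token.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified ring: one token per node, no vnodes, no replicas.
final class TokenRingSketch {
    // Maps each node's token (inclusive upper bound of its range) to the node name.
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addNode(String node, long token) {
        ring.put(token, node);
    }

    // Returns the node owning the given token: the first node whose token
    // is greater than or equal to it, or the first node of the ring when
    // the token is beyond the highest one (wrap-around).
    String ownerOf(long token) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("The ring has no nodes");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(token);
        return entry != null ? entry.getValue() : ring.firstEntry().getValue();
    }
}
```

With nodes A, B and C holding the tokens 0, 100 and 200, token 50 belongs to B while token 250 wraps around to A.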
Range query implementation
In the first step, the algorithm estimates the number of returned rows. The estimate is computed in one of 2 ways:
- No secondary index is used in the query - the estimate is based on the estimated number of keys in the given SSTable.
- One or more secondary indexes are present in the query - Cassandra chooses the most appropriate index for the query, i.e. the most restrictive one. If the query contains filters on the defined secondary indexes, Cassandra first keeps only the indexes to which a filter expression can be applied. It then makes the final choice according to the estimated number of rows of each remaining index, and the value of the chosen index becomes the estimate.
The estimated number of rows is considered to be the maximum number of results that the query can return. This number is then divided by the number of tokens held by the given node. The number of tokens directly influences the amount of data stored on the node. Obviously, the more tokens a node has, the more data is sent to it in the ring. The result of the division is divided in its turn by the replication factor of the given keyspace (if you don't remember what it is, you can read the post about Data part in Apache Cassandra). This estimation step is expressed in the code:
private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
    ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
    Index index = command.getIndex(cfs);
    float maxExpectedResults = index == null
                             ? command.limits().estimateTotalResults(cfs)
                             : index.getEstimatedResultRows();

    // adjust maxExpectedResults by the number of tokens this
    // node has and the replication factor for this ks
    return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
}
The next step deliberately lowers the estimated number of rows by 10%: estim_rows - (estim_rows * 0.1). A lower estimate per range can only increase the number of ranges queried, which raises the likelihood of getting all needed rows in the first round. Later, the concurrency factor (= number of ranges to contact in a given round) is computed as:
- 1 if the rows estimation is equal to 0
- or max(1, min(ranges_number, ceil(query_limit/estim_rows))) where: ranges_number is the number of queried ranges, query_limit is the LIMIT clause from the user's request and estim_rows is the lowered estimate of the number of rows from the previous step
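The estimation and the formula above can be put together in a small standalone sketch. It only mirrors the arithmetic described in this section: the InitialConcurrencySketch class, its method names and the plain numeric parameters are assumptions made for the illustration, and the 0.1 margin is the one quoted above.

```java
// Hypothetical standalone sketch of the arithmetic described above;
// it takes plain numbers instead of Cassandra's command and keyspace objects.
final class InitialConcurrencySketch {
    // The 10% reduction applied to the estimate.
    static final float MARGIN = 0.10f;

    // Estimated rows per range: total estimate divided by the number of
    // tokens of the node, then by the replication factor of the keyspace.
    static float resultsPerRange(float maxExpectedResults, int numTokens, int replicationFactor) {
        return (maxExpectedResults / numTokens) / replicationFactor;
    }

    // Number of ranges to query in the first round.
    static int initialConcurrencyFactor(float resultsPerRange, int queryLimit, int rangesNumber) {
        float lowered = resultsPerRange - resultsPerRange * MARGIN;
        if (lowered == 0.0f) {
            return 1;
        }
        return Math.max(1, Math.min(rangesNumber, (int) Math.ceil(queryLimit / lowered)));
    }
}
```

For example, with 7680 expected rows, 256 tokens and a replication factor of 3, the estimate is 10 rows per range. With LIMIT 10 the margin lowers it to 9, so the sketch asks for 2 ranges in the first round instead of 1, which is exactly the safety effect of the underestimation.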
Range query execution
After the first round, when not all results were fetched, 2 situations can happen:
- either the number of returned rows is equal to 0 - then the concurrency factor is set to: ranges_number - already_queried_ranges. Simply speaking, the query will be sent to all the remaining ranges at once
- or only a part of the results was found - in this case the concurrency factor is recomputed according to the following steps:
int remainingRows = command.limits().count() - liveReturned;
float rowsPerRange = (float)liveReturned / (float)rangesQueried;
concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
The code doesn't need a lot of comments. The first line computes the number of rows that remain to be retrieved. The second line computes the average number of rows returned per range queried so far. The last line updates the concurrency factor with a formula similar to the one from the beginning of the algorithm, except that it takes into account the ranges already queried and the rows already retrieved.
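To see how the rounds play out, the 2 situations described above can be simulated on fake data. This is only a sketch under simplifying assumptions: the RangeQueryRoundsSketch class and its methods are hypothetical, every range is represented by the number of rows it holds, and ranges are queried in order, without replicas or timeouts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the rounds; not Cassandra's code.
final class RangeQueryRoundsSketch {
    // Concurrency factor for the next round, following the 2 situations above.
    static int nextConcurrencyFactor(int limit, int liveReturned, int rangesQueried, int totalRangeCount) {
        if (liveReturned == 0) {
            // Nothing found so far: query all the remaining ranges at once.
            return totalRangeCount - rangesQueried;
        }
        int remainingRows = limit - liveReturned;
        float rowsPerRange = (float) liveReturned / (float) rangesQueried;
        return Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
    }

    // Returns the number of ranges queried in every round until the limit
    // is reached or all the ranges are exhausted.
    static List<Integer> simulate(int[] rowsInRange, int limit, int initialConcurrencyFactor) {
        List<Integer> rangesPerRound = new ArrayList<>();
        int liveReturned = 0;
        int rangesQueried = 0;
        int concurrencyFactor = initialConcurrencyFactor;
        while (liveReturned < limit && rangesQueried < rowsInRange.length) {
            int end = Math.min(rowsInRange.length, rangesQueried + concurrencyFactor);
            for (int i = rangesQueried; i < end; i++) {
                liveReturned += rowsInRange[i];
            }
            rangesPerRound.add(end - rangesQueried);
            rangesQueried = end;
            if (liveReturned < limit && rangesQueried < rowsInRange.length) {
                concurrencyFactor = nextConcurrencyFactor(limit, liveReturned, rangesQueried, rowsInRange.length);
            }
        }
        return rangesPerRound;
    }
}
```

With 10 ranges of 2 rows each, LIMIT 10 and an initial concurrency factor of 1, the first round returns 2 rows from 1 range, so the second round queries round(8 / 2.0) = 4 ranges and the query completes after 2 rounds. If the first round returns nothing, the second round goes to all the remaining ranges.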
The range query algorithm is implemented in Apache Cassandra to deal with range and secondary index queries. As shown in this post, the algorithm computes an attribute called the concurrency factor in order to limit the number of nodes queried to retrieve the results. However, it relies on a strong assumption of even data distribution. Thus some performance issues of these queries could be explained by an uneven distribution.