Secondary index in NoSQL data stores

on waitingforcode.com

Data organization in key-value oriented NoSQL databases is very often based on a pair of keys: the partition key and the sort key. However, these databases also offer another feature, called secondary index, that can be a good alternative to the previously described index table pattern.

This post presents the idea of the secondary index. The first section explains the general concepts of this mechanism and gives some ideas about how to use it and how not to use it. The second and last part defines a simple implementation of a secondary index in Scala.

Basic concepts

The secondary index is a data structure containing a subset of attributes from some "parent" table. This "parent" table is in reality called the base table because the secondary index is constructed directly from it.

A big advantage of the secondary index is that it can use different partition and sort keys than the base table. It can also use only a part of the base table's columns.
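To make the idea of different keys more concrete, here is a minimal sketch. It assumes a base table partitioned by the footballer's name and builds an index partitioned by position and sorted by age and then by name. The `Footballer` and `IndexEntry` types and the `buildIndex` function are hypothetical names used only for this illustration, and the team column is deliberately left out of the index to show the use of a subset of columns:

```scala
object ReKeyedIndexSketch {
  // Hypothetical base table row, partitioned by name
  case class Footballer(name: String, age: Int, position: String, team: String)

  // Hypothetical index entry: it keeps only a part of the base table's columns
  case class IndexEntry(position: String, age: Int, name: String)

  // Builds an index partitioned by position; inside every partition
  // the entries are sorted by age and then by name
  def buildIndex(baseRows: Seq[Footballer]): Map[String, Seq[IndexEntry]] =
    baseRows
      .map(footballer => IndexEntry(footballer.position, footballer.age, footballer.name))
      .groupBy(_.position)
      .map { case (position, entries) => position -> entries.sortBy(entry => (entry.age, entry.name)) }

  val footballers = Seq(
    Footballer("Tony Adams", 36, "defender", "Arsenal"),
    Footballer("Martin Keown", 36, "defender", "Arsenal"),
    Footballer("Ryan Giggs", 29, "midfielder", "Manchester United")
  )
}
```

Calling `ReKeyedIndexSketch.buildIndex(ReKeyedIndexSketch.footballers)` returns one index partition per position, so a query on "defender" reads a single partition instead of scanning every base table row.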

Contrary to what we could initially think, the secondary index is not a table distributed by its own partition key. Instead, it's stored on the same node as the base table, which keeps the response time low. Otherwise the node, on flushing the data, would also have to write it to the distributed index tables and wait until all the replicas are correctly written (according to the quorum rule). If the secondary index is written locally, simply as an extra mutation of the original data, this requirement doesn't apply anymore. Locating the secondary index on the same node as the base table doesn't mean that the read path must query all nodes in the cluster. The strategy obviously depends on the database but, for instance, in Cassandra the reading is controlled by an algorithm called range scan in order to limit the number of queried nodes.
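The local character of the index can be illustrated with a small simulation. This is only a sketch under simplifying assumptions: the `Node` class and the `query` function are hypothetical, the nodes are contacted one after another, and the loop is not Cassandra's actual range scan algorithm. It only shows that a query with a limit can stop before visiting the whole cluster:

```scala
import scala.collection.mutable

object LocalIndexSketch {
  // Hypothetical node storing its share of the base table and the index built from it
  class Node(val name: String, indexedColumn: String) {
    private val rows = mutable.Map[String, Map[String, String]]()
    private val localIndex = mutable.Map[String, Vector[String]]()

    // The index entry is written on the same node as the row, as an extra local mutation.
    // Shortcut: rewriting an existing key doesn't remove its previous index entry.
    def write(key: String, attributes: Map[String, String]): Unit = {
      rows.put(key, attributes)
      attributes.get(indexedColumn).foreach { value =>
        localIndex.put(value, localIndex.getOrElse(value, Vector.empty) :+ key)
      }
    }

    def lookup(indexedValue: String): Seq[String] = localIndex.getOrElse(indexedValue, Vector.empty)
  }

  // Contacts the nodes one after another and stops as soon as `limit` keys are found.
  // Returns the found keys and the number of contacted nodes.
  def query(nodes: Seq[Node], indexedValue: String, limit: Int): (Seq[String], Int) = {
    var found = Vector.empty[String]
    var contactedNodes = 0
    val remainingNodes = nodes.iterator
    while (found.size < limit && remainingNodes.hasNext) {
      found = found ++ remainingNodes.next().lookup(indexedValue)
      contactedNodes += 1
    }
    (found.take(limit), contactedNodes)
  }
}
```

With 3 nodes where the first 2 store one defender each, a query for "defender" limited to 1 result contacts only the first node, while the same query with a limit of 10 has to contact all 3 nodes.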

The data in the secondary index is either copied or fetched. The first approach consists of taking the values from the base table and copying them to the secondary index. It brings good performance at query time but adds more complexity to the writing. The latter solution stores only the partition keys of the base table and uses them to fetch the values from the base table. It makes the writing process quite simple and fast but slows down the reading part.

The secondary index is one of the lesser-known features but also one of the most misunderstood ones. In order to work efficiently it must respect some rules. The secondary index should not be applied to values with too low cardinality (e.g. gender -> male/female) or too high cardinality (e.g. user's unique id). In the first case the querying won't be efficient because the situation leads to wide index partitions and, as a consequence, to a lot of data to scan. The latter case is also problematic: each index entry points to a single row, so the query finds its result on the first node at best and has to visit all nodes at worst.
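The impact of the cardinality can be sketched by counting how many rows end up in every index partition. The table of 1000 users below is a made-up example and `indexPartitionSizes` is a hypothetical helper written only for this illustration:

```scala
object IndexCardinalitySketch {
  // Groups the rows by the value of the indexed column and
  // returns the number of rows stored in every index partition
  def indexPartitionSizes(indexedValues: Seq[String]): Map[String, Int] =
    indexedValues.groupBy(identity).map { case (value, occurrences) => value -> occurrences.size }

  // Hypothetical table of 1000 users: the id is unique, the gender has only 2 values
  val userIds: Seq[String] = (1 to 1000).map(id => s"user$id")
  val genders: Seq[String] = (1 to 1000).map(id => if (id % 2 == 0) "female" else "male")
}
```

Indexing the gender creates only 2 index partitions of 500 rows each, so every query scans half of the table. Indexing the user id creates 1000 partitions of 1 row each, so the index is as big as the base table and every lookup returns a single row that could be stored on any node.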

Sample implementation

Our sample secondary index implementation has 2 indexing strategies: copy and fetch. It also supports creating the index after some values have already been added to the base table. The tests use some footballers and their teams from 2002. It's a simplified version since it supports only String values and takes a lot of shortcuts:

import org.scalatest.{FunSuite, Matchers}

class SecondaryIndexTest extends FunSuite with Matchers {

  private def footballersTable(indexStrategy: SecondaryIndexStorageStrategy): BaseTable = {
    val baseTable = new BaseTable("name", Seq("age", "position", "team"))
    baseTable.createSecondaryIndex("position", indexStrategy, Seq("name"))
    baseTable.writeValue(("name", "Tony Adams"), ("age", "36"), ("position", "defender"), ("team", "Arsenal"))
    baseTable.writeValue(("name", "Martin Keown"), ("age", "36"), ("position", "defender"), ("team", "Arsenal"))
    baseTable.writeValue(("name", "Ryan Giggs"), ("age", "29"), ("position", "midfielder"), ("team", "Manchester United"))
    baseTable
  }

  test("should read entry from index with fetching strategy") {
    val baseTable = footballersTable(FetchSecondaryIndexStorageStrategy)

    val defenders = baseTable.readFromSecondaryIndex("position", "defender")

    val names = defenders.map(row => row.attributes("name"))
    names should have size 2
    names should contain allOf("Tony Adams", "Martin Keown")
    val ages = defenders.map(row => row.attributes("age"))
    ages should have size 2
    ages(0) shouldEqual "36"
    ages(1) shouldEqual "36"
  }

  test("should read entry from index with copy strategy") {
    val baseTable = footballersTable(CopySecondaryIndexStorageStrategy)

    val defenders = baseTable.readFromSecondaryIndex("position", "defender")

    val names = defenders.map(row => row.attributes("name"))
    names should have size 2
    names should contain allOf("Tony Adams", "Martin Keown")
  }

  test("should create secondary index after adding some values") {
    val baseTable = new BaseTable("name", Seq("age", "position", "team"))
    baseTable.writeValue(("name", "Jerzy Dudek"), ("age", "29"), ("position", "goalkeeper"), ("team", "Liverpool FC"))
    baseTable.createSecondaryIndex("position", CopySecondaryIndexStorageStrategy, Seq("name"))

    val goalkeepers = baseTable.readFromSecondaryIndex("position", "goalkeeper")

    val names = goalkeepers.map(row => row.attributes("name"))
    names should have size 1
    names(0) shouldEqual "Jerzy Dudek"
  }

}


import scala.collection.mutable

class BaseTable(val partitionKey: String, attributes: Seq[String]) {

  private val secondaryIndexes: mutable.Map[String, SecondaryIndex] = new mutable.HashMap[String, SecondaryIndex]()

  private val data = new mutable.HashMap[String, Row]()

  def createSecondaryIndex(indexColumn: String, indexStrategy: SecondaryIndexStorageStrategy, attributes: Seq[String]) = {
    val secondaryIndex = new SecondaryIndex(indexColumn, indexStrategy, attributes, this)
    secondaryIndexes.put(indexColumn, secondaryIndex)

    if (data.nonEmpty) {
      data.foreach(entry => writeValueToSecondaryIndex(secondaryIndex, entry._2))
    }
  }

  def writeValue(keyValuePairs: (String, String)*) = {
    val attributes = Map(keyValuePairs.map(pair => pair._1 -> pair._2): _*)
    assert(attributes.contains(partitionKey), "Invalid entry - it doesn't contain partition key")

    val row = new Row(attributes)
    data.put(attributes(partitionKey), row)
    secondaryIndexes.foreach(entry => writeValueToSecondaryIndex(entry._2, row))
  }

  def readFromSecondaryIndex(key: String, value: String): Seq[Row] = {
    secondaryIndexes(key).readValue(value)
  }

  private def writeValueToSecondaryIndex(secondaryIndex: SecondaryIndex, row: Row): Unit = {
    secondaryIndex.writeValue(row)
  }

  def readBatch(keys: Seq[String]): Seq[Row] = {
    println(s"Fetching rows for keys ${keys}")
    // The keys that are absent from the base table are silently skipped
    keys.flatMap(key => data.get(key))
  }

}

class SecondaryIndex(indexKey: String, indexStrategy: SecondaryIndexStorageStrategy, val attributes: Seq[String],
                     val baseTable: BaseTable) {

  private val inverseIndex = new mutable.HashMap[String, Seq[Row]]()

  def readValue(partitionKey: String): Seq[Row] = {
    // Return an empty result instead of failing when the value was never indexed
    val rows = inverseIndex.getOrElse(partitionKey, Seq.empty)
    indexStrategy.read(this, rows)
  }

  def writeValue(row: Row) = {
    val inverseIndexKey = row.attributes(indexKey)
    val currentValues = inverseIndex.getOrElse(inverseIndexKey, Seq.empty)
    val newValues = currentValues :+ indexStrategy.mutateToWritableRow(this, row)
    inverseIndex.put(inverseIndexKey, newValues)
  }

}

// For simplicity, [[attributes]] accepts only String values
class Row(val attributes: Map[String, String]) {

  override def toString: String = s"Row ${attributes}"

}

trait SecondaryIndexStorageStrategy {
  def mutateToWritableRow(secondaryIndex: SecondaryIndex, row: Row): Row = {
    val attributesToSave = row.attributes.filter(entry => secondaryIndex.attributes.contains(entry._1))
    new Row(attributesToSave)
  }

  def read(secondaryIndex: SecondaryIndex, rows: Seq[Row]): Seq[Row]
}

object FetchSecondaryIndexStorageStrategy extends SecondaryIndexStorageStrategy {
  override def read(secondaryIndex: SecondaryIndex, rows: Seq[Row]): Seq[Row] = {
    // Shortcut: assumes that the first attribute stored in the index is the base table's partition key
    val keys = rows.map(row => row.attributes.head._2)
    secondaryIndex.baseTable.readBatch(keys)
  }
}
object CopySecondaryIndexStorageStrategy extends SecondaryIndexStorageStrategy {
  override def read(secondaryIndex: SecondaryIndex, rows: Seq[Row]): Seq[Row] = rows
}

The index table pattern is not the only way to define additional indexes in key-value NoSQL databases. Another solution is the secondary index, a data structure located on the same node as the base table. But, as shown in the first section, it has to be used carefully because not all columns are good candidates for secondary index keys. The second section provided a simple implementation of the secondary index for an in-memory dummy database. It showed how 2 indexing strategies (copy and fetch) could be implemented.

Read also about secondary index in NoSQL data stores here: Cassandra Native Secondary Index Deep Dive, Improving Data Access with Secondary Indexes.
