Key-value distribution patterns

Key-value stores have the advantage of being a kind of distributed and high-available memory cache. But even though they're quite easy to manipulate thanks to the key-based access, they also have some complicated tasks. One of them is the strategy of picking a good key.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I'm currently writing one on that topic and the first chapters are already available in πŸ‘‰ Early Release on the O'Reilly platform

I also help solve your data engineering problems πŸ‘‰ contact@waitingforcode.com πŸ“©

This post focuses on that key strategy choice. Its first part explains why choosing a good key is important. The second section shows different patterns you can apply to improve the quality of a poor key choice.

Generally about key-value stores

In key-value stores, the key is the unit of partitioning so choosing a bad one will sooner or later negatively impact the performance of our application. But the distribution is not the only concern. Another one is the querying. Key-value stores are most of the time designed around the idea of query-first pattern. That means that you will think about the query patterns before designing the database schema. And these 2 points can often be concurrent.

Let's imagine an example of an e-commerce store where you want to track the number of products bought per hour every day. An obvious choice to respect query-first pattern is the use of date time truncated to hours. But unfortunately, it doesn't guarantee even partitioning. After all, you can't guarantee that the number of products bought every hour will be at worse slightly different. This simple example shows that choosing a key respecting both criteria is not an easy piece of cake.

To sum up, among bad ideas for the partition keys we can find:

As told in the last point, the used hashing algorithm is the key to understand how the data will be distributed. Very often the used algorithm is the modulo-based one where the partition is computed as hash(partition key) % number of partitions. And that's the algorithm I will take into account in the next part of the post.

Distribution patterns

Fortunately some of the distribution patterns exist. The first of them uses composite partition keys. This type of partition key is constructed from 2 or more attributes, often initially considered as normal columns - we talk then about promoting the column to the key position. For instance, you can build the composite key from product code and the client who bought it. The following snippet shows how the distribution may vary with and without the composite key for a modulo-based partitioner for that product code example, where some of the products are more popular than the others:

  private val AllPartitions = 4
  private val ProductCodes = (0 to 10)
  private val ProductCodesOrderedQuantity = (10 to 10000).by(100)
  private val OrdersWithQuantity = ProductCodes.zip(ProductCodesOrderedQuantity)

  "product code" should "be a poor choice for distribution key" in {
    val partitions = OrdersWithQuantity.map {
      case (orderId, quantity) => {
        (1 to quantity).map(_ => HashPartitioner.getPartition(s"${orderId}", AllPartitions))
      }
    }.flatten

    val groupedPartitions = partitions.groupBy(partition => partition)

    groupedPartitions("0").size shouldEqual 2240
    groupedPartitions("1").size shouldEqual 1320
    groupedPartitions("2").size shouldEqual 920
    groupedPartitions("3").size shouldEqual 1130
  }
  "composite key" should "help to balance the partitions" in {
    val partitionsWithUser = OrdersWithQuantity.map {
      case (orderId, quantity) => {
        (1 to quantity).map(userId => HashPartitioner.getPartition(s"${orderId}#${userId}", AllPartitions))
      }
    }.flatten

    val groupedPartitions = partitionsWithUser.groupBy(partition => partition)
    println(s"grouped=${groupedPartitions.map(kv => s"${kv._1}=${kv._2.size}")}")

    groupedPartitions("0").size shouldEqual 1411
    groupedPartitions("1").size shouldEqual 1427
    groupedPartitions("2").size shouldEqual 1420
    groupedPartitions("3").size shouldEqual 1352
  }

As you can notice, the distribution is poor with the exclusive product code. It's because of the popularity of several items. But as you can also observe, if we use a composite partition key the distribution is pretty even. However, this approach has an important drawback. It violates the query pattern. Here if we want to get all orders for the given product, we'll need first to retrieve all clients in our database. An alternative solution similar to this one could be the use of salting which, instead of using the natural composite keys, uses its numeric representation from a finite set of values:

  "composite salted key" should "help to balance the partitions" in {
    val allSalts = 200
    val partitionsWithSalt = OrdersWithQuantity.map {
      case (orderId, quantity) => {
        (1 to quantity).map(userId => HashPartitioner.getPartition(s"${orderId}#${HashPartitioner.getPartition(userId.toString, allSalts)}", AllPartitions))
      }
    }.flatten

    val groupedPartitions = partitionsWithSalt.groupBy(partition => partition)

    groupedPartitions("0").size shouldEqual 1384
    groupedPartitions("1").size shouldEqual 1325
    groupedPartitions("2").size shouldEqual 1397
    groupedPartitions("3").size shouldEqual 1504
  }


The result is a little bit worse than for natural composite keys but the distribution is much better than for the use of product code as a distribution key. Unfortunately, salting has also some drawbacks. It supposes a fixed number of "salt partitions". When you'll want to change it, the distribution will change and you'll unable to retrieve the data belonging to the same salted value. For instance, all orders of user#1 can be located in the first partition but after increasing the number of salt partitions, the new orders may be located elsewhere. Of course, you can mitigate the issue by repartitioning the entries but it adds more complexity and impacts the performance. Also, it complexifies the querying because in order to get all products, you will need to do in a loop.

Of course, if you don't want to deal with composite and salted keys, you can still use keys with high-cardinality as e-mail addresses or random values as UUIDs:

  "high-cardinality UUID" should "guarantee even partitions" in {
    val partitionsWithUUID = OrdersWithQuantity.map {
      case (orderId, quantity) => {
        (1 to quantity).map(userId => HashPartitioner.getPartition(UUID.randomUUID().toString, AllPartitions))
      }
    }.flatten

    val groupedPartitions = partitionsWithUUID.groupBy(partition => partition)
    // groupedPartitions.map(kv => s"${kv._1}=${kv._2.size}")
    // Sample distribution: List(2=1367, 1=1381, 0=1399, 3=1463)

    val partitionSizes = Seq(groupedPartitions("0").size, groupedPartitions("1").size, groupedPartitions("2").size,
      groupedPartitions("3").size)
    partitionSizes.foreach(partitionSize => {
      partitionSizes.foreach(comparedPartitionSize => {
        assert((partitionSize - comparedPartitionSize).abs <= 150, "The difference between 2 partitions can't be greater " +
          s"than 150 but was not: ${partitionSize} - ${comparedPartitionSize}")
      })
    })
  }

As you can see, the distribution is quite even. However, the solution using UUID or even auto-incremented numbers is difficult to use. But it should work pretty good for more natural high-cardinality keys as e-mail addresses:

  "e-mails" should "be good but not the best partition key choice" in {
    val emails = (0 to 2200).map(nr => s"${Random.alphanumeric.take(10).toString}@gmail.com)") ++
      (0 to 1200).map(nr => s"${Random.alphanumeric.take(10).toString}@yahoo.com)") ++
      (0 to 1200).map(nr => s"${Random.alphanumeric.take(10).toString}@mymail.com)")
    val partitions = emails.map(email => HashPartitioner.getPartition(email, AllPartitions))

    val groupedPartitions = partitions.groupBy(partition => partition)
    // groupedPartitions.map(kv => s"${kv._1}=${kv._2.size}")
    // Sample distribution=List(2=1012, 1=1383, 0=1097, 3=1111)

    val partitionSizes = Seq(groupedPartitions("0").size, groupedPartitions("1").size, groupedPartitions("2").size,
      groupedPartitions("3").size)
    partitionSizes.foreach(partitionSize => {
      partitionSizes.foreach(comparedPartitionSize => {
        assert((partitionSize - comparedPartitionSize).abs <= 400, "The difference between 2 partitions can't be greater " +
          s"than 400 but was not: ${partitionSize} - ${comparedPartitionSize}")
      })
    })
  }

The distribution is almost perfect. Unfortunately, we have one partition that took much more writes than the others. The difference is not as big as for the product code partition key though and could be accepted - especially for a real-time pipeline and not the one limited to 3 e-mail domains.

Key-value stores are a good way to provide fast access to the data. However, this type of storage should be designed in a query-first manner. Sometimes this organization can have a negative impact on the even distribution of the data. As shown in the first section, not all values are good candidates for the primary keys. Fortunately, even though some of them are not well suited for that, we can still use some distribution patterns to improve partitioning. But do not take them for "sure" - everything depends on the organization of your data. You could see that even e-mail addresses, estimated as a partition keys with high cardinality, also led some partitions to be hotter than the others. So you should always test in play/fail mode, playing not only with the patterns but also with the number of partitions.