Every data processing pipeline can have a source of contention. One of them can be the data localization. When all entries are read from single place by dozens or hundreds of workers, the data source can respond slower. One of solutions to this problem can be the partitioning.
A virtual conference at the intersection of Data and AI. This is not a conference for the hype. Its real users talking about real experiences.
- 40+ speakers with the likes of Hannes from Duck DB, Sol Rashidi, Joe Reis, Sadie St. Lawrence, Ryan Wolf from nvidia, Rebecca from lidl
- 12th September 2024
- Three simultaneous tracks
- Panels, Lighting Talks, Keynotes, Booth crawls, Roundtables and Entertainment.
- Topics include (ingestion, finops for data, data for inference (feature platforms), data for ML observability
- 100% virtual and 100% free
👉 Register here
This post explains the concept of partitioning in data-driven applications. The first part defines the partitioning and describes its 3 possible schemes. The second part lists available partitioning algorithms, showing their sample implementations in Scala.
Partitioning definition
The partitioning divides a dataset into smaller pieces called partitions. Each partition is identified by a unique name and any operation involving it should be transparent to the final user, i.e. the data engine should handle the query to the appropriate partition (even if user could also be able to interact with one explicit partition).
Thanks to the partitioning any big dataset can be divided and distributed in different places (e.g. nodes). This distribution brings the possibility of processing parallelisation. To take a simple example, one worker can deal with the data accumulated in 2016 while the second one can work on the data from 2017. However, to achieve a good optimization gain, the partitions should be equilibrated, i.e. the subdivisions of data stored on each of them should have similar size.
Three popular partitioning schemes exist:
- horizontal (aka sharding) - divides a dataset and places the subsets in different places. All the subsets have the same schema but often their division is different. An example of horizontal partitioning can be a news website that groups the news per date.
- vertical - this kind of partitioning doesn't divide whole entries but their parts. In consequence, the properties of the same information can be stored in different partitions. The division can be made on different criteria and one of them could be the frequency of querying, i.e. the most read properties can placed in totally different place the less frequently read. For instance, let's suppose that we store the information about cars was a tuple of (id, color, price, year, description), and the description is almost never read. According to the vertical partition we could move (id, color, price, year) values in one partition and leave (id, description) in the other.
- functional - data's division is based on a bounded context coming from business logic. For instance, in a e-commerce store the functional partitioning could be separated tables containing customers and orders.
Partitioning algorithms
Big Data processing frameworks (e.g. Apache Spark) and NoSQL engines (e.g. Cassandra, Kafka) use the concept of partitioning to the same purposes as described in previous section. Among all these solutions we can distinguish some common partitioning approaches:
-
natural key partitioning - one or several properties of given object is (are) used to create a partition key. This key is later used by the partitioner to find the partition. It's quite simple but can appear to be inefficient since the partitioner must somehow know the mapping between partitions and keys. An simple implementation of that partitioner could look like:
class NaturalKeyPartitioningTest extends FunSuite with BeforeAndAfter with Matchers { test("should map 'test' key to correct partition") { val partitioner = new NaturalKeyPartitioner(Map("t" -> 1, "te" -> 2, "tes" -> 3, "test" -> 4)) val partition = partitioner.getPartition("test") partition shouldEqual 4 } } class NaturalKeyPartitioner(keysToPartitions: Map[String, Int]) { def getPartition(key: String): Int = keysToPartitions(key) }
-
hash-based partitioning - here the natural key is hashed and, very often, divided by the number of all partitionins. The remainder is used to define the final partition that will hold given data. It's quite simple to implement. However it has some important drawbacks. The most important is about its static character. The hashing modulo partitioner works well when the number of target partitions doesn't change. In the other case, for instance when the quantity of partitions is increased, all already partitioned data will have to be repartitioned.
The other drawback is the choice of hashed value. For instance, if we store the data of a website devoted to 20-22 y.o. people, the hash computed on age value won't guarantee even distribution. Instead we'd opt for a composite key guaranteeing better distribution.
A simple implementation could be:class HashPartitionerTest extends FunSuite with BeforeAndAfter with Matchers { test("should show that changing the number of partitions invalidates the previous partitioning") { val partitioner3Partitions = new HashPartitioner(3) val partitioner5Partitions = new HashPartitioner(6) val hash = 10 val partitionPartitioner3 = partitioner3Partitions.getPartition(hash) val partitionPartitioner5 = partitioner5Partitions.getPartition(hash) partitionPartitioner3 should not equal partitionPartitioner5 partitionPartitioner3 shouldEqual 1 partitionPartitioner5 shouldEqual 4 } test("should show even distribution among auto-incremented hashes") { val partitioner5Partitions = new HashPartitioner(5) val partitions = (1 to 5).map(hash => partitioner5Partitions.getPartition(hash)) partitions should contain inOrderOnly(1, 2, 3, 4, 0) } } class HashPartitioner(partitionsNr: Int) { def getPartition(hash: Int): Int = hash % partitionsNr }
An improved version of hash-based partitioning is consistent hashing that will be covered in one of next posts. -
range partitioning - this partitioner places the data to the partitions storing continuous slices of data (ranges). For instance, we'd have 5 partitions, each of them storing values of 20 next users: [1-20, 21-40, 41-60, 61-80, 81-100]. The ranges can be defined by a natural key or hashing function. A dummy range partitioner implementation could look like:
class RangePartitionerTest extends FunSuite with BeforeAndAfter with Matchers { test("should find the partition for a value") { val rangePartitioner = new RangePartitioner(Seq((1 to 20), (21 to 40))) val partition = rangePartitioner.getPartition(15) partition shouldEqual 1 } } class RangePartitioner(ranges: Seq[Range]) { def getPartition(key: Int): Int = { var partitionNumber: Option[Int] = None for (partition <- 0 until ranges.size if partitionNumber.isEmpty) { val range = ranges(partition) if (range.contains(key)) { partitionNumber = Some(partition) } } partitionNumber.get } }
-
round-robin partitioning - each message is assigned to the next available partitions, guaranteeing even distribution of messages. However, with this distribution the client doesn't know where a particular message can be located. It's the reason why this method is adapted for streaming brokers as Apache Kafka but is less suited for NoSQL databases as Cassandra or DynamoDB. Its simple implementation could be:
class RoundRobinPartitioningTest extends FunSuite with BeforeAndAfter with Matchers { test("should assign partitions in round robin fashion") { val roundRobinPartitioner = new RoundRobinPartitioner(3) val partitionRound1 = roundRobinPartitioner.getPartition val partitionRound2 = roundRobinPartitioner.getPartition val partitionRound3 = roundRobinPartitioner.getPartition val partitionRound4 = roundRobinPartitioner.getPartition val partitionRound5 = roundRobinPartitioner.getPartition val partitionRound6 = roundRobinPartitioner.getPartition partitionRound1 shouldEqual partitionRound4 partitionRound1 shouldEqual 1 partitionRound2 shouldEqual partitionRound5 partitionRound2 shouldEqual 2 partitionRound3 shouldEqual partitionRound6 partitionRound3 shouldEqual 0 } } class RoundRobinPartitioner(allPartitions: Int) { private val NextId = new AtomicInteger(1) def getPartition(): Int = { val currentId = NextId.getAndAdd(1) currentId % allPartitions } }
As proved in this post, the partitioning can have a beneficial influence on the data processing. It can speed up querying and writing by distributing the load, without any complexification of client code. However, in order to work efficiently, the data must be distributed evenly. And as it was shown in the second section describing some basic partitioning algorithms, it's not always easy.