Monotonically increasing id function in Apache Spark SQL

on waitingforcode.com

Some time ago I was wondering whether Apache Spark provides support for auto-incremented values, which are so hard to implement in distributed environments. After some research, I almost found what I was looking for: monotonically increasing id. In this post, I will try to explain why "almost".

In the first part of this post I will explain the mystery behind this "almost". In the next one, I will focus on the implementation details. In the last section, I will describe some particular behaviors of the function.

Almost auto-increment function in Apache Spark

You may wonder why I insist on the word "almost" in the first lines of this post. It's because monotonically increasing id is not the same as the auto-increment columns that you can find in most relational databases. However, it behaves very similarly because it's able to generate consecutive numbers, but only within a partition.

The feature is not new: it was added in 2015 as a response to users' need to generate a unique ID for each row. The function generates 64-bit integers that are monotonically increasing and consecutive within a partition. These ids are also distinct within a Dataset.

The main purpose of this feature was to provide a method to uniquely identify the rows. However, exposing such an id as a primary key in the data sinks may be problematic. I will try to show it in the last section. On the other hand, one of the legitimate use cases is the generation of ids in GraphX vertex RDDs. As you already know - if not, everything is explained in the Vertex representation in Apache Spark GraphX post - each vertex must have a unique ID assigned. If your original dataset doesn't have this property, you could use the monotonically increasing id to generate one at the moment of loading the data (among many other methods).

Implementation details

Before we focus on the implementation details, let's recall the definition of monotonicity. In mathematics, we say that a function is monotonic when it is entirely non-increasing or non-decreasing. In Apache Spark SQL, the monotonic id is only increasing, both locally inside a partition and globally across partitions. To compute these increasing values, the implementation combines 2 numbers: the partition id, stored in the upper 31 bits, and the row number within the partition, stored in the lower 33 bits:

// Called once per partition: the partition index occupies the upper 31 bits
override protected def initializeInternal(partitionIndex: Int): Unit = {
  count = 0L
  partitionMask = partitionIndex.toLong << 33
}

// Called for every row: the per-partition counter occupies the lower 33 bits
override protected def evalInternal(input: InternalRow): Long = {
  val currentCount = count
  count += 1
  partitionMask + currentCount
}
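To make the bit layout concrete, here is a minimal standalone sketch (plain Scala, no Spark required; the helper names are mine, not part of Spark's API) that rebuilds and decomposes an id the same way:

```scala
// Rebuild an id the same way as the expression above:
// partition index in the upper 31 bits, row counter in the lower 33 bits.
def makeId(partitionIndex: Int, rowNumber: Long): Long =
  (partitionIndex.toLong << 33) + rowNumber

// Recover both components from a generated id.
def partitionOf(id: Long): Int = (id >>> 33).toInt
def rowNumberOf(id: Long): Long = id & ((1L << 33) - 1)
```

For instance, the third row (counter 2) of partition 1 gets the id 8589934594, one of the keys you will see in the tests of the next section.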

Keep it local

As I wrote, exposing the monotonically increasing ids as primary keys is not a good idea. The implementation explains why, but to complete the theoretical definition, I will add some proof in code. Let's imagine that you run the following code at 2 different points in time and that each time you insert the rows into a key-value data store. As shown in the test, the generated ids will collide:

"the same IDs" should "be generated for different data" in {
  val datasetPeriod1 = (1 to 5).toDF("nr").repartition(5)
  datasetPeriod1.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      InMemoryKeyedStore.addValue(row.getAs[Long]("primary_key").toString, row.getAs[Int]("nr").toString)
    })
  InMemoryKeyedStore.allValues should have size 5
  InMemoryKeyedStore.allValues.foreach {
    case (key, data) => data should have size 1
  }

  val datasetPeriod2 = (6 to 10).toDF("nr").repartition(5)
  datasetPeriod2.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      InMemoryKeyedStore.addValue(row.getAs[Long]("primary_key").toString, row.getAs[Int]("nr").toString)
    })
  InMemoryKeyedStore.allValues should have size 5
  InMemoryKeyedStore.getValues("8589934594") should contain allOf("4", "9")
  InMemoryKeyedStore.getValues("34359738368") should contain allOf("3", "8")
  InMemoryKeyedStore.getValues("8589934593") should contain allOf("2", "7")
  InMemoryKeyedStore.getValues("0") should contain allOf("5", "10")
  InMemoryKeyedStore.getValues("8589934592") should contain allOf("1", "6")
}

As a workaround, you could create a composite key composed of the generation time milliseconds and the monotonically increasing id:

"the different IDs" should "be generated for different data thanks to the composite key from timestamp" in {
  val datasetPeriod1 = (1 to 5).toDF("nr").repartition(5)
  val generationTimePeriod1 = System.currentTimeMillis()
  datasetPeriod1.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      val compositeKey = s"${generationTimePeriod1}_${row.getAs[Long]("primary_key")}"
      InMemoryKeyedStore.addValue(compositeKey, row.getAs[Int]("nr").toString)
    })
  InMemoryKeyedStore.allValues should have size 5
  InMemoryKeyedStore.allValues.foreach {
    case (key, data) => data should have size 1
  }

  val generationTimePeriod2 = System.currentTimeMillis()
  val datasetPeriod2 = (6 to 10).toDF("nr").repartition(5)
  datasetPeriod2.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      val compositeKey = s"${generationTimePeriod2}_${row.getAs[Long]("primary_key")}"
      InMemoryKeyedStore.addValue(compositeKey, row.getAs[Int]("nr").toString)
    })
  InMemoryKeyedStore.allValues should have size 10
  InMemoryKeyedStore.allValues.foreach {
    case (key, data) => data should have size 1
  }
}

The problem with this workaround is that it breaks idempotence. The keys are driven by the generation time, which would lead us into trouble in the case of data regeneration. Without the timestamp prefix, assuming the regenerated keys are the same and only the values change, we could simply rerun the pipeline and let the new values overwrite the old ones. With the prefix, we would first need to remove the previously generated keys.
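A minimal sketch of the problem (plain Scala; `timestampKey` is a hypothetical helper mirroring the composite key built above, not code from the pipeline): the same logical row gets a different key on every run, so a rerun cannot simply overwrite the previous entry:

```scala
// Hypothetical helper: same shape as the composite key in the test above.
def timestampKey(generationTimeMillis: Long, id: Long): String =
  s"${generationTimeMillis}_${id}"

// Two runs over the same data, started at different moments:
val keyRun1 = timestampKey(1577836800000L, 0L) // first generation
val keyRun2 = timestampKey(1577923200000L, 0L) // regeneration of the same row
// The keys differ, so the entry written by run 1 must be deleted explicitly.
assert(keyRun1 != keyRun2)
```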

This is no longer true if, instead of a timestamp, we use a manually managed version prefix:

"the different IDs" should "be generated for different data thanks to the composite key from version" in {
  val datasetPeriod1 = (1 to 5).toDF("nr").repartition(5)
  val version1 = "version1"
  datasetPeriod1.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      val compositeKey = s"${version1}_${row.getAs[Long]("primary_key")}"
      InMemoryKeyedStore.addValue(compositeKey, row.getAs[Int]("nr").toString)
    })

  InMemoryKeyedStore.allValues should have size 5
  InMemoryKeyedStore.allValues.foreach {
    case (key, data) => data should have size 1
  }

  val version2 = "version2"
  val datasetPeriod2 = (6 to 10).toDF("nr").repartition(5)
  datasetPeriod2.withColumn("primary_key", functions.monotonically_increasing_id())
    .foreach(row => {
      val compositeKey = s"${version2}_${row.getAs[Long]("primary_key")}"
      InMemoryKeyedStore.addValue(compositeKey, row.getAs[Int]("nr").toString)
    })

  InMemoryKeyedStore.allValues should have size 10
  InMemoryKeyedStore.allValues.foreach {
    case (key, data) => data should have size 1
  }
}

Here, still assuming the regenerated keys are the same, we can simply execute the processing for version2 again and have the data regenerated. Of course, both approaches work only when the partitions have the same size during the first and the second generation, which can sometimes be hard to guarantee.
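In contrast to the timestamp variant, a version-based key (again a hypothetical helper, not code from the post's pipeline) is deterministic: rerunning the processing for the same version regenerates exactly the same keys, so the new values simply overwrite the old ones:

```scala
import scala.collection.mutable

// Hypothetical helper: same shape as the composite key in the test above.
def versionKey(version: String, id: Long): String = s"${version}_${id}"

// A toy key-value store standing in for InMemoryKeyedStore.
val store = mutable.Map.empty[String, String]
store(versionKey("version2", 0L)) = "5"   // first generation
store(versionKey("version2", 0L)) = "50"  // regeneration: same key, overwritten
assert(store.size == 1)
assert(store(versionKey("version2", 0L)) == "50")
```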

Having the possibility to automatically generate an auto-incremented id is a useful feature. Even though the ids are consecutive only at the partition level, it avoids a lot of manual work when we need a unique id only for the purpose of the current processing. However, the function was not designed to produce primary keys for data stores, because of the risk of collisions between consecutive runs. Instead, it's better to use a composite key, which may even be built from the monotonically increasing id and the generation time. But in doing so, you should always take data reprocessing into account, because some combinations won't handle it well.
