Buckets in Apache Spark SQL

Versions: Apache Spark 2.4.2

Partitioning is the most popular method to divide a dataset into smaller parts. It's worth knowing that it can be complemented with another technique called bucketing.

This post focuses on the implementation of buckets in Apache Spark. The first part presents them in general terms and explains the benefits of bucketed data. The next part shows how buckets are implemented in Apache Spark SQL, whereas the last one shows some of their limitations.

Definition

Bucketing is a kind of partitioning for partitions. To be more precise, we can consider it as a technique to decompose large datasets into smaller and, therefore, more manageable subsets. So these buckets have nothing to do with the buckets you may have encountered on cloud storage services like AWS S3 or GCP GCS.

Bucketing, which is also known as clustering, helps in 2 specific operations. The examples I share here come from Hive but they will probably apply to other frameworks as well. The first one is a sampling operation. When you want to explore only a small subset of the data, you can pick one bucket and work on it. In Hive it's done with the TABLESAMPLE(BUCKET x OUT OF y ON column_name) clause.

The second operation where bucketing is helpful is JOIN. When the 2 joined datasets are bucketed the same way, that is, on the join key and with the same number of buckets, the join can be made directly between matching buckets instead of requiring a shuffle. Moreover, if the bucketed data is sorted, the sort-merge join will be much more efficient (discover this kind of join in Sort-merge join in Spark SQL).
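To make the join idea more concrete, here is a minimal sketch in plain Scala, without Spark. It is not how Hive or Spark implement bucketed joins: bucketId, bucketize and bucketJoin are hypothetical helpers, the sample data is made up, and hashCode with floorMod stands in for the real hash function. It only illustrates why, when both sides are bucketed with the same function and the same number of buckets, matching keys can only be found in buckets with the same id:

```scala
object BucketedJoinSketch {
  // Hypothetical bucketing function shared by both datasets.
  // floorMod keeps the result in [0, numBuckets) even for negative hash codes.
  def bucketId(key: String, numBuckets: Int): Int =
    Math.floorMod(key.hashCode, numBuckets)

  // Groups rows by bucket id, similarly to what a bucketed writer does with files.
  def bucketize[V](rows: Seq[(String, V)], numBuckets: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (key, _) => bucketId(key, numBuckets) }

  // Joins bucket i of the left side only with bucket i of the right side.
  // Rows from different buckets are never compared with each other.
  def bucketJoin[A, B](left: Map[Int, Seq[(String, A)]],
                       right: Map[Int, Seq[(String, B)]],
                       numBuckets: Int): Seq[(String, A, B)] =
    (0 until numBuckets).flatMap { bucket =>
      for {
        (leftKey, leftValue) <- left.getOrElse(bucket, Seq.empty)
        (rightKey, rightValue) <- right.getOrElse(bucket, Seq.empty)
        if leftKey == rightKey
      } yield (leftKey, leftValue, rightValue)
    }

  def main(args: Array[String]): Unit = {
    val numBuckets = 2
    // Made-up data: (user_id, order_id) and (user_id, country)
    val orders = Seq(("user1", 1L), ("user2", 2L), ("user3", 3L), ("user1", 4L))
    val users = Seq(("user1", "FR"), ("user2", "PL"), ("user4", "US"))

    val joined = bucketJoin(bucketize(orders, numBuckets), bucketize(users, numBuckets), numBuckets)
    joined.foreach(println)
  }
}
```

The same layout explains the sampling use case: reading a single entry of the map returned by bucketize touches only the rows of one bucket.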

In batch

The last time I wrote about buckets, in the post about Apache Spark 2.4.0 features - bucket pruning, I focused mainly on the reading part. Here I would like to complete the picture and take a closer look at the writing part. To show this, I'm using the same example as in the previous post:

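  // Imports used by the test; TestedSparkSession is assumed to be the
  // SparkSession instance shared by the tests of this post
  import org.apache.spark.sql.SaveMode
  import org.apache.spark.sql.catalyst.TableIdentifier
  import TestedSparkSession.implicits._

  "Spark" should "create buckets in partitions for orders Dataset" in {
    val tableName = s"orders${System.currentTimeMillis()}"
    val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1")).toDF("order_id", "user_id")

    // Bucket the rows by user_id into 2 buckets and save them as a table
    orders.write.mode(SaveMode.Overwrite).bucketBy(2, "user_id").saveAsTable(tableName)

    // The bucket specification is stored in the table metadata kept by the catalog
    val metadata = TestedSparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
    metadata.bucketSpec shouldBe defined
    metadata.bucketSpec.get.numBuckets shouldEqual 2
    metadata.bucketSpec.get.bucketColumnNames(0) shouldEqual "user_id"
  }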

As you can see, buckets are created through the bucketBy(numBuckets: Int, colName: String, colNames: String*) method. Internally, it does nothing but set 2 properties: the number of buckets and the names of the bucket columns.

The physical creation of buckets happens at the writing stage, more exactly in the write method of FileFormatWriter. The bucket configuration is passed to it as a parameter. Apache Spark uses it to build the bucket id expression with:

Pmod(new Murmur3Hash(expressions), Literal(numPartitions))

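Pmod is a node representing the positive modulo function "exp1 mod exp2", so the computed bucket id always falls between 0 and the number of buckets minus 1, even when the hash is negative (numPartitions in the expression above is the number of buckets). Since the hash is deterministic, using it guarantees that all rows with the same values in the bucket columns will be held in the same bucket.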
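The effect of combining a hash with a positive modulo can be sketched in plain Scala. This is only an illustration under assumptions: pmod and bucketId are hypothetical helpers written for this example, and scala.util.hashing.MurmurHash3 is a stand-in for Spark's Murmur3Hash expression, so the bucket ids it produces are not expected to match the ones computed by Spark.

```scala
import scala.util.hashing.MurmurHash3

object BucketIdSketch {
  // Positive modulo: unlike the % operator, the result is never negative for n > 0.
  def pmod(a: Int, n: Int): Int = {
    val r = a % n
    if (r < 0) r + n else r
  }

  // Stand-in for Pmod(new Murmur3Hash(expressions), Literal(numPartitions)).
  // Assumption: the Scala standard library hash replaces Spark's own
  // implementation, so the values differ from the real bucket ids.
  def bucketId(key: String, numBuckets: Int): Int =
    pmod(MurmurHash3.stringHash(key), numBuckets)

  def main(args: Array[String]): Unit = {
    val numBuckets = 2
    // The same user_id always ends up with the same bucket id
    Seq("user1", "user2", "user3", "user1").foreach { userId =>
      println(s"$userId -> bucket ${bucketId(userId, numBuckets)}")
    }
  }
}
```

With the plain % operator a negative hash would give a negative result, which could not be used as a bucket id. That's why a positive modulo is needed here.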

Limitations

Hive made buckets popular and Apache Spark added support for them. However, this support is not complete. If you try to use buckets with an unsupported sink, you will encounter errors like:

org.apache.spark.sql.AnalysisException: 'save' does not support bucketBy right now;

For instance, as of version 2.4.0 you can't save bucketed data directly to a JSON sink, nor to Parquet or any other file format written with save. Only writing to a table with saveAsTable is supported:

  "saving a bucket to JSON sink" should "fail" in {
    val orders = Seq((1L, "user1"), (2L, "user2"), (3L, "user3"), (4L, "user1")).toDF("order_id", "user_id")

    val exception = intercept[AnalysisException] {
      orders.write.mode(SaveMode.Overwrite).bucketBy(2, "user_id").json("/tmp/test_json_bucket/")
    }

    exception.getMessage shouldEqual "'save' does not support bucketBy right now;"
  }

You can find the assertions responsible for that error in DataFrameWriter:

  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
    assertNotPartitioned("jdbc")
    assertNotBucketed("jdbc")
    // ...
  }

  private def insertInto(tableIdent: TableIdentifier): Unit = {
    assertNotBucketed("insertInto")
    // ...
  }

  def save(): Unit = {
    assertNotBucketed("save")
    // ...
  }

  private def assertNotBucketed(operation: String): Unit = {
    if (getBucketSpec.isDefined) {
      if (sortColumnNames.isEmpty) {
        throw new AnalysisException(s"'$operation' does not support bucketBy right now")
      } else {
        throw new AnalysisException(s"'$operation' does not support bucketBy and sortBy right now")
      }
    }
  }

The reason for that? After some digging, I found a PR giving more details:


For `save`, we just write the data out into a directory users specified, and it's not a table, we don't keep its metadata. When we read it back, we have no idea if the data is bucketed or not, so it doesn't make sense to use `save` to write bucketed data, as we can't use the bucket information anyway. We can support it in the future, once we have features like bucket discovery, or we save bucket information in the data directory too, so that we don't need to rely on a metastore.

The good news is that it may be supported in the future.

To wrap up, bucketing is a technique to partition data inside a given partition. It can accelerate the execution of some operations like bucketed sampling or joins. On the other hand, since it was made popular by Hive, Apache Spark supports it only for data saved as tables. As you can see in the last section, it is not possible to save bucketed data directly as JSON or Parquet files.