Wildcard path and partitions

Versions: Apache Spark 3.3.0

Let's suppose you store partitioned data under the /data/mydir location. What's the difference between reading this directory with Apache Spark as /data/mydir/ and as /data/mydir/* ? You will find the answer just below.

If you want the answer without the technical details, below is the output generated for the same dataset partitioned by a partition_value column. The first result is for the path without the wildcard and, as you can see, it includes the partitioning column:

+-------+------+---+---------------+
|label  |modulo|nr |partition_value|
+-------+------+---+---------------+
|label=1|false |1  |a              |
|label=4|false |4  |a              |
|label=4|false |4  |b              |
|label=2|false |2  |b              |
|label=1|false |1  |b              |
|label=3|true  |3  |b              |
|label=0|true  |0  |b              |
+-------+------+---+---------------+

It's not the case for the path using the wildcard:

+-------+------+---+
|label  |modulo|nr |
+-------+------+---+
|label=1|false |1  |
|label=4|false |4  |
|label=2|false |2  |
|label=0|true  |0  |
|label=0|true  |0  |
|label=3|true  |3  |
+-------+------+---+

If you want to know the "why", please keep reading.


Both paths share the same reading logic, and everything happens inside the PartitioningUtils#parsePartitions method:

  private[datasources] def parsePartitions(
      paths: Seq[Path], typeInference: Boolean,
      basePaths: Set[Path],
      userSpecifiedSchema: Option[StructType], caseSensitive: Boolean,
      validatePartitionColumns: Boolean, timeZoneId: String)

The basePaths attribute is the one that differs between the two types of paths. For the wildcard-based scenario, the basePaths set contains all the paths resolved after analyzing the glob expression. For the non-wildcard scenario, it only contains the top-level path:

// wildcard scenario

parsePartitions(paths = ..., typeInference = ..., basePaths = [file:/tmp/spark-wildcard-reading/input/subdir/partition_value=a, file:/tmp/spark-wildcard-reading/input/subdir/partition_value=b, file:/tmp/spark-wildcard-reading/input/subdir/partition_value=c], ...)

// non wildcard scenario
parsePartitions(paths = ..., typeInference = ..., basePaths = [file:/tmp/spark-wildcard-reading/input/subdir], ...)
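The difference can be sketched with a small standalone snippet (plain Scala, with a hypothetical resolveBasePaths helper and a hardcoded directory listing standing in for Spark's real glob resolution):

```scala
// Illustration only, NOT Spark's actual code: a glob pattern is expanded
// into its matching subdirectories, while a literal path is kept as-is.
object BasePathsSketch {
  // Simulated listing of /tmp/spark-wildcard-reading/input/subdir
  val children = Seq(
    "file:/tmp/spark-wildcard-reading/input/subdir/partition_value=a",
    "file:/tmp/spark-wildcard-reading/input/subdir/partition_value=b",
    "file:/tmp/spark-wildcard-reading/input/subdir/partition_value=c")

  // Hypothetical helper mimicking the base path resolution outcome
  def resolveBasePaths(userPath: String): Set[String] =
    if (userPath.endsWith("/*")) children.toSet // glob: the expanded matches
    else Set(userPath)                          // literal: the path itself

  def main(args: Array[String]): Unit = {
    println(resolveBasePaths("file:/tmp/spark-wildcard-reading/input/subdir/*").size) // 3
    println(resolveBasePaths("file:/tmp/spark-wildcard-reading/input/subdir").size)   // 1
  }
}
```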

Later on, the parsing method analyzes each leaf path and extracts the partitioning columns from the key=value pattern. It walks up the directory tree and stops as soon as the currently analyzed directory is present in the basePaths set:

object PartitioningUtils extends SQLConfHelper {
// ...
  private[datasources] def parsePartition(
      path: Path,
      typeInference: Boolean,
      basePaths: Set[Path],
// ...
    val columns = ArrayBuffer.empty[(String, TypedPartValue)]
    // Old Hadoop versions don't have `Path.isRoot`
    var finished = path.getParent == null
    // currentPath is the current path that we will use to parse partition column value.
    var currentPath: Path = path
    while (!finished) {
      // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
      // uncleaned. Here we simply ignore them.
      if (currentPath.getName.toLowerCase(Locale.ROOT) == "_temporary") {
        return (None, None)
      }

      if (basePaths.contains(currentPath)) {
        // If the currentPath is one of base paths. We should stop.
        finished = true
// ...

This basePaths.contains condition matches right away for the wildcard-based query. Remember, the basePaths set contains the paths with the partition values, so the method stops before it has a chance to infer the partitioning columns. It's not the case for the non-wildcard query, where basePaths only contains the top-level directory, so the partition_value=... segments are parsed on the way up.
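The walk can be illustrated with a simplified, standalone re-implementation (plain Scala, working on path strings instead of Hadoop Path objects; the paths are the ones from the example above):

```scala
// Simplified sketch of the parsePartition walk, NOT Spark's actual code:
// climb from a leaf directory towards the root, collecting key=value
// segments, and stop as soon as the current directory is a base path.
object ParsePartitionSketch {
  def partitionColumns(leafDir: String, basePaths: Set[String]): Seq[(String, String)] = {
    val columns = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
    var current = leafDir
    var finished = false
    while (!finished && current.contains('/')) {
      if (basePaths.contains(current)) {
        finished = true // reached a base path: stop before parsing it
      } else {
        val name = current.substring(current.lastIndexOf('/') + 1)
        name.split("=", 2) match {
          case Array(k, v) => columns.prepend((k, v)) // partition directory
          case _           => // not a key=value segment, ignore it
        }
        current = current.substring(0, current.lastIndexOf('/'))
      }
    }
    columns.toSeq
  }

  val leaf = "file:/tmp/spark-wildcard-reading/input/subdir/partition_value=a"

  def main(args: Array[String]): Unit = {
    // Non-wildcard: the base path is the parent dir, so partition_value=a is parsed.
    println(partitionColumns(leaf, Set("file:/tmp/spark-wildcard-reading/input/subdir")))
    // Wildcard: the leaf itself is a base path, so the walk stops immediately.
    println(partitionColumns(leaf, Set(leaf)))
  }
}
```

The first call returns one (partition_value, a) pair, while the second returns an empty sequence, mirroring the presence and the absence of the partition column in the two schemas.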

Even though I got the same number of rows in both scenarios, in the non-wildcard one the partition column got included in the schema. Due to the base path resolution method, it was not the case for the wildcard-based query.
