Predicate pushdown: why doesn't it work every time?

Versions: Apache Spark 3.3.0

Pushdowns in Apache Spark are a great way to delegate some operations to the data sources and reduce the data volume processed by the job. However, there is one important gotcha. Watch out for the definition of your predicate because sometimes, even though the data source supports the pushed-down predicate, the predicate can still be executed by the Apache Spark job!

To understand why, let's take a simple use case. Here I'm creating a dataset composed of 3 letters:

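import java.sql.Timestamp
import org.apache.spark.sql.SaveMode

case class Letter(lower: String, upper: String, creationDate: Timestamp)

    // toDF needs the implicits of the active session
    import sparkSession.implicits._

    val inputData = Seq(Letter("a", "A", new Timestamp(1L)),
      Letter("b", "B", new Timestamp(1651356000000L)),
      Letter("c", "C", new Timestamp(100L))).toDF

    inputData.write.mode(SaveMode.Overwrite).parquet(OutputDir)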

In my query I want to get all the letters created in 2022, so only the "b". Bad luck: you've just started with Apache Spark and you think that to compare the column with a date like '2022-01-01', the filter should cast the creationDate field to a string:

    val queryWithDateTimeFilter = sparkSession.read.parquet(OutputDir)
      .filter("CAST(creationDate AS STRING) >= '2022-01-01'")

The query returns correct results, but if you analyze the execution plan, you'll see that only an "is not null" filter was pushed down to Apache Parquet, while the cast comparison stays in the Filter node executed by Apache Spark:

== Physical Plan ==
*(1) Filter (isnotnull(creationDate#2) AND (cast(creationDate#2 as string) >= 2022-01-01))
+- *(1) ColumnarToRow
   +- FileScan parquet [lower#0,upper#1,creationDate#2] Batched: true, DataFilters: [isnotnull(creationDate#2), (cast(creationDate#2 as string) >= 2022-01-01)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-predicate-pushdown], PartitionFilters: [], PushedFilters: [IsNotNull(creationDate)], ReadSchema: struct<lower:string,upper:string,creationDate:timestamp>
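Reading the plan by eye works for a demo, but the same check can be done in code. The sketch below is only an illustration and makes a few assumptions. It targets the V1 file source node (the `FileScan parquet` line above, backed by `FileSourceScanExec`). It relies on that node's `metadata` map, which is an internal detail rather than a stable public API. The `PushedFiltersInspector` name is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec

// Hypothetical helper, not part of Apache Spark.
object PushedFiltersInspector {
  // Collects the "PushedFilters" metadata entry of every V1 file scan
  // found in the physical plan, before preparation rules such as
  // whole-stage code generation are applied.
  def pushedFilters(df: DataFrame): Seq[String] =
    df.queryExecution.sparkPlan.collect {
      case scan: FileSourceScanExec =>
        // Assumption: the key matches the label printed by explain().
        scan.metadata.getOrElse("PushedFilters", "[]")
    }
}
```

Calling `PushedFiltersInspector.pushedFilters(queryWithDateTimeFilter).foreach(println)` should print the same list as the `PushedFilters` entry of the plan, which makes it easy to assert on in a test.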

Why? To understand that, let's analyze the planning steps.

Planning steps in DataSource V2 API

The logical rule responsible for the predicate pushdown is the V2ScanRelationPushDown class. It does more than that because it also pushes down the aggregates, limit statements, and projections in column pruning. When it comes to the predicates, it defines the pushdown logic in the pushDownFilters(plan: LogicalPlan) method that transforms Filter nodes of the LogicalPlan into pushed filters.

The transformation happens in another method, pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression]) of PushDownUtils. The function handles several scenarios, depending on whether the ScanBuilder implements the SupportsPushDownFilters interface, the SupportsPushDownV2Filters interface, or neither of them. For Apache Parquet, the rule goes to the FileScanBuilder pattern matching case:

object PushDownUtils extends PredicateHelper { 
  def pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
      : (Either[Seq[sources.Filter], Seq[Predicate]], Seq[Expression]) = {
    scanBuilder match {
// ...
      case f: FileScanBuilder =>
        val postScanFilters = f.pushFilters(filters)
        (Right(f.pushedFilters), postScanFilters)

The pushFilters method iterates over all the filters and tries to translate each of them into a data source filter expression. To do that, it calls DataSourceStrategy#translateLeafNodeFilter, which uses the PushableColumnBase extractor to get the data source column the predicate should be pushed down to. Here too, Spark uses pattern matching, and it only matches simple attributes and nested fields:

abstract class PushableColumnBase {
  val nestedPredicatePushdownEnabled: Boolean

  private def extractNestedCol(e: Expression): Option[Seq[String]] = e match {
    case a: Attribute => Some(Seq(a.name))
    case s: GetStructField =>
      extractNestedCol(s.child).map(_ :+ s.childSchema(s.ordinal).name)
    case _ => None
  }

Because we're casting the column, the expression is neither an attribute nor a struct field. Therefore, the pattern matching goes to the default case, meaning that there is no data source column to apply the predicate on. A similar mechanism applies to the two interface-based scenarios introduced before.
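To make the fall-through concrete, here is a toy model of the extractor. It is a sketch and not Spark code: `Expr`, `Attr`, `StructFieldAccess`, and `CastTo` are hypothetical stand-ins for the Catalyst expressions, and only the shape of the pattern matching mirrors extractNestedCol.

```scala
// Simplified stand-ins for Catalyst expressions (NOT Spark's real classes).
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class StructFieldAccess(child: Expr, fieldName: String) extends Expr
final case class CastTo(child: Expr, dataType: String) extends Expr

object ToyPushableColumn {
  // Only plain attributes and struct field accesses resolve to a column path.
  // Anything else, including a cast wrapping an attribute, yields None.
  def extractNestedCol(e: Expr): Option[Seq[String]] = e match {
    case Attr(name) => Some(Seq(name))
    case StructFieldAccess(child, fieldName) =>
      extractNestedCol(child).map(_ :+ fieldName)
    case _ => None
  }
}

object ToyPushableColumnDemo extends App {
  import ToyPushableColumn.extractNestedCol

  // A plain column resolves to a one-element path.
  println(extractNestedCol(Attr("creationDate")))
  // A nested field resolves to the full path from the root column.
  println(extractNestedCol(StructFieldAccess(Attr("user"), "createdAt")))
  // The cast hides the attribute, so no column is found.
  println(extractNestedCol(CastTo(Attr("creationDate"), "string")))
}
```

The last call returns None even though the cast wraps a perfectly pushable attribute. The extractor does not look through the wrapping expression, which is the situation the CAST(creationDate AS STRING) filter ends up in.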

So there is one thing to remember. Although the predicate pushdown is there for many sources and scenarios (including nested columns), it doesn't mean it'll apply every time. As shown in this blog post, if the predicate expression doesn't reference a data source column directly, for example because the column is wrapped in a cast, the filter will be executed on the Apache Spark side.
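One way to keep the predicate pushable is to leave the column untouched and move the conversion to the literal side of the comparison. The sketch below assumes a local session and rebuilds the dataset from the example. The `PushableTimestampFilter` object, the `lettersFrom2022` helper, and its `dir` parameter are hypothetical names introduced for illustration. Whether the pushed filter is actually evaluated by the Parquet reader also depends on the file's physical timestamp type and on the Parquet pushdown settings, so it's worth checking the plan printed by explain().

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical wrapper, introduced only for this sketch.
object PushableTimestampFilter {
  final case class Letter(lower: String, upper: String, creationDate: Timestamp)

  // Writes the 3 letters and returns the ones created from 2022 on.
  def lettersFrom2022(spark: SparkSession, dir: String): Seq[String] = {
    import spark.implicits._

    Seq(Letter("a", "A", new Timestamp(1L)),
      Letter("b", "B", new Timestamp(1651356000000L)),
      Letter("c", "C", new Timestamp(100L))
    ).toDF.write.mode(SaveMode.Overwrite).parquet(dir)

    // The column is compared as-is and only the literal is converted,
    // so the predicate references a plain attribute.
    val query = spark.read.parquet(dir)
      .filter($"creationDate" >= Timestamp.valueOf("2022-01-01 00:00:00"))

    // Print the plan to check the PushedFilters entry of the scan node.
    query.explain()
    query.select("lower").as[String].collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("pushable-filter-sketch").getOrCreate()
    try println(lettersFrom2022(spark, "/tmp/spark-predicate-pushdown"))
    finally spark.stop()
  }
}
```

With this form, the PushedFilters entry should contain a comparison on creationDate in addition to the "is not null" filter, while the query still returns only the "b" letter.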
