Pushdowns in Apache Spark are a great way to delegate some operations to the data sources and reduce the volume of data processed by the job. However, there is one important gotcha. Watch out for how you define your predicate: even though the data source supports predicate pushdown, the predicate can still end up being executed by the Apache Spark job!
To understand why, let's take a simple use case. Here I'm creating a dataset of 3 letters:
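```scala
import java.sql.Timestamp
import org.apache.spark.sql.SaveMode
import sparkSession.implicits._ // assumes an existing SparkSession named sparkSession; needed for .toDF

// Same directory as the Location shown in the execution plan below
val OutputDir = "/tmp/spark-predicate-pushdown"

case class Letter(lower: String, upper: String, creationDate: Timestamp)

val inputData = Seq(
  Letter("a", "A", new Timestamp(1L)),             // 1970
  Letter("b", "B", new Timestamp(1651356000000L)), // 2022-04-30 22:00:00 UTC
  Letter("c", "C", new Timestamp(100))             // 1970
).toDF
inputData.write.mode(SaveMode.Overwrite).parquet(OutputDir)
```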
In my query, I want to get all the letters created in 2022, so only "b" should be returned. Unfortunately, you've just started with Apache Spark and think that to compare the timestamp with a date like '2022-01-01', the filter should cast the creationDate field to a string:
```scala
val queryWithDateTimeFilter = sparkSession.read.parquet(OutputDir)
  .filter("CAST(creationDate AS STRING) >= '2022-01-01'")
```
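Why does this filter return correct results at all? The plain-Scala sketch below (no Spark involved) assumes the cast renders timestamps as fixed-width "yyyy-MM-dd HH:mm:ss" strings in UTC; Spark actually uses the session time zone and may append fractional seconds. Because such strings put the most significant field first, lexicographic order matches chronological order. The object and method names are hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, NOT Spark code: approximates a timestamp-to-string cast.
object StringCastComparison {
  // Assumption: fixed-width format, UTC, no fractional seconds.
  private val Format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  def asString(epochMillis: Long): String =
    Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).format(Format)

  // Plain String comparison, like "CAST(creationDate AS STRING) >= '2022-01-01'".
  def createdOnOrAfter2022(epochMillis: Long): Boolean =
    asString(epochMillis).compareTo("2022-01-01") >= 0

  // Keeps the letters whose rendered timestamp sorts after the date string.
  def lettersCreatedOnOrAfter2022(letters: Seq[(String, Long)]): Seq[String] =
    letters.collect { case (letter, millis) if createdOnOrAfter2022(millis) => letter }
}
```

With the three letters above, only "b" passes, so correctness is not the problem. Where the comparison runs is.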
The query returns correct results, but if you analyze the execution plan, you'll see that only an "is not null" filter was pushed down to Apache Parquet:
```
== Physical Plan ==
*(1) Filter (isnotnull(creationDate#2) AND (cast(creationDate#2 as string) >= 2022-01-01))
+- *(1) ColumnarToRow
   +- FileScan parquet [lower#0,upper#1,creationDate#2] Batched: true,
        DataFilters: [isnotnull(creationDate#2), (cast(creationDate#2 as string) >= 2022-01-01)],
        Format: Parquet,
        Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-predicate-pushdown],
        PartitionFilters: [],
        PushedFilters: [IsNotNull(creationDate)],
        ReadSchema: struct<lower:string,upper:string,creationDate:timestamp>
```
Why? To understand that, let's analyze the planning steps.
Planning steps in DataSource V2 API
The logical rule responsible for predicate pushdown is the V2ScanRelationPushDown class. It does more than that, since it also pushes down aggregates, limits, and the projections used for column pruning. For predicates, the logic lives in the pushDownFilters(plan: LogicalPlan) method, which transforms the Filter nodes of the LogicalPlan into pushed filters.
The transformation itself happens in another method, PushDownUtils.pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression]). The function handles different scenarios depending on whether the ScanBuilder implements the SupportsPushDownFilters interface, the SupportsPushDownV2Filters interface, or neither. For Apache Parquet, the match goes to the FileScanBuilder case:
```scala
object PushDownUtils extends PredicateHelper {
  def pushFilters(scanBuilder: ScanBuilder, filters: Seq[Expression])
      : (Either[Seq[sources.Filter], Seq[Predicate]], Seq[Expression]) = {
    scanBuilder match {
      // ...
      case f: FileScanBuilder =>
        val postScanFilters = f.pushFilters(filters)
        (Right(f.pushedFilters), postScanFilters)
      // ...
    }
  }
  // ...
}
```
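The file-source path can be pictured with a toy model. It is not Spark's code: the expression classes, FileScanPushdownSketch, and its translate function are simplified, hypothetical stand-ins (translate plays the role of the leaf-node translation that DataSourceStrategy performs), and pushed filters are rendered as plain strings. The sketch splits the Filter condition into conjuncts, pushes the ones that translate, and keeps all of them as post-scan filters. That last choice is an assumption based on the physical plan above, where isnotnull appears both in PushedFilters and in the Filter node.

```scala
// Toy expression tree, NOT Spark's classes, shaped like the plan's Filter condition.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: String) extends Expr
final case class CastToString(child: Expr) extends Expr
final case class IsNotNull(child: Expr) extends Expr
final case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

object FileScanPushdownSketch {
  // Flattens nested ANDs into a list of conjuncts.
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Simplified translation: only predicates on a bare column are translated.
  def translate(e: Expr): Option[String] = e match {
    case IsNotNull(Attr(col))                      => Some(s"IsNotNull($col)")
    case GreaterThanOrEqual(Attr(col), Lit(value)) => Some(s"GreaterThanOrEqual($col,$value)")
    case _                                         => None
  }

  // Returns (pushed filters, post-scan filters). Every conjunct stays post-scan,
  // mirroring the Filter node that remains above the FileScan in the plan.
  def pushFilters(condition: Expr): (Seq[String], Seq[Expr]) = {
    val conjuncts = splitConjuncts(condition)
    (conjuncts.flatMap(c => translate(c)), conjuncts)
  }
}
```

Fed with the condition from the plan, isnotnull(creationDate) AND cast(creationDate as string) >= '2022-01-01', only IsNotNull(creationDate) comes out as a pushed filter.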
FileScanBuilder's pushFilters method iterates over all the filters and tries to translate each of them into a data source filter. To do that, it eventually calls DataSourceStrategy#translateLeafNodeFilter, which uses a PushableColumnBase extractor to get the data source column the predicate should be pushed down to. Here too, Spark relies on pattern matching, which only succeeds for simple attributes and nested fields:
```scala
abstract class PushableColumnBase {
  val nestedPredicatePushdownEnabled: Boolean

  private def extractNestedCol(e: Expression): Option[Seq[String]] = e match {
    case a: Attribute => Some(Seq(a.name))
    case s: GetStructField =>
      extractNestedCol(s.child).map(_ :+ s.childSchema(s.ordinal).name)
    case _ => None
  }
  // ...
}
```
Because we're casting the column, the expression is a cast, which is neither an attribute nor a struct field. Therefore, the pattern matching falls through to the default case, meaning that there is no data source column to apply the predicate on, and the predicate is not pushed down. A similar mechanism applies to the two interface-based scenarios introduced before.
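The extractor logic can be reproduced with a few toy classes. This is a simplified sketch, not Spark's implementation: Expr, GetStructField, Cast, PushableColumn, and LeafFilterTranslation are hypothetical stand-ins, nested names are joined with a plain "." (Spark quotes them when needed), and only one comparison operator is modeled.

```scala
// Toy expression tree, NOT Spark's classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetStructField(child: Expr, fieldName: String) extends Expr
final case class Cast(child: Expr, dataType: String) extends Expr
final case class Lit(value: String) extends Expr
final case class GreaterThanOrEqual(left: Expr, right: Expr) extends Expr

// Extractor in the spirit of PushableColumnBase: returns the (possibly nested)
// column name, or None for anything else.
object PushableColumn {
  private def extractNestedCol(e: Expr): Option[Seq[String]] = e match {
    case Attr(name)                   => Some(Seq(name))
    case GetStructField(child, field) => extractNestedCol(child).map(_ :+ field)
    case _                            => None // casts, literals, functions...
  }

  def unapply(e: Expr): Option[String] = extractNestedCol(e).map(_.mkString("."))
}

object LeafFilterTranslation {
  // Stand-in for translateLeafNodeFilter: the comparison is translated only
  // when its left side is a pushable column.
  def translate(e: Expr): Option[String] = e match {
    case GreaterThanOrEqual(PushableColumn(col), Lit(v)) => Some(s"GreaterThanOrEqual($col,$v)")
    case _                                               => None
  }
}
```

A Cast around creationDate yields None, while the bare column translates. In Spark terms, that suggests writing the predicate against the raw column, for example "creationDate >= TIMESTAMP '2022-01-01 00:00:00'", which should then appear in PushedFilters. Whether Parquet can use it to skip data is a separate question.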
So, one thing to remember: although predicate pushdown is available for many sources and scenarios (including nested columns), it doesn't mean it'll apply every time. As shown in this blog post, if the predicate expression doesn't reference the data source columns directly, the filter will be executed on the Apache Spark side.