Predicate pushdown in Spark SQL

Versions: Spark 2.1.0

The optimizer in Spark SQL helps to improve the performance of processing pipelines. One of its techniques is predicate pushdown.

The first section of this post defines predicate pushdown. The next part describes some implementation details. The last section shows, through learning tests, when predicate pushdown is and is not applied for different data sources (RDBMS and Parquet files on one side, JSON files on the other).

Definition

Predicate pushdown is a logical optimization rule that consists in sending a filtering operation directly to the data source. For instance, in the case of an RDBMS, it translates into executing the "WHERE..." clause directly at the database level. This optimization reduces the amount of loaded data and takes advantage of the query optimizations (e.g. RDBMS indexes) defined at the data source level.
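
As a quick illustration, here is a minimal sketch of a filter expected to be pushed down to the database (the JDBC url, driver and table name are hypothetical placeholders; sparkSession is assumed to be already created, as in the tests later in this post):

// hypothetical connection options - adapt them to a real database
val orders = sparkSession.read.format("jdbc")
  .option("url", "jdbc:h2:mem:test_db")
  .option("driver", "org.h2.Driver")
  .option("dbtable", "orders")
  .load()

// the filter below should appear in the "PushedFilters" part of the physical plan
// and be translated into the WHERE clause of the query sent to the database
orders.where(orders("amount") > BigDecimal("10.0")).explain(true)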

The use of predicate pushdown (like other optimizations) can be detected in the logs (with TRACE level enabled):

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===

Saying that predicate pushdown applies only to "WHERE..." clauses was a simplification. In fact, it can also be used in joins. A join condition that concerns only the left or only the right side can be transformed into a filtering predicate and pushed down to the data source. For instance, with the clause "... FROM table1 t1 JOIN table2 t2 ON t1.join_field = 300", the optimizer will first filter out all rows from t1 whose join_field value is not equal to 300 and only then make the join.
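
The same behavior can be sketched with Spark SQL directly (assuming orders and shops DataFrames registered as temporary views; the names are only illustrative):

// assuming ordersDataFrame and shopsDataFrame already exist
ordersDataFrame.createOrReplaceTempView("orders_view")
shopsDataFrame.createOrReplaceTempView("shops_view")

// the condition concerns only one side of the join, so the optimizer can turn it into
// a filter evaluated before the join and push it down to the orders data source
// (since no join keys remain, spark.sql.crossJoin.enabled may need to be set to true,
// exactly as in the learning tests below)
sparkSession.sql("SELECT o.id, s.name FROM orders_view o JOIN shops_view s ON o.amount > 300")
  .explain(true)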

Implementation details

Let's analyze how predicate pushdown is implemented for 2 data sources: RDBMS and Parquet. The pushdown predicate is part of the list of all optimizations that can be applied by the Spark SQL optimizer, defined in org.apache.spark.sql.catalyst.optimizer.Optimizer. In this list it's represented by the PushDownPredicate object. When the predicate pushdown can be applied, the apply method of this object is called in order to move the filtering predicate closer to the data source in the plan.
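
Without going through the logs, the effect of this rule can also be observed by comparing the plans exposed by queryExecution (a minimal sketch, assuming an already loaded ordersDataFrame):

// ordersDataFrame is assumed to come from a source supporting the pushdown (JDBC, Parquet...)
val query = ordersDataFrame.select("id", "amount").where("amount > 10")

// logical plan right after analysis - the Filter node still sits above the Project and the relation
println(query.queryExecution.analyzed)
// logical plan after the optimizer rules - PushDownPredicate moves the Filter as close
// to the relation as possible
println(query.queryExecution.optimizedPlan)
// physical plan - when the source supports it, the filter appears in the PushedFilters entry of the scan
println(query.queryExecution.sparkPlan)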

But not all predicates can be pushed down. The instruction below indicates whether a given filter, represented as a Catalyst expression, can be sent to the data source. The comment in this snippet is pretty meaningful and, if one of the predicates is not applied at the data source level, it may be the place to start the investigation:

// A map from original Catalyst expressions to corresponding translated data source filters.
// If a predicate is not in this map, it means it cannot be pushed down.
val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
  translateFilter(p).map(f => p -> f)
}.toMap
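
The target of this translation is the public org.apache.spark.sql.sources.Filter hierarchy. Purely as an illustration (this isn't optimizer code), the filters pushed down in the Parquet test later in this post correspond to instances like these:

import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}

// data source filters are simple, serializable descriptions referencing columns by name;
// only Catalyst expressions translatable to this representation can be pushed down
val translatedFilters: Seq[Filter] = Seq(
  IsNotNull("total_revenue"),
  GreaterThan("total_revenue", 1000)
)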

I encountered a case where an apparently obvious filter was not applied to a decimal column in the database. The reason was an extra cast operation added to the column because the two sides of the filtering clause used different types. This situation is shown in the test "predicate pushdown should not be applied when the columns are of different types".

The operation above is called filter pruning and it aims to select the filters executable at the data source level and those executable in the code generated by Spark SQL. Once selected, the pushed filters are passed to the objects representing the data sources. In the case of an RDBMS they're sent to JDBCRelation#buildScan(requiredColumns: Array[String], filters: Array[Filter]), which takes care of converting them to their SQL representation and using them in the SELECT... query.
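
The same contract is exposed to custom data sources through the org.apache.spark.sql.sources.PrunedFilteredScan trait, so a relation receiving the pushed filters can be sketched as below (the class name is hypothetical and the scan logic is omitted):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

// minimal sketch of a relation receiving the pruned columns and the pushed filters
class OrdersRelation(override val sqlContext: SQLContext, override val schema: StructType)
  extends BaseRelation with PrunedFilteredScan {

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // a real implementation translates the filters to its native query language,
    // exactly as JDBCRelation converts them to a SQL WHERE clause; filters that
    // can't be handled here are re-evaluated by Spark after the scan anyway
    sqlContext.sparkContext.emptyRDD[Row]
  }
}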

The execution logic is similar for Parquet files. The first step consists in converting Catalyst expressions to data source filters in:

// files physical plan: org.apache.spark.sql.execution.FileSourceScanExec
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)

The filters created this way are later passed to the data sources. In the case of Parquet they go to the ParquetFileFormat#buildReaderWithPartitionValues(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration) method. Inside this method, and only if the spark.sql.parquet.filterPushdown configuration is enabled (it is by default), Spark's predicates are transformed into Parquet predicates (org.apache.parquet.filter2.predicate.FilterPredicate). The Parquet filter is then set in the configuration entry under the parquet.private.read.filter.predicate key as a Base64-serialized String.
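
To see the difference, the flag can be switched at runtime and the behaviors compared (a quick sketch reusing the shops Parquet file written in the learning tests below):

// with the flag disabled the filters are no longer given to the Parquet reader,
// even if they may still be listed in the scan node of the physical plan
sparkSession.conf.set("spark.sql.parquet.filterPushdown", "false")
sparkSession.read.parquet(ShopsParquetFile.getAbsolutePath)
  .where("total_revenue > 1000")
  .explain()

// restore the default behavior
sparkSession.conf.set("spark.sql.parquet.filterPushdown", "true")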

The filters pushed down for Parquet files can be detected in the logs with an entry like this one:

INFO Pushed Filters: IsNotNull(total_revenue),GreaterThan(total_revenue,1000) (org.apache.spark.sql.execution.FileSourceScanExec:54)

But this information should be interpreted carefully because it can appear even for formats not supporting predicate pushdown (e.g. JSON). For instance, JsonFileFormat receives all filters as a parameter of JsonFileFormat#buildReader(sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration) but it never uses them to select matching data at the data source level.

Predicate pushdown example

The learning tests below show how predicate pushdown and join predicate pushdown are used. They also show how Spark SQL behaves when the filtering condition applies to a data source that doesn't support predicate pushdown (JSON):

private val sparkSession: SparkSession =
  SparkSession.builder().appName("Predicate pushdown test").master("local")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()

private val Options = Map("url" -> InMemoryDatabase.DbConnection, "dbtable" -> "orders", "user" -> InMemoryDatabase.DbUser,
  "password" -> InMemoryDatabase.DbPassword, "driver" -> InMemoryDatabase.DbDriver)

private val ShopsParquetFile = new File("./structured_data.parquet")

private val OutputDir = "/tmp/spark-predicate_pushdown-test"

override def beforeAll(): Unit = {
  Directory(OutputDir).createDirectory(true, false)
  new PrintWriter(s"${OutputDir}/sample_order.json") { write("{\"id\": 1, \"amount\": 300}"); close }

  InMemoryDatabase.createTable("CREATE TABLE IF NOT EXISTS orders " +
    "(id INT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY, shop_id INT(1) NOT NULL,  " +
    "customer VARCHAR(255) NOT NULL, amount DECIMAL(6, 2) NOT NULL)")

  case class Order(shopId: Int, customer: String, amount: Double) extends DataOperation {
    override def populatePreparedStatement(preparedStatement: PreparedStatement): Unit = {
      preparedStatement.setInt(1, shopId)
      preparedStatement.setString(2, customer)
      preparedStatement.setDouble(3, amount)
    }
  }

  val ordersToInsert = mutable.ListBuffer[Order]()
  ordersToInsert.append(Order(1, UUID.randomUUID().toString, 5))
  ordersToInsert.append(Order(2, UUID.randomUUID().toString, 15))
  ordersToInsert.append(Order(3, UUID.randomUUID().toString, 25))
  InMemoryDatabase.populateTable("INSERT INTO orders (shop_id, customer, amount) VALUES (?, ?, ?)",
    ordersToInsert)
}

override def afterAll(): Unit = {
  ShopsParquetFile.delete()
  InMemoryDatabase.cleanDatabase()
  Directory(OutputDir).deleteRecursively()

  sparkSession.stop()
}

"predicate pushdown" should "be used for relational database" in {
  val allOrders = sparkSession.read
    .format("jdbc")
    .options(Options)
    .load()

  val valuableOrdersQuery = allOrders
    .select("id", "amount")
    // Using BigDecimal is mandatory because the amount column is a decimal.
    // Otherwise the generated filter would contain a cast operation,
    // which prevents the filter from being pushed down (a cast is not "pushable").
    .where(allOrders("amount") > BigDecimal("10.0"))
  val valuableOrders = valuableOrdersQuery.collect()

  valuableOrders.size shouldEqual 2
  valuableOrdersQuery.queryExecution.toString() should include("== Physical Plan ==" +
    "\n*Scan JDBCRelation(orders) [numPartitions=1] [id#0,amount#3] PushedFilters: [*GreaterThan(AMOUNT,10.00)], " +
    "ReadSchema: struct<id:int,amount:decimal(6,2)>")
}

"predicate pushdown" should "not be applied when the columns are of different types" in {
  val allOrders = sparkSession.read
    .format("jdbc")
    .options(Options)
    .load()

  val valuableOrdersQuery = allOrders
    .select("id", "amount")
    .where(allOrders("amount") > 10)
  val valuableOrders = valuableOrdersQuery.collect()

  valuableOrders.size shouldEqual 2
  // Physical plan is:
  // == Physical Plan ==
  // *Filter (cast(AMOUNT#16 as decimal(12,2)) > 10.00)
  // +- *Scan JDBCRelation(orders) [numPartitions=1] [id#13,amount#16] ReadSchema: struct<id:int,amount:decimal(6,2)>
  valuableOrdersQuery.queryExecution.toString() should not include("PushedFilters: [*GreaterThan(AMOUNT,10.00)], " +
    "ReadSchema: struct<id:int,amount:decimal(6,2)>")
}

"predicate pushdown" should "be used when the filtering clause is used to make the join" in {
  import sparkSession.implicits._
  val shops = Seq(
    (1, "Shop_1"), (2, "Shop_2")
  ).toDF("id", "Name")
  val allOrders = sparkSession.read
    .format("jdbc")
    .options(Options)
    .load()

  val valuableOrdersQuery = allOrders.join(shops, allOrders("amount") > BigDecimal(10), "inner")

  // Please note that this query fails when the cross join is not
  // enabled explicitly through spark.sql.crossJoin.enabled
  // The physical plan is similar to (indexes after field names can change):
  // == Physical Plan ==
  // BroadcastNestedLoopJoin BuildRight, Inner
  // :- *Scan JDBCRelation(orders) [numPartitions=1] [ID#36,SHOP_ID#37,CUSTOMER#38,AMOUNT#39]
  // PushedFilters: [*GreaterThan(AMOUNT,10.00)], ReadSchema: struct<ID:int,SHOP_ID:int,CUSTOMER:string,AMOUNT:decimal(6,2)>
  //  +- BroadcastExchange IdentityBroadcastMode
  // +- LocalTableScan [id#31, Name#32]
  valuableOrdersQuery.queryExecution.toString() should include("== Physical Plan ==" +
    "\nBroadcastNestedLoopJoin BuildRight, Inner" +
    "\n:- *Scan JDBCRelation(orders) [numPartitions=1]")
  valuableOrdersQuery.queryExecution.toString() should include("PushedFilters: [*GreaterThan(AMOUNT,10.00)], " +
    "ReadSchema: struct<ID:int,SHOP_ID:int,CUSTOMER:string,AMOUNT:decimal(6,2)>" +
    "\n+- BroadcastExchange IdentityBroadcastMode")
}

"predicate pushdown" should "be applied on Parquet files" in {
  import sparkSession.implicits._
  val shops = Seq(
    (1, "Shop_1", 1000), (2, "Shop_2", 1100), (3, "Shop_3", 900)
  ).toDF("id", "name", "total_revenue")
  shops.write.mode(SaveMode.Overwrite).parquet(ShopsParquetFile.getAbsolutePath)

  val shopsFromDisk = sparkSession.read
    .format("parquet")
    .parquet(ShopsParquetFile.getAbsolutePath)

  val valuableShopsQuery = shopsFromDisk.where("total_revenue > 1000")

  valuableShopsQuery.collect().size shouldEqual 1
  // Expected physical plan is:
  // == Physical Plan ==
  // *Project [id#93, name#94, total_revenue#95]
  // +- *Filter (isnotnull(total_revenue#95) && (total_revenue#95 > 1000))
  // +- *FileScan parquet [id#93,name#94,total_revenue#95] Batched: true, Format: Parquet,
  // Location: InMemoryFileIndex[file:/home/bartosz/workspace/spark-scala/structured_data.parquet],
  // PartitionFilters: [], PushedFilters: [IsNotNull(total_revenue), GreaterThan(total_revenue,1000)],
  // ReadSchema: struct<id:int,name:string,total_revenue:int>
  valuableShopsQuery.queryExecution.toString() should include("PartitionFilters: [], " +
    "PushedFilters: [IsNotNull(total_revenue), GreaterThan(total_revenue,1000)], " +
    "ReadSchema: struct<id:int,name:string,total_revenue:int>")
}

"predicate pushdown" should "not be applied on JSON files" in {
  val allOrders = sparkSession.read
    .format("json")
    .load(OutputDir)

  val valuableOrdersQuery = allOrders.where("amount > 10")
  val valuableOrders = valuableOrdersQuery.count

  valuableOrders shouldEqual 1
  println(s"plan=${valuableOrdersQuery.queryExecution.toString().trim }")
  // Expected plan is:
  // == Physical Plan ==
  //  *Project [amount#8L, id#9L]
  // +- *Filter (isnotnull(amount#8L) && (amount#8L > 10))
  // +- *FileScan json [amount#8L,id#9L] Batched: false, Format: JSON,
  // Location: InMemoryFileIndex[file:/tmp/spark-predicate_pushdown-test], PartitionFilters: [],
  // PushedFilters: [IsNotNull(amount), GreaterThan(amount,10)], ReadSchema: struct<amount:bigint,id:bigint>
  // As you can see, it also lists the pushed filters. But if you look at the JsonFileFormat source
  // you'll see that these filters are never used.
  valuableOrdersQuery.queryExecution.toString() should include("PushedFilters: [IsNotNull(amount), GreaterThan(amount,10)],")
  valuableOrdersQuery.queryExecution.toString() should include(" +- *FileScan json [amount")
}

Predicate pushdown is a great optimization that helps to reduce the amount of data loaded from the data source. It can be applied, sometimes with some additional customization, to all data sources that allow filtering stored entries. Among them we can find RDBMS and Parquet files, whose internals were described in the second section. Their use was shown in the last part through the usual learning tests.
