What's new in Apache Spark 3.2.0 - Data Source V2

Versions: Apache Spark 3.2.0

Even though Data Source V2 has been present in the API for a while, every release brings something new to it. This release is no exception, and we'll see what it brings in this blog post!

Aggregate push down

The first important change concerns push downs. One of the most famous push downs in Apache Spark works on predicates and helps reduce the amount of data transferred from the source by applying the filtering conditions during the load. Now the mechanism has been extended to aggregations, thanks to the work done by Huaxin Gao!

How does this new feature work? A new interface called SupportsPushDownAggregates is available in the code and any data source supporting aggregate push down has to implement it. In the first iteration it's only supported by the JDBCScanBuilder. The transformed query will look like the aggQuery from the snippet:

case class JDBCScanBuilder(/* ... */)
  extends ScanBuilder with SupportsPushDownAggregates /* ... */ {
// ...
  override def pushAggregation(aggregation: Aggregation): Boolean = {
// ...
    val compiledAgg = JDBCRDD.compileAggregates(aggregation.aggregateExpressions, dialect)
    if (compiledAgg.isEmpty) return false

    val groupByCols = aggregation.groupByColumns.map { col =>
      if (col.fieldNames.length != 1) return false
      dialect.quoteIdentifier(col.fieldNames.head)
    }
    val selectList = groupByCols ++ compiledAgg.get
    val groupByClause = if (groupByCols.isEmpty) {
      ""
    } else {
      "GROUP BY " + groupByCols.mkString(",")
    }

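    // WHERE 1=0 - the query brings back no rows; at this stage it's only used to
    // resolve the schema of the aggregated result on the database side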
    val aggQuery = s"SELECT ${selectList.mkString(",")} FROM ${jdbcOptions.tableOrQuery} " +
      s"WHERE 1=0 $groupByClause"

Please note that the JDBC migration to Data Source V2 is still in progress as a part of SPARK-24907.
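To see the feature from the user's perspective, below is a minimal sketch. It assumes a PostgreSQL database with a hypothetical orders table (the URL, catalog name and table are only illustrative, and the JDBC driver is supposed to be on the classpath), exposed through the JDBC catalog plugin with the pushDownAggregate option turned on. Under these assumptions the MAX and the GROUP BY column should end up in the generated aggQuery instead of being computed on the Apache Spark side:

import org.apache.spark.sql.SparkSession

// Hypothetical setup - the catalog name, URL and table exist only for this example;
// pushDownAggregate is the JDBC option gating the new behavior
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.postgres",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.postgres.url", "jdbc:postgresql://localhost:5432/shop")
  .config("spark.sql.catalog.postgres.pushDownAggregate", "true")
  .getOrCreate()

// The aggregation below is a candidate for the push down; the physical plan should
// show it inside the scan node rather than as a separate aggregate over the raw rows
spark.sql("SELECT shop_id, MAX(amount) FROM postgres.public.orders GROUP BY shop_id")
  .show()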

Function catalog

But the aggregate push down is not the only feature impacting Data Source V2. Another one is the FunctionCatalog interface added by Chao Sun. Does it mean no more UDFs? Not really, the FunctionCatalog is not intended to provide a new mechanism for writing custom functions. It addresses their exposure problem instead.

How does it work? The FunctionCatalog interface has these 2 reading methods:

public interface FunctionCatalog extends CatalogPlugin {
  Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException;
  UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;
}

The idea is to implement a catalog and set it in the Apache Spark configuration. Any unknown (= not native) function will then first be marked in the execution plan as UnresolvedFunction and later resolved by the analyzer with the loadFunction method of the catalog. The feature relies on the catalog plugin component already available for tables.

The catalog supports 2 types of functions. ScalarFunction works on a per-row basis whereas AggregateFunction produces the result by applying the aggregation over multiple input rows. Below you can find a scalar function example:

import java.util.concurrent.ThreadLocalRandom

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction, UnboundFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructType}

class RandomAdder extends UnboundFunction {
  override def bind(inputType: StructType): BoundFunction = new RandomAdderBounded()

  override def description(): String = "adds a random number to an integer"

  override def name(): String = "random_adder"
}

class RandomAdderBounded extends ScalarFunction[Int] {
  override def inputTypes(): Array[DataType] = Array(DataTypes.IntegerType)

  override def resultType(): DataType = DataTypes.IntegerType

  override def isDeterministic = false

  override def isResultNullable = false

  override def name(): String = "random_adder"

  override def produceResult(input: InternalRow): Int = {
    input.getInt(0) + ThreadLocalRandom.current().nextInt(5000)
  }
}
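To make the function visible to the queries, it still has to be returned by a FunctionCatalog implementation registered in the configuration. The snippet below is only a minimal sketch and not the official example; the DemoFunctionCatalog class is an assumption for illustration:

import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class DemoFunctionCatalog extends FunctionCatalog {
  private var catalogName: String = _

  // Called by Apache Spark with the name used in the spark.sql.catalog.<name> property
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
  }

  override def name(): String = catalogName

  override def listFunctions(namespace: Array[String]): Array[Identifier] =
    Array(Identifier.of(namespace, "random_adder"))

  override def loadFunction(ident: Identifier): UnboundFunction = ident.name() match {
    case "random_adder" => new RandomAdder()
    case other => throw new IllegalArgumentException(s"Unknown function: $other")
  }
}

Once the class is registered under the spark.sql.catalog.<catalog name> property (e.g. spark.sql.catalog.demo set to the fully qualified class name of DemoFunctionCatalog), the function should be callable in SQL with its fully qualified name composed of the catalog, the namespace and the function name, random_adder in this case.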

Dynamic filtering

Another API-related change was made by Anton Okolnychyi to extend the Dynamic Partition Pruning (DPP).

The change adds a new interface called SupportsRuntimeFiltering that exposes these 2 methods:

public interface SupportsRuntimeFiltering extends Scan {
  NamedReference[] filterAttributes();
  void filter(Filter[] filters);
}

The main purpose of the interface is to enable dynamic filtering (pruning) for operations other than the joins already supported by the DPP. How? By filtering the input partitions, for example:

case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression]) extends DataSourceV2ScanExecBase {
// ...
  @transient private lazy val filteredPartitions: Seq[InputPartition] = {
    val dataSourceFilters = runtimeFilters.flatMap {
      case DynamicPruningExpression(e) => DataSourceStrategy.translateRuntimeFilter(e)
      case _ => None
    }

    if (dataSourceFilters.nonEmpty) {
      val originalPartitioning = outputPartitioning

      val filterableScan = scan.asInstanceOf[SupportsRuntimeFiltering]
      filterableScan.filter(dataSourceFilters.toArray)

      // call toBatch again to get filtered partitions
      val newPartitions = scan.toBatch.planInputPartitions()

// ...
      newPartitions

The partition filtering is implemented in the data source's filter method and the remaining partitions are returned as newPartitions by the new planInputPartitions() call.
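How could a data source use it? Below is a minimal sketch, under the assumption of a file-based source partitioned by a hypothetical shop_id column; the ShopScan and ShopFilePartition names are only illustrative:

import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeFiltering}
import org.apache.spark.sql.sources.{EqualTo, Filter}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical partition descriptor: every file belongs to one shop_id partition
case class ShopFilePartition(shopId: String, path: String)

class ShopScan(allPartitions: Seq[ShopFilePartition]) extends Scan with SupportsRuntimeFiltering {

  // Partitions left after the runtime filtering; starts with everything
  private var partitions: Seq[ShopFilePartition] = allPartitions

  override def readSchema(): StructType = StructType(Seq(
    StructField("shop_id", StringType),
    StructField("amount", IntegerType)
  ))

  // Attributes Apache Spark is allowed to filter on at runtime
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("shop_id"))

  // Called before the new planInputPartitions() with the dynamically resolved filters
  override def filter(filters: Array[Filter]): Unit = {
    filters.foreach {
      case EqualTo("shop_id", value) =>
        partitions = partitions.filter(_.shopId == value)
      case _ => // unsupported filters are simply ignored
    }
  }

  // toBatch()/planInputPartitions() would later expose only the remaining partitions
}

At execution time Apache Spark translates the dynamic pruning expression into data source filters, calls filter and plans the input partitions again, so only the files of the matching shop_id should be read.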

Write enhancements

We can also notice other API changes announcing some interesting writer-related features in the next releases. To start, Anton Okolnychyi evolved the write API to support data distribution and ordering. Put another way, thanks to it a data source can enforce some constraints on the writer, like for example a globally sorted dataset. In the code you will find a new interface called RequiresDistributionAndOrdering that holds the write expectations of a data source:

public interface RequiresDistributionAndOrdering extends Write {
  Distribution requiredDistribution();
  default int requiredNumPartitions() { return 0; }
  SortOrder[] requiredOrdering();
}

Any distributable and/or sortable writer gets later analyzed by DistributionAndOrderingUtils, where the repartitioning or sorting nodes are added to the plan:

object DistributionAndOrderingUtils {

  def prepareQuery(write: Write, query: LogicalPlan, conf: SQLConf): LogicalPlan = write match {
    case write: RequiresDistributionAndOrdering =>
      val numPartitions = write.requiredNumPartitions()
      val distribution = write.requiredDistribution match {
        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
        case _: UnspecifiedDistribution => Array.empty[Expression]
      }
// ...
      val queryWithDistribution = if (distribution.nonEmpty) {
        val finalNumPartitions = if (numPartitions > 0) {
          numPartitions
        } else {
          conf.numShufflePartitions
        }
        RepartitionByExpression(distribution, query, finalNumPartitions)
      } else if (numPartitions > 0) {
        // ...
      } else {
        query
      }

      val ordering = write.requiredOrdering.toSeq
        .map(e => toCatalyst(e, query))
        .asInstanceOf[Seq[SortOrder]]

      val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
        Sort(ordering, global = false, queryWithDistribution)
      } else {
        queryWithDistribution
      }

      queryWithDistributionAndOrdering

In the RequiresDistributionAndOrdering you can also notice the requiredNumPartitions method. The feature was added by Jungtaek Lim to handle a static number of partitions required by a data source. Prior to that change, the repartitioning expression was always taking the number of shuffle partitions from the config.
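From the data source side, the contract could look like in the sketch below. It's only an illustration under assumptions: the ShopWrite class and the shop_id/event_time columns are hypothetical, the SortOrder is written by hand, and the write asks for a clustering by shop_id plus a per-task ordering by event_time:

import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, NullOrdering, SortDirection, SortOrder}
import org.apache.spark.sql.connector.write.{BatchWrite, RequiresDistributionAndOrdering, Write}

class ShopWrite(batchWrite: BatchWrite) extends Write with RequiresDistributionAndOrdering {

  // All rows sharing the same shop_id should land in the same write task
  override def requiredDistribution(): Distribution =
    Distributions.clustered(Array[Expression](Expressions.column("shop_id")))

  // 0 = no static requirement; Apache Spark falls back to spark.sql.shuffle.partitions
  override def requiredNumPartitions(): Int = 0

  // Within every task the rows should be sorted by event_time
  override def requiredOrdering(): Array[SortOrder] = Array[SortOrder](new SortOrder {
    override def expression(): Expression = Expressions.column("event_time")
    override def direction(): SortDirection = SortDirection.ASCENDING
    override def nullOrdering(): NullOrdering = NullOrdering.NULLS_FIRST
    override def describe(): String = "event_time ASC NULLS FIRST"
  })

  override def toBatch(): BatchWrite = batchWrite
}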

To close this writing part, there is an important bug fix made by Wenchen Fan on the MERGE command using INSERT * and UPDATE *. Previously, operations like:

MERGE INTO nmsp.table1
USING nmsp.table2
ON 1 = 1
WHEN NOT MATCHED THEN INSERT *

MERGE INTO nmsp.table2
USING nmsp.table2
ON 1 = 1
WHEN MATCHED THEN UPDATE SET *

were resolved by Apache Spark by column position. The fix replaces this behavior with a resolution by column name.

Others

Besides the work presented in the above categories, Apache Spark 3.2.0 got some metrics enhancements, with new metrics for the write and scan operations implemented by Liang-Chi Hsieh.

There is also a new LocalScan interface added by Gengliang Wang. It's a special scan type executed only on the driver. It can be a good choice for implementing small data sources.
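A minimal sketch of such a source, assuming the scan only has to expose its schema and the rows materialized on the driver (the CountriesScan name and the dataset are of course invented):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.LocalScan
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class CountriesScan extends LocalScan {

  override def readSchema(): StructType = StructType(Seq(
    StructField("code", StringType),
    StructField("population_millions", IntegerType)
  ))

  // The whole dataset lives on the driver - no executors involved in the scan
  override def rows(): Array[InternalRow] = Array(
    InternalRow(UTF8String.fromString("FR"), 68),
    InternalRow(UTF8String.fromString("PL"), 38)
  )
}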

A lot of the changes presented here are mainly API-related. However, the point of the interfaces is that they're there to be implemented, and hopefully in the next releases we'll see the first data sources taking advantage of them!