What's new in Apache Spark 3.3.0 - Data Source V2

Versions: Apache Spark 3.3.0

After a break for the Data+AI Summit retrospective, it's time to return to Apache Spark 3.3.0 and see what changed for the DataSource V2 API.


Index support

For the first feature, Huaxin Gao added a new interface to support index management. The interface, called SupportsIndex, defines the following methods:

public interface SupportsIndex extends Table {

  // creates an index on the given columns, with per-column and global properties
  void createIndex(String indexName,
      NamedReference[] columns,
      Map<NamedReference, Map<String, String>> columnsProperties,
      Map<String, String> properties) throws IndexAlreadyExistsException;

  // drops an existing index
  void dropIndex(String indexName) throws NoSuchIndexException;

  // checks whether an index with this name exists on the table
  boolean indexExists(String indexName);

  // lists all indexes defined on the table
  TableIndex[] listIndexes();
}

The addition of this interface also brings 2 new SQL statements, CREATE INDEX and DROP INDEX, backed respectively by the CreateIndex/CreateIndexExec and DropIndex/DropIndexExec logical and physical nodes.

Although SupportsIndex comes from an Apache Iceberg-related need, it's also implemented natively in the Apache Spark JDBCTable data source, and the door is open to add it to any other indexable data source.
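To get a feeling for the new statements, here is a minimal sketch assuming a MySQL database exposed through the JDBC catalog. The catalog name, connection URL, driver, table, column and index names are illustrative placeholders, and the exact DDL options accepted depend on the dialect:

import org.apache.spark.sql.SparkSession

// placeholder connection settings for a MySQL database exposed through the JDBC catalog
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.mysql", "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.mysql.url", "jdbc:mysql://localhost:3306/shop")
  .config("spark.sql.catalog.mysql.driver", "com.mysql.cj.jdbc.Driver")
  .getOrCreate()

// CREATE INDEX goes through CreateIndex/CreateIndexExec and ends up in SupportsIndex.createIndex
spark.sql("CREATE INDEX users_login_idx ON TABLE mysql.users (login)")

// DROP INDEX goes through DropIndex/DropIndexExec and ends up in SupportsIndex.dropIndex
spark.sql("DROP INDEX users_login_idx ON TABLE mysql.users")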

Predicate pushdown support

The second important feature, which might not be visible to you as an end user, is the pushdown framework refactoring by Jiaan Geng. The improvement process is still ongoing, but a major part of it already landed in Apache Spark 3.3.0.

Why did this change even happen? Jiaan Geng identified multiple drawbacks in the previous pushdown implementation that can be summarized in 2 major categories:

Apache Spark manages pushdowns in a logical rule called V2ScanRelationPushDown. It applies to sample, filter, aggregation, limit and scan (data reading) operations. The rule iteratively rewrites the initial logical plan by adding pushdowns, starting with the transformation of the DataSourceV2Relation into a ScanBuilderHolder node:

case class ScanBuilderHolder(
    output: Seq[AttributeReference],
    relation: DataSourceV2Relation,
    builder: ScanBuilder) extends LeafNode {
  var pushedLimit: Option[Int] = None
  var sortOrders: Seq[V2SortOrder] = Seq.empty[V2SortOrder]
  var pushedSample: Option[TableSampleInfo] = None
  var pushedPredicates: Seq[Predicate] = Seq.empty[Predicate]
}

After this initial conversion, the optimization rule updates the ScanBuilderHolder variables. For example, it might add the pushed down predicates:

private def pushDownFilters(plan: LogicalPlan) = plan.transform {
  case Filter(condition, sHolder: ScanBuilderHolder) =>
    // ...
    val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
      sHolder.builder, normalizedFiltersWithoutSubquery)
    val pushedFiltersStr = if (pushedFilters.isLeft) {
      pushedFilters.left.get.mkString(", ")
    } else {
      sHolder.pushedPredicates = pushedFilters.right.get
      pushedFilters.right.get.mkString(", ")
    }
    // ...

Similar rules exist for the other pushable components, and their detection relies on the SupportsPushDown* interfaces. Below you can find the example of the JDBC reader:

case class JDBCScanBuilder(
    session: SparkSession,
    schema: StructType,
    jdbcOptions: JDBCOptions)
  extends ScanBuilder
    with SupportsPushDownV2Filters
    with SupportsPushDownRequiredColumns
    with SupportsPushDownAggregates
    with SupportsPushDownLimit
    with SupportsPushDownTableSample
    with SupportsPushDownTopN
    with Logging {

The pushdown changes added in Apache Spark 3.3.0 target mainly the JDBC data source. They consist of getting the JDBC dialect and calling V2ExpressionSQLBuilder to convert an Apache Spark Expression into the SQL expression that eventually contains the pushed down element. Among the concrete changes added in Apache Spark 3.3.0 you'll find the pushdowns declared by the interfaces listed above, such as the limit, top-N (sort with limit), table sample and V2 predicates pushdowns.
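As an illustration, here is a minimal sketch reusing the spark session and the mysql JDBC catalog from the index example above; the table and column names are placeholders, and the exact labels printed by explain may differ:

// a filter and a limit on a JDBC-backed table; both are candidates for the pushdown
val recentOrders = spark.table("mysql.orders")
  .filter("amount > 100")
  .limit(10)

// the scan node of the physical plan exposes what was effectively pushed to the database,
// e.g. entries like "PushedFilters: [amount > 100]" and "PushedLimit: LIMIT 10"
recentOrders.explain()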

Row-level operations

To finish this blog post, let's introduce the row-level operations support added by Anton Okolnychyi! The SPIP document details the planned work and identifies 2 different types of row-level operations that should be supported in Apache Spark: the group-based ones, replacing whole groups of rows (such as files or partitions), and the element-based ones, rewriting only the individual modified rows.

Additionally, the sources may need different input semantics related to:

To support these in-place operations, the Apache Spark API has a new SupportsRowLevelOperations interface that the data sources should implement. It returns an instance of RowLevelOperationBuilder which, in its turn, generates the logical representation of the DELETE/UPDATE/MERGE operation as a RowLevelOperation.

The RowLevelOperation exposes 2 factory methods returning ScanBuilder and WriteBuilder instances. The ScanBuilder is the data reader that will respect the row-level operation semantics (group- vs. element-based). The WriteBuilder is responsible for writing the data and might use some information from the ScanBuilder, like the condition used to read the data.
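To make it more concrete, below is a minimal sketch of a hypothetical group-based connector handling DELETE by rewriting whole files. The class name is an assumption of mine and the real ScanBuilder and WriteBuilder implementations are left out:

import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, WriteBuilder}
import org.apache.spark.sql.connector.write.RowLevelOperation.Command
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// hypothetical row-level operation of a file-based connector
class GroupBasedDeleteOperation extends RowLevelOperation {

  // the SQL command this operation was created for (DELETE here)
  override def command(): Command = Command.DELETE

  // first factory method: the reader locating the groups (files) containing the rows to delete
  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = ???

  // second factory method: the writer replacing those groups with their remaining rows
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???
}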

Besides these interfaces, Anton also added a new logical rule called RewriteDeleteFromTable to support delete operations. If the table in the original plan implements SupportsRowLevelOperations, the rule creates a new RowLevelOperationTable referencing the RowLevelOperation instance and wraps it in a ReplaceData node that gets executed later by the ReplaceDataExec physical node.
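Put together, once a connector implements SupportsRowLevelOperations, a regular DELETE statement triggers that rewrite. A minimal sketch, with a placeholder table name:

// "my_catalog.db.orders" is a placeholder for a table backed by a connector implementing
// SupportsRowLevelOperations; for such a table the statement below is rewritten by
// RewriteDeleteFromTable into a ReplaceData plan, executed as ReplaceDataExec
spark.sql("DELETE FROM my_catalog.db.orders WHERE status = 'CANCELLED'")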

It's great to see all these innovations in the DataSource V2 API! Indexes, predicate pushdown and optimized row-level operations should help improve the performance of Apache Spark workloads. Yet, it's only the API and now it's time for the databases to implement the new interfaces!