What's new in Apache Spark 3.0 - delete, update and merge API support

Versions: Apache Spark 3.0.0

All the operations from the title are natively available in relational databases but doing them with distributed data processing systems is not obvious. Starting from 3.0, Apache Spark gives a possibility to implement them in the data sources.

I will cover all these 3 operations in the next 3 sections, starting by the delete because it seems to be the most complete.

Delete support

There are multiple layers to cover before implementing a new operation in Apache Spark SQL. The first of them concerns the parser, so the part translating the SQL statement into a more meaningful part. For the delete operation, the parser change looks like that:

# SqlBase.g4
DELETE FROM multipartIdentifier tableAlias whereClause

Later on, this expression has to be translated into a logical node and the magic happens in AstBuilder. The builder takes all parts from the syntax (mutlipartIdentifier, tableAlias, whereClause) and converts them into the components of DeleteFromTable logical node:

case class DeleteFromTable(
    table: LogicalPlan,
    condition: Option[Expression]) extends Command with SupportsSubquery {
  override def children: Seq[LogicalPlan] = table :: Nil
}

At this occasion it worth noticing that the new mixin, SupportsSubquery was added. The analyze stage uses it to know whether given operation is supported with a subquery.

The logical node is later transformed into the physical node, responsible for the real execution of the operation. The physical node for the delete is DeleteFromTableExec class. During the conversion we can see that so far, the subqueries aren't really supported in the filter condition:

    case DeleteFromTable(relation, condition) =>
      relation match {
        case DataSourceV2ScanRelation(table, _, output) =>
          if (condition.exists(SubqueryExpression.hasSubquery)) {
            throw new AnalysisException(
              s"Delete by condition with subquery is not supported: $condition")
          }

Once resolved, DeleteFromTableExec's field called table, is used for physical execution of the delete operation. This field is an instance of a table mixed with SupportsDelete trait, so having implemented the deleteWhere(Filter[] filters) method. The table that doesn't support the deletes but called with DELETE FROM operation, will fail because of this check from DataSourceV2Implicits.TableHelper:

    def asDeletable: SupportsDelete = {
      table match {
        case support: SupportsDelete =>
          support
        case _ =>
          throw new AnalysisException(s"Table does not support deletes: ${table.name}")
      }
    }

For now, any of the built-in V2 sources support the deletes. And that's why when you run the command on the native ones, you will get this error:

  before {
    sparkSession.sql("DROP TABLE IF EXISTS numbers_delete")
  }

  "the DELETE operation" should "be supported if doesn't contain subquery" in {
    sparkSession.sql("CREATE TABLE numbers_delete (id bigint, number int) USING HIVE PARTITIONED BY (id)")
    sparkSession.sql("INSERT INTO numbers_delete VALUES (1, 1), (2, 2), (3, 3)")

    val analysisException = intercept[AnalysisException] {
      sparkSession.sql("DELETE FROM numbers_delete WHERE number = 3")
    }

    analysisException.getMessage() should include("DELETE is only supported with v2 tables")
  }

Update support

I started by the delete operation on purpose because it was the most complete one, ie. supporting the whole chain, from the parsing to the physical execution. It's not the case of the remaining 2 operations, so the overall understanding should be much easier.

First, the update. As for the delete, a new syntax (UPDATE multipartIdentifier tableAlias setClause whereClause?) and logical node were added:

case class UpdateTable(
    table: LogicalPlan,
    assignments: Seq[Assignment],
    condition: Option[Expression])

case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable {
  override def foldable: Boolean = false
  override def nullable: Boolean = false
  override def dataType: DataType = throw new UnresolvedException(this, "nullable")
  override def children: Seq[Expression] = key ::  value :: Nil
}

But if you look for the physical execution support, you will not find it. If you will try to execute an update, the execution will fail because of this pattern match in the BasicOperators class:

      case _: UpdateTable =>
        throw new UnsupportedOperationException(s"UPDATE TABLE is not supported temporarily.")

And you can see it in the following test:

  "the UPDATE operation" should "not be supported" in {
    val numbers = Seq((1), (2), (3)).toDF("number")
    numbers.createOrReplaceTempView("numbers_update")

    val updateError = intercept[UnsupportedOperationException] {
      sparkSession.sql("UPDATE numbers_update SET number = 3 WHERE number = 2")
    }

    updateError.getMessage should include("UPDATE TABLE is not supported temporarily")
  }

Merge support

Regarding the merge, the story is the same as for the update, ie. only the parsing part is implemented in 3.0. However, unlike the update, its implementation is a little bit more complex since the logical node involves the following:

case class MergeIntoTable(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction]) extends Command with SupportsSubquery {
  override def children: Seq[LogicalPlan] = Seq(targetTable, sourceTable)
}

You can see then that we have one table for the source and for the target, the merge conditions, and less obvious to understand, matched and not matched actions. Just to recall, a MERGE operation looks like that:

MERGE INTO table1 AS target
USING table2 ON target.id_col = source.id_col
WHEN 
  MATCHED THEN UPDATE SET target.some_column = source.some_column
 WHEN
  NOT MATCHED THEN INSERT (target.some_column) VALUES (source.some_column)

As you can see, my merge statement uses 2 tables and 2 different actions. When both tables contain a given entry, the target's column will be updated with the source value. When the match is not found, a new value will be inserted in the target table.

But if you try to execute it, you should get the following error:

      case _: MergeIntoTable =>
        throw new UnsupportedOperationException(s"MERGE INTO TABLE is not supported temporarily.")

And as a proof, you can take this very simple test:

  "the MERGE operation" should "not be supported" in {
    val numbers = Seq((1), (2), (3)).toDF("number")
    numbers.createOrReplaceTempView("numbers_merge_source")
    val numbers2 = Seq((1), (2), (3)).toDF("number")
    numbers2.createOrReplaceTempView("numbers_merge_target")

    val mergeError = intercept[UnsupportedOperationException] {
      sparkSession.sql(
        """
          |MERGE INTO numbers_merge_target AS target
          | USING numbers_merge_source AS source ON target.number = source.number
          |WHEN MATCHED THEN UPDATE SET target.number = source.number
          |""".stripMargin)
    }

    mergeError.getMessage should include("MERGE INTO TABLE is not supported temporarily")
  }

Despite the fact of providing the possibility for physical execution only for the delete, the perspective of the support for the update and merge operations looks amazing. Now, it's time for the different data sources supporting delete, update and merge operations, to implement the required interfaces and connect them to Apache Spark 🤝