Apache Spark can be eagerly evaluated too - Commands

Versions: Apache Spark 3.1.1

Some time ago I participated in an interesting meetup about the MERGE operation in Delta Lake (link in the Further reading section). Jacek Laskowski presented the operation's internals and asked an interesting question about the difference between commands and execs. Since I didn't know the answer right away, I decided to explore the command concept in this blog post.

Commands - execution

The first specific point I found about commands is their execution flow. As you know, an Apache Spark DataFrame is evaluated lazily. If you call the read method of SparkSession without triggering an action on the result, Apache Spark won't load the data yet; it merely creates a source node in the dataflow graph.

Although most things in Spark SQL are executed lazily, commands evaluate eagerly. It means that Apache Spark executes them as soon as you define them in your pipeline, e.g. with the sql() method, as in the sketch below.
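
A minimal example of the difference, assuming a users_list table already exists in the catalog (the table and column names are made up):

// The DDL below is planned as a command, so it has already run by the time sql() returns,
// even though no action (show, count, write...) is ever called on the result
sparkSession.sql("ALTER TABLE users_list ADD COLUMNS last_name STRING")

// A regular read stays lazy: nothing is loaded until an action is triggered
val users = sparkSession.read.table("users_list")
users.show(false) // only now does Apache Spark read the data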

Commands - side effects

Aside from the eager execution, commands have another characteristic: side effects. If you check the Scaladoc comment of RunnableCommand, you will read something along these lines:

/**
 * A logical command that is executed for its side-effects. `RunnableCommand`s are
 * wrapped in `ExecutedCommand` during execution.
 */
trait RunnableCommand extends Command {

The physical node executing a command interface (RunnableCommand) is ExecutedCommandExec(cmd: RunnableCommand). In its definition you will find a lazy val sideEffectResult: Seq[InternalRow] field. As its name indicates, it executes the command and converts its result into Apache Spark's internal format with the cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow]) operation. But what's the link with side-effects?

In programming, we consider a function to have side effects when it modifies some external state. Does this definition also apply to Apache Spark commands? To answer the question, let's list some of the commands first (a non-exhaustive sample from the org.apache.spark.sql.execution.command package): AlterTableAddColumnsCommand, AlterTableRenameCommand, CreateDatabaseCommand, DropTableCommand, TruncateTableCommand, ShowTablesCommand, DescribeTableCommand.

As you can notice, they are mostly metadata operations, i.e., operations interacting with a catalog. If we consider the catalog as an external state, we can say that these commands have side effects: each of them changes something in the catalog. Some of them add new columns, others create new tables. Of course, there are also read-only operations that could be considered side-effect-free because they don't modify anything. However, they interact with a mutable store, so they may also suffer from the side effects made by other applications working on the same catalog. In that scenario, a side effect would show up as different output in the DataFrame returned by the command.

As a consequence, the execution is controlled and wrapped in this sideEffectResult field, which is evaluated only once per sparkSession.sql("...") call:

case class ExecutedCommandExec(cmd: RunnableCommand) extends LeafExecNode {
  // Runs the command on the driver and converts its result rows into the internal
  // format; being a lazy val, it's evaluated at most once per physical plan instance
  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
    val converter = CatalystTypeConverters.createToCatalystConverter(schema)
    cmd.run(sqlContext.sparkSession).map(converter(_).asInstanceOf[InternalRow])
  }

  // Reuses the already computed result; no distributed computation happens here
  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult, 1)
  }
}
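
A side note on doExecute: since it simply parallelizes the driver-side result into a single partition, the rows returned by a command come back as a 1-partition dataset. A quick, hedged way to check it (the exact physical node may change between Spark versions):

val tables = sparkSession.sql("SHOW TABLES")
// Expected to print 1, because ExecutedCommandExec calls parallelize(sideEffectResult, 1)
println(tables.rdd.getNumPartitions)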

Commands - getting results

You may have noticed the converter in the previous snippet. It uses the schema field to generate the command output in the InternalRow form. The schema value is built from the Command's output field, but that output is not always defined!

Remember, only some of the commands describe something, and only those expose an output to the caller. For example, the output will be empty for a command like ALTER TABLE users_list ADD COLUMNS first_name STRING, but it will be defined for a command like SHOW TABLES. Below you can find an example of each, followed by a short usage sketch:

case class AlterTableAddColumnsCommand(
    table: TableIdentifier,
    colsToAdd: Seq[StructField]) extends RunnableCommand {    
  override def output: Seq[Attribute] = Seq.empty
}

case class ShowTablesCommand(
    databaseName: Option[String],
    tableIdentifierPattern: Option[String],
    isExtended: Boolean = false,
    partitionSpec: Option[TablePartitionSpec] = None) extends RunnableCommand {

  override val output: Seq[Attribute] = {
    val tableExtendedInfo = if (isExtended) {
      AttributeReference("information", StringType, nullable = false)() :: Nil
    } else {
      Nil
    }
    AttributeReference("database", StringType, nullable = false)() ::
      AttributeReference("tableName", StringType, nullable = false)() ::
      AttributeReference("isTemporary", BooleanType, nullable = false)() :: tableExtendedInfo
  }
}

// schema itself comes from QueryPlan, the common parent of logical and physical nodes:
lazy val schema: StructType = StructType.fromAttributes(output)
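
To see the difference from the caller's side, here is a minimal sketch, assuming the users_list table exists (the exact column names returned by SHOW TABLES may vary between Spark versions):

val alterResult = sparkSession.sql("ALTER TABLE users_list ADD COLUMNS first_name STRING")
// Empty schema: AlterTableAddColumnsCommand declares no output attributes
alterResult.printSchema()

val showResult = sparkSession.sql("SHOW TABLES")
// Something like: database (or namespace), tableName, isTemporary
showResult.printSchema()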

Driver execution

The last important point - thank you, Jacek, for pointing this out - is the execution context. Commands stay on the driver: they interact with the SessionCatalog living on the driver, so there is no reason to schedule anything on the executors.

There is a special type of command that executes as an Apache Spark job, though! It's V2TableWriteExec, which extends V2CommandExec:

trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
  protected def writeWithV2(batchWrite: BatchWrite): Seq[InternalRow] = { 
// ...
      sparkContext.runJob(
        rdd,
        (context: TaskContext, iter: Iterator[InternalRow]) =>
          DataWritingSparkTask.run(writerFactory, context, iter, useCommitCoordinator),
        rdd.partitions.indices,
        (index, result: DataWritingSparkTaskResult) => {
          val commitMessage = result.writerCommitMessage
          messages(index) = commitMessage
          totalNumRowsAccumulator.add(result.numRows)
          batchWrite.onDataWriterCommit(commitMessage)
        }
      )

They're a candidate topic for a follow-up blog post, but I wanted to mention the implementation because it shows the difference from the "metadata" (DDL) commands. The DDL commands don't call runJob. The sqlContext.sparkSession passed to their run implementations mostly provides an access point to the SessionCatalog instance, not a way to schedule jobs.

Obviously, the rule of not scheduling a job for the DDL commands applies only to the commands themselves! Remember, the commands return a DataFrame, so if you do a ddlResult.write…, Apache Spark will create a job to write the output.
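
A minimal sketch of that distinction (the output path is made up):

// Runs eagerly and entirely on the driver; no Spark job is scheduled here
val tables = sparkSession.sql("SHOW TABLES")

// Writing the command's result is a regular DataFrame write, so it does schedule a job
tables.write.mode("overwrite").json("/tmp/catalog_tables")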

To see the presented points in action, you can watch the following video:

Commands vs exec nodes

After the aforementioned meetup, I have been wondering about the differences between commands and exec nodes. I can't really oppose them, because they are active at different execution stages and the command ends up being wrapped by a physical exec node anyway:

  object BasicOperators extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil
      case r: RunnableCommand => ExecutedCommandExec(r) :: Nil
      // ...
    }
  }

But I can see the differences from another perspective. A Command is a logical node operating on the metadata layer. Because of this metadata character, commands are present in the analysis and physical execution stages. I didn't find them in the logical query optimization stage.
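
A quick way to observe where a command shows up in the different plans is explain; the exact node names depend on the Spark version and on the command:

// The physical plan should contain an "Execute ShowTablesCommand" (ExecutedCommandExec) node
sparkSession.sql("SHOW TABLES").explain(extended = true)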

In addition to that, the command's result is automatically cached thanks to the sideEffectResult field presented earlier in this article. For example, the following code will execute the ALTER TABLE only once, when Apache Spark runs the sql(...); the show(...) will reuse the sql's result:

  val outputAlterTable = sparkSession.sql("ALTER TABLE users_list ADD COLUMNS first_name STRING") 
  outputAlterTable.show(false)

In contrast, to execute a DataFrame representing a regular data source only once, you have to cache it explicitly, as in the sketch below.
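
A minimal sketch of that contrast (the file path is made up):

// Without cache(), every action on this DataFrame would re-read the source files
val users = sparkSession.read.json("/tmp/users.json")
users.cache()

println(users.count()) // the first action materializes the cache
users.show(false)      // served from the cache, the JSON files aren't read again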

I've somehow missed the commands in my Apache Spark journey. And I think they still have some interesting features to discover, like the commands dedicated to the DataSource V2 API. Maybe a topic for the next blog post?