The who, when, how and what of Apache Spark SQL code generation

Versions: Apache Spark 2.4.0

The code generated by Apache Spark for queries defined with higher-level concepts, such as SQL queries, is the key to understanding the performance of the processing logic. This post, started after a discussion on my GitHub, tries to explain some of the basics of the code generation workflow.

The first section of the post talks about the "who" and explains the classes involved in the code generation. It's followed by the part about "when" the code generation happens. The next section focuses on "how" the generation is made, while the last one covers "what" is generated.

Who

Let's start by seeing who is involved in the code generation. By "who" I mean the classes responsible for triggering the code generation and for the generation itself. The generation is started in different places. The first one is GenerateUnsafeProjection, which creates the initial projection and saves the data as UnsafeRow in a buffer. Another trigger object is WholeStageCodegenExec, which is responsible for generating the data processing logic.

The queries are translated into Java code by the implementations of the CodeGenerator[InType <: AnyRef, OutType <: AnyRef] abstract class. Among its implementations we can find GeneratePredicate, which evaluates the expression on each input row, GenerateSafeProjection, able to create an updatable InternalRow, and GenerateOrdering, which creates the code for ordering expressions. You can get the full list directly by analyzing the implementations of CodeGenerator.

Among the actors involved in the code generation we can also find all implementations of the CodegenSupport trait: ProjectExec, FilterExec, SampleExec, RangeExec, BroadcastHashJoinExec, ColumnarBatchScan, DataSourceScanExec, RowDataSourceScanExec, FileSourceScanExec, DataSourceV2ScanExec, ExpandExec, GenerateExec and many others. All of them are responsible for converting higher-level expressions (e.g. SQL) into executable Java code.

The final key participant of the code generation is CodegenContext. This context-like object is important because it behaves both like a container and like a factory. As a container, it stores the list of objects passed into the generated classes. These references are used, for instance, to keep a single instance of a given object, as is the case for datetime expressions, which store the timezone or the datetime formatter there. Some other expressions save references to themselves in order to call their own methods from the generated code. It happens, for instance, in the ArrayUnion operator:

case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLike
  with ComplexTypeMergingExpression {

  override def nullSafeEval(input1: Any, input2: Any): Any = {
    val array1 = input1.asInstanceOf[ArrayData]
    val array2 = input2.asInstanceOf[ArrayData]

    evalUnion(array1, array2)
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    // ...
    } else {
      nullSafeCodeGen(ctx, ev, (array1, array2) => {
        // the expression registers itself as a reference...
        val expr = ctx.addReferenceObj("arrayUnionExpr", this)
        // ...so that the generated Java code can call back its nullSafeEval method
        s"${ev.value} = (ArrayData)$expr.nullSafeEval($array1, $array2);"
      })
    }
  }
}
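
To make the reference mechanism more tangible, here is a minimal sketch written in plain Scala, without Spark on the classpath. MiniContext is a hypothetical stand-in for the reference-holding part of CodegenContext, and the shape of the returned accessor only approximates what Spark emits. The formatter object is an arbitrary example.

```scala
import scala.collection.mutable.ArrayBuffer

object ReferencesSketch {

  // Hypothetical, simplified stand-in for CodegenContext (not Spark's real class).
  class MiniContext {
    // Objects that will later be handed to the generated class as an Object[] array.
    val references: ArrayBuffer[Any] = ArrayBuffer.empty[Any]

    // Stores `obj` and returns the Java snippet reading it back from the array.
    def addReferenceObj(objName: String, obj: Any): String = {
      val idx = references.length
      references += obj
      val className = obj.getClass.getName
      s"(($className) references[$idx] /* $objName */)"
    }
  }

  // Registers a single formatter and returns (accessor text, number of references).
  def demo(): (String, Int) = {
    val ctx = new MiniContext
    val formatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
    val accessor = ctx.addReferenceObj("formatter", formatter)
    (accessor, ctx.references.length)
  }
}
```

The point of the sketch is that the generated code never embeds the object itself. It only contains a text fragment that reads one slot of the references array, so the same instance is shared between the driver-side expression and the generated class.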

CodegenContext is also important from another point of view. It provides a wide range of factory methods to create the textual representation of the generated code. For instance, it provides a factory method to create a null safe execution:

def nullSafeExec(nullable: Boolean, isNull: String)(execute: String): String = {
  if (nullable) {
    s"""
      if (!$isNull) {
        $execute
      }
    """
  } else {
    "\n" + execute
  }
}
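
To see what this factory method returns, the function can be copied into a standalone object and called directly. The sketch below does exactly that. The isNull_0 and value_0 names and the executed statement are hypothetical examples of what a generated variable and operation could look like.

```scala
object NullSafeExecSketch {

  // Copied from the snippet above so that the example runs without Spark.
  def nullSafeExec(nullable: Boolean, isNull: String)(execute: String): String = {
    if (nullable) {
      s"""
        if (!$isNull) {
          $execute
        }
      """
    } else {
      "\n" + execute
    }
  }

  // Nullable input: the statement is wrapped in a null check.
  val guarded: String = nullSafeExec(true, "isNull_0")("value_0 = value_0 + 1;")

  // Non-nullable input: the statement is emitted as-is, without any check.
  val unguarded: String = nullSafeExec(false, "isNull_0")("value_0 = value_0 + 1;")
}
```

Skipping the null check for non-nullable inputs keeps the generated Java code shorter, which matters when a single query produces thousands of such fragments.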

When

The physical code generation happens in different places of the Apache Spark program execution. The first one is the already presented WholeStageCodegenExec, invoked for every method of the high-level API (functions, SQL, ...). The following diagram shows what happens when we try to read a file:

In the picture you can see that the entry point for the code generation is WholeStageCodegenExec. It generates the code for its underlying plans. In our example, the plan is composed only of the file scan activity but, as we will see later in this post, that's not always the case.

Other places invoking the code generation, outside WholeStageCodegenExec, are the implementations of CodeGenerator. One of them is GenerateUnsafeProjection, involved in all operations creating new UnsafeRow instances. It happens for instance when we create an in-memory Dataset, manipulate streaming and batch aggregations, or generate join keys in a broadcast hash join. Among the other CodeGenerator implementations we have GeneratePredicate, used in join operations (streaming and batch), and GenerateOrdering, involved in all order-related operations.

Long story short, since the code generation is the materialization of the physical execution plan steps, it can be invoked in any place of the pipeline. We saw that it was the case of the first step creating the input data but also that it could happen later, for instance during the data aggregation.

How

Internally the code is generated in different ways. The first one is CodegenSupport's doProduce(CodegenContext): String abstract method, implemented by most of the physical plans. In all cases the generation makes extensive use of CodegenContext to store the references or the mutable state.

Regarding the mutable state, it's nothing more than the textual representation of Java code that a physical operator inserts directly into the generated code. The mutable states are nevertheless stored in CodegenContext in order to prevent unexpected errors when the generated code is split into multiple functions by the CodegenContext#splitExpressions method.

The references were already presented: they're input objects passed to the generated code. It concerns the instances created outside the CodegenContext, for instance in the operators. Going through references was preferred over setting each object as a field of the generated class because of the JVM limits on the number of fields per class. It's much less risky to reference them that way.
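
The interplay between doProduce and the mutable state can be sketched with a toy model in plain Scala. Everything here is hypothetical: MiniContext and MiniCodegenSupport only imitate the roles of CodegenContext and CodegenSupport, the addMutableState signature is much simpler than Spark's, and CountingOperator is an invented operator.

```scala
import scala.collection.mutable.ArrayBuffer

object MutableStateSketch {

  // Toy context: collects field declarations and their initialization code as text.
  class MiniContext {
    private val declarations = ArrayBuffer.empty[String]
    private val initializers = ArrayBuffer.empty[String]

    // Registers a field of the generated class and returns its name.
    def addMutableState(javaType: String, name: String, init: String): String = {
      declarations += s"private $javaType $name;"
      initializers += s"$name = $init;"
      name
    }

    def declareMutableStates(): String = declarations.mkString("\n")
    def initMutableStates(): String = initializers.mkString("\n")
  }

  // Toy operator contract: doProduce returns the Java text of the processing logic.
  trait MiniCodegenSupport {
    def doProduce(ctx: MiniContext): String
  }

  // Invented operator counting its input rows in a field of the generated class.
  object CountingOperator extends MiniCodegenSupport {
    override def doProduce(ctx: MiniContext): String = {
      val counter = ctx.addMutableState("long", "count_0", "0L")
      s"while (input.hasNext()) { input.next(); $counter++; }"
    }
  }

  // Assembles the pieces the way a whole-stage wrapper could do it.
  def generate(): String = {
    val ctx = new MiniContext
    val body = CountingOperator.doProduce(ctx)
    s"""${ctx.declareMutableStates()}
       |public void init() { ${ctx.initMutableStates()} }
       |protected void processNext() { $body }""".stripMargin
  }
}
```

Because the operator only asks the context for a field name, the context stays free to decide where the declaration and the initialization end up in the final class, which is what makes splitting the code into several functions safe.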

Another way to produce the code consists in implementing the org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator#create method. Even though we don't see it from the signature, it also uses the CodegenContext internally:

// GeneratePredicate
protected def create(predicate: Expression): Predicate = {
  val ctx = newCodeGenContext()
  // ...
}

// GenerateSafeProjection
protected def create(expressions: Seq[Expression]): Projection = {
  val ctx = newCodeGenContext()
  // ...
}

// GenerateUnsafeProjection
protected def create(references: Seq[Expression]): UnsafeProjection = {
  create(references, subexpressionEliminationEnabled = false)
}

private def create(
    expressions: Seq[Expression],
    subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
  val ctx = newCodeGenContext()
  // ...
}
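
The general shape of such a generator can be illustrated with a simplified stand-in. In the sketch below, MiniCodeGenerator only imitates the CodeGenerator abstract class (no compilation, no caching, no CodegenContext), and the GreaterThan input as well as the generated eval method are invented for the example. The output is plain Java source text rather than a compiled class.

```scala
object CodeGeneratorSketch {

  // Simplified stand-in for the CodeGenerator[InType, OutType] abstract class.
  abstract class MiniCodeGenerator[InType <: AnyRef, OutType <: AnyRef] {
    // Normalizes the input before the generation.
    protected def canonicalize(in: InType): InType
    // Builds the output from the normalized input.
    protected def create(in: InType): OutType
    // Public entry point: normalize first, then create.
    def generate(in: InType): OutType = create(canonicalize(in))
  }

  // Invented input describing a "column > threshold" predicate.
  final case class GreaterThan(column: String, threshold: Int)

  // Invented generator returning the Java source text of the predicate.
  object MiniGeneratePredicate extends MiniCodeGenerator[GreaterThan, String] {
    override protected def canonicalize(in: GreaterThan): GreaterThan =
      in.copy(column = in.column.trim.toLowerCase)

    override protected def create(in: GreaterThan): String =
      s"""public boolean eval(java.util.Map<String, Integer> row) {
         |  return row.get("${in.column}") > ${in.threshold};
         |}""".stripMargin
  }
}
```

Calling MiniGeneratePredicate.generate(GreaterThan(" Age ", 18)) returns a method body comparing row.get("age") with 18. In Spark the equivalent text would then go through the compilation step described next.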

Once generated, the Java code is compiled by the org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator#compile method. It's invoked by WholeStageCodegenExec or by one of the already listed CodeGenerator implementations. The bytecode generation itself is done with the Janino compiler.

What

The class compiled by Janino is an instance of org.apache.spark.sql.catalyst.expressions.codegen.GeneratedClass. It wraps the initially generated class and exposes an abstract generate(references: Array[Any]): Any method, through which any extra object can be passed to the wrapped class. The method is used in some of the places listed in the previous section, mainly to pass the array of references from CodegenContext:

// GenerateOrdering
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
// The generated generate implementation:
//   public SpecificOrdering generate(Object[] references) {
//     return new SpecificOrdering(references);
//   }

// GenerateMutableProjection
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
// The generated generate implementation:
//   public java.lang.Object generate(Object[] references) {
//     return new SpecificMutableProjection(references);
//   }

// generate can also receive an empty array that the generated class ignores
// GenerateColumnAccessor
val (clazz, _) = CodeGenerator.compile(code)
clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
// The generated generate implementation:
//   public SpecificColumnarIterator generate(Object[] references) {
//     return new SpecificColumnarIterator();
//   }
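
The wrapper pattern used in these snippets can be reproduced in plain Scala. The sketch below is only an imitation: MiniGeneratedClass plays the role of GeneratedClass, the anonymous subclass stands for what the compile step returns, and SpecificOrdering is written by hand instead of being generated. The single Boolean reference is an invented example.

```scala
object GeneratedClassSketch {

  // Simplified stand-in for GeneratedClass: a factory receiving the references array.
  abstract class MiniGeneratedClass {
    def generate(references: Array[Any]): Any
  }

  // Plays the role of the emitted class; it reads its configuration from the references.
  class SpecificOrdering(references: Array[Any]) extends Ordering[Int] {
    private val ascending = references(0).asInstanceOf[Boolean]
    override def compare(a: Int, b: Int): Int =
      if (ascending) Integer.compare(a, b) else Integer.compare(b, a)
  }

  // Plays the role of the compiled wrapper returned by the compilation.
  val clazz: MiniGeneratedClass = new MiniGeneratedClass {
    override def generate(references: Array[Any]): Any = new SpecificOrdering(references)
  }

  // Same call shape as in the snippets above: generate, then cast to the expected type.
  def sortDescending(values: Seq[Int]): Seq[Int] = {
    val ordering = clazz.generate(Array[Any](false)).asInstanceOf[Ordering[Int]]
    values.sorted(ordering)
  }
}
```

The cast is needed because generate is declared to return Any. The caller knows which concrete type it asked the generator to produce, so it is the one responsible for narrowing the result.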

If you look attentively at the generated code, you'll certainly notice the numbers appended to a lot of names, for instance: private void agg_doAggregateWithoutKey_0, private java.lang.String[] mutableStateArray_0 = new java.lang.String[1], private UTF8String StaticInvoke_0(InternalRow i) {. These numbers come from CodegenContext's freshNameIds map and are simply a way to avoid naming conflicts. The map stores the base names as keys and the number of times each of them was already used as values. Every time CodegenContext's freshName(name: String) method is invoked to get the next available name, the freshNameIds map is consulted and updated.

Aside from the id suffixes we can notice another specific thing: the prefixes. The functions are often prefixed with agg_, bhj_, rdd_, scan_ and so forth. These values are retrieved from CodegenSupport's variablePrefix method and are strictly related to the operator for which the code is generated.
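
Both naming mechanisms can be combined in a small sketch. It is a simplification written for this post's explanation: MiniContext only keeps the counter logic, the prefix is passed as a constructor parameter, and variablePrefix works on operator names given as plain strings, which is an assumption of the example.

```scala
import scala.collection.mutable

object FreshNameSketch {

  // Toy context holding only the naming logic.
  class MiniContext(prefix: String = "") {
    // Base name -> number of times it was already handed out.
    private val freshNameIds = mutable.HashMap.empty[String, Int]

    // Returns a unique name built from the prefix, the base name and a counter.
    def freshName(name: String): String = {
      val fullName = if (prefix.isEmpty) name else s"${prefix}_$name"
      val id = freshNameIds.getOrElse(fullName, 0)
      freshNameIds(fullName) = id + 1
      s"${fullName}_$id"
    }
  }

  // Hypothetical mapping from an operator name to the prefix used in its generated code.
  def variablePrefix(operator: String): String = operator match {
    case "HashAggregateExec" => "agg"
    case "BroadcastHashJoinExec" => "bhj"
    case other => other.toLowerCase
  }
}
```

With a context created for the "agg" prefix, two consecutive freshName("doAggregateWithoutKey") calls give agg_doAggregateWithoutKey_0 and agg_doAggregateWithoutKey_1, which matches the kind of names visible in the generated code.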

The code generation in Apache Spark is an interesting topic that is difficult to cover in one concise blog post. I tried here to introduce the most important code generation concepts, which should help you delve into more advanced features more easily. Each of the four sections focuses on one specific point and, gathered together, they should give the workflow of the code generation with its main actors, trigger moments and key methods. If you find that the post misses some concepts, please let me know, either on my GitHub by creating an issue, or in the comments below this post.