Generated code in Spark SQL

Versions: Spark 2.0.0

One of powerful features of Spark SQL is dynamic generation of code. Several different layers are generated and this post explains some of them.

Analyzed code in this post is based on below snippet, executed against Spark SQL context with activated tracing debug level:

AgedPerson youngster = AgedPerson.of("young", 17);
AgedPerson older = AgedPerson.of("older", 300);
Dataset<Row> dataset = SESSION.createDataFrame(asList(youngster, older), AgedPerson.class);

Dataset<Row> people = dataset.filter(new MinAgeFilter(20));
people.foreach(rdd -> {});

public static class AgedPerson {
  private String name;

  private int age;
  // omitted for brevity
}

public static class MinAgeFilter 
  implements FilterFunction<Row>, Serializable {

  private final int minAge;

  public MinAgeFilter(int minAge) {
    this.minAge = minAge;
  }

  @Override
  public boolean call(Row row) throws Exception {
    return row.<Integer>getAs("age") >= minAge;
  }
}

Below 3 sections describe types of generated code. The first explains unsafe project operating on UnsafeRow presented in the post about Spark Project Tungsten. The second section describes safe projection working on org.apache.spark.sql.catalyst.InternalRow. The last part talks about predicate. In presented example it corresponds to MinAgeFilter.

All of these 3 sections talk about separate evaluation expression: projections and conditions. Projections produce a new row from existent one or modify already existent row (mutable rows). This creation is done with expression specified inside given projection. In the other side, conditions evalutes expressions against given row and returns true or false (success/error). Another type of evaluated expression, not described in this post, are sorting and UnsafeRows joining.

Unsafe projection

As already told, unsafe projection works on UnsafeRow, i.e. row represented by raw memory and not Java objects. Generated class extends abstract org.apache.spark.sql.catalyst.expressions.UnsafeProjection and looks like below.

At the beginning the constructor defined mandatory parts to hold object in raw memory (BufferHolder) and write it (UnsafeRowWriter). It also created empty UnsafeRow of 2 fields:

class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {

  private Object[] references;
  private UnsafeRow result;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder;
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter;

  public SpecificUnsafeProjection(Object[] references) {
    this.references = references;
    result = new UnsafeRow(2);
    this.holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(result, 32);
    this.rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(holder, 2);
  }

After that, another method apply(InternalRow) creates UnsafeRow from InternalRow. It's called for example on getting an array of UnsafeRows in LocalTableScanExec representing "physical plan node for scanning data from a local collection" (from Scalacode). Below method defines 2 fields of AgedPerson class - age (index 0) and name (index 1):

public UnsafeRow apply(InternalRow i) {
  holder.reset();
  rowWriter.zeroOutNullBytes();
  int value = i.getInt(0);
  rowWriter.write(0, value);
  
  boolean isNull1 = i.isNullAt(1);
  UTF8String value1 = isNull1 ? null : (i.getUTF8String(1));
  if (isNull1) {
    rowWriter.setNullAt(1);
  } else {
    rowWriter.write(1, value1);
  }
  result.setTotalSize(holder.totalSize());
  return result;
} 

Safe projection

The second type of generated code is safe projection. Unlike unsafe projection, it creates a projection generating other implementation of InternalRow implementation than UnsafeRow. Specificaly, it generates a row with schema that is implemented at the end of apply(Object) method. But let's start with constructor:

class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
  private Object[] references;
  private InternalRow mutableRow;
  private Object[] values;
  private org.apache.spark.sql.types.StructType schema;

  public SpecificSafeProjection(Object[] references) {
    this.references = references;
    mutableRow = (InternalRow) references[references.length - 1];
    this.schema = (org.apache.spark.sql.types.StructType) references[0];
  }

Some things to note. First, the projection is constructed with an array of objects passed in references variable. This array is specifically composed of 2 elements: the schema (index 0) and the mutable row (index 1). After adding some breakpoints and launching code from this post in debug mode, we can intercept the exact content of references array:

this = {org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection@8214} ""
 references = {java.lang.Object[2]@8232} 
  0 = {org.apache.spark.sql.types.StructType@8234} "StructType" size = 2
  1 = {org.apache.spark.sql.catalyst.expressions.SpecificInternalRow@8224} "[null]"
   values = {org.apache.spark.sql.catalyst.expressions.MutableValue[1]@8226} 
    0 = {org.apache.spark.sql.catalyst.expressions.MutableAny@8242} 
     value = null
     isNull = true

The method building targeted row looks like method of the same type in the case of unsafe projection:

public java.lang.Object apply(java.lang.Object _i) {
  InternalRow i = (InternalRow) _i;
  values = new Object[2];

  int value1 = i.getInt(0);
  if (false) {
    values[0] = null;
  } else {
    values[0] = value1;
  }

  boolean isNull3 = i.isNullAt(1);
  UTF8String value3 = isNull3 ? null : (i.getUTF8String(1));
  boolean isNull2 = true;
  java.lang.String value2 = null;
  if (!isNull3) {
    isNull2 = false;
    if (!isNull2) {
      Object funcResult = null;
      funcResult = value3.toString();
      if (funcResult == null) {
        isNull2 = true;
      } else {
        value2 = (java.lang.String) funcResult;
      }
    }
    isNull2 = value2 == null;
  }
  if (isNull2) {
    values[1] = null;
  } else {
    values[1] = value2;
  }
  final org.apache.spark.sql.Row value = 
    new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
  if (false) {
    mutableRow.setNullAt(0);
  } else {
    mutableRow.update(0, value);
  }
  return mutableRow; 
}

The generated code looks complicated because of unusual if(false) checks. After eliminating them, we can simply deduce that the code moves all values of received InternalRow to mutable InternalRow, initialized in constructor. An important thing to note is the fact that created row stores values in an array of objects. It's why the mutableRow is updated with Object[] values.

Predicate

The third generated code concerns predicates, in case MinAgeFilter. Declared filter is wrapped with the implementation of abstract Predicate class as in below snippet:

class SpecificPredicate extends org.apache.spark.sql.catalyst.expressions.codegen.Predicate {
  private final Object[] references;
  private org.apache.spark.sql.Row argValue;

  public SpecificPredicate(Object[] references) {
    this.references = references;
  }

  public boolean eval(InternalRow i) {
    Object obj = ((Expression) references[0]).eval(null);
    org.apache.spark.api.java.function.FilterFunction value1 = 
      (org.apache.spark.api.java.function.FilterFunction) obj;
    boolean isNull = true;
    boolean value = false;
    if (!false) {
      org.apache.spark.sql.Row value2 = (org.apache.spark.sql.Row)i.get(0, null);
      argValue = value2;
      isNull = false;
      if (!isNull) {
        try {
          value = value1.call(argValue);
        } catch (Exception e) {
          org.apache.spark.unsafe.Platform.throwException(e);
        } 
      }
    }
    return !isNull && value;
  }
} 

Some similarities with safe projection ? First of all, the constructor takes once again an array of objects. Thanks to the same trick of debugging (discover them in Tips to discover internals of an Open Source framework internals - Apache Spark use case) we can see that reference array brings filter to apply (it can also be deduced from the cast inside eval(InternalRow)):

this = {org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate@8237} 
 references = {java.lang.Object[1]@8241} 
  0 = {org.apache.spark.sql.catalyst.expressions.Literal@8245} "com.waitingforcode.sql.PlanDebug$MinAgeFilter@6c363318"
   value = {com.waitingforcode.sql.PlanDebug$MinAgeFilter@8236} 
   dataType = {org.apache.spark.sql.types.ObjectType@8247} "ObjectType(interface org.apache.spark.api.java.function.FilterFunction)"
   resolved = false
   canonicalized = null
   bitmap$0 = 0
   origin = {org.apache.spark.sql.catalyst.trees.Origin@8133} "Origin(None,None)"
   containsChild = null
   _hashCode = 0
   allChildren = null
   org.apache.spark.sql.catalyst.trees.TreeNode.bitmap$0 = 0

eval(InternalRow) is pretty trivial. It starts by cast Object to FilterFunction. Such casted filter is further applied on passed row.

This post presents step by step, 3 projections generated by Spark SQL. The first part talks about unsafe projection working on UnsafeRow. The second part explains safe projection, adapted to deal with schema-defined row. The last part shows the case of filtering.