User Defined Type

Versions: Spark 2.1.0

Spark SQL schemas are very flexible. They support common data types, such as booleans, integers, and strings, but they also support custom data types called User Defined Types (UDT).

This post explores UDTs. The first part explains their role in Spark SQL and shows how to implement them. The second part tests the implementation of our sample City class as a UDT.

User Defined Type definition

UDTs make it possible to use custom data types in Spark SQL programs. For example, thanks to a UDT we can build Dataset objects directly from an RDD of Rows. According to the Scaladoc, this improved interoperability with Spark SQL applications is one of the goals of UDTs.

To define a UDT, the custom data type must extend the abstract org.apache.spark.sql.types.UserDefinedType class and implement its 4 methods:

- sqlType() - returns the SQL representation (a DataType, often a StructType) used to store the user type
- serialize(...) - converts an instance of the user class to its SQL representation
- deserialize(...) - converts a SQL datum back to an instance of the user class
- userClass() - returns the Class object of the type handled by the UDT

But implementing this class is not enough to create a UDT. The last thing to do is to annotate the class related to the UDT with @SQLUserDefinedType and set its udt parameter to the previously implemented UDT class. At runtime, before serializing and deserializing objects, Spark SQL checks if the class is annotated with @SQLUserDefinedType. If it is the case, it uses the defined UDT to make the serialization and deserialization. The snippet below, taken from the RowEncoder class, shows that:

private def serializerFor(
    inputObject: Expression,
    inputType: DataType): Expression = inputType match {
  case dt if ScalaReflection.isNativeType(dt) => inputObject

  case p: PythonUserDefinedType => serializerFor(inputObject, p.sqlType)

  case udt: UserDefinedType[_] =>
    val annotation = udt.userClass.getAnnotation(classOf[SQLUserDefinedType])
    val udtClass: Class[_] = if (annotation != null) {
      annotation.udt()
    } else {
      UDTRegistration.getUDTFor(udt.userClass.getName).getOrElse {
        throw new SparkException(s"${udt.userClass.getName} is not annotated with " +
          "SQLUserDefinedType nor registered with UDTRegistration.}")
      }
    }
// ...

A less invasive way to declare an object as a UDT also exists. For that, a registry represented by UDTRegistration is publicly accessible. A new mapping between a custom object and its UDT can be added through its register(String, String) method.
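
For instance, with the City and CityUserDefinedType classes defined in the next part (the fully qualified names below match the test at the end of this post), the registration could look like this:

// Registration-based alternative to the @SQLUserDefinedType annotation;
// both arguments are fully qualified class names
UDTRegistration.register("com.waitingforcode.sql.udt.City",
  "com.waitingforcode.sql.udt.CityUserDefinedType");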

User Defined Type example

The example below shows how to define a UDT for an object representing a city (a name plus a department number). First, the City class:

import java.io.Serializable;

import org.apache.spark.sql.types.SQLUserDefinedType;

import com.google.common.base.MoreObjects;

@SQLUserDefinedType(udt = CityUserDefinedType.class)
public class City implements Serializable {
  private String name;

  private Integer departmentNumber;

  public String getName() {
    return name;
  }

  public void setName(String name) {
      this.name = name;
  }

  public Integer getDepartmentNumber() {
    return departmentNumber;
  }

  public void setDepartmentNumber(Integer departmentNumber) {
    this.departmentNumber = departmentNumber;
  }

  public static City valueOf(String name, Integer departmentNumber) {
    City city = new City();
    city.name = name;
    city.departmentNumber = departmentNumber;
    return city;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("name", name).add("departmentNumber", departmentNumber)
      .toString();
  }

} 

And because this class is not complicated, we can directly show its UDT, the one defined in the @SQLUserDefinedType annotation:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.UserDefinedType;
import org.apache.spark.unsafe.types.UTF8String;

public class CityUserDefinedType extends UserDefinedType<City> {

  private static final int DEPT_NUMBER_INDEX = 0;
  private static final int NAME_INDEX = 1;
  private static final DataType DATA_TYPE;
  static {
    MetadataBuilder metadataBuilder = new MetadataBuilder();
    metadataBuilder.putLong("maxNumber", 99);
    DATA_TYPE = DataTypes.createStructType(new StructField[]{
      DataTypes.createStructField("departmentNumber", DataTypes.IntegerType, false, metadataBuilder.build()),
      DataTypes.createStructField("name", DataTypes.StringType, false)
    });
  }

  @Override
  public DataType sqlType() {
    return DATA_TYPE;
  }

  @Override
  public InternalRow serialize(City city) {
    // The writing order here must match the field order declared in sqlType()
    InternalRow row = new GenericInternalRow(2);
    row.setInt(DEPT_NUMBER_INDEX, city.getDepartmentNumber());
    row.update(NAME_INDEX, UTF8String.fromString(city.getName()));
    return row;
  }

  @Override
  public City deserialize(Object datum) {
    if (datum instanceof InternalRow) {
      InternalRow row = (InternalRow) datum;
      return City.valueOf(row.getString(NAME_INDEX), row.getInt(DEPT_NUMBER_INDEX));
    }
    throw new IllegalStateException("Unsupported conversion");
  }

  @Override
  public Class<City> userClass() {
    return City.class;
  }
}
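
One detail worth noting: the metadata attached to the departmentNumber field in sqlType() travels with the schema and can be read back later through the StructField API. A minimal sketch, reusing only the class defined above:

// Reads back the "maxNumber" metadata put on the departmentNumber field
// by CityUserDefinedType#sqlType()
StructType citySqlType = (StructType) new CityUserDefinedType().sqlType();
long maxNumber = citySqlType.fields()[0].metadata().getLong("maxNumber"); // 99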

The UDT is also pretty simple. The most verbose part concerns the SQL schema, but it consists only of defining field types matching the Java types composing the City class, plus some additional metadata. Finally, the test below shows an example of UDT manipulation:

@Test
public void should_create_schema_programmatically_with_metadata_and_UDT() {
  // If City were not annotated with @SQLUserDefinedType, the use of
  // the UDT registry below would be enough to make it work
  // UDTRegistration.register("com.waitingforcode.sql.udt.City", "com.waitingforcode.sql.udt.CityUserDefinedType");
  SparkSession session = SparkSession.builder().master("local[1]")
    .appName("UserDefinedType Test").getOrCreate();
  try (JavaSparkContext closeableContext = new JavaSparkContext(session.sparkContext())) {
    City paris = City.valueOf("Paris", 75);
    City marseille = City.valueOf("Marseille", 13);
    City nantes = City.valueOf("Nantes", 44);
    List<City> cities = Lists.newArrayList(paris, marseille, nantes);
    JavaRDD<Row> citiesRdd = closeableContext.parallelize(cities)
      .map(RowFactory::create);

    StructType citySchema = new StructType().add("city", 
      new CityUserDefinedType(), false);
    Dataset<Row> datasetFromRdd = session.createDataFrame(citiesRdd, citySchema);
    datasetFromRdd.show(false);

    // First, make some checks on used schema
    scala.collection.immutable.List<StructField> fields = citySchema.toList();
    assertThat(fields.size()).isEqualTo(1);
    StructField cityBeanField = fields.apply(citySchema.getFieldIndex("city").get());
    DataType dataType = cityBeanField.dataType();
    assertThat(dataType).isInstanceOf(CityUserDefinedType.class);
    // Next, check if data was correctly serialized/deserialized
    List<Row> rows = datasetFromRdd.collectAsList();
    City parisRow = getCityByName(rows, paris.getName());
    assertRowEqualsToObject(parisRow, paris);
    City marseilleRow = getCityByName(rows, marseille.getName());
    assertRowEqualsToObject(marseilleRow, marseille);
    City nantesRow = getCityByName(rows, nantes.getName());
    assertRowEqualsToObject(nantesRow, nantes);
    assertThat(rows).hasSize(3);
  }
}

private void assertRowEqualsToObject(City row, City city) {
  assertThat(row.getDepartmentNumber()).isEqualTo(city.getDepartmentNumber());
}

private City getCityByName(Collection<Row> rows, String name) {
  return rows.stream().map(row -> (City) row.getAs("city")).filter(city -> city.getName().equals(name))
    .findFirst().get();
}
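
As a complement, serialize(...) and deserialize(...) can also be exercised directly, without going through a Spark session. Below is a small additional test, a sketch using only the classes defined earlier in this post:

@Test
public void should_serialize_and_deserialize_city_directly() {
  CityUserDefinedType cityUdt = new CityUserDefinedType();
  City paris = City.valueOf("Paris", 75);

  // serialize() produces the InternalRow described by sqlType()...
  InternalRow serializedParis = cityUdt.serialize(paris);
  // ...and deserialize() rebuilds a City instance from it
  City deserializedParis = cityUdt.deserialize(serializedParis);

  assertThat(deserializedParis.getName()).isEqualTo("Paris");
  assertThat(deserializedParis.getDepartmentNumber()).isEqualTo(75);
}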

This post shows how to use User Defined Types in Spark SQL. The first part explains the goal of UDTs and how to implement them. The second part contains an example of a UDT definition.