Spark SQL schema is very flexible. It supports global data types, as booleans, integers, strings, but it also supports custom data types called User Defined Type (UDT).
Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote
one on that topic! You can read it online
on the O'Reilly platform,
or get a print copy on Amazon.
I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩
Through this post we can discover a little bit about UDT. The first part explains their role in Spark SQL. It also indicates how to implement UDT. The second part tests the implementation of our sample City class as an UDT.
User Defined Type definition
UDT helps to use custom data types in Spark SQL programs. For example, thanks to UDT we can build Dataset objects directly from RDD of Rows. According to Scaladoc, this improved interoperability with Spark SQL applications is one of goals of UDT.
To define an UDT, custom data type must extend abstract org.apache.spark.sql.types.UserDefinedType class and implement its 4 methods:
- sqlType() - returns the schema representing given UDT object.
- serialize(T) - converts object of UDT class (T) to Spark SQL representation (e.g. InternalRow).
- deserialize(Object) - constructs, for example from InternalRow, the object represented by UDT.
- userClass() - returns the class for which the UDT is defined.
But it's not enough to create an UDT. The last thing to do is to annotate class related to UDT with @SQLUserDefinedType and define its udt parameter as previously implemented UDT class. On runtime, before serializing and deserializing objects, Spark SQL checks if there are classes annotated with @SQLUserDefinedType. If it's the case, it uses defined UDT to make serialization and deserialization. Below snipped of RowEncoder class shows that:
private def serializerFor( inputObject: Expression, inputType: DataType): Expression = inputType match { case dt if ScalaReflection.isNativeType(dt) => inputObject case p: PythonUserDefinedType => serializerFor(inputObject, p.sqlType) case udt: UserDefinedType[_] => val annotation = udt.userClass.getAnnotation(classOf[SQLUserDefinedType]) val udtClass: Class[_] = if (annotation != null) { annotation.udt() } else { UDTRegistration.getUDTFor(udt.userClass.getName).getOrElse { throw new SparkException(s"${udt.userClass.getName} is not annotated with " + "SQLUserDefinedType nor registered with UDTRegistration.}") } } // ...
It exists also less invasive manner to declare an object as UDT. For that an registry represented by UDTRegistration is publicly accessible. A new mapping between custom object and its UDT can be added through register(String, String) method.
User Defined Type example
Below example shows how to define a UDT for an object representing a city (name with the department number). First, the City class:
@SQLUserDefinedType(udt = CityUserDefinedType.class) public class City implements Serializable { private String name; private Integer departmentNumber; public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getDepartmentNumber() { return departmentNumber; } public void setDepartmentNumber(Integer departmentNumber) { this.departmentNumber = departmentNumber; } public static City valueOf(String name, Integer departmentNumber) { City city = new City(); city.name = name; city.departmentNumber = departmentNumber; return city; } @Override public String toString() { return MoreObjects.toStringHelper(this).add("name", name).add("departmentNumber", departmentNumber) .toString(); } }
And because this class is not complicated, we can directly show its UDT, defined in @SQLUserDefinedType annotation:
public class CityUserDefinedType extends UserDefinedType<City> { private static final int DEPT_NUMBER_INDEX = 0; private static final int NAME_INDEX = 1; private static final DataType DATA_TYPE; static { MetadataBuilder metadataBuilder = new MetadataBuilder(); metadataBuilder.putLong("maxNumber", 99); DATA_TYPE = DataTypes.createStructType(new StructField[]{ DataTypes.createStructField("departmentNumber", DataTypes.IntegerType, false, metadataBuilder.build()), DataTypes.createStructField("name", DataTypes.StringType, false) }); } @Override public DataType sqlType() { return DATA_TYPE; } @Override public InternalRow serialize(City city) { InternalRow row = new GenericInternalRow(2); row.setInt(DEPT_NUMBER_INDEX, city.getDepartmentNumber()); row.update(NAME_INDEX, UTF8String.fromString(city.getName())); return row; } @Override public City deserialize(Object datum) { if (datum instanceof InternalRow) { InternalRow row = (InternalRow) datum; return City.valueOf(row.getString(NAME_INDEX), row.getInt(DEPT_NUMBER_INDEX)); } throw new IllegalStateException("Unsupported conversion"); } @Override public Class<City> userClass() { return City.class; } }
UDT is also pretty easy. The most verbose part concerns the SQL schema but it consists only on defining field types matching Java objects composing City class with additional metadata information. Finally, below test shows an example of UDT manipulation:
@Test public void should_create_schema_programatically_with_metadata_and_UDT() { // If City wouldn't be annotated with @SQLUserDefinedType, below // use of UDT registry should be enough to make it work // UDTRegistration.register("com.waitingforcode.sql.udt.City", "com.waitingforcode.sql.udt.CityUserDefinedType"); SparkSession session = SparkSession.builder().master("local[1]") .appName("UserDefinedType Test").getOrCreate(); try (JavaSparkContext closeableContext = new JavaSparkContext(session.sparkContext())) { City paris = City.valueOf("Paris", 75); City marseille = City.valueOf("Marseille", 13); City nantes = City.valueOf("Nantes", 44); List<City> cities = Lists.newArrayList(paris, marseille, nantes); JavaRDD<Row> citiesRdd = closeableContext.parallelize(cities) .map(RowFactory::create); StructType citySchema = new StructType().add("city", new CityUserDefinedType(), false); Dataset<Row> datasetFromRdd = session.createDataFrame(citiesRdd, citySchema); datasetFromRdd.show(false); // First, make some checks on used schema scala.collection.immutable.List<StructField> fields = citySchema.toList(); assertThat(fields.size()).isEqualTo(1); StructField cityBeanField = fields.apply(citySchema.getFieldIndex("city").get()); DataType dataType = cityBeanField.dataType(); assertThat(dataType).isInstanceOf(CityUserDefinedType.class); // Next, check if data was correctly serialized/deserialized List<Row> rows = datasetFromRdd.collectAsList(); City parisRow = getCityByName(rows, paris.getName()); assertRowEqualsToObject(parisRow, paris); City marseilleRow = getCityByName(rows, marseille.getName()); assertRowEqualsToObject(marseilleRow, marseille); City nantesRow = getCityByName(rows, nantes.getName()); assertRowEqualsToObject(nantesRow, nantes); assertThat(rows).hasSize(3); } } private void assertRowEqualsToObject(City row, City city) { assertThat(row.getDepartmentNumber()).isEqualTo(city.getDepartmentNumber()); } private City getCityByName(Collection<Row> rows, String name) { return rows.stream().map(row -> (City) row.getAs("city")).filter(city -> city.getName().equals(name)) .findFirst().get(); }
This post shows how to use User Defined Types in Spark SQL. The first part explains the goal of UDT and how implement them. The second part contains an example of UDT definition.
Consulting

With nearly 16 years of experience, including 8 as data engineer, I offer expert consulting to design and optimize scalable data solutions.
As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and
drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!
👉 contact@waitingforcode.com
đź”— past projects