Schemas

Versions: Spark 2.0.0

Spark SQL, even though the SQL suffix automatically brings RDBMS to mind, also works well with other data sources, such as plain CSV or JSON files. This would be difficult to achieve without the concept of a schema.

This post gives some insight into Spark SQL schemas. The first part defines the schema and its components. The second part focuses on encoders, widely used to make Java objects communicate with Spark SQL programs. The last part shows some test cases illustrating the use of schemas.

Schema definition

The Spark SQL library is designed to work with structured and semi-structured data. It does that well thanks to the concept of schemas. As in an RDBMS, a schema defines the expected fields, their types and whether they are nullable.

Internally, the schema is represented by the org.apache.spark.sql.types.StructType class. It's an immutable object: every time a new field is added, a new StructType instance is created. Every schema is composed of instances of the StructField class. Each StructField is described by 4 properties: name, data type (dataType), nullability (nullable) and metadata.
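
The immutability of StructType and the properties of StructField can be illustrated with a short sketch. It assumes the Spark SQL library is on the classpath; the class name StructTypeSketch and the field names are only illustrative and do not come from the tests of this post.

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class StructTypeSketch {

  // Builds a 2-field schema step by step; every add() returns a new StructType
  public static StructType buildCitySchema() {
    StructType empty = new StructType();
    // add(name, dataType, nullable): 'empty' itself is left untouched
    StructType withName = empty.add("name", DataTypes.StringType, false);
    // A StructField declared explicitly with its 4 properties
    StructField department =
      new StructField("departmentNumber", DataTypes.IntegerType, true, Metadata.empty());
    return withName.add(department);
  }

  public static void main(String[] args) {
    StructType empty = new StructType();
    StructType schema = buildCitySchema();
    // The original instance still has no fields
    System.out.println("Fields in empty schema: " + empty.fields().length);
    System.out.println("Fields in built schema: " + schema.fields().length);
    for (StructField field : schema.fields()) {
      System.out.println(field.name() + " / " + field.dataType().typeName()
        + " / nullable=" + field.nullable());
    }
  }
}
```

Since add() never modifies the receiver, intermediate schemas can be safely shared and extended in different directions.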

The schema can be defined manually or resolved automatically. The first method simply consists of declaring the fields that describe the structure of the data. The second is more complicated since it involves more actors. For Java beans, schema resolution starts in the internal SparkSession.getSchema() method, which calls JavaTypeInference.inferDataType(Class). That second method is responsible for matching Java data types to the appropriate Spark SQL DataTypes. The computed result is then returned by getSchema() as a collection of AttributeReference. This collection is next used to convert Java objects to Spark SQL Rows.

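If a data source (e.g. JSON Lines) contains records with different structures, their separate schemas can be merged into a single one. This is done by enabling the mergeSchema option while reading the data source. Note that mergeSchema is documented as a Parquet option; for JSON, schema inference already unions the fields found across the records, so the merged schema is most likely produced by the inference itself.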

Encoders

Encoder is an interface whose implementations convert JVM objects to the internal Spark SQL representation. Ready-to-use instances are exposed by the Encoders class. Internally they all rely on org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, which is able to convert, among others, Java beans to the Spark SQL internal format.

An interesting feature is how simple it is to convert a custom Java class to a Spark SQL row. It can be done with a bean encoder, obtained from the Encoders.bean(Class) static method. It can be used, for example, to construct a Dataset of the given Class type. To detect (infer) the data types of beans, Spark uses the Introspector class from the java.beans package.
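
The idea behind bean-based inference can be sketched with the JDK alone. The snippet below is not Spark's code: it is a simplified illustration, under the assumption that inference boils down to listing the readable bean properties and mapping their Java types to SQL type names. The City bean, the describe() method and the type-name mapping are hypothetical and cover only a few types.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanSchemaSketch {

  // Hypothetical bean, similar in spirit to the City class used in the tests
  public static class City {
    private String name;
    private int departmentNumber;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getDepartmentNumber() { return departmentNumber; }
    public void setDepartmentNumber(int departmentNumber) { this.departmentNumber = departmentNumber; }
  }

  // Simplified, illustrative mapping; the real inference handles many more types
  static String toSqlTypeName(Class<?> javaType) {
    if (javaType == String.class) return "string";
    if (javaType == int.class || javaType == Integer.class) return "integer";
    if (javaType == long.class || javaType == Long.class) return "long";
    if (javaType == boolean.class || javaType == Boolean.class) return "boolean";
    if (javaType == double.class || javaType == Double.class) return "double";
    return "unsupported";
  }

  // Returns property name -> SQL type name for every readable bean property
  public static Map<String, String> describe(Class<?> beanClass) {
    Map<String, String> fields = new LinkedHashMap<>();
    try {
      // Object.class as stop class excludes the synthetic "class" property
      PropertyDescriptor[] properties =
        Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors();
      for (PropertyDescriptor property : properties) {
        if (property.getReadMethod() != null) {
          fields.put(property.getName(), toSqlTypeName(property.getPropertyType()));
        }
      }
    } catch (IntrospectionException e) {
      throw new IllegalStateException("Cannot introspect " + beanClass, e);
    }
    return fields;
  }

  public static void main(String[] args) {
    describe(City.class).forEach((name, type) -> System.out.println(name + ": " + type));
  }
}
```

This also shows why the bean must expose getters: a field without a read method is invisible to the Introspector and would not appear in the resulting schema.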

Schema examples

The tests below show some of the schema features explained previously:

private static final SparkConf CONFIGURATION =
  new SparkConf().setAppName("Schema Test").setMaster("local[1]");
private static final JavaSparkContext SPARK_CONTEXT = new JavaSparkContext(CONFIGURATION);
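// The SparkSession constructor is not part of the public API; the builder reuses the context created above
private static final SparkSession SESSION = SparkSession.builder().config(CONFIGURATION).getOrCreate();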

@Test
public void should_fail_on_reading_object_with_inconsistent_schema() {
  List<Integer> ages = Lists.newArrayList(1, 2, 3, 4, 5, 6);
  JavaRDD<Row> agesRdd = SPARK_CONTEXT.parallelize(ages)
    .map(RowFactory::create);

  StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("name", DataTypes.BooleanType, true),
  });
  Dataset<Row> datasetFromRdd = SESSION.createDataFrame(agesRdd, schema);

  assertThatExceptionOfType(SparkException.class).isThrownBy(() -> datasetFromRdd.collectAsList())
    .withMessageContaining("java.lang.Integer is not a valid external type for schema of boolean");
}

@Test
public void should_correctly_read_objects_with_consistent_schema() {
  List<Integer> ages = Lists.newArrayList(1, 2, 3, 4, 5, 6);
  JavaRDD<Row> agesRdd = SPARK_CONTEXT.parallelize(ages)
    .map(RowFactory::create);

  StructType schema = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("name", DataTypes.IntegerType, true),
  });
  Dataset<Row> datasetFromRdd = SESSION.createDataFrame(agesRdd, schema);

  List<Row> rows = datasetFromRdd.collectAsList();
  assertThat(rows).hasSize(6);
  List<Integer> numbers = rows.stream().map(row -> row.getInt(0)).collect(Collectors.toList());
  assertThat(numbers).containsOnly(1, 2, 3, 4, 5, 6);
}

@Test
public void should_correctly_read_object_with_consistent_schema() {
  City paris = City.valueOf("Paris", 75);
  City marseille = City.valueOf("Marseille", 13);

  Dataset<Row> cities = SESSION.createDataFrame(Arrays.asList(paris, marseille), City.class);

  assertThat(cities.count()).isEqualTo(2);
}

@Test
public void should_create_dataset_from_encoder() {
  City paris = City.valueOf("Paris", 75);
  City lille = City.valueOf("Lille", 59);

  Encoder<City> cityEncoder = Encoders.bean(City.class);
  Dataset<City> dataset = SESSION.createDataset(Arrays.asList(paris, lille), cityEncoder);

  assertThat(dataset.count()).isEqualTo(2);
}

@Test
public void should_auto_detect_schema() {
  String filePath = getClass().getClassLoader().getResource("dataset/cities_department.json").getPath();
  Dataset<Row> citiesJson = SESSION.read().json(filePath);

  assertThat(citiesJson.count()).isEqualTo(2);
}

@Test
public void should_merge_2_schemas() {
  // Below file contains cities in 2 formats: {name[String], departmentNumber[int]} and {name[String],
  // country[String], continent[String]}
  String filePath = getClass().getClassLoader().getResource("dataset/cities_merge.json").getPath();

  Dataset<Row> mergedCities = SESSION.read().option("mergeSchema", true).json(filePath);

  StructType schema = mergedCities.schema();
  scala.collection.immutable.List<StructField> fields = schema.toList();
  assertThat(fields.size()).isEqualTo(4);
  StructField nameField = fields.apply(schema.getFieldIndex("name").get());
  assertThat(nameField.dataType()).isEqualTo(DataTypes.StringType);
  StructField departmentNumberField = fields.apply(schema.getFieldIndex("departmentNumber").get());
  assertThat(departmentNumberField.dataType()).isEqualTo(DataTypes.LongType);
  StructField countryField = fields.apply(schema.getFieldIndex("country").get());
  assertThat(countryField.dataType()).isEqualTo(DataTypes.StringType);
  StructField continentField = fields.apply(schema.getFieldIndex("continent").get());
  assertThat(continentField.dataType()).isEqualTo(DataTypes.StringType);
}

This post presents the concepts related to Spark SQL schemas. The first part explains what a schema is composed of and how it is resolved. The second part presents encoders, especially the Java bean encoders used to convert objects to the Spark SQL representation. The third part shows some schema manipulations with the Java API.