Schema projection

on waitingforcode.com

Even though being explicit is usually the better choice, programming often gives us the option of letting the computer guess. Spark SQL can do this too, for example when it resolves a schema.

This post describes the strategies Spark SQL uses to resolve a schema from the data it reads. The first part presents some of the Scala classes involved and explains the resolution pipeline. The second part shows examples of automatic schema detection. The basics of schemas are not covered here; you can find them in the post about Schemas in Spark SQL.

Automatic schema detection

Schema resolution depends mostly on the data source being read. If the source is supposed to contain structured data (e.g. a relational database), the schema is retrieved directly, without guessing:

// org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#resolveTable
def resolveTable(options: JDBCOptions): StructType = {
  val url = options.url
  val table = options.table
  val dialect = JdbcDialects.get(url)
  val conn: Connection = JdbcUtils.createConnectionFactory(options)()
  try {
    val statement = conn.prepareStatement(dialect.getSchemaQuery(table))
    try {
      val rs = statement.executeQuery()
      try {
        JdbcUtils.getSchema(rs, dialect)
      } finally {
        rs.close()
      }
    } finally {
      statement.close()
    }
  } finally {
    conn.close()
  }
}

The query used to resolve the schema depends on the database. For MySQL, for instance, it looks like SELECT * FROM $table WHERE 1=0. Such a query returns no rows, so only the result set metadata (column names, types, nullability) is read.

More complicated operations are applied to semi-structured data, such as JSON or Parquet files. In these cases the schema is inferred. For JSON this is done by the org.apache.spark.sql.execution.datasources.json.InferSchema#infer() method, and for Parquet files by the org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat#inferSchema() method. Going into the details of each would be painful to read, so instead we'll try to deduce some basic rules of automatic schema resolution.

As a rule of thumb, whenever some schema information is available (e.g. Parquet's summary files, a database table), Spark SQL tries to read it and construct the schema from it. On the other hand, schema resolution for schemaless formats, such as JSON files, is based on sampling. The sampling is controlled by the samplingRatio parameter. If it's equal to 1 (100%), the schema is inferred from all JSON entries. If it's lower, only a sample of the entries, whose size depends on the specified ratio, is analyzed.
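The effect of the sampling ratio can be illustrated with a toy model in plain Scala. It is only a sketch and not Spark's implementation: each record is reduced to the set of its top-level keys (the same keys as in the semi-structured file used in the tests of this post), each record is kept with a probability equal to the ratio, and the inferred columns are the union of the keys that were seen. The names SamplingSketch and inferColumns are hypothetical.

```scala
import scala.util.Random

object SamplingSketch {
  // Each JSON record is modelled only by the set of its top-level keys.
  val records: Seq[Set[String]] = Seq(
    Set("name", "age", "hobby"),
    Set("name", "city", "hobby"),
    Set("name", "country"),
    Set("name", "age", "hobby"),
    Set("preferences"),
    Set("job"),
    Set("age", "job")
  )

  // Keeps every record with probability `ratio`, then returns the union
  // of the keys found in the kept records. Columns that only appear in
  // records left out of the sample are absent from the result.
  def inferColumns(ratio: Double, seed: Long): Set[String] = {
    val random = new Random(seed)
    records.filter(_ => random.nextDouble() < ratio).flatten.toSet
  }
}
```

With a ratio of 1.0 every record is kept, so all seven columns are found. With a lower ratio the result depends on which records happen to be sampled, and a column such as preferences, present in a single record, is easily missed.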

In the JSON case, once the entries are selected, Spark SQL uses Jackson's classes (JsonParser, JsonFactory) to map JSON types to the corresponding Spark SQL schema types. The sampling ratio is key to this operation: if it's too high, the operation can be very slow; if it's too low, some columns can be missing (to see what happens then, take a look at the "low sampling" should "make select query fail because of not detected column" test case).
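How the types observed for one field across several entries end up as a single column type can be sketched in plain Scala. This is a deliberately simplified model and not Spark's code: it only knows four types, and the names TypeMergeSketch, typeOf, merge and infer are hypothetical. The rules it assumes are that a null says nothing about the type, that integers and decimals widen to a decimal type, that incompatible types fall back to a string, and that a field which was only ever null also ends up as a string.

```scala
object TypeMergeSketch {
  sealed trait InferredType
  case object NullT extends InferredType
  case object LongT extends InferredType
  case object DoubleT extends InferredType
  case object StringT extends InferredType

  // Type observed for a single value.
  def typeOf(value: Any): InferredType = value match {
    case null => NullT
    case _: Int | _: Long => LongT
    case _: Float | _: Double => DoubleT
    case _ => StringT
  }

  // Most specific type able to hold values of both types.
  def merge(a: InferredType, b: InferredType): InferredType = (a, b) match {
    case (x, y) if x == y => x
    case (NullT, other) => other
    case (other, NullT) => other
    case (LongT, DoubleT) | (DoubleT, LongT) => DoubleT
    case _ => StringT
  }

  // Folds all observed values of one field into a single type;
  // a field that was only ever null is reported as a string.
  def infer(values: Seq[Any]): InferredType = {
    val merged = values.map(typeOf).foldLeft(NullT: InferredType)(merge)
    if (merged == NullT) StringT else merged
  }
}
```

This is consistent with what the tests in this post expect: age, always an integer, is resolved as LongType, while hobby is resolved as StringType even in the semi-structured file, where it is null in every entry.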

Schema resolution examples

Schema resolution is easy to test thanks to the schema exposed by the Dataset:

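// Fragment of a ScalaTest spec (FlatSpec with Matchers and BeforeAndAfter);
// InMemoryDatabase is a test helper that is not shown in this post
import java.io.File

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

var sparkSession: SparkSession = null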
val structuredFile = new File("./structured_data.json")
val semiStructuredFile = new File("./semi_structured_data.json")
val structuredParquetFile = new File("./structured_data.parquet")

before {
  sparkSession = SparkSession.builder()
    .appName("Spark SQL automatic schema resolution test").master("local[1]").getOrCreate()
  // Structured data
  val structuredEntries =
    """
      | {"name": "A", "age": 20, "hobby": null}
      | {"name": "B", "age": 24, "hobby": "sport"}
      | {"name": "C", "age": 30, "hobby": "travelling"}
      | {"name": "D", "age": 34, "hobby": null}
      | {"name": "E", "age": 40, "hobby": "cooking"}
      | {"name": "F", "age": 44, "hobby": null}
    """.stripMargin
  FileUtils.writeStringToFile(structuredFile, structuredEntries)
  // Semi-structured data (in some cases, keys are missing)
  val semiStructuredEntries =
    """
      | {"name": "A", "age": 20, "hobby": null}
      | {"name": "B", "city": "London", "hobby": null}
      | {"name": "C", "country": "France"}
      | {"name": "D", "age": 20, "hobby": null}
      | {"preferences": {"sport": "football", "fruit": "apple"}}
      | {"job": "programmer"}
      | {"age": 20, "job": "software engineer"}
    """.stripMargin
  FileUtils.writeStringToFile(semiStructuredFile, semiStructuredEntries)
}

after {
  FileUtils.forceDelete(structuredFile)
  FileUtils.forceDelete(semiStructuredFile)
  structuredParquetFile.delete()
  sparkSession.stop()
  InMemoryDatabase.cleanDatabase()
}

"structured data" should "have its schema resolved automatically" in {
  val structuredJsonData = sparkSession.read.json(structuredFile.getAbsolutePath)

  val resolvedSchema = structuredJsonData.schema
  val schemaFields = resolvedSchema.fields

  val nameField = schemaFields.filter(field => field.name == "name")(0)
  nameField.dataType shouldBe(DataTypes.StringType)
  nameField.nullable shouldBe true
  val ageField = schemaFields.filter(field => field.name == "age")(0)
  ageField.dataType shouldBe(DataTypes.LongType)
  ageField.nullable shouldBe true
  val hobbyField = schemaFields.filter(field => field.name == "hobby")(0)
  hobbyField.dataType shouldBe(DataTypes.StringType)
  hobbyField.nullable shouldBe true
}

"structured data from in-memory database" should "have the schema resolved automatically" in {
  InMemoryDatabase
    .createTable("CREATE TABLE city(name varchar(30) primary key, country varchar(50))")

  val structuredDbData = sparkSession.read.format("jdbc")
    .option("url", InMemoryDatabase.DbConnection)
    .option("driver", InMemoryDatabase.DbDriver)
    .option("dbtable", "city")
    .option("user", InMemoryDatabase.DbUser)
    .option("password", InMemoryDatabase.DbPassword)
    .load()

  val resolvedSchema = structuredDbData.schema
  val schemaFields = resolvedSchema.fields

  val nameField = schemaFields.filter(field => field.name == "NAME")(0)
  nameField.dataType shouldBe(DataTypes.StringType)
  nameField.nullable shouldBe false
  val countryField = schemaFields.filter(field => field.name == "COUNTRY")(0)
  countryField.dataType shouldBe(DataTypes.StringType)
  countryField.nullable shouldBe true
}

"Parquet files" should "also have their schemas resolved automatically" in {
  val structuredJsonData = sparkSession.read.json(structuredFile.getAbsolutePath)
  structuredJsonData.write.parquet(structuredParquetFile.getAbsolutePath)
  val structuredParquetData = sparkSession.read.parquet(structuredParquetFile.getAbsolutePath)

  val resolvedSchema = structuredParquetData.schema
  val schemaFields = resolvedSchema.fields

  // The schema is expected to be the same as in the case
  // of test on structured JSON
  val nameField = schemaFields.filter(field => field.name == "name")(0)
  nameField.dataType shouldBe(DataTypes.StringType)
  nameField.nullable shouldBe true
  val ageField = schemaFields.filter(field => field.name == "age")(0)
  ageField.dataType shouldBe(DataTypes.LongType)
  ageField.nullable shouldBe true
  val hobbyField = schemaFields.filter(field => field.name == "hobby")(0)
  hobbyField.dataType shouldBe(DataTypes.StringType)
  hobbyField.nullable shouldBe true
}

"low sampling" should "make select query fail because of not detected column" in {
  // This time we define a low sampling ratio.
  // Because of that, Spark SQL will analyze fewer
  // JSON entries and will probably resolve the schema
  // for only a part of the columns; thus some of
  // the columns can be missing
  val semiStructuredJsonData = sparkSession.read.option("samplingRatio", 0.1)
    .json(semiStructuredFile.getAbsolutePath)

  // Note: if the test fails, it means that 'preferences' was
  //       resolved but there are some other columns missing
  //       To facilitate test understanding, 'preferences' was chosen
  //       as it was never found in local tests.
  val queryError = intercept[AnalysisException] {
    semiStructuredJsonData.select("preferences")
  }

  queryError.message.contains("cannot resolve '`preferences`'") shouldBe true
}

"high sampling" should "resolve all columns correctly" in {
  // Note that the sampling ratio defaults to 1.0.
  // It's set explicitly here only to highlight the
  // difference in parameters between the two tests
  val semiStructuredJsonData = sparkSession.read.option("samplingRatio", 1.0).
    json(semiStructuredFile.getAbsolutePath)

  // Unlike previously, this query shouldn't fail because
  // all JSON entries were analyzed and 'preferences' entry
  // was found and included in the schema
  semiStructuredJsonData.select("preferences")

  val resolvedSchema = semiStructuredJsonData.schema
  val fields = resolvedSchema.fields

  val ageField = fields.filter(field => field.name == "age")(0)
  ageField.dataType shouldEqual(DataTypes.LongType)
  val cityField = fields.filter(field => field.name == "city")(0)
  cityField.dataType shouldEqual(DataTypes.StringType)
  val countryField = fields.filter(field => field.name == "country")(0)
  countryField.dataType shouldEqual(DataTypes.StringType)
  val hobbyField = fields.filter(field => field.name == "hobby")(0)
  hobbyField.dataType shouldEqual(DataTypes.StringType)
  val jobField = fields.filter(field => field.name == "job")(0)
  jobField.dataType shouldEqual(DataTypes.StringType)
  val nameField = fields.filter(field => field.name == "name")(0)
  nameField.dataType shouldEqual(DataTypes.StringType)
  val preferencesField = fields.filter(field => field.name == "preferences")(0)
  val expectedPreferencesType = StructType(Seq(
    StructField("fruit",DataTypes.StringType,true),
    StructField("sport",DataTypes.StringType,true)
  ))
  preferencesField.dataType shouldEqual(expectedPreferencesType)
}

Automatic schema resolution is pretty easy to understand. Spark SQL always prefers the most reliable source. If some schema information (a database table, metadata files) is available, it is used to define the schema of the data used in the computation. If the data source is schemaless (JSON), Spark SQL samples some entries and tries to deduce the schema from them. However, sampling can be dangerous: with a low ratio some columns can be missing from the schema, which makes queries that reference them fail.
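One way to avoid depending on sampling is to pass the schema explicitly, in which case Spark SQL has nothing to infer. The snippet below is a minimal sketch of that approach, not something taken from the tests above: the object name ExplicitSchemaSketch, the temporary file and the chosen fields are assumptions made for the example, and it requires Spark SQL on the classpath.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ExplicitSchemaSketch {
  // Schema declared by hand; the list of fields is an assumption of this example.
  val explicitSchema: StructType = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", LongType, nullable = true),
    StructField("preferences", StructType(Seq(
      StructField("fruit", StringType, nullable = true),
      StructField("sport", StringType, nullable = true)
    )), nullable = true)
  ))

  // Writes two JSON lines to a temporary file, reads them back with the
  // explicit schema and returns the names of the resolved columns.
  def readFieldNames(): Seq[String] = {
    val spark = SparkSession.builder()
      .appName("Explicit schema sketch").master("local[1]").getOrCreate()
    val file = Files.createTempFile("explicit_schema", ".json")
    try {
      val lines =
        """{"name": "A", "age": 20}
          |{"preferences": {"sport": "football", "fruit": "apple"}}""".stripMargin
      Files.write(file, lines.getBytes(StandardCharsets.UTF_8))
      // No inference happens here: the declared schema is used as-is
      val data = spark.read.schema(explicitSchema).json(file.toString)
      // 'preferences' is resolvable even though only one entry contains it
      data.select("preferences")
      data.schema.fieldNames.toSeq
    } finally {
      Files.deleteIfExists(file)
      spark.stop()
    }
  }
}
```

The trade-off is that the schema must be maintained by hand: a key that is present in the files but absent from the declared schema is simply not exposed as a column.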
