Apache Spark SQL and types resolution in semi-structured data

on waitingforcode.com

One of the goals of data governance is to ensure data consistency across different producers. Unfortunately, this often holds only in theory, especially when the data format is schemaless. That's why data exploration is an important step in defining a data pipeline. In this post I wanted to do a small exercise and check how Apache Spark SQL behaves with inconsistent data.

This post shows how Apache Spark SQL behaves with a semi-structured data source containing inconsistent values. Inconsistency here means that the same attribute has different data types in different records. I will present this in 2 sections, each one describing one specific scenario. The first one shows how Apache Spark SQL infers the schema from an inconsistent JSON file. The next part covers user-defined schemas and the 3 parsing modes.

Inferred schema

The test cases of this post use the following JSON file, saved once before the execution of all test scenarios:

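// FileUtils is assumed to come from Apache Commons IO
import java.io.File
import org.apache.commons.io.FileUtils

// Three records with the same attributes; "id" is a number in two of them and an object in one
private val InconsistentDataset =
    """
      |{"id": 1, "created": "2018-10-10"}
      |{"id": {"value": 2, "code": "1234-5678-9011"}, "created": "2018-11-10"}
      |{"id": 3, "created": "2018-09-01"}
    """.stripMargin
private val InconsistentFile = new File("./test_inconsistent_schema.json")
// Written once, before the test scenarios run
FileUtils.writeStringToFile(InconsistentFile, InconsistentDataset)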

As you can see, all records in the dataset have the same fields, but the "id" field is not of the same type every time. Let's check now how Apache Spark SQL deals with it on its own, i.e. without an explicit schema definition:

  "Dataset" should "deduce the most universal schema" in {
    val structuredJsonData = sparkSession.read.json(InconsistentFile.getAbsolutePath)
    structuredJsonData.printSchema()

    structuredJsonData.schema.fields should have size 2
    structuredJsonData.schema("id").dataType shouldEqual StringType
    structuredJsonData.schema("created").dataType shouldEqual StringType
    val collectedRows = structuredJsonData.collect().map(row => row.mkString(", "))
    collectedRows should have size 3
    collectedRows should contain allOf("2018-10-10, 1", "2018-11-10, {\"value\":2,\"code\":\"1234-5678-9011\"}",
      "2018-09-01, 3")
  }

The engine has just inferred an almost ideal schema: it considers all "id"s as strings. If we don't want to lose any value, it's a perfect choice. But how did Apache Spark do that? The answer is hidden deep in the schema inference rules. When you don't define the schema explicitly, the engine calls the JsonFileFormat#inferSchema method, which in turn invokes JsonDataSource#inferSchema, which finally calls TextInputJsonDataSource#infer, where the inference is made with the help of the JsonInferSchema class. It's quite complex, but the snippet below, coming from the Apache Spark source code, should explain a lot:

    // In each RDD partition, perform schema inference on each row and merge afterwards.
    val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode)
    val mergedTypesFromPartitions = json.mapPartitions { iter =>
      val factory = new JsonFactory()
      configOptions.setJacksonOptions(factory)
      iter.flatMap { row =>
        try {
          Utils.tryWithResource(createParser(factory, row)) { parser =>
            parser.nextToken()
            Some(inferField(parser, configOptions))
          }
        } catch {
          case  e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
            case PermissiveMode =>
              Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
            case DropMalformedMode =>
              None
            case FailFastMode =>
              throw new SparkException("Malformed records are detected in schema inference. " +
                s"Parse Mode: ${FailFastMode.name}.", e)
          }
        }
      }.reduceOption(typeMerger).toIterator
    }

The key to understanding the logic is the typeMerger variable. In our case it merges 2 different types, LongType and StructType, in this method:

  /**
   * Returns the most general data type for two given data types.
   */
  def compatibleType(t1: DataType, t2: DataType): DataType = {
    TypeCoercion.findTightestCommonType(t1, t2).getOrElse {
      // ... some decimal or array matchings
      // strings and every string is a Json object.
      case (_, _) => StringType

As the comment suggests, when types conflict, Apache Spark returns the most generic type so that it can represent all the data without processing errors. That's why it treated our long- and struct-based id attributes as strings.
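To make the merge rule more tangible, here is a minimal sketch in plain Scala. It is a toy model and not Spark's implementation: the JsonType hierarchy, the merge function and the TypeMergeSketch object are hypothetical names introduced only for this illustration. It shows how reducing the per-row types of "id" ends up with a string type as soon as a long meets a struct.

```scala
// Toy model of the "most general type" merge; these are NOT Spark's classes.
object TypeMergeSketch {
  sealed trait JsonType
  case object LongT extends JsonType
  case object DoubleT extends JsonType
  case object StringT extends JsonType
  final case class StructT(fields: Map[String, JsonType]) extends JsonType

  // Returns a type able to hold the values of both inputs.
  def merge(t1: JsonType, t2: JsonType): JsonType = (t1, t2) match {
    case (a, b) if a == b => a
    // Numeric widening: a long fits in a double.
    case (LongT, DoubleT) | (DoubleT, LongT) => DoubleT
    // Two structs are merged field by field; one-sided fields are kept as-is.
    case (StructT(f1), StructT(f2)) =>
      StructT((f1.keySet ++ f2.keySet).map { name =>
        val merged = (f1.get(name), f2.get(name)) match {
          case (Some(a), Some(b)) => merge(a, b)
          case (Some(a), None)    => a
          case (None, Some(b))    => b
          case (None, None)       => StringT // unreachable: name comes from one of the key sets
        }
        name -> merged
      }.toMap)
    // Any other combination conflicts, so fall back to the string type.
    case _ => StringT
  }

  // Per-row types of the "id" attribute in the dataset used in this post.
  val idTypes: Seq[JsonType] = Seq(
    LongT,
    StructT(Map("value" -> LongT, "code" -> StringT)),
    LongT
  )

  // Same shape as the reduceOption(typeMerger) call in the Spark snippet.
  val mergedIdType: Option[JsonType] = idTypes.reduceOption(merge)
}
```

In this model TypeMergeSketch.mergedIdType is Some(StringT): the first merge of a long and a struct already falls into the last case, and the string type then absorbs the remaining long.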

Defined restrictive schema

Naturally, if we define our own schema that uses the most generic types for the fields, we'll get the same results as the ones described in the previous section:

  "Dataset" should "take the universal schema and not generate null fields" in {
    val schema = StructType(
      Seq(StructField("id", DataTypes.StringType, true), StructField("created", DataTypes.StringType, true))
    )
    val structuredJsonData = sparkSession.read.schema(schema)
      .json(InconsistentFile.getAbsolutePath)

    structuredJsonData.schema.fields should have size 2
    structuredJsonData.schema("id").dataType shouldEqual StringType
    structuredJsonData.schema("created").dataType shouldEqual StringType
    val collectedRows = structuredJsonData.collect().map(row => row.mkString(", "))
    collectedRows should have size 3
    collectedRows should contain allOf("1, 2018-10-10", "{\"value\":2,\"code\":\"1234-5678-9011\"}, 2018-11-10", "3, 2018-09-01")
  }

However, when we start to define more restrictive schemas, different things happen depending on the defined parse mode option.

This option can take one of 3 values. The first one is called PERMISSIVE. With it, any row not matching the user-specified schema is returned with null values:

  "Dataset" should "take the not universal schema and generate null fields for not matching columns" in {
    val schema = StructType(
      Seq(StructField("id", StructType(Seq(StructField("value", DataTypes.IntegerType, true),
        StructField("code", DataTypes.StringType, true)))), StructField("created", DataTypes.StringType, true)
      )
    )
    val structuredJsonData = sparkSession.read.schema(schema)
      .option("mode", "permissive").json(InconsistentFile.getAbsolutePath)

    structuredJsonData.schema.printTreeString()
    structuredJsonData.schema.fields should have size 2
    structuredJsonData.schema("id").dataType shouldEqual StructType(Seq(StructField("value",IntegerType,true),
      StructField("code",StringType,true)))
    structuredJsonData.schema("created").dataType shouldEqual StringType
    val collectedRows = structuredJsonData.collect().map(row => row.mkString(", "))
    collectedRows should have size 3
    collectedRows should contain allElementsOf (Seq("null, null", "[2,1234-5678-9011], 2018-11-10", "null, null"))
  }

With the DROPMALFORMED mode, all rows not matching the schema are dropped:

  "Dataset" should "take the not universal schema and drop rows not matching columns" in {
    val schema = StructType(
      Seq(StructField("id", StructType(Seq(StructField("value", DataTypes.IntegerType, true),
        StructField("code", DataTypes.StringType, true)))), StructField("created", DataTypes.StringType, true)
      )
    )
    val structuredJsonData = sparkSession.read.schema(schema)
      .option("mode", "dropmalformed").json(InconsistentFile.getAbsolutePath)

    structuredJsonData.schema.printTreeString()
    structuredJsonData.schema.fields should have size 2
    structuredJsonData.schema("id").dataType shouldEqual StructType(Seq(StructField("value",IntegerType,true),
      StructField("code",StringType,true)))
    structuredJsonData.schema("created").dataType shouldEqual StringType
    val collectedRows = structuredJsonData.collect().map(row => row.mkString(", "))
    collectedRows should have size 1
    collectedRows(0) shouldEqual "[2,1234-5678-9011], 2018-11-10"
  }

Finally, in FAILFAST mode, any row not respecting the schema makes the whole processing fail:

  "Dataset" should "fail when one of fields doesn't match the schema in failfast mode" in {
    val schema = StructType(
      Seq(StructField("id", StructType(Seq(StructField("value", DataTypes.IntegerType, true),
        StructField("code", DataTypes.StringType, true)))), StructField("created", DataTypes.StringType, true)
      )
    )

    val malformedDataError = intercept[SparkException] {
      sparkSession.read.schema(schema)
        .option("mode", "failfast").json(InconsistentFile.getAbsolutePath).collect()
    }

    malformedDataError.getMessage should include ("Malformed records are detected in record parsing. Parse Mode: FAILFAST")
  }
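The difference between the 3 modes can also be summarized with a small plain Scala sketch. It is only a model of the behavior observed in the tests above, not Spark's code: Mode, Row, Parsed, applyMode and the ParseModeSketch object are hypothetical names, and the records are assumed to be already classified as matching (Right) or not matching (Left) the struct-based schema.

```scala
// Toy model of the three parse modes; names are illustrative, not Spark's API.
object ParseModeSketch {
  sealed trait Mode
  case object Permissive extends Mode
  case object DropMalformed extends Mode
  case object FailFast extends Mode

  // A parsed row: the struct-typed id (value, code) and the created date.
  final case class Row(id: Option[(Int, String)], created: Option[String])

  // Left carries the raw text of a record that doesn't match the schema.
  type Parsed = Either[String, Row]

  def applyMode(records: Seq[Parsed], mode: Mode): Seq[Row] = mode match {
    case Permissive =>
      // Keep every record; mismatching ones become rows made of nulls (None here).
      records.map(_.fold(_ => Row(None, None), identity))
    case DropMalformed =>
      // Keep only the records that matched the schema.
      records.collect { case Right(row) => row }
    case FailFast =>
      // Fail on the first mismatching record.
      records.map {
        case Right(row) => row
        case Left(raw)  => throw new IllegalStateException(s"Malformed record: $raw")
      }
  }

  // The 3 records of the dataset, pre-classified against the struct-based schema.
  val records: Seq[Parsed] = Seq(
    Left("""{"id": 1, "created": "2018-10-10"}"""),
    Right(Row(Some((2, "1234-5678-9011")), Some("2018-11-10"))),
    Left("""{"id": 3, "created": "2018-09-01"}""")
  )
}
```

Calling ParseModeSketch.applyMode(ParseModeSketch.records, ParseModeSketch.Permissive) keeps 3 rows, 2 of them empty, the DropMalformed variant keeps only the struct-based row, and the FailFast variant throws on the first record.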

JSON data sources have gained a lot of popularity in recent years with microservices and REST-based applications. JSON is a semi-structured format since it doesn't enforce schema consistency across the data generated inside the same system. That's why we should always be careful with the processing logic. Apache Spark SQL deals with JSON in 2 ways. In the first one, it infers the schema by analyzing the input and tries to make the schema as generic as possible. In the second one, the engine uses a user-specified schema and, when it encounters mismatching rows, it either returns them as nulls, drops them, or fails fast with an exception. These 3 strategies are controlled with the parsing mode, as illustrated in the test cases from the last section.
