Schema case sensitivity for JSON source in Apache Spark SQL

Versions: Apache Spark 2.4.3

On the one hand, I appreciate JSON for its flexibility; on the other hand, I hate it for exactly the same thing. It's particularly painful when you work on a project without good data governance. The most common pain is an inconsistent field type - Spark can deal with that by inferring the most general compatible type. Unfortunately, it's a little bit trickier for less common problems, for instance when the same field appears with different casing.

In this post I will cover the spark.sql.caseSensitive property and show how to use it to handle the same field written with different casing. Please note that the documentation states that "it is highly discouraged to turn on case sensitive mode", so always check whether there isn't another way to solve the issue.

Working example

To illustrate the problem, I will use this code:

  import java.io.File
  import org.apache.commons.io.FileUtils

  private val testInputFilePath = "/tmp/test-case-sensitivity.json"

  override def beforeAll(): Unit = {
    // the same field is written with 3 different casings: value, Value and ValuE
    val differentCaseSensitivityData =
      """
        |{"key": "event_1", "value": 1 }
        |{"key": "event_2", "Value": 1 }
        |{"key": "event_3", "ValuE": 1  }
      """.stripMargin
    FileUtils.writeStringToFile(new File(testInputFilePath), differentCaseSensitivityData)
  }

  override def afterAll(): Unit = {
    FileUtils.forceDelete(new File(testInputFilePath))
  }

As you can notice, the same field is written with different casing across the records. Since Spark SQL's case sensitivity is disabled by default, if you try to define a schema matching at least 2 of these variants, you will get an error at the analysis stage of the query execution:

  "disabled case sensitivity" should "throw an exception for confusing schema" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("test")
      .config("spark.sql.caseSensitive", "false")
      .master("local[2]").getOrCreate()

    val schema = StructType(Seq(
      StructField("key", StringType), StructField("value", IntegerType)
      , StructField("Value", IntegerType)
    ))

    val error = intercept[AnalysisException] {
      sparkSession.read.schema(schema).json(testInputFilePath).show(false)
    }
    error.getMessage() shouldEqual "Found duplicate column(s) in the data schema: `value`;"
  }

Enabled case sensitivity

Let's now see what happens if we execute the same query with case sensitivity enabled:

  "enabled case sensitivity" should "get rows matching value and Value columns" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("test")
      .config("spark.sql.caseSensitive", "true")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    val schema = StructType(Seq(
      StructField("key", StringType), StructField("value", IntegerType)
      , StructField("Value", IntegerType)
    ))

    val rows = sparkSession.read.schema(schema).json(testInputFilePath)
      .filter(
        """
          |value = 1 OR Value = 1
        """.stripMargin)
      .map(row => (row.getAs[Int]("value"), row.getAs[Int]("Value")))
      .collect()

    rows should have size 2
    rows should contain allOf((0, 1), (1, 0))
  }

Internals

The case sensitivity flag is used for various purposes. The first one is schema validation. PartitioningUtils' validatePartitionColumn(schema: StructType, partitionColumns: Seq[String], caseSensitive: Boolean) uses it to validate the partitioning columns. checkColumnNameDuplication(columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean) from SchemaUtils uses the flag to check that the schema doesn't contain duplicate columns. This method is invoked, for instance, when you add a new column via the withColumn method.
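
To see the partitioning column validation at play, here is a minimal sketch (my own example rather than one from Spark's test suite; it reuses the JSON file created in beforeAll and writes to a hypothetical /tmp output path). With case sensitivity enabled, the "KEY" column passed to partitionBy should not be resolved against the "key" field and the write should fail at analysis time, whereas with the flag disabled the same call resolves the column:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val caseSensitiveSession: SparkSession = SparkSession.builder()
    .appName("partition-column-validation")
    .config("spark.sql.caseSensitive", "true")
    .master("local[2]").getOrCreate()

  val schema = StructType(Seq(
    StructField("key", StringType), StructField("value", IntegerType)
  ))

  // "KEY" doesn't match the "key" field in case-sensitive mode, so the write is
  // expected to fail with an AnalysisException during the validation; with
  // spark.sql.caseSensitive=false the column would be resolved and the write would succeed
  caseSensitiveSession.read.schema(schema).json(testInputFilePath)
    .write.partitionBy("KEY").json("/tmp/test-case-sensitivity-partitioned")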

The second purpose of the flag is normalization. Depending on its value, Apache Spark will either use the name you specified in the schema or normalize it to lower case. It happens, for instance, when the header of a CSV file is analyzed (CSVDataSource's makeSafeHeader method) or when the schemas are normalized in HadoopFsRelation. But it doesn't mean that the schema will be transformed into a lowercase version. The normalized schema mostly serves validation purposes, to detect duplicated columns or non-existent columns in an INSERT INTO query.
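
As an illustration of that CSV header handling, here is a hedged sketch (the CSV file and its path are my own, and the exact deduplicated names come from my reading of makeSafeHeader, so they may differ between versions). With the flag disabled, the two header entries are considered duplicates once lowercased and get deduplicated, while with the flag enabled both names are kept as-is:

  import java.io.File
  import org.apache.commons.io.FileUtils
  import org.apache.spark.sql.SparkSession

  // hypothetical input file, not part of the original test
  val csvInputFilePath = "/tmp/test-case-sensitivity.csv"
  FileUtils.writeStringToFile(new File(csvInputFilePath),
    """value,Value
      |1,2""".stripMargin)

  val caseInsensitiveSession: SparkSession = SparkSession.builder()
    .appName("csv-header-normalization")
    .config("spark.sql.caseSensitive", "false")
    .master("local[2]").getOrCreate()

  // "value" and "Value" are treated as duplicates once lowercased, so makeSafeHeader
  // deduplicates them by suffixing the column index (e.g. value0, Value1); with
  // spark.sql.caseSensitive=true the header names would be kept untouched
  caseInsensitiveSession.read.option("header", "true").csv(csvInputFilePath).printSchema()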

Long story short, the case sensitivity flag won't rewrite your schema into a lowercase version. That's understandable if you know that the method exposing the configuration property contains "analysis" in its name, SQLConf#caseSensitiveAnalysis, so it applies only before the query execution, at the analysis stage. If you want, you can also see that the schema is not altered in this test:

  "schema sensitivity" should "be respected even with case sensitive flag disabled" in {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("test")
      .config("spark.sql.caseSensitive", "false")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    val schema = StructType(Seq(
      StructField("key", StringType), StructField("Value", IntegerType)
    ))

    val rows = sparkSession.read.schema(schema).json(testInputFilePath)
      .filter(
        """
          |Value = 1
        """.stripMargin)
      .map(row => (row.getAs[String]("key"), row.getAs[Int]("Value")))
      .collect()

    rows should have size 1
    rows(0) shouldEqual ("event_2", 1)
    val schemaFromQuery = sparkSession.read.schema(schema).json(testInputFilePath).schema.toString
    schemaFromQuery shouldEqual "StructType(StructField(key,StringType,true)," +
      " StructField(Value,IntegerType,true))"
  }

Why is it discouraged?

As I mentioned at the beginning, the documentation discourages changing the case sensitivity. The reason for that was given in the SPARK-15229 ticket. The case sensitivity support in Apache Spark differs from the one in ANSI SQL. In the standard, identifiers are case sensitive only when they're wrapped in quotes. Otherwise, they're insensitive, so a join expression like "ON table1.MyField = table2.myfield" will work.

From the quoted JIRA you can learn that the current Spark implementation is intended to change in the future. To avoid an important refactoring when that happens, it's advised not to rely on it directly.

Moreover, using case sensitivity to solve the problem covered in this post is more of a hack. The problem comes from the fact that some producers generate invalid data, and the fix should be applied to them first. But if changing your producer is not easy, maybe because of a long delivery cycle, you can add an extra step in your pipeline that will normalize such data quality issues before exposing the data to the rest of the system, as in the sketch below.
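
For the JSON file used throughout this post, such a normalization step could look like the following sketch (the output path and the session name are arbitrary choices of mine): read the data with case sensitivity enabled, coalesce the differently-cased variants into a single column and expose only the cleaned schema downstream.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{coalesce, col}
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  val normalizationSession: SparkSession = SparkSession.builder()
    .appName("normalization-step")
    .config("spark.sql.caseSensitive", "true")
    .master("local[2]").getOrCreate()

  // declare every casing variant present in the input
  val dirtySchema = StructType(Seq(
    StructField("key", StringType), StructField("value", IntegerType),
    StructField("Value", IntegerType), StructField("ValuE", IntegerType)
  ))

  normalizationSession.read.schema(dirtySchema).json(testInputFilePath)
    // keep the first non-null variant and drop the others
    .select(col("key"), coalesce(col("value"), col("Value"), col("ValuE")).as("value"))
    .write.json("/tmp/test-case-sensitivity-normalized")

The consumers of the normalized dataset then only ever see a single, consistently named value column and the case sensitivity flag can stay at its default value everywhere else.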