Apache Spark SQL, Hive and insertInto command

Versions: Apache Spark 2.3.2 https://github.com/bartosz25/spark-...veBehaviorInsertOverwriteTest.scala

Some time ago, on my Github, bithw1 pointed out an interesting behavior of the Hive integration in Apache Spark SQL. Without delving too much into details here, I can say that the behavior was about the DataFrame schema not being respected. Our quick exchange ended up with an explanation, but it also encouraged me to dig much deeper in order to understand the hows and whys.


This post develops the Github discussion (link in the "Read also" section). Its first section presents what really happens and why it may be a problem. The second part focuses on the implementation details that make it happen.

DataFrame schema not respected

Let's start with a small example showing how we could work with Hive on top of Apache Spark SQL:

import org.apache.spark.sql.{SaveMode, SparkSession}

private val TestSparkSession = SparkSession.builder().master("local").appName("Hive on Spark SQL - insertInto")
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .enableHiveSupport().getOrCreate()
import TestSparkSession.implicits._

it should "write new data to DataFrame" in {
  val targetTable = "Hive_Respect_Schema"
  val initialDataFrame = Seq(("id#1", 3), ("id#2", 4), ("id#1", 6)).toDF("id", "value")

  initialDataFrame.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

  val dataFromTable = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")

  val stringifiedDataFromTable = dataFromTable.map(row => HiveTestConverter.mapRowToString(row)).collect()
  stringifiedDataFromTable should have size 3
  stringifiedDataFromTable should contain allOf("id#1_3", "id#1_6", "id#2_4")
}

As you can see, the above code does nothing strange. We only partition our data by the values of the "id" column. This code fits well for most cases. But sometimes it may need to be reworked, especially when something went wrong during data processing and we must bring the data back to a correct state. In such a situation, we can either regenerate all the data or only the data of the modified partitions. For the latter case, we can use the Overwrite save mode with the insertInto operation:

it should "ignore DataFrame schema when using insertInto command" in {
  val targetTable = "Hive_Ignore_Schema_1"
  val initialDataFrame = Seq(("id#1", 3), ("id#2", 4), ("id#1", 6)).toDF("id", "value")

  initialDataFrame.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

  val dataFromTable = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")

  val stringifiedDataFromTable = dataFromTable.map(row => HiveTestConverter.mapRowToString(row)).collect()
  stringifiedDataFromTable should have size 3
  stringifiedDataFromTable should contain allOf("id#1_3", "id#1_6", "id#2_4")

  // Now we want to overwrite the whole partition of 'id#1' but, as you'll see, it doesn't work correctly
  Seq(("id#1", 120), ("id#1", 20)).toDF("id", "value").write.mode(SaveMode.Overwrite).insertInto(targetTable)

  val dataFromTableAfterInsert = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")
  val stringifiedDataFromTableAfterInsert = dataFromTableAfterInsert.map(row => HiveTestConverter.mapRowToString(row)).collect()
  stringifiedDataFromTableAfterInsert should have size 5
  stringifiedDataFromTableAfterInsert should contain allOf("id#1_3", "id#1_6", "id#2_4", "120_null", "20_null")
}

Unfortunately, it doesn't work as we could expect. As you can see, the values of the columns were reversed, despite the fact that the DataFrame carries the schema information. Even worse, the columns were not only reversed but the values were also set to null because of a type mismatch (int expected, String given). However, this behavior is normal. To see why, we just need to read the Scaladoc of the insertInto method:

* @note Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based
* resolution. For example:
* {{{
*    scala> Seq((1, 2)).toDF("i", "j").write.mode("overwrite").saveAsTable("t1")
*    scala> Seq((3, 4)).toDF("j", "i").write.insertInto("t1")
*    scala> Seq((5, 6)).toDF("a", "b").write.insertInto("t1")
*    scala> sql("select * from t1").show
*    +---+---+
*    |  i|  j|
*    +---+---+
*    |  5|  6|
*    |  3|  4|
*    |  1|  2|
*    +---+---+
* }}}

Even though this behavior is correct, it's worth checking why it has to be implemented that way.

Hive insertInto under the hood

You've got a part of the answer in the previous paragraph - the schema of the DataFrame was not respected. Not being an expert in the Hive integration, I started by analyzing everything I could find about dynamic partitioning, the INSERT OVERWRITE statement and Spark's integration of both.

Regarding dynamic partitioning, in our test it's enabled through the spark.sql.sources.partitionOverwriteMode configuration entry, which takes 2 values: static and dynamic. When the former is used, the framework first deletes all rows matching the partition specification and only later inserts the new values. The specification takes the form of an expression defining the partitions impacted by the INSERT statement. For instance, if a PARTITION (p1 = 3, p2 = 4) clause is defined, it means that the partition (p1 = 3, p2 = 4) will be impacted by the INSERT. In the dynamic configuration, Apache Spark doesn't remove any rows before the insert operation. It only overwrites the partitions receiving data during the insert, and the definition of the partitions is resolved dynamically.
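
To make the difference more tangible, here is a minimal sketch assuming a hypothetical logs table partitioned by an event_date column and a staging_logs source table - neither of them comes from the test code above:

// Hypothetical tables: logs(id STRING) partitioned by event_date, staging_logs(id, event_date)
// Static mode (the default): Spark first drops every partition matching the PARTITION clause
// and only then writes the new rows
TestSparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "static")
TestSparkSession.sql(
  "INSERT OVERWRITE TABLE logs PARTITION (event_date = '2018-10-01') SELECT id FROM staging_logs")

// Dynamic mode: nothing is deleted upfront; only the partitions that actually receive rows
// during the insert are overwritten, their values being resolved from the data itself
TestSparkSession.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
TestSparkSession.sql(
  "INSERT OVERWRITE TABLE logs PARTITION (event_date) SELECT id, event_date FROM staging_logs")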

As you could see from the previous explanation, dynamic partitioning works in overwriting mode, hence the focus of the previous paragraphs on the INSERT OVERWRITE statement. According to the documentation, this command overwrites any existing data in a given table or partition. It's the opposite of INSERT INTO, which appends new data to the table or the partition. And when we mix dynamic partitions with INSERT OVERWRITE, we must be careful because the declaration order matters. The inserted values are resolved by position and not by name, as we could initially think. So to make our test case work, we simply need to switch the schema definition into:

it should "respect DataFrame schema because it fits Hive table columns positions" in {
  val targetTable = "Hive_Respect_Schema_Index"
  val initialDataFrame = Seq(("id#1", 3), ("id#2", 4), ("id#1", 6)).toDF("id", "value")

  initialDataFrame.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

  val dataFromTable = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")

  val stringifiedDataFromTable = dataFromTable.map(row => HiveTestConverter.mapRowToString(row)).collect()
  stringifiedDataFromTable should have size 3
  stringifiedDataFromTable should contain allOf("id#1_3", "id#1_6", "id#2_4")

  // DataFrame schema is "reversed" compared to the previous declaration
  Seq((120, "id#1"), (20, "id#1")).toDF("value", "id").write.mode(SaveMode.Overwrite).insertInto(targetTable)

  val dataFromTableAfterInsert = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")
  val stringifiedDataFromTableAfterInsert = dataFromTableAfterInsert.map(row => HiveTestConverter.mapRowToString(row)).collect()
  stringifiedDataFromTableAfterInsert should have size 3
  stringifiedDataFromTableAfterInsert should contain allOf("id#1_120", "id#1_20", "id#2_4")
}

// It also works when we have more than 2 columns
it should "respect DataFrame 3 fields schema because it fits Hive table columns positions" in {
  val targetTable = "Hive_Respect_Schema_3_fields"
  val initialDataFrame = Seq(("id#1", 3, "a"), ("id#2", 4, "b"), ("id#1", 6, "c")).toDF("id", "value", "letter")

  initialDataFrame.write.mode(SaveMode.Overwrite).partitionBy("id").saveAsTable(targetTable)

  val dataFromTable = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")

  val stringifiedDataFromTable = dataFromTable.map(row => HiveTestConverter.mapRowWithLetterToString(row)).collect()
  stringifiedDataFromTable should have size 3
  stringifiedDataFromTable should contain allOf("id#1_3_a", "id#1_6_c", "id#2_4_b")

  Seq((120, "d", "id#1"), (20, "e","id#1")).toDF("value", "letter", "id").write.mode(SaveMode.Overwrite).insertInto(targetTable)

  val dataFromTableAfterInsert = TestSparkSession.sql(s"SELECT * FROM ${targetTable}")
  val stringifiedDataFromTableAfterInsert = dataFromTableAfterInsert.map(row => HiveTestConverter.mapRowWithLetterToString(row)).collect()
  stringifiedDataFromTableAfterInsert should have size 3
  stringifiedDataFromTableAfterInsert should contain allOf("id#1_120_d", "id#1_20_e", "id#2_4_b")
}

But before finishing this post, one last investigation point remains - the Hive integration in Apache Spark SQL. Since a DataFrame is structured data, we could imagine that the engine would try to match its schema against the schema of the Hive table. But it's not the case and, after adding some breakpoints in the code, we can see that the engine changes the initial projection into a Hive-compliant one:

Project [cast(id#29 as int) AS value#35, cast(value#30 as string) AS id#36]
+- Project [_1#26 AS id#29, _2#27 AS value#30]
   +- LocalRelation [_1#26, _2#27]

The inversion happens in the PreprocessTableInsertion rule, where Apache Spark uses org.apache.spark.sql.execution.datasources.DDLPreprocessingUtils#castAndRenameQueryOutput(query: LogicalPlan, expectedOutput: Seq[Attribute], conf: SQLConf) to transform the attribute-based DataFrame schema into the position-based Hive schema:

// You can compare what is expected and what is provided by Spark's original query
// query.output
// result = {$colon$colon@17346} "::" size = 2
//   0 = {AttributeReference@17348} "id#29"
//   1 = {AttributeReference@17349} "value#30"
// expectedOutput = {$colon$colon@14274} "::" size = 2
//   0 = {AttributeReference@17334} "value#15"
//   1 = {AttributeReference@17335} "id#16"
val newChildOutput = expectedOutput.zip(query.output).map {
  case (expected, actual) =>
    if (expected.dataType.sameType(actual.dataType) &&
      expected.name == actual.name &&
      expected.metadata == actual.metadata) {
      actual
    } else {
      // Renaming is needed for handling the following cases like
      // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
      // 2) Target tables have column metadata
      Alias(
        Cast(actual, expected.dataType, Option(conf.sessionLocalTimeZone)),
        expected.name)(explicitMetadata = Option(expected.metadata))
    }
}
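
As a side note, one defensive way to avoid the trap shown in the second test is to align the DataFrame columns with the target table's column order before calling insertInto. The snippet below is only a sketch reusing the Hive_Ignore_Schema_1 table from the tests above; it's not something coming from the analyzed Spark sources:

// Defensive sketch: read the target table's column order from the catalog and reorder
// the DataFrame accordingly, so that the position-based resolution matches our intent
import org.apache.spark.sql.functions.col

val targetTable = "Hive_Ignore_Schema_1"
val tableColumns = TestSparkSession.table(targetTable).columns // e.g. Array("value", "id")
Seq(("id#1", 120), ("id#1", 20)).toDF("id", "value")
  .select(tableColumns.map(col): _*)
  .write.mode(SaveMode.Overwrite).insertInto(targetTable)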

Apache Spark is a great framework able to communicate with different data sources. However, in some cases it's important to know their very fine details. Otherwise, we expose ourselves to strange behaviors that may require some debugging time. And it's the case with Hive's dynamic partitioning and Spark's insertInto operation. When both are used, we need to carefully define the DataFrame schema, which should begin with the data columns and end with the partition column(s).