After publishing my blog post about the insertInto trap, I got an intriguing question in the comments. The alternative to insertInto, the saveAsTable method, doesn't work well on partitioned data in overwrite mode, while insertInto does. True, but is there an alternative that doesn't require this position-based function?
A note before we start: in this blog post we consider only the overwrite save mode. I won't mention it for each code snippet to keep the blog post readable.
Partition overwrite modes
Understanding the topic requires introducing a concept from the early days of Apache Spark, when Hive and Hadoop were still the main enterprise solutions: the partition overwrite mode. This mode defines how the DataFrame writer treats the existing partitions. It can be configured as static or dynamic via spark.sql.sources.partitionOverwriteMode.
The difference between these two modes matters a lot for your data. If you use the static one, your DataFrame writer replaces all the existing partitions with the ones present in the written DataFrame. The dynamic mode, on the other hand, replaces only the partitions present in the written DataFrame and leaves the others untouched. Put differently, the code below:
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")

    Seq(("Cc", "cc", 3), ("E", "e", 5)).toDF("upper_case", "lower_case", "nr").write
      .mode(SaveMode.Overwrite)
      .insertInto("test_table")
...will create the following tables:
- For the static mode:

    +----------+----------+---+
    |upper_case|lower_case| nr|
    +----------+----------+---+
    |        Cc|        cc|  3|
    |         E|         e|  5|
    +----------+----------+---+

- For the dynamic mode:

    +----------+----------+---+
    |upper_case|lower_case| nr|
    +----------+----------+---+
    |        Cc|        cc|  3|
    |         D|         d|  4|
    |         E|         e|  5|
    +----------+----------+---+
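To make the two outcomes concrete, here is a minimal pure-Python sketch of both modes. It is a toy model and not Spark code: the overwrite function and the dictionary-based table (keyed by the partition value nr) are assumptions made for illustration only.

```python
from typing import Dict, List, Tuple

Row = Tuple[str, str]          # (upper_case, lower_case)
Table = Dict[int, List[Row]]   # partition value nr -> rows of that partition


def overwrite(table: Table, written: Table, mode: str) -> Table:
    """Toy model of an overwrite executed without a partition spec."""
    if mode == "static":
        # Every existing partition goes away; only the written ones remain.
        return {nr: list(rows) for nr, rows in written.items()}
    if mode == "dynamic":
        # Partitions absent from the written data are kept untouched.
        result = {nr: list(rows) for nr, rows in table.items()}
        for nr, rows in written.items():
            result[nr] = list(rows)
        return result
    raise ValueError(f"unknown partition overwrite mode: {mode}")


existing = {3: [("C", "c")], 4: [("D", "d")]}
incoming = {3: [("Cc", "cc")], 5: [("E", "e")]}

static_result = overwrite(existing, incoming, "static")
dynamic_result = overwrite(existing, incoming, "dynamic")
print(sorted(static_result))   # [3, 5]
print(sorted(dynamic_result))  # [3, 4, 5]
```

The partition nr=4 survives only in the dynamic case, which matches the two tables above.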
Why this difference? The answer comes from the InsertIntoHadoopFsRelationCommand#run method, which is responsible for physically writing the DataFrame to disk. After writing the new DataFrame, the function updates the partitions with the following code:
    def refreshUpdatedPartitions(updatedPartitionPaths: Set[String]): Unit = {
      val updatedPartitions = updatedPartitionPaths.map(PartitioningUtils.parsePathFragment)
      if (partitionsTrackedByCatalog) {
        val newPartitions = updatedPartitions -- initialMatchingPartitions
        if (newPartitions.nonEmpty) {
          AlterTableAddPartitionCommand(
            catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
            ifNotExists = true).run(sparkSession)
        }
        // For dynamic partition overwrite, we never remove partitions but only update existing
        // ones.
        if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
          val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
          if (deletedPartitions.nonEmpty) {
            AlterTableDropPartitionCommand(
              catalogTable.get.identifier, deletedPartitions.toSeq,
              ifExists = true, purge = false,
              retainData = true /* already deleted */).run(sparkSession)
          }
        }
      }
    }
Two parts of this function matter here. The first one, guarded by newPartitions.nonEmpty, adds the partitions created in the current write. So in our case, for both static and dynamic modes, it'll create the partition nr=5. But the similarities stop here. The second one, guarded by mode == SaveMode.Overwrite && !dynamicPartitionOverwrite, runs only for the static mode. It first detects all the existing partitions that are missing in the written DataFrame and puts them into the deletedPartitions set. Next, if the set is not empty, the function executes a drop partitions command that removes the partitions missing in the written DataFrame.
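The set arithmetic behind refreshUpdatedPartitions can be sketched in a few lines of Python. This is only an illustration under assumptions: plan_catalog_update is a hypothetical helper, and partitions are represented as plain strings such as "nr=3" instead of Spark's partition specs.

```python
from typing import Set, Tuple


def plan_catalog_update(
    initial: Set[str],
    updated: Set[str],
    overwrite: bool,
    dynamic_partition_overwrite: bool,
) -> Tuple[Set[str], Set[str]]:
    """Return (partitions to add, partitions to drop) for the catalog."""
    # Partitions written now but unknown to the catalog are always added.
    to_add = updated - initial
    to_drop: Set[str] = set()
    # Only a static overwrite drops the partitions missing from the write.
    if overwrite and not dynamic_partition_overwrite:
        to_drop = initial - updated
    return to_add, to_drop


initial = {"nr=3", "nr=4"}
updated = {"nr=3", "nr=5"}

static_plan = plan_catalog_update(initial, updated, True, False)
dynamic_plan = plan_catalog_update(initial, updated, True, True)
print(static_plan)   # ({'nr=5'}, {'nr=4'})
print(dynamic_plan)  # ({'nr=5'}, set())
```

Both modes add nr=5, but only the static plan schedules nr=4 for removal.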
The explanation above simplifies the static semantics. In reality, you can avoid overwriting all the partitions missing in the written DataFrame by using a partition spec in your INSERT OVERWRITE statement, such as:
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")

    sparkSession.sql(
      """
        |INSERT OVERWRITE test_table PARTITION(nr=3) (upper_case, lower_case)
        |VALUES
        |("Cc", "cc")
        |""".stripMargin)

    sparkSession.sql("SELECT * FROM test_table").show()
The command will impact only the partition from the partition spec:
    +----------+----------+---+
    |upper_case|lower_case| nr|
    +----------+----------+---+
    |        Cc|        cc|  3|
    |         D|         d|  4|
    +----------+----------+---+
Why? The difference comes from the SessionCatalog#listPartitions function which, in this specific case, receives the predicate in its partialSpec parameter as Some(Map(nr -> 3)). Consequently, the underlying catalog filters out all partitions not matching this predicate, so they never become candidates for removal.
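The filtering effect of the partial spec can be illustrated with a small Python model. The list_partitions function below is a hypothetical stand-in written for this example, not the SessionCatalog API, and the partition specs are plain dictionaries.

```python
from typing import Dict, List, Optional

PartitionSpec = Dict[str, str]


def list_partitions(
    partitions: List[PartitionSpec],
    partial_spec: Optional[PartitionSpec] = None,
) -> List[PartitionSpec]:
    """Keep only the partitions whose values match every entry of partial_spec."""
    if partial_spec is None:
        return list(partitions)
    return [
        p for p in partitions
        if all(p.get(column) == value for column, value in partial_spec.items())
    ]


catalog_partitions = [{"nr": "3"}, {"nr": "4"}]

# With PARTITION(nr=3), only nr=3 is part of the initially matching partitions.
matching = list_partitions(catalog_partitions, {"nr": "3"})
print(matching)  # [{'nr': '3'}]

# The write touches nr=3, so nothing is left to drop and nr=4 is never considered.
written = [{"nr": "3"}]
to_drop = [p for p in matching if p not in written]
print(to_drop)  # []
```

Without the partial spec, the function returns both partitions, which is the situation of the static overwrite described earlier.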
Keep in mind that as of this writing (Apache Spark 3.5.1), the default mode is static.
saveAsTable and dynamic partition overwrite
After the previous section you may think: OK, so the dynamic partition overwrite only replaces the partitions present in the written dataset, so saveAsTable should also replace only those partitions, shouldn't it? Unfortunately, that's not true.
First, saveAsTable is a different operation than insertInto. The saveAsTable, as the name suggests, saves a DataFrame as a table and, under the hood, involves a CREATE TABLE statement. The insertInto does the opposite: it operates on an existing table and only adds new rows to it.
Consequently, even if you invoke saveAsTable with dynamic overwrite mode, like in the next snippet...
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")
    sparkSession.sql("SELECT * FROM test_table").show()

    Seq(("Cc", "cc", 3), ("E", "e", 5)).toDF("upper_case", "lower_case", "nr").write
      .partitionBy("nr").mode(SaveMode.Overwrite)
      .saveAsTable("test_table")
    sparkSession.sql("SELECT * FROM test_table").show()
    sparkSession.sql("SHOW PARTITIONS test_table").show()
...the generated table will only store rows defined by the last writer:
    +----------+----------+---+
    |upper_case|lower_case| nr|
    +----------+----------+---+
    |         C|         c|  3|
    |         D|         d|  4|
    +----------+----------+---+

    +----------+----------+---+
    |upper_case|lower_case| nr|
    +----------+----------+---+
    |        Cc|        cc|  3|
    |         E|         e|  5|
    +----------+----------+---+

    +---------+
    |partition|
    +---------+
    |     nr=3|
    |     nr=5|
    +---------+
Why does that happen? The reason comes from the internal saveAsTable behavior which, for the overwrite save mode and regardless of the table's type (partitioned or nonpartitioned), runs a DROP TABLE before the CREATE TABLE:
    private def saveAsTable(tableIdent: TableIdentifier): Unit = {
      val catalog = df.sparkSession.sessionState.catalog
      val qualifiedIdent = catalog.qualifyIdentifier(tableIdent)
      val tableExists = catalog.tableExists(qualifiedIdent)
      val tableName = qualifiedIdent.unquotedString

      (tableExists, mode) match {
        case (true, SaveMode.Overwrite) =>
          // ...
          catalog.dropTable(qualifiedIdent, ignoreIfNotExists = true, purge = false)
          createTable(qualifiedIdent)
          catalog.refreshTable(qualifiedIdent)
        // ...
Dynamic partitioning mode and Delta Lake
Before we go to the solutions section, let's see how partition overwriting works in Delta Lake. Delta Lake comes with two ways to address the problem. The first one is quite similar to the static partitioning you saw previously. It uses a condition to identify the rows to overwrite. If the attribute used in the condition references the partition column, Delta Lake will natively replace all files within the partition. The feature we're talking about here is a writer option called replaceWhere that you can use as:
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write.format("delta")
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")

    Seq((3, "Cc", "cc")).toDF("nr", "upper_case", "lower_case").write.format("delta")
      .option("replaceWhere", "nr = 3")
      .mode(SaveMode.Overwrite)
      .insertInto("test_table")

    sparkSession.sql("SELECT * FROM test_table").show()
According to the condition, the code above will overwrite only the nr=3 partition.
And what about the dynamic partitioning? Initially (cf. [Feature Request] support for dynamic partition overwrite #348) the community was not fully convinced about implementing this feature. The reason is important. As you saw previously, the dynamic partitioning overwrites all the partitions present in the written dataset. Consequently, if you encounter late data and the writer has one single row for one of the past partitions, it will truncate this past partition to write this tiny little record to it. The replaceWhere action is thus intentional, a little bit like the static partitioning, while the dynamic overwrite may hide some runtime surprises. Finally, after a discussion about different use cases, the dynamic overwrite was added in Delta Lake 2, and you can use it by setting the partitionOverwriteMode option on the writer:
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write.format("delta")
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")

    Seq((3, "Cc", "cc"), (5, "E", "e")).toDF("nr", "upper_case", "lower_case").write.format("delta")
      .option("partitionOverwriteMode", "dynamic")
      .mode(SaveMode.Overwrite).insertInto("test_table")

    sparkSession.sql("SELECT * FROM test_table").show()
The code above generates:
    +---+----------+----------+
    | nr|upper_case|lower_case|
    +---+----------+----------+
    |  3|        Cc|        cc|
    |  5|         E|         e|
    |  4|         D|         d|
    +---+----------+----------+
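The late data concern raised in the feature request discussion is easy to reproduce with a toy model. The sketch below is pure Python, not Delta Lake code: the dynamic_overwrite_counts function, the date-based partition keys, and the row counts are all hypothetical values chosen for illustration.

```python
from collections import Counter
from typing import Dict, Iterable


def dynamic_overwrite_counts(
    existing: Dict[str, int],
    batch_partition_keys: Iterable[str],
) -> Dict[str, int]:
    """Row count per partition after a dynamic overwrite of the batch."""
    written = Counter(batch_partition_keys)
    result = dict(existing)
    # Each partition present in the batch is fully replaced by the batch rows.
    result.update(written)
    return result


existing_counts = {"2024-05-01": 1000, "2024-05-02": 1200}
# 900 rows for the new day plus one single late row for a past day.
batch = ["2024-05-03"] * 900 + ["2024-05-01"]

after = dynamic_overwrite_counts(existing_counts, batch)
print(after)  # {'2024-05-01': 1, '2024-05-02': 1200, '2024-05-03': 900}
```

One late record is enough to shrink the 2024-05-01 partition from 1000 rows to 1, which is the kind of runtime surprise an explicit replaceWhere condition makes visible in the code.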
Overwriting partitions
With all this technical context set up, we can now see how to overwrite a particular partition in Apache Spark SQL and, ideally, get rid of the position-based requirement of the insertInto command. If you use Delta Lake, you can combine replaceWhere with saveAsTable, which is a column-based operation. In the next snippet I shuffled the column order on purpose. You can see that despite this mistake, the data gets written correctly:
    Seq((3, "C", "c"), (4, "D", "d")).toDF("nr", "upper_case", "lower_case").write.format("delta")
      .mode(SaveMode.Overwrite).partitionBy("nr")
      .saveAsTable("test_table")

    Seq(("Cc", 3, "cc")).toDF("upper_case", "nr", "lower_case").write.format("delta")
      .option("replaceWhere", "nr = 3")
      .mode(SaveMode.Overwrite)
      .saveAsTable("test_table")

    sparkSession.sql("SELECT * FROM test_table").show()
    /**
    +---+----------+----------+
    | nr|upper_case|lower_case|
    +---+----------+----------+
    |  3|        Cc|        cc|
    |  4|         D|         d|
    +---+----------+----------+
    */
And the commit log will also contain only the operation on the partition:
    cat /tmp/spark-playground/partitions/delta-lake-replacewhere-saveastable-example/warehouse/test_table/_delta_log/00000000000000000001.json

    {"add":{"path":"nr=3/part-00000-115c3c9e-cd66-40b3-9035-1b1144cfb9dc.c000.snappy.parquet","partitionValues":{"nr":"3"},"size":710,"modificationTime":1738470972939,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"upper_case\":\"Cc\",\"lower_case\":\"cc\"},\"maxValues\":{\"upper_case\":\"Cc\",\"lower_case\":\"cc\"},\"nullCount\":{\"upper_case\":0,\"lower_case\":0}}"}}
    {"remove":{"path":"nr=3/part-00000-91acfb0e-5ca1-4c3c-9f68-b2432438471c.c000.snappy.parquet","deletionTimestamp":1738470971594,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"nr":"3"},"size":696,"stats":"{\"numRecords\":1}"}}
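If you want to check programmatically which partitions a commit touched, a few lines of Python are enough. The sketch below is an assumption-laden illustration: touched_partitions is a hypothetical helper, and the two input lines are abbreviated copies of the entries above (size, timestamps, and stats removed).

```python
import json
from typing import Dict, Iterable, Set

# Abbreviated copies of the two commit entries shown above.
commit_lines = [
    '{"add":{"path":"nr=3/part-00000-115c3c9e-cd66-40b3-9035-1b1144cfb9dc.c000.snappy.parquet",'
    '"partitionValues":{"nr":"3"},"dataChange":true}}',
    '{"remove":{"path":"nr=3/part-00000-91acfb0e-5ca1-4c3c-9f68-b2432438471c.c000.snappy.parquet",'
    '"partitionValues":{"nr":"3"},"dataChange":true}}',
]


def touched_partitions(lines: Iterable[str]) -> Dict[str, Set[str]]:
    """Collect the partitions referenced by the add and remove actions."""
    touched: Dict[str, Set[str]] = {"add": set(), "remove": set()}
    for line in lines:
        action = json.loads(line)
        for kind in ("add", "remove"):
            if kind in action:
                values = action[kind].get("partitionValues", {})
                spec = "/".join(f"{k}={v}" for k, v in sorted(values.items()))
                touched[kind].add(spec)
    return touched


result = touched_partitions(commit_lines)
print(result)  # {'add': {'nr=3'}, 'remove': {'nr=3'}}
```

Both actions reference nr=3 only, so the partition nr=4 was left alone by the replaceWhere write.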
💡 Remember to Keep It Simple!
If you don't work with Delta Lake and the position-based insertInto works for you, don't overcomplicate your codebase. If, on the other hand, you have already encountered some issues due to its position-based character, you can try the native insert by name on Databricks, or a hacky insert_by_name leveraging unionByName, presented next.
If you don't use Delta Lake, then despite its position-based drawback, insertInto seems to be the easiest and most elegant solution to overwrite a partition. The saveAsTable, even with the dynamic partitioning enabled, will in that case overwrite the whole table. If you are concerned about the column order requirement, you can create an insert-by-name function inspired by Dataset#unionByName:
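    from pyspark.sql import DataFrame


    def insert_by_name(table_name: str, data_to_write: DataFrame) -> None:
        table_schema = data_to_write.sparkSession.table(table_name).schema
        # Empty DataFrame that only carries the column order of the table.
        dataset_from_table = data_to_write.sparkSession.createDataFrame([], table_schema)
        # unionByName resolves the columns by name before the position-based insert.
        dataset_from_table.unionByName(data_to_write).write.insertInto(table_name, overwrite=True)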
As the Scala version looks similar, I'll omit it here, but you can still find it on GitHub. Instead, I prefer to explain the logic. First, we know that the written DataFrame should match the schema of the table. For that reason, the function gets the schema in the first step. Next, it creates an empty in-memory DataFrame with the retrieved schema. This in-memory DataFrame has the columns in the correct order but, as you know, it might not be the order of the written DataFrame. For that reason, before executing the insert command, the function does a union by name, so a column-based one, between the ideal DataFrame and the user's DataFrame. As a result, the union creates a DataFrame with the correct column positions and writes it to the table. You can see it by analyzing the execution plan:
    == Analyzed Logical Plan ==
    upper_case: string, lower_case: string, nr: bigint
    Union false, false
    :- LogicalRDD [upper_case#30, lower_case#31, nr#32L], false
    +- Project [upper_case#18, lower_case#20, nr#19L]
       +- LogicalRDD [upper_case#18, nr#19L, lower_case#20], false

    == Optimized Logical Plan ==
    Union false, false
    :- LogicalRDD [upper_case#30, lower_case#31, nr#32L], false
    +- Project [upper_case#18, lower_case#20, nr#19L]
       +- LogicalRDD [upper_case#18, nr#19L, lower_case#20], false
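The Project node in the plan is where the reordering happens. To show the idea without a Spark session, here is a minimal pure-Python sketch; align_to_schema is a hypothetical helper written for this example and rows are plain tuples.

```python
from typing import List, Sequence, Tuple


def align_to_schema(
    table_columns: Sequence[str],
    data_columns: Sequence[str],
    rows: List[Tuple],
) -> List[Tuple]:
    """Reorder every row so its values follow the column order of the table."""
    if sorted(table_columns) != sorted(data_columns):
        raise ValueError("column names of the data do not match the table")
    # Position of each table column inside the incoming rows.
    positions = [list(data_columns).index(name) for name in table_columns]
    return [tuple(row[i] for i in positions) for row in rows]


table_columns = ["upper_case", "lower_case", "nr"]
data_columns = ["upper_case", "nr", "lower_case"]

aligned = align_to_schema(table_columns, data_columns, [("Cc", 3, "cc")])
print(aligned)  # [('Cc', 'cc', 3)]
```

After the alignment, a position-based insert puts each value in the intended column.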
Insert by name on Databricks
If you are working with Databricks, insert by name is natively available through the BY NAME clause of the INSERT statement.

Our insert_by_name function is a bit hacky. After all, it relies on unionByName, an operation completely unrelated to data writing. Thankfully, if you use Delta Lake, you can leverage the replaceWhere option with the column-based saveAsTable. And who knows, maybe in the future the Apache Spark SQL API will get native support for insert by name?