Save modes in Spark SQL

A DataFrame can be both loaded and saved. As in many other areas, Spark SQL provides several strategies for dealing with data persistence.

Bartosz Konieczny

This short post presents the different ways of saving a DataFrame as a file. The first section introduces the internal object involved in this action. The second section shows how saving works through the usual learning tests.

DataFrame save modes

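All DataFrame saving logic is driven by the org.apache.spark.sql.SaveMode enum. It defines 4 strategies that can be passed to the org.apache.spark.sql.DataFrameWriter#mode(saveMode: SaveMode) method to control the saving behavior when the saved file already exists:

Append: the new data is added to the existing data.
Overwrite: the existing data is replaced by the new data.
ErrorIfExists: an exception is thrown; this is the default mode.
Ignore: the write is skipped and the existing data is left unchanged.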
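To make the decision logic concrete, here is a minimal plain-Scala model of how the four strategies treat a target that already exists. It is only a sketch and not Spark's implementation: the `Mode` hierarchy, the `save` function and the in-memory `store` are hypothetical names introduced for illustration.

```scala
// Simplified model of the four SaveMode strategies.
// NOT Spark's code: Mode, save and the in-memory store are hypothetical.
object SaveModeModel {
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // Returns the store content after writing `rows` to `path` with `mode`.
  def save(store: Map[String, Seq[Int]], path: String,
           rows: Seq[Int], mode: Mode): Map[String, Seq[Int]] =
    (store.get(path), mode) match {
      // The target does not exist yet: every mode simply writes the rows.
      case (None, _) => store + (path -> rows)
      // The target exists: the mode decides what happens.
      case (Some(existing), Append) => store + (path -> (existing ++ rows))
      case (Some(_), Overwrite) => store + (path -> rows)
      case (Some(_), Ignore) => store
      case (Some(_), ErrorIfExists) =>
        throw new IllegalStateException(s"path $path already exists")
    }
}
```

The mode only matters when the target exists; on a fresh path all four strategies behave the same way in this model.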

SaveMode is also used elsewhere in Spark's built-in code. For instance, it is used by queries such as CREATE TABLE [IF NOT EXISTS] [db_name.]table_name. org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider also uses it when a DataFrame is written to a database.
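The CREATE TABLE case can be observed from the outside with a short sketch. It assumes a local SparkSession using the default catalog and the parquet source; the `orders_demo` table and the `CreateTableModes` object are hypothetical names. As far as I can tell, IF NOT EXISTS behaves like SaveMode.Ignore, while its absence behaves like SaveMode.ErrorIfExists.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

// Sketch only: orders_demo and CreateTableModes are hypothetical names.
object CreateTableModes {
  // Returns (rows left after IF NOT EXISTS, did the plain CREATE TABLE fail?)
  def run(spark: SparkSession): (Long, Boolean) = {
    import spark.implicits._
    spark.sql("DROP TABLE IF EXISTS orders_demo")
    Seq((1L, 19d)).toDF("id", "amount").write.saveAsTable("orders_demo")

    // The table already exists, so this statement should leave it untouched.
    spark.sql("CREATE TABLE IF NOT EXISTS orders_demo (id BIGINT, amount DOUBLE) USING parquet")
    val rowsAfterIfNotExists = spark.table("orders_demo").count()

    // Without IF NOT EXISTS the same statement is expected to be rejected.
    val plainCreateFailed = try {
      spark.sql("CREATE TABLE orders_demo (id BIGINT, amount DOUBLE) USING parquet")
      false
    } catch {
      case _: AnalysisException => true
    }

    spark.sql("DROP TABLE IF EXISTS orders_demo")
    (rowsAfterIfNotExists, plainCreateFailed)
  }
}
```

Calling `CreateTableModes.run(sparkSession)` is expected to show that the existing row survives the IF NOT EXISTS statement and that the plain statement raises an error.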

SaveModes examples

Showing SaveModes through learning tests is pretty straightforward:

private val sparkSession: SparkSession = SparkSession.builder().appName("SaveMode test").master("local")
  .getOrCreate()

private val BaseDir = "/tmp/spark-save-modes/"

private val FileToAppend = s"${BaseDir}appendable_file"

private val FileToOverwrite = s"${BaseDir}overridable_file"

private val AlreadyExistentFile = s"${BaseDir}already_existent_file"

private val AlreadyExistentOrder = Order(1, 1, 19d)

import sparkSession.implicits._

override def beforeAll(): Unit = {
  val ordersDataFrame = Seq(
    (1, 1, 19d)
  ).toDF("id", "customers_id", "amount")

  ordersDataFrame.write.json(FileToAppend)
  ordersDataFrame.write.json(FileToOverwrite)
  ordersDataFrame.write.json(AlreadyExistentFile)
}

override def afterAll(): Unit = {
  File(FileToAppend).deleteRecursively()
  File(FileToOverwrite).deleteRecursively()
  File(AlreadyExistentFile).deleteRecursively()
  sparkSession.stop()
}

"created data" should "be appended to already existent data source" in {
  val newOrders = Seq(
    (10, 100, 2000d), (11, 100, 2500d)
  ).toDF("id", "customers_id", "amount")

  newOrders.write.mode(SaveMode.Append).json(FileToAppend)

  val allOrders = sparkSession.read.json(FileToAppend).map(Converter.rowToOrder(_)).collect()

  allOrders should have length(3)
  allOrders should contain allOf(AlreadyExistentOrder, Order(10, 100,  2000d), Order(11, 100, 2500d))
}

"created data" should "overwrite already existent data source" in {
  val newOrders = Seq(
    (20, 200, 3000d), (21, 200, 3500d)
  ).toDF("id", "customers_id", "amount")

  newOrders.write.mode(SaveMode.Overwrite).json(FileToOverwrite)

  val allOrders = sparkSession.read.json(FileToOverwrite).map(Converter.rowToOrder(_)).collect()

  allOrders should have length(2)
  allOrders should contain allOf(Order(20, 200,  3000d), Order(21, 200, 3500d))
}

"already existent data source" should "produce an error if the DataFrame is saved to the same location" in {
  val newOrders = Seq(
    (30, 300, 4000d), (31, 300, 4500d)
  ).toDF("id", "customers_id", "amount")

  val writingError = intercept[AnalysisException] {
    newOrders.write.mode(SaveMode.ErrorIfExists).json(AlreadyExistentFile)
  }

  writingError.getMessage should include ("path file:/tmp/spark-save-modes/already_existent_file already exists")
}

"created data" should "not be saved when the data source already exists" in {
  val newOrders = Seq(
    (30, 300, 4000d), (31, 300, 4500d)
  ).toDF("id", "customers_id", "amount")

  newOrders.write.mode(SaveMode.Ignore).json(AlreadyExistentFile)

  val allOrders = sparkSession.read.json(AlreadyExistentFile).map(Converter.rowToOrder(_)).collect()

  allOrders should have length(1)
  allOrders(0) should equal(AlreadyExistentOrder)
}

object Converter {

  def rowToOrder(orderRow: Row): Order = {
    Order(orderRow.getAs[Long]("id"), orderRow.getAs[Long]("customers_id"), orderRow.getAs[Double]("amount"))
  }
}

case class Order(id: Long, customer: Long, amount: Double)
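The save mode can also be passed as a string, since DataFrameWriter exposes a mode(saveMode: String) overload. The sketch below replays the four strategies on a temporary directory and records the row count after each write; the `StringSaveModes` object and the temporary path are assumptions made for illustration.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Sketch only: StringSaveModes and the temporary target are hypothetical.
object StringSaveModes {
  // Returns the number of rows readable after each of the four writes.
  def run(spark: SparkSession): Seq[Long] = {
    import spark.implicits._
    // The target itself must not exist before the first write.
    val target = Files.createTempDirectory("save-modes").resolve("orders").toString
    val order = Seq((1L, 1L, 19d)).toDF("id", "customers_id", "amount")
    def rows(): Long = spark.read.json(target).count()

    order.write.mode("error").json(target)     // same as SaveMode.ErrorIfExists
    val afterFirstWrite = rows()
    order.write.mode("append").json(target)    // same as SaveMode.Append
    val afterAppend = rows()
    order.write.mode("ignore").json(target)    // same as SaveMode.Ignore
    val afterIgnore = rows()
    order.write.mode("overwrite").json(target) // same as SaveMode.Overwrite
    val afterOverwrite = rows()

    Seq(afterFirstWrite, afterAppend, afterIgnore, afterOverwrite)
  }
}
```

The enum remains the safer choice in Scala code because a typo is caught at compile time, whereas an unknown string only fails at runtime.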

SaveModes in Spark SQL are a pretty simple topic. However, they are often overlooked when first working with Spark. That is why it is good to know the 4 existing types and the places where they can be used. After that, the real use is simple because it comes down to passing the appropriate enum entry to the .mode(...) method.
