Ignoring files issues in Apache Spark SQL

Versions: Apache Spark 2.4.5

I have to consider myself as a lucky guy since I've never had to deal with incorrectly formatted files. However, that's not the case of everyone. Hopefully, Apache Spark comes with few configuration options to manage that.

In this blog post I will focus on 2 properties that you can use to manage issues with the input datasets, namely spark.sql.files.ignoreCorruptFiles and spark.sql.files.ignoreMissingFiles. Currently, I will start by the latter one since it's much easier to illustrate.

spark.sql.files.ignoreMissingFiles

spark.sql.files.ignoreMissingFiles property is responsible for throwing an exception when the file that is supposed to be processed disappears at the moment of its processing. How can it happen? To find an answer, we must learn how Apache Spark SQL works with directories as the input. Everything happens in resolveRelation(checkFilesExist: Boolean) method:

        } else {
          val index = createInMemoryFileIndex(globbedPaths)
          val (resultDataSchema, resultPartitionSchema) =
            getOrInferFileFormatSchema(format, Some(index))
          (index, resultDataSchema, resultPartitionSchema)
        }

As you can see, Apache Spark creates an instance of InMemoryFileIndex class that stores all files that will be processed. The problem of missing files can then happen if the listed files are removed meantime by another process. I simulated this in the following snippet:

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL ignore corrupted files")
    .master("local[2]")
    .config("spark.sql.files.ignoreMissingFiles", "false")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
  "ignore missing files flag" should "not make the processing fail when the files are deleted before processing" in {


    FilesManager.writeFiles()

    val linesCount = sparkSession.read
      .schema(StructType(Seq(StructField("letter", StringType))))
      .json(FilesManager.OutputDir)
      .filter(row => {
        FilesManager.removeFiles()
        println(s"filtering ${row}")
        // Returns always true, here just to start the files removal
        true
      })
      .count()

    linesCount should not equal 8
}

object FilesManager {
  val OutputDir = "/tmp/spark-tests/invalid-files/missing-files"

  def removeFiles() = {
    synchronized {
      FileUtils.deleteDirectory(new File(OutputDir))
    }
  }

  def writeFiles() = {
    val dataToWrite = Seq(
      (Seq(
        """{"letter": "A", "number": 1}""",
        """{"letter": "B", "number": 2}"""), "1.jsonl"),
      (Seq(
        """{"letter": "C", "number": 3}""",
        """{"letter": "D", "number": 4}"""), "2.jsonl"),
      (Seq(
        """{"letter": "C", "number": 3}""",
        """{"letter": "D", "number": 4}"""), "3.jsonl"),
      (Seq(
        """{"letter": "C", "number": 3}""",
        """{"letter": "D", "number": 4}"""), "4.jsonl")
    )
    import scala.collection.JavaConverters._
    dataToWrite.foreach(linesToWriteWithFileName => {
      FileUtils.writeLines(new File(s"${OutputDir}/${linesToWriteWithFileName._2}"),
        linesToWriteWithFileName._1.asJava)
    })
  }
}


As you can see, I'm creating here 2 files that are removed when the first line is filtered. Normally, the code should fail with:

Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): java.io.FileNotFoundException: File file:/tmp/spark-tests/invalid-files/1.jsonl does not exist

To fix the issue, we can set the spark.sql.files.ignoreMissingFiles property to true. The whole process is summarized in the video below:

spark.sql.files.ignoreCorruptFiles

The second property discussed in this post is spark.sql.files.ignoreCorruptFiles. Thanks to it you can allow Spark to process a given dataset even though it has some corrupted files. How is this "corruption" translated? The answer is hidden in FileScanRDD that, if the ignoreCorruptFiles is enabled, creates a kind of safe iterator. Its safety is translated by:

private lazy val internalIter = readCurrentFile()
override def getNext(): AnyRef = {
                try {
// ...
                  }
                } catch {
// ...
                  case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
                    logWarning(
                      s"Skipped the rest of the content in the corrupted file: $currentFile", e)
                    finished = true
                    null

As you can deduce from the snippet, if Apache Spark encounters a corrupted file, it will stop its processing and log a warning message in the logs. How to reproduce it? An easy way to do so is to read a JSON file from Parquet reader because of this fragment in ParquetFileFormat class:

      try {
        // Skips row group information since we only need the schema.
        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
        // when it can't read the footer.
        Some(new Footer(currentFile.getPath(),
          ParquetFileReader.readFooter(
            conf, currentFile, SKIP_ROW_GROUPS)))
      } catch { case e: RuntimeException =>
        if (ignoreCorruptFiles) {
          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
          None
        } else {
          throw new IOException(s"Could not read footer for file: $currentFile", e)
        }
      }

A more complicated but still possible way is to write a compressed file and alter its content afterwards. You can see these 2 techniques in action in the code snippet and video below:

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL ignore corrupted files")
    .master("local[2]")
    .config("spark.sql.files.ignoreCorruptFiles", "true")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
  import sparkSession.implicits._
  before {
    Seq(("a"), ("b"), ("c"), ("d"), ("e"), ("f"),
      ("a"), ("b"), ("c"), ("d"), ("e"), ("f"),
      ("a"), ("b"), ("c"), ("d"), ("e"), ("f"),
      ("a"), ("b"), ("c"), ("d"), ("e"), ("f")
    ).toDS.toDF("letter")
      .write
      .mode(SaveMode.Overwrite)
      .option("compression", "gzip")
      .json(outputDir)
  }

  behavior of "enabled ignoreCorruptFiles flag"

  it should "read JSON files as Parquet without exceptions" in {
    val parquetLinesCount = sparkSession.read
      .schema(StructType(
        Seq(StructField("letter", StringType))
      ))
      .parquet(outputDir).count()

    parquetLinesCount shouldEqual 0
  }

  it should "read altered GZIP compressed file without exceptions" in {
    import scala.collection.JavaConverters._
    val files = FileUtils.listFiles(new File(outputDir), TrueFileFilter.INSTANCE,
      TrueFileFilter.INSTANCE).asScala
    files.foreach(file => {
      if (file.getName.startsWith("part")) {
        val datFile = new FileOutputStream(file, true).getChannel
        // Let's truncate this file to make it impossible to decompress
        datFile.truncate(file.length() - 2)
        datFile.close()
      } else if (file.getName.startsWith(".part")) {
        // ignore checksum files to not use checksum to deduce file correctness
        file.delete()
      }
    })

    val jsonLines = sparkSession.read.schema(StructType(
      Seq(StructField("letter", StringType))
    )).json(outputDir).count()

    jsonLines shouldEqual 24
  }


You can see ignoreCorruptFiles flag in action in the video below:

The 2 properties from this article can help to deal with corrupted files or the files that may be deleted during the processing. A nice thing to notice is that despite an initial corruption, Apache Spark does the best effort work and tries to correctly read as many rows as possible.

If you liked it, you should read: