Corrupted records aka poison pill records in Apache Spark Structured Streaming

Versions: Apache Spark 2.4.3

Some time ago I watched an interesting Devoxx France 2019 talk about poison pills in streaming systems, presented by Loïc Divad. I learned a few interesting patterns, like the sentinel value, that may help to deal with corrupted data, but the talk was oriented toward Kafka Streams. And since I didn't find a corresponding resource for Apache Spark Structured Streaming [and also because I'm simply an Apache Spark enthusiast ;)], I decided to write one, trying to implement Loïc's ideas in the Structured Streaming world.

The post is divided into 5 sections, each describing one strategy to deal with corrupted records. In my examples I will consider the case of data retrieval during the projection. But it's not the only place where you can meet corrupted records. The problem can move further in your pipeline, depending on where you deserialize the data. For example, if you keep the value as a string and deserialize it manually in a map function, that function will probably be your risky place (a short sketch of that case follows the setup code below).

All snippets share the same setup code:

  import org.apache.spark.sql.{Row, SparkSession, functions}
  import org.apache.spark.sql.types._
  val schema = new StructType()
    .add(StructField("lower", StringType, false))
    .add(StructField("upper", StringType, false))

  def addCorruptedRecords(sparkSession: SparkSession, topic: String) = {
    import sparkSession.implicits._
    val inputKafkaData = sparkSession.createDataset(Seq(
      (topic, System.currentTimeMillis().toString, """{"lower": "a", "upper": "A"}"""),
      (topic, System.currentTimeMillis().toString, """{"PARTIAL_lower": "a", "upper": "A"}"""),
      (topic, System.currentTimeMillis().toString, "not a JSON"),
      (topic, System.currentTimeMillis().toString, """{"SOME_LETTER": "a", OTHER_LETTER": "A"}""")
    )).toDF("topic", "key", "value")

    // I'm using here the broker from my other project:
    // https://github.com/bartosz25/kafka-playground/tree/master/broker
    inputKafkaData.write.format("kafka").option("kafka.bootstrap.servers", "210.0.0.20:9092").save()
  }

 val topic = "wrapper"
  val sparkSession = SparkSession.builder()
    .appName("Spark-Kafka corrupted records - wrapper")
    .config("spark.sql.shuffle.partitions", "2")
    .master("local[*]")
    .getOrCreate()
  import sparkSession.implicits._

  addCorruptedRecords(sparkSession, topic)

  val dataSource = sparkSession.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "210.0.0.20:9092")
    .option("client.id", s"wrapper")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
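
To illustrate that second risky place, here is a minimal sketch where the value stays a string and is deserialized by hand inside a map function. I use json4s (already on Spark's classpath) purely for the example, and the Letter case class is an arbitrary target structure; any malformed or incomplete payload makes the parsing call throw, and that exception is the poison pill of such a pipeline:

  import org.json4s._
  import org.json4s.jackson.JsonMethods.parse

  val manuallyDeserialized = dataSource
    .selectExpr("CAST(value AS STRING) AS value_as_string")
    .as[String]
    .map(rawValue => {
      implicit val formats: Formats = DefaultFormats
      // for a corrupted payload, this call is where the failure surfaces
      parse(rawValue).extract[Letter]
    })

case class Letter(lower: String, upper: String)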

Strategy 1: let it crash

In this first, quite primitive strategy, our Structured Streaming application will fail with an error message containing the poison pill record and stop the processing. It's not a big deal because, thanks to the checkpointed offsets, we'll be able to reprocess the data and handle it accordingly, maybe with a try-catch block (see the note about the checkpoint location after the snippet). However, it's not a good practice in streaming systems because the consumer stops and, during that idle period, it accumulates lag (the producer continues to generate data). Anyway, let's see how to implement it in a Structured Streaming pipeline reading data from Apache Kafka:

// I'm omitting the source for readability
  val jsonOptions = Map[String, String](
    // Doesn't work as expected!
    // Simply speaking, this parameter is used only by the data source readers
    // and cannot be injected here
    //"columnNameOfCorruptRecord" -> "corrupted_records",
    // Also doesn't work, for the same reason
    //"dropFieldIfAllNull" -> "true"
  )
  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema, jsonOptions))

  val query = processedData
    .writeStream
    .foreachBatch((dataset, batchNumber) => {
      dataset.persist()

      // I'm using take because it fails for .first() when used with empty dataset (no conversion errors) with:
      // "Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: next on empty iterator"
      // error
      val failedRow = dataset.filter("letter IS NULL").select("value_as_string").take(1)
      if (failedRow.nonEmpty) {
        throw new RuntimeException(
          s"""An error happened when converting "${failedRow(0).getAs[String]("value_as_string")}" to JSON."""
        )
      }
      println("Writing rows to the console")
      // Otherwise, do something with data
      dataset.show(false)

      dataset.unpersist()
    })

  query.start().awaitTermination()
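
One practical detail for the "reprocess after the crash" part: the query needs an explicit checkpoint location to resume from the last committed offsets once restarted. A sketch with an arbitrary local path:

  // A sketch only: with a checkpoint location, the restarted query resumes from the
  // last committed offsets, so the batch containing the poison pill is read again
  val checkpointedQuery = processedData
    .writeStream
    .option("checkpointLocation", "/tmp/checkpoints/let-it-crash") // arbitrary path
    .foreachBatch((dataset, batchNumber) => {
      // same body as above
      dataset.show(false)
    })

  checkpointedQuery.start().awaitTermination()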


As you can see, it's not a piece of cake. Initially, I wanted to use the parse mode configuration described in my previous Apache Spark SQL and types resolution in semi-structured data post. I simply wanted to set it to FailFast in the configuration map that can be passed to the from_json method and let Apache Spark crash. However, it doesn't work, since the parse mode applies to the data source readers and not to the function. To see that, let's go to the Jackson parser invoked for the from_json call:

    try {
      converter(parser.parse(
        json.asInstanceOf[UTF8String],
        CreateJacksonParser.utf8String,
        identity[UTF8String]))
    } catch {
      case _: BadRecordException => null
    }

In the parse method, a BadRecordException is thrown if something goes wrong:

    try {
      Utils.tryWithResource(createParser(factory, record)) { parser =>
        // a null first token is equivalent to testing for input.trim.isEmpty
        // but it works on any token stream and not just strings
        parser.nextToken() match {
          case null => Nil
          case _ => rootConverter.apply(parser) match {
            case null => throw new RuntimeException("Root converter returned null")
            case rows => rows
          }
        }
      }
    } catch {
      case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
        // JSON parser currently doesn't support partial results for corrupted records.
        // For such records, all fields other than the field configured by
        // `columnNameOfCorruptRecord` are set to `null`.
        throw BadRecordException(() => recordLiteral(record), () => None, e)
    }

But as you saw in the first snippet, this exception is silently translated to a null value and it's not logged. That's the reason why I had to play with an extra temporary column storing the raw data, and with null checks.
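
To complete the picture, the parse mode and the corrupted record column do work at the data source reader level. A quick batch illustration, with an arbitrary sample path:

  // With FAILFAST, the read blows up as soon as a malformed line is consumed
  val failFastLetters = sparkSession.read
    .schema(schema)
    .option("mode", "FAILFAST")
    .json("/tmp/letters_with_corrupted_records")

  // With the default PERMISSIVE mode, the raw payload of a corrupted line can be kept
  // in a dedicated column, provided that this column is declared in the schema
  val permissiveLetters = sparkSession.read
    .schema(schema.add(StructField("corrupted_records", StringType, true)))
    .option("columnNameOfCorruptRecord", "corrupted_records")
    .json("/tmp/letters_with_corrupted_records")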

Strategy 2: ignore errors

If you don't want any downtime of your consumer, you can simply skip the corrupted events. In Structured Streaming this boils down to filtering out null records and, optionally, logging the unparseable messages for further investigation:

  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema))
    .filter("letter IS NOT NULL")

  val query = processedData.writeStream.format("console").option("truncate", "false")

  query.start().awaitTermination()

In my snippet I simply ignore the invalid records. A version with logging could look like this:

  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema))
    .filter(row => {
      val convertedLetter = row.getAs[Row]("letter")
      if (convertedLetter == null) {
        println(s"Record ${row.getAs[String]("value_as_string")} cannot be converted")
        false
      } else {
        true
      }
    })

Strategy 3: Dead Letter Queue

The next strategy can be considered an evolution of the previous one. Here too, we ignore the errors, but instead of only logging them, we dispatch them to another data store. It can be a Kafka topic, but also a more static storage like a distributed file system, if you need to run some ad-hoc queries on it. In my example, I go with the second option and, to do that, I use the foreachBatch method introduced in the 2.4 release. The demo below only prints both groups; a variant that really writes them to files follows the snippet:

  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema))

  val query = processedData
    .writeStream
    .foreachBatch((dataset, batchNumber) => {
      dataset.persist()

      // I'm simply printing both, valid and invalid, datasets
      // But it's only for demo purposes. You will probably want to write
      // them somewhere else
      println("Corrupted records")
      dataset.filter("letter IS NULL").show(false)
      println("Valid records")
      dataset.filter("letter IS NOT NULL").show(false)

      dataset.unpersist()
    })

  query.start().awaitTermination()
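
And if you prefer to really persist the corrupted records instead of printing them, the foreachBatch body can write each group to its own location. A sketch with arbitrary local paths:

  val dlqQuery = processedData
    .writeStream
    .foreachBatch((dataset, batchNumber) => {
      dataset.persist()

      // corrupted records go to the file-based dead letter queue...
      dataset.filter("letter IS NULL").select("value_as_string")
        .write.mode("append").json("/tmp/letters/dead_letter_queue")
      // ...while the valid ones continue to the regular output
      dataset.filter("letter IS NOT NULL").select("letter.*")
        .write.mode("append").json("/tmp/letters/valid")

      dataset.unpersist()
    })

  dlqQuery.start().awaitTermination()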

An interesting thing to notice: among the valid records we'll find the partially valid entry {"PARTIAL_lower": "a", "upper": "A"}. When I was analyzing the code, I didn't find a way to prevent that, i.e. to configure from_json to return only the rows with all columns defined. The schema, despite being marked as not nullable for both columns, doesn't enforce that constraint. If you want to treat such partially converted entries as corrupted too, you can check every field explicitly, as in the sketch below.
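
A minimal sketch of that workaround, inside the foreachBatch body:

      // treat records with any missing field as corrupted as well, since the
      // non-nullable schema alone doesn't reject them
      val corruptedOrPartial = dataset.filter(
        "letter IS NULL OR letter.lower IS NULL OR letter.upper IS NULL")
      val fullyValid = dataset.filter(
        "letter IS NOT NULL AND letter.lower IS NOT NULL AND letter.upper IS NOT NULL")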

Strategy 4: sentinel value

Finally, we can use a pattern called Sentinel Value, and it can be freely combined with the Dead Letter Queue. Let's define it first. A Sentinel Value is a special, agreed-upon value returned every time something goes wrong. So in our case, whenever a record cannot be converted to the structure we're processing, we will emit a common placeholder object, like here:

  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema, Map[String, String]()))
    .withColumn("letter", functions.when($"letter".isNotNull, $"letter")
      .otherwise(
        // Here I'm building our sentinel value
        functions.struct(
          functions.lit(".").as("lower"), functions.lit(".").as("upper")
        )
      )
    )
    .select("letter")

  val query = processedData.writeStream.format("console").option("truncate", "false")

  query.start().awaitTermination()
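
The downstream consumers then only need to know the agreed-upon sentinel to detect the placeholder rows. A minimal sketch of such a check:

  // A sketch only: sentinel rows can be counted, alerted on or routed to a dead
  // letter queue, while the regular rows follow the normal processing path
  val sentinelRows = processedData.filter($"letter.lower" === "." && $"letter.upper" === ".")
  val regularRows = processedData.filter($"letter.lower" =!= "." || $"letter.upper" =!= ".")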

Strategy candidate: wrapper

As I said, we can mix the Sentinel Value and the Dead Letter Queue. The idea here is to wrap the failed event and annotate it with all the information required for a better understanding of the errors. For instance, we could build a dictionary of error types and add the error code to the wrapper. We could also give some technical context, like the exception message, or processing information, like the failure date. Since working on structured data is easier than on unstructured or semi-structured data, this approach should help to build downstream applications. Please notice that you could also keep the message unmodified and put this metadata into Apache Kafka headers, but as of this writing, headers aren't supported by the Structured Streaming Kafka connector (2.4.4).

An example of such an approach could be implemented as:

  val processedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema, Map[String, String]()))
    .map(row => {
      val letter = row.getAs[Row]("letter")
      if (letter != null) {
        SentinelForDLQWrapper(data = Some(SentinelForDLQData(
          lower = letter.getAs[String]("lower"), upper = letter.getAs[String]("upper"))
        ))
      } else {
        SentinelForDLQWrapper(data = None, error = Some(
          SentinelForDLQError(row.getAs[String]("value_as_string"))
        ))
      }
    })

  val query = processedData.writeStream.format("console").option("truncate", "false")

  query.start().awaitTermination()

case class SentinelForDLQWrapper(data: Option[SentinelForDLQData] = None,
                                 error: Option[SentinelForDLQError] = None)
case class SentinelForDLQData(lower: String, upper: String)
case class SentinelForDLQError(raw: String)

This is one of the simplest possible implementations, where we only check whether the record was correctly converted. But we could also go further and, for instance, analyze the content of the successfully converted records, estimate their completeness and annotate the wrapper with this information, as in the sketch below.
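
For instance, a hypothetical completenessScore field could be added to the wrapper and computed as the ratio of non-null fields in the converted record. A sketch of that idea:

  // A sketch only: completenessScore is a hypothetical addition to the wrapper
  val annotatedData = dataSource.selectExpr("CAST(value AS STRING) AS value_as_string")
    .withColumn("letter", functions.from_json($"value_as_string", schema))
    .map(row => {
      val letter = row.getAs[Row]("letter")
      if (letter != null) {
        val fields = Seq(letter.getAs[String]("lower"), letter.getAs[String]("upper"))
        AnnotatedWrapper(
          data = Some(SentinelForDLQData(lower = fields(0), upper = fields(1))),
          completenessScore = fields.count(_ != null).toDouble / fields.size
        )
      } else {
        AnnotatedWrapper(error = Some(SentinelForDLQError(row.getAs[String]("value_as_string"))))
      }
    })

case class AnnotatedWrapper(data: Option[SentinelForDLQData] = None,
                            error: Option[SentinelForDLQError] = None,
                            completenessScore: Double = 0.0)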

I built the code on the from_json function. Is there another solution that lets you define a conversion failure strategy? One option I can see is a User Defined Function, but I didn't want to reinvent the wheel, at least for this blog post. If you have any ideas or have handled this problem differently, feel free to share your solution in the comments!