Nested fields, dropDuplicates and watermark in Apache Spark Structured Streaming

Versions: Apache Spark 2.4.3

When I was playing with my data-generator and Apache Spark Structured Streaming, I was surprised by one behavior that I would like to share and explain in this post. Without diving too deep into the details right now, the story is about the use of nested structures in several operations.
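
The snippets below use two small case classes that are not shown in the post. Based on how their fields are used (timeParams.watermark, timeParams.stringifiedTime and letter), they would look more or less like this - treat it as a sketch, the original definitions may differ slightly:

  import java.sql.Timestamp

  // reconstructed from the field names used in the tests below
  case class TimeInformation(watermark: Timestamp, stringifiedTime: String)
  case class LetterContainer(timeParams: TimeInformation, letter: String)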

Let's first introduce the problematic code:

  private val sparkSession: SparkSession = SparkSession.builder()
    .appName("Spark SQL nested data structures failures").master("local[*]").getOrCreate()
  import sparkSession.implicits._

  "watermark on a nested column" should "fail the query" in {
    val inputStream = new MemoryStream[LetterContainer](1, sparkSession.sqlContext)
    inputStream.addData(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    )

    val treeError = intercept[Exception] {
      inputStream.toDS()
        .withWatermark("timeParams.watermark", "1 minute")
        .writeStream.format("console").option("truncate", "false")
        .start().awaitTermination(5000L)
    }

    treeError.getMessage() should include ("makeCopy, tree:")
    treeError.getMessage() should include ("EventTimeWatermark 'timeParams.watermark, interval 1 minutes")
  }

  "dropDuplicates on a nested column" should "fail the query" in {
    val inputStream = new MemoryStream[LetterContainer](1, sparkSession.sqlContext)
    inputStream.addData(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    )

    val analysisException = intercept[AnalysisException] {
      inputStream.toDS()
        .dropDuplicates("timeParams.watermark", "1 minute")
        .writeStream.format("console").option("truncate", "false")
        .start().awaitTermination(5000L)
    }

    analysisException.getMessage() should startWith("Cannot resolve column name \"timeParams.watermark\" among (timeParams, letter);")
  }

As you can see, in both cases Apache Spark fails the processing because of the fields used in the dropDuplicates and withWatermark methods. In both situations, these fields come from a nested structure, so logically the solution is to extract them first, like this:

  "extracted nested fields" should "be allowed in dropDuplicates and withWatermark" in {
    val inputStream = new MemoryStream[LetterContainer](1, sparkSession.sqlContext)
    inputStream.addData(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    )

    inputStream.toDS()
      .select("timeParams.*")
      .withWatermark("watermark", "1 minute")
      .dropDuplicates("stringifiedTime")
      .writeStream.format("console").option("truncate", "false")
      .start().awaitTermination(5000L)
  }
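
Note that select("timeParams.*") keeps only the struct's fields and drops the letter column from the output. If you need to keep the other top-level columns, a variant of this workaround - just a sketch of the same idea - is to promote only the required nested fields with withColumn before calling withWatermark and dropDuplicates:

  // sketch: copy the nested fields into top-level columns so that letter is kept
  inputStream.toDS()
    .withColumn("watermark", $"timeParams.watermark")
    .withColumn("stringifiedTime", $"timeParams.stringifiedTime")
    .withWatermark("watermark", "1 minute")
    .dropDuplicates("stringifiedTime")
    .writeStream.format("console").option("truncate", "false")
    .start().awaitTermination(5000L)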

Why does this happen? Let's check the simplest error case first.

dropDuplicates

The reason for the dropDuplicates failure is easy to find. The method only allows top-level columns to be used as deduplication columns. You can see it in the code below, and more exactly in the line val allColumns = queryExecution.analyzed.output:

  def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
    val resolver = sparkSession.sessionState.analyzer.resolver
    val allColumns = queryExecution.analyzed.output
    val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
      // It is possibly there are more than one columns with the same name,
      // so we call filter instead of find.
      val cols = allColumns.filter(col => resolver(col.name, colName))
      if (cols.isEmpty) {
        throw new AnalysisException(
          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
      }
      cols
    }
    Deduplicate(groupCols, logicalPlan)
  }

In other words, the output of the analyzed plan will never expose the nested fields of a struct type as columns - there will be a single column, the one with the struct:

  "analyzed plan output" should "only output top-level columns" in {
    val inputData = Seq(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    ).toDS()

    val output = inputData.queryExecution.analyzed.output

    output.map(attribute => attribute.name) should contain only ("timeParams", "letter")
  }
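
That is also why the workaround shown before works: after select("timeParams.*"), the struct's fields become top-level attributes of the analyzed plan. A quick check, reusing the case class field names assumed earlier:

  "analyzed plan output after select" should "expose the struct's fields as columns" in {
    val inputData = Seq(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    ).toDS()

    val output = inputData.select("timeParams.*").queryExecution.analyzed.output

    output.map(attribute => attribute.name) should contain only ("watermark", "stringifiedTime")
  }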

withWatermark

The exception produced by the withWatermark call on a struct's attribute is:


makeCopy, tree:
'EventTimeWatermark 'timeParams.watermark, interval 1 minutes
+- StreamingExecutionRelation MemoryStream[timeParams#2,letter#3], [timeParams#2, letter#3]

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark 'timeParams.watermark, interval 1 minutes
+- StreamingExecutionRelation MemoryStream[timeParams#2,letter#3], [timeParams#2, letter#3]
...
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
Failed to copy node.
Is otherCopyArgs specified correctly for EventTimeWatermark.
Exception message: argument type mismatch
ctor: public org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)?
types: class org.apache.spark.sql.catalyst.expressions.Alias, class org.apache.spark.unsafe.types.CalendarInterval, class org.apache.spark.sql.execution.streaming.StreamingExecutionRelation
args: timeParams#2.watermark AS watermark#5, interval 1 minutes, MemoryStream[timeParams#2,letter#3]
           , tree:
'EventTimeWatermark 'timeParams.watermark, interval 1 minutes
+- StreamingExecutionRelation MemoryStream[timeParams#2,letter#3], [timeParams#2, letter#3]

    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:407)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:377)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 87 more

What is the reason for this exception? It happens here:

  def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") {
    // ...
    try {
      CurrentOrigin.withOrigin(origin) {
        defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType]
      }
    } catch {
      case e: java.lang.IllegalArgumentException =>
        throw new TreeNodeException(
          this,
          s"""
             |Failed to copy node.
             |Is otherCopyArgs specified correctly for $nodeName.
             |Exception message: ${e.getMessage}
             |ctor: $defaultCtor?
             |types: ${newArgs.map(_.getClass).mkString(", ")}
             |args: ${newArgs.mkString(", ")}
           """.stripMargin)
    }
}

The makeCopy method is invoked in various places to replace a node in the plan by a new node built from the newArgs of the signature. The new node is created by invoking the constructor, which for the watermark node (EventTimeWatermark) takes 3 parameters, namely (org.apache.spark.sql.catalyst.expressions.Attribute, org.apache.spark.unsafe.types.CalendarInterval, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan). But when you analyze the types resolved for a struct's column, you will see that the first parameter has the wrong type:

 0 = {Alias@7891} "timeParams#2.watermark AS watermark#5"
 1 = {CalendarInterval@7892} "interval 1 minutes"
 2 = {StreamingExecutionRelation@7893} "MemoryStream[timeParams#2,letter#3]"

In the Apache Spark class hierarchy, Alias is not an attribute but an expression, so something that has to be evaluated. An attribute, on the other hand, can be considered a "static" parameter in the sense that it simply takes the value of a column.
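
You can observe that distinction directly on a batch Dataset as well. In this sketch (written for this post, with the same case classes as before), a reference to a top-level column resolves to an AttributeReference while a reference to a nested field resolves to an Alias wrapping the field extraction:

  "a nested field reference" should "resolve to an Alias rather than to an Attribute" in {
    val inputData = Seq(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a")
    ).toDS()

    // top-level column => AttributeReference, a subtype of Attribute
    val topLevelColumn = inputData.select("letter").queryExecution.analyzed.expressions.head
    // nested field => Alias(GetStructField(...), "watermark"), an expression to evaluate
    val nestedField = inputData.select("timeParams.watermark").queryExecution.analyzed.expressions.head

    topLevelColumn shouldBe an [org.apache.spark.sql.catalyst.expressions.AttributeReference]
    nestedField shouldBe an [org.apache.spark.sql.catalyst.expressions.Alias]
  }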

After that finding, I wanted to see whether I was alone and, "fortunately", I wasn't. From the issues I found we can learn that the problem comes from the difficulty of deciding whether a given string is a real column name or an expression, as when we use a struct's attribute. And this problem is shared not only by dropDuplicates and withWatermark but also by partitionBy:

  "partitionBy" should "also fail with a struct column" in {
    val inputData = Seq(
      LetterContainer(TimeInformation(new Timestamp(0), "0"), "a"),
      LetterContainer(TimeInformation(new Timestamp(1), "1"), "b")
    ).toDS()

    val analysisException = intercept[AnalysisException] {
      inputData.write.partitionBy("timeParams.watermark").json("/tmp/struct_in_partitionby")
    }

    analysisException.getMessage should startWith("Partition column `timeParams.watermark` not found in schema")
  }

Why is it not fixed yet? According to SPARK-23337 and SPARK-18084, the refactoring is not as simple as it looks and should be applied to the framework globally rather than piecemeal, method by method.

And that's my story. I found these errors by pure accident, when I was playing around with my data-generator and Structured Streaming. Despite this problem I have no regrets, because the next time I won't be looking for errors in my schema definition in these cases ;)