Structured Streaming and temporary views

Versions: Apache Spark 3.0.1

When I first saw code calling the createTempView method, I assumed it created a temporary table in the metastore. It doesn't, and in this blog post you will see why.

In the first part of the article, you will see a working example of the createTempView method used in Structured Streaming and check whether it changes anything in the physical plan. The second part of the blog post covers a few implementation details.

A table in streaming?

What does a table mean in a streaming application? The following snippet shows the motivating example. The data is read from an Apache Kafka topic and then exposed as a temporary view called stream1. Finally, the data is queried from this temporary view in SQL:

val inputData = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", SyntaxCheckConfiguration.TopicName)
    .option("startingOffsets", "EARLIEST")
    .load()

inputData.createTempView("stream1")

val processingQuerySql = spark.sql("SELECT value, timestamp FROM stream1")

val streamingQuery = processingQuerySql.writeStream
    .format("console")
    .option("truncate", false)
    .start()

Let's look at the generated physical plan. Is the temporary view creation step even visible? Not really:

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4bcaeef9
+- *(1) Project [value#8, timestamp#12]
   +- *(1) Project [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
      +- MicroBatchScan[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan

As you can see, there is no mention of the temporary view creation. If you compare it with the plan generated by inputData.select("value", "timestamp"), you will notice no differences apart from the hash of the MicroBatchWrite instance:

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3ffbfa9c
+- *(1) Project [value#8, timestamp#12]
   +- *(1) Project [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]
      +- MicroBatchScan[key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
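
If you want to reproduce this comparison without a Kafka broker, the sketch below may help. It is not the code used for the plans above: it assumes the built-in rate source as a stand-in for the Kafka topic, and the rate_stream view name and the startConsoleQuery helper are made up for the illustration. The printed plans will mention the rate source instead of KafkaScan, but the SQL-based and API-based variants should still end up with the same shape.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object TempViewPlanComparison {
  // Hypothetical helper: writes a streaming DataFrame to the console sink
  def startConsoleQuery(data: DataFrame): StreamingQuery =
    data.writeStream.format("console").option("truncate", false).start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("temp-view-plan-comparison")
      .master("local[2]")
      .getOrCreate()

    // Assumption: the built-in rate source stands in for the Kafka topic;
    // it exposes the `timestamp` and `value` columns
    val inputData = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
    // Hypothetical view name, used only in this sketch
    inputData.createOrReplaceTempView("rate_stream")

    val sqlQuery = startConsoleQuery(spark.sql("SELECT value, timestamp FROM rate_stream"))
    val apiQuery = startConsoleQuery(inputData.select("value", "timestamp"))

    // Give both queries a few seconds to run some micro-batches;
    // explain() has no physical plan to print before the first one
    sqlQuery.awaitTermination(5000)
    apiQuery.awaitTermination(5000)

    // Prints the plans of the most recent micro-batch of each query
    sqlQuery.explain(true)
    apiQuery.explain(true)

    sqlQuery.stop()
    apiQuery.stop()
    spark.stop()
  }
}
```

The sketch uses createOrReplaceTempView rather than createTempView only so that it can be rerun in the same session without failing on an already existing view.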

Why are both plans the same? After all, an extra operation is involved in SQL-based processing. To answer the question, let's check some internals in the next section.

SubqueryAlias

The logical plans provide a part of the answer about the temporary view's representation. If you analyze the one for the SQL-based processing, you will notice a SubqueryAlias node:

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@32d8105a
+- Project [value#8, timestamp#12]
   +- SubqueryAlias stream1
      +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@cdaaca5, KafkaV2[Subscribe[syntax_check]], {"syntax_check":{"0":0}}, {"syntax_check":{"0":245}}

Of course, the SubqueryAlias is not present in the API-based processing but, as you saw in the previous section, this node is always removed before the physical plan is built. The rule responsible for this removal is EliminateSubqueryAliases. Its transformation simply replaces every alias node with its child:

  def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer {
    plan transformUp {
      case SubqueryAlias(_, child) => child
    }
  }
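
To watch the rule at work, you can apply it by hand to an analyzed plan. The sketch below is only an illustration under a few assumptions: it uses a batch Dataset instead of a stream, a made-up view called numbers, and a hypothetical aliasCounts helper. It also relies on Catalyst classes that are internal and may change between Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

object SubqueryAliasElimination {
  // Counts the SubqueryAlias nodes present in a logical plan tree
  def countAliases(plan: LogicalPlan): Int =
    plan.collect { case alias: SubqueryAlias => alias }.size

  // Hypothetical helper: returns the alias count before and after the rule
  def aliasCounts(spark: SparkSession): (Int, Int) = {
    // Assumption: a batch Dataset is enough to show the behavior of the rule
    spark.range(3).toDF("value").createOrReplaceTempView("numbers")

    val analyzed = spark.sql("SELECT value FROM numbers").queryExecution.analyzed
    val withoutAliases = EliminateSubqueryAliases(analyzed)

    println(analyzed.treeString)
    println(withoutAliases.treeString)
    (countAliases(analyzed), countAliases(withoutAliases))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("subquery-alias-elimination")
      .master("local[1]")
      .getOrCreate()

    val (before, after) = aliasCounts(spark)
    println(s"SubqueryAlias nodes before: $before, after: $after")

    spark.stop()
  }
}
```

The first printed tree should contain a SubqueryAlias node for the view, and the second one should not.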

SessionCatalog

As you may already know (if not, see the "Writing custom optimization in Apache Spark SQL - parser" blog post), the SQL string is translated into a logical plan tree. In simple terms, every part of the SQL statement has a corresponding logical node that is later analyzed, optimized and transformed into the physical plan.

Why do we see the SubqueryAlias in the plan of the SQL-based processing, even though you didn't specify it explicitly? It's added automatically during the analysis step. Before that step, the parser converts the SQL text into an unresolved plan like this:

'Project ['value, 'timestamp]
+- 'UnresolvedRelation [stream1]
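
You can get such an unresolved plan yourself by calling the session's SQL parser directly. The following sketch assumes access through spark.sessionState, which is an unstable, internal-facing API. The parse helper is hypothetical. Note that the stream1 view doesn't have to exist at this stage because parsing doesn't look anything up in the catalog.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object UnresolvedPlanExample {
  // Hypothetical helper: only parses the SQL text, without analyzing it
  def parse(spark: SparkSession, sqlText: String): LogicalPlan =
    spark.sessionState.sqlParser.parsePlan(sqlText)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("unresolved-plan-example")
      .master("local[1]")
      .getOrCreate()

    val parsed = parse(spark, "SELECT value, timestamp FROM stream1")

    // The tree still contains unresolved nodes, marked with a leading quote
    println(parsed.treeString)
    // `resolved` stays false until the analyzer has identified every node
    println(s"resolved: ${parsed.resolved}")

    spark.stop()
  }
}
```

The exact rendering of the tree can vary between Spark versions, but it should resemble the plan shown above.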

In the next stage, all the unresolved elements are resolved, that is, matched against the catalog. If you reference a non-existent table or column, this is the moment when you get an AnalysisException. The method responsible for identifying the temporary view is lookupTempView(table: String) from SessionCatalog:

  def lookupTempView(table: String): Option[SubqueryAlias] = {
    val formattedTable = formatTableName(table)
    getTempView(formattedTable).map { view =>
      SubqueryAlias(formattedTable, view)
    }
  }

The child of the alias node (the view value in the snippet above) will be the StreamingRelationV2 instance built with Apache Kafka source classes. That's why, once the SubqueryAlias is eliminated, the physical plan looks the same for both code versions.

The SubqueryAlias node is required, though. Otherwise, Apache Spark couldn't identify where the columns specified in the SQL query come from.
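
A quick way to convince yourself that the view lives only in the session catalog, and not in the metastore, is to inspect it with the public Catalog API. The sketch below again assumes the built-in rate source instead of Kafka, and the describeView helper is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.Table

object TempViewCatalogCheck {
  // Hypothetical helper: finds the catalog entry for the given view name
  def describeView(spark: SparkSession, viewName: String): Option[Table] =
    spark.catalog.listTables().collect().find(_.name == viewName)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("temp-view-catalog-check")
      .master("local[1]")
      .getOrCreate()

    // Assumption: the built-in rate source stands in for the Kafka topic
    val inputData = spark.readStream.format("rate").load()
    inputData.createOrReplaceTempView("stream1")

    describeView(spark, "stream1").foreach { view =>
      // A temporary view is session-scoped and belongs to no database
      println(s"isTemporary=${view.isTemporary}, database=${view.database}")
    }
    // The view is only a name for the streaming relation, so it stays a stream
    println(s"isStreaming=${spark.table("stream1").isStreaming}")

    spark.stop()
  }
}
```

Nothing is started or materialized here: registering the view only stores the logical plan under a name in the current session.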

Unfortunately, you won't be able to do everything in plain SQL. For example, to write arbitrary stateful processing code, you will need to implement your own Scala functions. Nonetheless, plain SQL can be an acceptable trade-off if you want to use Structured Streaming in a team where the data engineers are less comfortable with code. As you saw in this article, apart from the user-facing representation of the logic, there are no execution differences from API-based processing.

