Broadcast join and changing static dataset

Versions: Apache Spark 3.2.0

Last year I wrote a blog post about broadcasting in Structured Streaming and I got an interesting question under one of the demo videos. What happens if the joined static dataset in a broadcast mode gets new data? Let's check this out!


The code used in the demo does a simple join between an Apache Kafka topic and an at-rest dataset of JSON files. If it's so simple, then why doesn't it work? Well, even though under the hood Apache Spark runs a streaming query, it still uses a static query plan resolved in the first micro-batch (an oversimplification; I'll dig into it after the demo):

Joining a static dataset

As you noticed in the demo, Apache Spark reads the input files in each micro-batch but doesn't include new files added to the input directory. To understand why, we have to recall how the framework finds the files to read. To build a file-based data source, Apache Spark calls the DataSource#resolveRelation method, which does several things like finding the data source class, inferring the schema and listing the files to include in the data source. You can see the last action in the snippet below:

val globbedPaths = checkAndGlobPathIfNecessary(
  checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
val index = createInMemoryFileIndex(globbedPaths)
val (resultDataSchema, resultPartitionSchema) =
  getOrInferFileFormatSchema(format, () => index)
(index, resultDataSchema, resultPartitionSchema)

I highlighted one line on purpose because it creates an in-memory index with all the files composing the data source. The index launches an Apache Spark job described as "Listing leaf files and directories" to process the input paths in parallel and return the exact files to include. The resolved list is later cached in these 2 fields of the index:

 @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
 @volatile private var cachedLeafDirToChildrenFiles: Map[Path, Array[FileStatus]] = _

Now, when a new micro-batch runs, it doesn't regenerate the list. Instead, it uses the cache. And that's why a batch-streaming join doesn't pick up new files in the input directory. How to overcome the issue? I tried different techniques.
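To illustrate the effect of that cache, here is a minimal, hypothetical sketch in plain Scala (no Spark involved) that mimics the InMemoryFileIndex behavior: the file listing is computed once, at construction time, so files created afterwards stay invisible to later lookups, just like new JSON files stay invisible to later micro-batches:

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for Spark's InMemoryFileIndex: the listing is
// computed once and cached, like the cachedLeafFiles field in the real class.
class CachedFileIndex(dir: Path) {
  private val cachedLeafFiles: mutable.LinkedHashMap[Path, Long] = {
    val listing = mutable.LinkedHashMap.empty[Path, Long]
    Files.list(dir).iterator().asScala.foreach { path =>
      listing += (path -> Files.size(path))
    }
    listing
  }

  // Every "micro-batch" reads from the cache, never from the file system
  def inputFiles: Seq[Path] = cachedLeafFiles.keys.toSeq
}

object CachedFileIndexDemo extends App {
  val dir = Files.createTempDirectory("broadcast-join-demo")
  Files.writeString(dir.resolve("part-0.json"), """{"nr": 1}""")

  val index = new CachedFileIndex(dir)
  println(index.inputFiles.size) // 1

  // A new file arrives after the index was built...
  Files.writeString(dir.resolve("part-1.json"), """{"nr": 2}""")

  // ...but the cached listing still returns the initial snapshot
  println(index.inputFiles.size) // still 1
}
```

The class and file names are of course made up for the illustration; the caching pattern is the point.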

First, I tested a stream-to-stream join with this batch dataset read as a stream. It was hard to do with artificially created watermarks. I'm simply trying to enrich the streaming data with a really slowly changing dataset, and since the watermark also defines when the unmatched rows go to the sink, configuring it was difficult (I didn't find a good value).
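For reference, a minimal sketch of what that attempt could look like; the paths, column names, watermark values and the time-range condition are assumptions, not the exact demo code. Stream-to-stream outer joins require watermarks plus a time constraint in the join condition, and that's precisely what doesn't map well onto a dataset that rarely changes:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Hypothetical sketch of the batch-as-stream attempt. Both sides need a
// watermark for the left outer join, and the join condition must bound the
// event times; no watermark value really fits a slowly changing dataset.
object StreamToStreamAttempt extends App {
  val spark = SparkSession.builder().master("local[*]")
    .appName("stream-to-stream-attempt").getOrCreate()
  import spark.implicits._

  val kafkaSide = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "broadcast_join_demo")
    .load()
    .selectExpr("CAST(value AS STRING) AS value_key", "timestamp AS event_time")
    .withWatermark("event_time", "10 minutes")

  val staticAsStream = spark.readStream
    .schema("nr LONG, ts TIMESTAMP, is_even BOOLEAN")
    .json("/tmp/wfc/broadcast-join/test_dataset")
    .withWatermark("ts", "2 hours") // arbitrary; the problematic choice

  val writeQuery = kafkaSide
    .join(staticAsStream,
      $"value_key" === $"nr".cast("string") &&
        $"ts" >= $"event_time" - expr("INTERVAL 1 HOUR") &&
        $"ts" <= $"event_time",
      "left_outer")
    .writeStream
    .format("console")
    .start()

  writeQuery.awaitTermination()
}
```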

Second, I tried something apparently easier: using a temporary view for the join:

sparkSession.read.json(TestConfiguration.datasetPath).createOrReplaceTempView("timestamps_to_join")

new Thread(new Runnable {
  override def run(): Unit = {
    while (true) {
      println("Refreshing timestamps_to_join")
      if (new File(s"${TestConfiguration.datasetPath}/_SUCCESS").exists()) {
        sparkSession.read.json(TestConfiguration.datasetPath).createOrReplaceTempView("timestamps_to_join")
        sparkSession.sql("SELECT * FROM timestamps_to_join").show()
      }
      Thread.sleep(5000L)
    }
  }
}).start()

val writeQuery = kafkaSource.selectExpr("CAST(value AS STRING) AS value_key")
  .join(sparkSession.sql("SELECT * FROM timestamps_to_join"), $"value_key" === $"nr",
    "left_outer")
  .writeStream
  .format("console")
  .option("truncate", false)
  .start()

Unfortunately, it didn't work either. When I started to debug it, I quickly made the link with my previous blog post about Structured Streaming and temporary views. A temporary view doesn't materialize the data. It's a simple alias that lets you reference the dataset in SQL queries. You can see that in the analyzed version of the execution plan:

Join LeftOuter, (cast(value_key#34 as bigint) = nr#8L)
:- Project [cast(value#21 as string) AS value_key#34]
:  +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@12166229, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@208e5b23, [startingOffsets=LATEST, kafka.bootstrap.servers=localhost:29092, subscribe=broadcast_join_demo_new_file_created, client.id=broadcast_join_demo_new_file_created_client], [key#20, value#21, topic#22, partition#23, offset#24L, timestamp#25, timestampType#26], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@1f346ad2,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> localhost:29092, client.id -> broadcast_join_demo_new_file_created_client, subscribe -> broadcast_join_demo_new_file_created, startingOffsets -> LATEST),None), kafka, [key#13, value#14, topic#15, partition#16, offset#17L, timestamp#18, timestampType#19]
+- Project [is_even#7, nr#8L, ts#9]
   +- SubqueryAlias timestamps_to_join
      +- RelationV2[is_even#7, nr#8L, ts#9] json file:/tmp/wfc/broadcast-join/test_dataset

So recreating the temporary view doesn't change the underlying data source definition used by the streaming query, because of the very same cached index issue! The rule also applies to global temporary views. Fortunately, I recalled some of the Delta Lake demos and was able to find a solution!

Batch and streaming join with Delta Lake

Why does simply changing the file format to Delta Lake help? It's all because of the internal mechanism responsible for the input resolution. The Delta Lake index, represented by the TahoeFileIndex class, doesn't cache the file listing. Every time a join happens with the streaming dataset, it calls the DeltaLog#update method, which refreshes the current version of the dataset.

Thanks to this update, the physical node responsible for the static dataset, the DataSourceScanExec, can use the most recent files.
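Put together, the working variant of the join could look like the sketch below. The paths, topic name and session configuration are assumptions on my side, and the snippet requires the delta-spark dependency on the classpath:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the Delta Lake variant: because TahoeFileIndex
// calls DeltaLog#update instead of caching the listing, each micro-batch
// of the join sees the latest version of the static side.
object DeltaBroadcastJoin extends App {
  val spark = SparkSession.builder().master("local[*]")
    .appName("delta-broadcast-join")
    // settings required to use the delta format
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
      "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
  import spark.implicits._

  val kafkaSource = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:29092")
    .option("subscribe", "broadcast_join_demo")
    .load()

  // The static side: a Delta table that another process keeps appending to
  val timestampsToJoin = spark.read.format("delta")
    .load("/tmp/wfc/broadcast-join/test_dataset_delta")

  val writeQuery = kafkaSource.selectExpr("CAST(value AS STRING) AS value_key")
    .join(timestampsToJoin, $"value_key" === $"nr".cast("string"), "left_outer")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()

  writeQuery.awaitTermination()
}
```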

And now, demo time, showing the execution details of Delta Lake, the streaming JSON and the refreshed temporary view:

To sum up, joining a slowly changing dataset with a streaming one will work with an appropriate file format, or when the dataset is composed of the same files updated in place. All this comes down to the internal file-indexing details, which in some cases (e.g., Delta Lake) refresh the state automatically, while in others (e.g., JSON) keep a cache throughout the query execution.
