Broadcasting in Structured Streaming

Versions: Apache Spark 3.0.0

Some time ago @ArunJijo36 mentioned me on Twitter with a question about broadcasting in Structured Streaming. If, like me at that time, you don't know what happens, this article is for you 👊

In this post you will discover, in order, what happens when a variable is broadcasted and when a streaming Dataset is joined with a batch one via a broadcast join. Every part contains a demo example and a technical explanation of the behavior. If you are wondering whether the datasets and objects will be broadcasted every time (after all, a micro-batch can be thought of as a batch query executed iteratively), keep reading!

Broadcasted variables

I already covered broadcast variables in detail in the broadcast variables article, so I will only recall some basics here. A broadcast variable is built on the driver and sent to the executors only once. It's then immutable and local to each executor. Creating one is as simple as calling SparkContext's broadcast[T: ClassTag](value: T): Broadcast[T] method, which returns an instance of the typed Broadcast class.

To use it, you have to pass this typed instance to your methods or classes, for example this way:

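  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.sql.Row
  import sparkSession.implicits._ // for $"..." and the String encoder used by map

  // Built on the driver and broadcast once to the executors
  val ratesMap = (0 to 3000).map(nr => (nr, nr % 2 == 0)).toMap
  val ratesMapBroadcast = sparkSession.sparkContext.broadcast(ratesMap)

  def selectAField(row: Row, broadcast: Broadcast[Map[Int, Boolean]]): String = {
    // Read the broadcast passed as the parameter, not the outer variable
    println(s"Got ratesMap=${broadcast.value}")
    row.getAs[String]("ts")
  }

  // dataFrame (the streaming source) and ratesDataset come from the rest of the demo
  val writeQuery = dataFrame.selectExpr("CAST(value AS STRING) AS value_key")
    .join(ratesDataset, $"value_key" === $"nr", "left_outer")
    .map(row => selectAField(row, ratesMapBroadcast))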

The whole magic happens when you call value. At this moment, the executor checks whether the value was already read. If it's the first time, the value is read through the block manager, possibly from a remote node:

  override protected def getValue() = synchronized {
    val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
    if (memoized != null) {
      memoized
    } else {
      val newlyRead = readBroadcastBlock()
      _value = new SoftReference[T](newlyRead)
      newlyRead
    }
  }
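To see the memoization pattern of getValue() in isolation, here is a minimal sketch in plain Scala. SoftCachedValue and its reads counter are hypothetical names, not Spark classes, and the readBlock function only stands in for readBroadcastBlock(). The sketch reproduces just the "check the SoftReference first, read the block otherwise" logic.

```scala
import java.lang.ref.SoftReference

// Hypothetical, simplified model of TorrentBroadcast.getValue(): the value is
// memoized behind a SoftReference and re-read only if the GC has cleared it.
final class SoftCachedValue[T <: AnyRef](readBlock: () => T) {
  private var cached: SoftReference[T] = null
  // Number of times readBlock had to be invoked (i.e. cache misses)
  var reads: Int = 0

  def get(): T = synchronized {
    val memoized: T = if (cached == null) null.asInstanceOf[T] else cached.get()
    if (memoized != null) {
      memoized
    } else {
      reads += 1
      val newlyRead = readBlock()
      cached = new SoftReference[T](newlyRead)
      newlyRead
    }
  }
}

// Usage: the first call "reads the block", the second one hits the SoftReference
val cache = new SoftCachedValue(() => Map(1 -> true, 2 -> false))
val first = cache.get()
val second = cache.get()
```

Since `first` keeps a strong reference to the value, the GC cannot clear the SoftReference between both calls, so only one read happens.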

Here it's interesting to notice one thing. The broadcast protocol is a BitTorrent-like protocol, so the blocks composing the broadcasted object can be fetched from the original publisher (the driver) but also from the consumers that already fetched them (the executors). It happens in the readBlocks() method, which first looks for the broadcasted blocks in the local block manager, and only then asks the remote nodes:

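      // Look for the piece in the executor's local BlockManager first...
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseBlockManagerLock(pieceId)
        case None =>
          // ...and only then fetch it from the driver or from another executor
          bm.getRemoteBytes(pieceId) match {
            // ... (the fetched piece is then stored in the local BlockManager)
          }
      }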
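The local-first lookup can be modeled the same way. In the sketch below, which uses hypothetical names (PieceReader, remoteFetches), a mutable map plays the role of the executor's BlockManager and a function plays the role of the remote peers (the driver or other executors). As in Spark's readBlocks(), a piece fetched remotely is stored locally, which is what lets this executor serve it to other peers afterwards.

```scala
import scala.collection.mutable

// Hypothetical model of the "local first, then remote" lookup of readBlocks().
// `local` stands for this executor's BlockManager, `remote` for the other peers.
final class PieceReader(local: mutable.Map[String, Array[Byte]],
                        remote: String => Option[Array[Byte]]) {
  var remoteFetches: Int = 0

  def readPieces(pieceIds: Seq[String]): Seq[Array[Byte]] = pieceIds.map { pieceId =>
    local.get(pieceId) match {
      case Some(block) => block
      case None =>
        remoteFetches += 1
        val block = remote(pieceId).getOrElse(
          throw new IllegalStateException(s"Failed to get $pieceId"))
        // Keep the piece locally so that later reads (and other peers) can use it
        local(pieceId) = block
        block
    }
  }
}

val driverPieces = Map("piece0" -> Array[Byte](1, 2), "piece1" -> Array[Byte](3))
val reader = new PieceReader(mutable.Map.empty, pieceId => driverPieces.get(pieceId))
reader.readPieces(Seq("piece0", "piece1")) // both pieces fetched remotely
reader.readPieces(Seq("piece0", "piece1")) // both pieces served locally
```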

Once fetched, the value is cached on the executor and retrieved whenever needed. But does it apply to Structured Streaming queries too? Yes, despite the iterative character of micro-batch queries. The broadcast variables are stored on the executors and aren't coupled to a specific physical plan. They would be if the physical plan deallocated them explicitly, but in the code you won't find any call from it to TorrentBroadcast's unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) method.

Does it mean that the broadcast variables will be there forever? No, because the cached value is held by a SoftReference, and that's on purpose:

   * On the driver, if the value is required, it is read lazily from the block manager. We hold
   * a soft reference so that it can be garbage collected if required, as we can always reconstruct
   * in the future.

SoftReferences can be cleared by the garbage collector when the JVM struggles to satisfy a growing memory demand. If you want to know more about them, there are plenty of great articles explaining them, like [1], [2], [3]. Applied to broadcast variables, it means that the GC can deallocate the cached value at any moment, but since the value was previously stored in the executor's local block manager, the executor doesn't need to download it again. That's why you will often read that broadcasts are sent to the executors only once. As a consequence, if you change the broadcasted object after creating the broadcast, the change won't be propagated to the executors.
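The "change won't be propagated" part follows from the value being serialized once, when the broadcast is created. The sketch below is a hypothetical model that uses plain Java serialization rather than Spark's own serializer: the executor rebuilds its copy from the bytes it received, so a later mutation of the driver's object is invisible to it. It assumes executors run in their own JVMs; in local mode, where the driver and the executor share a JVM, the behavior may differ.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical helpers: the driver turns the value into bytes once, at broadcast time
def serialize(obj: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  bytes.toByteArray
}

// The executor only ever sees the bytes and rebuilds its own copy from them
def deserialize[T](bytes: Array[Byte]): T = {
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try in.readObject().asInstanceOf[T] finally in.close()
}

val driverMap = mutable.HashMap(1 -> true)
val broadcastBytes = serialize(driverMap) // "sent" to the executors once

driverMap.put(2, false) // change made on the driver after broadcasting

val executorMap = deserialize[mutable.HashMap[Int, Boolean]](broadcastBytes)
```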

Broadcast join in Structured Streaming

I already gave you a hint about what happens with broadcast joins in Structured Streaming micro-batch queries. Since they're executed iteratively, yes, the joined dataset is broadcasted at every micro-batch.

If you check what happens inside the runBatch(sparkSessionToRunBatch: SparkSession) method, responsible for executing every micro-batch, you will see that nothing is reused except the initial logical plan. At every micro-batch, the sources of this logical plan are replaced with the newly available data, and the rewritten plan is wrapped in a new IncrementalExecution instance that generates a new physical plan. And since the physical plan is not reused, the broadcast join dataset is broadcasted at every micro-batch.
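The difference between both kinds of broadcasts can be summarized with a toy model of the micro-batch loop. None of the classes below exist in Spark; they only assume, as runBatch does by creating a new IncrementalExecution, that every micro-batch gets a new physical plan. The join plan broadcasts its static side each time it executes, whereas the broadcast variable is created once, before the loop.

```scala
// Hypothetical counter standing in for "data sent to the executors"
final class BroadcastCounter {
  var broadcasts: Int = 0
  def broadcast[T](value: T): T = { broadcasts += 1; value }
}

// Hypothetical stand-in for the physical plan of a stream-static broadcast join
final class JoinPlan(counter: BroadcastCounter, staticSide: Map[Int, String]) {
  def execute(batch: Seq[Int]): Seq[(Int, Option[String])] = {
    // The build side is broadcast when the plan is executed
    val broadcasted = counter.broadcast(staticSide)
    batch.map(nr => nr -> broadcasted.get(nr))
  }
}

val staticSide = Map(1 -> "one", 2 -> "two")
val microBatches = Seq(Seq(1), Seq(2, 3), Seq(1, 2))

// Broadcast join: a new plan is created for every micro-batch
val joinCounter = new BroadcastCounter
val joinResults = microBatches.map(batch => new JoinPlan(joinCounter, staticSide).execute(batch))

// Broadcast variable: created once, before the query starts, reused by every batch
val variableCounter = new BroadcastCounter
val broadcastVariable = variableCounter.broadcast(staticSide)
val variableResults = microBatches.map(batch => batch.map(nr => nr -> broadcastVariable.get(nr)))
```

Both approaches produce the same join results; only the number of broadcasts differs (one per micro-batch for the join, one in total for the variable).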

Broadcasting is often a way to accelerate the processing logic but, as you saw, there are some gotchas in Structured Streaming. Broadcast variables are quite clear since they keep the same semantics as in batch applications. Broadcast joins, on the other hand, behave a little bit differently due to the incremental character of streaming. They're still sent only once to every executor, but only within the scope of a given micro-batch.