Some time ago @ArunJijo36 mentioned me on Twitter with a question about broadcasting in Structured Streaming. If, like me at the time, you don't know what happens, I think that this article will be good for you 👊
In this post you will discover, in order, what happens when a variable is broadcasted and when a streaming Dataset is joined with a batch one via a broadcast join. Every part contains a demo example and some technical explanation of the behavior. If you are wondering whether the datasets and objects will be broadcasted every time - after all, a micro-batch can be thought of as a batch query executed iteratively - keep reading!
Broadcasted variables
I already covered broadcast variables in the zoom at broadcast variables article, so I will just recall some basics here. A broadcast variable is a variable that is built on the driver and sent to the executors only once. Once there, it's immutable and local. To use it, nothing is simpler than calling SparkContext's broadcast[T: ClassTag](value: T): Broadcast[T] method, which returns an instance of the typed Broadcast class.
If you want to use it, you have to pass this typed instance to your methods or classes, for example like this:
val ratesMap = (0 to 3000).map(nr => (nr, nr % 2 == 0))
val ratesMapBroadcast = sparkSession.sparkContext.broadcast(ratesMap.toMap)

def selectAField(row: Row, broadcast: Broadcast[scala.collection.immutable.Map[Int, Boolean]]): String = {
  // read the value through the handle passed as a parameter
  println(s"Got ratesMap=${broadcast.value}")
  row.getAs[String]("ts")
}

val writeQuery = dataFrame.selectExpr("CAST(value AS STRING) AS value_key")
  .join(ratesDataset, $"value_key" === $"nr", "left_outer")
  .map(row => selectAField(row, ratesMapBroadcast))
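To see it run, the mapped Dataset still has to be written to a sink. The snippet stops there, so here is a minimal, hypothetical completion of it that uses the console sink only for illustration:

// hypothetical completion of the snippet above: starting the query makes selectAField,
// and thus the broadcast lookup, run on the executors for every micro-batch
val startedQuery = writeQuery.writeStream
  .format("console")
  .option("truncate", "false")
  .start()
startedQuery.awaitTermination()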
And the whole magic happens when you call value. It's at this moment that the executor verifies whether the value you're looking for was already seen or not. If it's the first time, the value will be fetched by the block manager from a remote node:
override protected def getValue() = synchronized {
  val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
  if (memoized != null) {
    memoized
  } else {
    val newlyRead = readBroadcastBlock()
    _value = new SoftReference[T](newlyRead)
    newlyRead
  }
}
On this occasion, it's very interesting to notice one thing. The broadcast protocol is a BitTorrent-like protocol, so the block files composing the broadcasted object can be fetched from the original publisher (the driver) but also from the consumers that already fetched the blocks (the executors). It happens in the readBlocks() method where, first, the local file system is searched for the existence of the broadcasted blocks and, only later, the other executors:
bm.getLocalBytes(pieceId) match {
  case Some(block) =>
    blocks(pid) = block
    releaseBlockManagerLock(pieceId)
  case None =>
    bm.getRemoteBytes(pieceId) match {
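As a side note not covered by the snippet, the size of those pieces comes from a regular configuration entry, spark.broadcast.blockSize (4m by default), so a big broadcasted object gets split into several blocks that different peers can serve. A purely illustrative way to set it:

import org.apache.spark.sql.SparkSession

// illustrative only: setting the broadcast piece size explicitly to its default value
val sparkSession = SparkSession.builder()
  .appName("Broadcast block size").master("local[*]")
  .config("spark.broadcast.blockSize", "4m")
  .getOrCreate()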
Once fetched, the value is put in a local variable and retrieved whenever needed. But does it apply to Structured Streaming queries too? Yes, it does, despite the iterative character of the micro-batch queries. The broadcast variables are stored in the executor's memory and aren't coupled to a specific physical plan. They could be, if the physical plan deallocated them explicitly, but in the code you won't find any reference from it to TorrentBroadcast's unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) method.
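For the record, the public Broadcast API does expose that clean-up, as in this minimal sketch reusing the ratesMapBroadcast variable from the earlier example; the point is that nothing in the micro-batch execution calls it for you:

// deletes the cached copies on the executors only; the value can be re-broadcasted
// the next time an executor calls .value
ratesMapBroadcast.unpersist()

// removes all data and metadata of the broadcast on the driver and the executors;
// any later .value call will fail
ratesMapBroadcast.destroy()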
Does it mean that the broadcast variables will be there forever? No, because the cached value is a SoftReference, and it is one on purpose:
* On the driver, if the value is required, it is read lazily from the block manager. We hold
* a soft reference so that it can be garbage collected if required, as we can always reconstruct
* in the future.
SoftReferences can be deallocated by the Garbage Collector when the JVM struggles to fulfill an increasing memory demand. If you want to know more about them, there are plenty of great articles explaining it, like [1], [2], [3]. Applied to broadcast variables, it means that the GC can deallocate the cached value at any moment but, since the value was previously downloaded to the local file system, there is no risk that the executor has to fetch it from a remote node again. And that's why you will often find statements that the broadcasts are sent to the executors only once. In other words, if you change the value locally, the change won't be propagated. You can find a proof of that in the following demo:
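A minimal sketch of such a demo, assuming a real multi-executor cluster (in local mode the driver and the executor share the same JVM, so they may end up mutating the very same object) and an already created sparkSession, could look like this:

import scala.collection.mutable

// every executor mutates only its own, deserialized copy of the broadcasted map
val mutableState = mutable.Map(1 -> "a", 2 -> "b")
val stateBroadcast = sparkSession.sparkContext.broadcast(mutableState)

sparkSession.sparkContext.parallelize(1 to 4, 4).foreach { nr =>
  stateBroadcast.value.put(100 + nr, s"added by task ${nr}")
}

// the driver's copy still contains only the 2 initial entries because the
// executor-side changes were never propagated back
println(stateBroadcast.value)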
Broadcast join in Structured Streaming
I already gave you a hint about what happens with broadcast joins in Structured Streaming micro-batch queries. Since they're executed iteratively, well yes, the joined dataset will be broadcasted every time.
If you check what happens inside the runBatch(sparkSessionToRunBatch: SparkSession) method, responsible for executing every micro-batch, you will see the following steps:
- retrieval of the metadata for the new data - e.g. the offsets from a Kafka topic
- use of the new metadata in the existing logical plan - the plan is updated with the offsets fetched previously:
// Replace sources in the logical plan with data that has arrived since the last batch.
val newBatchesPlan = logicalPlan transform {
  // For v2 sources.
  case r: StreamingDataSourceV2Relation =>
    newData.get(r.stream).map {
      case OffsetHolder(start, end) =>
        r.copy(startOffset = Some(start), endOffset = Some(end))
    }.getOrElse {
      LocalRelation(r.output, isStreaming = true)
    }
}
- creation of the logical plan for the given trigger:
// Rewire the plan to use the new attributes that were returned by the source.
val newAttributePlan = newBatchesPlan transformAllExpressions {
  case ct: CurrentTimestamp =>
    // CurrentTimestamp is not TimeZoneAwareExpression while CurrentBatchTimestamp is.
    // Without TimeZoneId, CurrentBatchTimestamp is unresolved. Here, we use an explicit
    // dummy string to prevent UnresolvedException and to prevent to be used in the future.
    CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType,
      Some("Dummy TimeZoneId"))
  case cd: CurrentDate =>
    CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, cd.dataType, cd.timeZoneId)
}

val triggerLogicalPlan = sink match {
  case _: Sink => newAttributePlan
  case _: SupportsWrite =>
    newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
  case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
- planning of the new physical execution:
reportTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionToRunBatch, triggerLogicalPlan, outputMode, checkpointFile("state"), id, runId, currentBatchId, offsetSeqMetadata) lastExecution.executedPlan // Force the lazy generation of execution plan }
And during the query planning, the optimization rules are applied:

lazy val executedPlan: SparkPlan = {
  // We need to materialize the optimizedPlan here, before tracking the planning phase, to ensure
  // that the optimization time is not counted as part of the planning phase.
  assertOptimized()
  executePhase(QueryPlanningTracker.PLANNING) {
    // clone the plan to avoid sharing the plan instance between different stages like analyzing,
    // optimizing and planning.
    QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
  }
}

private[execution] def prepareForExecution(
    preparations: Seq[Rule[SparkPlan]],
    plan: SparkPlan): SparkPlan = {
  preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
}
- triggering of the sink execution - the built plan is later triggered from a new instance of the Dataset:
val nextBatch = new Dataset(lastExecution, RowEncoder(lastExecution.analyzed.schema))

val batchSinkProgress: Option[StreamWriterCommitProgress] = reportTimeTaken("addBatch") {
  SQLExecution.withNewExecutionId(lastExecution) {
    sink match {
      case s: Sink => s.addBatch(currentBatchId, nextBatch)
      case _: SupportsWrite =>
        // This doesn't accumulate any data - it just forces execution of the microbatch writer.
        nextBatch.collect()
    }
    lastExecution.executedPlan match {
      case w: WriteToDataSourceV2Exec => w.commitProgress
      case _ => None
    }
  }
}
In the steps above you can see that nothing is reused except the initial logical plan. And since the physical plan is not reused, the broadcast join dataset will be broadcasted at every micro-batch. Below you can find a demo proving that:
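A minimal sketch of such a demo, built on the rate source and the console sink only for illustration purposes, could look like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object StreamingBroadcastJoinDemo extends App {
  // illustrative names: a streaming Dataset joined with a static one through an explicit broadcast hint
  val sparkSession = SparkSession.builder()
    .appName("Broadcast join in Structured Streaming").master("local[2]")
    .getOrCreate()
  import sparkSession.implicits._

  // static side of the join - it's the dataset that gets broadcasted
  val ratesDataset = (0 to 10).map(nr => (nr, nr % 2 == 0)).toDF("nr", "is_even")

  // the rate source generates rows with timestamp and value columns
  val streamingInput = sparkSession.readStream.format("rate")
    .option("rowsPerSecond", "5").load()

  val joinedStream = streamingInput
    .join(broadcast(ratesDataset), $"value" === $"nr", "left_outer")

  val writeQuery = joinedStream.writeStream.format("console")
    .option("truncate", "false")
    .start()

  // every trigger plans a new IncrementalExecution, so the BroadcastExchange node is
  // re-executed and ratesDataset is re-broadcasted at each micro-batch; you can observe
  // it in the SQL tab of the Spark UI
  writeQuery.awaitTermination()
}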
Often, broadcasting is a way to accelerate the processing logic but, as you saw, there are some gotchas in Structured Streaming. Broadcast variables are quite clear since they keep the same semantics as in batch applications. On the other hand, broadcast joins, due to the incremental character of the streaming, are a little bit different. The joined dataset is still sent only once to every executor, but only within the scope of the given micro-batch.