State expiration in stream-to-stream joins with event time range condition

Versions: Apache Spark 3.4.0

As you probably know, the watermark (aka GC Watermark) is responsible for cleaning the state store in Apache Spark Structured Streaming. But you may not know that it's not the only time-based condition. A different one is involved in stream-to-stream joins.

The Structured Streaming documentation presents 2 different event-time constraints you can apply to your joins: time range join conditions and joins on event-time windows.

The event-time window is quite self-explanatory and you'll typically use it for window-based joins. But what about the "Time range conditions"? To understand them, let's take the ad display example, often used to explain stream-to-stream joins:
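A minimal PySpark sketch of such a join could look as follows. It follows the shape of the example from the Structured Streaming guide, but the column names (`impression_ad_id`, `impression_time`, `click_ad_id`, `click_time`), the watermark delays, and the helper name `join_ads_with_clicks` are assumptions made for illustration:

```python
# Time range condition: a click must happen at or after the impression
# and at most 10 minutes later. All column names are hypothetical.
JOIN_CONDITION = """
    click_ad_id = impression_ad_id AND
    click_time >= impression_time AND
    click_time <= impression_time + interval 10 minutes
"""


def join_ads_with_clicks(impressions, clicks):
    """Join two streaming DataFrames with the time range condition above."""
    # Imported lazily so JOIN_CONDITION can be inspected without PySpark installed.
    from pyspark.sql.functions import expr

    # Watermark delays are illustrative assumptions, not recommended values.
    impressions_wm = impressions.withWatermark("impression_time", "20 minutes")
    clicks_wm = clicks.withWatermark("click_time", "20 minutes")
    return impressions_wm.join(clicks_wm, expr(JOIN_CONDITION), "inner")
```

The last two lines of `JOIN_CONDITION` form the time range condition discussed in the rest of the post.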

I highlighted the condition I'm talking about in this blog post. Translated into more human-friendly language, it says:

Join ads and clicks on the ad_id. Additionally, include only the clicks made at most 10 minutes after the ad display.

It essentially defines a business rule: a click cannot be matched with an ad more than 10 minutes after the display. Besides this user-facing aspect, the condition also has implications for the state store.

Join simulation

Let's simulate a join between an ad impression and a click, with a special focus on the watermark:

As you can see, the state store manager doesn't remove the 10:00 impression even though the watermark moved to 10:06. Why? The business rule states that the impression can still be matched up to 10 minutes after its display. As a consequence, the state expiration time takes this time range into account instead of relying on the watermark alone, as is usually the case in stateful processing.
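The same behavior can be reproduced with a small plain-Python model, without Spark. This is only a sketch: the function names are hypothetical, a single watermark value is assumed for both streams, and the boundary handling is deliberately conservative.

```python
from datetime import datetime, timedelta

CLICK_RANGE = timedelta(minutes=10)  # upper bound taken from the join condition


def expired_by_watermark_only(event_time, watermark):
    """Naive rule: state older than the watermark is removed."""
    return event_time < watermark


def expired_with_time_range(impression_time, watermark, click_range=CLICK_RANGE):
    """Range-aware rule: keep the impression while a click newer than the
    watermark could still fall within impression_time + click_range."""
    return impression_time + click_range < watermark


impression = datetime(2023, 4, 1, 10, 0)
for watermark in (datetime(2023, 4, 1, 10, 6), datetime(2023, 4, 1, 10, 11)):
    print(
        watermark.time(),
        "watermark only:", expired_by_watermark_only(impression, watermark),
        "with time range:", expired_with_time_range(impression, watermark),
    )
```

With the watermark at 10:06, the naive rule would already drop the 10:00 impression, while the range-aware rule keeps it until the watermark passes 10:10.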

Some internals

Under the hood, Apache Spark uses the StreamingJoinHelper class to generate the time-range (aka value) watermark:

The generation starts when the logical plan is transformed into the physical plan. This step relies on StreamingSymmetricHashJoinHelper to generate the GC Watermark for the state store. The actual value is computed in the getStateValueWatermark function, which consists of the following steps:
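Leaving Catalyst expressions aside, the arithmetic behind the state value watermark can be sketched for a condition of the form `click_time BETWEEN impression_time + lower AND impression_time + upper`. The function below is a hand-written simplification, not Spark's code: its name, the millisecond-based signature, and the single watermark shared by both sides are assumptions.

```python
def state_value_watermarks(event_watermark_ms, lower_ms, upper_ms):
    """Return (impressions_threshold, clicks_threshold) in epoch milliseconds.

    State rows with an event time <= threshold can no longer match a future
    row, assuming future rows always carry an event time > event_watermark_ms.
    """
    # An impression matches clicks up to impression_time + upper, so it is
    # useless once impression_time + upper < watermark.
    impressions_threshold = event_watermark_ms - upper_ms - 1
    # A click matches impressions up to click_time - lower, so it is
    # useless once click_time - lower < watermark.
    clicks_threshold = event_watermark_ms + lower_ms - 1
    return impressions_threshold, clicks_threshold


ten_minutes_ms = 10 * 60 * 1000
watermark_ms = 1_000_000_000
print(state_value_watermarks(watermark_ms, 0, ten_minutes_ms))
```

For the ads example (lower bound of 0, upper bound of 10 minutes), the impressions threshold trails the clicks threshold by 10 minutes, which is why impressions stay in the state store longer than the watermark alone would suggest.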

Even after reading this blog post, you'll probably still associate the watermark with state expiration. However, as you saw with the time range conditions, it's not always used alone.
