You certainly know it, the watermark (aka GC Watermark) is responsible for cleaning state store in Apache Spark Structured Streaming. But you may not know that it's not the single time-based condition. There is a different one involved in the stream-to-stream joins.
The Structured Streaming documentation presents 2 different event-time constraints you can apply to your joins:
The event-time window is quite self-explanatory and you'll typically use it for window-based joins. But what about the "Time range conditions"? To understand it, let's take an example of the ads displays, often given to explain stream-to-stream joins:
I highlighted the condition I'm talking about in this blog post. If we want to translate to more human-friendly language, it'll be:
Join ads and clicks on the ad_id. Additionally, include only the clicks displayed at most 10 minutes after the ad display.
It's inherently a definition of a business rule that an ad cannot be clicked after 10 minutes. Besides this user-facing character, this condition also has an implication on the state store.
Let's simulate a join between an ad and an impression, with a special focus on the watermark:
As you clearly see, the state store manager doesn't remove the 10:00 o'clock impression despite moving the watermark to 10:06. Why? The business rule clearly states that the impression can be matched even 10 minutes after. As a consequence, the state expiration time uses this time range instead of the watermark that is usually employed in stateful processing.
Under-the-hood, Apache Spark uses StreamingJoinHelper class to generate the time-range (aka value) watermark:
The generation logic starts with the step transforming the logical plan into the physical plan. It relies on the StreamingSymmetricHashJoinHelper to generate the GC Watermark for the state store. The physical value generation happens in the getStateValueWatermark function and consists of the following steps:
- Splitting all predicates into separate expressions: our initial join condition becomes a sequence of [a.event_time >= i.event_time, c.event_time <= a.event_time + INTERVAL 10 minutes].
- Extracting the watermark from the predicates.
- Getting the final watermark as the min value from the extracted results.
Even after reading this blog post, watermark will still probably be associated with the state expiration. However, as you saw for the time range conditions, it may not be used alone in some scenarios.