State expiration in stream-to-stream joins with event time range condition

You probably know that the watermark (aka GC Watermark) is responsible for cleaning the state store in Apache Spark Structured Streaming. But you may not know that it's not the only time-based condition. A different one is involved in stream-to-stream joins.

Bartosz Konieczny

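The Structured Streaming documentation presents 2 different event-time constraints you can apply to your joins: time range join conditions (for example, JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR) and joins on event-time windows (for example, JOIN ON leftTimeWindow = rightTimeWindow).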

The event-time window is quite self-explanatory and you'll typically use it for window-based joins. But what about the "Time range conditions"? To understand them, let's take the example of ad displays, often used to explain stream-to-stream joins:

I highlighted the condition I'm talking about in this blog post. Translated into more human-friendly language, it reads:


Join ads and clicks on the ad_id. Additionally, include only the clicks made at most 10 minutes after the ad display.

It's inherently a definition of a business rule that an ad cannot be clicked after 10 minutes. Besides this user-facing character, this condition also has an implication on the state store.
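To make the rule concrete, here is a minimal plain-Python sketch of the join predicate, written without Spark. The function name click_matches_impression, its parameters, and the lower bound (a click cannot precede its ad display) are assumptions made for illustration; they are not part of any Spark API.

```python
from datetime import datetime, timedelta

# Business rule from the text: a click counts only if it happens
# at most 10 minutes after the ad display.
MAX_CLICK_DELAY = timedelta(minutes=10)


def click_matches_impression(impression_ad_id, impression_time, click_ad_id, click_time):
    """Return True when the click satisfies the join condition (hypothetical helper)."""
    same_ad = impression_ad_id == click_ad_id
    # Time range condition:
    #   impression_time <= click_time <= impression_time + 10 minutes
    # The lower bound is an assumption: a click cannot precede its display.
    in_range = impression_time <= click_time <= impression_time + MAX_CLICK_DELAY
    return same_ad and in_range


displayed_at = datetime(2024, 1, 1, 10, 0)
on_time = click_matches_impression(1, displayed_at, 1, datetime(2024, 1, 1, 10, 9))
too_late = click_matches_impression(1, displayed_at, 1, datetime(2024, 1, 1, 10, 11))
other_ad = click_matches_impression(1, displayed_at, 2, datetime(2024, 1, 1, 10, 5))
```

With these inputs, only the click at 10:09 on the same ad is accepted; the click at 10:11 falls outside the range and the last one targets a different ad.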

Join simulation

Let's simulate a join between an ad impression and a click, with a special focus on the watermark:

As you can see, the state store manager doesn't remove the 10:00 impression even though the watermark has moved to 10:06. Why? The business rule states that the impression can still be matched up to 10 minutes after its display. As a consequence, the state expiration time is derived from this time range and not from the watermark alone, as is usual in stateful processing.
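The arithmetic behind this behavior can be sketched in a few lines of plain Python. This is a simplified model and not Spark's implementation: the names state_expiration_threshold and evict_expired are hypothetical, a single watermark is assumed for both sides, and the exact handling of boundary values in Spark may differ.

```python
from datetime import datetime, timedelta

# Upper bound of the time range condition: click <= impression + 10 minutes.
JOIN_TIME_RANGE = timedelta(minutes=10)


def state_expiration_threshold(watermark, time_range=JOIN_TIME_RANGE):
    """Oldest impression time that a click newer than the watermark could still match."""
    return watermark - time_range


def evict_expired(impression_times, watermark):
    """Keep only the buffered impressions that may still be joined (hypothetical helper)."""
    threshold = state_expiration_threshold(watermark)
    return [t for t in impression_times if t >= threshold]


state = [datetime(2024, 1, 1, 10, 0)]

# Watermark at 10:06: the threshold is 09:56, so the 10:00 impression stays.
state_at_1006 = evict_expired(state, datetime(2024, 1, 1, 10, 6))

# Watermark at 10:11: the threshold is 10:01, so the 10:00 impression is removed.
state_at_1011 = evict_expired(state, datetime(2024, 1, 1, 10, 11))
```

The key point is that the comparison is made against the watermark shifted by the time range, not against the watermark itself, which is why the impression survives until the watermark passes 10:10.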

Some internals

Under the hood, Apache Spark uses the StreamingJoinHelper class to generate the time-range (aka value) watermark:

The generation logic starts with the step transforming the logical plan into the physical plan. It relies on the StreamingSymmetricHashJoinHelper to generate the GC Watermark for the state store. The physical value generation happens in the getStateValueWatermark function and consists of the following steps:

Even after reading this blog post, you will probably still associate the watermark with state expiration. However, as you saw with the time range conditions, in some scenarios the watermark is not used alone.
