Listener or checkpoints for external progress tracking?

Versions: Apache Spark 4.0.1 https://github.com/bartosz25/spark-playground/tree/master/structured-streaming-progress-tracking

Among the many ways to track the progress of Apache Spark Structured Streaming jobs, you'll find custom offset extraction and ingestion into your observability tool of choice. Although this approach sounds simple, it can be implemented in several ways. In this blog post, we're going to discuss two of them: a custom batch job and a custom listener.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩

💡 Several metrics are already available via PrometheusServlet, enabled with spark.sql.streaming.metricsEnabled. If they are not sufficient, or if you are using a different tool for monitoring (e.g. Databricks dashboards), you can apply one of the approaches presented in this blog post.
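As a reminder, the servlet itself is enabled through Spark's standard metrics configuration. A minimal sketch (the conf/metrics.properties path assumes a default Spark distribution layout):

```properties
# conf/metrics.properties -- register the Prometheus sink on the Spark UI port
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
```

The streaming-specific metrics additionally require spark.sql.streaming.metricsEnabled=true, e.g. passed as a --conf to spark-submit.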

Listener approach

To help you better understand the first solution, let's take a look at the architecture diagram:

In short, you create a class implementing the StreamingQueryListener to intercept four different query states: started (onQueryStarted), progress (onQueryProgress), idle (onQueryIdle), and terminated (onQueryTerminated).

I covered the streaming listeners in more detail in the StreamingQueryListener, from states to questions article two years ago. Instead of repeating myself on the execution flow, let me address two practical questions: how to track multiple data sources, and how to avoid flooding the monitoring sink with granular updates?

The answers are pretty simple. When you deal with multiple data sources, you'll get them directly as part of the sources attribute of the query progress:

import json

def onQueryProgress(self, event: QueryProgressEvent) -> None:
  for source_progress in event.progress.sources:
    # each source exposes its own start/end offsets as a JSON string
    processed_offsets = json.loads(source_progress.endOffset)
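For instance, an Apache Kafka source reports its endOffset as a JSON map of topic to per-partition offsets. Below is a minimal sketch of flattening that payload into metric-friendly key-value pairs; the topic name and sample values are hypothetical:

```python
import json

def extract_partition_offsets(end_offset_json: str) -> dict:
    """Flatten a Kafka-style endOffset payload ({"topic": {"partition": offset}})
    into {"topic-partition": offset} pairs, ready to ship to a metrics sink."""
    offsets = json.loads(end_offset_json)
    return {
        f'{topic}-{partition}': offset
        for topic, partitions in offsets.items()
        for partition, offset in partitions.items()
    }

# Sample endOffset as a Kafka source would report it (values made up):
print(extract_partition_offsets('{"visits": {"0": 100, "1": 93}}'))
# → {'visits-0': 100, 'visits-1': 93}
```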

If you don't want to send many granular updates (an unnecessary concern if your monitoring data sink is a streaming broker), you can apply buffering logic. The listener is a long-lived component, i.e. Apache Spark doesn't recreate the instance at each micro-batch. Consequently, you can buffer the events in a list and flush them once in a while:

def onQueryProgress(self, event: QueryProgressEvent) -> None:
  self.events_buffer.append(event.progress)
  if len(self.events_buffer) >= self.buffer_flush_threshold:
    self._flush_records()

There is an important thing to keep in mind if you decide to buffer events, though. If your query is idle, Apache Spark won't call the onQueryProgress. Consequently, to avoid your events sticking in the buffer for too long, you should also configure a maximal idleness threshold:

def onQueryIdle(self, event: QueryIdleEvent): 
  self.idle_calls += 1
  if self.idle_calls >= self.idle_threshold:
    logging.info("Query was idle for too long. Flushing records...")
    self._flush_records()
    self.idle_calls = 0
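Putting the two callbacks together, the buffering logic could be sketched as below. To keep the snippet self-contained, the class does not extend pyspark.sql.streaming.StreamingQueryListener as a real listener would, and the flush_fn target (e.g. a producer writing to your observability broker) is an assumption:

```python
import logging

class BufferingListener:
    """Sketch of a listener that buffers progress events and flushes them
    either when the buffer is full or when the query has been idle for too
    many consecutive calls. Assumption: flush_fn is the caller-provided sink."""

    def __init__(self, flush_fn, buffer_flush_threshold: int = 10,
                 idle_threshold: int = 3):
        self.flush_fn = flush_fn
        self.buffer_flush_threshold = buffer_flush_threshold
        self.idle_threshold = idle_threshold
        self.events_buffer = []
        self.idle_calls = 0

    def onQueryProgress(self, event) -> None:
        self.idle_calls = 0  # the query made progress; reset the idle counter
        self.events_buffer.append(event)
        if len(self.events_buffer) >= self.buffer_flush_threshold:
            self._flush_records()

    def onQueryIdle(self, event) -> None:
        self.idle_calls += 1
        if self.idle_calls >= self.idle_threshold:
            logging.info("Query was idle for too long. Flushing records...")
            self._flush_records()
            self.idle_calls = 0

    def _flush_records(self) -> None:
        if self.events_buffer:
            self.flush_fn(self.events_buffer)
            self.events_buffer = []
```

Resetting idle_calls on every progress event ensures only consecutive idle micro-batches trigger the early flush.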

Checkpoint job

An alternative to the listener-based approach is an Apache Spark job that consumes checkpoint offset information. Here is the high-level design:

Structured Streaming jobs emit offset information natively, either synchronously, or asynchronously with async progress tracking. Checkpoint information is materialized as JSON-like files in object storage. Consequently, you can treat them like any other valid Apache Spark data source and consume them with the data source API.

Below is a short code snippet that processes the offset information from the checkpoint metadata for the case of heterogeneous data sources (a Delta Lake table and an Apache Kafka topic):

# assumes: from pyspark.sql import functions as F
offsets = (spark_session.read.text(f'{CHECKPOINT_DIR}/offsets')
    # the offset file name is the micro-batch number
    .withColumn('micro_batch_version',
       F.element_at(F.split(F.input_file_name(), "/"), -1).cast('int'))
    # identify each source by a marker specific to its offset payload
    .withColumn('progress_source',
       F.when(F.col('value').contains('reservoirId'), 'Delta Lake')
         .when(F.col('value').startswith('{"visits'), 'Apache Kafka')
         .otherwise(None))
    .filter('progress_source IS NOT NULL')
)

Comparison

A question should now arise: which one to choose? The more Apache Spark-tied listener, or an isolated job? The answer is: it depends. It depends on what you need, and on how you want to handle progress-tracking failures.

Listener:

- runs inside the streaming application, so there is no extra job to schedule and events arrive in near real time
- shares the failure domain of the monitored query; if the driver dies, so does the listener

Checkpoint job:

- fully isolated from the streaming application, with its own lifecycle and failure handling
- relies on the checkpoint files' internal format, which may evolve between Apache Spark releases
- progress freshness depends on the job's schedule

Besides these custom methods, which might be appropriate if you use a different observability storage (such as a Databricks volume or an object on S3), you can configure a PrometheusServlet to emit streaming metrics with spark.sql.streaming.metricsEnabled.

Consulting

With nearly 17 years of experience, including 9 as data engineer, I offer expert consulting to design and optimize scalable data solutions. As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!

👉 contact@waitingforcode.com
🔗 past projects