Apache Spark listeners

Versions: Apache Spark 3.3.0 https://github.com/bartosz25/spark-playground/tree/master/spark-listeners

The message bus is a common architectural design among the Enterprise Design Patterns. But it's also present at a lower level, inside a single application, to enable event-driven behavior. Apache Spark is no exception and uses a publish/subscribe approach in various places.

Agnostic design

A message bus architecture involves 3 actors:

- a producer that emits the events to the bus
- the bus itself that buffers the events and takes care of their delivery
- one or more consumers that subscribe to the events they're interested in and react to them

The implementation looks the same in Apache Spark: the scheduler and the SQL execution layer act as the producers, the listener buses deliver the events, and the listeners you register are the consumers.

SparkListener

Job-related events are the first type of messages you can listen to in Apache Spark. They expose what happens for:

- the application (start, end)
- the jobs (start, end)
- the stages (submitted, completed)
- the tasks (start, getting the result, end)
- the executors and block managers (added, removed, excluded)
- other runtime facts, like environment updates or RDD unpersist

Apache Spark comes with a handy abstract class called SparkListener that defines no-op event handlers for all existing events. Thanks to this class, if you only need to react to one specific event category, you don't have to override all the on* callback methods.

To register a custom listener, you can call the SparkContext#addSparkListener(listener: SparkListenerInterface) method, which will add your class to the listeners queue.
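
To make it more concrete, here is a minimal sketch of a job-focused listener and its registration. The JobLoggingListener name and the println-based output are illustrative choices of mine, not anything imposed by Spark:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Illustrative listener reacting only to the job events; every other on* callback
// keeps the no-op implementation inherited from SparkListener
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} finished with the result ${jobEnd.jobResult}")
  }
}

val sparkContext = SparkContext.getOrCreate(
  new SparkConf().setAppName("listeners-demo").setMaster("local[*]"))
sparkContext.addSparkListener(new JobLoggingListener())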

Publishing an event consists of calling LiveListenerBus#post(event: SparkListenerEvent) with the case class representing the event. Dispatching it to the listeners is a simple pattern matching on the event classes:

private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
// ...
  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
// ...

QueryExecutionListener

Another category of listeners comes from Apache Spark SQL and the QueryExecutionListener class. It's a poorer sibling of the SparkListener because it only has 2 callbacks:

- onSuccess(funcName: String, qe: QueryExecution, durationNs: Long), called when the query completes successfully
- onFailure(funcName: String, qe: QueryExecution, exception: Exception), called when the execution fails

To register a listener, you can specify the full class name in the spark.sql.queryExecutionListeners config parameter or directly call the ExecutionListenerManager#register(listener: QueryExecutionListener) method, the manager being exposed through SparkSession#listenerManager.
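
Below is a minimal sketch of such a listener, registered programmatically; the QueryOutcomeListener name and the println calls are only illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative listener printing the outcome of every executed action
class QueryOutcomeListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    println(s"${funcName} succeeded in ${durationNs / 1000000} ms")
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
    println(s"${funcName} failed with ${exception.getMessage}")
  }
}

val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
// the ExecutionListenerManager is exposed through SparkSession#listenerManager
sparkSession.listenerManager.register(new QueryOutcomeListener())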

The logic responsible for dispatching the event to the handlers is defined in the ExecutionListenerBus#doPostEvent(listener: QueryExecutionListener, event: SparkListenerSQLExecutionEnd) method. It matches on the executionFailure field of the SparkListenerSQLExecutionEnd event to call either the failure or the success handler:

private[spark] class ExecutionListenerBus(
    manager: ExecutionListenerManager,
    session: SparkSession)
  extends SparkListener
  with ListenerBus[QueryExecutionListener, SparkListenerSQLExecutionEnd] {
// ...
  override protected def doPostEvent(
      listener: QueryExecutionListener,
      event: SparkListenerSQLExecutionEnd): Unit = {
    if (shouldReport(event)) {
      val funcName = event.executionName.get
      event.executionFailure match {
        case Some(ex) =>
          val exception = ex match {
            case e: Exception => e
            case other: Throwable =>
              QueryExecutionErrors.failedToExecuteQueryError(other)
          }
          listener.onFailure(funcName, event.qe, exception)
        case _ =>
          listener.onSuccess(funcName, event.qe, event.duration)
      }
    }
  }

StreamingQueryListener

The StreamingQueryListener is the last example of the listeners I want to share with you here. As its name indicates, the class is responsible for the events related to the Structured Streaming queries. It defines 3 callbacks:

- onQueryStarted(event: QueryStartedEvent), invoked synchronously when a query starts
- onQueryProgress(event: QueryProgressEvent), invoked with the progress statistics of the processed micro-batches
- onQueryTerminated(event: QueryTerminatedEvent), invoked when the query stops, with or without an error

There are 2 ways to attach a new listener. The first uses the spark.sql.streaming.streamingQueryListeners configuration entry where you can define a list of listeners to use in the job. The second calls StreamingQueryManager#addListener(listener: StreamingQueryListener) directly, the manager being exposed through SparkSession#streams.
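
As previously, a minimal sketch with an illustrative StreamingStateListener class, registered through the manager exposed by SparkSession#streams:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Illustrative listener following the lifecycle of the Structured Streaming queries
class StreamingStateListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    println(s"Query ${event.name} started (runId=${event.runId})")
  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"Batch ${event.progress.batchId} processed ${event.progress.numInputRows} input rows")
  }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    println(s"Query ${event.runId} terminated with the exception ${event.exception}")
  }
}

val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
sparkSession.streams.addListener(new StreamingStateListener())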

When it comes to the dispatch, it happens in the same doPostEvent method, but this time defined in the StreamingQueryListenerBus:

class StreamingQueryListenerBus(sparkListenerBus: Option[LiveListenerBus])
  extends SparkListener with ListenerBus[StreamingQueryListener, StreamingQueryListener.Event] {

// ...
  override protected def doPostEvent(
      listener: StreamingQueryListener,
      event: StreamingQueryListener.Event): Unit = {
    def shouldReport(runId: UUID): Boolean = {
      // When loaded by Spark History Server, we should process all event coming from replay
      // listener bus.
      sparkListenerBus.isEmpty ||
        activeQueryRunIds.synchronized { activeQueryRunIds.contains(runId) }
    }

    event match {
      case queryStarted: QueryStartedEvent =>
        if (shouldReport(queryStarted.runId)) {
          listener.onQueryStarted(queryStarted)
        }
      case queryProgress: QueryProgressEvent =>
        if (shouldReport(queryProgress.progress.runId)) {
          listener.onQueryProgress(queryProgress)
        }
      case queryTerminated: QueryTerminatedEvent =>
        if (shouldReport(queryTerminated.runId)) {
          listener.onQueryTerminated(queryTerminated)
        }
      case _ =>
    }
  }

Example

I hope you saw from the above sections that writing and registering a listener is quite easy. The snippets included in each section register the listeners programmatically but, as mentioned before, the configuration entries are an alternative way to wire them in.
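
As a sketch of that configuration-based approach, the snippet below declares all 3 listener types at the SparkSession creation. The com.waitingforcode package prefix is purely hypothetical; whatever package you use, the classes must be available on the classpath and expose a no-arg constructor. The spark.extraListeners entry plays for the SparkListeners the same role as the 2 SQL properties described earlier:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local[*]")
  // SparkListener implementations
  .config("spark.extraListeners", "com.waitingforcode.JobLoggingListener")
  // QueryExecutionListener implementations
  .config("spark.sql.queryExecutionListeners", "com.waitingforcode.QueryOutcomeListener")
  // StreamingQueryListener implementations
  .config("spark.sql.streaming.streamingQueryListeners", "com.waitingforcode.StreamingStateListener")
  .getOrCreate()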

In addition to the observable metrics, Spark listeners are another way to see what happens in your job. The difference between these 2 components is the scope: the metrics operate on the data whereas the listeners apply to query- or job-related attributes.
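
To illustrate that difference, the sketch below combines both: the observe call computes a data-scoped metric (a simple row count, my arbitrary choice) and a QueryExecutionListener reads it back, next to the query-scoped duration, from the QueryExecution's observedMetrics:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.{count, lit}
import org.apache.spark.sql.util.QueryExecutionListener

val sparkSession = SparkSession.builder().master("local[*]").getOrCreate()
import sparkSession.implicits._

sparkSession.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // query-scoped attribute (duration) printed next to the data-scoped observed metric
    println(s"${funcName} took ${durationNs} ns; observed metrics: ${qe.observedMetrics}")
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
})

Seq(1, 2, 3).toDF("nr")
  .observe("stats", count(lit(1)).as("rows_count"))
  .write.format("noop").mode("overwrite").save()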

