Introduction to Apache Spark History

Versions: Apache Spark 3.3.0

If you need to go back in time and analyze your past Apache Spark applications, you can use the native Apache Spark History server. However, it can also become an infrastructure problem because the historical logs of streaming applications grow continuously. In this blog post we'll try to understand this component and look at its configuration options.

Architecture

In a nutshell, the Apache Spark history UI is composed of 2 main components, the web server and the data provider. The former exposes the HTTP endpoints that return the HTML code to the browser. It's mostly a servlet-based component that analyzes user HTTP requests and serves the appropriate content. For example, when you access the /jobs endpoint, the history server will get its content from the AllJobsPage renderer:

private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    // ...
    if (shouldShowActiveJobs) {
      content ++=
        <span id="active" class="collapse-aggregated-activeJobs collapse-table"
            onClick="collapseTable('collapse-aggregated-activeJobs','aggregated-activeJobs')">
          <h4>
            <span class="collapse-table-arrow arrow-open"></span>
            <a>Active Jobs ({activeJobs.size})</a>
          </h4>
        </span> ++
        <div class="aggregated-activeJobs collapsible-table">
          {activeJobsTable}
        </div>
    }
    // ...
    UIUtils.headerSparkPage(request, "Spark Jobs", content, parent,
      helpText = Some(helpText))
  }
  // ...
}
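To make the "request in, page out" idea concrete, here is a minimal sketch of a path-to-renderer dispatch. It is a toy model, not Spark's code: the Page, JobsPage and Server names are hypothetical, and the real UI attaches its WebUIPage instances to servlet handlers rather than to a plain Map.

```scala
// Toy model of "path -> page renderer" dispatch.
// Every name below is hypothetical and only illustrates the idea.
object PageDispatchSketch {

  // A page knows how to render itself to an HTML string.
  trait Page {
    def render(params: Map[String, String]): String
  }

  // Simplified stand-in for a jobs page: it only prints the active jobs counter.
  final class JobsPage(activeJobs: Seq[String]) extends Page {
    def render(params: Map[String, String]): String =
      s"<h4>Active Jobs (${activeJobs.size})</h4>"
  }

  final class Server(pages: Map[String, Page]) {
    // Returns Some(html) for a registered path and None otherwise
    // (a real server would answer with a 404 in the latter case).
    def handle(path: String, params: Map[String, String] = Map.empty): Option[String] =
      pages.get(path).map(_.render(params))
  }

  def main(args: Array[String]): Unit = {
    val server = new Server(Map("/jobs" -> new JobsPage(Seq("job-0", "job-1"))))
    println(server.handle("/jobs"))
    println(server.handle("/unknown"))
  }
}
```

The renderer only formats what it is given. Where the jobs come from is the responsibility of the second component.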

The web server has some logic but it couldn't exist without the data, that is, the Apache Spark logs parsed by the data provider component. The parsing is a background process, so far implemented only for file system log sources, in the FsHistoryProvider class. It wakes up every spark.history.fs.update.interval and checks for new logs to include in the history page. The logic doesn't rely on the modification time, though. Instead, the process compares the size of the previously parsed file with the size of its most recent version. According to the Scaladoc, this file size-based comparison is there to address these issues:

/**
 * ...
 * The use of log size, rather than simply relying on modification times, is needed to
 * address the following issues
 * - some filesystems do not appear to update the `modtime` value whenever data is flushed to
 *   an open file output stream. Changes to the history may not be picked up.
 * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be
 *   missed.
 * ...
 */
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {
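The size-based comparison can be sketched as follows. This is a simplified model under my own assumptions, not the FsHistoryProvider code: LogInfo and logsToParse are hypothetical names, and the real provider tracks more state than a path and a size.

```scala
// Simplified model of a size-based scan. Hypothetical names throughout.
object SizeBasedScanSketch {

  // What a directory listing tells us about one event log.
  final case class LogInfo(path: String, size: Long)

  // Returns the logs that are new or that grew since the previous scan.
  // `known` maps a log path to the size recorded when it was last parsed.
  // Modification times are deliberately not consulted.
  def logsToParse(known: Map[String, Long], listing: Seq[LogInfo]): Seq[LogInfo] =
    listing.filter { log =>
      known.get(log.path) match {
        case Some(previousSize) => log.size > previousSize // grew since last scan
        case None               => true                    // never seen before
      }
    }

  def main(args: Array[String]): Unit = {
    val known = Map("app-1" -> 100L, "app-2" -> 50L)
    val listing = Seq(LogInfo("app-1", 100L), LogInfo("app-2", 80L), LogInfo("app-3", 10L))
    // app-1 is unchanged, app-2 grew and app-3 is new.
    println(logsToParse(known, listing).map(_.path))
  }
}
```

With this approach a log that is still being written gets picked up on the next scan even if the file system never refreshed its modification time.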

Page generation

FsHistoryProvider has a method called getAppUI:

private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {
  // ...
  /**
   * Returns the Spark UI for a specific application.
   *
   * @param appId The application ID.
   * @param attemptId The application attempt ID (or None if there is no attempt ID).
   * @return a [[LoadedAppUI]] instance containing the application's UI and any state information
   *         for update probes, or `None` if the application/attempt is not found.
   */
  def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
  // ...
}

This method either loads the UI from cache or rebuilds it from scratch. The former happens when no new logs have been created since the last log parsing event. The latter happens in the opposite case: each newly parsed log invalidates the cached application UI, which then has to be rebuilt from the existing log files:

private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {

  override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
    // ...
    val kvstore = try {
      diskManager match {
        case Some(sm) =>
          loadDiskStore(sm, appId, attempt)
        case _ =>
          createInMemoryStore(attempt)
      }
    } catch {
      case _: FileNotFoundException =>
        return None
    }

    val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
      app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
      attempt.info.startTime.getTime(), attempt.info.appSparkVersion)

    // place the tab in UI based on the display order
    loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui))

    val loadedUI = LoadedAppUI(ui)
    synchronized {
      activeUIs((appId, attemptId)) = loadedUI
    }

    Some(loadedUI)
  }
  // ...
}

Two parts of the snippet are important: kvstore and ui. The kvstore is an instance of the KVStore interface that holds the objects created while parsing the logs. The ui references this local store through the HistoryAppStatusStore instance and uses those objects in the rendering part:

private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {

  def render(request: HttpServletRequest): Seq[Node] = {
    // ...
    store.jobsList(null).foreach { job =>
      job.status match {
        case JobExecutionStatus.SUCCEEDED =>
          completedJobs += job
        case JobExecutionStatus.FAILED =>
          failedJobs += job
        case _ =>
          activeJobs += job
      }
    }
    // ...
  }
}
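The cache-or-rebuild behavior of getAppUI described in this section can be modeled with a few lines of plain Scala. Treat it as a sketch under assumptions: Provider, LoadedUi and the log-size lookup function are hypothetical, and the staleness check is reduced to comparing the log size seen at build time with the current one.

```scala
import scala.collection.mutable

// Simplified model of "serve the cached UI or rebuild it". Hypothetical names.
object UiCacheSketch {

  // The "UI" only remembers which log size it was built from.
  final case class LoadedUi(appId: String, builtFromLogSize: Long)

  // `currentLogSize` returns the size of the application's log, or None if
  // the application is unknown.
  final class Provider(currentLogSize: String => Option[Long]) {
    private val cache = mutable.Map.empty[String, LoadedUi]

    // Counts how many times a UI had to be built from the logs.
    var rebuilds: Int = 0

    def getAppUi(appId: String): Option[LoadedUi] =
      currentLogSize(appId).map { size =>
        cache.get(appId) match {
          // Nothing new was written since the UI was built: reuse it.
          case Some(ui) if ui.builtFromLogSize == size => ui
          // First access, or the log changed: rebuild and replace the cached entry.
          case _ =>
            rebuilds += 1
            val ui = LoadedUi(appId, size)
            cache(appId) = ui
            ui
        }
      }
  }
}
```

Two consecutive calls for an unchanged log trigger a single rebuild. As soon as the log grows, the next call rebuilds the UI, which is why long-running streaming applications are the expensive case for the history server.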

Configuration

Apache Spark history has a dedicated section in the monitoring documentation called Viewing After the Fact. The page contains the full list of the configuration properties, including the spark.history.fs.update.interval seen earlier.
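As an illustration, the sketch below turns a handful of properties from that documentation page into the -D flags accepted by the SPARK_HISTORY_OPTS environment variable. The property names come from the documentation, but the selection and the values are my assumptions, and the HistoryOptsSketch helper is hypothetical.

```scala
// Builds a SPARK_HISTORY_OPTS value from a list of history server properties.
// The helper is hypothetical; the values are examples, not recommendations.
object HistoryOptsSketch {

  val historyConf: Seq[(String, String)] = Seq(
    // Where the provider looks for the event logs.
    "spark.history.fs.logDirectory" -> "file:/tmp/spark-events",
    // How often the background scan wakes up.
    "spark.history.fs.update.interval" -> "10s",
    // Periodic removal of old logs, to keep the log directory under control.
    "spark.history.fs.cleaner.enabled" -> "true",
    "spark.history.fs.cleaner.maxAge" -> "7d"
  )

  // Formats every pair as a JVM system property flag.
  def toHistoryOpts(conf: Seq[(String, String)]): String =
    conf.map { case (key, value) => s"-D$key=$value" }.mkString(" ")

  def main(args: Array[String]): Unit =
    println(s"""export SPARK_HISTORY_OPTS="${toHistoryOpts(historyConf)}"""")
}
```

The same key-value pairs can also go to the spark-defaults.conf file if you prefer to keep the configuration out of the environment.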

Why am I writing this blog post? The UI is "just" there. Well, not really. It's a dynamic Apache Spark component that you can configure or debug like any other part of the ecosystem!