If you need to go back in time and analyze your past Apache Spark applications, you can use the native Apache Spark History server. However, it can also become an infrastructure problem because the event logs of streaming applications grow continuously. In this blog post we'll try to understand this component and look at its configuration options.
Architecture
In a nutshell, the Apache Spark history UI is composed of 2 main components, the web server and the data provider. The former is responsible for the programmatic interfaces that serve the HTML code to the browser. It's mostly a servlet-based component that analyzes user HTTP requests and serves the appropriate content. For example, when you access the /jobs endpoint, the history server gets its content from the AllJobsPage renderer:
private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = {
    // ...
    if (shouldShowActiveJobs) {
      content ++=
        <span id="active" class="collapse-aggregated-activeJobs collapse-table"
            onClick="collapseTable('collapse-aggregated-activeJobs','aggregated-activeJobs')">
          <h4>
            <span class="collapse-table-arrow arrow-open"></span>
            <a>Active Jobs ({activeJobs.size})</a>
          </h4>
        </span> ++
        <div class="aggregated-activeJobs collapsible-table">
          {activeJobsTable}
        </div>
    }
    // ...
    UIUtils.headerSparkPage(request, "Spark Jobs", content, parent, helpText = Some(helpText))
  }
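To make the "request in, page out" idea more concrete, here is a minimal sketch of a path-to-renderer dispatch. It is not Spark's servlet code: PageRoutingSketch, Renderer, jobsPage, stagesPage and handle are hypothetical names, and the returned strings only stand in for the real HTML.

```scala
// Hypothetical sketch, not Spark's web server: every name below is made up for illustration.
object PageRoutingSketch {
  // A renderer turns the request parameters into an HTML fragment.
  type Renderer = Map[String, String] => String

  val jobsPage: Renderer = _ => "<h4>Spark Jobs</h4>"
  val stagesPage: Renderer = params => {
    val attempt = params.getOrElse("attempt", "n/a")
    s"<h4>Stages (attempt $attempt)</h4>"
  }

  // The routing table plays the role of the pages attached to the web UI.
  val pages: Map[String, Renderer] = Map("/jobs" -> jobsPage, "/stages" -> stagesPage)

  /** Returns an HTTP-like status code and the body for the requested path. */
  def handle(path: String, params: Map[String, String]): (Int, String) =
    pages.get(path) match {
      case Some(render) => (200, render(params))
      case None => (404, s"No page registered for $path")
    }

  def main(args: Array[String]): Unit = {
    println(handle("/jobs", Map.empty))
    println(handle("/unknown", Map.empty))
  }
}
```

In Spark itself the renderers are classes like AllJobsPage that return XML nodes rather than plain strings, but the lookup-then-render flow is the part this sketch tries to show.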
The web server has some logic but it couldn't exist without the data, that is, the Apache Spark event logs parsed by the data provider component. The parsing is a background process that so far is only implemented for logs stored on a file system, in the FsHistoryProvider class. It wakes up every spark.history.fs.update.interval and checks for new logs to include in the history page. The logic doesn't rely on the modification time, though. Instead, the process compares the size of the previously parsed file with its most recent version. According to the Scaladoc, this file size-based comparison is there to address these issues:
/**
 * ...
 * The use of log size, rather than simply relying on modification times, is needed to
 * address the following issues
 * - some filesystems do not appear to update the `modtime` value whenever data is flushed to
 *   an open file output stream. Changes to the history may not be picked up.
 * - the granularity of the `modtime` field may be 2+ seconds. Rapid changes to the FS can be
 *   missed.
 * ...
 */
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {
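The size-based check can be illustrated with a few lines of Scala. This is only a sketch of the idea under my own assumptions, not the FsHistoryProvider implementation: SizeBasedScan, LogFile and scan are hypothetical names, and the listing is passed in as plain values instead of being read from a real file system.

```scala
// Hypothetical sketch of a size-based scan; none of these names come from Spark.
object SizeBasedScan {
  // One entry of a directory listing: log path, size in bytes, modification time.
  final case class LogFile(path: String, size: Long, modTime: Long)

  /** Returns the logs that are new or grew since the previous scan, plus the updated size index. */
  def scan(listing: Seq[LogFile], knownSizes: Map[String, Long]): (Seq[LogFile], Map[String, Long]) = {
    // Unknown paths default to -1 so that even an empty new file is picked up.
    val changed = listing.filter(f => f.size > knownSizes.getOrElse(f.path, -1L))
    val updated = knownSizes ++ listing.map(f => f.path -> f.size)
    (changed, updated)
  }

  def main(args: Array[String]): Unit = {
    val firstListing = Seq(LogFile("app-1.inprogress", 1024L, 1000L))
    val (firstChanged, sizes) = scan(firstListing, Map.empty)
    // Same modification time but a bigger file: a modtime-only check would miss this update.
    val secondListing = Seq(LogFile("app-1.inprogress", 4096L, 1000L))
    val (secondChanged, _) = scan(secondListing, sizes)
    println(firstChanged.map(_.path))
    println(secondChanged.map(_.path))
  }
}
```

The second scan still reports app-1.inprogress even though its modification time didn't move, which is exactly the situation the Scaladoc describes.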
Page generation
FsHistoryProvider has a method called getAppUI:
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {
  // ...
  /**
   * Returns the Spark UI for a specific application.
   *
   * @param appId The application ID.
   * @param attemptId The application attempt ID (or None if there is no attempt ID).
   * @return a [[LoadedAppUI]] instance containing the application's UI and any state information
   *         for update probes, or `None` if the application/attempt is not found.
   */
  def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI]
  // ...
This method either loads the UI from cache or rebuilds it from scratch. The former happens when no new logs have been created since the last parsing run. The latter is the opposite case, i.e. each newly parsed log invalidates the existing application UI and requires rebuilding it from the existing log files:
private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
  extends ApplicationHistoryProvider with Logging {

  override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
    // ...
    val kvstore = try {
      diskManager match {
        case Some(sm) => loadDiskStore(sm, appId, attempt)
        case _ => createInMemoryStore(attempt)
      }
    } catch {
      case _: FileNotFoundException => return None
    }

    val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
      app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
      attempt.info.startTime.getTime(), attempt.info.appSparkVersion)

    // place the tab in UI based on the display order
    loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui))

    val loadedUI = LoadedAppUI(ui)
    synchronized {
      activeUIs((appId, attemptId)) = loadedUI
    }
    Some(loadedUI)
  }
Two parts of the getAppUI snippet are particularly important, kvstore and ui. The kvstore is an instance of the KVStore interface that holds the objects created while parsing the logs. The ui accesses this local store through the HistoryAppStatusStore instance and uses those objects in the rendering part:
private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = {
    // ...
    store.jobsList(null).foreach { job =>
      job.status match {
        case JobExecutionStatus.SUCCEEDED => completedJobs += job
        case JobExecutionStatus.FAILED => failedJobs += job
        case _ => activeJobs += job
      }
    }
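The cache-or-rebuild decision made by getAppUI can be summarized with a small sketch. I'm simplifying a lot here: UiCacheSketch, AppUi, UiCache and the rebuild function are hypothetical, and the "did the logs change" signal is reduced to comparing the log size the UI was built from with the current one.

```scala
import scala.collection.mutable

// Hypothetical sketch of "serve from cache or rebuild"; not the FsHistoryProvider code.
object UiCacheSketch {
  // Stand-in for a loaded application UI; builtFromBytes records the log size it was built from.
  final case class AppUi(appId: String, builtFromBytes: Long)

  final class UiCache(rebuild: (String, Long) => AppUi) {
    private val loaded = mutable.Map.empty[String, AppUi]
    // Counts how many times the UI had to be rebuilt from the logs.
    var rebuilds = 0

    def get(appId: String, currentLogBytes: Long): AppUi = synchronized {
      loaded.get(appId) match {
        // No new log data since the UI was built: serve the cached instance.
        case Some(ui) if ui.builtFromBytes == currentLogBytes => ui
        // Unknown application or newer logs: invalidate and rebuild.
        case _ =>
          rebuilds += 1
          val ui = rebuild(appId, currentLogBytes)
          loaded(appId) = ui
          ui
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = new UiCache((appId, bytes) => AppUi(appId, bytes))
    cache.get("app-1", 1024L) // first access, builds the UI
    cache.get("app-1", 1024L) // unchanged logs, served from cache
    cache.get("app-1", 4096L) // the log grew, the UI is rebuilt
    println(s"rebuilds: ${cache.rebuilds}")
  }
}
```

The synchronized block mirrors the one around activeUIs in the real method, since several HTTP requests may ask for the same application at once.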
Configuration
The Apache Spark History server has a dedicated configuration section in the documentation called Viewing After the Fact. The page contains the full list of the configuration properties, among which you can find:
- spark.history.fs.inProgressOptimization.enabled fine-tunes the parsing of the in-progress logs and improves the History UI availability. The optimization consists of showing the UI as soon as there are enough items, and not only at the end of the log processing, which can take a long time.
- spark.eventLog.rolling.enabled and spark.eventLog.rolling.maxFileSize control the size of the log files. If you enable rolling, Apache Spark closes the in-progress log file when it reaches the size specified in maxFileSize and continues writing to a new one. Rolling alone doesn't discard anything. It's the History server compaction, enabled with spark.history.fs.eventLog.rolling.maxFilesToRetain, that drops some of the old events and helps keep the log size under control. It can be particularly useful for long-running streaming jobs.
- spark.history.fs.update.interval defines how often the FsHistoryProvider will look for changes in the logs.
- spark.history.fs.cleaner.enabled enables cleaning old event logs from storage. spark.history.fs.cleaner.maxAge configures how old the files can be, whereas spark.history.fs.cleaner.maxNum configures the maximum number of files to keep. Finally, spark.history.fs.cleaner.interval sets how often the cleaner runs.
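To see how these properties fit together, the sketch below prints them in the whitespace-separated "key value" format of spark-defaults.conf. The values are illustrative assumptions, not recommendations, and HistoryConfSketch, settings and render are hypothetical helper names. Keep in mind that the spark.eventLog.* entries belong to the application writing the logs, whereas the spark.history.* ones are read by the History server.

```scala
// Hypothetical helper that renders example settings; the values are illustrative only.
object HistoryConfSketch {
  val settings: Seq[(String, String)] = Seq(
    // Application side: roll the event log over once it reaches 128 MB.
    "spark.eventLog.rolling.enabled" -> "true",
    "spark.eventLog.rolling.maxFileSize" -> "128m",
    // History server side: look for log changes every 30 seconds.
    "spark.history.fs.update.interval" -> "30s",
    // History server side: run the cleaner daily and drop logs older than 7 days.
    "spark.history.fs.cleaner.enabled" -> "true",
    "spark.history.fs.cleaner.maxAge" -> "7d",
    "spark.history.fs.cleaner.interval" -> "1d"
  )

  /** Renders the entries as spark-defaults.conf lines, one "key value" pair per line. */
  def render(entries: Seq[(String, String)]): String =
    entries.map { case (key, value) => s"$key $value" }.mkString("\n")

  def main(args: Array[String]): Unit = println(render(settings))
}
```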
Why am I writing this blog post? The UI is "just" there. Well, not really. It's a dynamic Apache Spark component that you can configure or debug like any other part of the ecosystem!