What's new in Apache Spark 3.4.0 - shuffle changes

Versions: Apache Spark 3.4.0

Shuffle is a permanent point in the What's new in Apache Spark series. Why? It's often one of the most time-consuming parts of a job, and knowing the improvements simply helps write better pipelines.

RocksDB shuffle service state store

The author of the first shuffle-related feature is Yang Jie. He extended the shuffle service state store with a RocksDB implementation. But first, why is RocksDB there? The shuffle service is a proxy running on each node of the cluster. While reading shuffle data, the executors communicate with the proxy instead of directly with each other. You may be thinking that in this configuration the shuffle blocks are stored in RocksDB, but that's not the case. The service only tracks the files, so it needs to maintain the state of the managed executors. More precisely, this:

public class ExecutorShuffleInfo implements Encodable {
  /** The base set of local directories that the executor stores its shuffle files in. */
  public final String[] localDirs;
  /** Number of subdirectories created within each localDir. */
  public final int subDirsPerLocalDir;
  /**
   * Shuffle manager (SortShuffleManager) that the executor is using.
   * If this string contains semicolon, it will also include the meta information
   * for push based shuffle in JSON format. Example of the string with semicolon would be:
   * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
   */
  public final String shuffleManager;
  // ...
}
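
Those first two fields are everything the service needs to locate a shuffle file on disk. Below is a minimal sketch of the lookup logic, inspired by Spark's ExecutorDiskUtils; the hash function here is a simplified stand-in for the real one:

// Simplified sketch: resolve a shuffle file from the registration info.
// The real resolution logic lives in ExecutorDiskUtils; Math.abs is a
// simplification of the non-negative hash used there.
public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
  int hash = Math.abs(filename.hashCode());
  String localDir = localDirs[hash % localDirs.length];           // pick a root directory
  int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;  // then a subdirectory
  return new File(new File(localDir, String.format("%02x", subDirId)), filename);
}

This also explains the failure mode described in the quote below: lose the ExecutorShuffleInfo and the shuffle files become unreachable, even though they are perfectly fine on disk.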

The problem with the external shuffle service, as with anything else, is the risk of failure. Historically, the issue was solved in "ExternalShuffleService should be robust to NodeManager restarts in yarn", so back in the 1.6.0 release. The Pull Request proposed by Imran Rashid for the linked JIRA details the purpose of the shuffle service state store very well:


In general, Yarn apps should be robust to NodeManager restarts. However, if you run spark with the external shuffle service on, after a NM restart all shuffles fail, b/c the shuffle service has lost some state with info on each executor. (Note the shuffle data is perfectly fine on disk across a NM restart, the problem is we've lost the small bit of state that lets us find those files.) The solution proposed here is that the external shuffle service can write out its state to a file every time an executor is added. When running with yarn, that file is in the NM's local dir. Whenever the service is started, it looks for that file, and if it exists, it reads the file and re-registers all executors there

Whenever a new executor starts, it gets registered with the external shuffle service here:

public class ExternalShuffleBlockResolver {

  public void registerExecutor(
      String appId, String execId, ExecutorShuffleInfo executorInfo) {
    AppExecId fullId = new AppExecId(appId, execId);
    logger.info("Registered executor {} with {}", fullId, executorInfo);
    try {
      if (db != null) {
        byte[] key = dbAppExecKey(fullId);
        byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
        db.put(key, value);
      }
    } catch (Exception e) {
      logger.error("Error saving registered executors", e);
    }
    executors.put(fullId, executorInfo);
  }
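
The missing half of the story is the restart itself. When the service comes back, it scans the persisted entries and re-registers all executors. Here is a simplified sketch of that recovery step, modeled on ExternalShuffleBlockResolver#reloadRegisteredExecutors (the key parsing is schematic):

// Schematic version of the state reload executed at service startup:
// scan back the entries written by registerExecutor and rebuild the
// in-memory executors map.
private void reloadRegisteredExecutors(DB db) throws IOException {
  if (db == null) {
    return;
  }
  DBIterator itr = db.iterator();
  itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
  while (itr.hasNext()) {
    Map.Entry<byte[], byte[]> entry = itr.next();
    String key = new String(entry.getKey(), StandardCharsets.UTF_8);
    if (!key.startsWith(APP_KEY_PREFIX)) {
      break; // past the executor registration entries
    }
    AppExecId id = parseDbAppExecKey(key); // reverse of dbAppExecKey
    executors.put(id, mapper.readValue(entry.getValue(), ExecutorShuffleInfo.class));
  }
}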

Until the 3.4.0 release, Apache Spark supported only LevelDB as the "db" backend. Now you can configure it with the spark.shuffle.service.db.backend property and use RocksDB instead.
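
The switch itself is a one-line configuration. A minimal example, assuming a YARN deployment where the shuffle service reads its properties from the NodeManager's yarn-site.xml:

<property>
  <name>spark.shuffle.service.db.backend</name>
  <value>ROCKSDB</value>
</property>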

On this occasion it's worth mentioning the problems you may encounter while running LevelDB-based components on Apple Silicon; the external shuffle service state store was one of them.

Push-based shuffle

Let's start this section with a bug fix proposed by Wan Kun to remove shuffle merge data files in the push-based shuffle. From now on, the driver sends a RemoveShuffleMerge message to all the merger locations, which triggers the removal of all related merged data files.
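
Conceptually, the cleanup is a driver-side broadcast to every node that served as a merger for the given shuffle. Here is a schematic sketch of that loop; the RemoveShuffleMerge message class does exist in the push-based shuffle protocol, but the surrounding helper names (mergerLocations, removeShuffleMerge) are illustrative assumptions, not the exact driver API:

// Illustrative only: for each merger location registered for the shuffle,
// ask the remote shuffle service to delete the merged data, index and
// meta files it holds for it.
for (BlockManagerId merger : mergerLocations(shuffleId)) {
  shuffleClient.removeShuffleMerge(
    merger.host(), merger.port(),
    new RemoveShuffleMerge(appId, appAttemptId, shuffleId, shuffleMergeId));
}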

The second fix, this time implemented by gaoyajun02, solves the problem of reading 0-size blocks that may come from nodes with hardware issues. In that case, the shuffle block fetcher falls back to retrieving the original shuffle blocks, if available.
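
The fallback is easy to picture. Below is a simplified sketch, with hypothetical helper names (fetchChunk, getOriginalBlocks, fetchOriginalBlock), of what the fetcher does when a merged chunk comes back empty:

// Illustrative sketch: when a push-merged chunk is unusable, fall back
// to fetching the original, unmerged blocks it was built from.
private void fetchMergedChunk(BlockId mergedChunkId) {
  ManagedBuffer chunk = fetchChunk(mergedChunkId);
  if (chunk == null || chunk.size() == 0) {
    // 0-size result, likely a node with disk or hardware issues:
    // resolve the original map outputs behind this chunk and fetch
    // them one by one instead.
    for (BlockId original : getOriginalBlocks(mergedChunkId)) {
      fetchOriginalBlock(original);
    }
  } else {
    deliver(chunk);
  }
}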

Additionally, there is also a fault-tolerance-related improvement. Ye Zhou enabled the push-based shuffle service to store its state in the LevelDB instance. This prevents losing the state when the NodeManager restarts, which could otherwise lead to the inability to serve fetch requests for merged data.

Finally, there were some metrics-related changes. Thejdeep Gudivada added client-side read metrics. Thanks to them, you can learn about things like the number of corrupted merged shuffle block chunks, the number of fallbacks to regular shuffle block reads, and many others.

I'm fully aware this blog post is much more low-level than the previous ones in the series. However, it's a good reminder of the extra shuffle components, including the shuffle service and the push-based shuffle added not so long ago (in Apache Spark 3.2.0).