Apache Spark and shuffle management - external services

Versions: Apache Spark 3.0.0

Shuffle accompanies distributed data processing from the very beginning. Apache Spark is not an exception, and one of the prominent features targeted for 3.1 release is the full support for the pluggable shuffle backend. But it's not the single effort made these days by the community to handle shuffle drawbacks. And you will see it in this blog post.

New ebook 🔥

Learn 84 ways to solve common data engineering problems with cloud services.

👉 I want my Early Access edition

The article starts by recalling some difficulties brought by the shuffle. You will also discover there a term I heard for the first time during the preparation of this article. Later, you will see how the shuffle problem was solved at scale by the companies processing PB of data daily. Since I'm not inventing anything new here, just summarizing some interesting papers and talks I found on the internet, please check the links from the "Read also" section for any further and more detailed information.

Why shuffle is hard?

First, it makes the scaling more difficult. Currently, it's quite easy to add new executors to Apache Spark cluster, even at runtime with the Dynamic Resource Allocation, but the opposite (scaling down) is much harder, especially when the workload involves a shuffle operation (group by key, join, ...). A common solution for that is an external shuffle service. However it also doesn't guarantee a seamless scaling down action since there can still be a cluster node storing the shuffle files.

That was a quick summary of the points I covered in one of previous blog posts dedicated to the shuffle in Apache Spark (shuffle service changes). But to understand the proposed shuffle alternatives way better and share with you something interesting, I have to introduce the concept of spindle disk. Spindle handles HDD platters in place, and it's also one of the import measures for the HDD performance. We talk about the number of Rotations Per Minute (RPM). It happens that small I/O sizes will decrease the performance of the operation. From the talk about Cosco - linked below the article - you can learn that reading 4MB at once costs 8 seconds whereas doing the same by the chunks of 64KB is almost 18x more expensive (140s). Imagine a PB scale and small I/O 🤯 It was one of the problems resolved in the proposed alternatives.

The second problem of PB-scale is related to the already mentioned external shuffle service and its connectivity. In the paper about Magnet, presented closer in the next part, you will find that the number of connection will be the multiplication of the number of executors (E) by the number of shuffle services (S) and since at scale both can be big ("up to 1000"), connectivity issues start to arise. If the reducer can't fetch the shuffle block in that context, it will lead to the regeneration of the shuffle data, which may be expensive and slow down the whole processing.

Magnet from LinkedIn - push-merge

The first alternative system addressing the above drawbacks is called Magnet and it comes from LinkedIn. In simple terms, the idea consists of extending already existing Spark components like the tasks and shuffle service (called here Magnet shuffle service). All this to address the problems listed before.

The major addressed problem in Magnet is the small I/O problem. It's solved with a technique called push-merge shuffle. It uses the current Spark shuffle mechanism and sends every created shuffle file to the Magnet shuffle services. But before sending, the file is divided into multiple chunks of MB size. Why the file is divided and why its chunks are delivered to different shuffle services? To facilitate the reducer's work. The mapper sends the shuffle files belonging to the same shuffle partition to the shuffle service. Later, every service merges these blocks into one file served in the shuffle reading step. This merge operation is a best-effort one, which means that the reducers can access the non-merged files as well.

Thanks to these merged shuffle files, the reading is mostly a more optimized sequential I/O pattern instead of multiple small random reads. Moreover, its performance is less dependent on the disk type used to store the shuffle files.

Magnet shuffle service has also a flexible architecture that can be adapted to on-premise and cloud-based scenarios with Dynamic Resource Allocation. For the on-premise scenarios, the dependency is inverted and the external shuffle services are created before the additional executors, i.e. without taking into account the executor's location. Instead, that's the executors, which are created according to the shuffle service location. In the cloud context where the compute is often separated from the storage, Magnet could work in disaggregated clusters where the storage could be an object store but also a node(s) local to the cluster.

Even though Magnet service is used at LinkedIn, it's also a proposal for push-merge shuffle support (SPARK-30602 ), as of this writing targeted to the 3.1.0 release.

Cosco from Facebook - disaggregated shuffle

Another alternative comes from Facebook and is called Cosco. It addresses the same issues with a big focus on the small I/O issues. It's based on the approach called - apart from the spindle, it's another one I learned during this exploration - disaggregated compute and storage. The idea is to separate compute and storage and adapt every layer for what is supposed to do best, so allocate compute power to the compute and storage capacity to the storage.

The main idea of Cosco is to delegate the shuffle preparation to an external shuffle service but optimized for this disaggregated architecture. This external service has an in-memory write-ahead buffer per reduce partition filled with the mappers' shuffle data. Once the buffer is full, its content is materialized as a file on the distributed file system.

Since the mappers communicate with an external service across the network, there is still a risk of failures. If the initial Cosco shuffle service doesn't acknowledge successful writing, the mapper will resend the same data to another shuffle service. It can lead to duplicates that are not good from the data quality point of view on the reducer's side. But Cosco manages it by decorating every row received from the mappers with a (row id, mapper id) information, so that reducer can use them to deduplicate the rows at reading.

Finally, apart from the shuffle service, Cosco brings 2 other components. The first of them is a metadata service responsible for tracking the files generated by every mapper stage. This information is very useful when one of the mapper files cannot be fetched and it has to be regenerated - Cosco knows what map stage to recompute. The second extra component is Cosco scheduler, adapted to the shuffle service's distributed character.

The concepts presented in this blog post are not unique. There is an ongoing effort to provide a pluggable shuffle API (SPARK-25999) and enable elastic workloads on Kubernetes much easier. And if you think that only the cloud providers' object stores are considered among the storage solutions, you're wrong. Last year, Google presented a talk about using a network file system called Elastifile to store shuffle files that performed much better than the analyzed alternatives (HDFS, GCS, BigTable). Long story short, despite a hard problem to solve which is the elastic shuffle management, there is hope and who knows, maybe one day we'll all use a serverless (vanilla) Spark in the cloud 🤞.