Very often you will find Apache Spark performance tips related to the hardware (memory, GC) or the configuration parameters (shuffle partitions number, broadcast join threshold). But they're not the only ones you can implement. Moreover, IMO, you should start with the ones presented in this article and optimize your pipeline code before moving on to more complicated hardware and configuration tuning.
The list is not exhaustive, and I believe that if you see something missing, you will add some tips in the comments 👍 The tips collected so far are organized into 3 big categories: data elimination, unnecessary operations avoidance, and data storage.
First and foremost, avoid SELECT * if you don't need all the fields. If Spark were BigQuery, you would avoid it every time, since this GCP data warehouse service charges by the amount of scanned data. But more seriously, selecting all the columns when you need only a small subset of them will:
- require more storage (memory and disk)
- put a bigger network pressure for shuffle operations
Logically then, the processing will be slower due to this unnecessary I/O increase. But data elimination through projection is not the only element that can help speed up the processing code. In addition to selecting only the columns you want to work on, you should also select the rows you want to work with, and do so as early as possible in your pipeline.
Besides reducing storage needs and memory pressure, filtering the rows as early as possible can also bring some advantages in case of an unbalanced data distribution, helping to avoid the data skew problem.
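To make the two ideas concrete, here is a minimal sketch combining column projection and early row filtering. The path and column names are assumptions for illustration; with a columnar format like Parquet, Spark can push both the projection and the filter down to the scan:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("early-elimination").getOrCreate()
import spark.implicits._

// Read only the needed columns and filter rows as early as possible;
// both operations can be pushed down to the Parquet scan, reducing
// I/O, memory usage and shuffle pressure downstream.
val recentOrders = spark.read.parquet("/data/orders")
  .select("order_id", "user_id", "amount", "event_date")
  .filter($"event_date" >= "2023-01-01")
```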
Avoid unnecessary operations
It's time now for the second category, the unnecessary operations. The first one, quite frequent especially in the beginning, is the lack of a schema definition for semi-structured formats like JSON. Apache Spark has the great feature of schema inference, and very often its users are unaware of what is happening behind the scenes. To infer the schema, Apache Spark reads a fraction of the rows controlled by the samplingRatio option (1.0, i.e. all rows, by default), so it reads the data twice: once for the inference, once for the processing. This can be optimized with an explicit schema definition.
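A sketch of what an explicit schema definition could look like; the field names are assumptions. Declaring the schema up front skips the inference pass entirely, so the JSON files are read only once:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType, DoubleType}

val spark = SparkSession.builder().appName("explicit-schema").getOrCreate()

// Hypothetical schema for the input; adapt the fields to your data.
val eventSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("user", StringType),
  StructField("amount", DoubleType)
))

// With .schema(...), Spark skips the inference read of the JSON files.
val events = spark.read.schema(eventSchema).json("/data/events")
```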
Also, avoid shuffles whenever possible. For example, a grouping operation requiring unique values per group can be written as:
```scala
newInput.dropDuplicates("id").as[Data]
  .groupByKey(data => data.user)
  .mapGroups(Mapping.computeUniqueSum)
```
But the query plan for this code will generate 2 shuffles: one for the deduplication and another for the mapping. Knowing that we only want to avoid duplicates within a group, the same query can be rewritten as:
```scala
newInput.as[Data]
  .groupByKey(data => data.user)
  .mapGroups(Mapping.computeUniqueSum)
```
Instead, we move the deduplication into the mapping function. As you can certainly notice, this goes against the "data elimination" principle, but the reduction in network exchange should have a more beneficial effect in this specific scenario.
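A sketch of what the mapping function could look like with the deduplication folded in; the Data fields and the aggregation are assumptions derived from the snippet above:

```scala
// Hypothetical record type matching the snippets above.
case class Data(id: String, user: String, amount: Double)

object Mapping {
  // Signature expected by Dataset.mapGroups: (key, iterator of rows).
  def computeUniqueSum(user: String, rows: Iterator[Data]): (String, Double) = {
    // Keep a single occurrence of each id and aggregate locally:
    // the duplicates are dropped inside the group, without the extra
    // shuffle that dropDuplicates would require.
    val uniqueAmounts = rows.toSeq.groupBy(_.id).values.map(_.head.amount)
    (user, uniqueAmounts.sum)
  }
}
```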
From the same category of operations, ask yourself (or your business users) whether an operation really has to be performed globally. For example, if you need sorted data but a global ordering is not required, you can opt for the more efficient sortWithinPartitions operation, which, once again, avoids a shuffle.
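For instance, assuming a SparkSession named spark and a hypothetical events dataset with an event_time column:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("local-sort").getOrCreate()
import spark.implicits._

val events = spark.read.parquet("/data/events") // hypothetical dataset

// Orders rows inside each partition only; no range-partitioning
// shuffle is triggered.
val locallySorted = events.sortWithinPartitions($"event_time")

// For comparison, a global ordering shuffles the whole dataset:
// val globallySorted = events.orderBy($"event_time")
```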
Leverage your storage
The final part is about the storage. Storing your datasets correctly can avoid a lot of performance tuning headaches. First, partition your data. To understand why, let's imagine an hourly job processing 100GB every hour. If you partition the dataset by day, the last job of the day will be 24 times slower than the first because it'll have to process the whole day of data. This wouldn't happen if the data were partitioned on an hourly basis: the execution time and the resources used would remain constant.
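A minimal sketch of such a layout, assuming event_date and event_hour columns exist in the dataset (both names are assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hourly-partitioning").getOrCreate()

// Write one directory per date and hour, so each hourly job reads
// only its own partition instead of the whole day.
spark.read.parquet("/data/events")
  .write
  .partitionBy("event_date", "event_hour")
  .mode("append")
  .parquet("/data/events_partitioned")
```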
To complement the partitioning, think about bucketing. Thanks to buckets, Apache Spark can load the data directly into the correct "input partitions" and perform some operations, like joins with other datasets, locally, without additional network traffic. In addition to joins, buckets can also help filter out data very early thanks to the bucket pruning feature.
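A sketch of writing a bucketed table; note that bucketBy requires saveAsTable (a metastore-backed table). The bucket count, table and column names are assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketing").enableHiveSupport().getOrCreate()

// Bucket on the join key so that later joins on user_id between
// datasets bucketed the same way can run without a shuffle.
spark.read.parquet("/data/events")
  .write
  .bucketBy(32, "user_id")
  .sortBy("user_id")
  .mode("overwrite")
  .saveAsTable("events_bucketed")
```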
The last strategy you can apply here is precomputation. Suppose you have to perform the same operation multiple times, like computing an identifier for a row. In that case, it can be a good idea to enrich the initial dataset and add this information statically, so you pay the computation cost only once. The same applies to joins and other compute-intensive operations. Thanks to nested structures and arrays, you can easily denormalize a join into one dataset and reuse it further in the pipelines. Of course, precomputation and denormalization have the small drawback of being static, so in case of any change or code regression, you'll have to recompute them.
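Both ideas can be sketched together: precomputing a row identifier once and denormalizing a join into a nested structure. All dataset paths and column names here are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sha2, concat_ws, struct}

val spark = SparkSession.builder().appName("precomputation").getOrCreate()
import spark.implicits._

val orders = spark.read.parquet("/data/orders") // hypothetical datasets
val users = spark.read.parquet("/data/users")

val enriched = orders
  // Precompute the row identifier once, statically.
  .withColumn("row_id", sha2(concat_ws("|", $"order_id", $"user_id"), 256))
  // Denormalize the user attributes into a nested struct so downstream
  // pipelines can reuse the result without redoing the join.
  .join(users, Seq("user_id"))
  .select($"row_id", $"order_id", struct($"user_name", $"user_country").as("user"))

enriched.write.mode("overwrite").parquet("/data/orders_enriched")
```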
I hope you saw in this article that Apache Spark tuning is not only about the hardware resources and configuration. You can start at a higher level and organize your code and data storage properly. And if you have something else to share, the comments are yours 👊