A misconfiguration is often the root cause of all kinds of issues: performance, security or functional ones. Spark is no exception to this rule, which is why this article focuses on the configuration properties available for the driver and the executors.
The first part describes the properties available for the driver. Next, the executor properties are presented. Finally, a set of miscellaneous configuration entries is covered. Only some subjectively interesting properties are listed in this post; the complete list can be found on the Spark configuration page.
Driver configuration in Spark
Most driver properties are prefixed with the spark.driver key. The important properties from the application's point of view concern the resources allocated to the driver:
- spark.driver.cores - used only in cluster mode, specifies the number of cores used by the driver program.
- spark.driver.memory - determines the amount of memory allocated to the driver. If the Spark context is created programmatically in one of the JVM languages, this property should be set before launching the context - once the context is launched, the driver's memory is already allocated.
- spark.driver.maxResultSize - specifies the maximum total size of the results retrieved from all partitions when an action is invoked. If set to 0, there is no limit. This property is very sensitive since a bad value can lead to OOM errors in the driver program - especially when the data retrieved from the executors uses more memory than specified in the spark.driver.memory configuration entry.
When the driver is executed, we can also specify additional JVM parameters (spark.driver.extraJavaOptions) or a supplementary classpath to take into account (spark.driver.extraClassPath).
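To illustrate the driver-side entries, here is a minimal sketch, assuming the application is submitted in cluster mode (in client mode spark.driver.memory and spark.driver.extraJavaOptions must be passed through spark-submit or the properties file, since the driver JVM is already started at that point); the application name and the values are arbitrary:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values - the resources must match what the cluster manager can offer
val driverConf = new SparkConf()
  .setAppName("driver-configuration-example")
  .set("spark.driver.cores", "2")               // taken into account in cluster mode only
  .set("spark.driver.memory", "2g")             // must be defined before the context starts
  .set("spark.driver.maxResultSize", "1g")      // cap on collected results; 0 means no limit
  .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")

val sparkContext = new SparkContext(driverConf)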
Executor configuration in Spark
Executors share some common properties with the driver. They concern, among others, the amount of memory, the number of cores, additional JVM parameters and a supplementary classpath. The difference is that they're prefixed with the spark.executor key. Special attention should be paid to spark.executor.cores. In standalone mode, if this property is not configured, only one executor per Spark application runs on each worker. On the other hand, if it's specified, multiple executors of the same application can possibly run on the same worker node.
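A similar sketch for the executor side; the values are again arbitrary and only meant to show where the entries go:

import org.apache.spark.{SparkConf, SparkContext}

val executorConf = new SparkConf()
  .setAppName("executor-configuration-example")
  .set("spark.executor.memory", "4g")     // memory allocated to each executor
  .set("spark.executor.cores", "2")       // cores per executor, enabling several executors per worker
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .set("spark.executor.extraClassPath", "/opt/libs/extra.jar")  // hypothetical path

val sparkContext = new SparkContext(executorConf)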
Two other important properties are related to task execution efficiency. The first one, spark.executor.heartbeatInterval, defines how often an executor should inform the driver that it's alive and report the progress of its running tasks. The second property is in fact a set of properties related to speculation. As you already know, if a given task executes slowly, Spark can run a speculative copy of it and use the result returned by the faster execution. Speculation is activated through the spark.speculation property and the related configuration entries are:
- spark.speculation.interval - defines how often Spark checks for tasks to speculate.
- spark.speculation.multiplier - specifies how many times slower than the median task execution time a given task must be in order to be elected for speculation.
- spark.speculation.quantile - represents the fraction of tasks that must finish before speculation is launched for a given stage.
Regarding tasks, we can also configure the number of cores used by each of them (spark.task.cpus) and the tolerated number of failures (spark.task.maxFailures). If a given task fails as many times as this last configuration entry allows, the job the task belongs to is aborted.
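The heartbeat, speculation and task-level entries can be grouped as in the following sketch; the values are only illustrative and not tuning recommendations:

import org.apache.spark.SparkConf

val taskConf = new SparkConf()
  .setAppName("speculation-and-tasks-example")
  .set("spark.executor.heartbeatInterval", "10s")  // how often executors report to the driver
  .set("spark.speculation", "true")                // enable speculative execution
  .set("spark.speculation.interval", "100ms")      // how often Spark looks for tasks to speculate
  .set("spark.speculation.multiplier", "1.5")      // a task must be 1.5x slower than the median
  .set("spark.speculation.quantile", "0.75")       // 75% of tasks must finish before speculating
  .set("spark.task.cpus", "1")                     // cores reserved for each task
  .set("spark.task.maxFailures", "4")              // failures tolerated before aborting the job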
Miscellaneous properties
On the miscellaneous side, two important families of properties exist. The first one concerns shuffle and is prefixed with the spark.shuffle key. Its entries specify, among others, whether compression should be used for the shuffle output (spark.shuffle.compress) and how much memory should be reserved for the shuffle output stream buffer (spark.shuffle.file.buffer). The second configuration is especially important because the shuffle buffers help to limit the number of operations made on disk. Correctly configured, it can thus improve performance.
The second family concerns the reducer side. spark.reducer.maxSizeInFlight defines the maximum size of map outputs fetched simultaneously by each reduce task. Another configuration, spark.reducer.maxReqsInFlight, limits the number of simultaneous requests made to fetch these shuffle blocks. The goal of this entry is to limit hotspots (nodes more loaded than the others) in the cluster.
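Both families are shown together in the following sketch; the sizes are arbitrary examples, not recommended values:

import org.apache.spark.SparkConf

val shuffleConf = new SparkConf()
  .setAppName("shuffle-configuration-example")
  .set("spark.shuffle.compress", "true")        // compress map output files
  .set("spark.shuffle.file.buffer", "64k")      // in-memory buffer per shuffle output stream
  .set("spark.reducer.maxSizeInFlight", "48m")  // max size of map outputs fetched at once per reduce task
  .set("spark.reducer.maxReqsInFlight", "64")   // cap on simultaneous fetch requests (illustrative value)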
It's also important to control compression and serialization. As already seen, we can compress shuffled data. But we can also compress broadcast variables by activating it through the spark.broadcast.compress parameter. The same can be done for serialized RDD partitions through spark.rdd.compress. When compression is used, it can be tuned with the spark.io.compression properties. They help to specify the compression codec to use or the block size for some of the codecs (lz4, snappy).
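For instance, a minimal compression-oriented sketch assuming the lz4 codec (the block size entry follows the spark.io.compression.lz4 prefix):

import org.apache.spark.SparkConf

val compressionConf = new SparkConf()
  .set("spark.broadcast.compress", "true")           // compress broadcast variables before sending them
  .set("spark.rdd.compress", "true")                 // compress serialized RDD partitions
  .set("spark.io.compression.codec", "lz4")          // codec used for internal data (shuffle files, broadcasts...)
  .set("spark.io.compression.lz4.blockSize", "32k")  // block size used by the lz4 codec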
For serialization, the Spark configuration allows specifying the class used for it through spark.serializer. By default it uses the Java serializer but, according to the documentation, it's slower than the second available serializer, Kryo. If Kryo is used, it can be tuned with a set of properties identified by the spark.kryo prefix.
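Below is a sketch switching to Kryo with a hypothetical class registered; the registration is only mandatory when spark.kryo.registrationRequired is set to true:

import org.apache.spark.SparkConf

case class Visit(page: String, duration: Long)  // hypothetical class to serialize

val kryoConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")  // fail fast when an unregistered class is serialized
  .registerKryoClasses(Array(classOf[Visit]))      // registers the class, as if listed in spark.kryo.classesToRegister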
Other important properties concern memory, but they are described in the article about memory management in Spark.
This article shows several configuration properties available in Spark. The first part lists the entries for the driver, where we mostly find parameters related to allocated resources. It's almost the same for the executors, described in the second part, with additional explanations about speculative tasks. The last part shows the remaining properties, among others the ones related to data transfer across the network.