In Spark blocks are everywhere. They represent broadcasted objects, they are used as support for intermediate steps in shuffle process, or finally they're used to store temporary files. But very often they're disregarded at the beginning because of more meaningful concepts, as transformations and actions - even if without blocks, both of them won't be possible.
Data Engineering Design Patterns
Looking for a book that defines and solves most common data engineering problems? I'm currently writing
one on that topic and the first chapters are already available in π
Early Release on the O'Reilly platform
I also help solve your data engineering problems π contact@waitingforcode.com π©
In this post we'll focus on blocks part. The first part explains some basic facts about them: definition, use cases. The second part dives into implementation details that help to make insight on other, more complicated aspects as replication or blocks eviction. The last part, through some learning tests, shows the blocks behavior.
Spark blocks basics
The most of data living in Spark applications is grouped into blocks. Thus simply speaking, blocks in Spark contain data either used as tasks inputs or returned as tasks outputs. Among the types of data stored in blocks we can find:
- RDD - each RDD is composed of multiple blocks. The number of blocks corresponds to the number of partitions:
/** * Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = { val blockId = RDDBlockId(id, partition.index) var readCachedBlock = true // This method is called on executors, so we need call SparkEnv.get instead of sc.env. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { // ...
- shuffle - in this category we can distinguish shuffle data, shuffle index and temporary shuffle files (intermediate results).
- broadcast - broadcasted data is organized in blocks too. Thanks to that the parts composing the broadcast object can be fetched from multiple sources simultaneously. You can read more about that in the post Zoom at broadcast variables.
- task results
- stream data - in DStream-oriented streaming applications data loaded by receivers is grouped to blocks and used in later processing.
- temporary data - all temporary data, for instance the spilled results (results written on disk when there are no more place in memory)
What is the main idea behind blocks ? Among others, the blocks help to achieve greater parallelism. For instance, if given block can be retrieved from 4 different executor nodes, it'll be quicker to fetch it from them in parallel manner instead of making sequential HTTP calls for one specific node.
Blocks are stored on every node, independently on its character (driver/executor). They can be persisted on disk or in memory (on/off heap), both locally or remotely.They are stored during some period of time. After it, they're evicted. As mentioned in the previous paragraph, the blocks can be read locally or remotely. Both situations can be easily detected in logs. The local reads are represented by entries starting with INFO Found block [blockId] locally. The remote access is indicated with logs having INFO Found block [blockId] remotely.
In addition to local/remote fetches and automatic eviction, the blocks can also be replicated. Obviously, the replication level can be specified during RDD calculation to improve fault tolerance.
Spark blocks internals
All operations related to blocks are managed by the process called block manager. Internally it's represented by the BlockManager class. It takes care about: blocks retrieval, blocks writing or blocks replication. We distinguish slave and master block managers. In addition to blocks manipulations, slaves send requests about added/removed blocks to the master which is the source of truth for the state of living blocks.
As told previously, the blocks can be not only read remotely, but also saved on remote node. This feature is used when the replication level is greater than 1. BlockManager uses then an instance of org.apache.spark.network.BlockTransferService to upload replicated blocks to other nodes. These nodes are called peers and are resolved according to defined replication policy. The default used strategy randomly sorts available nodes and takes the first ones from the sorted list. It's implemented in org.apache.spark.storage.RandomBlockReplicationPolicy class.
Blocks eviction is also controlled by block manager. The master block manager asks the slaves to remove blocks of stored RDDs, shuffles or broadcasts. The code triggering the cleanup is defined in org.apache.spark.ContextCleaner class. The cleanup is the long-living process checking if stored objects (RDD, shuffle, broadcast, accumulators) are eligible for the cleanup. They become eligible when they're no more referenced.
Also the clean up can happen when there are no more place to save new blocks. In such case the LRU strategy is used to evict already existent blocks. Under-the-hood, the use of this strategy is guaranteed by the java.util.LinkedHashMap that stores all blocks. Later, the MemoryStore's evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long, memoryMode: MemoryMode) iterates over all accumulated entries and removes the blocks eligible for the removal. And since the LinkedHashMap stores entries in the order of insertion (the most recent at the beginning), the LRU strategy can be easily applied. The removal ends when the freed space is equal to the reclaimed one.
Spark blocks examples
To see what happens with the blocks in Spark, let's execute some learning tests checking entries printed to the logs:
"simple processing with grouping" should "accumulate basic logs" in { val logAppender = InMemoryLogAppender.createLogAppender(Seq("Getting local block", "Level for block", "Getting 5 non-empty blocks", "Updated info of block", "in memory on", "Block", "Started 0 remote fetches", "Told master about block broadcast_1_piece0")) val data = sparkContext.parallelize(1 to 100, 5) data.map(number => (number%2, number)) .groupByKey(3) .foreach(number => { println(s"Number=${number}") }) val logMessages = logAppender.getMessagesText() // This log appears when worker sends the UpdateBlockInfo message to // the master's block manager. The UpdateBlockInfo message contains the // information about block's id, storage level, the size taken in memory and on disk logMessages should contain ("Updated info of block broadcast_0_piece0") // This message tells that the block manager tries to retrieve the block // locally. If the block is not found locally, it's later fetched // from remote block manager logMessages should contain ("Getting local block broadcast_0") // Here the block manager informs the master about the state // of given block. It's important since sometimes the expected storage // level can not be met (e.g. MEMORY+DISK is demanded but only DISK is // written) and thanks to the message, the master will know that logMessages should contain ("Told master about block broadcast_1_piece0") // The 2 logs below represent the shuffle operation. The first one is printed // when the ShuffleBlockFetcherIterator iterates among all blocks to fetch and // resolves which ones are stored locally and which ones remotely. // The 2nd log is printed when fetchnig of shuffle remote blocks begins. logMessages should contain ("Getting 5 non-empty blocks out of 5 blocks") logMessages.find(log => log.startsWith("Started 0 remote fetches")) should not be empty; } "the processing with replicated cache" should "generate logs showing the replication" in { val logAppender = InMemoryLogAppender.createLogAppender(Seq("Level for block rdd_0_1", "Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication", "Replicating rdd_0_0 of", "Block rdd_0_1 replicated to only 0", "Block rdd_0_0 replicated to only 0")) val data = sparkContext.parallelize(1 to 100, 5) data.persist(StorageLevel.MEMORY_ONLY_2).map(number => (number%2, number)) .groupByKey(3) .foreach(number => { println(s"Number=${number}") }) val logMessages = logAppender.getMessagesText() // The information about expected replication level is shown in the logs // when block's data is retrieved from the local block manager logMessages should contain ("Level for block rdd_0_1 is StorageLevel(memory, deserialized, 2 replicas)") // Here block manager indicates that it's doing the replication of given block (rdd_0_0 in example) // to some number of nodes in the cluster. Below log contains 0 as its number since the // code is executed against local master. The value should be 1 in the real cluster of 2 nodes at least. logMessages.find(log => log.startsWith("Replicating rdd_0_0 of 40 bytes to 0 peer(s)")) should not be empty; // This log is a warning message when the number of replicated blocks doesn't met the // expectation of storage level logMessages should contain ("Block rdd_0_0 replicated to only 0 peer(s) instead of 1 peers") }
This post explains some points about blocks. The first part shown that they're used in plenty of Spark objects: RDDs, shuffle data and indexes, broadcast variables and even temporary files. It also presented their lifecycle. The second section focused on implementation details. From it we could learn about BlockManager - blocks handler initialized on every node (driver/worker) helping to read and write blocks. This part also introduced the distinction between master and slave block managers and also made an insight on LRU eviction strategy. The last part, almost as usual, proved the concepts related to the blocks through learning tests checking logs.