Spark Memory Allocation
- JVM heap: spark.executor.memory (default 512M)
- Storage safety: spark.storage.safetyFraction (90% of heap)
- Storage: spark.storage.memoryFraction (60% of the safe heap)
- Unroll: spark.storage.unrollFraction (20% of storage)
- Shuffle safety: spark.shuffle.safetyFraction (80% of heap)
- Shuffle: spark.shuffle.memoryFraction (20% of the safe heap)
By default, Spark starts with a 512MB JVM heap. To be on the safe side and avoid OOM errors, Spark allows you to utilize only 90% of the heap.
spark.executor.memory * spark.storage.safetyFraction * spark.storage.memoryFraction
Some amount of memory is reserved for caching the data you are processing, and this part is usually 60% of the safe heap. If you want to know how much data you can cache in Spark, take the sum of all the heap sizes for all the executors and multiply it by these fractions; by default that is 0.9 * 0.6 = 0.54, i.e. 54% of the total heap size you allow Spark to use.
spark.executor.memory * spark.shuffle.safetyFraction * spark.shuffle.memoryFraction
Default value for spark.shuffle.safetyFraction is 0.8, default value for spark.shuffle.memoryFraction is 0.2. So finally you can use up to 0.8*0.2=0.16 of the JVM heap for the shuffle.
When a shuffle is performed, you also need to sort the data, and you usually need a buffer to store the sorted output. What happens if you don’t have enough memory to sort the data? There is a wide range of algorithms, usually referred to as “external sorting”, that allow you to sort the data chunk by chunk and then merge the final result together.

spark.executor.memory * spark.storage.safetyFraction * spark.storage.memoryFraction * spark.storage.unrollFraction

Unroll memory with the default values equals 0.2 * 0.6 * 0.9 = 0.108 of the heap. This is the memory that can be used when you are unrolling a block of data into memory.
- Why do you need to unroll the block of data into the memory after all?
Spark allows you to store data both in serialized and deserialized form. Data in serialized form cannot be used directly, so you have to unroll it before use. Unroll memory is shared with the storage RAM, which means that if you need some memory to unroll a block of data, this might cause some of the partitions stored in the Spark LRU cache to be dropped.
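As a minimal sketch of the legacy (pre-1.6) arithmetic above, using the default fractions from the parameter list at the top (all sizes in MB, the 512MB default heap is just an illustration):

```python
# Legacy Spark memory pool arithmetic, using the default fractions
# described above. All sizes in MB.

def legacy_pools(executor_memory_mb,
                 storage_safety=0.9, storage_fraction=0.6,
                 shuffle_safety=0.8, shuffle_fraction=0.2,
                 unroll_fraction=0.2):
    storage = executor_memory_mb * storage_safety * storage_fraction
    shuffle = executor_memory_mb * shuffle_safety * shuffle_fraction
    unroll = storage * unroll_fraction  # carved out of the storage pool
    return {"storage": storage, "shuffle": shuffle, "unroll": unroll}

pools = legacy_pools(512)  # default 512MB executor heap
# storage: 0.9 * 0.6 = 54% of the heap
# shuffle: 0.8 * 0.2 = 16% of the heap
# unroll:  0.9 * 0.6 * 0.2 = 10.8% of the heap
```

The function name is mine; the fractions and the multiplication chain come straight from the formulas above.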
Spark Cluster Arch
Here’s an example: a cluster with 12 nodes running YARN NodeManagers, with 64GB of RAM and 32 CPU cores each. On each node you can start 2 executors with 26GB of RAM each (leaving the rest for the OS and Hadoop daemons), each executor with 12 cores to be utilized for tasks. So in total the cluster can handle:

12 machines * 2 executors per machine * 12 cores per executor / 1 core per task = 288 task slots
The amount of RAM you can use for caching your data on this cluster is
0.9 (spark.storage.safetyFraction) * 0.6 (spark.storage.memoryFraction) * 12 machines * 2 executors per machine * 26 GB per executor = 336.96 GB
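The cluster sizing above can be reproduced with a short calculation (numbers taken from the example; `spark.task.cpus` is assumed to be 1):

```python
machines = 12
executors_per_machine = 2
cores_per_executor = 12
cpus_per_task = 1           # spark.task.cpus
executor_mem_gb = 26

# Parallel task slots available across the whole cluster
task_slots = machines * executors_per_machine * cores_per_executor // cpus_per_task

# Cache capacity: safetyFraction * memoryFraction * total executor heap
cache_gb = 0.9 * 0.6 * machines * executors_per_machine * executor_mem_gb
```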
A task is a single unit of work performed by Spark, executed as a thread in the executor JVM. This is the secret behind Spark’s low job startup time: forking an additional thread inside the JVM is much faster than bringing up a whole new JVM, which is what happens when you start a MapReduce job in Hadoop.
Partition size completely depends on the data source you use. For most of the methods that read data into Spark you can specify the number of partitions you want to have in your RDD. When you read a file from HDFS, Spark uses Hadoop’s InputFormat to do it. By default, each input split returned by the InputFormat is mapped to a single partition in the RDD. For most files on HDFS, a single input split is generated for a single block of data stored on HDFS, which equals approximately 64MB or 128MB of data. Approximately, because data in HDFS is split on exact block boundaries in bytes, but when it is processed it is split on record boundaries. For a text file the splitting character is the newline, for a sequence file it is the block end, and so on.
Spark Memory Management
This is the memory reserved by the system, and its size is hardcoded. As of Spark 1.6.0, its value is 300MB, which means that this 300MB of RAM does not participate in Spark memory region size calculations.
Even if you want to give all the Java heap to Spark to cache your data, you won’t be able to do so, as this “reserved” part would remain spare (not really spare: it stores lots of Spark internal objects). If you don’t give the Spark executor at least 1.5 * Reserved Memory = 450MB of heap, it will fail with a “please use larger heap size” error message.
("Java Heap" - "Reserved Memory")*(1.0 - spark.memory.fraction) = ("Java Heap" - 300MB)*0.25
This is the memory pool that remains after the allocation of Spark Memory, and it is completely up to you to use it in a way you like. This is the User Memory: it is completely up to you what is stored in this RAM and how, and Spark makes no accounting of what you do there or whether you respect this boundary. Not respecting this boundary in your code might cause an OOM error.
(“Java Heap” – “Reserved Memory”) * spark.memory.fraction
This whole pool is split into 2 regions – Storage Memory and Execution Memory, and the boundary between them is set by spark.memory.storageFraction parameter, which defaults to 0.5.
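A sketch of the unified memory model arithmetic described above, with the Spark 1.6.0 defaults (spark.memory.fraction = 0.75, spark.memory.storageFraction = 0.5; the function name is illustrative):

```python
RESERVED_MB = 300  # hardcoded system-reserved memory in Spark 1.6.0

def unified_pools(heap_mb, memory_fraction=0.75, storage_fraction=0.5):
    usable = heap_mb - RESERVED_MB
    spark_memory = usable * memory_fraction          # Storage + Execution
    user_memory = usable * (1.0 - memory_fraction)   # your objects
    # storage_fraction sets only the *initial* boundary; it moves at runtime
    storage = spark_memory * storage_fraction
    execution = spark_memory * (1.0 - storage_fraction)
    return {"user": user_memory, "spark": spark_memory,
            "storage": storage, "execution": execution}

p = unified_pools(4096)  # e.g. a 4GB executor heap
```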
This pool is used for both storing Apache Spark cached data and for temporary space serialized data “unroll”. Also all the “broadcast” variables are stored there as cached blocks. As of “broadcast”, all the broadcast variables are stored in cache with MEMORY_AND_DISK persistence level.
This pool is used for storing the objects required during the execution of Spark tasks.
Let’s focus on the moving boundary between Storage Memory and Execution Memory. You cannot forcefully evict blocks from the Execution Memory pool, because this is data used in intermediate computations, and the process requiring this memory would simply fail if the block it refers to is not found. We can, however, forcefully evict blocks from Storage Memory. So when can the Execution Memory pool borrow space from Storage Memory? It happens when either:
- There is free space available in the Storage Memory pool, i.e. cached blocks don’t use all the memory available there. In this case it just reduces the Storage Memory pool size, increasing the Execution Memory pool.
- The Storage Memory pool size exceeds the initial Storage Memory region size and all of that space is utilized. In this case blocks are forcefully evicted from the Storage Memory pool until it shrinks back to its initial size.
In the opposite direction, the Storage Memory pool can borrow space from the Execution Memory pool only if there is free space available in the Execution Memory pool; Execution Memory is never forcefully evicted.
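The borrowing rules above can be sketched as a toy model. This is not Spark’s actual `UnifiedMemoryManager` code, just the decision logic from the bullets (function name and arguments are illustrative):

```python
def execution_can_borrow(storage_used, storage_initial, storage_pool_size):
    """How much memory Execution can take from Storage, per the rules above."""
    # Rule 1: free space in the Storage pool can be taken directly.
    free_in_storage = storage_pool_size - storage_used
    # Rule 2: whatever Storage uses *beyond* its initial region size can be
    # reclaimed by forcefully evicting cached blocks, down to the initial size.
    evictable = max(0, storage_used - storage_initial)
    return free_in_storage + evictable

# Pool of 100 units, initial region 50, 80 currently cached:
# 20 units are free, and 30 units can be evicted -> 50 units borrowable.
```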
What is Shuffle
When your data is stored across the cluster, how can you sum up the values for the same key if they sit on different machines? The only way to do so is to bring all the values for the same key onto the same machine; after this you can sum them up.
Shuffling in general has 2 important compression parameters:
- spark.shuffle.compress: whether the engine would compress shuffle outputs or not.
- spark.shuffle.spill.compress: whether to compress intermediate shuffle spill files or not.
Both have the value “true” by default, and both would use `spark.io.compression.codec` codec for compressing the data, which is snappy by default.
Prior to Spark 1.2.0 this was the default shuffle option (spark.shuffle.manager = hash). But it has many drawbacks, mostly caused by the number of files it creates: each mapper task creates a separate file for each separate reducer, resulting in M * R total files on the cluster, where M is the number of “mappers” and R is the number of “reducers”.
Optimization implemented for Hash Shuffle
There is an optimization for the shuffle, controlled by the parameter `spark.shuffle.consolidateFiles`(default is ‘false’).
If your cluster has E executors (`--num-executors` on YARN), each of them has C cores (`--executor-cores` on YARN), and each task asks for T CPUs (`spark.task.cpus`), then the number of execution slots on the cluster would be E * C / T, and the number of files created during the shuffle would be E * C / T * R. With 100 executors, 10 cores each, allocating 1 core for each task, and 46000 “reducers”, this would allow you to go from 2 billion files down to 46 million files. Instead of creating a new file for each of the reducers, it creates a pool of output files that gets reused.
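The file-count savings can be checked with the numbers from the example. The original states the cluster-wide totals; the number of map tasks M is not given explicitly, so I assume M = 46000 here to reproduce the quoted “2 billion” figure:

```python
E, C, T, R = 100, 10, 1, 46_000   # executors, cores, cpus per task, reducers
M = 46_000                         # assumed number of map tasks (not stated)

# Plain hash shuffle: one file per mapper per reducer.
files_without_consolidation = M * R

# With consolidateFiles: one file pool per execution slot.
slots = E * C // T
files_with_consolidation = slots * R
```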
An important parameter on the fetch side is `spark.reducer.maxSizeInFlight` (48M by default), which determines the amount of data requested from the remote executors by each reducer. This size is split equally across 5 parallel requests to different executors to speed up the process. If you increase this value, your reducers would request the data from “map” task outputs in bigger chunks, which would improve performance but also increase memory usage by the “reducer” processes.
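So with the defaults, each of the 5 parallel fetch requests asks for roughly a fifth of the in-flight budget:

```python
max_size_in_flight_mb = 48   # spark.reducer.maxSizeInFlight default
parallel_requests = 5        # fixed number of concurrent fetch requests
per_request_mb = max_size_in_flight_mb / parallel_requests
```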
Pros of the hash shuffle:
- Fast: no sorting is required, no hash table is maintained;
- No memory overhead for sorting the data;
- No IO overhead: the data is written to HDD exactly once and read exactly once.

Cons:
- When the number of partitions is big, performance starts to degrade due to the big number of output files;
- A big number of files written to the file system causes IO skew towards random IO, which is in general up to 100x slower than sequential IO.
Starting with Spark 1.2.0, this is the default shuffle algorithm used by Spark (spark.shuffle.manager = sort). With hash shuffle you output one separate file for each of the “reducers”, while with sort shuffle you do a smarter thing: you output a single file ordered by “reducer” id and indexed, so you can easily fetch the chunk of data related to “reducer x” by just getting the position of the related data block in the file and doing a single fseek before fread.
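The single-file-plus-index idea can be sketched as a toy in-memory model. This is not the actual sort-shuffle file format, just the seek-then-read access pattern described above:

```python
import io

# Toy model of a sort-shuffle output: one blob ordered by reducer id, plus an
# index of start offsets (bytes index[r] .. index[r+1] belong to reducer r).
chunks = [b"reducer0-data", b"r1", b"reducer2-bytes"]
data_file = io.BytesIO(b"".join(chunks))

offsets = [0]
for c in chunks:
    offsets.append(offsets[-1] + len(c))

def fetch(reducer_id):
    # A single seek plus a single read serves one reducer's whole chunk.
    data_file.seek(offsets[reducer_id])
    return data_file.read(offsets[reducer_id + 1] - offsets[reducer_id])
```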
Of course, for a small number of “reducers” hashing to separate files would obviously work faster than sorting, so the sort shuffle has a “fallback” plan: when the number of “reducers” is smaller than `spark.shuffle.sort.bypassMergeThreshold` (200 by default), the “fallback” plan is used, hashing the data to separate files and then joining these files together into a single file.
An interesting point about this implementation is that it sorts the data on the “map” side, but does not merge the results of this sort on the “reduce” side; if the ordering of data is needed, the data is simply re-sorted on the “reduce” side.
What if you don’t have enough memory to store the whole “map” output? You might need to spill intermediate data to the disk. The parameter `spark.shuffle.spill` is responsible for enabling/disabling spilling; by default spilling is enabled. If you disable it and there is not enough memory to store the “map” output, you would simply get an OOM error.
Each spill file is written to the disk separately; their merging is performed only when the data is requested by a “reducer”, and the merging happens in real time. It does not invoke any kind of “on-disk merger” like in MapReduce; it just dynamically collects the data from a number of separate spill files and merges them together using a priority queue.
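The streaming merge of individually-sorted spill files is the classic k-way merge; Python’s `heapq.merge` is used here as a stand-in for Spark’s priority-queue-based merge:

```python
import heapq

# Each spill "file" is individually sorted by key; the merge streams them
# together lazily, with no separate on-disk merge pass.
spill_1 = [(1, "a"), (4, "d"), (7, "g")]
spill_2 = [(2, "b"), (5, "e")]
spill_3 = [(3, "c"), (6, "f")]

merged = list(heapq.merge(spill_1, spill_2, spill_3))
```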
Pros of the sort shuffle:
- Smaller number of files created on the “map” side;
- Smaller number of random IO operations, mostly sequential writes and reads.

Cons:
- Sorting is slower than hashing. It might be worth tuning the bypassMergeThreshold parameter for your cluster to find the sweet spot, but in general for most clusters its default is even too high;
- In case you use SSD drives for the temporary data of Spark shuffles, hash shuffle might work better for you.
It can be enabled by setting spark.shuffle.manager = tungsten-sort in Spark 1.4.0+. The optimizations implemented in this shuffle are:
- Operate directly on serialized binary data without the need to deserialize it. It uses unsafe(sun.misc.Unsafe) memory copy functions to directly copy the data itself, which works fine for serialized data as in fact it is just a byte array.
- Uses special cache-efficient sorter `ShuffleExternalSorter` that sorts arrays of compressed record pointers and partition ids. By using only 8 bytes of space per record in the sorting array, it works more efficiently with CPU cache.
- As the records are not deserialized, spilling of the serialized data is performed directly. (no deserialize-compare-serialize-spill logic)
- Extra spill-merging optimizations are automatically applied when the shuffle compression codec supports concatenation of serialized streams. This is currently supported by Spark’s LZF compression codec, and only if fast merging is enabled by the `spark.shuffle.unsafe.fastMergeEnabled` parameter.
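The 8-byte sort records can be sketched as packed integers. In Spark’s `PackedRecordPointer` the upper 24 bits hold the partition id (which is where the 16777216-partition limit below comes from); the layout of the lower 40 pointer bits is simplified here:

```python
# Simplified sketch of the 8-byte sort records: 24 bits of partition id in the
# high bits, 40 bits of (compressed) record pointer in the low bits. Sorting
# the packed longs orders records by partition id without ever touching the
# serialized data they point to.
PARTITION_BITS = 24
POINTER_BITS = 40

def pack(partition_id, pointer):
    assert partition_id < (1 << PARTITION_BITS)
    return (partition_id << POINTER_BITS) | (pointer & ((1 << POINTER_BITS) - 1))

def partition_of(packed):
    return packed >> POINTER_BITS

records = [pack(5, 100), pack(1, 200), pack(3, 50)]
records.sort()  # orders by partition id first
```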
The shuffle implementation would be used only when all of the following conditions hold:
- The shuffle dependency specifies no aggregation.
Applying aggregation means the need to store deserialized value to be able to aggregate new incoming values to it. This way you lose the main advantage of this shuffle with its operations on serialized data.
- The shuffle serializer supports relocation of serialized values.
This is currently supported by KryoSerializer and Spark SQL’s custom serializer.
- The shuffle produces fewer than 16777216 (2^24) output partitions.
- No individual record is larger than 128 MB in serialized form.
First, for each spill of the data, it sorts the described pointer array and outputs an indexed partition file; then it merges these partition files together into a single indexed output file.
The downsides of this shuffle implementation:
- It does not yet handle data ordering on the mapper side;
- It does not yet offer an off-heap sorting buffer;
- It is not yet stable.