Shuffling in Spark: How to Balance Performance With Getting It Done
Reduce the data that has to move or avoid disk spilling - as best you can
Spark is wonderful, Spark is awful.
I guess it just depends if the query plan agrees with your intent.
Spark’s query optimizer is excellent, and in most cases you should trust it. But it’s helpful to verify that you’ve expressed your transformation in a way that Spark likes.
In this post I’ll share how to check your query plans and what to do if you need to shuffle some data around.
Pre-Processing Data for Model Training
I was recently preparing a large dataset for deep learning.
When you train a model, especially on time series data, it’s helpful to randomize your training data so the model can learn more efficiently. However, our partitioned dataset had similar data grouped together after extraction.
We needed to randomize the training set for the initial epoch.
The job was painfully inefficient from the beginning because of a global sort operation, which was causing a massive data shuffle. Fortunately, in my case I was able to hash the composite key and manually assign a partition number to each row. That let me do a manual repartition and then randomize within each partition using sortWithinPartitions.
I was lucky.
We could avoid the global sort and get pretty good randomness with this method. Good enough for our first epoch of training. While finding this balance between randomness and volume of shuffling worked for me, I spent a lot of time digging into shuffle operations in Spark.
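For reference, here’s a minimal sketch of that approach. The column names (entity_id, event_date) and the bucket count are hypothetical stand-ins for my composite key, and df is the extracted training DataFrame, so adapt them to your own data:
from pyspark.sql import functions as F

# Hash the (hypothetical) composite key into a fixed number of buckets,
# repartition on the bucket, then shuffle rows locally within each partition.
num_buckets = 512

randomized = (
    df.withColumn("bucket", F.abs(F.hash("entity_id", "event_date")) % num_buckets)
    .repartition(num_buckets, "bucket")        # one controlled shuffle
    .sortWithinPartitions(F.rand(seed=42))     # randomize locally, no global sort
    .drop("bucket")
)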
Along the way, I learned two things:
Spark is excellent at optimizing on its own (but make sure you ask for what you want correctly).
To improve Spark performance, do your best to avoid shuffling.
Here’s what I learned about how to find and reduce shuffling in your Spark jobs.
Understanding Shuffles in Spark
What is a Shuffle?
A shuffle is when data has to be redistributed across partitions, moving between executors over the network.
Shuffles are bad for performance because:
Network I/O is slow
Executors need more memory to hold the intermediate data
Data can spill to disk if executors run out of memory
Narrow vs. Wide Transformations
Narrow Transformations: Do not require data to be moved across executors. Each partition’s transformation can be completed independently (e.g., map, filter). This is Spark at its best, parallelizing work across partitions.
Wide Transformations: These operations require data across partitions, triggering a shuffle. Wide transformations include operations like join, groupBy, and distinct, where data from different partitions needs to be co-located for processing.
Keeping transformations narrow when possible minimizes shuffle costs, leading to faster, more efficient jobs.
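A quick illustration of the difference, with made-up column names:
from pyspark.sql import functions as F

# Narrow: each partition is transformed independently, so no data moves.
narrow = df.filter(F.col("amount") > 0).withColumn("amount_usd", F.col("amount") * 1.08)

# Wide: rows with the same key must be co-located, so Spark shuffles.
wide = df.groupBy("customer_id").agg(F.sum("amount").alias("total_amount"))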
How to Identify a Shuffle Operation in Spark
Before Running a Job: df.explain()
Remember, Spark uses lazy evaluation, which means no data actually moves until an action is called. To see the query plan before triggering an action, use df.explain().
How to Read a Spark Query Plan
Bottom-Up: Query plans are read from bottom to top, indicating the flow of data through different operations.
Indentation: Operations at the same indentation level (for example, the two inputs to a join) are independent branches that can run in parallel.
Exchange Operator: The Exchange operation represents a shuffle. This means data will be exchanged between executors.
df.join(df_other, "key").groupBy("column").count().explain()
== Physical Plan ==
*(5) HashAggregate
+- Exchange hashpartitioning(column, 200)
In the above plan, the Exchange is what indicates the data shuffle between executors.
After Running a Job: Spark UI
After a job runs, you can check the following in the Spark UI to spot shuffles:
Shuffle Read/Write Size: Large sizes (more than your dataset size) often indicate high data movement between nodes.
Disk Spills: High disk spill indicates that shuffle operations are consuming more memory than available, leading to slower performance because data is written to disk.
Query Plan: Adaptive Query Execution (AQE) may have changed the plan you saw from df.explain(); the Spark UI shows the actual steps your job took.
Techniques for Avoiding Shuffles in Spark
In my experience, there are two big things you can do here:
Reduce the amount of data that needs to be shuffled.
Ensure you have enough memory and partitions to avoid spilling to disk.
Reducing the Amount of Data to Shuffle
Repartition Up Front
Repartition will shuffle some data around, but depending on your use case, doing one repartition in the beginning can reduce subsequent shuffle operations.
This can be useful if you need to:
Join with this table (repartition on the join key)
Improve distribution on heavily skewed data
Avoid a global sort (as was my case)
Which columns you partition on depends on the downstream use case: for a final dataset, that means the actual read query patterns; for an intermediate dataset in a pipeline, it means the downstream transformations.
Either way, aim for a balance between too many partitions (repartitioning on high-cardinality columns) and too few, which limits how much data can be pruned.
df.repartition(<number of partitions or list of columns>)
A Quick Note on using coalesce() vs. repartition():
coalesce(): Use this when you need to reduce the number of partitions. It merges existing partitions instead of performing a full shuffle.
repartition(): Use this to increase partitions or co-locate data for parallel processing, which requires a full shuffle. Both are sketched below.
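Here’s a minimal sketch of each; the partition counts and the key column name are illustrative:
# Repartition by the join key up front: one full shuffle now, so later
# key-based operations do not each trigger their own.
df_by_key = df.repartition(200, "key")

# Coalesce to fewer partitions before writing a small result:
# existing partitions are merged without a full shuffle.
df_small_output = df_by_key.coalesce(16)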
Broadcast Joins for Small Datasets:
When one side of a join is relatively small, consider broadcasting that dataset across all executors to avoid a shuffle.
Use the BROADCAST hint, or adjust spark.sql.autoBroadcastJoinThreshold so Spark handles broadcasts automatically. Spark will likely choose this strategy on its own, but it’s worth checking the query plan and seeing what AQE actually chooses once the job runs.
A broadcast join effectively copies the smaller dataset to each executor.
from pyspark.sql.functions import broadcast

# Use the BROADCAST hint for the smaller dataset
df_large.join(broadcast(df_small), "key")
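If you’d rather let Spark decide, you can raise the automatic threshold instead. The value below is illustrative (the default is 10 MB, and the setting is in bytes):
# Allow automatic broadcast for tables up to ~50 MB; -1 disables it entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))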
Caching Data
Caching data can help reduce shuffles in the following scenarios:
When your DataFrame will be used in multiple joins but with different tables
When you are performing multiple transformations on filtered data
In general, to cache some intermediate result that will be used downstream
In any of these cases, caching the DataFrame can help reduce shuffling. Just be mindful of how much memory you consume.
As with a lot of things in Spark, you’ll need to experiment a bit.
Free up resources by unpersisting cached data once it’s no longer needed.
Choose the Right Storage Level:
MEMORY_ONLY: For smaller datasets that fit comfortably within the executor memory. This option avoids disk spill but may result in recomputation if memory limits are reached.
MEMORY_AND_DISK: For larger datasets. When the executor memory is full, Spark will spill data to disk.
from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)
# Do some work
# ...
# Done with the df
df.unpersist()
Improving Shuffle Performance When It Can’t Be Avoided
When shuffles are necessary, these adjustments can help manage their impact:
Add Executor Memory:
Increase spark.executor.memory to keep more data in memory and avoid spilling to disk. Whether this is feasible depends on the cost and availability of resources in your cluster.
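Note that executor memory has to be set when the application starts, not mid-job. The app name and the 8g value here are just examples:
from pyspark.sql import SparkSession

# spark.executor.memory is fixed at application launch (8g is illustrative).
spark = (
    SparkSession.builder
    .appName("shuffle-heavy-job")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)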
Use Enough Partitions:
For large datasets, increasing the number of shuffle partitions can help with memory pressure.
AQE can adjust this number between stages, but starting with a higher spark.sql.shuffle.partitions gives it room to do so.
Try to set it to at least the total number of cores in your cluster; raising it further gives you smaller partitions that are less likely to spill to disk (and can be coalesced later).
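For example (400 is an arbitrary starting point, not a recommendation):
# Default is 200; raise it so individual shuffle partitions stay small
# enough to fit in memory. AQE can coalesce them back down later.
spark.conf.set("spark.sql.shuffle.partitions", "400")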
Adaptive Query Execution (AQE):
AQE is enabled by default in newer Spark versions (3.2 and later), but many production environments run older versions or override the setting, so it’s worth enabling it explicitly.
AQE dynamically adjusts the number of shuffle partitions based on runtime metrics, which helps especially with data skew or uneven data distribution.
spark.conf.set("spark.sql.adaptive.enabled", "true")
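A couple of related AQE settings are worth knowing about too. Defaults vary by Spark version, so treat these as settings to check rather than a recipe:
# Merge small shuffle partitions after a stage runs
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Split oversized, skewed partitions during sort-merge joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")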
Additional Resources
Apache Spark Performance Tuning — Official guide from Apache, a great reference for the various knobs you can pull.
Efficient Data Processing in Spark by Joseph Machado — This was a great resource that saved me a lot of time. For me in particular, having a clear guide to reading the Spark query plans and Spark UI was invaluable. Joseph’s book puts this key information in one place that easily made it worth the price.