ShuffleManager is the pluggable mechanism for shuffle systems that track shuffle dependencies for ShuffleMapStage on the driver and executors.
Note: SortShuffleManager (short name: sort or tungsten-sort) is the one and only ShuffleManager in Spark 2.0.
The spark.shuffle.manager Spark property sets the default shuffle manager.
The driver and executors access their ShuffleManager instances using SparkEnv.
val shuffleManager = SparkEnv.get.shuffleManager
The driver registers shuffles with a shuffle manager, and executors (or tasks running locally in the driver) can ask to read and write data.
It is network-addressable, i.e. it is available on a host and port.
There can be many shuffle services running simultaneously and a driver registers with all of them when CoarseGrainedSchedulerBackend is used.
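trait ShuffleManager {
  // Registers a shuffle with the manager and returns a handle to pass to tasks
  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // Returns a writer for a given map task
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V]

  // Returns a reader for reduce partitions startPartition to endPartition - 1 (inclusive)
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  // Removes the metadata of a shuffle; returns true if it was removed successfully
  def unregisterShuffle(shuffleId: Int): Boolean

  // Returns the resolver used to retrieve shuffle block data
  def shuffleBlockResolver: ShuffleBlockResolver

  // Shuts down this ShuffleManager
  def stop(): Unit
}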
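To make the register / write / read / unregister lifecycle concrete, here is a minimal in-memory sketch in plain Scala. It is not Spark code: ToyHandle, ToyShuffleManager, and ToyShuffleDemo are hypothetical names, records are fixed to (String, Int) pairs, and partitioning by key hash is an assumption made only for illustration. It mirrors the shape of the contract, including the half-open partition range a reader covers.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-ins; none of these are Spark classes.
final case class ToyHandle(shuffleId: Int, numMaps: Int)

final class ToyShuffleManager(numReducers: Int) {
  // shuffleId -> handle (the metadata of registered shuffles)
  private val handles = mutable.Map.empty[Int, ToyHandle]
  // (shuffleId, mapId, reduceId) -> records written by that map task
  private val blocks =
    mutable.Map.empty[(Int, Int, Int), Vector[(String, Int)]]

  // Driver side: register a shuffle and hand back a handle for tasks.
  def registerShuffle(shuffleId: Int, numMaps: Int): ToyHandle = {
    val handle = ToyHandle(shuffleId, numMaps)
    handles(shuffleId) = handle
    handle
  }

  // Map side: store one map task's records, bucketed by reduce partition.
  def write(handle: ToyHandle, mapId: Int, records: Seq[(String, Int)]): Unit =
    records
      .groupBy { case (key, _) => Math.floorMod(key.hashCode, numReducers) }
      .foreach { case (reduceId, recs) =>
        blocks((handle.shuffleId, mapId, reduceId)) = recs.toVector
      }

  // Reduce side: read partitions startPartition until endPartition (exclusive),
  // collecting the matching blocks from every map task.
  def read(handle: ToyHandle, startPartition: Int, endPartition: Int): Seq[(String, Int)] =
    for {
      reduceId <- startPartition until endPartition
      mapId <- 0 until handle.numMaps
      rec <- blocks.getOrElse((handle.shuffleId, mapId, reduceId), Vector.empty)
    } yield rec

  // Drop the shuffle's data and metadata; true if the shuffle was registered.
  def unregisterShuffle(shuffleId: Int): Boolean = {
    val stale = blocks.keys.filter(_._1 == shuffleId).toList
    stale.foreach(key => blocks.remove(key))
    handles.remove(shuffleId).isDefined
  }
}

object ToyShuffleDemo {
  def main(args: Array[String]): Unit = {
    val manager = new ToyShuffleManager(numReducers = 2)
    val handle = manager.registerShuffle(shuffleId = 0, numMaps = 2)
    manager.write(handle, mapId = 0, Seq("a" -> 1, "b" -> 2))
    manager.write(handle, mapId = 1, Seq("a" -> 3))
    // Reads every reduce partition: the range [0, 2).
    println(manager.read(handle, startPartition = 0, endPartition = 2))
    println(manager.unregisterShuffle(0))
  }
}
```

In this sketch the handle is the only thing a task needs in order to find its shuffle, which is the role ShuffleHandle plays in the contract above. Unlike this toy, a real shuffle manager moves blocks between executors rather than keeping them in one map.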
Note: ShuffleManager is a private[spark] contract.
Method | Description |
---|---|
registerShuffle | Executed when |
getWriter | Used when a |
getReader | Returns a ShuffleReader for a range of partitions (to read key-value records for a ShuffleDependency). Used when CoGroupedRDD, ShuffledRDD, SubtractedRDD, and ShuffledRowRDD compute their partitions. |
unregisterShuffle | Executed when ??? removes the metadata of a shuffle. |
shuffleBlockResolver | Used when: 1. 2. |
stop | Used when |
Tip: Review the ShuffleManager sources.
Spark Property | Default Value | Description |
---|---|---|
spark.shuffle.manager | sort | ShuffleManager for a Spark application. You can use a short name or the fully-qualified class name of a custom implementation. The predefined aliases are sort and tungsten-sort. |
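How a short name or a fully-qualified class name might be turned into the class to load can be sketched as follows. This is a plain-Scala illustration, not Spark's own code: settings are modelled as a Map rather than a SparkConf, the ShuffleManagerNames object is a hypothetical name, and lower-casing the value before the alias lookup is an assumption. Both aliases point at SortShuffleManager because it is the only implementation in Spark 2.0.

```scala
// A sketch of alias resolution for spark.shuffle.manager (assumed logic).
object ShuffleManagerNames {
  private val sortShuffleManager =
    "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Short names mapped to the class they stand for.
  val aliases: Map[String, String] = Map(
    "sort" -> sortShuffleManager,
    "tungsten-sort" -> sortShuffleManager)

  // Falls back to "sort" when the property is unset; any value that is not
  // an alias is treated as a fully-qualified class name and returned as-is.
  def resolve(settings: Map[String, String]): String = {
    val name = settings.getOrElse("spark.shuffle.manager", "sort")
    aliases.getOrElse(name.toLowerCase, name)
  }
}
```

For example, resolving a setting of com.example.MyShuffleManager (a hypothetical custom class) would return that name unchanged, while sort and tungsten-sort both resolve to the same class.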
- (slides) Spark shuffle introduction by Raymond Liu (aka colorant).