A shuffle manager for Spark that supports different storage plugins.
The motivation of this project is to supply a fast, flexible and reliable shuffle manager that allows the user to plug in his/her favorite backend storage and network frameworks for holding and exchanging shuffle data.
In general, the current shuffle manager in Spark has some shortcomings.
- The local shuffle data have limitations on reliability and performance.
- Losing a single node can break the data integrity of the entire cluster.
- It is difficult to containerize the application.
- In order to improve the shuffle read/write performance, you must upgrade each server in the cluster.
- the overall performance of the shuffle stage is affected by the performance of local disk IO when there is heavy shuffling.
- There is no easy/general solution to plugin external storage to the shuffle service.
We want to address these issues in this shuffle manager.
- License
- Deployment
- Release
- Upgrade
- Service & Support
- Community
- Contributing
- Build
- Options
- Plugin Development
- Shuffle Performance Tool
By default, we support Spark 2.3.2_2.11 with Hadoop 2.7.
If you want to generate a build with a different Spark version, you need to modify
these version parameters in pom.xml
spark.version
hadoop.version
scala.version
Check the Build section for how to generate your customized jar.
- You need to include the Splash jar file in your spark default configuration
or task configuration. Make sure you choose the one that is aligned with your
Spark and Scala version. Typically, you only need to add two configurations
in your
spark-defaults.conf
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar
- You can include the plugin jar in the same way.
- You can configure your Spark application to use the Splash shuffle manager by adding the following option:
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
- The storage plugin is tunable at the application level. The user can specify different storage implementations for different applications.
- Support both on-premise and cloud deployments.
- Release numbering follows Semantic Versioning 2.0.0
- Releases are available in project's release page.
Although the basic functionality of the project has been verified, we still feel that the public API might be modified when more storage plugins are developed. Therefore:
- The public API may change until we reach version 1.0.0.
According to the definition of semantic versioning 2.0.0, we do not promise backward compatibility if the first digit in the version is changed.
- Please raise your question in the project's issue page and tag it with
question
. - Project documents are available in the
doc
folder.
You can communicate with us in following ways:
- Start a new thread in Github issues, recommended.
- Request to join the WeChat group through email and make sure you include your WeChat ID in the mail.
Please check the Contributing document for details.
-
Use
mvn install
to build the project. Optionally, you could use-DskipTests=true
to disable the unit tests.When the build process completes:
- A standard jar will be generated at:
./target/splash-<version>.jar
. This jar is what you need to deploy to your Spark environment. - A fat jar will be generated at:
./target/splash-<version>-shaded.jar
- You can find the unit test result in:
./target/surefire-reports
- You can find the coverage report in:
./target/site/jacoco
- A standard jar will be generated at:
-
Use
mvn clean
to clean the build output. -
Use
integration-test
ormvn failsafe:integration-test -DskipIT=false
to run the integration tests. Those tests should connect to the actual File System. You could also modify the test source code to test your own storage plugin.- Once the tests complete, the results are available in:
./target/failsafe-reports
- Once the tests complete, the results are available in:
-
Use
mvn pmd:pmd
to run static code analysis.- Analysis report is available in:
./target/site/pmd.html
- Analysis report is available in:
spark.shuffle.splash.storageFactory
specifies the class name of your factory. This class must implementStorageFactory
spark.shuffle.splash.clearShuffleOutput
is a boolean value telling the shuffle manager whether to clear the shuffle output when the shuffle stage completes.
Splash uses plugins to support different types of storage systems. The user can develop their own storage plugins for the shuffle manager. The user can use different types of storage system based on the usage of the file. For details, please check our design document.
The Splash project is currently released with a default plugin:
- the plugin for shared file systems like NFS is implemented by:
com.memverge.splash.shared.SharedFSFactory
This plugin serves as an example for developers to develop their own storage plugins.
Take NFS as an example, here are the steps to configure Splash with the shared folder plugin.
- Update the configurations in
spark-defaults.conf
:
# add the Splash jar to the classpath
spark.driver.extraClassPath /path/to/splash.jar
spark.executor.extraClassPath /path/to/splash.jar
# set shuffle manager and storage plugin
spark.shuffle.manager org.apache.spark.shuffle.SplashShuffleManager
spark.shuffle.splash.storageFactory com.memverge.splash.shared.SharedFSFactory
# set the location of your shared folder
spark.shuffle.splash.folder /your/share/folder
- Make sure that all your Spark nodes can access the shared folder you specified in the configuration file.
- Run some sample Spark applications and you should be able to observe that the application folder is created in a shared folder.
Use this tool to verify the performance of the storage plugin. Users could also use this tool to compare different storage plugin implementations or find the regressions of the storage plugin.
Note that this tool bases on the storage interface. It does not require a Spark environment.
It writes the shuffle output and read them with configured arguments. See the configuration details below:
-h
or--help
: display the usage-f
or--factory
: specify the name of the storage factory-i
or--shuffleId
: the test shuffle ID, default to 1-t
or--tasks
: the number of concurrent tasks, default to 5-m
or--mappers
: the number of mappers, default to 10-r
or--reducers
: the number of reducers, default to 10-d
or--data
: the number of data blocks, default to 1K-b
or--blockSize
: the block/buffer size of each data block, default to 256K-o
or--overwrite
: overwrite existing outputs
Sample command:
java -cp target/splash-shaded.jar com.memverge.splash.ShufflePerfTool
-d 64 -m 200 -r 200 -t 8 -o
Sample output
overwrite, removing existing shuffle for shuffleTest-1
==========================================
Writing 200 shuffle with 8 threads: 100% (200/200)
Write shuffle data completed in 7440 milliseconds
Reading index file: 0 ms
storage factory: com.memverge.splash.shared.SharedFSFactory
shuffle folder: \tmp\splash\shuffleTest-1\shuffle
number of mappers: 200
number of reducers: 200
total shuffle size: 3GB
bytes written: 3GB
bytes read: 0B
number of blocks: 64
blocks size: 256KB
partition size: 81KB
concurrent tasks: 8
bandwidth: 430MB/s
==========================================
Reading 40000 partitions with 8 threads 100% (40000/40000)
Read shuffle data completed in 35525 milliseconds
Reading index file: 15907 ms
storage factory: com.memverge.splash.shared.SharedFSFactory
shuffle folder: \tmp\splash\shuffleTest-1\shuffle
number of mappers: 200
number of reducers: 200
total shuffle size: 3GB
bytes written: 3GB
bytes read: 3GB
number of blocks: 64
blocks size: 256KB
partition size: 81KB
concurrent tasks: 8
bandwidth: 90MB/s