Fix performance regression and rename maxBufferSize to bufferInputSize.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed Aug 21, 2023
1 parent 6bc39ce commit ed2404e
Showing 4 changed files with 6 additions and 6 deletions.
README.md (1 addition, 1 deletion)
@@ -40,7 +40,7 @@ Changing these values might have an impact on performance.
 
 - `spark.shuffle.s3.bufferSize`: Default size of the buffered output streams (default: `32768`,
   uses `spark.shuffle.file.buffer` as default)
-- `spark.shuffle.s3.maxBufferSize`: Maximum size of buffered input streams (default: `209715200`,
+- `spark.shuffle.s3.bufferInputSize`: Maximum size of buffered input streams (default: `209715200`,
   uses `spark.network.maxRemoteBlockSizeFetchToMem` as default)
 - `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`)
 - `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`)
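For illustration only, a minimal sketch of setting these options programmatically; the keys are the documented ones, while the values and the surrounding code are arbitrary examples:

```scala
import org.apache.spark.SparkConf

// Arbitrary example values; unset keys fall back to the documented defaults.
val conf = new SparkConf()
  .set("spark.shuffle.s3.bufferSize", (32 * 1024).toString)              // 32768 bytes
  .set("spark.shuffle.s3.bufferInputSize", (128 * 1024 * 1024).toString) // cap input buffers at 128 MiB
  .set("spark.shuffle.s3.cachePartitionLengths", "true")
  .set("spark.shuffle.s3.cacheChecksums", "true")
```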
@@ -23,10 +23,9 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
   ): Unit = {
     val in = new FileInputStream(mapSpillFile)
     val out = dispatcher.createBlock(ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
-    val bufferedOutputStream = new BufferedOutputStream(out, dispatcher.bufferSize)
 
     // Note: HDFS does not expose a nio-buffered write interface.
-    Utils.copyStream(in, bufferedOutputStream, closeStreams = true)
+    Utils.copyStream(in, out, closeStreams = true)
 
     if (dispatcher.checksumEnabled) {
       S3ShuffleHelper.writeChecksum(shuffleId, mapId, checksums)
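The removed BufferedOutputStream is the likely regression: Utils.copyStream already moves data through its own intermediate buffer, so the extra wrapper copied every byte a second time. A minimal sketch of the resulting direct-copy pattern, using a hypothetical helper rather than the plugin's actual code:

```scala
import java.io.{FileInputStream, OutputStream}

// Hypothetical helper illustrating a single-buffer copy from a spill file
// to an output stream, without stacking a second BufferedOutputStream on top.
def copySpill(in: FileInputStream, out: OutputStream): Unit = {
  val buf = new Array[Byte](64 * 1024) // one intermediate buffer (size is an example)
  try {
    Iterator
      .continually(in.read(buf))
      .takeWhile(_ != -1)
      .foreach(n => out.write(buf, 0, n))
  } finally {
    in.close()
    out.close()
  }
}
```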
@@ -35,7 +35,7 @@ class S3ShuffleDispatcher extends Logging {
 
   // Optional
   val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024)
-  val maxBufferSize: Int = conf.getInt("spark.shuffle.s3.maxBufferSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
+  val bufferInputSize: Int = conf.getInt("spark.shuffle.s3.bufferInputSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
   val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true)
   val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true)
   val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
@@ -63,7 +63,7 @@
 
   // Optional
   logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
-  logInfo(s"- spark.shuffle.s3.maxBufferSize=${maxBufferSize}")
+  logInfo(s"- spark.shuffle.s3.bufferInputSize=${bufferInputSize}")
   logInfo(s"- spark.shuffle.s3.cachePartitionLengths=${cachePartitionLengths}")
   logInfo(s"- spark.shuffle.s3.cacheChecksums=${cacheChecksums}")
   logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")
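A minimal sketch of the fallback behaviour above, using literal keys in place of the SHUFFLE_FILE_BUFFER_SIZE and MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM constants from the source (values are examples):

```scala
import org.apache.spark.SparkConf

// An explicit spark.shuffle.s3.bufferInputSize wins; otherwise the value of
// spark.network.maxRemoteBlockSizeFetchToMem is used as the default.
val conf = new SparkConf()
  .set("spark.network.maxRemoteBlockSizeFetchToMem", (200 * 1024 * 1024).toString)

val bufferInputSize: Int = conf.getInt(
  "spark.shuffle.s3.bufferInputSize",
  defaultValue = conf.get("spark.network.maxRemoteBlockSizeFetchToMem").toInt)
// bufferInputSize == 209715200 here, matching the documented default
```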
@@ -55,6 +55,7 @@ class S3ShuffleReader[K, C](
 
   private val dispatcher = S3ShuffleDispatcher.get
   private val dep = handle.dependency
+  private val bufferInputSize = dispatcher.bufferInputSize
 
   private val fetchContinousBlocksInBatch: Boolean = {
     val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
@@ -105,7 +106,7 @@
       // NextIterator. The NextIterator makes sure that close() is called on the
       // underlying InputStream when all records have been read.
       Future {
-        val bufferSize = scala.math.min(wrappedStream.maxBytes, dispatcher.maxBufferSize).toInt
+        val bufferSize = scala.math.min(wrappedStream.maxBytes, bufferInputSize).toInt
         val stream = new BufferedInputStream(wrappedStream, bufferSize)
 
         // Fill the buffered input stream by reading and then resetting the stream.
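The reader caps each block's buffer at bufferInputSize and then pre-fills it before handing the stream to the deserializer. A minimal sketch of that mark/read/reset pattern, with an illustrative helper name:

```scala
import java.io.{BufferedInputStream, InputStream}

// Illustrative helper: cap the buffer at bufferInputSize, then force an
// eager fill by reading one byte and rewinding to the marked start.
def bufferedPrefetch(wrapped: InputStream, maxBytes: Long, bufferInputSize: Int): BufferedInputStream = {
  val bufferSize = math.min(maxBytes, bufferInputSize.toLong).toInt
  val stream = new BufferedInputStream(wrapped, bufferSize)
  stream.mark(bufferSize) // remember the start of the stream
  stream.read()           // triggers one fill of the internal buffer (may read fewer bytes)
  stream.reset()          // rewind so callers still see every byte
  stream
}
```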
