From ed2404e0b2da861083ee8100a7992c4996839603 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pascal=20Spo=CC=88rri?=
Date: Mon, 21 Aug 2023 16:35:23 +0200
Subject: [PATCH] Fix performance regression and rename maxBufferSize to
 bufferInputSize.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Pascal Spörri
---
 README.md                                                     | 2 +-
 .../spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala   | 3 +--
 .../org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala | 4 ++--
 src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala | 3 ++-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index 27366a3..d759376 100644
--- a/README.md
+++ b/README.md
@@ -40,7 +40,7 @@ Changing these values might have an impact on performance.
 
 - `spark.shuffle.s3.bufferSize`: Default size of the buffered output streams (default: `32768`,
   uses `spark.shuffle.file.buffer` as default)
-- `spark.shuffle.s3.maxBufferSize`: Maximum size of buffered input streams (default: `209715200`,
+- `spark.shuffle.s3.bufferInputSize`: Maximum size of buffered input streams (default: `209715200`,
   uses `spark.network.maxRemoteBlockSizeFetchToMem` as default)
 - `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`)
 - `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`)

diff --git a/src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala b/src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala
index 7778740..f009526 100644
--- a/src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala
+++ b/src/main/scala/org/apache/spark/shuffle/S3SingleSpillShuffleMapOutputWriter.scala
@@ -23,10 +23,9 @@ class S3SingleSpillShuffleMapOutputWriter(shuffleId: Int, mapId: Long) extends S
   ): Unit = {
     val in = new FileInputStream(mapSpillFile)
     val out = dispatcher.createBlock(ShuffleDataBlockId(shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID))
-    val bufferedOutputStream = new BufferedOutputStream(out, dispatcher.bufferSize)
     // Note: HDFS does not expose a nio-buffered write interface.
-    Utils.copyStream(in, bufferedOutputStream, closeStreams = true)
+    Utils.copyStream(in, out, closeStreams = true)
 
     if (dispatcher.checksumEnabled) {
       S3ShuffleHelper.writeChecksum(shuffleId, mapId, checksums)
     }

diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
index 0f48a32..efc4b62 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
@@ -35,7 +35,7 @@ class S3ShuffleDispatcher extends Logging {
 
   // Optional
   val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = conf.get(SHUFFLE_FILE_BUFFER_SIZE).toInt * 1024)
-  val maxBufferSize: Int = conf.getInt("spark.shuffle.s3.maxBufferSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
+  val bufferInputSize: Int = conf.getInt("spark.shuffle.s3.bufferInputSize", defaultValue = conf.get(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM).toInt)
   val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true)
   val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true)
   val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)
@@ -63,7 +63,7 @@ class S3ShuffleDispatcher extends Logging {
 
   // Optional
   logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
-  logInfo(s"- spark.shuffle.s3.maxBufferSize=${maxBufferSize}")
+  logInfo(s"- spark.shuffle.s3.bufferInputSize=${bufferInputSize}")
   logInfo(s"- spark.shuffle.s3.cachePartitionLengths=${cachePartitionLengths}")
   logInfo(s"- spark.shuffle.s3.cacheChecksums=${cacheChecksums}")
   logInfo(s"- spark.shuffle.s3.cleanup=${cleanupShuffleFiles}")

diff --git a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
index 91f3d37..684d6c8 100644
--- a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
+++ b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
@@ -55,6 +55,7 @@ class S3ShuffleReader[K, C](
 
   private val dispatcher = S3ShuffleDispatcher.get
   private val dep = handle.dependency
+  private val bufferInputSize = dispatcher.bufferInputSize
 
   private val fetchContinousBlocksInBatch: Boolean = {
     val serializerRelocatable = dep.serializer.supportsRelocationOfSerializedObjects
@@ -105,7 +106,7 @@ class S3ShuffleReader[K, C](
       // NextIterator. The NextIterator makes sure that close() is called on the
       // underlying InputStream when all records have been read.
       Future {
-        val bufferSize = scala.math.min(wrappedStream.maxBytes, dispatcher.maxBufferSize).toInt
+        val bufferSize = scala.math.min(wrappedStream.maxBytes, bufferInputSize).toInt
        val stream = new BufferedInputStream(wrappedStream, bufferSize)
 
         // Fill the buffered input stream by reading and then resetting the stream.
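
The reader-side hunk keeps the existing sizing rule: the `BufferedInputStream` is capped at the smaller of the block size (`wrappedStream.maxBytes`) and the renamed `bufferInputSize`, so a small block never allocates the full configured buffer. A minimal self-contained sketch of that rule, with the stream source stubbed out (the helper name `openBuffered` is illustrative, not from the codebase):

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, InputStream}

// Sketch of the buffer-sizing rule from the S3ShuffleReader hunk above:
// cap the buffer at min(block size, configured bufferInputSize).
def openBuffered(wrapped: InputStream, maxBytes: Long, bufferInputSize: Int): BufferedInputStream = {
  val bufferSize = math.min(maxBytes, bufferInputSize.toLong).toInt
  new BufferedInputStream(wrapped, bufferSize)
}

// Example: a 1 KiB block with the 200 MiB default gets a 1 KiB buffer.
val block = new ByteArrayInputStream(new Array[Byte](1024))
val in = openBuffered(block, maxBytes = 1024L, bufferInputSize = 209715200)
```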
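Deployments that set the old key must switch to the new one; the old `spark.shuffle.s3.maxBufferSize` key is no longer read after this commit. A minimal sketch of wiring the renamed option into a `SparkConf`; the 128 MiB and 32 KiB values here are illustrative assumptions, not recommendations:

```scala
import org.apache.spark.SparkConf

// Only the spark.shuffle.s3.* keys shown in this patch are used here.
val conf = new SparkConf()
  // Renamed by this commit; formerly spark.shuffle.s3.maxBufferSize.
  .set("spark.shuffle.s3.bufferInputSize", (128 * 1024 * 1024).toString) // 128 MiB
  // Output-side companion option, unchanged by this commit.
  .set("spark.shuffle.s3.bufferSize", (32 * 1024).toString) // 32 KiB
```

If `spark.shuffle.s3.bufferInputSize` is left unset, `S3ShuffleDispatcher` falls back to `spark.network.maxRemoteBlockSizeFetchToMem`, as shown in the dispatcher hunk above.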