Skip to content

Commit

Permalink
Optimize buffers when reading/writing checksum and index files.
Browse files Browse the repository at this point in the history
Signed-off-by: Pascal Spörri <[email protected]>
  • Loading branch information
pspoerri committed Sep 7, 2023
1 parent 11101fd commit 18191d1
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -121,7 +121,7 @@ class S3ShuffleDispatcher extends Logging {

private val cachedFileStatus = new ConcurrentObjectMap[BlockId, FileStatus]()

- private def getFileStatusCached(blockId: BlockId): FileStatus = {
+ def getFileStatusCached(blockId: BlockId): FileStatus = {
cachedFileStatus.getOrElsePut(blockId, (value: BlockId) => {
fs.getFileStatus(getPath(value))
})
Expand Down
Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ object S3ShuffleHelper extends Logging {
def writeArrayAsBlock(blockId: BlockId, array: Array[Long]): Unit = {
val serializerInstance = serializer.newInstance()
val buffer = serializerInstance.serialize[Array[Long]](array)
- val file = new BufferedOutputStream(dispatcher.createBlock(blockId), dispatcher.bufferSize)
+ val file = dispatcher.createBlock(blockId)
file.write(buffer.array(), buffer.arrayOffset(), buffer.limit())
file.flush()
file.close()
@@ -132,11 +132,13 @@ object S3ShuffleHelper extends Logging {
}

private def readBlockAsArray(blockId: BlockId) = {
- val file = new BufferedInputStream(dispatcher.openBlock(blockId), dispatcher.bufferSize)
- var buffer = new Array[Byte](1024)
+ val stat = dispatcher.getFileStatusCached(blockId)
+ val fsize = scala.math.min(stat.getLen.toInt, dispatcher.bufferSize)
+ val file = new BufferedInputStream(dispatcher.openBlock(blockId), fsize)
+ var buffer = new Array[Byte](fsize)
var numBytes = 0
var done = false
- do {
+ while (!done) {
val c = file.read(buffer, numBytes, buffer.length - numBytes)
if (c >= 0) {
numBytes += c
@@ -146,7 +148,7 @@ object S3ShuffleHelper extends Logging {
} else {
done = true
}
- } while (!done)
+ }
val serializerInstance = serializer.newInstance()
try {
val result = serializerInstance.deserialize[Array[Long]](ByteBuffer.wrap(buffer, 0, numBytes))
Expand Down

0 comments on commit 18191d1

Please sign in to comment.