From 30d8ab2668484ceb6af9133a695ad3010ade36e5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Pascal=20Spo=CC=88rri?=
Date: Tue, 12 Sep 2023 12:05:29 +0200
Subject: [PATCH] Improve shuffle storage path for efficient lookup and delete.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Pascal Spörri
---
 README.md                                     |  2 +-
 .../shuffle/helper/S3ShuffleDispatcher.scala  | 60 ++++++++++++++-----
 .../shuffle/helper/S3ShuffleHelper.scala      | 23 -------
 .../spark/shuffle/sort/S3ShuffleManager.scala | 25 +-------
 .../spark/storage/S3ShuffleReader.scala       |  2 +-
 5 files changed, 48 insertions(+), 64 deletions(-)

diff --git a/README.md b/README.md
index d7ff11f..2bef208 100644
--- a/README.md
+++ b/README.md
@@ -30,7 +30,7 @@ These configuration values need to be passed to Spark to load and configure the
   - `cos://zrlio-tmp.resources/` (Hadoop-Cloud + Stocator)
 
 Individual blocks are prefixed in order to get improved performance when accessing them on the remote filesystem.
-The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`.
+The generated paths look like this: `${rootDir}/${mapId % 10}/${appId}/${shuffleId}/ShuffleBlock{.data / .index}`.
 The number of prefixes can be controlled with the option `spark.shuffle.s3.folderPrefixes`.
 
diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
index 1a608b6..ccd4738 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala
@@ -48,7 +48,6 @@ class S3ShuffleDispatcher extends Logging {
   val checksumAlgorithm: String = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
   val checksumEnabled: Boolean = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED)
 
-  val appDir = f"/${startTime}-${appId}/"
   val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
     SparkHadoopUtil.newConfiguration(conf)
   })
@@ -56,7 +55,7 @@ class S3ShuffleDispatcher extends Logging {
   val canSetReadahead = fs.hasPathCapability(new Path(rootDir), StreamCapabilities.READAHEAD)
 
   // Required
-  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir} - can set readahead: ${canSetReadahead})")
+  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (appId: ${appId})")
 
   // Optional
   logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
@@ -79,7 +78,7 @@ class S3ShuffleDispatcher extends Logging {
   def removeRoot(): Boolean = {
     Range(0, folderPrefixes).map(idx => {
       Future {
-        val prefix = f"${rootDir}/${idx}${appDir}"
+        val prefix = f"${rootDir}/${idx}/${appId}"
         try {
           fs.delete(new Path(prefix), true)
         } catch {
@@ -91,18 +90,49 @@ class S3ShuffleDispatcher extends Logging {
   }
 
   def getPath(blockId: BlockId): Path = {
-    val idx = (blockId match {
-      case ShuffleBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleDataBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleIndexBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleChecksumBlockId(_, mapId, _) =>
-        mapId
-      case _ => 0
-    }) % folderPrefixes
-    new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
+    val (shuffleId, mapId) = blockId match {
+      case ShuffleBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleDataBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleChecksumBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case _ => (0, 0.toLong)
+    }
+    val idx = mapId % folderPrefixes
+    new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/${blockId.name}")
+  }
+
+  def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
+    val shuffleIndexFilter: PathFilter = new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        name.endsWith(".index")
+      }
+    }
+    Range(0, folderPrefixes).map(idx => {
+      Future {
+        val path = new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/")
+        try {
+          fs.listStatus(path, shuffleIndexFilter).map(v => {
+            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
+          })
+        } catch {
+          case _: IOException => Array.empty[ShuffleIndexBlockId]
+        }
+      }
+    }).flatMap(Await.result(_, Duration.Inf)).toArray
+  }
+
+  def removeShuffle(shuffleId: Int): Unit = {
+    Range(0, folderPrefixes).map(idx => {
+      val path = new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/")
+      Future {
+        fs.delete(path, true)
+      }
+    }).foreach(Await.result(_, Duration.Inf))
   }
 
   /**
diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
index 9b2f2fc..0acea60 100644
--- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
+++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleHelper.scala
@@ -62,29 +62,6 @@ object S3ShuffleHelper extends Logging {
     file.close()
   }
 
-  def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
-    val shuffleIndexFilter: PathFilter = new PathFilter() {
-      private val prefix = f"shuffle_${shuffleId}_"
-
-      override def accept(path: Path): Boolean = {
-        val name = path.getName
-        name.startsWith(prefix) && name.endsWith("_0.index")
-      }
-    }
-    Range(0, dispatcher.folderPrefixes).map(idx => {
-      Future {
-        val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        try {
-          dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
-            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
-          })
-        } catch {
-          case _: IOException => Array.empty[ShuffleIndexBlockId]
-        }
-      }
-    }).flatMap(Await.result(_, Duration.Inf)).toArray
-  }
-
   /**
    * Get the cached partition length for shuffle index at shuffleId and mapId
    *
diff --git a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
index 8dc1794..186edf3 100644
--- a/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
+++ b/src/main/scala/org/apache/spark/shuffle/sort/S3ShuffleManager.scala
@@ -150,30 +150,7 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi
     // Remove shuffle files from S3.
     if (dispatcher.cleanupShuffleFiles) {
       // Delete all
-      val shuffleIdFilter: PathFilter = new PathFilter() {
-        private val prefix = f"shuffle_${shuffleId}_"
-
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix)
-        }
-      }
-
-      Range(0, dispatcher.folderPrefixes).flatMap(idx => {
-        val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        try {
-          dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
-            Future {
-              dispatcher.fs.delete(f.getPath, false)
-            }
-          })
-        } catch {
-          case _: IOException => {
-            logDebug(s"Unable to delete ${path.getName}")
-            List()
-          }
-        }
-      }).foreach(Await.result(_, Duration.Inf))
+      dispatcher.removeShuffle(shuffleId)
     }
     true
   }
diff --git a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
index 6acc525..ef2e798 100644
--- a/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
+++ b/src/main/scala/org/apache/spark/storage/S3ShuffleReader.scala
@@ -164,7 +164,7 @@ class S3ShuffleReader[K, C](
           .flatMap(info => ShuffleBlockFetcherIterator.mergeContinuousShuffleBlockIdsIfNeeded(info, doBatchFetch))
           .map(_.blockId)
       } else {
-        val indices = S3ShuffleHelper.listShuffleIndices(shuffleId).filter(
+        val indices = dispatcher.listShuffleIndices(shuffleId).filter(
           block => block.mapId >= startMapIndex && block.mapId < endMapIndex)
         if (doBatchFetch || dispatcher.forceBatchFetch) {
           indices.map(block => ShuffleBlockBatchId(block.shuffleId, block.mapId, startPartition, endPartition)).toIterator
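
Note (reviewer sketch, not part of the patch): with `${shuffleId}` now part of the object prefix, every block of a shuffle lives under `${rootDir}/${idx}/${appId}/${shuffleId}/` for each idx in [0, folderPrefixes). Listing or deleting a shuffle therefore touches exactly `folderPrefixes` prefixes instead of filtering every object in the application directory. The self-contained Scala sketch below illustrates the layout; `rootDir`, `appId`, and the example ids are hypothetical placeholders, not values taken from the patch.

    // Sketch of the new shuffle key layout (hypothetical values, no Hadoop dependencies).
    object ShufflePathLayoutSketch {
      val rootDir = "s3a://my-bucket/shuffle"    // hypothetical spark.shuffle.s3.rootDir
      val appId = "app-20230912120529-0000"      // hypothetical Spark application id
      val folderPrefixes = 10                    // default spark.shuffle.s3.folderPrefixes

      // Per-block path: ${rootDir}/${mapId % folderPrefixes}/${appId}/${shuffleId}/<block name>
      def indexBlockPath(shuffleId: Int, mapId: Long, reduceId: Int): String = {
        val idx = mapId % folderPrefixes
        f"$rootDir/$idx/$appId/$shuffleId/shuffle_${shuffleId}_${mapId}_$reduceId.index"
      }

      // A whole shuffle is covered by one prefix per folder prefix, so a cleanup
      // is `folderPrefixes` recursive deletes rather than a full listing plus filter.
      def shufflePrefixes(shuffleId: Int): Seq[String] =
        (0 until folderPrefixes).map(idx => f"$rootDir/$idx/$appId/$shuffleId/")

      def main(args: Array[String]): Unit = {
        println(indexBlockPath(shuffleId = 4, mapId = 17, reduceId = 0))
        shufflePrefixes(4).foreach(println)
      }
    }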