Commit 30d8ab2
Improve shuffle storage path for efficient lookup and delete.
Signed-off-by: Pascal Spörri <[email protected]>
pspoerri committed Sep 12, 2023
1 parent 227d65f commit 30d8ab2
Showing 5 changed files with 48 additions and 64 deletions.
README.md (2 changes: 1 addition & 1 deletion)
@@ -30,7 +30,7 @@ These configuration values need to be passed to Spark to load and configure the
- `cos://zrlio-tmp.resources/` (Hadoop-Cloud + Stocator)

Individual blocks are prefixed in order to get improved performance when accessing them on the remote filesystem.
-The generated paths look like this: `${rootDir}/${mapId % 10}/${appDir}/ShuffleBlock{.data / .index}`.
+The generated paths look like this: `${rootDir}/${mapId % 10}/${appId}/${shuffleId}/ShuffleBlock{.data / .index}`.

The number of prefixes can be controlled with the option `spark.shuffle.s3.folderPrefixes`.

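For illustration, here is a minimal sketch of how such a path is assembled; the bucket, application ID, and block name below are hypothetical, and the helper is not part of the plugin:

```scala
// Hypothetical sketch of the layout described above; names and values
// (bucket, appId, block name) are illustrative, not taken from the plugin.
def blockPath(rootDir: String, appId: String, shuffleId: Int,
              mapId: Long, folderPrefixes: Int, name: String): String = {
  val prefix = mapId % folderPrefixes // spreads blocks across prefix folders
  s"$rootDir/$prefix/$appId/$shuffleId/$name"
}

blockPath("s3a://bucket/shuffle", "app-123", 4, 7L, 10, "shuffle_4_7_0.data")
// => "s3a://bucket/shuffle/7/app-123/4/shuffle_4_7_0.data"
```

Spreading blocks across `folderPrefixes` top-level folders avoids hot key prefixes on the object store, and keying by `${appId}/${shuffleId}` lets a whole shuffle be listed or deleted with one call per prefix.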
@@ -48,15 +48,14 @@ class S3ShuffleDispatcher extends Logging {
val checksumAlgorithm: String = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM)
val checksumEnabled: Boolean = SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED)

-  val appDir = f"/${startTime}-${appId}/"
val fs: FileSystem = FileSystem.get(URI.create(rootDir), {
SparkHadoopUtil.newConfiguration(conf)
})

val canSetReadahead = fs.hasPathCapability(new Path(rootDir), StreamCapabilities.READAHEAD)

// Required
-  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (app dir: ${appDir} - can set readahead: ${canSetReadahead})")
+  logInfo(s"- spark.shuffle.s3.rootDir=${rootDir} (appId: ${appId})")

// Optional
logInfo(s"- spark.shuffle.s3.bufferSize=${bufferSize}")
@@ -79,7 +78,7 @@ class S3ShuffleDispatcher extends Logging {
def removeRoot(): Boolean = {
Range(0, folderPrefixes).map(idx => {
Future {
-        val prefix = f"${rootDir}/${idx}${appDir}"
+        val prefix = f"${rootDir}/${idx}/${appId}"
try {
fs.delete(new Path(prefix), true)
} catch {
@@ -91,18 +90,49 @@ class S3ShuffleDispatcher extends Logging {
}

def getPath(blockId: BlockId): Path = {
-    val idx = (blockId match {
-      case ShuffleBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleDataBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleIndexBlockId(_, mapId, _) =>
-        mapId
-      case ShuffleChecksumBlockId(_, mapId, _) =>
-        mapId
-      case _ => 0
-    }) % folderPrefixes
-    new Path(f"${rootDir}/${idx}${appDir}/${blockId.name}")
+    val (shuffleId, mapId) = blockId match {
+      case ShuffleBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleDataBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleIndexBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case ShuffleChecksumBlockId(shuffleId, mapId, _) =>
+        (shuffleId, mapId)
+      case _ => (0, 0.toLong)
+    }
+    val idx = mapId % folderPrefixes
+    new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/${blockId.name}")
}

+  def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
+    val shuffleIndexFilter: PathFilter = new PathFilter() {
+      override def accept(path: Path): Boolean = {
+        val name = path.getName
+        name.endsWith(".index")
+      }
+    }
+    Range(0, folderPrefixes).map(idx => {
+      Future {
+        val path = new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/")
+        try {
+          fs.listStatus(path, shuffleIndexFilter).map(v => {
+            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
+          })
+        } catch {
+          case _: IOException => Array.empty[ShuffleIndexBlockId]
+        }
+      }
+    }).flatMap(Await.result(_, Duration.Inf)).toArray
+  }
+
+  def removeShuffle(shuffleId: Int): Unit = {
+    Range(0, folderPrefixes).map(idx => {
+      val path = new Path(f"${rootDir}/${idx}/${appId}/${shuffleId}/")
+      Future {
+        fs.delete(path, true)
+      }
+    }).foreach(Await.result(_, Duration.Inf))
+  }

/**
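Both added methods fan out one Future per prefix folder and then block on the results. A minimal, self-contained sketch of that pattern, assuming the global execution context (the helper name here is illustrative, not part of this commit):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global // executor is assumed
import scala.concurrent.duration.Duration

// Illustrative pattern only (not the plugin's API): fan out one task per
// prefix folder, then block until all of them have completed.
def forEachPrefix[T](folderPrefixes: Int)(task: Int => T): Seq[T] =
  Range(0, folderPrefixes)
    .map(idx => Future(task(idx)))      // one future per prefix directory
    .map(Await.result(_, Duration.Inf)) // gather results in order

// e.g. forEachPrefix(10)(idx => s"would delete rootDir/$idx/appId/shuffleId/")
```

Because each shuffle now lives under its own `${appId}/${shuffleId}` directory, each per-prefix task is a single `listStatus` or recursive `delete` instead of a filename-filtered scan.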
@@ -62,29 +62,6 @@ object S3ShuffleHelper extends Logging {
file.close()
}

-  def listShuffleIndices(shuffleId: Int): Array[ShuffleIndexBlockId] = {
-    val shuffleIndexFilter: PathFilter = new PathFilter() {
-      private val prefix = f"shuffle_${shuffleId}_"
-
-      override def accept(path: Path): Boolean = {
-        val name = path.getName
-        name.startsWith(prefix) && name.endsWith("_0.index")
-      }
-    }
-    Range(0, dispatcher.folderPrefixes).map(idx => {
-      Future {
-        val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        try {
-          dispatcher.fs.listStatus(path, shuffleIndexFilter).map(v => {
-            BlockId.apply(v.getPath.getName).asInstanceOf[ShuffleIndexBlockId]
-          })
-        } catch {
-          case _: IOException => Array.empty[ShuffleIndexBlockId]
-        }
-      }
-    }).flatMap(Await.result(_, Duration.Inf)).toArray
-  }

/**
* Get the cached partition length for shuffle index at shuffleId and mapId
*
@@ -150,30 +150,7 @@ private[spark] class S3ShuffleManager(conf: SparkConf) extends ShuffleManager wi
// Remove shuffle files from S3.
if (dispatcher.cleanupShuffleFiles) {
// Delete all
-      val shuffleIdFilter: PathFilter = new PathFilter() {
-        private val prefix = f"shuffle_${shuffleId}_"
-
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix)
-        }
-      }
-
-      Range(0, dispatcher.folderPrefixes).flatMap(idx => {
-        val path = new Path(f"${dispatcher.rootDir}/${idx}${dispatcher.appDir}")
-        try {
-          dispatcher.fs.listStatus(path, shuffleIdFilter).map(f => {
-            Future {
-              dispatcher.fs.delete(f.getPath, false)
-            }
-          })
-        } catch {
-          case _: IOException => {
-            logDebug(s"Unable to delete ${path.getName}")
-            List()
-          }
-        }
-      }).foreach(Await.result(_, Duration.Inf))
+      dispatcher.removeShuffle(shuffleId)
}
true
}
@@ -164,7 +164,7 @@ class S3ShuffleReader[K, C](
.flatMap(info => ShuffleBlockFetcherIterator.mergeContinuousShuffleBlockIdsIfNeeded(info, doBatchFetch))
.map(_.blockId)
} else {
-      val indices = S3ShuffleHelper.listShuffleIndices(shuffleId).filter(
+      val indices = dispatcher.listShuffleIndices(shuffleId).filter(
block => block.mapId >= startMapIndex && block.mapId < endMapIndex)
if (doBatchFetch || dispatcher.forceBatchFetch) {
indices.map(block => ShuffleBlockBatchId(block.shuffleId, block.mapId, startPartition, endPartition)).toIterator
