Skip to content

Commit

Permalink
[ML-379] Set ZE_AFFINITY_MASK for each executor (#383)
Browse files Browse the repository at this point in the history
* update spark to 3.3.3

Signed-off-by: minmingzhu <[email protected]>

* remove oneccl communicator

* update

* update

* update code style and unit test

* set ZE_AFFINITY_MASK=rankId

* update

* update

* update

---------

Signed-off-by: minmingzhu <[email protected]>
  • Loading branch information
minmingzhu authored Sep 10, 2024
1 parent e307341 commit e6554f1
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 9 deletions.
12 changes: 3 additions & 9 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ object OneCCL extends Logging {

def init(executor_num: Int, rank: Int, ip_port: String): Unit = {

setExecutorEnv()

logInfo(s"Initializing with IP_PORT: ${ip_port}")

// cclParam is output from native code
Expand All @@ -41,13 +39,9 @@ object OneCCL extends Logging {
s"commSize, ${cclParam.getCommSize}, rankId: ${cclParam.getRankId}")
}

// Run on Executor
def setExecutorEnv(): Unit = {
setEnv("CCL_ATL_TRANSPORT", "ofi")
// Set CCL_ROOT to workaround CCL_ROOT env read bug, should remove when upstream fix this
setEnv("CCL_ROOT", "/opt/intel/oneapi/ccl/latest")
// Uncomment this if you want to debug oneCCL
// setEnv("CCL_LOG_LEVEL", "debug")
// Pins the calling executor process to a single GPU: sets the Level Zero
// ZE_AFFINITY_MASK environment variable so the oneAPI runtime exposes only
// the device whose index is `rankId` (the Spark-assigned GPU address,
// passed as a string).
// NOTE(review): relies on setEnv (defined elsewhere in this object) to
// mutate this JVM's environment — must run on the executor, before any
// Level Zero initialization, to take effect.
def setAffinityMask(rankId: String): Unit = {
setEnv("ZE_AFFINITY_MASK", rankId)
}

// Run on Executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ class RandomForestClassifierDALImpl(val uid: String,
rfcTimer.record("Data Convertion")
val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

// Pin each executor task to its Spark-assigned GPU before OneCCL.init so
// Level Zero exposes only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
if (useDevice == "GPU") {
  labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Data Convertion")

val kvsIPPort = getOneCCLIPPort(coalescedTables)
// Pin each executor task to its Spark-assigned GPU before OneCCL.init so
// Level Zero exposes only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
if (useDevice == "GPU") {
  coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ class PCADALImpl(val k: Int,
val kvsIPPort = getOneCCLIPPort(coalescedTables)
pcaTimer.record("Data Convertion")

// Pin each executor task to its Spark-assigned GPU before OneCCL.init so
// Level Zero exposes only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
if (useDevice == "GPU") {
  coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

coalescedTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,
}
lrTimer.record("Data Convertion")

// Pin each executor task to its Spark-assigned GPU before OneCCL.init so
// Level Zero exposes only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
if (useDevice == "GPU") {
  labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) =>
val (feature, label) = tables.next()
val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ class RandomForestRegressorDALImpl(val uid: String,

val kvsIPPort = getOneCCLIPPort(labeledPointsTables)

// Pin each executor task to its Spark-assigned GPU before OneCCL.init so
// Level Zero exposes only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
if (useDevice == "GPU") {
  labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

labeledPointsTables.mapPartitionsWithIndex { (rank, table) =>
OneCCL.init(executorNum, rank, kvsIPPort)
Iterator.empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ class CorrelationDALImpl(
}.count()
corTimer.record("OneCCL Init")

// Pin each executor task to its Spark-assigned GPU so Level Zero exposes
// only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
// NOTE(review): this runs after the OneCCL-init job above — confirm the
// mask still takes effect for the native compute path, or move it earlier.
if (useDevice == "GPU") {
  coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class SummarizerDALImpl(val executorNum: Int,
}.count()
sumTimer.record("OneCCL Init")

// Pin each executor task to its Spark-assigned GPU so Level Zero exposes
// only that device (via ZE_AFFINITY_MASK).
// Guarded on useDevice: the previous code dereferenced a null gpuIndices
// on non-GPU devices, which threw a NullPointerException on CPU runs.
// NOTE(review): this runs after the OneCCL-init job above — confirm the
// mask still takes effect for the native compute path, or move it earlier.
if (useDevice == "GPU") {
  coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
    // Spark allocates GPU resources per task; take the first address.
    val gpuIndices = TaskContext.get().resources()("gpu").addresses.map(_.toInt)
    require(gpuIndices.nonEmpty, "No GPU addresses assigned to this task")
    OneCCL.setAffinityMask(gpuIndices.head.toString)
    Iterator.empty
  }.count()
}

val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) =>
val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") {
iter.next()
Expand Down

0 comments on commit e6554f1

Please sign in to comment.