From e6554f11d2c584a7210b503e3448dd31e9c0e67e Mon Sep 17 00:00:00 2001 From: minmingzhu <45281494+minmingzhu@users.noreply.github.com> Date: Tue, 10 Sep 2024 14:16:20 +0800 Subject: [PATCH] [ML-379] Set ZE_AFFINITY_MASK for each executor (#383) * update spark to 3.3.3 Signed-off-by: minmingzhu * remove oneccl communicator * update * update * update code style and unit test * set ZE_AFFINITY_MASK=rankId * update * update * update --------- Signed-off-by: minmingzhu --- .../src/main/scala/com/intel/oap/mllib/OneCCL.scala | 12 +++--------- .../RandomForestClassifierDALImpl.scala | 11 +++++++++++ .../intel/oap/mllib/clustering/KMeansDALImpl.scala | 10 ++++++++++ .../com/intel/oap/mllib/feature/PCADALImpl.scala | 11 +++++++++++ .../mllib/regression/LinearRegressionDALImpl.scala | 11 +++++++++++ .../regression/RandomForestRegressorDALImpl.scala | 11 +++++++++++ .../intel/oap/mllib/stat/CorrelationDALImpl.scala | 11 +++++++++++ .../com/intel/oap/mllib/stat/SummarizerDALImpl.scala | 11 +++++++++++ 8 files changed, 79 insertions(+), 9 deletions(-) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala index 48caebe1b..87289d559 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneCCL.scala @@ -26,8 +26,6 @@ object OneCCL extends Logging { def init(executor_num: Int, rank: Int, ip_port: String): Unit = { - setExecutorEnv() - logInfo(s"Initializing with IP_PORT: ${ip_port}") // cclParam is output from native code @@ -41,13 +39,9 @@ object OneCCL extends Logging { s"commSize, ${cclParam.getCommSize}, rankId: ${cclParam.getRankId}") } - // Run on Executor - def setExecutorEnv(): Unit = { - setEnv("CCL_ATL_TRANSPORT", "ofi") - // Set CCL_ROOT to workaround CCL_ROOT env read bug, should remove when upstream fix this - setEnv("CCL_ROOT", "/opt/intel/oneapi/ccl/latest") - // Uncomment this if you whant to debug oneCCL - // setEnv("CCL_LOG_LEVEL", "debug") + // Sets the specified value to allow each executor to run on the specified GPU + def setAffinityMask(rankId: String): Unit = { + setEnv("ZE_AFFINITY_MASK", rankId) } // Run on Executor diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala index 6aca49f14..da0612b0e 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/classification/RandomForestClassifierDALImpl.scala @@ -75,6 +75,17 @@ class RandomForestClassifierDALImpl(val uid: String, rfcTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(labeledPointsTables) + labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + labeledPointsTables.mapPartitionsWithIndex { (rank, table) => OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala index 64b2f6c7f..4468bcb00 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/clustering/KMeansDALImpl.scala @@ -51,6 +51,16 @@ class KMeansDALImpl(var nClusters: Int, kmeansTimer.record("Data Convertion") val kvsIPPort = getOneCCLIPPort(coalescedTables) + coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() coalescedTables.mapPartitionsWithIndex { (rank, table) => OneCCL.init(executorNum, rank, kvsIPPort) diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala index 8eb9554a1..381802f8c 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/feature/PCADALImpl.scala @@ -59,6 +59,17 @@ class PCADALImpl(val k: Int, val kvsIPPort = getOneCCLIPPort(coalescedTables) pcaTimer.record("Data Convertion") + coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + coalescedTables.mapPartitionsWithIndex { (rank, table) => OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala index f95bc0846..c745fba24 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/LinearRegressionDALImpl.scala @@ -106,6 +106,17 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean, } lrTimer.record("Data Convertion") + labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + val results = labeledPointsTables.mapPartitionsWithIndex { (rank, tables) => val (feature, label) = tables.next() val (featureTabAddr : Long, featureRows : Long, featureColumns : Long) = diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala index 018473a61..e24c664da 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/regression/RandomForestRegressorDALImpl.scala @@ -69,6 +69,17 @@ class RandomForestRegressorDALImpl(val uid: String, val kvsIPPort = getOneCCLIPPort(labeledPointsTables) + labeledPointsTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + labeledPointsTables.mapPartitionsWithIndex { (rank, table) => OneCCL.init(executorNum, rank, kvsIPPort) Iterator.empty diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala index e521aefe7..fafbe2cd6 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/CorrelationDALImpl.scala @@ -52,6 +52,17 @@ class CorrelationDALImpl( }.count() corTimer.record("OneCCL Init") + coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next() diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala index 277039ab1..cd6a0020a 100644 --- a/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala +++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/stat/SummarizerDALImpl.scala @@ -53,6 +53,17 @@ class SummarizerDALImpl(val executorNum: Int, }.count() sumTimer.record("OneCCL Init") + coalescedTables.mapPartitionsWithIndex { (rank, iter) => + val gpuIndices = if (useDevice == "GPU") { + val resources = TaskContext.get().resources() + resources("gpu").addresses.map(_.toInt) + } else { + null + } + OneCCL.setAffinityMask(gpuIndices(0).toString()) + Iterator.empty + }.count() + val results = coalescedTables.mapPartitionsWithIndex { (rank, iter) => val (tableArr : Long, rows : Long, columns : Long) = if (useDevice == "GPU") { iter.next()