Skip to content

Commit

Permalink
[GLUTEN-6960][VL] Limit Velox untracked global memory manager's usage
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer authored and shamirchen committed Oct 14, 2024
1 parent 13952e3 commit dd5ceeb
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 18 deletions.
2 changes: 2 additions & 0 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const std::string kIgnoreMissingFiles = "spark.sql.files.ignoreMissingFiles";

const std::string kDefaultSessionTimezone = "spark.gluten.sql.session.timeZone.default";

const std::string kSparkOverheadMemory = "spark.gluten.memoryOverhead.size.in.bytes";

const std::string kSparkOffHeapMemory = "spark.gluten.memory.offHeap.size.in.bytes";

const std::string kSparkTaskOffHeapMemory = "spark.gluten.memory.task.offHeap.size.in.bytes";
Expand Down
14 changes: 12 additions & 2 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,18 @@ void VeloxBackend::init(const std::unordered_map<std::string, std::string>& conf
initUdf();
registerSparkTokenizer();

// initialize the global memory manager for current process
facebook::velox::memory::MemoryManager::initialize({});
// Initialize the global memory manager for current process.
auto sparkOverhead = backendConf_->get<int64_t>(kSparkOverheadMemory);
int64_t memoryManagerCapacity;
if (sparkOverhead.hasValue()) {
// 0.75 * total overhead memory is used for Velox global memory manager.
// FIXME: Make this configurable.
memoryManagerCapacity = sparkOverhead.value() * 0.75;
} else {
memoryManagerCapacity = facebook::velox::memory::kMaxMemory;
}
LOG(INFO) << "Setting global Velox memory manager with capacity: " << memoryManagerCapacity;
facebook::velox::memory::MemoryManager::initialize({.allocatorCapacity = memoryManagerCapacity});
}

facebook::velox::cache::AsyncDataCache* VeloxBackend::getAsyncDataCache() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ public long borrow(long size) {
.append(
String.format(
"\t%s=%s",
GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(),
SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED())))
GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED(),
SQLConf.get().getConfString(GlutenConfig$.MODULE$.SPARK_OFFHEAP_ENABLED())))
.append(System.lineSeparator())
.append(
String.format(
Expand Down
30 changes: 20 additions & 10 deletions gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
val minOffHeapSize = "1MB"
if (
!conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) &&
(!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) ||
conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
(!conf.getBoolean(GlutenConfig.SPARK_OFFHEAP_ENABLED, false) ||
conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes(
minOffHeapSize))
) {
throw new GlutenException(
s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " +
s"and set '${GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
s"Must set '${GlutenConfig.SPARK_OFFHEAP_ENABLED}' to true " +
s"and set '${GlutenConfig.SPARK_OFFHEAP_SIZE_KEY}' to be greater than $minOffHeapSize")
}

// Session's local time zone must be set. If not explicitly set by user, its default
Expand All @@ -226,13 +226,23 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
conf.set(GlutenConfig.GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY, taskSlots.toString)

val onHeapSize: Long =
if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) {
conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)
if (conf.contains(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)) {
conf.getSizeAsBytes(GlutenConfig.SPARK_ONHEAP_SIZE_KEY)
} else {
// 1GB default
1024 * 1024 * 1024
}

val overheadSize: Long = SparkResourceUtil.getMemoryOverheadSize(conf)
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, overheadSize.toString)

// FIXME: The following is a workaround. Remove once the causes are fixed.
conf.set(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY, Long.MaxValue.toString)
logWarning(
"Setting overhead memory that Gluten can use to UNLIMITED. This is currently a" +
" temporary solution to avoid OOM by Velox's global memory pools." +
" See GLUTEN-6960 for more information.")

// If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap
// size. Otherwise, the off-heap size is set to the value specified by the user (if any).
// Note that this means that we will IGNORE the off-heap size specified by the user if the
Expand All @@ -252,25 +262,25 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging {
// The 300MB value, unfortunately, is hard-coded in Spark code.
((onHeapSize - (300 * 1024 * 1024)) *
conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong
} else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) {
} else if (conf.contains(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)) {
// Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution
// memory pool, regardless of Spark option spark.memory.storageFraction.
conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)
conf.getSizeAsBytes(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY)
} else {
// Default Spark Value.
0L
}

conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString)
conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString)
conf.set(GlutenConfig.SPARK_OFFHEAP_SIZE_KEY, offHeapSize.toString)

val offHeapPerTask = offHeapSize / taskSlots
conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString)

// If we are using dynamic off-heap sizing, we should also enable off-heap memory
// officially.
if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) {
conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true")
conf.set(GlutenConfig.SPARK_OFFHEAP_ENABLED, "true")

// We already sized the off-heap per task in a conservative manner, so we can just
// use it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkMasterRegex}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.internal.SQLConf

object SparkResourceUtil extends Logging {
Expand Down Expand Up @@ -80,4 +82,15 @@ object SparkResourceUtil extends Logging {
/** Whether `conf` describes a local (non-cluster) Spark master; delegates to Spark's `Utils`. */
def isLocalMaster(conf: SparkConf): Boolean =
  Utils.isLocalMaster(conf)

/**
 * Computes the executor memory overhead in bytes.
 *
 * If `spark.executor.memoryOverhead` is set explicitly, that value (MiB) wins.
 * Otherwise the overhead is derived from the executor memory: the larger of
 * `executorMemory * overheadFactor` (factor defaults to 0.1) and the configured
 * minimum overhead (defaults to 384 MiB), mirroring Spark's own sizing rule.
 *
 * @param conf the Spark configuration to read sizing options from
 * @return the overhead size converted from MiB to bytes
 */
def getMemoryOverheadSize(conf: SparkConf): Long = {
  val explicitOverheadMib = conf.get(EXECUTOR_MEMORY_OVERHEAD)
  val overheadMib = explicitOverheadMib match {
    case Some(mib) => mib
    case None =>
      val executorMemMib = conf.get(EXECUTOR_MEMORY)
      val fraction = conf.getDouble("spark.executor.memoryOverheadFactor", 0.1d)
      val floorMib = conf.getLong("spark.executor.minMemoryOverhead", 384L)
      math.max((executorMemMib * fraction).toLong, floorMib)
  }
  ByteUnit.MiB.toBytes(overheadMib)
}
}
22 changes: 18 additions & 4 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,11 @@ object GlutenConfig {
val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."

// Private Spark configs.
val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_ONHEAP_SIZE_KEY = "spark.executor.memory"
val SPARK_OVERHEAD_SIZE_KEY = "spark.executor.memoryOverhead"
val SPARK_OVERHEAD_FACTOR_KEY = "spark.executor.memoryOverheadFactor"
val SPARK_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
val SPARK_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
val SPARK_REDACTION_REGEX = "spark.redaction.regex"

// For Soft Affinity Scheduling
Expand Down Expand Up @@ -570,6 +572,7 @@ object GlutenConfig {

// Added back to Spark Conf during executor initialization
val GLUTEN_NUM_TASK_SLOTS_PER_EXECUTOR_KEY = "spark.gluten.numTaskSlotsPerExecutor"
val GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY = "spark.gluten.memoryOverhead.size.in.bytes"
val GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.offHeap.size.in.bytes"
val GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY = "spark.gluten.memory.task.offHeap.size.in.bytes"
val GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY =
Expand Down Expand Up @@ -768,9 +771,10 @@ object GlutenConfig {
SPARK_SQL_PARQUET_COMPRESSION_CODEC,
// datasource config end

GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
GLUTEN_OFFHEAP_ENABLED,
SPARK_OFFHEAP_ENABLED,
SESSION_LOCAL_TIMEZONE.key,
DECIMAL_OPERATIONS_ALLOW_PREC_LOSS.key,
SPARK_REDACTION_REGEX
Expand Down Expand Up @@ -1250,6 +1254,16 @@ object GlutenConfig {
.intConf
.createWithDefaultString("-1")

// Internal byte-size conf keyed by GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY
// ("spark.gluten.memoryOverhead.size.in.bytes"). Defaults to "0" because, as the
// .doc() text explains, some non-execution code paths do not propagate session
// configurations, so reads must not fail when the value was never set.
val COLUMNAR_OVERHEAD_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OVERHEAD_SIZE_IN_BYTES_KEY)
.internal()
.doc(
"Must provide default value since non-execution operations " +
"(e.g. org.apache.spark.sql.Dataset#summary) doesn't propagate configurations using " +
"org.apache.spark.sql.execution.SQLExecution#withSQLConfPropagated")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("0")

val COLUMNAR_OFFHEAP_SIZE_IN_BYTES =
buildConf(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY)
.internal()
Expand Down

0 comments on commit dd5ceeb

Please sign in to comment.