diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala new file mode 100644 index 000000000000..56fc6eac3e11 --- /dev/null +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingTest.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.benchmarks.RandomParquetDataGenerator
+import org.apache.gluten.tags.SkipTestTags
+
+import org.apache.spark.SparkConf
+
+/**
+ * Runs a distinct-count aggregation over randomly generated Parquet data with
+ * dynamic off-heap sizing enabled (see sparkConf below).
+ */
+@SkipTestTags
+class DynamicOffHeapSizingTest extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis())
+  private val outputPath = getClass.getResource("/").getPath + "dynamicoffheapsizing_output.parquet"
+  private val AGG_SQL =
+    """select f_1, count(DISTINCT f_1)
+      |from tbl
+      |group by 1""".stripMargin
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.executor.memory", "6GB")
+      .set("spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction", "0.8")
+      .set("spark.gluten.memory.dynamic.offHeap.sizing.enabled", "true")
+  }
+
+  /** Follows the cause chain of `e` and returns the innermost throwable. */
+  def getRootCause(e: Throwable): Throwable =
+    if (e.getCause == null) e else getRootCause(e.getCause)
+
+  test("Dynamic Off-Heap Sizing") {
+    System.gc()
+    dataGenerator.generateRandomData(spark, Some(outputPath))
+    spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
+    // Datasets are lazy: without an action the aggregation would never execute,
+    // so collect() forces the query to run.
+    spark.sql(AGG_SQL).collect()
+  }
+}
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
new file mode 100644
index 000000000000..b7f15d830bed
--- /dev/null
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/DynamicOffHeapSizingMemoryTarget.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.memory.memtarget;
+
+import org.apache.gluten.GlutenConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Wraps another {@link MemoryTarget} and refuses a borrow when the requested size plus the
+ * tracked off-heap usage plus the currently used JVM heap would exceed the configured budget.
+ */
+public class DynamicOffHeapSizingMemoryTarget implements MemoryTarget {
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicOffHeapSizingMemoryTarget.class);
+  private final MemoryTarget delegated;
+  // When dynamic off-heap sizing is enabled, the off-heap should be sized for the total usable
+  // memory, so we can use it as the max memory we will use.
+  private static final long MAX_MEMORY_IN_BYTES = GlutenConfig.getConf().offHeapMemorySize();
+  private static final AtomicLong USED_OFFHEAP_BYTES = new AtomicLong();
+
+  public DynamicOffHeapSizingMemoryTarget(MemoryTarget delegated) {
+    this.delegated = delegated;
+  }
+
+  /** Returns the bytes granted by the delegate, or 0 if the unified budget would be exceeded. */
+  @Override
+  public long borrow(long size) {
+    if (size == 0) {
+      return 0;
+    }
+
+    long totalMemory = Runtime.getRuntime().totalMemory();
+    long freeMemory = Runtime.getRuntime().freeMemory();
+    long usedOnHeapBytes = totalMemory - freeMemory;
+    long usedOffHeapBytesNow = USED_OFFHEAP_BYTES.get();
+
+    // Compare against the remaining headroom instead of summing, so a huge size cannot overflow.
+    if (size > MAX_MEMORY_IN_BYTES - usedOffHeapBytesNow - usedOnHeapBytes) {
+      LOG.warn(
+          "Failing allocation as unified memory is OOM. "
+              + "Used Off-heap: {}, Used On-Heap: {}, "
+              + "Free On-heap: {}, Total On-heap: {}, "
+              + "Max Unified Memory: {}, Allocation: {}.",
+          usedOffHeapBytesNow,
+          usedOnHeapBytes,
+          freeMemory,
+          totalMemory,
+          MAX_MEMORY_IN_BYTES,
+          size);
+      return 0;
+    }
+
+    long reserved = delegated.borrow(size);
+    USED_OFFHEAP_BYTES.addAndGet(reserved);
+    return reserved;
+  }
+
+  @Override
+  public long repay(long size) {
+    long unreserved = delegated.repay(size);
+    USED_OFFHEAP_BYTES.addAndGet(-unreserved);
+    return unreserved;
+  }
+
+  @Override
+  public long usedBytes() {
+    return delegated.usedBytes();
+  }
+
+  @Override
+  public <T> T accept(MemoryTargetVisitor<T> visitor) {
+    return visitor.visit(this);
+  }
+
+  public MemoryTarget delegated() {
+    return delegated;
+  }
+}
diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
index caff2605d923..e58dbb295b08 100644
--- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
+++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargetVisitor.java
@@ -33,4 +33,6 @@ public interface MemoryTargetVisitor<T> {
   T visit(LoggingMemoryTarget
loggingMemoryTarget); T visit(NoopMemoryTarget noopMemoryTarget); + + T visit(DynamicOffHeapSizingMemoryTarget dynamicOffHeapSizingMemoryTarget); } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java index ccb4beee8475..2d6fc0748464 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/MemoryTargets.java @@ -43,6 +43,14 @@ public static MemoryTarget overAcquire( return new OverAcquire(target, overTarget, overAcquiredRatio); } + public static MemoryTarget dynamicOffHeapSizingIfEnabled(MemoryTarget memoryTarget) { + if (GlutenConfig.getConf().dynamicOffHeapSizingEnabled()) { + return new DynamicOffHeapSizingMemoryTarget(memoryTarget); + } + + return memoryTarget; + } + public static MemoryTarget newConsumer( TaskMemoryManager tmm, String name, @@ -54,6 +62,7 @@ public static MemoryTarget newConsumer( } else { factory = TreeMemoryConsumers.shared(); } - return factory.newConsumer(tmm, name, spillers, virtualChildren); + + return dynamicOffHeapSizingIfEnabled(factory.newConsumer(tmm, name, spillers, virtualChildren)); } } diff --git a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java index 6621f3b1683f..e6b6ba07eb6b 100644 --- a/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java +++ b/gluten-core/src/main/java/org/apache/gluten/memory/memtarget/ThrowOnOomMemoryTarget.java @@ -52,8 +52,10 @@ public long borrow(long size) { .append( String.format( "Not enough spark off-heap execution memory. Acquired: %s, granted: %s. " - + "Try tweaking config option spark.memory.offHeap.size to get larger space " - + "to run this application. 
%n", + + "Try tweaking config option spark.memory.offHeap.size to get larger " + + "space to run this application " + + "(if spark.gluten.memory.dynamic.offHeap.sizing.enabled " + + "is not enabled). %n", Utils.bytesToString(size), Utils.bytesToString(granted))) .append("Current config settings: ") .append(System.lineSeparator()) @@ -83,6 +85,19 @@ public long borrow(long size) { .getConfString( GlutenConfig$.MODULE$ .GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY())))) + .append(System.lineSeparator()) + .append( + String.format( + "\t%s=%s", + GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED(), + SQLConf.get().getConfString(GlutenConfig$.MODULE$.GLUTEN_OFFHEAP_ENABLED()))) + .append(System.lineSeparator()) + .append( + String.format( + "\t%s=%s", + GlutenConfig$.MODULE$.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED(), + SQLConf.get() + .getConfString(GlutenConfig$.MODULE$.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED()))) .append(System.lineSeparator()); // Dump all consumer usages to exception body errorBuilder.append(SparkMemoryUtil.dumpMemoryTargetStats(target)); diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala index adb3f418907f..6c3d62c1e207 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala @@ -148,9 +148,10 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // check memory off-heap enabled and size val minOffHeapSize = "1MB" if ( - !conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) || - conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( - minOffHeapSize) + !conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false) && + (!conf.getBoolean(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, false) || + conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, 0) < JavaUtils.byteStringAsBytes( + 
minOffHeapSize)) ) { throw new GlutenException( s"Must set '${GlutenConfig.GLUTEN_OFFHEAP_ENABLED}' to true " + @@ -164,20 +165,71 @@ private[gluten] class GlutenDriverPlugin extends DriverPlugin with Logging { // task slots val taskSlots = SparkResourceUtil.getTaskSlots(conf) - // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution memory - // pool, regardless of Spark option spark.memory.storageFraction. - val offHeapSize = conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY) + var onHeapSize: Long = + if (conf.contains(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY)) { + conf.getSizeAsBytes(GlutenConfig.GLUTEN_ONHEAP_SIZE_KEY) + } else { + // 1GB default + 1024 * 1024 * 1024 + } + + // If dynamic off-heap sizing is enabled, the off-heap size is calculated based on the on-heap + // size. Otherwise, the off-heap size is set to the value specified by the user (if any). + // Note that this means that we will IGNORE the off-heap size specified by the user if the + // dynamic off-heap feature is enabled. + var offHeapSize: Long = + if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { + // Since when dynamic off-heap sizing is enabled, we commingle on-heap + // and off-heap memory, we set the off-heap size to the usable on-heap size. We will + // size it with a memory fraction, which can be aggressively set, but the default + // is using the same way that Spark sizes on-heap memory: + // + // spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction * + // (spark.executor.memory - 300MB). + // + // We will be careful to use the same configuration settings as Spark to ensure + // that we are sizing the off-heap memory in the same way as Spark sizes on-heap memory. + // The 300MB value, unfortunately, is hard-coded in Spark code. 
+ ((onHeapSize - (300 * 1024 * 1024)) * + conf.getDouble(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION, 0.6d)).toLong + } else if (conf.contains(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY)) { + // Optimistic off-heap sizes, assuming all storage memory can be borrowed into execution + // memory pool, regardless of Spark option spark.memory.storageFraction. + conf.getSizeAsBytes(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY) + } else { + // Default Spark Value. + 0L + } + conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapSize.toString) + conf.set(GlutenConfig.GLUTEN_OFFHEAP_SIZE_KEY, offHeapSize.toString) + val offHeapPerTask = offHeapSize / taskSlots conf.set(GlutenConfig.GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, offHeapPerTask.toString) - // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory - // determined by spark.memory.storageFraction was used. - val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d) - val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots - conf.set( - GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, - conservativeOffHeapPerTask.toString) + // If we are using dynamic off-heap sizing, we should also enable off-heap memory + // officially. + if (conf.getBoolean(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, false)) { + conf.set(GlutenConfig.GLUTEN_OFFHEAP_ENABLED, "true") + + // We already sized the off-heap per task in a conservative manner, so we can just + // use it. + conf.set( + GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, + offHeapPerTask.toString) + } else { + // Let's make sure this is set to false explicitly if it is not on as it + // is looked up when throwing OOF exceptions. + conf.set(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED, "false") + + // Pessimistic off-heap sizes, with the assumption that all non-borrowable storage memory + // determined by spark.memory.storageFraction was used. 
+ val fraction = 1.0d - conf.getDouble("spark.memory.storageFraction", 0.5d) + val conservativeOffHeapPerTask = (offHeapSize * fraction).toLong / taskSlots + conf.set( + GlutenConfig.GLUTEN_CONSERVATIVE_TASK_OFFHEAP_SIZE_IN_BYTES_KEY, + conservativeOffHeapPerTask.toString) + } // disable vanilla columnar readers, to prevent columnar-to-columnar conversions if (BackendsApiManager.getSettings.disableVanillaColumnarReaders(conf)) { diff --git a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala index 8bf88ef7d15c..48ed08fb71ce 100644 --- a/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala +++ b/gluten-core/src/main/scala/org/apache/spark/memory/SparkMemoryUtil.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.memory -import org.apache.gluten.memory.memtarget.{KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets} +import org.apache.gluten.memory.memtarget.{DynamicOffHeapSizingMemoryTarget, KnownNameAndStats, LoggingMemoryTarget, MemoryTarget, MemoryTargetVisitor, NoopMemoryTarget, OverAcquire, ThrowOnOomMemoryTarget, TreeMemoryTargets} import org.apache.gluten.memory.memtarget.spark.{RegularMemoryConsumer, TreeMemoryConsumer} import org.apache.gluten.proto.MemoryUsageStats @@ -117,6 +117,11 @@ object SparkMemoryUtil { override def visit(noopMemoryTarget: NoopMemoryTarget): KnownNameAndStats = { noopMemoryTarget } + + override def visit(dynamicOffHeapSizingMemoryTarget: DynamicOffHeapSizingMemoryTarget) + : KnownNameAndStats = { + dynamicOffHeapSizingMemoryTarget.delegated().accept(this) + } }) } diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 7a501e02deb0..ca8a9dce12c5 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -395,6 +395,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   def awsSdkLogLevel: String = conf.getConf(AWS_SDK_LOG_LEVEL)
 
   def enableCastAvgAggregateFunction: Boolean = conf.getConf(COLUMNAR_NATIVE_CAST_AGGREGATE_ENABLED)
+
+  def dynamicOffHeapSizingEnabled: Boolean =
+    conf.getConf(DYNAMIC_OFFHEAP_SIZING_ENABLED)
 }
 
 object GlutenConfig {
@@ -466,6 +469,7 @@ object GlutenConfig {
   val GLUTEN_CONFIG_PREFIX = "spark.gluten.sql.columnar.backend."
 
   // Private Spark configs.
+  val GLUTEN_ONHEAP_SIZE_KEY = "spark.executor.memory"
   val GLUTEN_OFFHEAP_SIZE_KEY = "spark.memory.offHeap.size"
   val GLUTEN_OFFHEAP_ENABLED = "spark.memory.offHeap.enabled"
 
@@ -543,6 +547,10 @@ object GlutenConfig {
 
   val GLUTEN_UI_ENABLED = "spark.gluten.ui.enabled"
 
+  val GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED = "spark.gluten.memory.dynamic.offHeap.sizing.enabled"
+  val GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
+    "spark.gluten.memory.dynamic.offHeap.sizing.memory.fraction"
+
   var ins: GlutenConfig = _
 
   def getConf: GlutenConfig = {
@@ -1835,4 +1843,32 @@ object GlutenConfig {
       .internal()
       .booleanConf
       .createWithDefault(true)
+
+  val DYNAMIC_OFFHEAP_SIZING_ENABLED =
+    buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_ENABLED)
+      .internal()
+      .doc(
+        "Experimental: When set to true, the offheap config (spark.memory.offHeap.size) will " +
+        "be ignored and instead we will consider onheap and offheap memory in combination, " +
+        "both counting towards the executor memory config (spark.executor.memory). We will " +
+        "make use of JVM APIs to determine how much onheap memory is in use, alongside " +
+        "tracking offheap allocations made by Gluten. We will then proceed to enforcing a total " +
+        "memory quota, calculated by the sum of what memory is committed and in use in the Java " +
+        "heap. Since the calculation of the total quota happens as offheap allocation happens " +
+        "and not as JVM heap memory is allocated, it is possible that we can oversubscribe " +
+        "memory. Additionally, note that this change is experimental and may have performance " +
+        "implications.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION =
+    buildConf(GlutenConfig.GLUTEN_DYNAMIC_OFFHEAP_SIZING_MEMORY_FRACTION)
+      .internal()
+      .doc(
+        "Experimental: Determines the memory fraction used to determine the total " +
+        "memory available for offheap and onheap allocations when the dynamic offheap " +
+        "sizing feature is enabled. The default is set to match spark.memory.fraction.")
+      .doubleConf
+      .checkValue(v => v >= 0 && v <= 1, "offheap sizing memory fraction must be between [0, 1]")
+      .createWithDefault(0.6)
 }