Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kecookier committed Dec 6, 2024
1 parent 62d3d59 commit 38dd5a0
Showing 1 changed file with 29 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@
*/
package org.apache.gluten.memory.memtarget;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.memory.MemoryUsageStatsBuilder;
import org.apache.gluten.memory.SimpleMemoryUsageRecorder;
import org.apache.gluten.proto.MemoryUsageStats;

import com.google.common.base.Preconditions;
import org.apache.spark.SparkEnv;
import org.apache.spark.memory.SparkMemoryUtil;
import org.apache.spark.util.SparkResourceUtil;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -86,6 +92,7 @@ private static long spillTree(TreeMemoryTarget node, Spiller.Phase phase, final

// non-root nodes are not Spark memory consumer
public static class Node implements TreeMemoryTarget, KnownNameAndStats {
private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);
private final Map<String, Node> children = new HashMap<>();
private final TreeMemoryTarget parent;
private final String name;
Expand Down Expand Up @@ -137,22 +144,42 @@ public Spiller getNodeSpiller() {
}

/**
 * Ensures this node has at least {@code bytesNeeded} bytes of free capacity, spilling the
 * subtree rooted at this node as many times as necessary.
 *
 * <p>After a successful reservation, when multiple task slots share the off-heap pool and
 * memory isolation is disabled, an additional best-effort spill is attempted to bring the
 * task's off-heap usage back under its per-task share.
 *
 * @param bytesNeeded number of free bytes required
 * @return true if the free capacity requirement was met, false if spilling could not
 *     reclaim enough memory (OOM)
 */
private boolean ensureFreeCapacity(long bytesNeeded) {
  boolean result;
  while (true) { // FIXME should we add retry limit?
    long freeBytes = freeBytes();
    Preconditions.checkState(freeBytes >= 0);
    if (freeBytes >= bytesNeeded) {
      // Free bytes satisfy the requirement.
      result = true;
      break;
    }
    // Not enough capacity: spill the subtree to reclaim the shortfall.
    long bytesToSpill = bytesNeeded - freeBytes;
    long spilledBytes = TreeMemoryTargets.spillTree(this, bytesToSpill);
    Preconditions.checkState(spilledBytes >= 0);
    if (spilledBytes == 0) {
      // Nothing more could be spilled: out of memory.
      result = false;
      break;
    }
    // Some bytes were spilled but possibly not enough; loop and re-check.
  }

  if (result) {
    // If multi-slot and in shared mode, retry spill more memory: a task that ran over its
    // fair per-task share proactively gives memory back so sibling tasks are not starved.
    // NOTE(review): assumes SparkEnv.get() is non-null here — TODO confirm this path is
    // never reached outside a live Spark environment.
    if (SparkResourceUtil.getTaskSlots(SparkEnv.get().conf()) > 1
        && !GlutenConfig.getConf().memoryIsolation()) {
      long overUsed =
          SparkMemoryUtil.getTaskOffheapMemoryUsage()
              - GlutenConfig.getConf().taskOffHeapMemorySize();
      if (overUsed > 0) {
        long bytesToSpill = bytesNeeded + overUsed;
        LOGGER.debug("Exceed perTaskLimit, try spill:{}", bytesToSpill);
        // Best-effort: the outcome of this extra spill does not affect the result of the
        // original reservation, so the return value is intentionally ignored.
        TreeMemoryTargets.spillTree(this, bytesToSpill);
      }
    }
  }
  return result;
}

@Override
Expand Down

0 comments on commit 38dd5a0

Please sign in to comment.