Cap the max data UnsafeExternalSorter can store #55

Open · wants to merge 1 commit into base: v3.3-lyft
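This change adds an optional size-based spill cap to UnsafeExternalSorter: once the in-memory sorter's footprint reaches the value of spark.lyft.spill.mb (default "0m", meaning the check is disabled), the sorter spills to disk instead of growing further. Below is a minimal sketch of how the setting might be supplied, assuming a plain SparkConf; the 512m value, class name, and app name are illustrative only, not part of this patch.

import org.apache.spark.SparkConf;

public class SpillCapConfigExample {
  public static void main(String[] args) {
    // Hypothetical usage: cap the sorter's in-memory data at 512 MB before spilling.
    // The patch's default of "0m" leaves the new check disabled.
    SparkConf conf = new SparkConf()
        .setAppName("spill-cap-example")
        .set("spark.lyft.spill.mb", "512m");

    // The patch reads the value with getSizeAsMb and converts megabytes to bytes.
    long capBytes = conf.getSizeAsMb("spark.lyft.spill.mb", "0m") * 1024 * 1024;
    System.out.println("Spill cap in bytes: " + capBytes);
  }
}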
@@ -30,6 +30,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
@@ -43,6 +44,7 @@
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;


/**
* External sorter based on {@link UnsafeInMemorySorter}.
*/
@@ -74,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
* Force this sorter to spill when there are this many elements in memory.
*/
private final int numElementsForSpillThreshold;
private final long maxMemoryBeforeSpill;

/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
@@ -174,6 +177,7 @@ private UnsafeExternalSorter(
}
this.peakMemoryUsedBytes = getMemoryUsage();
this.numElementsForSpillThreshold = numElementsForSpillThreshold;
this.maxMemoryBeforeSpill = SparkEnv.get().conf().getSizeAsMb("spark.lyft.spill.mb", "0m") * 1024 * 1024;

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
@@ -393,6 +397,8 @@ private void growPointerArrayIfNecessary() throws IOException {
// The pointer array is too big to fix in a single page, spill.
spill();
} catch (SparkOutOfMemoryError e) {
logger.error("SparkOutOfMemoryException: memory used {}, records {}, threshold {}",
used, inMemSorter.numRecords(), numElementsForSpillThreshold);
if (inMemSorter.numRecords() > 0) {
logger.error("Unable to grow the pointer array");
throw e;
@@ -479,6 +485,13 @@ public void insertRecord(
spill();
}

if (maxMemoryBeforeSpill > 0 && inMemSorter.getMemoryUsage() >= maxMemoryBeforeSpill)
{
logger.info("Spilling data because the memory usage crossed the threshold " +
"{} used {} ", maxMemoryBeforeSpill, inMemSorter.getMemoryUsage());
spill();
}

final int uaoSize = UnsafeAlignedOffset.getUaoSize();
// Need 4 or 8 bytes to store the record length.
final int required = length + uaoSize;
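For context, a condensed sketch of the two spill triggers that now sit at the top of insertRecord: the existing element-count threshold and the new byte-size cap added by this patch. The count-based check is paraphrased from upstream Spark and may differ slightly in wording; the size-based check mirrors the diff above. The surrounding method and class are omitted.

// Trigger 1 (pre-existing): too many records buffered in memory.
if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
  logger.info("Spilling data because number of spilledRecords crossed the threshold {}",
      numElementsForSpillThreshold);
  spill();
}
// Trigger 2 (this patch): the in-memory sorter has grown past spark.lyft.spill.mb.
if (maxMemoryBeforeSpill > 0 && inMemSorter.getMemoryUsage() >= maxMemoryBeforeSpill) {
  logger.info("Spilling data because the memory usage crossed the threshold {} used {}",
      maxMemoryBeforeSpill, inMemSorter.getMemoryUsage());
  spill();
}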