Skip to content

Commit

Permalink
Limit memory used by ParallelIterable
Browse files Browse the repository at this point in the history
ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for
processing input iterables. This defaults to 2 * the number of CPU cores.
When one or more of the input iterables are considerable in size and the
ParallelIterable consumer is not quick enough, this could result in
unbounded allocation inside `ParallelIterator.queue`. This commit bounds
the queue. When the queue is full, the tasks yield and are removed from the
executor. They are resumed when the consumer catches up.
  • Loading branch information
findepi committed Jul 12, 2024
1 parent d9dbb75 commit 8abc075
Showing 1 changed file with 93 additions and 30 deletions.
123 changes: 93 additions & 30 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.iceberg.exceptions.RuntimeIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
Expand All @@ -50,35 +55,22 @@ public CloseableIterator<T> iterator() {
}

private static class ParallelIterator<T> implements CloseableIterator<T> {
private final Iterator<Runnable> tasks;
// Bound for the number of items in the queue, to limit memory consumption
// even in the case when input iterables are large
private static final int MAX_QUEUE_SIZE = 10_000;

private final Iterator<Task<T>> tasks;
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
private final ExecutorService workerPool;
private final Future<?>[] taskFutures;
private final Future<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean(false);

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
this.tasks =
Iterables.transform(
iterables,
iterable ->
(Runnable)
() -> {
try (Closeable ignored =
(iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
for (T item : iterable) {
// exit manually because `ConcurrentLinkedQueue` can't be
// interrupted
if (closed) {
return;
}

queue.add(item);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close iterable");
}
})
iterables, iterable -> new Task<>(iterable, queue, closed, MAX_QUEUE_SIZE))
.iterator();
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
Expand All @@ -88,7 +80,18 @@ private ParallelIterator(
@Override
public void close() {
// close first, avoid new task submit
this.closed = true;
this.closed.set(true);

for (Task<T> task : yieldedTasks) {
try {
task.close();
} catch (Exception e) {
throw new RuntimeException("Close failed", e);
}
}
yieldedTasks.clear();

// TODO close input iterables that were not started yet

// cancel background tasks
for (Future<?> taskFuture : taskFutures) {
Expand All @@ -113,9 +116,10 @@ private boolean checkTasks() {
for (int i = 0; i < taskFutures.length; i += 1) {
if (taskFutures[i] == null || taskFutures[i].isDone()) {
if (taskFutures[i] != null) {
Optional<Task<T>> continuation;
// check for task failure and re-throw any exception
try {
taskFutures[i].get();
continuation = taskFutures[i].get();
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
Expand All @@ -126,6 +130,7 @@ private boolean checkTasks() {
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while running parallel task", e);
}
continuation.ifPresent(yieldedTasks::addLast);
}

taskFutures[i] = submitNextTask();
Expand All @@ -136,19 +141,20 @@ private boolean checkTasks() {
}
}

return !closed && (tasks.hasNext() || hasRunningTask);
return !closed.get() && (tasks.hasNext() || hasRunningTask);
}

private Future<?> submitNextTask() {
if (!closed && tasks.hasNext()) {
return workerPool.submit(tasks.next());
private Future<Optional<Task<T>>> submitNextTask() {
if (!closed.get() && (!yieldedTasks.isEmpty() || tasks.hasNext())) {
return workerPool.submit(
!yieldedTasks.isEmpty() ? yieldedTasks.removeFirst() : tasks.next());
}
return null;
}

@Override
public synchronized boolean hasNext() {
Preconditions.checkState(!closed, "Already closed");
Preconditions.checkState(!closed.get(), "Already closed");

// if the consumer is processing records more slowly than the producers, then this check will
// prevent tasks from being submitted. while the producers are running, this will always
Expand Down Expand Up @@ -192,4 +198,61 @@ public synchronized T next() {
return queue.poll();
}
}

/**
 * A resumable unit of work that copies the elements of one input iterable into a shared output
 * queue.
 *
 * <p>{@link #call()} stops early and returns the task itself once the output queue holds at least
 * {@code maxQueueSize} elements, so that the caller can resubmit the same task later. The
 * underlying iterator is kept between invocations, so a resubmitted task continues where it
 * stopped. When the input is exhausted, the shared {@code closed} flag is set, or a failure
 * occurs, the input is closed (if it is {@link Closeable}).
 */
private static class Task<T> implements Callable<Optional<Task<T>>>, AutoCloseable {
  private final Iterable<T> input;
  private final ConcurrentLinkedQueue<T> output;
  private final AtomicBoolean closed;
  private final int maxQueueSize;

  // Created lazily on the first call() and retained across yields; reset to null by close().
  private Iterator<T> iterator;

  /**
   * @param input iterable whose elements are copied to {@code output}; closed by this task if it
   *     implements {@link Closeable}
   * @param output queue that receives the elements
   * @param closed flag checked before each element; once set, the task stops and closes its input
   * @param maxQueueSize queue size at or above which the task yields instead of adding elements
   * @throws NullPointerException if {@code input}, {@code output} or {@code closed} is null
   */
  public Task(
      Iterable<T> input,
      ConcurrentLinkedQueue<T> output,
      AtomicBoolean closed,
      int maxQueueSize) {
    this.input = Objects.requireNonNull(input, "input is null");
    this.output = Objects.requireNonNull(output, "output is null");
    this.closed = Objects.requireNonNull(closed, "closed is null");
    this.maxQueueSize = maxQueueSize;
  }

  /**
   * Copies elements from the input to the output queue until the input is exhausted, the
   * {@code closed} flag is set, or the queue reaches {@code maxQueueSize}.
   *
   * @return {@code Optional.of(this)} if the task yielded because the queue was full and still
   *     has work left; {@code Optional.empty()} if the task finished and closed its input
   * @throws Exception any failure raised while iterating the input (a failure raised by the
   *     cleanup that follows is attached to it as suppressed), or a failure raised while closing
   *     the input after normal completion
   */
  @Override
  public Optional<Task<T>> call() throws Exception {
    try {
      if (iterator == null) {
        iterator = input.iterator();
      }
      while (!closed.get() && iterator.hasNext()) {
        // ConcurrentLinkedQueue.size() traverses the queue and is only approximate under
        // concurrent modification, so this is a soft bound rather than an exact one.
        if (output.size() >= maxQueueSize) {
          // yield: hand the task back so it can be resumed once the queue has drained
          return Optional.of(this);
        }
        output.add(iterator.next());
      }
    } catch (Throwable e) {
      try {
        close();
      } catch (Exception closeException) {
        // Catch every exception from cleanup, not just IOException: otherwise an unchecked
        // exception thrown by close() would replace the original failure.
        // self-suppression is not permitted
        // (e and closeException to be the same is unlikely, but possible)
        if (closeException != e) {
          e.addSuppressed(closeException);
        }
      }
      throw e;
    }
    close();
    return Optional.empty();
  }

  /**
   * Drops the iterator and closes the input if it implements {@link Closeable}.
   *
   * @throws Exception if closing the input fails
   */
  @Override
  public void close() throws Exception {
    iterator = null;
    if (input instanceof Closeable) {
      ((Closeable) input).close();
    }
  }
}
}

0 comments on commit 8abc075

Please sign in to comment.