From 8abc075ce10c19f6d66faf8c2ae5eab87d7c52fd Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 12 Jul 2024 14:34:52 +0200 Subject: [PATCH] Limit memory used by ParallelIterable ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for processing input iterables. This defaults to 2 * # CPU cores. When one or some of the input iterables are considerable in size and the ParallelIterable consumer is not quick enough, this could result in unbounded allocation inside `ParallelIterator.queue`. This commit bounds the queue. When queue is full, the tasks yield and get removed from the executor. They are resumed when consumer catches up. --- .../apache/iceberg/util/ParallelIterable.java | 123 +++++++++++++----- 1 file changed, 93 insertions(+), 30 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d7221e7d4545..6a7932524fac 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -20,13 +20,18 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.iceberg.exceptions.RuntimeIOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -50,35 +55,22 @@ public CloseableIterator iterator() { } private static class ParallelIterator implements CloseableIterator { - private final Iterator tasks; + // Bound for number of items in the queue to limit memory consumption + // even in the case when input iterables are + private static final int MAX_QUEUE_SIZE = 10_000; + + private final Iterator> tasks; + private final Deque> yieldedTasks = new ArrayDeque<>(); private final ExecutorService workerPool; - private final Future[] taskFutures; + private final Future>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private ParallelIterator( Iterable> iterables, ExecutorService workerPool) { this.tasks = Iterables.transform( - iterables, - iterable -> - (Runnable) - () -> { - try (Closeable ignored = - (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { - for (T item : iterable) { - // exit manually because `ConcurrentLinkedQueue` can't be - // interrupted - if (closed) { - return; - } - - queue.add(item); - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close iterable"); - } - }) + iterables, iterable -> new Task<>(iterable, queue, closed, MAX_QUEUE_SIZE)) .iterator(); this.workerPool = workerPool; // submit 2 tasks per worker at a time @@ -88,7 +80,18 @@ private ParallelIterator( @Override public void close() { // close first, avoid new task submit - this.closed = true; + this.closed.set(true); + + for (Task task : yieldedTasks) { + try { + task.close(); + } catch (Exception e) { + throw new RuntimeException("Close failed", e); + } + } + yieldedTasks.clear(); + + // TODO close input iterables that were not started yet // cancel background tasks for (Future taskFuture : taskFutures) { @@ -113,9 +116,10 @@ private boolean checkTasks() { for (int i = 0; i < taskFutures.length; i += 1) { if (taskFutures[i] == null || taskFutures[i].isDone()) { if (taskFutures[i] != null) { + Optional> continuation; // check for task failure and re-throw any exception try { - taskFutures[i].get(); + continuation = taskFutures[i].get(); } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -126,6 +130,7 @@ private boolean checkTasks() { } catch (InterruptedException e) { throw new RuntimeException("Interrupted while running parallel task", e); } + continuation.ifPresent(yieldedTasks::addLast); } taskFutures[i] = submitNextTask(); @@ -136,19 +141,20 @@ private boolean checkTasks() { } } - return !closed && (tasks.hasNext() || hasRunningTask); + return !closed.get() && (tasks.hasNext() || hasRunningTask); } - private Future submitNextTask() { - if (!closed && tasks.hasNext()) { - return workerPool.submit(tasks.next()); + private Future>> submitNextTask() { + if (!closed.get() && (!yieldedTasks.isEmpty() || tasks.hasNext())) { + return workerPool.submit( + !yieldedTasks.isEmpty() ? yieldedTasks.removeFirst() : tasks.next()); } return null; } @Override public synchronized boolean hasNext() { - Preconditions.checkState(!closed, "Already closed"); + Preconditions.checkState(!closed.get(), "Already closed"); // if the consumer is processing records more slowly than the producers, then this check will // prevent tasks from being submitted. while the producers are running, this will always @@ -192,4 +198,61 @@ public synchronized T next() { return queue.poll(); } } + + private static class Task implements Callable>>, AutoCloseable { + private final Iterable input; + private final ConcurrentLinkedQueue output; + private final AtomicBoolean closed; + private final int maxQueueSize; + + private Iterator iterator; + + public Task( + Iterable input, + ConcurrentLinkedQueue output, + AtomicBoolean closed, + int maxQueueSize) { + this.input = Objects.requireNonNull(input, "input is null"); + this.output = Objects.requireNonNull(output, "output is null"); + this.closed = Objects.requireNonNull(closed, "closed is null"); + this.maxQueueSize = maxQueueSize; + } + + @Override + public Optional> call() throws Exception { + try { + if (iterator == null) { + iterator = input.iterator(); + } + while (!closed.get() && iterator.hasNext()) { + if (output.size() >= maxQueueSize) { + // yield + return Optional.of(this); + } + output.add(iterator.next()); + } + } catch (Throwable e) { + try { + close(); + } catch (IOException closeException) { + // self-suppression is not permitted + // (e and closeException to be the same is unlikely, but possible) + if (closeException != e) { + e.addSuppressed(closeException); + } + } + throw e; + } + close(); + return Optional.empty(); + } + + @Override + public void close() throws Exception { + iterator = null; + if (input instanceof Closeable) { + ((Closeable) input).close(); + } + } + } }