From 69673885959942a9e26361463ccbfeba923dae25 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 12 Jul 2024 14:34:52 +0200 Subject: [PATCH 01/13] Limit memory used by ParallelIterable ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for processing input iterables. This defaults to 2 * # CPU cores. When one or some of the input iterables are considerable in size and the ParallelIterable consumer is not quick enough, this could result in unbounded allocation inside `ParallelIterator.queue`. This commit bounds the queue. When queue is full, the tasks yield and get removed from the executor. They are resumed when consumer catches up. --- .../apache/iceberg/util/ParallelIterable.java | 142 +++++++++++++----- .../iceberg/util/TestParallelIterable.java | 48 ++++++ 2 files changed, 156 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d7221e7d4545..84c6d9812fb2 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -20,13 +20,17 @@ import java.io.Closeable; import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import org.apache.iceberg.exceptions.RuntimeIOException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; @@ -34,51 +38,50 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables; public class ParallelIterable extends CloseableGroup implements CloseableIterable { + + private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000; + private final Iterable> iterables; private final ExecutorService workerPool; + // Bound for number of items in the queue to limit memory consumption + // even in the case when input iterables are large. + private final int approximateMaxQueueSize; + public ParallelIterable(Iterable> iterables, ExecutorService workerPool) { - this.iterables = iterables; - this.workerPool = workerPool; + this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE); + } + + public ParallelIterable( + Iterable> iterables, + ExecutorService workerPool, + int approximateMaxQueueSize) { + this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null"); + this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null"); + this.approximateMaxQueueSize = approximateMaxQueueSize; } @Override public CloseableIterator iterator() { - ParallelIterator iter = new ParallelIterator<>(iterables, workerPool); + ParallelIterator iter = + new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize); addCloseable(iter); return iter; } private static class ParallelIterator implements CloseableIterator { - private final Iterator tasks; + private final Iterator> tasks; + private final Deque> yieldedTasks = new ArrayDeque<>(); private final ExecutorService workerPool; - private final Future[] taskFutures; + private final Future>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); - private volatile boolean closed = false; + private final AtomicBoolean closed = new AtomicBoolean(false); private ParallelIterator( - Iterable> iterables, ExecutorService workerPool) { + Iterable> iterables, ExecutorService workerPool, int maxQueueSize) { this.tasks = Iterables.transform( - iterables, - iterable -> - (Runnable) - () -> { - try (Closeable ignored = - (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { - for (T item : iterable) { - // exit manually because `ConcurrentLinkedQueue` can't be - // interrupted - if (closed) { - return; - } - - queue.add(item); - } - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to close iterable"); - } - }) + iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize)) .iterator(); this.workerPool = workerPool; // submit 2 tasks per worker at a time @@ -88,7 +91,18 @@ private ParallelIterator( @Override public void close() { // close first, avoid new task submit - this.closed = true; + this.closed.set(true); + + for (Task task : yieldedTasks) { + try { + task.close(); + } catch (Exception e) { + throw new RuntimeException("Close failed", e); + } + } + yieldedTasks.clear(); + + // TODO close input iterables that were not started yet // cancel background tasks for (Future taskFuture : taskFutures) { @@ -113,9 +127,10 @@ private boolean checkTasks() { for (int i = 0; i < taskFutures.length; i += 1) { if (taskFutures[i] == null || taskFutures[i].isDone()) { if (taskFutures[i] != null) { + Optional> continuation; // check for task failure and re-throw any exception try { - taskFutures[i].get(); + continuation = taskFutures[i].get(); } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -126,6 +141,7 @@ private boolean checkTasks() { } catch (InterruptedException e) { throw new RuntimeException("Interrupted while running parallel task", e); } + continuation.ifPresent(yieldedTasks::addLast); } taskFutures[i] = submitNextTask(); @@ -136,19 +152,20 @@ private boolean checkTasks() { } } - return !closed && (tasks.hasNext() || hasRunningTask); + return !closed.get() && (tasks.hasNext() || hasRunningTask); } - private Future submitNextTask() { - if (!closed && tasks.hasNext()) { - return workerPool.submit(tasks.next()); + private Future>> submitNextTask() { + if (!closed.get() && (!yieldedTasks.isEmpty() || tasks.hasNext())) { + return workerPool.submit( + !yieldedTasks.isEmpty() ? yieldedTasks.removeFirst() : tasks.next()); } return null; } @Override public synchronized boolean hasNext() { - Preconditions.checkState(!closed, "Already closed"); + Preconditions.checkState(!closed.get(), "Already closed"); // if the consumer is processing records more slowly than the producers, then this check will // prevent tasks from being submitted. while the producers are running, this will always @@ -192,4 +209,61 @@ public synchronized T next() { return queue.poll(); } } + + private static class Task implements Callable>>, AutoCloseable { + private final Iterable input; + private final ConcurrentLinkedQueue queue; + private final AtomicBoolean closed; + private final int approximateMaxQueueSize; + + private Iterator iterator; + + Task( + Iterable input, + ConcurrentLinkedQueue queue, + AtomicBoolean closed, + int approximateMaxQueueSize) { + this.input = Preconditions.checkNotNull(input, "input cannot be null"); + this.queue = Preconditions.checkNotNull(queue, "queue cannot be null"); + this.closed = Preconditions.checkNotNull(closed, "closed cannot be null"); + this.approximateMaxQueueSize = approximateMaxQueueSize; + } + + @Override + public Optional> call() throws Exception { + try { + if (iterator == null) { + iterator = input.iterator(); + } + while (!closed.get() && iterator.hasNext()) { + if (queue.size() >= approximateMaxQueueSize) { + // yield + return Optional.of(this); + } + queue.add(iterator.next()); + } + } catch (Throwable e) { + try { + close(); + } catch (IOException closeException) { + // self-suppression is not permitted + // (e and closeException to be the same is unlikely, but possible) + if (closeException != e) { + e.addSuppressed(closeException); + } + } + throw e; + } + close(); + return Optional.empty(); + } + + @Override + public void close() throws Exception { + iterator = null; + if (input instanceof Closeable) { + ((Closeable) input).close(); + } + } + } } diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index af9c6ec5212c..4910732f6e35 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -24,15 +24,22 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Multiset; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; @@ -133,6 +140,47 @@ public CloseableIterator iterator() { .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty()); } + @Test + public void limitQueueSize() throws IOException, IllegalAccessException, NoSuchFieldException { + + List> iterables = + ImmutableList.of( + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator()); + + Multiset expectedValues = + IntStream.range(0, 100) + .boxed() + .flatMap(i -> Stream.of(i, i, i)) + .collect(ImmutableMultiset.toImmutableMultiset()); + + int maxQueueSize = 20; + ExecutorService executor = Executors.newCachedThreadPool(); + ParallelIterable parallelIterable = + new ParallelIterable<>(iterables, executor, maxQueueSize); + CloseableIterator iterator = parallelIterable.iterator(); + Field queueField = iterator.getClass().getDeclaredField("queue"); + queueField.setAccessible(true); + ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue) queueField.get(iterator); + + Multiset actualValues = HashMultiset.create(); + + while (iterator.hasNext()) { + assertThat(queue) + .as("iterator internal queue") + .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size()); + actualValues.add(iterator.next()); + } + + assertThat(actualValues) + .as("multiset of values returned by the iterator") + .isEqualTo(expectedValues); + + iterator.close(); + executor.shutdownNow(); + } + private void queueHasElements(CloseableIterator iterator, Queue queue) { assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull(); From ec4c52f4e92261705cf5c3fd6cc8bd8275d5f9ca Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 12 Jul 2024 15:23:46 +0200 Subject: [PATCH 02/13] Make code less pretty but satisfy closeMoreDataParallelIteratorWithoutCompleteIteration expectations --- .../java/org/apache/iceberg/util/ParallelIterable.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 84c6d9812fb2..e559632d3115 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -235,12 +235,16 @@ public Optional> call() throws Exception { if (iterator == null) { iterator = input.iterator(); } - while (!closed.get() && iterator.hasNext()) { + while (iterator.hasNext()) { if (queue.size() >= approximateMaxQueueSize) { // yield return Optional.of(this); } - queue.add(iterator.next()); + T next = iterator.next(); + if (closed.get()) { + break; + } + queue.add(next); } } catch (Throwable e) { try { From c561cc4711617064e375631ff9b52ffbb870d460 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 12 Jul 2024 18:24:39 +0200 Subject: [PATCH 03/13] reliably attempt close on all yielded tasks --- .../apache/iceberg/util/ParallelIterable.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index e559632d3115..8b6d2020946b 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -36,6 +36,7 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.io.Closer; public class ParallelIterable extends CloseableGroup implements CloseableIterable { @@ -93,25 +94,24 @@ public void close() { // close first, avoid new task submit this.closed.set(true); - for (Task task : yieldedTasks) { - try { - task.close(); - } catch (Exception e) { - throw new RuntimeException("Close failed", e); - } - } - yieldedTasks.clear(); + try (Closer closer = Closer.create()) { + yieldedTasks.forEach(closer::register); + yieldedTasks.clear(); - // TODO close input iterables that were not started yet + // TODO close input iterables that were not started yet - // cancel background tasks - for (Future taskFuture : taskFutures) { - if (taskFuture != null && !taskFuture.isDone()) { - taskFuture.cancel(true); + // cancel background tasks + for (Future taskFuture : taskFutures) { + if (taskFuture != null && !taskFuture.isDone()) { + taskFuture.cancel(true); + } } + + // clean queue + this.queue.clear(); + } catch (IOException e) { + throw new RuntimeException("Close failed", e); } - // clean queue - this.queue.clear(); } /** @@ -210,7 +210,7 @@ public synchronized T next() { } } - private static class Task implements Callable>>, AutoCloseable { + private static class Task implements Callable>>, Closeable { private final Iterable input; private final ConcurrentLinkedQueue queue; private final AtomicBoolean closed; @@ -263,7 +263,7 @@ public Optional> call() throws Exception { } @Override - public void close() throws Exception { + public void close() throws IOException { iterator = null; if (input instanceof Closeable) { ((Closeable) input).close(); From 84de5fc3b4c067c0aee04760afc18140385905a4 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 15 Jul 2024 13:05:32 +0200 Subject: [PATCH 04/13] styles --- .../apache/iceberg/util/ParallelIterable.java | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 8b6d2020946b..299bef2f2492 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -127,10 +127,10 @@ private boolean checkTasks() { for (int i = 0; i < taskFutures.length; i += 1) { if (taskFutures[i] == null || taskFutures[i].isDone()) { if (taskFutures[i] != null) { - Optional> continuation; // check for task failure and re-throw any exception try { - continuation = taskFutures[i].get(); + Optional> continuation = taskFutures[i].get(); + continuation.ifPresent(yieldedTasks::addLast); } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -141,7 +141,6 @@ private boolean checkTasks() { } catch (InterruptedException e) { throw new RuntimeException("Interrupted while running parallel task", e); } - continuation.ifPresent(yieldedTasks::addLast); } taskFutures[i] = submitNextTask(); @@ -156,9 +155,14 @@ private boolean checkTasks() { } private Future>> submitNextTask() { - if (!closed.get() && (!yieldedTasks.isEmpty() || tasks.hasNext())) { - return workerPool.submit( - !yieldedTasks.isEmpty() ? yieldedTasks.removeFirst() : tasks.next()); + if (!closed.get()) { + if (!yieldedTasks.isEmpty()) { + return workerPool.submit(yieldedTasks.removeFirst()); + } + + if (tasks.hasNext()) { + return workerPool.submit(tasks.next()); + } } return null; } @@ -235,15 +239,19 @@ public Optional> call() throws Exception { if (iterator == null) { iterator = input.iterator(); } + while (iterator.hasNext()) { if (queue.size() >= approximateMaxQueueSize) { - // yield + // Yield. Stop execution so that the task can be resubmitted, in which case call() will + // be called again. return Optional.of(this); } + T next = iterator.next(); if (closed.get()) { break; } + queue.add(next); } } catch (Throwable e) { @@ -259,6 +267,8 @@ public Optional> call() throws Exception { throw e; } close(); + // The task is complete. Returning empty means there is no continuation that should be + // executed. return Optional.empty(); } From fc1c73473a5052bc33c7ab885efcd332079c1cda Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 16 Jul 2024 09:02:57 +0200 Subject: [PATCH 05/13] chain IFs despite return --- .../main/java/org/apache/iceberg/util/ParallelIterable.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 299bef2f2492..a2a56408b52f 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -158,9 +158,7 @@ private Future>> submitNextTask() { if (!closed.get()) { if (!yieldedTasks.isEmpty()) { return workerPool.submit(yieldedTasks.removeFirst()); - } - - if (tasks.hasNext()) { + } else if (tasks.hasNext()) { return workerPool.submit(tasks.next()); } } From 984473d0a950e71a2c34f77cc0dd772d63485d87 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Jul 2024 11:17:30 +0200 Subject: [PATCH 06/13] fmt --- .../main/java/org/apache/iceberg/util/ParallelIterable.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index a2a56408b52f..b571af4161c8 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -218,7 +218,7 @@ private static class Task implements Callable>>, Closeable { private final AtomicBoolean closed; private final int approximateMaxQueueSize; - private Iterator iterator; + private Iterator iterator = null; Task( Iterable input, @@ -262,8 +262,10 @@ public Optional> call() throws Exception { e.addSuppressed(closeException); } } + throw e; } + close(); // The task is complete. Returning empty means there is no continuation that should be // executed. From 61028023cd962953ae5397a7abff57a64ecf09ec Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Jul 2024 11:32:37 +0200 Subject: [PATCH 07/13] drop todo --- .../src/main/java/org/apache/iceberg/util/ParallelIterable.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index b571af4161c8..190369cc22d5 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -98,8 +98,6 @@ public void close() { yieldedTasks.forEach(closer::register); yieldedTasks.clear(); - // TODO close input iterables that were not started yet - // cancel background tasks for (Future taskFuture : taskFutures) { if (taskFuture != null && !taskFuture.isDone()) { From 3a0d6ca5245496521671ee20ca5f88d9da818734 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Jul 2024 12:14:04 +0200 Subject: [PATCH 08/13] fix resource leak in close() also fix missing sync around yieldedTasks --- .../apache/iceberg/util/ParallelIterable.java | 55 +++++++++++++------ 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 190369cc22d5..ea50c4081921 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -20,26 +20,31 @@ import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Optional; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.io.Closer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ParallelIterable extends CloseableGroup implements CloseableIterable { + private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class); + private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000; private final Iterable> iterables; @@ -74,7 +79,7 @@ private static class ParallelIterator implements CloseableIterator { private final Iterator> tasks; private final Deque> yieldedTasks = new ArrayDeque<>(); private final ExecutorService workerPool; - private final Future>>[] taskFutures; + private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -86,7 +91,7 @@ private ParallelIterator( .iterator(); this.workerPool = workerPool; // submit 2 tasks per worker at a time - this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; } @Override @@ -95,13 +100,25 @@ public void close() { this.closed.set(true); try (Closer closer = Closer.create()) { - yieldedTasks.forEach(closer::register); - yieldedTasks.clear(); + synchronized (this) { + yieldedTasks.forEach(closer::register); + yieldedTasks.clear(); + } - // cancel background tasks - for (Future taskFuture : taskFutures) { - if (taskFuture != null && !taskFuture.isDone()) { + // cancel background tasks and close continuations if any + for (CompletableFuture>> taskFuture : taskFutures) { + if (taskFuture != null) { taskFuture.cancel(true); + taskFuture.thenAccept( + continuation -> { + if (continuation.isPresent()) { + try { + continuation.get().close(); + } catch (IOException e) { + LOG.error("Task close failed", e); + } + } + }); } } @@ -119,7 +136,8 @@ public void close() { * * @return true if there are pending tasks, false otherwise */ - private boolean checkTasks() { + private synchronized boolean checkTasks() { + Preconditions.checkState(!closed.get(), "Already closed"); boolean hasRunningTask = false; for (int i = 0; i < taskFutures.length; i += 1) { @@ -152,12 +170,12 @@ private boolean checkTasks() { return !closed.get() && (tasks.hasNext() || hasRunningTask); } - private Future>> submitNextTask() { + private CompletableFuture>> submitNextTask() { if (!closed.get()) { if (!yieldedTasks.isEmpty()) { - return workerPool.submit(yieldedTasks.removeFirst()); + return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool); } else if (tasks.hasNext()) { - return workerPool.submit(tasks.next()); + return CompletableFuture.supplyAsync(tasks.next(), workerPool); } } return null; @@ -210,7 +228,7 @@ public synchronized T next() { } } - private static class Task implements Callable>>, Closeable { + private static class Task implements Supplier>>, Closeable { private final Iterable input; private final ConcurrentLinkedQueue queue; private final AtomicBoolean closed; @@ -230,7 +248,7 @@ private static class Task implements Callable>>, Closeable { } @Override - public Optional> call() throws Exception { + public Optional> get() { try { if (iterator == null) { iterator = input.iterator(); @@ -264,7 +282,12 @@ public Optional> call() throws Exception { throw e; } - close(); + try { + close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + // The task is complete. Returning empty means there is no continuation that should be // executed. return Optional.empty(); From 4ab1c01002bf82639129bd218ae2722b86944549 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Jul 2024 12:26:15 +0200 Subject: [PATCH 09/13] Resume continuations half-way thru the queue. --- .../apache/iceberg/util/ParallelIterable.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index ea50c4081921..7cf92e9461f6 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -81,6 +81,7 @@ private static class ParallelIterator implements CloseableIterator { private final ExecutorService workerPool; private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + private final int maxQueueSize; private final AtomicBoolean closed = new AtomicBoolean(false); private ParallelIterator( @@ -90,6 +91,7 @@ private ParallelIterator( iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize)) .iterator(); this.workerPool = workerPool; + this.maxQueueSize = maxQueueSize; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; } @@ -185,16 +187,15 @@ private CompletableFuture>> submitNextTask() { public synchronized boolean hasNext() { Preconditions.checkState(!closed.get(), "Already closed"); - // if the consumer is processing records more slowly than the producers, then this check will - // prevent tasks from being submitted. while the producers are running, this will always - // return here before running checkTasks. when enough of the tasks are finished that the - // consumer catches up, then lots of new tasks will be submitted at once. this behavior is - // okay because it ensures that records are not stacking up waiting to be consumed and taking - // up memory. - // - // consumers that process results quickly will periodically exhaust the queue and submit new - // tasks when checkTasks runs. fast consumers should not be delayed. - if (!queue.isEmpty()) { + // If the consumer is processing records more slowly than the producers, the producers will + // eventually fill the queue and yield, returning continuations. Continuations and new tasks + // are started by checkTasks(). The check here prevents us from restarting continuations or + // starting new tasks too early (when queue is almost full) or too late (when queue is already + // emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on + // scheduling). Restarting too late would mean the consumer may need to wait for the tasks + // to produce new items. A consumer slower than producers shouldn't need to wait. + int queueLowWaterMark = maxQueueSize / 2; + if (queue.size() > queueLowWaterMark) { return true; } From fd0a261a416d110548122c2072d470f9a87f1f9b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 18 Jul 2024 14:08:57 +0200 Subject: [PATCH 10/13] empty: retrigger the CI From 62584552da37a1708b1a6f432bec3be1685aa4dd Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Sat, 20 Jul 2024 15:12:34 +0200 Subject: [PATCH 11/13] new round of changes --- .../org/apache/iceberg/util/ParallelIterable.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 7cf92e9461f6..0d1d0854af12 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -45,7 +45,7 @@ public class ParallelIterable extends CloseableGroup implements CloseableIter private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class); - private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000; + private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000; private final Iterable> iterables; private final ExecutorService workerPool; @@ -127,7 +127,7 @@ public void close() { // clean queue this.queue.clear(); } catch (IOException e) { - throw new RuntimeException("Close failed", e); + throw new UncheckedIOException("Close failed", e); } } @@ -145,7 +145,7 @@ private synchronized boolean checkTasks() { for (int i = 0; i < taskFutures.length; i += 1) { if (taskFutures[i] == null || taskFutures[i].isDone()) { if (taskFutures[i] != null) { - // check for task failure and re-throw any exception + // check for task failure and re-throw any exception. Enqueue continuation if any. try { Optional> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); @@ -257,8 +257,7 @@ public Optional> get() { while (iterator.hasNext()) { if (queue.size() >= approximateMaxQueueSize) { - // Yield. Stop execution so that the task can be resubmitted, in which case call() will - // be called again. + // Yield when queue is over the size limit. Task will be resubmitted later and continue the work. return Optional.of(this); } @@ -286,7 +285,7 @@ public Optional> get() { try { close(); } catch (IOException e) { - throw new UncheckedIOException(e); + throw new UncheckedIOException("Close failed", e); } // The task is complete. Returning empty means there is no continuation that should be From b716bfc6695d2290b05b0038f3c62712d77b48cb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Sat, 20 Jul 2024 15:17:22 +0200 Subject: [PATCH 12/13] add comment for default value --- .../src/main/java/org/apache/iceberg/util/ParallelIterable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 0d1d0854af12..e72b864c723f 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -45,6 +45,8 @@ public class ParallelIterable extends CloseableGroup implements CloseableIter private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class); + // Logic behind default value: ParallelIterable is often used for file planning. + // Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory. private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000; private final Iterable> iterables; From 1ffdf3a378f16e1ba5fb1edac835969867f0a8b4 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 22 Jul 2024 13:40:00 +0200 Subject: [PATCH 13/13] fix fmt --- .../main/java/org/apache/iceberg/util/ParallelIterable.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index e72b864c723f..6486bd7fd483 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -259,7 +259,8 @@ public Optional> get() { while (iterator.hasNext()) { if (queue.size() >= approximateMaxQueueSize) { - // Yield when queue is over the size limit. Task will be resubmitted later and continue the work. + // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work. return Optional.of(this); }