diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 692a470561c0..e5df5ee5f69d 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -86,6 +86,7 @@ static class ParallelIterator implements CloseableIterator { private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private final int maxQueueSize; private ParallelIterator( Iterable> iterables, ExecutorService workerPool, int maxQueueSize) { @@ -97,6 +98,7 @@ private ParallelIterator( this.workerPool = workerPool; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.maxQueueSize = maxQueueSize; } @Override @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() { try { Optional> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); + taskFutures[i] = null; } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() { } } - taskFutures[i] = submitNextTask(); + // submit a new task if there is space in the queue + if (queue.size() < maxQueueSize) { + taskFutures[i] = submitNextTask(); + } } if (taskFutures[i] != null) {