From f1076494c807447484b5a154b242611e98e3b582 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 22 Aug 2024 00:06:05 +0200 Subject: [PATCH] Check for minimal queue size in ParallelIterable (#10977) --- .../apache/iceberg/util/ParallelIterable.java | 1 + .../iceberg/util/TestParallelIterable.java | 35 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index 40bdf1e0c4f8..f86e5ddc5a8a 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -95,6 +95,7 @@ private ParallelIterator( iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize)) .iterator(); this.workerPool = workerPool; + Preconditions.checkArgument(maxQueueSize > 0, "Max queue size must be greater than 0"); this.maxQueueSize = maxQueueSize; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index ee16f3c1fb5a..5e37e0390db9 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -170,6 +170,41 @@ public void limitQueueSize() { executor.shutdownNow(); } + @Test + public void queueSizeOne() { + List> iterables = + ImmutableList.of( + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator(), + () -> IntStream.range(0, 100).iterator()); + + Multiset expectedValues = + IntStream.range(0, 100) + .boxed() + .flatMap(i -> Stream.of(i, i, i)) + .collect(ImmutableMultiset.toImmutableMultiset()); + + ExecutorService executor = Executors.newCachedThreadPool(); + ParallelIterable parallelIterable = new ParallelIterable<>(iterables, executor, 1); + ParallelIterator iterator = (ParallelIterator) parallelIterable.iterator(); + + Multiset actualValues = HashMultiset.create(); + + while (iterator.hasNext()) { + assertThat(iterator.queueSize()) + .as("iterator internal queue size") + .isLessThanOrEqualTo(1 + iterables.size()); + actualValues.add(iterator.next()); + } + + assertThat(actualValues) + .as("multiset of values returned by the iterator") + .isEqualTo(expectedValues); + + iterator.close(); + executor.shutdownNow(); + } + private void queueHasElements(ParallelIterator iterator) { assertThat(iterator.hasNext()).isTrue(); assertThat(iterator.next()).isNotNull();