Skip to content

Commit

Permalink
Check for minimal queue size in ParallelIterable (#10977)
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi authored Aug 21, 2024
1 parent bcb3281 commit f107649
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private ParallelIterator(
iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize))
.iterator();
this.workerPool = workerPool;
Preconditions.checkArgument(maxQueueSize > 0, "Max queue size must be greater than 0");
this.maxQueueSize = maxQueueSize;
// submit 2 tasks per worker at a time
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,41 @@ public void limitQueueSize() {
executor.shutdownNow();
}

@Test
public void queueSizeOne() {
List<Iterable<Integer>> iterables =
ImmutableList.of(
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator());

Multiset<Integer> expectedValues =
IntStream.range(0, 100)
.boxed()
.flatMap(i -> Stream.of(i, i, i))
.collect(ImmutableMultiset.toImmutableMultiset());

ExecutorService executor = Executors.newCachedThreadPool();
ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();

Multiset<Integer> actualValues = HashMultiset.create();

while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(1 + iterables.size());
actualValues.add(iterator.next());
}

assertThat(actualValues)
.as("multiset of values returned by the iterator")
.isEqualTo(expectedValues);

iterator.close();
executor.shutdownNow();
}

private void queueHasElements(ParallelIterator<Integer> iterator) {
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Expand Down

0 comments on commit f107649

Please sign in to comment.