Skip to content

Commit

Permalink
Do not submit a task when there is no space in queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sopel39 committed Dec 18, 2024
1 parent bd1dba1 commit c94513b
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ static class ParallelIterator<T> implements CloseableIterator<T> {
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final int maxQueueSize;

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
Expand All @@ -97,6 +98,7 @@ private ParallelIterator(
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
this.maxQueueSize = maxQueueSize;
}

@Override
Expand Down Expand Up @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
try {
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
taskFutures[i] = null;
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
Expand All @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
}
}

taskFutures[i] = submitNextTask();
// submit a new task if there is space in the queue
if (queue.size() < maxQueueSize) {
taskFutures[i] = submitNextTask();
}
}

if (taskFutures[i] != null) {
Expand Down

0 comments on commit c94513b

Please sign in to comment.