Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Limit memory used by ParallelIterable #10691

Merged
merged 13 commits into from
Jul 22, 2024
218 changes: 164 additions & 54 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,84 +20,115 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.iceberg.exceptions.RuntimeIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {

private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class);

private static final int DEFAULT_MAX_QUEUE_SIZE = 10_000;
findepi marked this conversation as resolved.
Show resolved Hide resolved

private final Iterable<? extends Iterable<T>> iterables;
private final ExecutorService workerPool;

// Bound for number of items in the queue to limit memory consumption
// even in the case when input iterables are large.
private final int approximateMaxQueueSize;

public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
this.iterables = iterables;
this.workerPool = workerPool;
this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE);
}

public ParallelIterable(
Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool,
int approximateMaxQueueSize) {
this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null");
this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null");
this.approximateMaxQueueSize = approximateMaxQueueSize;
}

@Override
public CloseableIterator<T> iterator() {
ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
ParallelIterator<T> iter =
new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize);
addCloseable(iter);
return iter;
}

private static class ParallelIterator<T> implements CloseableIterator<T> {
private final Iterator<Runnable> tasks;
private final Iterator<Task<T>> tasks;
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
findepi marked this conversation as resolved.
Show resolved Hide resolved
private final ExecutorService workerPool;
private final Future<?>[] taskFutures;
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private volatile boolean closed = false;
private final int maxQueueSize;
private final AtomicBoolean closed = new AtomicBoolean(false);

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
this.tasks =
Iterables.transform(
iterables,
iterable ->
(Runnable)
() -> {
try (Closeable ignored =
(iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
for (T item : iterable) {
// exit manually because `ConcurrentLinkedQueue` can't be
// interrupted
if (closed) {
return;
}

queue.add(item);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close iterable");
}
})
iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize))
.iterator();
this.workerPool = workerPool;
this.maxQueueSize = maxQueueSize;
// submit 2 tasks per worker at a time
this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
}

@Override
public void close() {
// close first, avoid new task submit
this.closed = true;
this.closed.set(true);

// cancel background tasks
for (Future<?> taskFuture : taskFutures) {
if (taskFuture != null && !taskFuture.isDone()) {
taskFuture.cancel(true);
try (Closer closer = Closer.create()) {
synchronized (this) {
yieldedTasks.forEach(closer::register);
yieldedTasks.clear();
}

// cancel background tasks and close continuations if any
for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) {
if (taskFuture != null) {
taskFuture.cancel(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mayInterruptIfRunning flag has no effect in CompletableFuture, so I guess it's possible the thread cannot be released here?

taskFuture.thenAccept(
continuation -> {
if (continuation.isPresent()) {
try {
continuation.get().close();
} catch (IOException e) {
LOG.error("Task close failed", e);
}
}
});
}
}

// clean queue
this.queue.clear();
} catch (IOException e) {
throw new RuntimeException("Close failed", e);
findepi marked this conversation as resolved.
Show resolved Hide resolved
}
// clean queue
this.queue.clear();
}

/**
Expand All @@ -107,15 +138,17 @@ public void close() {
*
* @return true if there are pending tasks, false otherwise
*/
private boolean checkTasks() {
private synchronized boolean checkTasks() {
Preconditions.checkState(!closed.get(), "Already closed");
boolean hasRunningTask = false;

for (int i = 0; i < taskFutures.length; i += 1) {
if (taskFutures[i] == null || taskFutures[i].isDone()) {
if (taskFutures[i] != null) {
// check for task failure and re-throw any exception
findepi marked this conversation as resolved.
Show resolved Hide resolved
try {
taskFutures[i].get();
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
Expand All @@ -136,30 +169,33 @@ private boolean checkTasks() {
}
}

return !closed && (tasks.hasNext() || hasRunningTask);
return !closed.get() && (tasks.hasNext() || hasRunningTask);
}

private Future<?> submitNextTask() {
if (!closed && tasks.hasNext()) {
return workerPool.submit(tasks.next());
private CompletableFuture<Optional<Task<T>>> submitNextTask() {
if (!closed.get()) {
if (!yieldedTasks.isEmpty()) {
return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool);
} else if (tasks.hasNext()) {
return CompletableFuture.supplyAsync(tasks.next(), workerPool);
}
}
return null;
}

@Override
public synchronized boolean hasNext() {
Preconditions.checkState(!closed, "Already closed");

// if the consumer is processing records more slowly than the producers, then this check will
// prevent tasks from being submitted. while the producers are running, this will always
// return here before running checkTasks. when enough of the tasks are finished that the
// consumer catches up, then lots of new tasks will be submitted at once. this behavior is
// okay because it ensures that records are not stacking up waiting to be consumed and taking
// up memory.
//
// consumers that process results quickly will periodically exhaust the queue and submit new
// tasks when checkTasks runs. fast consumers should not be delayed.
if (!queue.isEmpty()) {
Preconditions.checkState(!closed.get(), "Already closed");

// If the consumer is processing records more slowly than the producers, the producers will
// eventually fill the queue and yield, returning continuations. Continuations and new tasks
// are started by checkTasks(). The check here prevents us from restarting continuations or
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check here prevents us from restarting continuations or starting new tasks too early (when queue is almost full) or too late (when queue is already emptied).

I thought the if check here only prevents restarting continuations too early by skipping the checkTasks later in the method?

To prevent resuming too late, we need to reduce the Thread.sleep(10)?

I feel bulk this comment section might be better to move to method Javadoc. add a comment right before the if check to preventing resuming too early?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think there is a lot we can change about this code. I don't like the fact that method called "check tasks" has side-effect of scheduling new tasks. I also don't like the fact we do thread sleep. At minimum, the sleep should be interrupted when new item is put into the queue.
however, i wanted to avoid doing too many changes in single PR. I thought it's generally discouraged.
@stevenzwu would be OK to consider the sleep(10) to be pre-existing and not address now? Or do you think this sleep, when combined with changes introduced in this PR, has overall more negative effect that it used to be?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the confusion. I didn't mean to change the sleep. I was mainly talking about the code comment, which seems mainly about "resume too early"

// starting new tasks too early (when queue is almost full) or too late (when queue is already
// emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on
// scheduling). Restarting too late would mean the consumer may need to wait for the tasks
// to produce new items. A consumer slower than producers shouldn't need to wait.
int queueLowWaterMark = maxQueueSize / 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a local variable? if it is a constant it could be a final field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a variable, so it has a name, which is aimed at help understanding what's going on.
maxQueueSize is a configuration parameter. queueLowWaterMark can indeed be a field, i am not convinced it would increase readability, because the usage and the value formulae would be separated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think either way doesn't affect readability. but since it is static, it might be a little bit to set in the constructor as a final field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically, it's not static. this is a constructor parameter. the static field serves as a default.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static is probably the wrong world. it is a constant for the object and could be a class field just like maxQueueSize

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re this being class static field -- currently this is instance parameter, the constant serves only as a default
re this being class non-static (instance) field -- i replied to this in #10691 (comment). let me know what you think about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi, I think by "static" Steven meant "does not change" rather than Java's static. This is minor, but we don't typically want to recalculate a value in every method call if it can be set when the instance is created. The name isn't the problem, it is that this doesn't need to be recalculated every call.

if (queue.size() > queueLowWaterMark) {
return true;
}

Expand Down Expand Up @@ -192,4 +228,78 @@ public synchronized T next() {
return queue.poll();
}
}

private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
private final Iterable<T> input;
private final ConcurrentLinkedQueue<T> queue;
private final AtomicBoolean closed;
private final int approximateMaxQueueSize;

private Iterator<T> iterator = null;

Task(
Iterable<T> input,
ConcurrentLinkedQueue<T> queue,
AtomicBoolean closed,
int approximateMaxQueueSize) {
this.input = Preconditions.checkNotNull(input, "input cannot be null");
this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
this.approximateMaxQueueSize = approximateMaxQueueSize;
}

@Override
public Optional<Task<T>> get() {
try {
if (iterator == null) {
findepi marked this conversation as resolved.
Show resolved Hide resolved
iterator = input.iterator();
}

while (iterator.hasNext()) {
findepi marked this conversation as resolved.
Show resolved Hide resolved
if (queue.size() >= approximateMaxQueueSize) {
// Yield. Stop execution so that the task can be resubmitted, in which case call() will
findepi marked this conversation as resolved.
Show resolved Hide resolved
// be called again.
return Optional.of(this);
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

T next = iterator.next();
findepi marked this conversation as resolved.
Show resolved Hide resolved
if (closed.get()) {
break;
}

queue.add(next);
}
} catch (Throwable e) {
try {
close();
} catch (IOException closeException) {
// self-suppression is not permitted
// (e and closeException to be the same is unlikely, but possible)
if (closeException != e) {
e.addSuppressed(closeException);
}
}

throw e;
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

try {
close();
} catch (IOException e) {
throw new UncheckedIOException(e);
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

// The task is complete. Returning empty means there is no continuation that should be
// executed.
return Optional.empty();
findepi marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void close() throws IOException {
iterator = null;
if (input instanceof Closeable) {
((Closeable) input).close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,22 @@
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -133,6 +140,47 @@ public CloseableIterator<Integer> iterator() {
.untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
}

@Test
public void limitQueueSize() throws IOException, IllegalAccessException, NoSuchFieldException {
findepi marked this conversation as resolved.
Show resolved Hide resolved

List<Iterable<Integer>> iterables =
ImmutableList.of(
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator());

Multiset<Integer> expectedValues =
IntStream.range(0, 100)
.boxed()
.flatMap(i -> Stream.of(i, i, i))
.collect(ImmutableMultiset.toImmutableMultiset());

int maxQueueSize = 20;
ExecutorService executor = Executors.newCachedThreadPool();
findepi marked this conversation as resolved.
Show resolved Hide resolved
ParallelIterable<Integer> parallelIterable =
new ParallelIterable<>(iterables, executor, maxQueueSize);
CloseableIterator<Integer> iterator = parallelIterable.iterator();
findepi marked this conversation as resolved.
Show resolved Hide resolved
Field queueField = iterator.getClass().getDeclaredField("queue");
queueField.setAccessible(true);
ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);

Multiset<Integer> actualValues = HashMultiset.create();

while (iterator.hasNext()) {
findepi marked this conversation as resolved.
Show resolved Hide resolved
assertThat(queue)
.as("iterator internal queue")
.hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
actualValues.add(iterator.next());
}

assertThat(actualValues)
.as("multiset of values returned by the iterator")
.isEqualTo(expectedValues);

iterator.close();
executor.shutdownNow();
}

private void queueHasElements(CloseableIterator<Integer> iterator, Queue queue) {
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();
Expand Down