Skip to content

Commit

Permalink
Cherry-pick 3 ParallelIterable fix around memory problem (apache#1336)
Browse files Browse the repository at this point in the history
* Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (apache#9402)

(cherry picked from commit d3cb1b6)

* Core: Limit ParallelIterable memory consumption by yielding in tasks (apache#10691)

ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for
processing input iterables. This defaults to 2 * # CPU cores.  When one
or some of the input iterables are considerable in size and the
ParallelIterable consumer is not quick enough, this could result in
unbounded allocation inside `ParallelIterator.queue`. This commit bounds
the queue. When queue is full, the tasks yield and get removed from the
executor. They are resumed when consumer catches up.

(cherry picked from commit 7831a8d)

* Drop ParallelIterable's queue low water mark (apache#10978)

As part of the change in commit
7831a8d, queue low water mark was
introduced. However, it resulted in increased number of manifests being
read when planning LIMIT queries in Trino Iceberg connector. To avoid
increased I/O, back out the change for now.

(cherry picked from commit bcb3281)

---------

Co-authored-by: Helt <[email protected]>
Co-authored-by: Piotr Findeisen <[email protected]>
  • Loading branch information
3 people authored and GitHub Enterprise committed Sep 13, 2024
1 parent 638dfc0 commit ffeb8ef
Show file tree
Hide file tree
Showing 2 changed files with 272 additions and 48 deletions.
211 changes: 163 additions & 48 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,78 +20,117 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.iceberg.exceptions.RuntimeIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.io.Closer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {

private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class);

// Logic behind default value: ParallelIterable is often used for file planning.
// Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory.
private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000;

private final Iterable<? extends Iterable<T>> iterables;
private final ExecutorService workerPool;

// Bound for number of items in the queue to limit memory consumption
// even in the case when input iterables are large.
private final int approximateMaxQueueSize;

public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
this.iterables = iterables;
this.workerPool = workerPool;
this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE);
}

public ParallelIterable(
Iterable<? extends Iterable<T>> iterables,
ExecutorService workerPool,
int approximateMaxQueueSize) {
this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null");
this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null");
this.approximateMaxQueueSize = approximateMaxQueueSize;
}

@Override
public CloseableIterator<T> iterator() {
ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
ParallelIterator<T> iter =
new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize);
addCloseable(iter);
return iter;
}

private static class ParallelIterator<T> implements CloseableIterator<T> {
private final Iterator<Runnable> tasks;
private final Iterator<Task<T>> tasks;
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
private final ExecutorService workerPool;
private final Future<?>[] taskFutures;
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private volatile boolean closed = false;
private final int maxQueueSize;
private final AtomicBoolean closed = new AtomicBoolean(false);

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
this.tasks =
Iterables.transform(
iterables,
iterable ->
(Runnable)
() -> {
try (Closeable ignored =
(iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
for (T item : iterable) {
queue.add(item);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close iterable");
}
})
iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize))
.iterator();
this.workerPool = workerPool;
this.maxQueueSize = maxQueueSize;
// submit 2 tasks per worker at a time
this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
}

@Override
public void close() {
// close first, avoid new task submit
this.closed = true;
this.closed.set(true);

// cancel background tasks
for (Future<?> taskFuture : taskFutures) {
if (taskFuture != null && !taskFuture.isDone()) {
taskFuture.cancel(true);
try (Closer closer = Closer.create()) {
synchronized (this) {
yieldedTasks.forEach(closer::register);
yieldedTasks.clear();
}

// cancel background tasks and close continuations if any
for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) {
if (taskFuture != null) {
taskFuture.cancel(true);
taskFuture.thenAccept(
continuation -> {
if (continuation.isPresent()) {
try {
continuation.get().close();
} catch (IOException e) {
LOG.error("Task close failed", e);
}
}
});
}
}

// clean queue
this.queue.clear();
} catch (IOException e) {
throw new UncheckedIOException("Close failed", e);
}
// clean queue
this.queue.clear();
}

/**
Expand All @@ -101,15 +140,17 @@ public void close() {
*
* @return true if there are pending tasks, false otherwise
*/
private boolean checkTasks() {
private synchronized boolean checkTasks() {
Preconditions.checkState(!closed.get(), "Already closed");
boolean hasRunningTask = false;

for (int i = 0; i < taskFutures.length; i += 1) {
if (taskFutures[i] == null || taskFutures[i].isDone()) {
if (taskFutures[i] != null) {
// check for task failure and re-throw any exception
// check for task failure and re-throw any exception. Enqueue continuation if any.
try {
taskFutures[i].get();
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
Expand All @@ -130,29 +171,29 @@ private boolean checkTasks() {
}
}

return !closed && (tasks.hasNext() || hasRunningTask);
return !closed.get() && (tasks.hasNext() || hasRunningTask);
}

private Future<?> submitNextTask() {
if (!closed && tasks.hasNext()) {
return workerPool.submit(tasks.next());
private CompletableFuture<Optional<Task<T>>> submitNextTask() {
if (!closed.get()) {
if (!yieldedTasks.isEmpty()) {
return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool);
} else if (tasks.hasNext()) {
return CompletableFuture.supplyAsync(tasks.next(), workerPool);
}
}
return null;
}

@Override
public synchronized boolean hasNext() {
Preconditions.checkState(!closed, "Already closed");

// if the consumer is processing records more slowly than the producers, then this check will
// prevent tasks from being submitted. while the producers are running, this will always
// return here before running checkTasks. when enough of the tasks are finished that the
// consumer catches up, then lots of new tasks will be submitted at once. this behavior is
// okay because it ensures that records are not stacking up waiting to be consumed and taking
// up memory.
//
// consumers that process results quickly will periodically exhaust the queue and submit new
// tasks when checkTasks runs. fast consumers should not be delayed.
Preconditions.checkState(!closed.get(), "Already closed");

// If the consumer is processing records more slowly than the producers, the producers will
// eventually fill the queue and yield, returning continuations. Continuations and new tasks
// are started by checkTasks(). The check here prevents us from restarting continuations or
// starting new tasks before the queue is emptied. Restarting too early would lead to tasks
// yielding very quickly (CPU waste on scheduling).
if (!queue.isEmpty()) {
return true;
}
Expand Down Expand Up @@ -186,4 +227,78 @@ public synchronized T next() {
return queue.poll();
}
}

private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
private final Iterable<T> input;
private final ConcurrentLinkedQueue<T> queue;
private final AtomicBoolean closed;
private final int approximateMaxQueueSize;

private Iterator<T> iterator = null;

Task(
Iterable<T> input,
ConcurrentLinkedQueue<T> queue,
AtomicBoolean closed,
int approximateMaxQueueSize) {
this.input = Preconditions.checkNotNull(input, "input cannot be null");
this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
this.approximateMaxQueueSize = approximateMaxQueueSize;
}

@Override
public Optional<Task<T>> get() {
try {
if (iterator == null) {
iterator = input.iterator();
}

while (iterator.hasNext()) {
if (queue.size() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
}

T next = iterator.next();
if (closed.get()) {
break;
}

queue.add(next);
}
} catch (Throwable e) {
try {
close();
} catch (IOException closeException) {
// self-suppression is not permitted
// (e and closeException to be the same is unlikely, but possible)
if (closeException != e) {
e.addSuppressed(closeException);
}
}

throw e;
}

try {
close();
} catch (IOException e) {
throw new UncheckedIOException("Close failed", e);
}

// The task is complete. Returning empty means there is no continuation that should be
// executed.
return Optional.empty();
}

@Override
public void close() throws IOException {
iterator = null;
if (input instanceof Closeable) {
((Closeable) input).close();
}
}
}
}
Loading

0 comments on commit ffeb8ef

Please sign in to comment.