-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Limit memory used by ParallelIterable #10691
Changes from all commits
6967388
ec4c52f
c561cc4
84de5fc
fc1c734
984473d
6102802
3a0d6ca
4ab1c01
fd0a261
6258455
b716bfc
1ffdf3a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,84 +20,117 @@ | |
|
||
import java.io.Closeable; | ||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.util.ArrayDeque; | ||
import java.util.Deque; | ||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Future; | ||
import org.apache.iceberg.exceptions.RuntimeIOException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.Supplier; | ||
import org.apache.iceberg.io.CloseableGroup; | ||
import org.apache.iceberg.io.CloseableIterable; | ||
import org.apache.iceberg.io.CloseableIterator; | ||
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
import org.apache.iceberg.relocated.com.google.common.collect.Iterables; | ||
import org.apache.iceberg.relocated.com.google.common.io.Closer; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class); | ||
|
||
// Logic behind default value: ParallelIterable is often used for file planning. | ||
// Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory. | ||
private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000; | ||
|
||
private final Iterable<? extends Iterable<T>> iterables; | ||
private final ExecutorService workerPool; | ||
|
||
// Bound for number of items in the queue to limit memory consumption | ||
// even in the case when input iterables are large. | ||
private final int approximateMaxQueueSize; | ||
|
||
public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) { | ||
this.iterables = iterables; | ||
this.workerPool = workerPool; | ||
this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE); | ||
} | ||
|
||
public ParallelIterable( | ||
Iterable<? extends Iterable<T>> iterables, | ||
ExecutorService workerPool, | ||
int approximateMaxQueueSize) { | ||
this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null"); | ||
this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null"); | ||
this.approximateMaxQueueSize = approximateMaxQueueSize; | ||
} | ||
|
||
@Override | ||
public CloseableIterator<T> iterator() { | ||
ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool); | ||
ParallelIterator<T> iter = | ||
new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize); | ||
addCloseable(iter); | ||
return iter; | ||
} | ||
|
||
private static class ParallelIterator<T> implements CloseableIterator<T> { | ||
private final Iterator<Runnable> tasks; | ||
private final Iterator<Task<T>> tasks; | ||
private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>(); | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private final ExecutorService workerPool; | ||
private final Future<?>[] taskFutures; | ||
private final CompletableFuture<Optional<Task<T>>>[] taskFutures; | ||
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); | ||
private volatile boolean closed = false; | ||
private final int maxQueueSize; | ||
private final AtomicBoolean closed = new AtomicBoolean(false); | ||
|
||
private ParallelIterator( | ||
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) { | ||
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) { | ||
this.tasks = | ||
Iterables.transform( | ||
iterables, | ||
iterable -> | ||
(Runnable) | ||
() -> { | ||
try (Closeable ignored = | ||
(iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) { | ||
for (T item : iterable) { | ||
// exit manually because `ConcurrentLinkedQueue` can't be | ||
// interrupted | ||
if (closed) { | ||
return; | ||
} | ||
|
||
queue.add(item); | ||
} | ||
} catch (IOException e) { | ||
throw new RuntimeIOException(e, "Failed to close iterable"); | ||
} | ||
}) | ||
iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize)) | ||
.iterator(); | ||
this.workerPool = workerPool; | ||
this.maxQueueSize = maxQueueSize; | ||
// submit 2 tasks per worker at a time | ||
this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; | ||
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
// close first, avoid new task submit | ||
this.closed = true; | ||
this.closed.set(true); | ||
|
||
// cancel background tasks | ||
for (Future<?> taskFuture : taskFutures) { | ||
if (taskFuture != null && !taskFuture.isDone()) { | ||
taskFuture.cancel(true); | ||
try (Closer closer = Closer.create()) { | ||
synchronized (this) { | ||
yieldedTasks.forEach(closer::register); | ||
yieldedTasks.clear(); | ||
} | ||
|
||
// cancel background tasks and close continuations if any | ||
for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) { | ||
if (taskFuture != null) { | ||
taskFuture.cancel(true); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
taskFuture.thenAccept( | ||
continuation -> { | ||
if (continuation.isPresent()) { | ||
try { | ||
continuation.get().close(); | ||
} catch (IOException e) { | ||
LOG.error("Task close failed", e); | ||
} | ||
} | ||
}); | ||
} | ||
} | ||
|
||
// clean queue | ||
this.queue.clear(); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException("Close failed", e); | ||
} | ||
// clean queue | ||
this.queue.clear(); | ||
} | ||
|
||
/** | ||
|
@@ -107,15 +140,17 @@ public void close() { | |
* | ||
* @return true if there are pending tasks, false otherwise | ||
*/ | ||
private boolean checkTasks() { | ||
private synchronized boolean checkTasks() { | ||
Preconditions.checkState(!closed.get(), "Already closed"); | ||
boolean hasRunningTask = false; | ||
|
||
for (int i = 0; i < taskFutures.length; i += 1) { | ||
if (taskFutures[i] == null || taskFutures[i].isDone()) { | ||
if (taskFutures[i] != null) { | ||
// check for task failure and re-throw any exception | ||
// check for task failure and re-throw any exception. Enqueue continuation if any. | ||
try { | ||
taskFutures[i].get(); | ||
Optional<Task<T>> continuation = taskFutures[i].get(); | ||
continuation.ifPresent(yieldedTasks::addLast); | ||
} catch (ExecutionException e) { | ||
if (e.getCause() instanceof RuntimeException) { | ||
// rethrow a runtime exception | ||
|
@@ -136,30 +171,33 @@ private boolean checkTasks() { | |
} | ||
} | ||
|
||
return !closed && (tasks.hasNext() || hasRunningTask); | ||
return !closed.get() && (tasks.hasNext() || hasRunningTask); | ||
} | ||
|
||
private Future<?> submitNextTask() { | ||
if (!closed && tasks.hasNext()) { | ||
return workerPool.submit(tasks.next()); | ||
private CompletableFuture<Optional<Task<T>>> submitNextTask() { | ||
if (!closed.get()) { | ||
if (!yieldedTasks.isEmpty()) { | ||
return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool); | ||
} else if (tasks.hasNext()) { | ||
return CompletableFuture.supplyAsync(tasks.next(), workerPool); | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
@Override | ||
public synchronized boolean hasNext() { | ||
Preconditions.checkState(!closed, "Already closed"); | ||
|
||
// if the consumer is processing records more slowly than the producers, then this check will | ||
// prevent tasks from being submitted. while the producers are running, this will always | ||
// return here before running checkTasks. when enough of the tasks are finished that the | ||
// consumer catches up, then lots of new tasks will be submitted at once. this behavior is | ||
// okay because it ensures that records are not stacking up waiting to be consumed and taking | ||
// up memory. | ||
// | ||
// consumers that process results quickly will periodically exhaust the queue and submit new | ||
// tasks when checkTasks runs. fast consumers should not be delayed. | ||
if (!queue.isEmpty()) { | ||
Preconditions.checkState(!closed.get(), "Already closed"); | ||
|
||
// If the consumer is processing records more slowly than the producers, the producers will | ||
// eventually fill the queue and yield, returning continuations. Continuations and new tasks | ||
// are started by checkTasks(). The check here prevents us from restarting continuations or | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I thought the if check here only prevents restarting continuations too early by skipping the To prevent resuming too late, we need to reduce the I feel bulk this comment section might be better to move to method Javadoc. add a comment right before the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think there is a lot we can change about this code. I don't like the fact that method called "check tasks" has side-effect of scheduling new tasks. I also don't like the fact we do thread sleep. At minimum, the sleep should be interrupted when new item is put into the queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry for the confusion. I didn't mean to change the sleep. I was mainly talking about the code comment, which seems mainly about "resume too early" |
||
// starting new tasks too early (when queue is almost full) or too late (when queue is already | ||
// emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on | ||
// scheduling). Restarting too late would mean the consumer may need to wait for the tasks | ||
// to produce new items. A consumer slower than producers shouldn't need to wait. | ||
int queueLowWaterMark = maxQueueSize / 2; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this a local variable? if it is a constant it could be a final field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a variable, so it has a name, which is aimed at help understanding what's going on. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think either way doesn't affect readability. but since it is static, it might be a little bit to set in the constructor as a final field. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically, it's not static. this is a constructor parameter. the static field serves as a default. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. static is probably the wrong world. it is a constant for the object and could be a class field just like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. re this being class static field -- currently this is instance parameter, the constant serves only as a default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @findepi, I think by "static" Steven meant "does not change" rather than Java's |
||
if (queue.size() > queueLowWaterMark) { | ||
return true; | ||
} | ||
|
||
|
@@ -192,4 +230,78 @@ public synchronized T next() { | |
return queue.poll(); | ||
} | ||
} | ||
|
||
private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable { | ||
private final Iterable<T> input; | ||
private final ConcurrentLinkedQueue<T> queue; | ||
private final AtomicBoolean closed; | ||
private final int approximateMaxQueueSize; | ||
|
||
private Iterator<T> iterator = null; | ||
|
||
Task( | ||
Iterable<T> input, | ||
ConcurrentLinkedQueue<T> queue, | ||
AtomicBoolean closed, | ||
int approximateMaxQueueSize) { | ||
this.input = Preconditions.checkNotNull(input, "input cannot be null"); | ||
this.queue = Preconditions.checkNotNull(queue, "queue cannot be null"); | ||
this.closed = Preconditions.checkNotNull(closed, "closed cannot be null"); | ||
this.approximateMaxQueueSize = approximateMaxQueueSize; | ||
} | ||
|
||
@Override | ||
public Optional<Task<T>> get() { | ||
try { | ||
if (iterator == null) { | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
iterator = input.iterator(); | ||
} | ||
|
||
while (iterator.hasNext()) { | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (queue.size() >= approximateMaxQueueSize) { | ||
// Yield when queue is over the size limit. Task will be resubmitted later and continue | ||
// the work. | ||
return Optional.of(this); | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
T next = iterator.next(); | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (closed.get()) { | ||
break; | ||
} | ||
|
||
queue.add(next); | ||
} | ||
} catch (Throwable e) { | ||
try { | ||
close(); | ||
} catch (IOException closeException) { | ||
// self-suppression is not permitted | ||
// (e and closeException to be the same is unlikely, but possible) | ||
if (closeException != e) { | ||
e.addSuppressed(closeException); | ||
} | ||
} | ||
|
||
throw e; | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
try { | ||
close(); | ||
} catch (IOException e) { | ||
throw new UncheckedIOException("Close failed", e); | ||
} | ||
|
||
// The task is complete. Returning empty means there is no continuation that should be | ||
// executed. | ||
return Optional.empty(); | ||
findepi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
iterator = null; | ||
if (input instanceof Closeable) { | ||
((Closeable) input).close(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
finding a good default here is a bit tricky as it depends on two variables
Thread.sleep(10)
in thehasNext
method forcheckTasks
while loop. Half the queue size should be large enough to avoid starving the consumerAnyway, I am good with the default here since I don't know how to come up with a better number. I would be ok to go even a little higher like 50K. even assuming 1KB per item, it is 50 MB which is pretty small in modern computer. since we are changing from unbounded to some bound, a higher value would not be a regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we should use "the lowest number possible that doesn't affect performance negatively", rather than "the highest number possible that we believe will not blow up the memory". 50 MB is indeed very little. But uncontrolled 50 MB allocation per worker thread with eg 128 threads doing some work is 6.25 GB. Some applications like Trino initialize their worker pool size to # cores or twice # cores, so 128 threads is not an absurd number. Of course, a machine with 64 cores will have a lot more memory than this mere ~6 GB. Yet, ~6 GB of uncontrolled (and potentially unnecessary) allocation may lead to problems. Of course, same can be said about 30k size limit (3.75 GB allocation with 128 threads). I am not saying this particular number is the best one. But I would rather spend more energy in making the number smaller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I disagree with "the lowest number possible that doesn't affect performance negatively" because we don't know what the affect on performance is. Until we do, we should be more conservative.
Being conservative means a higher limit, hopefully high enough that most use cases with an active consumer never hit that number of data files in a single scan. At the same time, we do want to balance memory consumption in cases when the consumer does pause.
I think that 30k is a good limit because it is higher than the number of files produced by most scans and has a reasonable limit for memory consumption.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we actually agree, despite wording syntactically suggesting otherwise.
IF we knew the performance impact THEN we would go with "the lowest number possible that doesn't affect performance negatively".
But we don't know and we try to be conservative and 30k may be a good conservative value. (Maybe 10k was also a good conservative value. Maybe none of them is -- we don't know for sure,)