
Core: Limit memory used by ParallelIterable #10691

Merged: 13 commits merged into apache:main on Jul 22, 2024

Conversation

@findepi (Member) commented Jul 12, 2024

ParallelIterable schedules 2 * WORKER_THREAD_POOL_SIZE tasks for processing input iterables. This defaults to 2 * # CPU cores. When one or more of the input iterables is large and the ParallelIterable consumer is not quick enough, this can result in unbounded allocation inside ParallelIterator.queue. This commit bounds the queue. When the queue is full, tasks yield and are removed from the executor; they are resumed when the consumer catches up.

Follows #9402
Alternative to #7844
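
A minimal sketch of the yielding mechanism described above (illustrative only, not the actual Iceberg implementation; class and field names are assumptions):

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

// A producer task drains its input iterator into a shared queue. When the
// queue reaches maxQueueSize, it returns itself as a continuation instead of
// blocking, which releases the worker thread back to the pool. A coordinator
// resubmits continuations once the consumer has drained the queue.
class ProducerTask<T> {
  private final Iterator<T> input;
  private final ConcurrentLinkedQueue<T> queue;
  private final int maxQueueSize;

  ProducerTask(Iterator<T> input, ConcurrentLinkedQueue<T> queue, int maxQueueSize) {
    this.input = input;
    this.queue = queue;
    this.maxQueueSize = maxQueueSize;
  }

  // Returns itself when it had to yield, or empty when the input is exhausted.
  // Note the bound is soft: concurrent producers can overshoot it slightly.
  Optional<ProducerTask<T>> run() {
    while (input.hasNext()) {
      if (queue.size() >= maxQueueSize) {
        return Optional.of(this); // yield; resumed when the consumer catches up
      }
      queue.add(input.next());
    }
    return Optional.empty();
  }
}
```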

@github-actions bot added the core label Jul 12, 2024
@findepi force-pushed the findepi/limit-paralleliterable-memory branch from 8abc075 to ccf57e5 on July 12, 2024 13:02
@findepi changed the title from Limit memory used by ParallelIterable to Core: Limit memory used by ParallelIterable Jul 12, 2024
findepi added 2 commits July 12, 2024 15:28
@findepi force-pushed the findepi/limit-paralleliterable-memory branch from e6c477f to ec4c52f on July 12, 2024 13:28
@findepi added this to the Iceberg 1.6.0 milestone Jul 12, 2024
@findepi requested review from rdblue, nastra and Fokko July 12, 2024 13:43
@Fokko (Contributor) commented Jul 12, 2024

I don't have enough knowledge of this piece of code to merge this without any benchmarks or profiling. Maybe @rdblue?

@findepi (Member, Author) commented Jul 12, 2024

@Heltman @losipiuk @alexjo2144 you might want to take a look

@findepi (Member, Author) commented Jul 12, 2024

@Fokko fair. Also, note this is not a performance improvement or anything like that; it's "just" bounding memory usage to prevent OOM. As such, I wouldn't expect this change to require benchmarking or profiling. But it definitely still requires review.

@Fokko (Contributor) commented Jul 12, 2024

My thinking here is that we bound the queue. ParallelIterable is often used in places where the work is I/O intensive. This will limit the parallelism of calls to the object stores, and that might harm overall throughput.

@dekimir commented Jul 12, 2024

> My thinking here is that we bound the queue. ParallelIterable is often used in places where the work is I/O intensive. This will limit the parallelism of calls to the object stores, and that might harm overall throughput.

If the consumer of ParallelIterable is fast enough, then this limit should have no impact, right? And if the consumer isn't consuming fast enough, then the producer's throughput doesn't determine the overall throughput.

@Fokko (Contributor) commented Jul 12, 2024

> If the consumer of ParallelIterable is fast enough, then this limit should have no impact, right?

Yes, I agree, but on the other hand, if the limit is too high, it doesn't help either.

@dekimir commented Jul 12, 2024

> on the other hand, if the limit is too high, it doesn't help either.

Can't the caller set a lower limit then, by calling the new constructor overload?

@Fokko (Contributor) commented Jul 12, 2024

> Can't the caller set a lower limit then, by calling the new constructor overload?

Yes, that's possible, but then you already have to subclass quite a few classes to override certain parts of Iceberg. At that point you could also supply your own Iterator, for example the one suggested in #7844. I don't think that's ideal.

@stevenzwu (Contributor) commented

@findepi thanks for working on this. We also ran into the memory issue internally when some manifest files are super large (hundreds of MBs or even GBs).

Curious whether you have done any performance testing. Echoing another comment: I'm wondering whether the default queue size of 10K would affect throughput in the happy path for very large tables with regular manifest file sizes (a few to dozens of MBs).

@findepi (Member, Author) commented Jul 12, 2024

> > Can't the caller set a lower limit then, by calling the new constructor overload?
>
> Yes, that's possible, but then you already have to subclass quite a few classes to override certain parts of Iceberg.

@Fokko this PR adds a ParallelIterable constructor overload that takes the desired limit. I initially added this for testing, but it might be useful. I am also OK running with the default value, unconfigurable, for now; that should suffice.

@findepi (Member, Author) commented Jul 12, 2024

@stevenzwu thanks for your comments!

> Curious whether you have done any performance testing. Echoing another comment: I'm wondering whether the default queue size of 10K would affect throughput in the happy path for very large tables with regular manifest file sizes (a few to dozens of MBs).

No, I haven't. (And for large manifests, fixing OOM failures seemed more important than anything else.)

Also, per #10691 (comment), the performance impact will very much depend on how the parallel iterator is consumed.

If we're concerned about impact -- and there is potential impact, since we resume tasks only after the queue is emptied by the consumer (pre-existing logic) -- we can change this logic and resume tasks before the queue is fully empty. Would that help alleviate the concerns?
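
For illustration, resuming before the queue is fully empty could look roughly like this (a sketch building on the ProducerTask sketch above; the coordinator wiring and names are assumptions, not this PR's code):

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

class Coordinator<T> {
  private final ConcurrentLinkedDeque<ProducerTask<T>> yielded = new ConcurrentLinkedDeque<>();
  private final ConcurrentLinkedQueue<T> queue;
  private final int maxQueueSize;
  private final ExecutorService pool;

  Coordinator(ConcurrentLinkedQueue<T> queue, int maxQueueSize, ExecutorService pool) {
    this.queue = queue;
    this.maxQueueSize = maxQueueSize;
    this.pool = pool;
  }

  // Called periodically from the consumer side: resume yielded producers as
  // soon as the queue drains below a low water mark, not only when it is empty.
  void maybeResume() {
    if (queue.size() < maxQueueSize / 2) {
      ProducerTask<T> task;
      while ((task = yielded.poll()) != null) {
        ProducerTask<T> resumed = task;
        pool.submit(() -> resumed.run().ifPresent(yielded::add));
      }
    }
  }
}
```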

@findepi requested a review from stevenzwu July 12, 2024 16:25
@rdblue (Contributor) left a comment:

Looks good to me.

@findepi (Member, Author) commented Jul 19, 2024

Thanks @rdblue for your review and approval too!
@RussellSpitzer do you want to take another look?

@RussellSpitzer (Member) commented:

I'm OK with adding performance testing in another issue. I'll do a full read-through again this morning.


```java
// If the consumer is processing records more slowly than the producers, the producers will
// eventually fill the queue and yield, returning continuations. Continuations and new tasks
// are started by checkTasks(). The check here prevents us from restarting continuations or
```
@stevenzwu (Contributor) commented on this diff:

> The check here prevents us from restarting continuations or starting new tasks too early (when queue is almost full) or too late (when queue is already emptied).

I thought the if check here only prevents restarting continuations too early, by skipping the checkTasks call later in the method?

To prevent resuming too late, would we need to reduce the Thread.sleep(10)?

I feel the bulk of this comment section might be better moved to the method Javadoc, with a comment right before the if check about preventing resuming too early.
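
For reference, the loop under discussion has roughly this shape (a simplified sketch, not the exact ParallelIterator source; queue, checkTasks(), and allTasksDone() are assumed names):

```java
// Consumer-side polling: if the queue is empty, let checkTasks() reschedule
// yielded continuations or start new tasks, then back off briefly.
public synchronized boolean hasNext() {
  while (true) {
    if (!queue.isEmpty()) {
      return true;
    }
    if (allTasksDone()) {
      return false;
    }
    checkTasks(); // side effect: may restart continuations or start new tasks
    try {
      Thread.sleep(10); // the polling delay discussed here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for tasks", e);
    }
  }
}
```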

@findepi (Member, Author) replied:

I think there is a lot we can change about this code. I don't like the fact that a method called "check tasks" has the side effect of scheduling new tasks. I also don't like that we do a thread sleep; at minimum, the sleep should be interrupted when a new item is put into the queue.
However, I wanted to avoid doing too many changes in a single PR; I thought that's generally discouraged.
@stevenzwu would it be OK to consider the sleep(10) pre-existing and not address it now? Or do you think this sleep, combined with the changes introduced in this PR, has a more negative effect overall than it used to?

@stevenzwu (Contributor) replied:

Sorry for the confusion. I didn't mean to change the sleep. I was mainly talking about the code comment, which seems to be mainly about "resuming too early".

```java
// emptied). Restarting too early would lead to tasks yielding very quickly (CPU waste on
// scheduling). Restarting too late would mean the consumer may need to wait for the tasks
// to produce new items. A consumer slower than producers shouldn't need to wait.
int queueLowWaterMark = maxQueueSize / 2;
```
A Contributor commented on this line:

I think either way doesn't affect readability, but since the value is static, it might be a little cleaner to set it in the constructor as a final field.
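
The suggestion would look something like this (illustrative sketch; the real constructor takes more parameters):

```java
private final int maxQueueSize;
private final int queueLowWaterMark;

ParallelIterator(int maxQueueSize) {
  this.maxQueueSize = maxQueueSize;
  this.queueLowWaterMark = maxQueueSize / 2; // computed once, not per call
}
```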

@findepi (Member, Author) commented Jul 20, 2024

@stevenzwu thanks for your review! Addressed comments, PTAL.

@rdblue I changed the default size limit plus some other editorial details, PTAL.

@findepi requested review from stevenzwu and rdblue July 20, 2024 13:20

```java
// Logic behind default value: ParallelIterable is often used for file planning.
// Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory.
private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000;
```
@stevenzwu (Contributor) commented Jul 20, 2024:

Finding a good default here is a bit tricky, as it depends on two variables:

  1. consumer speed, which is hard to predict
  2. the Thread.sleep(10) in the hasNext method's checkTasks while loop; half the queue size should be large enough to avoid starving the consumer

Anyway, I am good with the default here, since I don't know how to come up with a better number. I would also be OK going a little higher, like 50K: even assuming 1 KB per item, that is 50 MB, which is pretty small on a modern computer. Since we are changing from unbounded to some bound, a higher value would not be a regression.

@findepi (Member, Author) replied:

Here we should use "the lowest number possible that doesn't affect performance negatively", rather than "the highest number possible that we believe will not blow up the memory". 50 MB is indeed very little, but an uncontrolled 50 MB allocation per worker thread with, e.g., 128 threads doing work is 6.25 GB. Some applications like Trino initialize their worker pool size to the number of cores or twice that, so 128 threads is not an absurd number. Of course, a machine with 64 cores will have a lot more memory than this mere ~6 GB; yet ~6 GB of uncontrolled (and potentially unnecessary) allocation may lead to problems. The same can be said about the 30k size limit (3.75 GB allocation with 128 threads). I am not saying this particular number is the best one, but I would rather spend more energy on making the number smaller.
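
Spelling the arithmetic out (using the 1 KB-per-item assumption from the comment above, with 1 GB = 1,024 MB):

```
50,000 items × 1 KB = 50 MB per thread;  50 MB × 128 threads = 6,400 MB ≈ 6.25 GB
30,000 items × 1 KB = 30 MB per thread;  30 MB × 128 threads = 3,840 MB ≈ 3.75 GB
```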

A Contributor replied:

I disagree with "the lowest number possible that doesn't affect performance negatively" because we don't know what the effect on performance is. Until we do, we should be more conservative.

Being conservative means a higher limit, hopefully high enough that most use cases with an active consumer never hit that number of data files in a single scan. At the same time, we do want to balance memory consumption in cases where the consumer does pause.

I think that 30k is a good limit because it is higher than the number of files produced by most scans and keeps a reasonable cap on memory consumption.

@findepi (Member, Author) replied:

> I disagree with "the lowest number possible that doesn't affect performance negatively" because we don't know what the effect on performance is. Until we do, we should be more conservative.

I think we actually agree, despite the wording suggesting otherwise.

If we knew the performance impact, then we would go with "the lowest number possible that doesn't affect performance negatively". But we don't know, so we try to be conservative, and 30k may be a good conservative value. (Maybe 10k was also a good conservative value. Maybe neither is; we don't know for sure.)

@rdblue merged commit 7831a8d into apache:main Jul 22, 2024
49 checks passed
@rdblue (Contributor) commented Jul 22, 2024

Thanks, @findepi! Good work finding a solution here.

@findepi deleted the findepi/limit-paralleliterable-memory branch July 23, 2024 21:08
raunaqmorarka pushed a commit to raunaqmorarka/iceberg that referenced this pull request Jul 25, 2024
…pache#10691)

(cherry picked from commit 7831a8d)
raunaqmorarka pushed a commit to raunaqmorarka/iceberg that referenced this pull request Jul 26, 2024
…(backport apache#10691)

(cherry picked from commit 7831a8d)
findepi added a commit that referenced this pull request Jul 26, 2024
…(backport #10691) (#10787)

(cherry picked from commit 7831a8d)

Co-authored-by: Piotr Findeisen <[email protected]>
jasonf20 pushed a commit to jasonf20/iceberg that referenced this pull request Aug 4, 2024
…pache#10691)
szehon-ho pushed a commit to szehon-ho/iceberg that referenced this pull request Sep 16, 2024
* Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (apache#9402)

(cherry picked from commit d3cb1b6)

* Core: Limit ParallelIterable memory consumption by yielding in tasks (apache#10691)


(cherry picked from commit 7831a8d)

* Drop ParallelIterable's queue low water mark (apache#10978)

As part of the change in commit 7831a8d, a queue low water mark was introduced. However, it resulted in an increased number of manifests being read when planning LIMIT queries in the Trino Iceberg connector. To avoid the increased I/O, back out the change for now.

(cherry picked from commit bcb3281)

---------

Co-authored-by: Helt <[email protected]>
Co-authored-by: Piotr Findeisen <[email protected]>
```java
// cancel background tasks and close continuations if any
for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) {
  if (taskFuture != null) {
    taskFuture.cancel(true);
```
A Contributor commented:

The mayInterruptIfRunning flag has no effect in CompletableFuture, so I guess it's possible the thread cannot be released here?
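
That matches the CompletableFuture Javadoc: the mayInterruptIfRunning value has no effect because interrupts are not used to control processing. A small self-contained demo of the behavior (hypothetical class name, for illustration):

```java
import java.util.concurrent.CompletableFuture;

public class CancelDemo {
  public static void main(String[] args) throws Exception {
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
      try {
        Thread.sleep(5_000); // stands in for a long-running producer task
        System.out.println("task ran to completion despite cancel(true)");
      } catch (InterruptedException e) {
        System.out.println("interrupted"); // never printed: cancel does not interrupt
      }
    });
    Thread.sleep(100);
    future.cancel(true);                      // the future completes as cancelled...
    System.out.println(future.isCancelled()); // ...prints true immediately...
    Thread.sleep(6_000);                      // ...but the worker thread keeps running
  }
}
```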

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
…pache#10691)
