Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ParallelIterable may cause memory pressure for presto coordinator #3179

Open
liupan664021 opened this issue Sep 26, 2021 · 3 comments
Open

Comments

@liupan664021
Copy link

liupan664021 commented Sep 26, 2021

  • No limit for split queue
    Presto iceberg connector use BaseTableScan#planTasks() to generate splits which is processed by a ParallelIterable to plan parallelly. The base logic of ParallelIterable is that: when hasNext() is called, plan tasks(each task for each manifest) are submitted to a thread pool to plan data files from each manifest, and the generated FileScanTask are stored in a queue and then are consumed by next(). However there is no limit size or target size for the queue. That is to say when a task is submitted, all data files of the manifest in the task will be added to the queue continuously unitl finished. So in extreme cases, if a manifest has a huge amount of a datafiles and the consume speed is slow meanwhile, the size of the queue will grow quickly and cause memory pressure of the presto coordinator , especially when the FileScanTask size is big, eg. planning table with extremelly complicated schema.

  • Split queue is not cleared after iterator closed
    Another problem is that when the ParallelIterable is closed, the queue is not cleared as the code shows below.
    Since presto keeps history queries for a while, the FileScanTask remaining in the queue are referenced by scheduler and then reference by a presto query which can not collected by gc. This may cause the coordinator OOM in extremelly cases.

In our test, we set up a presto coordinator (worker node as well) of -Xmx=40GB, and query a sql like select col from table limit 1 in which the table has a verg complicated schema. When this sql is submitted continuously 8-10 times, the coordinator crashed caused of a oom error. We found that the FileScanTasks in the ParallelIterator#queue reached 2.5GB per query in the heap dump, and filled the heap of jvm quickly.

Of course, we can set null explicitly to the iterator reference of combinedScanIterable in presto iceberg connector to let gc collect the splits after query completed, but I think it's resonable to clear the queue when closing the ParallelIterator since the iterator can not be accessed anymore after closed.

@Override
    public void close() {
      // cancel background tasks
      for (int i = 0; i < taskFutures.length; i += 1) {
        if (taskFutures[i] != null && !taskFutures[i].isDone()) {
          taskFutures[i].cancel(true);
        }
      }
      this.closed = true;
    }

Maybe we can add some improvement for the ParallelIterable eg. add limit size and clear the queue after closed.

@davseitsev
Copy link

We have the same issue. It caused a lot of OOM problems when we had an issue with data compaction.

Here is an example of the smallest heap dump we took:
image

As you can see, ParallelIterator#queue takes 21GB, it's because it contains 313 573 items.

Some dumps contain more than 1M of org.apache.iceberg.BaseFileScanTask items (for multimple queries):
image

A single call to ParallelIterator.checkTasks() submits 96 processing tasks and it's enough to cause back pressure and memory starvation.

Maybe we can implement kind of slow-start algorithms, like checkTasks() can submit only 1 new task, and if the consumer is fast enough, ParallelIterator#queue will become empty and another checkTasks() call will submit additional task, etc.
Maybe it worths to have separate config for the size of ParallelIterator.taskFutures to limit max number of producers for a single query but not reduce the worker pool size. Or maybe we can limit queue size and use ForkJoinPool#managedBlock not to block completely parallel flows.

@alexjo2144
Copy link
Contributor

alexjo2144 commented Jul 10, 2024

Limiting the queue size and blocking the tasks that are reading manifests feels like it would be the best trade off between fixing the problem without too much added complexity.

Say the queue allows for a buffer of, just throwing a number out there, ~10,000 tasks. It seems unlikely that the consumer would first be slow enough that the queue fills and then suddenly be reading tasks off fast enough to clear the queue and outpace producers.

Here's a stalled attempt from the last time this came up: #7844 (comment)

@findepi
Copy link
Member

findepi commented Jul 12, 2024

Here's a stalled attempt from the last time this came up: #7844 (comment)

This got replaced by and merged as #9402
But that's probably not enough, so I created #10691

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants