-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Rewrite partial top-n node to LimitNode or LastNNode #18384
base: master
Are you sure you want to change the base?
Conversation
This mr still has some details that need to be dealt with, but the review can be started. @marton-bod @findepi @alexjo2144 If convenient, Would you mind to have a review on this MR ? |
7e257f5
to
e097b3b
Compare
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(toIcebergExpression(effectivePredicate)).planFiles()) { | ||
for (FileScanTask scanTask : fileScanTasks) { | ||
DataFile file = scanTask.file(); | ||
if (file.sortOrderId() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even if iceberg DataFile defines a sortorder, but the file from planFile
does not have it, I suspect this is a bug of iceberg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorting is optional. It's legitimate for files to be sorted differently, or not be sorted at all.
However, there is a problem with table.newScan()
. We should avoid it. We use it for stats today, and we know it's causing problems (takes too long), so we don't want to do this during planning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File sorting is indeed optional, but necessary for this PR to accelerate queries (we can support partial file sorting scenarios later). Since table.newScan()
is not recommended, is there another way to get information about whether the files have been sorted during the planning stage ?
And For query acceleration scenarios based on large amounts of top-N data, I think the planning time is negligible ?
Hi, @raunaqmorarka I've given an initial review of the information available on the partial rewriting of TopN to Limit. It seems that my first step would be to acquire the However, an issue arises as the Furthermore, if data is written by (for instance, ASC) contradicts the query direction (for instance, DESC), it seems that the current Trino version doesn't provide support. Do you have any suggestions? I would greatly appreciate your insights. |
@zhangminglei Thanks for PR. Could you rebase? it also seems that iceberg tests are failing |
@sopel39 Sorry, I haven't read this issue for a long time. I would think it is an iceberg bug apache/iceberg#8864, Hi, @nastra , Could you please help to check this ? |
@findepi @sopel39 Just a friendly ping, the test failed was caused by iceberg bug and I fixed it |
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(toIcebergExpression(effectivePredicate)).planFiles()) { | ||
for (FileScanTask scanTask : fileScanTasks) { | ||
DataFile file = scanTask.file(); | ||
if (file.sortOrderId() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorting is optional. It's legitimate for files to be sorted differently, or not be sorted at all.
However, there is a problem with table.newScan()
. We should avoid it. We use it for stats today, and we know it's causing problems (takes too long), so we don't want to do this during planning.
@@ -2546,6 +2552,62 @@ private static IcebergColumnHandle createProjectedColumnHandle(IcebergColumnHand | |||
Optional.empty()); | |||
} | |||
|
|||
@Override | |||
public Optional<PartialSortApplicationResult<ConnectorTableHandle>> applyPartialSort( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not applyTopN?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be precise, this PR does not implement push-down top-N, which should be implemented by database software based on JDBC client connections.This PR just rewrite partial-top-n-node to LimitNode
or LastNNode
.
From
partial-top-n
Project
Filter
TableScan
To
Limit/LastN
Project
Filter
TableScan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorting is optional. It's legitimate for files to be sorted differently, or not be sorted at all.
You are right. In addition, the multi-version of SortOrder
is not considered now. If the SortOrderId of each file is inconsistent with the SortOrderId defined by the table, then this optimization cannot be used.
originalTableHandle.withSort(allSorted), | ||
trinoSortOrder.isAscending() == icebergSortOrder.isAscending(), | ||
trinoSortOrder.isNullsFirst() == icebergSortOrder.isNullsFirst(), | ||
allSorted)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think i don't understand how it works.
Let's assume i have a bunch of sorted files.
Each of these files may produce one or more splits.
How does the fact that files are sorted help in anything?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good and worthwhile question. If I understand you correctly, if all the files in the query range are already sorted and the selectivity of the filter field is very low, then it is necessary to have one split per file. However, if the selectivity of the filter field is relatively high, one split per file may not have enough concurrency, which may affect query performance, because almost all the data in the file needs to be read to find the required data. In this case, we would rather have a file split into multiple splits to improve query performance.
|
I think it's a good idea. @findepi will you continue with review? |
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
@zhangminglei please submit the new version as you suggested and also adapt by rebasing. Let us know how we can help after that. |
Thanks @mosabua ! The latest code has been merged to production within our company. The code ensures that data written in one order (e.g. DESC) can be query in two different orders (DESC or ASC) with almost consistent query performance. I will submit the latest code as soon as possible |
e097b3b
to
28f4049
Compare
Thank you for your pull request and welcome to our community. We could not parse the GitHub identity of the following contributors: zhangminglei.
|
ced3941
to
2b21616
Compare
2b21616
to
cf7d6f3
Compare
847cd8a
to
b345b77
Compare
b345b77
to
1a19429
Compare
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
Outdated
Show resolved
Hide resolved
// partition specs are {0:f1, 1:f1,f2, 2:f3} | ||
// if query sort is `order by f2`, filter out result(specIdNotMatching) is specId=0,2. later we will use `planFile` decide whether the planed files have specId 0 or 2. | ||
// if all the files spec ids does not have 0 or 2 due to some reason, then we hope the query can use our optimization. | ||
table.specs().values().forEach(spec -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if spec is no longer used, does it matter?
(can a historical spec make the feature not work?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It depends. The important thing is whether the plan file generated includes this historical spec. If it does, then the feature not work.
* </pre> | ||
* | ||
*/ | ||
public class PushPartialTopNIntoTableScan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @martint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, @martint Just a friendly ping, Could you take a look on this ?
core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java
Outdated
Show resolved
Hide resolved
/** | ||
* If the file is ordered, attempt to push down sort or topN to connector. | ||
*/ | ||
default Optional<PartialSortApplicationResult<ConnectorTableHandle>> applyPartialSort( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Trino partial sort would often indicate "distributed sort" feature, but this is not the context this method is called (or could be called in the future). Not sure how this new SPI method should be best framed.
cc @martint
Thanks @findepi for review, I have modified the PR in |
88ad207
to
8113233
Compare
8113233
to
daf81cd
Compare
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time. |
I think we should still pursue this so I am reopening |
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua |
Description
If Trino knows that each file has been sorted, it can rewrite the
Partial TopN
toLimitNode
orLastNNode
(a new logical plan node in this PR), which will significantly improve query performance. Trino can use thesort_order_id
field in the Iceberg manifest entry to determine whether a data file has been sorted.Backgrounds and Core Idea
For convenience of review, I need to describe the
LastNOperator
(a new operator was introduced) in this PR and the working principle of the related code. First, as a recap, let's take a quick look at how Page was originally built fromOrcReader
. I believe Trino gods are more familiar with the implementation details, but it can help understanding theLastNOperator
implementations later. Trino's OrcReader advances based on RowGroups then obtains the current RowGroup's input stream, and construct Pages based on this input stream. Therefore, at least during the Pages generation phase, a Page is not generated across RowGroups. In other words, a Page does not come from two different RowGroups. Additionally, the batch parameter MAX_BATCH_SIZE is set to 8192 in current codebase, so if the default RowGroup size is 10,000, at least one Page will be generated for each RowGroup.In this PR, when OrcReader outputs a Page, it adds an additional column whose value is calculated by concatenating the existing variables
stripeSize
andcurrentRowGroup
. The following code snippet is the logic for adding the last column to the Page. This code is part of a class calledOrcRecordBackwardReader
. This class mainly iterates row groups in reverse order.It should be noted that trino will do some additional optimization on the output Page. For example, when the
physical operator includes
ScanFilterAndProjectOperator
orFilterAndProjectOperator
instead ofTableScanOperator
. Trino optimizes the Page by merging or splitting and this PR also adapts to the both scenarios.If the implementation of
LastNOperator
reads from the beginning of the file and retains the data from the last few Pages at the end of the file, there may still be significant IO overhead due to processing a lot of irrelevant data. Therefore, theLastNOperator
expects the input page to be constructed from the last RowGroup of the current split and this depends on the behavior of OrcReader's output Page,so the PR introduces a class calledOrcRecordBackwardReader
to differentiate it from the existingOrcRecordReader
.Furthermore, the
LastNOperator
implementation takes performance issues into account. Each Page entering the operator has a unique RowGroupId block to indicate from which row group the Page comes and the block is the last block of the Page. Unlike theLimitOperator
, which outputs any input Page downstream until remainingLimit is no longer greater than 0, theLastNOperator
accumulates the Pages. It does not output to the downstream operator until it accumulates N elements unless the total number of rows in the source data is less than N. Therefore, theLastNOperator
is a blocking operator, like theOrderByOperator
, and it can precisely calculates the N elements needed to reduce the memory of the current worker process and the network pressure caused by sending data to downstream operator.Now, let's illustrate the general implementation approach of the
LastNOperator
with an example.Consider the following data written sequentially in a column. For convenience, I use one row to represent it, and assume the row group size is 3. This numbers will be divided into three row groups. Since the data is written sequentially, the data between row groups follows the order: data in row group 3 is smaller than that in row group 2, and data in row group 2 is smaller than that in row group 1. However, within each row group, the data is still sequential.
The
LastNOperator
maintains a corresponding queue for each row group, and the queue saves the pages generated by the row group. Also, the value of N is updated when encountering each new row group and the purpose is to calculate how many N is needed remaining in the new row group. For example, if we need the top 4 data. i.e. [9,8,7,6], with N being 4, then when row group-1 is processed done, N will be updated to 1, we can call it the remaining N is 1. At this point, only 1 more digit needs to be obtained from row group-2 is fine. Since in each row group, the data is still sequentialas so all pages in row group-2 must be read to find the page containing the number 6. Next, when reading row group-3, N is updated again, and when N decrease to 0, it indicates that no more data is needed. When theLastNOperator
finally outputs, it region the pages to reduce memory and network pressure, ensuring that the data sent downstream only contains [9,8,7,6], even if don't region page atgetOutput
, it won't have much impact on memory and network because this part of the data is very small. In addition, the implementation ofaddInput
inLastNOperator
, for each current queue, the impact of memory on the worker process will also be fully considered.After this version of the code if can be successfully merged, we will also plan to support multi-field sorting and the situation where only some part of files are sorted. Probably like below, but this PR does not have these functions.
Test
Testing was performed on real log queries to retrieve the latest 200 records, which is a very common business requirement. Two query scenarios were tested, one ascending and one descending. It is worth noting that the performance of these two test cases was almost identical.
The brief test idea is as follows:
Write
The iceberg table sorting field is defined as
ORDER BY _timestamp DESC NULLS LAST
, and the iceberg table is written using theApache/Spark
engine.Read
The sorting of the query was divided into two cases, one that is exactly the same as the writing order and one that is completely opposite.
ORDER BY _timestamp DESC NULLS LAST: It took 1.96 seconds, and the partial-top-n node was rewritten as
LimitNode
, corresponding toLimitOperator
.ORDER BY _timestamp ASC NULLS FIRST: It took 2.24 seconds, and the partial-top-n node was rewritten as
LastNNode
, corresponding to the new operator added by the MR, calledLastNOperator
.Compared to the original Trino query, the query takes 36.45 seconds, and the test results show a performance improvement of 16 to 18 times.
Partition statistics
Additional context and related issues
This is the work of the first part of this issue #18139