Support limit pushdown in Bigquery connector #24021
Conversation
Force-pushed from e3d5b41 to 2061c56.
/test-with-secrets sha=2061c56273afc55ffabb8bb4e139e3260bf7488b

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11694897950
Force-pushed from 2061c56 to 23b37b3.
(Rebased with master.) Ready for review.
```java
if (limit.isPresent()) {
    // Storage API doesn't support limiting rows, but REST API supports LIMIT clause.
    // So use REST API to pushdown limit.
    return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter, limit));
}
```
But what if the `LIMIT` size is too large, or if the table is a wide one?
Is it possible to estimate stream size beforehand and change the split mode based on that?
@Praveen2112 @ebyhr, I have tested the limit with a managed table using `Rest API + limit pushdown` and `Storage API without limit pushdown`.

The `Storage API without limit pushdown` generally performs better. The `REST API with limit pushdown` is only slightly faster, by about 1–2 seconds, when the physical data size is very small (around 1 MB).
Table: size 856 MB, rows: 6,001,215 (~6 million)
With `Rest API + limit pushdown`:

```sql
SELECT * FROM tpch10.lineitem_1 LIMIT 1;       -- Input: 1 row (166B), Physical input time: 2.22s, Execution time: ~4 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 10;      -- Input: 10 rows (1.55kB), Physical input time: 2.85s, Execution time: ~4 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 100;     -- Input: 100 rows (15.28kB), Physical input time: 2.45s, Execution time: ~4 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 1000;    -- Input: 1000 rows (152.78kB), Physical input time: 2.17s, Execution time: ~4 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 2500;    -- Input: 2500 rows (382.77kB), Physical input time: 2.99s, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 5000;    -- Input: 5000 rows (765.49kB), Physical input time: 4.46s, Execution time: ~6 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 7500;    -- Input: 7500 rows (1.12MB), Physical input time: 5.59s, Execution time: ~7 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 10000;   -- Input: 10000 rows (1.49MB), Physical input time: 6.49s, Execution time: ~8 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 100000;  -- Input: 100000 rows (14.95MB), Physical input time: 41.57s, Execution time: ~40 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 250000;  -- Input: 250000 rows (37.38MB), Physical input time: 1.65m, Execution time: ~1 minute 37 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 500000;  -- Input: 500000 rows (74.76MB), Physical input time: 3.22m, Execution time: ~3 minutes 14 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 1000000; -- Input: 1000000 rows (149.52MB), Physical input time: 6.42m, Execution time: ~6 minutes 60 seconds
```
With `Storage API without limit pushdown`:

```sql
SELECT * FROM tpch10.lineitem_1 LIMIT 1;       -- Input: 1792 rows (274.48kB), Physical input: 204.56kB, Physical input time: 1.49s, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 10;      -- Input: 1792 rows (273.93kB), Physical input: 202.76kB, Physical input time: 997.29ms, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 100;     -- Input: 1792 rows (274.82kB), Physical input: 204.91kB, Physical input time: 1.55s, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 1000;    -- Input: 3584 rows (548.75kB), Physical input: 407.66kB, Physical input time: 3.05s, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 2500;    -- Input: 5376 rows (822.89kB), Physical input: 611.83kB, Physical input time: 5.01s, Execution time: ~6 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 5000;    -- Input: 7168 rows (1.07MB), Physical input: 815.95kB, Physical input time: 6.37s, Execution time: ~6 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 7500;    -- Input: 12544 rows (1.87MB), Physical input: 1.39MB, Physical input time: 10.97s, Execution time: ~7 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 10000;   -- Input: 14336 rows (2.14MB), Physical input: 1.59MB, Physical input time: 13.24s, Execution time: ~5 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 100000;  -- Input: 101709 rows (15.20MB), Physical input: 11.31MB, Physical input time: 23.33s, Execution time: ~7 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 250000;  -- Input: 254029 rows (37.98MB), Physical input: 28.26MB, Physical input time: 28.88s, Execution time: ~7 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 500000;  -- Input: 501325 rows (74.96MB), Physical input: 55.78MB, Physical input time: 38.63s, Execution time: ~8 seconds
SELECT * FROM tpch10.lineitem_1 LIMIT 1000000; -- Input: 1003085 rows (149.99MB), Physical input: 111.62MB, Physical input time: 1.05m, Execution time: ~9 seconds
```
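Side by side, the execution times from the two runs above (same table and limits; values taken directly from the measurements):

| LIMIT | Rest API + limit pushdown | Storage API without limit pushdown |
|------:|:--------------------------|:-----------------------------------|
| 1     | ~4 s                      | ~5 s                               |
| 10    | ~4 s                      | ~5 s                               |
| 100   | ~4 s                      | ~5 s                               |
| 1K    | ~4 s                      | ~5 s                               |
| 2.5K  | ~5 s                      | ~6 s                               |
| 5K    | ~6 s                      | ~6 s                               |
| 7.5K  | ~7 s                      | ~7 s                               |
| 10K   | ~8 s                      | ~5 s                               |
| 100K  | ~40 s                     | ~7 s                               |
| 250K  | ~1 min 37 s               | ~7 s                               |
| 500K  | ~3 min 14 s               | ~8 s                               |
| 1M    | ~7 min                    | ~9 s                               |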
For managed tables, and for views/materialized views where `skip_view_materialization` is disabled, the `Storage API without limit pushdown` usually gives better performance. The `REST API with limit pushdown` is only marginally faster (1–2 seconds) for very small data sizes, around 1 MB. So I am choosing to skip the limit pushdown for these tables.

As for estimating stream size and adjusting the split mode dynamically, we could implement something like `shouldUseRestApiForLimit` to decide between the REST API and the Storage API. However, adding this would increase complexity without significant gains. Let me know if you think this is worth exploring further. CC: @ebyhr @Praveen2112
```java
private static final long DATA_THRESHOLD_TO_USE_REST_API_FOR_LIMIT = 1024 * 1024; // 1MB
...
public static boolean shouldUseRestApiForLimit(TableInfo tableInfo, int projectedColumnsCount, long limit)
{
    Schema schema = tableInfo.getDefinition().getSchema();
    if (schema == null) {
        SchemaTableName schemaTableName = new SchemaTableName(tableInfo.getTableId().getDataset(), tableInfo.getTableId().getTable());
        throw new TableNotFoundException(schemaTableName, format("Table '%s' has no schema", schemaTableName));
    }
    // Estimate the bytes the REST API would read: average row size, scaled by
    // the fraction of projected columns, multiplied by the limit.
    long tableSize = tableInfo.getNumBytes();
    long numRows = tableInfo.getNumRows().longValueExact();
    int totalColumns = schema.getFields().size();
    double columnFraction = (double) projectedColumnsCount / totalColumns;
    double estimatedSizePerRow = ((double) tableSize / numRows) * columnFraction;
    long expectedReadSize = (long) (estimatedSizePerRow * limit);
    return expectedReadSize < DATA_THRESHOLD_TO_USE_REST_API_FOR_LIMIT;
}
```
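For context, a hypothetical call site sketching how the split source could branch on this estimate (names and placement are assumptions, not the PR's actual code):

```java
// Hypothetical integration in BigQuerySplitSource; limit is an OptionalLong,
// and tableInfo/columns/filter come from the surrounding split-generation code.
if (limit.isPresent() && shouldUseRestApiForLimit(tableInfo, columns.size(), limit.getAsLong())) {
    // Expected read is under the 1 MB threshold, where the REST API's
    // LIMIT clause was measured to be slightly faster.
    return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter, limit));
}
// Otherwise read through the Storage API and let Trino enforce the limit.
```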
Force-pushed from 23b37b3 to 54851a7.
Limit pushdown will be supported only when the query-based approach is applied: external tables, native queries, and views/materialized views (with `skip_view_materialization` enabled).

Hi @ebyhr, could you please run this PR against secrets?
Force-pushed from 54851a7 to 788e9d5.
(Error-prone fix and rebased with master.)

/test-with-secrets sha=788e9d5d3fa7e46fe3ec29cf462ce49c5fec6218

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11827619150
Force-pushed from 788e9d5 to ca72aa6.
/test-with-secrets sha=cceef83d3ad45644c1a2958cbd59096f489f199f

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11913214142
Force-pushed from cceef83 to 88e04b0.
Thanks @pajaks and @findinpath for the review. AC.
LGTM % changes.
```diff
-    public static String selectSql(TableId table, String formattedColumns, Optional<String> filter)
+    public static String selectSql(TableId table, String formattedColumns, Optional<String> filter, OptionalLong limit)
     {
         String tableName = fullTableName(table);
         String query = format("SELECT %s FROM `%s`", formattedColumns, tableName);
```
Nit: maybe as a follow-up, can we use `StringBuilder` here?
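If it helps, a minimal sketch of that follow-up, assuming the patched signature above; the WHERE/LIMIT handling here is illustrative, not the PR's actual code:

```java
import java.util.Optional;
import java.util.OptionalLong;

import com.google.cloud.bigquery.TableId;

public static String selectSql(TableId table, String formattedColumns, Optional<String> filter, OptionalLong limit)
{
    // Build the query incrementally instead of chaining format() calls;
    // fullTableName(...) is the existing helper used by the current code.
    StringBuilder query = new StringBuilder("SELECT ")
            .append(formattedColumns)
            .append(" FROM `")
            .append(fullTableName(table))
            .append('`');
    filter.ifPresent(condition -> query.append(" WHERE ").append(condition));
    limit.ifPresent(value -> query.append(" LIMIT ").append(value));
    return query.toString();
}
```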
```java
assertThat(query(session, "SELECT COUNT(*) FROM (SELECT * FROM %s LIMIT 3)".formatted(tableName)))
        .matches("VALUES BIGINT '3'")
        .isNotFullyPushedDown(aggregationOverTableScan);
assertThat(query(session, "SELECT COUNT(*) FROM (SELECT * FROM %s LIMIT 10)".formatted(tableName)))
        .matches("VALUES BIGINT '5'")
        .isNotFullyPushedDown(aggregationOverTableScan);
```
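For context: the `COUNT(*)` is expected to stay in Trino, hence `isNotFullyPushedDown(aggregationOverTableScan)`, while the inner `LIMIT` should reach the connector. The second query returns 5, presumably because the test table has only 5 rows, so a pushed-down `LIMIT 10` cannot change the result.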
How do we ensure that `LIMIT` is pushed down here? Can we add `anyNot(LimitNode.class, ...)` to ensure that limit pushdown did happen here?
As discussed offline, using `tableScan` to verify the limit value.
Now verifying `tableScan` with the limit, along with `anyNot(LimitNode.class, ...)`:

```java
anyNot(
        LimitNode.class,
        node(AggregationNode.class, anyTree(node(
                AggregationNode.class,
                tableScan(
                        table -> {
                            BigQueryTableHandle actualTableHandle = (BigQueryTableHandle) table;
                            return actualTableHandle.limit().equals(limit);
                        },
                        TupleDomain.all(),
                        ImmutableMap.of())))));
```
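This pattern asserts two things at once: `anyNot(LimitNode.class, ...)` checks that no `LimitNode` survives in the plan (the limit was absorbed by the connector), and the `tableScan` predicate checks that the `BigQueryTableHandle` produced by pushdown carries the expected limit value.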
CC: @Praveen2112 ^^
Force-pushed from 88e04b0 to 7a829b0.
Thanks @Praveen2112 for the review. AC.
Force-pushed from 7a829b0 to e643cbc.
(CI fix)
Force-pushed from e643cbc to 4cab533.
(Pushed the final set of changes for verifying the plan.)

Hi @Praveen2112, can this be merged if there are no further comments?
Limit pushdown will be supported only when the query-based approach is applied:
1. external table
2. native query
3. view/materialized view (with `skip_view_materialization` enabled)
Force-pushed from 4cab533 to 57b2176.
Thanks @Praveen2112. AC.

/test-with-secrets sha=57b217641b9a7798c234dfbb55ac836404caa99b

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/12026031079

The CI failure is unrelated to this PR.
Description
Fixes #23937
The BigQuery Storage API doesn't support limiting rows, but the REST API supports a LIMIT clause:
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause
So we will use the REST API to read all kinds of tables (view, MV, managed table, external table, snapshot table, ...) when a limit is specified in the query.
Additional Context
I have tested the limit with a managed table using `Rest API + limit pushdown` and `Storage API without limit pushdown`.

The `Storage API without limit pushdown` generally performs better. The `REST API with limit pushdown` is only slightly faster, by about 1–2 seconds, when the physical data size is very small (around 1 MB).

The collapsed details (Table, With Rest API + limit pushdown, With Storage API without limit pushdown) contain the same table statistics and measurements shown in the review discussion above.
Based on these tests, we support `LIMIT` pushdown only for those queries that use the `REST` API.

Release notes