
Support limit pushdown in Bigquery connector #24021

Merged
merged 2 commits into trinodb:master on Nov 26, 2024

Conversation

krvikash
Contributor

@krvikash krvikash commented Nov 4, 2024

Description

Fixes #23937

The BigQuery Storage API doesn't support limiting rows, but the REST API supports the LIMIT clause:
https://cloud.google.com/bigquery/docs/reference/standard-sql/query-syntax#limit_and_offset_clause

So, when a limit is specified in the query, we will use the REST API to read all kinds of tables (views, materialized views, managed tables, external tables, snapshot tables, ...).
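
For background, limit pushdown enters a Trino connector through the ConnectorMetadata.applyLimit hook. A minimal sketch of that hook follows; the withLimit helper and the exact body are assumptions for illustration, not this PR's code:

    @Override
    public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit)
    {
        BigQueryTableHandle table = (BigQueryTableHandle) handle;
        if (table.limit().isPresent() && table.limit().getAsLong() <= limit) {
            return Optional.empty(); // an equal or smaller limit is already pushed down
        }
        // withLimit() is a hypothetical helper returning a copy of the handle with the limit set.
        // guaranteed = true assumes BigQuery fully enforces LIMIT on the query-based read path.
        return Optional.of(new LimitApplicationResult<>(table.withLimit(limit), true, false));
    }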

Additional Context

I have tested the limit with a managed table, using the REST API with limit pushdown and the Storage API without limit pushdown.

The Storage API without limit pushdown performs better in most cases. The REST API with limit pushdown is only slightly faster, by about 1-2 seconds, when the physical data size is very small (around 1 MB).

  • Table

      table size: 856 MB
      rows: 6001215 // 6 million
    
  • With REST API + limit pushdown

      SELECT * FROM tpch10.lineitem_1 LIMIT 1; -- 1
      -- Input: 1 row (166B), Physical input time: 2.22s
      Execution time: ~4 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 10; -- 10
      -- Input: 10 rows (1.55kB), Physical input time: 2.85s
      Execution time: ~4 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 100; -- 100
      -- Input: 100 rows (15.28kB), Physical input time: 2.45s
      Execution time: ~4 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 1000; -- 1K
      -- Input: 1000 rows (152.78kB), Physical input time: 2.17s
      Execution time: ~4 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 2500; -- 2.5K
      -- Input: 2500 rows (382.77kB), Physical input time: 2.99s
      Execution time: ~5 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 5000; -- 5K
      -- Input: 5000 rows (765.49kB), Physical input time: 4.46s
      Execution time: ~6 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 7500; -- 7.5K
      -- Input: 7500 rows (1.12MB), Physical input time: 5.59s
      Execution time: ~7 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 10000; -- 10K
      -- Input: 10000 rows (1.49MB), Physical input time: 6.49s
      Execution time: ~8 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 100000; -- 100K
      -- Input: 100000 rows (14.95MB), Physical input time: 41.57s
      Execution time: ~40 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 250000; -- 250K
      -- Input: 250000 rows (37.38MB), Physical input time: 1.65m
      Execution time: ~1 minute 37 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 500000; -- 500K
      -- Input: 500000 rows (74.76MB), Physical input time: 3.22m
      Execution time: ~3 minutes 14 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 1000000; -- 1 million
      -- Input: 1000000 rows (149.52MB), Physical input time: 6.42m
      Execution time: ~7 minutes
    
  • With Storage API without limit pushdown

      SELECT * FROM tpch10.lineitem_1 LIMIT 1; -- 1
      -- Input: 1792 rows (274.48kB), Physical input: 204.56kB, Physical input time: 1.49s
      Execution time: ~5 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 10; -- 10
      -- Input: 1792 rows (273.93kB), Physical input: 202.76kB, Physical input time: 997.29ms
      Execution time: ~5 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 100; -- 100
      -- Input: 1792 rows (274.82kB), Physical input: 204.91kB, Physical input time: 1.55s
      Execution time: ~5 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 1000; -- 1K
      -- Input: 3584 rows (548.75kB), Physical input: 407.66kB, Physical input time: 3.05s
      Execution time: ~5 seconds
      
      SELECT * FROM tpch10.lineitem_1 LIMIT 2500; -- 2.5K
      -- Input: 5376 rows (822.89kB), Physical input: 611.83kB, Physical input time: 5.01s
      Execution time: ~6 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 5000; -- 5K
      -- Input: 7168 rows (1.07MB), Physical input: 815.95kB, Physical input time: 6.37s
      Execution time: ~6 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 7500; -- 7.5K
      -- Input: 12544 rows (1.87MB), Physical input: 1.39MB, Physical input time: 10.97s
      Execution time: ~7 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 10000; -- 10K
      -- Input: 14336 rows (2.14MB), Physical input: 1.59MB, Physical input time: 13.24s
      Execution time: ~5 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 100000; -- 100K
      -- Input: 101709 rows (15.20MB), Physical input: 11.31MB, Physical input time: 23.33s
      Execution time: ~7 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 250000; -- 250K
      -- Input: 254029 rows (37.98MB), Physical input: 28.26MB, Physical input time: 28.88s
      Execution time: ~7 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 500000; -- 500K
      -- Input: 501325 rows (74.96MB), Physical input: 55.78MB, Physical input time: 38.63s
      Execution time: ~8 seconds
    
      SELECT * FROM tpch10.lineitem_1 LIMIT 1000000; -- 1 million
      -- Input: 1003085 rows (149.99MB), Physical input: 111.62MB, Physical input time: 1.05m
      Execution time: ~9 seconds
    

Based on these test results, we support LIMIT pushdown only for those queries that use the REST API.

Release notes

## BigQuery
* Add support for LIMIT pushdown. ({issue}`23937`)

@cla-bot cla-bot bot added the cla-signed label Nov 4, 2024
@krvikash krvikash changed the title from "Support limit pushdown in Bigquery connector" to "[WIP] Support limit pushdown in Bigquery connector" on Nov 4, 2024
@github-actions github-actions bot added the bigquery BigQuery connector label Nov 4, 2024
@krvikash krvikash force-pushed the bigquery-limit-pushdown branch 2 times, most recently from e3d5b41 to 2061c56 on November 5, 2024 17:19
@krvikash krvikash changed the title from "[WIP] Support limit pushdown in Bigquery connector" to "Support limit pushdown in Bigquery connector" on Nov 5, 2024
@krvikash krvikash marked this pull request as ready for review November 5, 2024 17:20
@ebyhr
Member

ebyhr commented Nov 6, 2024

/test-with-secrets sha=2061c56273afc55ffabb8bb4e139e3260bf7488b


github-actions bot commented Nov 6, 2024

The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11694897950

@krvikash krvikash marked this pull request as draft November 6, 2024 06:14
@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 2061c56 to 23b37b3 on November 8, 2024 20:22
@krvikash krvikash marked this pull request as ready for review November 8, 2024 20:25
@krvikash
Contributor Author

krvikash commented Nov 8, 2024

(rebased with master). Ready for review.

Comment on lines 197 to 201
if (limit.isPresent()) {
// Storage API doesn't support limiting rows, but the REST API supports the LIMIT clause.
// So use the REST API to push down the limit.
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter, limit));
}
Member


But what if the LIMIT size is too large, or if the table is a wide one?

Member


Is it possible to estimate stream size beforehand and change the split mode based on that?

Contributor Author


@Praveen2112 | @ebyhr ,

I have tested the limit with a managed table, using the REST API with limit pushdown and the Storage API without limit pushdown. The Storage API without limit pushdown performs better in most cases; the REST API with limit pushdown is only slightly faster when the physical data size is very small (around 1 MB). The full benchmark results are in the PR description above.
Contributor Author


For managed tables, and for views/materialized views where skip_view_materialization is disabled, the Storage API without limit pushdown usually gives better performance. The REST API with limit pushdown is only marginally faster (1-2 seconds) for very small data sizes, around 1 MB. So I am choosing to skip the limit pushdown for these tables.

As for estimating stream size and adjusting split mode dynamically, we could implement something like shouldUseRestApiForLimit to decide between the REST API and Storage API. However, adding this would increase complexity without significant gains. Let me know if you think this is worth exploring further. CC: @ebyhr | @Praveen2112

    private static final long DATA_THRESHOLD_TO_USE_REST_API_FOR_LIMIT = 1024 * 1024; // 1MB
...
    public static boolean shouldUseRestApiForLimit(TableInfo tableInfo, int projectedColumnsCount, long limit)
    {
        Schema schema = tableInfo.getDefinition().getSchema();
        if (schema == null) {
            SchemaTableName schemaTableName = new SchemaTableName(tableInfo.getTableId().getDataset(), tableInfo.getTableId().getTable());
            throw new TableNotFoundException(schemaTableName, format("Table '%s' has no schema", schemaTableName));
        }
        long tableSize = tableInfo.getNumBytes();
        long numRows = tableInfo.getNumRows().longValueExact();
        int totalColumns = schema.getFields().size();

        // Estimate the bytes the limited query would read: average row size,
        // scaled by the fraction of projected columns, multiplied by the limit.
        double columnFraction = (double) projectedColumnsCount / totalColumns;
        double estimatedSizePerRow = ((double) tableSize / numRows) * columnFraction;
        long expectedReadSize = (long) (estimatedSizePerRow * limit);

        return expectedReadSize < DATA_THRESHOLD_TO_USE_REST_API_FOR_LIMIT;
    }
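
A hypothetical call site for this helper at split generation, adapted from the diff snippet above (a sketch only, not this PR's actual code):

    if (limit.isPresent() && shouldUseRestApiForLimit(tableInfo, columns.size(), limit.getAsLong())) {
        // Expected read size is small: the REST API with LIMIT pushed down wins.
        return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter, limit));
    }
    // Otherwise fall through to Storage API read sessions without limit pushdown.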

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 23b37b3 to 54851a7 on November 13, 2024 19:23
@krvikash
Contributor Author

Limit pushdown will be supported for (a conceptual sketch follows the list):

  • external table
  • native query
  • view/materialized-view (with skip_view_materialization enabled)
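
Conceptually, this scoping reduces to "is the relation read with a BigQuery query rather than the Storage API". A rough sketch with hypothetical names (the connector's actual classes and helpers may differ):

    // Hypothetical names for illustration only; not the connector's actual API.
    private static boolean supportsLimitPushdown(BigQueryTableHandle table, ConnectorSession session)
    {
        return table.isExternalTable()
                || table.isNativeQuery()
                || (table.isView() && isSkipViewMaterializationEnabled(session));
    }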

@krvikash
Contributor Author

Hi @ebyhr, Could you please run this PR against secrets?

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 54851a7 to 788e9d5 on November 13, 2024 19:32
@krvikash
Contributor Author

(error-prone fix and rebased with master)

@ebyhr
Member

ebyhr commented Nov 13, 2024

/test-with-secrets sha=788e9d5d3fa7e46fe3ec29cf462ce49c5fec6218


The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11827619150

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 788e9d5 to ca72aa6 on November 19, 2024 06:56
@ebyhr
Member

ebyhr commented Nov 19, 2024

/test-with-secrets sha=cceef83d3ad45644c1a2958cbd59096f489f199f


The CI workflow run with tests that require additional secrets has been started: https://github.com/trinodb/trino/actions/runs/11913214142

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from cceef83 to 88e04b0 on November 20, 2024 08:54
@krvikash
Contributor Author

Thanks @pajaks | @findinpath for the review. AC (addressed comments).

Member

@Praveen2112 Praveen2112 left a comment


LGTM % changes.

     }

-    public static String selectSql(TableId table, String formattedColumns, Optional<String> filter)
+    public static String selectSql(TableId table, String formattedColumns, Optional<String> filter, OptionalLong limit)
     {
         String tableName = fullTableName(table);
         String query = format("SELECT %s FROM `%s`", formattedColumns, tableName);
Member


nit: Maybe as a follow-up, can we use StringBuilder here?
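
A minimal sketch of the suggested follow-up, assuming the filter and limit are appended as WHERE and LIMIT clauses (illustration only, not this PR's code):

    public static String selectSql(TableId table, String formattedColumns, Optional<String> filter, OptionalLong limit)
    {
        StringBuilder query = new StringBuilder("SELECT ")
                .append(formattedColumns)
                .append(" FROM `")
                .append(fullTableName(table))
                .append('`');
        filter.ifPresent(condition -> query.append(" WHERE ").append(condition));
        limit.ifPresent(value -> query.append(" LIMIT ").append(value));
        return query.toString();
    }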

Comment on lines 1301 to 1306
assertThat(query(session, "SELECT COUNT(*) FROM (SELECT * FROM %s LIMIT 3)".formatted(tableName)))
.matches("VALUES BIGINT '3'")
.isNotFullyPushedDown(aggregationOverTableScan);
assertThat(query(session, "SELECT COUNT(*) FROM (SELECT * FROM %s LIMIT 10)".formatted(tableName)))
.matches("VALUES BIGINT '5'")
.isNotFullyPushedDown(aggregationOverTableScan);
Member


How do we ensure that LIMIT is pushed down here? Can we add anyNot(LimitNode.class, ...) to ensure that limit pushdown did happen here?

Contributor Author


As discussed offline, using tableScan to verify the limit value.

Contributor Author


Now verifying the tableScan with the limit, along with anyNot(LimitNode.class, ...):

anyNot(
    LimitNode.class, node(
            AggregationNode.class, anyTree(node(
                    AggregationNode.class,
                    tableScan(table -> {
                        BigQueryTableHandle actualTableHandle = (BigQueryTableHandle) table;
                        return actualTableHandle.limit().equals(limit);
                    },
                    TupleDomain.all(),
                    ImmutableMap.of())))));

Contributor Author


CC: @Praveen2112 ^^

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 88e04b0 to 7a829b0 on November 21, 2024 09:33
@krvikash
Contributor Author

Thanks @Praveen2112 for the review. AC (addressed comments).

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 7a829b0 to e643cbc on November 21, 2024 10:31
@krvikash
Contributor Author

(CI fix)

@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from e643cbc to 4cab533 on November 22, 2024 11:27
@krvikash
Contributor Author

(Pushed final set of changes for verifying plan)

@krvikash
Contributor Author

Hi @Praveen2112, can this be merged if there are no comments?

Limit pushdown will be supported only when the query-based approach is applied:

1. external table
2. native query
3. view/materialized-view (with skip_view_materialization enabled)
@krvikash krvikash force-pushed the bigquery-limit-pushdown branch from 4cab533 to 57b2176 on November 26, 2024 06:53
@krvikash
Contributor Author

Thanks @Praveen2112. AC (addressed comments).

@Praveen2112
Member

/test-with-secrets sha=57b217641b9a7798c234dfbb55ac836404caa99b


github-actions bot commented Nov 26, 2024

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/12026031079

@krvikash
Contributor Author

The CI workflow run with tests that require additional secrets finished as failure: https://github.com/trinodb/trino/actions/runs/12026031079

CI failure is unrelated to this PR.

@Praveen2112 Praveen2112 merged commit 33cb3ad into trinodb:master Nov 26, 2024
15 of 16 checks passed
@github-actions github-actions bot added this to the 466 milestone Nov 26, 2024
@krvikash krvikash deleted the bigquery-limit-pushdown branch November 26, 2024 12:06