From 7ec87bcee3b38f1e5f1bec1df7638fdb83cf7887 Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sun, 6 Oct 2024 01:31:39 +0530 Subject: [PATCH] Use dynamic filters to reduce hive partitions metadata listing Improves performance of splits generation on large partitioned hive tables by using dynamic filters to reduce the partitions metadata fetched by io.trino.metastore.HiveMetastore#getPartitionsByNames --- .../trino/plugin/hive/HiveSplitManager.java | 20 ++++++- .../TestHiveMetastoreAccessOperations.java | 53 ++++++++++++++++++- 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java index 5c3ef5d2cc31..0266fef30d2d 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveSplitManager.java @@ -38,6 +38,7 @@ import io.trino.plugin.hive.util.HiveUtil; import io.trino.spi.TrinoException; import io.trino.spi.VersionEmbedder; +import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; @@ -48,6 +49,7 @@ import io.trino.spi.connector.FixedSplitSource; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TypeManager; import jakarta.annotation.Nullable; import org.weakref.jmx.Managed; @@ -78,6 +80,7 @@ import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY; import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH; +import static io.trino.plugin.hive.HivePartitionManager.partitionMatches; import static io.trino.plugin.hive.HiveSessionProperties.getDynamicFilteringWaitTimeout; import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision; import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions; @@ -252,7 +255,9 @@ public ConnectorSplitSource getSplits( table, peekingIterator(partitions), bucketHandle.map(HiveBucketHandle::toTableBucketProperty), - neededColumnNames); + neededColumnNames, + dynamicFilter, + hiveTable); HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader( table, @@ -305,7 +310,9 @@ private Iterator getPartitionMetadata( Table table, PeekingIterator hivePartitions, Optional bucketProperty, - Set neededColumnNames) + Set neededColumnNames, + DynamicFilter dynamicFilter, + HiveTableHandle tableHandle) { if (!hivePartitions.hasNext()) { return emptyIterator(); @@ -324,6 +331,15 @@ private Iterator getPartitionMetadata( Iterator> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize); Iterator> partitionBatches = transform(partitionNameBatches, partitionBatch -> { + // Use dynamic filters to reduce the partitions listed by getPartitionsByNames + TupleDomain currentDynamicFilter = dynamicFilter.getCurrentPredicate(); + if (!currentDynamicFilter.isAll()) { + TupleDomain partitionsFilter = currentDynamicFilter.intersect(tableHandle.getCompactEffectivePredicate()); + partitionBatch = partitionBatch.stream() + .filter(hivePartition -> partitionMatches(tableHandle.getPartitionColumns(), partitionsFilter, hivePartition)) + .collect(toImmutableList()); + } + SchemaTableName tableName = table.getSchemaTableName(); Map> partitions = metastore.getPartitionsByNames( tableName.getSchemaName(), diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java index da83783c021e..06d0c8cc2343 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/thrift/TestHiveMetastoreAccessOperations.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; +import io.trino.Session; import io.trino.plugin.hive.HiveQueryRunner; import io.trino.plugin.hive.metastore.MetastoreMethod; import io.trino.testing.AbstractTestQueryFramework; @@ -23,6 +24,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; +import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; import static io.trino.plugin.hive.metastore.MetastoreInvocations.assertMetastoreInvocationsForQuery; import static io.trino.plugin.hive.metastore.MetastoreMethod.CREATE_TABLE; import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_DATABASE; @@ -43,7 +45,9 @@ public class TestHiveMetastoreAccessOperations protected QueryRunner createQueryRunner() throws Exception { - return HiveQueryRunner.create(); + return HiveQueryRunner.builder() + .addHiveProperty("hive.dynamic-filtering.wait-timeout", "1h") + .build(); } @Test @@ -179,6 +183,46 @@ public void testSelfJoin() .build()); } + @Test + public void testDynamicPartitionPruning() + { + assertUpdate( + "CREATE TABLE test_dynamic_partition_pruning_table WITH (partitioned_by=ARRAY['suppkey']) AS " + + "SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem", + 60175); + + Session session = Session.builder(getSession()) + // Avoid caching all partitions metadata during planning through getTableStatistics + // and force partitions to be fetched during splits generation + .setCatalogSessionProperty("hive", "partition_statistics_sample_size", "10") + .build(); + @Language("SQL") String sql = "SELECT * FROM test_dynamic_partition_pruning_table l JOIN tpch.tiny.supplier s ON l.suppkey = s.suppkey " + + "AND s.name = 'Supplier#000000001'"; + + assertMetastoreInvocations( + Session.builder(session) + .setSystemProperty(ENABLE_DYNAMIC_FILTERING, "false") + .build(), + sql, + ImmutableMultiset.builder() + .add(GET_TABLE) + .addCopies(GET_PARTITIONS_BY_NAMES, 5) + .add(GET_PARTITION_COLUMN_STATISTICS) + .addCopies(GET_PARTITION_NAMES_BY_FILTER, 2) + .build()); + + assertQuerySucceeds("CALL system.flush_metadata_cache()"); + assertMetastoreInvocations( + session, + sql, + ImmutableMultiset.builder() + .add(GET_TABLE) + .add(GET_PARTITIONS_BY_NAMES) + .add(GET_PARTITION_COLUMN_STATISTICS) + .addCopies(GET_PARTITION_NAMES_BY_FILTER, 2) + .build()); + } + @Test public void testExplainSelect() { @@ -301,6 +345,11 @@ public void testDropStatsPartitionedTable() private void assertMetastoreInvocations(@Language("SQL") String query, Multiset expectedInvocations) { - assertMetastoreInvocationsForQuery(getDistributedQueryRunner(), getSession(), query, expectedInvocations); + assertMetastoreInvocations(getSession(), query, expectedInvocations); + } + + private void assertMetastoreInvocations(Session session, @Language("SQL") String query, Multiset expectedInvocations) + { + assertMetastoreInvocationsForQuery(getDistributedQueryRunner(), session, query, expectedInvocations); } }