Use dynamic filters to reduce hive partitions metadata listing
Improves performance of splits generation on large partitioned hive tables
by using dynamic filters to reduce the partitions metadata fetched by
io.trino.metastore.HiveMetastore#getPartitionsByNames
raunaqmorarka committed Oct 7, 2024
1 parent 8fc7a2a · commit 7ec87bc
Showing 2 changed files with 69 additions and 4 deletions.
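
To make the mechanics of the diff below easier to follow, here is a minimal Java sketch of the pruning step this commit adds: before fetching metadata for a batch of partitions, the loader intersects the current dynamic filter with the table's static predicate and drops partitions whose partition-key values cannot match. This is only a sketch; the class name PartitionPruningSketch and the helper pruneWithDynamicFilter are hypothetical, and the signatures are simplified around the Trino SPI types that appear in the diff. The real logic lives inside the getPartitionMetadata method shown below.

import io.trino.plugin.hive.HivePartition;
import io.trino.plugin.hive.HiveTableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;

final class PartitionPruningSketch
{
    private PartitionPruningSketch() {}

    // Hypothetical helper illustrating the step this commit adds to split generation
    static List<HivePartition> pruneWithDynamicFilter(
            List<HivePartition> partitionBatch,
            DynamicFilter dynamicFilter,
            HiveTableHandle tableHandle)
    {
        // Snapshot whatever dynamic filter has been collected so far
        TupleDomain<ColumnHandle> currentDynamicFilter = dynamicFilter.getCurrentPredicate();
        if (currentDynamicFilter.isAll()) {
            // No dynamic filter available yet; fetch metadata for the whole batch
            return partitionBatch;
        }
        // Combine the dynamic filter with the table's compact effective predicate and keep
        // only partitions whose partition-key values can satisfy it, so that
        // HiveMetastore#getPartitionsByNames is asked for fewer partition names
        TupleDomain<ColumnHandle> partitionsFilter = currentDynamicFilter.intersect(tableHandle.getCompactEffectivePredicate());
        return partitionBatch.stream()
                .filter(partition -> partitionMatches(tableHandle.getPartitionColumns(), partitionsFilter, partition))
                .collect(toImmutableList());
    }
}

The second changed file adds a test that exercises this path end to end: the expected invocation counts drop from five GET_PARTITIONS_BY_NAMES calls with dynamic filtering disabled to a single call once the dynamic filter on suppkey prunes the partition batches.
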
Changed file 1 of 2
@@ -38,6 +38,7 @@
import io.trino.plugin.hive.util.HiveUtil;
import io.trino.spi.TrinoException;
import io.trino.spi.VersionEmbedder;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
@@ -48,6 +49,7 @@
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import jakarta.annotation.Nullable;
import org.weakref.jmx.Managed;
@@ -78,6 +80,7 @@
import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_PARTITION_SCHEMA_MISMATCH;
import static io.trino.plugin.hive.HivePartitionManager.partitionMatches;
import static io.trino.plugin.hive.HiveSessionProperties.getDynamicFilteringWaitTimeout;
import static io.trino.plugin.hive.HiveSessionProperties.getTimestampPrecision;
import static io.trino.plugin.hive.HiveSessionProperties.isIgnoreAbsentPartitions;
@@ -252,7 +255,9 @@ public ConnectorSplitSource getSplits(
table,
peekingIterator(partitions),
bucketHandle.map(HiveBucketHandle::toTableBucketProperty),
neededColumnNames);
neededColumnNames,
dynamicFilter,
hiveTable);

HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
table,
@@ -305,7 +310,9 @@ private Iterator<HivePartitionMetadata> getPartitionMetadata(
Table table,
PeekingIterator<HivePartition> hivePartitions,
Optional<HiveBucketProperty> bucketProperty,
Set<String> neededColumnNames)
Set<String> neededColumnNames,
DynamicFilter dynamicFilter,
HiveTableHandle tableHandle)
{
if (!hivePartitions.hasNext()) {
return emptyIterator();
@@ -324,6 +331,15 @@ private Iterator<HivePartitionMetadata> getPartitionMetadata(

Iterator<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
Iterator<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
// Use dynamic filters to reduce the partitions listed by getPartitionsByNames
TupleDomain<ColumnHandle> currentDynamicFilter = dynamicFilter.getCurrentPredicate();
if (!currentDynamicFilter.isAll()) {
TupleDomain<ColumnHandle> partitionsFilter = currentDynamicFilter.intersect(tableHandle.getCompactEffectivePredicate());
partitionBatch = partitionBatch.stream()
.filter(hivePartition -> partitionMatches(tableHandle.getPartitionColumns(), partitionsFilter, hivePartition))
.collect(toImmutableList());
}

SchemaTableName tableName = table.getSchemaTableName();
Map<String, Optional<Partition>> partitions = metastore.getPartitionsByNames(
tableName.getSchemaName(),
Changed file 2 of 2
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import io.trino.Session;
import io.trino.plugin.hive.HiveQueryRunner;
import io.trino.plugin.hive.metastore.MetastoreMethod;
import io.trino.testing.AbstractTestQueryFramework;
@@ -23,6 +24,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;

import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.plugin.hive.metastore.MetastoreInvocations.assertMetastoreInvocationsForQuery;
import static io.trino.plugin.hive.metastore.MetastoreMethod.CREATE_TABLE;
import static io.trino.plugin.hive.metastore.MetastoreMethod.GET_DATABASE;
@@ -43,7 +45,9 @@ public class TestHiveMetastoreAccessOperations
protected QueryRunner createQueryRunner()
throws Exception
{
return HiveQueryRunner.create();
return HiveQueryRunner.builder()
.addHiveProperty("hive.dynamic-filtering.wait-timeout", "1h")
.build();
}

@Test
@@ -179,6 +183,46 @@ public void testSelfJoin()
.build());
}

@Test
public void testDynamicPartitionPruning()
{
assertUpdate(
"CREATE TABLE test_dynamic_partition_pruning_table WITH (partitioned_by=ARRAY['suppkey']) AS " +
"SELECT orderkey, partkey, suppkey FROM tpch.tiny.lineitem",
60175);

Session session = Session.builder(getSession())
// Avoid caching all partitions metadata during planning through getTableStatistics
// and force partitions to be fetched during splits generation
.setCatalogSessionProperty("hive", "partition_statistics_sample_size", "10")
.build();
@Language("SQL") String sql = "SELECT * FROM test_dynamic_partition_pruning_table l JOIN tpch.tiny.supplier s ON l.suppkey = s.suppkey " +
"AND s.name = 'Supplier#000000001'";

assertMetastoreInvocations(
Session.builder(session)
.setSystemProperty(ENABLE_DYNAMIC_FILTERING, "false")
.build(),
sql,
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLE)
.addCopies(GET_PARTITIONS_BY_NAMES, 5)
.add(GET_PARTITION_COLUMN_STATISTICS)
.addCopies(GET_PARTITION_NAMES_BY_FILTER, 2)
.build());

assertQuerySucceeds("CALL system.flush_metadata_cache()");
assertMetastoreInvocations(
session,
sql,
ImmutableMultiset.<MetastoreMethod>builder()
.add(GET_TABLE)
.add(GET_PARTITIONS_BY_NAMES)
.add(GET_PARTITION_COLUMN_STATISTICS)
.addCopies(GET_PARTITION_NAMES_BY_FILTER, 2)
.build());
}

@Test
public void testExplainSelect()
{
@@ -301,6 +345,11 @@ public void testDropStatsPartitionedTable()

private void assertMetastoreInvocations(@Language("SQL") String query, Multiset<MetastoreMethod> expectedInvocations)
{
assertMetastoreInvocationsForQuery(getDistributedQueryRunner(), getSession(), query, expectedInvocations);
assertMetastoreInvocations(getSession(), query, expectedInvocations);
}

private void assertMetastoreInvocations(Session session, @Language("SQL") String query, Multiset<MetastoreMethod> expectedInvocations)
{
assertMetastoreInvocationsForQuery(getDistributedQueryRunner(), session, query, expectedInvocations);
}
}
