Skip to content

Commit

Permalink
Move partition projection property processing to HiveMetastoreClosure
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Nov 18, 2023
1 parent 24523fb commit febb70b
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public HiveMetadataFactory(
public TransactionalMetadata create(ConnectorIdentity identity, boolean autoCommit)
{
CachingHiveMetastore cachingHiveMetastore = createPerTransactionCache(metastoreFactory.createMetastore(Optional.of(identity)), perTransactionCacheMaximumSize);
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(cachingHiveMetastore);
HiveMetastoreClosure hiveMetastoreClosure = new HiveMetastoreClosure(cachingHiveMetastore, typeManager, partitionProjectionEnabled);

DirectoryLister directoryLister = transactionScopeCachingDirectoryListerFactory.get(this.directoryLister);
SemiTransactionalHiveMetastore metastore = new SemiTransactionalHiveMetastore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.trino.hive.thrift.metastore.DataOperationType;
import io.trino.plugin.hive.acid.AcidOperation;
import io.trino.plugin.hive.acid.AcidTransaction;
import io.trino.plugin.hive.aws.athena.PartitionProjection;
import io.trino.plugin.hive.metastore.AcidTransactionOwner;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.Database;
Expand All @@ -28,13 +29,15 @@
import io.trino.plugin.hive.metastore.PartitionWithStatistics;
import io.trino.plugin.hive.metastore.PrincipalPrivileges;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.function.LanguageFunction;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.RoleGrant;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;

import java.util.Collection;
import java.util.List;
Expand All @@ -47,20 +50,26 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.Maps.immutableEntry;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY;
import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues;
import static io.trino.plugin.hive.aws.athena.PartitionProjectionProperties.getPartitionProjectionFromTable;
import static java.util.Objects.requireNonNull;

public class HiveMetastoreClosure
{
private final HiveMetastore delegate;
private final TypeManager typeManager;
private final boolean partitionProjectionEnabled;

/**
* Do not use this directly. Instead, the closure should be fetched from the current SemiTransactionalHiveMetastore,
* which can be fetched from the current HiveMetadata.
*/
public HiveMetastoreClosure(HiveMetastore delegate)
public HiveMetastoreClosure(HiveMetastore delegate, TypeManager typeManager, boolean partitionProjectionEnabled)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.partitionProjectionEnabled = partitionProjectionEnabled;
}

public Optional<Database> getDatabase(String databaseName)
Expand Down Expand Up @@ -247,12 +256,21 @@ public Optional<List<String>> getPartitionNamesByFilter(
List<String> columnNames,
TupleDomain<String> partitionKeysFilter)
{
if (partitionProjectionEnabled) {
Table table = getTable(databaseName, tableName)
.orElseThrow(() -> new TrinoException(HIVE_TABLE_DROPPED_DURING_QUERY, "Table does not exists: " + tableName));

Optional<PartitionProjection> projection = getPartitionProjectionFromTable(table, typeManager);
if (projection.isPresent()) {
return projection.get().getProjectedPartitionNamesByFilter(columnNames, partitionKeysFilter);
}
}
return delegate.getPartitionNamesByFilter(databaseName, tableName, columnNames, partitionKeysFilter);
}

private List<Partition> getExistingPartitionsByNames(Table table, List<String> partitionNames)
{
Map<String, Partition> partitions = delegate.getPartitionsByNames(table, partitionNames).entrySet().stream()
Map<String, Partition> partitions = getPartitionsByNames(table, partitionNames).entrySet().stream()
.map(entry -> immutableEntry(entry.getKey(), entry.getValue().orElseThrow(() ->
new PartitionNotFoundException(table.getSchemaTableName(), extractPartitionValues(entry.getKey())))))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
Expand All @@ -265,11 +283,22 @@ private List<Partition> getExistingPartitionsByNames(Table table, List<String> p
public Map<String, Optional<Partition>> getPartitionsByNames(String databaseName, String tableName, List<String> partitionNames)
{
return delegate.getTable(databaseName, tableName)
.map(table -> delegate.getPartitionsByNames(table, partitionNames))
.map(table -> getPartitionsByNames(table, partitionNames))
.orElseGet(() -> partitionNames.stream()
.collect(toImmutableMap(name -> name, name -> Optional.empty())));
}

/**
 * Looks up the requested partitions of {@code table} by name.
 * <p>
 * When partition projection is enabled for this closure and
 * {@code getPartitionProjectionFromTable} yields a projection for the table,
 * the result comes from that projection. In every other case the lookup is
 * forwarded unchanged to the delegate metastore.
 *
 * @param table the table whose partitions are requested
 * @param partitionNames names of the partitions to look up
 * @return a map keyed by partition name, as produced by either the projection or the delegate
 */
private Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<String> partitionNames)
{
    // Projection switched off: go straight to the metastore without inspecting the table.
    if (!partitionProjectionEnabled) {
        return delegate.getPartitionsByNames(table, partitionNames);
    }

    Optional<PartitionProjection> partitionProjection = getPartitionProjectionFromTable(table, typeManager);
    // No projection available for this table: fall back to the metastore.
    if (partitionProjection.isEmpty()) {
        return delegate.getPartitionsByNames(table, partitionNames);
    }

    return partitionProjection.get().getProjectedPartitionsByNames(table, partitionNames);
}

public void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions)
{
delegate.addPartitions(databaseName, tableName, partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private HivePageSink createPageSink(HiveWritableTableHandle handle, boolean isCr
handle.getLocationHandle(),
locationService,
session.getQueryId(),
new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), new HiveMetastoreClosure(cachingHiveMetastore)),
new HivePageSinkMetadataProvider(handle.getPageSinkMetadata(), new HiveMetastoreClosure(cachingHiveMetastore, typeManager, false)),
typeManager,
pageSorter,
writerSortBufferSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.trino.plugin.base.jmx.ConnectorObjectNameGeneratorModule;
import io.trino.plugin.base.jmx.MBeanServerModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.hive.aws.athena.PartitionProjectionModule;
import io.trino.plugin.hive.fs.CachingDirectoryListerModule;
import io.trino.plugin.hive.fs.DirectoryLister;
import io.trino.plugin.hive.metastore.HiveMetastore;
Expand Down Expand Up @@ -106,7 +105,6 @@ public static Connector createConnector(
new JsonModule(),
new TypeDeserializerModule(context.getTypeManager()),
new HiveModule(),
new PartitionProjectionModule(),
new CachingDirectoryListerModule(directoryLister),
new HiveMetastoreModule(metastore),
new HiveSecurityModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

final class PartitionProjection
public final class PartitionProjection
{
private static final Pattern PROJECTION_LOCATION_TEMPLATE_PLACEHOLDER_PATTERN = Pattern.compile("(\\$\\{[^}]+\\})");

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static boolean arePartitionProjectionPropertiesSet(ConnectorTableMetadata
.anyMatch(propertyKey -> propertyKey.startsWith(PROPERTY_KEY_PREFIX));
}

static Optional<PartitionProjection> getPartitionProjectionFromTable(Table table, TypeManager typeManager)
public static Optional<PartitionProjection> getPartitionProjectionFromTable(Table table, TypeManager typeManager)
{
Map<String, String> tableProperties = table.getParameters();
if (parseBoolean(tableProperties.get(METASTORE_PROPERTY_PROJECTION_IGNORE)) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

public interface HiveMetastoreDecorator
{
int PRIORITY_PARTITION_PROJECTION = 50;
int PRIORITY_TRACING = 100;

int getPriority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3318,7 +3318,7 @@ public void testUpdateTableColumnStatisticsEmptyOptionalFields()

protected void testUpdateTableStatistics(SchemaTableName tableName, PartitionStatistics initialStatistics, PartitionStatistics... statistics)
{
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
assertThat(metastoreClient.getTableStatistics(tableName.getSchemaName(), tableName.getTableName(), Optional.empty()))
.isEqualTo(initialStatistics);

Expand Down Expand Up @@ -3404,7 +3404,7 @@ public void testDataColumnProperties()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_column_properties");
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
try {
doCreateEmptyTable(tableName, ORC, List.of(new ColumnMetadata("id", BIGINT), new ColumnMetadata("part_key", createVarcharType(256))));

Expand Down Expand Up @@ -3447,7 +3447,7 @@ public void testPartitionColumnProperties()
throws Exception
{
SchemaTableName tableName = temporaryTable("test_column_properties");
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
try {
doCreateEmptyTable(tableName, ORC, List.of(new ColumnMetadata("id", BIGINT), new ColumnMetadata("part_key", createVarcharType(256))));

Expand Down Expand Up @@ -3613,7 +3613,7 @@ protected void createDummyPartitionedTable(SchemaTableName tableName, List<Colum
{
doCreateEmptyTable(tableName, ORC, columns);

HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
Table table = metastoreClient.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Expand Down Expand Up @@ -3643,7 +3643,7 @@ protected void testUpdatePartitionStatistics(
String firstPartitionName = "ds=2016-01-01";
String secondPartitionName = "ds=2016-01-02";

HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
assertThat(metastoreClient.getPartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), ImmutableSet.of(firstPartitionName, secondPartitionName)))
.isEqualTo(ImmutableMap.of(firstPartitionName, initialStatistics, secondPartitionName, initialStatistics));

Expand Down Expand Up @@ -3700,7 +3700,7 @@ protected void testStorePartitionWithStatistics(
try {
doCreateEmptyTable(tableName, ORC, columns);

HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
Table table = metastoreClient.getTable(tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Expand Down Expand Up @@ -3801,7 +3801,7 @@ protected void testPartitionStatisticsSampling(List<ColumnMetadata> columns, Par

try {
createDummyPartitionedTable(tableName, columns);
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient());
HiveMetastoreClosure metastoreClient = new HiveMetastoreClosure(getMetastoreClient(), TESTING_TYPE_MANAGER, false);
metastoreClient.updatePartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), "ds=2016-01-01", actualStatistics -> statistics);
metastoreClient.updatePartitionStatistics(tableName.getSchemaName(), tableName.getTableName(), "ds=2016-01-02", actualStatistics -> statistics);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static io.trino.plugin.hive.acid.AcidOperation.INSERT;
import static io.trino.plugin.hive.util.HiveBucketing.BucketingVersion.BUCKETING_V1;
import static io.trino.testing.TestingConnectorSession.SESSION;
import static io.trino.type.InternalTypeManager.TESTING_TYPE_MANAGER;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -80,7 +81,7 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithDrop
{
return new SemiTransactionalHiveMetastore(
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreClosure(new TestingHiveMetastore()),
new HiveMetastoreClosure(new TestingHiveMetastore(), TESTING_TYPE_MANAGER, false),
directExecutor(),
dropExecutor,
directExecutor(),
Expand Down Expand Up @@ -121,7 +122,7 @@ private SemiTransactionalHiveMetastore getSemiTransactionalHiveMetastoreWithUpda
{
return new SemiTransactionalHiveMetastore(
HDFS_FILE_SYSTEM_FACTORY,
new HiveMetastoreClosure(new TestingHiveMetastore()),
new HiveMetastoreClosure(new TestingHiveMetastore(), TESTING_TYPE_MANAGER, false),
directExecutor(),
directExecutor(),
updateExecutor,
Expand Down
Loading

0 comments on commit febb70b

Please sign in to comment.