Add HiveMetastore.getTableNamesWithParameters
lukasz-stec authored and raunaqmorarka committed Dec 23, 2024
1 parent 563ba35 commit 8cd060d
Showing 18 changed files with 222 additions and 5 deletions.
@@ -13,6 +13,7 @@
*/
package io.trino.metastore;

import com.google.common.collect.ImmutableSet;
import io.trino.metastore.HivePrivilegeInfo.HivePrivilege;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.SchemaTableName;
@@ -67,6 +68,11 @@ default boolean useSparkTableStatistics()

    List<TableInfo> getTables(String databaseName);

    /**
     * @param parameterValues uses ImmutableSet to signal that this API does not support filtering by a null parameter value.
     */
    List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues);

    void createDatabase(Database database);

    void dropDatabase(String databaseName, boolean deleteData);
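For orientation, here is a minimal caller sketch (not part of this commit; the database name, parameter key, and values are illustrative assumptions) showing how the new method can find tables flagged as Iceberg through their table_type parameter:

import com.google.common.collect.ImmutableSet;
import java.util.List;

static List<String> findIcebergTables(HiveMetastore metastore, String databaseName)
{
    // ImmutableSet cannot contain null, so a table that lacks the parameter can never match
    return metastore.getTableNamesWithParameters(
            databaseName,
            "table_type",
            ImmutableSet.of("ICEBERG", "iceberg"));
}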
@@ -13,6 +13,7 @@
*/
package io.trino.metastore.tracing;

import com.google.common.collect.ImmutableSet;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.trino.metastore.AcidOperation;
@@ -164,6 +165,20 @@ public List<TableInfo> getTables(String databaseName)
        });
    }

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
    {
        Span span = tracer.spanBuilder("HiveMetastore.getTableNamesWithParameters")
                .setAttribute(SCHEMA, databaseName)
                .setAttribute(TABLE, parameterKey)
                .startSpan();
        return withTracing(span, () -> {
            List<String> tables = delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
            span.setAttribute(TABLE_RESPONSE_COUNT, tables.size());
            return tables;
        });
    }
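The withTracing helper is not shown in this diff; a plausible minimal shape (an assumption, not the actual Trino implementation) runs the action in the span's scope, records failures, and always ends the span:

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.function.Supplier;

static <T> T withTracing(Span span, Supplier<T> action)
{
    try (Scope ignored = span.makeCurrent()) {
        return action.get();
    }
    catch (Throwable t) {
        span.setStatus(StatusCode.ERROR);
        span.recordException(t);
        throw t;
    }
    finally {
        span.end();
    }
}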

    @Override
    public void createDatabase(Database database)
    {
@@ -123,6 +123,7 @@ public enum ObjectType
    private final LoadingCache<HiveTableName, Optional<Table>> tableCache;
    private final LoadingCache<String, List<TableInfo>> tablesCacheNew;
    private final Cache<HiveTableName, AtomicReference<Map<String, HiveColumnStatistics>>> tableColumnStatisticsCache;
    private final LoadingCache<TablesWithParameterCacheKey, List<String>> tableNamesWithParametersCache;
    private final Cache<HivePartitionName, AtomicReference<Map<String, HiveColumnStatistics>>> partitionStatisticsCache;
    private final Cache<HivePartitionName, AtomicReference<Optional<Partition>>> partitionCache;
    private final LoadingCache<PartitionFilter, Optional<List<String>>> partitionFilterCache;
@@ -206,6 +207,7 @@ private CachingHiveMetastore(
        tablesCacheNew = cacheFactory.buildCache(this::loadTablesNew);
        tableColumnStatisticsCache = statsCacheFactory.buildCache(this::refreshTableColumnStatistics);
        tableCache = cacheFactory.buildCache(this::loadTable);
        tableNamesWithParametersCache = cacheFactory.buildCache(this::loadTablesMatchingParameter);
        tablePrivilegesCache = cacheFactory.buildCache(key -> loadTablePrivileges(key.database(), key.table(), key.owner(), key.principal()));
        rolesCache = cacheFactory.buildCache(_ -> loadRoles());
        roleGrantsCache = cacheFactory.buildCache(this::loadRoleGrants);
@@ -223,6 +225,7 @@ public void flushCache()
        tablesCacheNew.invalidateAll();
        databaseCache.invalidateAll();
        tableCache.invalidateAll();
        tableNamesWithParametersCache.invalidateAll();
        partitionCache.invalidateAll();
        partitionFilterCache.invalidateAll();
        tablePrivilegesCache.invalidateAll();
@@ -565,6 +568,18 @@ private List<TableInfo> loadTablesNew(String databaseName)
        return delegate.getTables(databaseName);
    }
}

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
    {
        TablesWithParameterCacheKey key = new TablesWithParameterCacheKey(databaseName, parameterKey, parameterValues);
        return get(tableNamesWithParametersCache, key);
    }

    private List<String> loadTablesMatchingParameter(TablesWithParameterCacheKey key)
    {
        return delegate.getTableNamesWithParameters(key.databaseName(), key.parameterKey(), key.parameterValues());
    }

    @Override
    public void createDatabase(Database database)
    {
@@ -733,6 +748,7 @@ public void invalidateTable(String databaseName, String tableName)
        HiveTableName hiveTableName = new HiveTableName(databaseName, tableName);
        tableCache.invalidate(hiveTableName);
        tablesCacheNew.invalidate(databaseName);
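        // a change to any one table can alter the result of any parameter-filtered listing,
        // so the whole cache is dropped instead of tracking the affected keys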
        tableNamesWithParametersCache.invalidateAll();
        invalidateAllIf(tablePrivilegesCache, userTableKey -> userTableKey.matches(databaseName, tableName));
        tableColumnStatisticsCache.invalidate(hiveTableName);
        invalidatePartitionCache(databaseName, tableName);
@@ -1153,6 +1169,16 @@ private static <K, V> Cache<K, AtomicReference<V>> buildBulkCache(
        return cacheBuilder.build();
    }

    record TablesWithParameterCacheKey(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
    {
        TablesWithParameterCacheKey
        {
            requireNonNull(databaseName, "databaseName is null");
            requireNonNull(parameterKey, "parameterKey is null");
            requireNonNull(parameterValues, "parameterValues is null");
        }
    }
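A short illustration (not in the commit) of why a record suits this role: equals and hashCode are derived from the components, so two lookups built from the same arguments address the same cache entry:

TablesWithParameterCacheKey k1 = new TablesWithParameterCacheKey("sales", "table_type", ImmutableSet.of("ICEBERG"));
TablesWithParameterCacheKey k2 = new TablesWithParameterCacheKey("sales", "table_type", ImmutableSet.of("ICEBERG"));
// record-generated value equality: k1.equals(k2) and k1.hashCode() == k2.hashCode() both hold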

    record UserTableKey(Optional<HivePrincipal> principal, String database, String table, Optional<String> owner)
    {
        UserTableKey
@@ -1201,6 +1227,13 @@ public CacheStatsMBean getTableNamesStats()
        return new CacheStatsMBean(tablesCacheNew);
    }

    @Managed
    @Nested
    public CacheStatsMBean getTableWithParameterStats()
    {
        return new CacheStatsMBean(tableNamesWithParametersCache);
    }

    @Managed
    @Nested
    public CacheStatsMBean getTableColumnStatisticsStats()
@@ -1275,6 +1308,11 @@ LoadingCache<HiveTableName, Optional<Table>> getTableCache()
        return tableCache;
    }

    LoadingCache<TablesWithParameterCacheKey, List<String>> getTableNamesWithParametersCache()
    {
        return tableNamesWithParametersCache;
    }

    public LoadingCache<String, List<TableInfo>> getTablesCacheNew()
    {
        return tablesCacheNew;
@@ -261,6 +261,13 @@ public AggregateCacheStatsMBean getTablesStats()
        return new AggregateCacheStatsMBean(CachingHiveMetastore::getTablesCacheNew);
    }

    @Managed
    @Nested
    public AggregateCacheStatsMBean getTableWithParameterStats()
    {
        return new AggregateCacheStatsMBean(CachingHiveMetastore::getTableNamesWithParametersCache);
    }

    @Managed
    @Nested
    public AggregateCacheStatsMBean getTableColumnStatisticsCache()
@@ -83,6 +83,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
@@ -177,7 +178,7 @@ public FileHiveMetastore(NodeVersion nodeVersion, TrinoFileSystemFactory fileSys

        listTablesCache = EvictableCacheBuilder.newBuilder()
                .expireAfterWrite(10, SECONDS)
-               .build(CacheLoader.from(this::doListAllTables));
+               .build(CacheLoader.from(databaseName -> doListAllTables(databaseName, _ -> true)));
    }

    @Override
@@ -532,7 +533,16 @@ private List<TableInfo> listAllTables(String databaseName)
        return listTablesCache.getUnchecked(databaseName);
    }

-   private synchronized List<TableInfo> doListAllTables(String databaseName)
+   @Override
+   public synchronized List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
+   {
+       requireNonNull(parameterKey, "parameterKey is null");
+       return doListAllTables(databaseName, table -> parameterValues.contains(table.getParameters().get(parameterKey))).stream()
+               .map(tableInfo -> tableInfo.tableName().getTableName())
+               .collect(toImmutableList());
+   }
+
+   private synchronized List<TableInfo> doListAllTables(String databaseName, Predicate<TableMetadata> tableMetadataPredicate)
    {
        requireNonNull(databaseName, "databaseName is null");

@@ -557,7 +567,8 @@ private synchronized List<TableInfo> doListAllTables(String databaseName)
            Location schemaFileLocation = subdirectory.appendPath(TRINO_SCHEMA_FILE_NAME_SUFFIX);
            readFile("table schema", schemaFileLocation, tableCodec).ifPresent(tableMetadata -> {
                checkVersion(tableMetadata.getWriterVersion());
-               if (hideDeltaLakeTables && DELTA_LAKE_PROVIDER.equals(tableMetadata.getParameters().get(SPARK_TABLE_PROVIDER_KEY))) {
+               if ((hideDeltaLakeTables && DELTA_LAKE_PROVIDER.equals(tableMetadata.getParameters().get(SPARK_TABLE_PROVIDER_KEY)))
+                       || !tableMetadataPredicate.test(tableMetadata)) {
                    return;
                }
                tables.add(new TableInfo(
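A worked example of the matching rule above (the key and values are illustrative): a table whose parameters lack the key yields null from Map.get, and a Guava ImmutableSet never contains null, so such tables are always filtered out; this is exactly the limitation the HiveMetastore Javadoc calls out:

import com.google.common.collect.ImmutableSet;
import java.util.Map;

Map<String, String> params = Map.of("other_key", "x"); // table without "table_type"
ImmutableSet<String> wanted = ImmutableSet.of("ICEBERG");
boolean matches = wanted.contains(params.get("table_type")); // false: get() returned null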
@@ -412,10 +412,21 @@ public void setDatabaseOwner(String databaseName, HivePrincipal principal)
    @Override
    public List<TableInfo> getTables(String databaseName)
    {
-       return glueCache.getTables(databaseName, cacheTable -> getTablesInternal(cacheTable, databaseName));
+       return glueCache.getTables(databaseName, cacheTable -> getTablesInternal(cacheTable, databaseName, _ -> true));
    }

-   private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String databaseName)
+   @Override
+   public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
+   {
+       return getTablesInternal(
+               _ -> {}, // no-op consumer: this listing skips the per-table cache population done by getTables
+               databaseName,
+               table -> table.parameters() != null && parameterValues.contains(table.parameters().get(parameterKey))).stream()
+               .map(tableInfo -> tableInfo.tableName().getTableName())
+               .collect(toImmutableList());
+   }
+
+   private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String databaseName, Predicate<software.amazon.awssdk.services.glue.model.Table> filter)
    {
        try {
            ImmutableList<software.amazon.awssdk.services.glue.model.Table> glueTables = stats.getGetTables()
@@ -425,6 +436,7 @@ private List<TableInfo> getTablesInternal(Consumer<Table> cacheTable, String dat
                    .map(GetTablesResponse::tableList)
                    .flatMap(List::stream))
                    .filter(tableVisibilityFilter)
                    .filter(filter)
                    .collect(toImmutableList());

            // Store only valid tables in cache
@@ -374,6 +374,25 @@ public List<TableInfo> getTables(String databaseName)
        }
    }

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
    {
        try {
            return getGlueTables(databaseName)
                    .filter(tableFilter)
                    .filter(table -> parameterValues.contains(getTableParameters(table).get(parameterKey)))
                    .map(com.amazonaws.services.glue.model.Table::getName)
                    .collect(toImmutableList());
        }
        catch (EntityNotFoundException | AccessDeniedException e) {
            // database does not exist or permission denied
            return ImmutableList.of();
        }
        catch (AmazonServiceException e) {
            throw new TrinoException(HIVE_METASTORE_ERROR, e);
        }
    }

    @Override
    public Optional<Table> getTable(String databaseName, String tableName)
    {
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.hive.thrift.metastore.FieldSchema;
import io.trino.metastore.AcidOperation;
import io.trino.metastore.AcidTransactionOwner;
@@ -152,6 +153,12 @@ public List<TableInfo> getTables(String databaseName)
                .collect(toImmutableList());
    }

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, ImmutableSet<String> parameterValues)
    {
        return delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
    }

    @Override
    public void createDatabase(Database database)
    {
@@ -49,6 +49,7 @@ public class DefaultThriftMetastoreClientFactory

    private final MetastoreSupportsDateStatistics metastoreSupportsDateStatistics = new MetastoreSupportsDateStatistics();
    private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenTableParamAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenAlterTransactionalTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenAlterPartitionsAlternative = new AtomicInteger(Integer.MAX_VALUE);

@@ -115,6 +116,7 @@ protected ThriftMetastoreClient create(TransportSupplier transportSupplier, Stri
                metastoreSupportsDateStatistics,
                true,
                chosenGetTableAlternative,
                chosenTableParamAlternative,
                chosenAlterTransactionalTableAlternative,
                chosenAlterPartitionsAlternative);
    }
@@ -37,6 +37,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Objects.requireNonNull;

@@ -92,6 +93,13 @@ public List<TableMeta> getTableMeta(String databaseName)
        return runWithHandle(() -> delegate.getTableMeta(databaseName));
    }

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues)
            throws TException
    {
        return runWithHandle(() -> delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues));
    }

    @Override
    public void createDatabase(Database database)
            throws TException
@@ -57,6 +57,7 @@ public class HttpThriftMetastoreClientFactory
    private final OpenTelemetry openTelemetry;

    private final AtomicInteger chosenGetTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenGetTableParamAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenAlterTransactionalTableAlternative = new AtomicInteger(Integer.MAX_VALUE);
    private final AtomicInteger chosenAlterPartitionsAlternative = new AtomicInteger(Integer.MAX_VALUE);

@@ -85,6 +86,7 @@ public ThriftMetastoreClient create(URI uri, Optional<String> delegationToken)
                new MetastoreSupportsDateStatistics(),
                false,
                chosenGetTableAlternative,
                chosenGetTableParamAlternative,
                chosenAlterTransactionalTableAlternative,
                chosenAlterPartitionsAlternative);
    }
@@ -268,6 +268,30 @@ public List<TableMeta> getTables(String databaseName)
        }
    }

    @Override
    public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues)
    {
        try {
            return retry()
                    .stopOn(NoSuchObjectException.class)
                    .stopOnIllegalExceptions()
                    .run("getTableNamesWithParameters", () -> {
                        try (ThriftMetastoreClient client = createMetastoreClient()) {
                            return client.getTableNamesWithParameters(databaseName, parameterKey, parameterValues);
                        }
                    });
        }
        catch (NoSuchObjectException e) {
            return ImmutableList.of();
        }
        catch (TException e) {
            throw new TrinoException(HIVE_METASTORE_ERROR, e);
        }
        catch (Exception e) {
            throw propagate(e);
        }
    }
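Note the error handling: NoSuchObjectException is both a stop condition for the retry loop and translated into an empty list, so listing tables in a database that does not exist behaves like an empty database rather than failing the query.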

    @Override
    public Optional<Table> getTable(String databaseName, String tableName)
    {
