Limit session state during metadata queries in Iceberg #19757

Merged · 4 commits · Nov 16, 2023
7 changes: 7 additions & 0 deletions lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java
@@ -14,6 +14,8 @@
package io.trino.cache;

import com.google.common.cache.Cache;
import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.UncheckedExecutionException;

import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -26,6 +28,11 @@ public final class CacheUtils
{
private CacheUtils() {}

/**
* @throws UncheckedExecutionException when {@code loader} throws an unchecked exception
* @throws ExecutionError when {@code loader} throws an {@link Error}
* @throws RuntimeException when {@code loader} throws a checked exception (which should not happen)
*/
public static <K, V> V uncheckedCacheGet(Cache<K, V> cache, K key, Supplier<V> loader)
{
try {
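The caller-side pattern this contract enables is unwrapping the loader's own unchecked exception instead of surfacing the cache wrapper, which is what the Glue catalog change below does in loadTable. A minimal, self-contained sketch (the Example class, its cache, and expensiveLookup are illustrative, not part of this PR):

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;

import static com.google.common.base.Throwables.throwIfUnchecked;
import static io.trino.cache.CacheUtils.uncheckedCacheGet;

class Example
{
    // Illustrative cache; the catalog code below uses EvictableCacheBuilder instead
    private final Cache<String, String> cache = CacheBuilder.newBuilder()
            .maximumSize(1_000)
            .build();

    String load(String key)
    {
        try {
            // The loader runs only on a cache miss; its unchecked exceptions arrive
            // wrapped in UncheckedExecutionException, per the Javadoc above
            return uncheckedCacheGet(cache, key, () -> expensiveLookup(key));
        }
        catch (UncheckedExecutionException e) {
            // Rethrow the loader's original unchecked exception rather than the wrapper
            throwIfUnchecked(e.getCause());
            throw e;
        }
    }

    private String expensiveLookup(String key)
    {
        // Stand-in for real work, e.g. parsing table metadata from storage
        return key.toUpperCase();
    }
}
```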
@@ -149,13 +149,15 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment)
else {
icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit();
}
invalidateTableCache(schemaTableName);
}

@Override
public void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional<String> comment)
{
Table icebergTable = loadTable(session, schemaTableName);
icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit();
invalidateTableCache(schemaTableName);
}

@Override
@@ -471,6 +473,8 @@ protected Map<String, String> createMaterializedViewProperties(ConnectorSession
.buildOrThrow();
}

protected abstract void invalidateTableCache(SchemaTableName schemaTableName);

protected static class MaterializedViewMayBeBeingRemovedException
extends RuntimeException
{
@@ -95,7 +95,6 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
@@ -105,6 +104,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
@@ -180,9 +180,16 @@ public class TrinoGlueCatalog
// Even though this is query-scoped, this still needs to be bounded. information_schema queries can access a large number of tables.
.maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE))
.build();
private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
private final Map<SchemaTableName, ConnectorViewDefinition> viewCache = new ConcurrentHashMap<>();
private final Map<SchemaTableName, MaterializedViewData> materializedViewCache = new ConcurrentHashMap<>();

private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();
private final Cache<SchemaTableName, ConnectorViewDefinition> viewCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();
private final Cache<SchemaTableName, MaterializedViewData> materializedViewCache = EvictableCacheBuilder.newBuilder()
.maximumSize(PER_QUERY_CACHE_SIZE)
.build();

public TrinoGlueCatalog(
CatalogName catalogName,
@@ -558,22 +565,30 @@ private void getCommentsFromIcebergMetadata(
@Override
public Table loadTable(ConnectorSession session, SchemaTableName table)
{
if (viewCache.containsKey(table) || materializedViewCache.containsKey(table)) {
if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) {
throw new TableNotFoundException(table);
}

TableMetadata metadata = tableMetadataCache.computeIfAbsent(
table,
ignore -> {
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
table.getSchemaName(),
table.getTableName(),
Optional.empty(),
Optional.empty());
return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current();
});
TableMetadata metadata;
try {
metadata = uncheckedCacheGet(
tableMetadataCache,
table,
() -> {
TableOperations operations = tableOperationsProvider.createTableOperations(
this,
session,
table.getSchemaName(),
table.getTableName(),
Optional.empty(),
Optional.empty());
return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current();
});
}
catch (UncheckedExecutionException e) {
throwIfUnchecked(e.getCause());
throw e;
}

return getIcebergTableWithMetadata(
this,
@@ -612,7 +627,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector

private Optional<List<ColumnMetadata>> getCachedColumnMetadata(SchemaTableName tableName)
{
if (!cacheTableMetadata || viewCache.containsKey(tableName) || materializedViewCache.containsKey(tableName)) {
if (!cacheTableMetadata || viewCache.asMap().containsKey(tableName) || materializedViewCache.asMap().containsKey(tableName)) {
return Optional.empty();
}

@@ -677,6 +692,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
LOG.warn(e, "Failed to delete table data referenced by metadata");
}
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
}

@Override
@@ -689,6 +705,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
}
String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", "");
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation);
invalidateTableCache(schemaTableName);
Member:

Can this be moved to dropTableFromMetastore, or even to deleteTable, to simplify the code and prevent the call from being omitted when new functions are added in the future?

Member Author:

I thought about it. Technically it would work, but I consider dropTableFromMetastore to be just a technical operation, which may or may not be invoked, or may be the last operation in the drop flow.
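For illustration, a minimal sketch of the suggested alternative: hoisting the invalidation into dropTableFromMetastore so every caller (for example unregisterTable below) picks it up automatically. The method signature and helper names are taken from this diff; the body shown here is hypothetical, not the PR's actual code.

```java
// Hypothetical variant (not what this PR does): a single invalidation point inside
// dropTableFromMetastore, so callers such as unregisterTable cannot forget the call.
private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName)
{
    com.amazonaws.services.glue.model.Table table = getTableAndCacheMetadata(session, schemaTableName)
            .orElseThrow(() -> new TableNotFoundException(schemaTableName));
    deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
    // Invalidate here instead of in each caller
    invalidateTableCache(schemaTableName);
    return table;
}
```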

}

@Override
@@ -752,6 +769,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableName
public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName)
{
dropTableFromMetastore(session, schemaTableName);
invalidateTableCache(schemaTableName);
}

private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName)
@@ -796,6 +814,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
createTable(to.getSchemaName(), tableInput);
newTableCreated = true;
deleteTable(from.getSchemaName(), from.getTableName());
invalidateTableCache(from);
}
catch (RuntimeException e) {
if (newTableCreated) {
@@ -824,37 +843,41 @@ private Optional<com.amazonaws.services.glue.model.Table> getTableAndCacheMetadata(

String tableType = getTableType(table);
Map<String, String> parameters = getTableParameters(table);
if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) {
if (isIcebergTable(parameters) && !tableMetadataCache.asMap().containsKey(schemaTableName)) {
if (viewCache.asMap().containsKey(schemaTableName) || materializedViewCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view");
}

String metadataLocation = parameters.get(METADATA_LOCATION_PROP);
try {
// Cache the TableMetadata while we have the Table retrieved anyway
tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation));
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation));
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation);
}
}
else if (isTrinoMaterializedView(tableType, parameters)) {
if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
if (viewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view");
}

try {
ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table);
materializedViewCache.put(schemaTableName, new MaterializedViewData(
materializedView,
Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP))));
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(materializedViewCache, schemaTableName, () -> {
ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table);
return new MaterializedViewData(
materializedView,
Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)));
});
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName);
}
}
else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) {
if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) {
else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(schemaTableName)) {
if (materializedViewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. View cannot also be a materialized view or table");
}

@@ -864,7 +887,10 @@ else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) {
tableType,
parameters,
Optional.ofNullable(table.getOwner()))
.ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition));
.ifPresent(viewDefinition -> {
// Note: this is racy from cache invalidation perspective, but it should not matter here
uncheckedCacheGet(viewCache, schemaTableName, () -> viewDefinition);
});
}
catch (RuntimeException e) {
LOG.warn(e, "Failed to cache view from %s", schemaTableName);
@@ -953,7 +979,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
try {
com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
viewCache.remove(source);
viewCache.invalidate(source);
TableInput viewTableInput = getViewTableInput(
target.getTableName(),
existingView.getViewOriginalText(),
@@ -992,7 +1018,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
}

try {
viewCache.remove(schemaViewName);
viewCache.invalidate(schemaViewName);
deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName());
}
catch (AmazonServiceException e) {
@@ -1027,12 +1053,12 @@ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String
@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
ConnectorViewDefinition cachedView = viewCache.get(viewName);
ConnectorViewDefinition cachedView = viewCache.getIfPresent(viewName);
if (cachedView != null) {
return Optional.of(cachedView);
}

if (tableMetadataCache.containsKey(viewName) || materializedViewCache.containsKey(viewName)) {
if (tableMetadataCache.asMap().containsKey(viewName) || materializedViewCache.asMap().containsKey(viewName)) {
// Entries in these caches are not views
return Optional.empty();
}
Expand Down Expand Up @@ -1253,7 +1279,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN
if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName());
}
materializedViewCache.remove(viewName);
materializedViewCache.invalidate(viewName);
dropStorageTable(session, view);
deleteTable(view.getDatabaseName(), view.getName());
}
Expand All @@ -1277,12 +1303,12 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g
@Override
protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName viewName)
{
MaterializedViewData materializedViewData = materializedViewCache.get(viewName);
MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName);
if (materializedViewData != null) {
return Optional.of(materializedViewData.connectorMaterializedViewDefinition);
}

if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) {
if (tableMetadataCache.asMap().containsKey(viewName) || viewCache.asMap().containsKey(viewName)) {
// Entries in these caches are not materialized views.
return Optional.empty();
}
@@ -1376,7 +1402,7 @@ private ConnectorMaterializedViewDefinition createMaterializedViewDefinition(
public Optional<BaseTable> getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName)
{
String storageMetadataLocation;
MaterializedViewData materializedViewData = materializedViewCache.get(viewName);
MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName);
if (materializedViewData == null) {
Optional<com.amazonaws.services.glue.model.Table> maybeTable = getTableAndCacheMetadata(session, viewName);
if (maybeTable.isEmpty()) {
@@ -1419,7 +1445,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session,
{
requireNonNull(storageTableName, "storageTableName is null");
requireNonNull(storageMetadataLocation, "storageMetadataLocation is null");
return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> {
return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation);
});
@@ -1432,7 +1458,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
try {
com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source)
.orElseThrow(() -> new TableNotFoundException(source));
materializedViewCache.remove(source);
materializedViewCache.invalidate(source);
Map<String, String> tableParameters = getTableParameters(glueTable);
if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) {
throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source);
@@ -1486,6 +1512,12 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
return Optional.empty();
}

@Override
protected void invalidateTableCache(SchemaTableName schemaTableName)
{
tableMetadataCache.invalidate(schemaTableName);
}

com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches)
{
if (invalidateCaches) {