diff --git a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java index 614ebaa210f2..313eca6ca3c1 100644 --- a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java +++ b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java @@ -14,6 +14,8 @@ package io.trino.cache; import com.google.common.cache.Cache; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.List; import java.util.concurrent.ExecutionException; @@ -26,6 +28,11 @@ public final class CacheUtils { private CacheUtils() {} + /** + * @throws UncheckedExecutionException when {@code loader} throws an unchecked exception + * @throws ExecutionError when {@code loader} throws an {@link Error} + * @throws RuntimeException when {@code loader} throws a checked exception (which should not happen) + */ public static <K, V> V uncheckedCacheGet(Cache<K, V> cache, K key, Supplier<V> loader) { try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index dc518c966dd3..922d5293f5f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -149,6 +149,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -156,6 +157,7 @@ public void updateColumnComment(ConnectorSession session, SchemaTableName schema { Table icebergTable = loadTable(session, schemaTableName); icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + invalidateTableCache(schemaTableName); } @Override @@ -471,6 +473,8 @@ protected Map<String, String> createMaterializedViewProperties(ConnectorSession .buildOrThrow(); } + protected abstract void invalidateTableCache(SchemaTableName schemaTableName); + protected static class MaterializedViewMayBeBeingRemovedException extends RuntimeException { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 995bdb127587..6761c555c00c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -95,7 +95,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.UnaryOperator; @@ -105,6 +104,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -180,9 +180,16 @@ public class TrinoGlueCatalog // Even though this is query-scoped,
this still needs to be bounded. information_schema queries can access large number of tables. .maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE)) .build(); - private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>(); - private final Map<SchemaTableName, ConnectorViewDefinition> viewCache = new ConcurrentHashMap<>(); - private final Map<SchemaTableName, MaterializedViewData> materializedViewCache = new ConcurrentHashMap<>(); + + private final Cache<SchemaTableName, TableMetadata> tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache<SchemaTableName, ConnectorViewDefinition> viewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache<SchemaTableName, MaterializedViewData> materializedViewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoGlueCatalog( CatalogName catalogName, @@ -558,22 +565,30 @@ private void getCommentsFromIcebergMetadata( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - if (viewCache.containsKey(table) || materializedViewCache.containsKey(table)) { + if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) { throw new TableNotFoundException(table); } - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata( this, @@ -612,7 +627,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector private Optional<List<ColumnMetadata>> getCachedColumnMetadata(SchemaTableName tableName) { - if (!cacheTableMetadata || viewCache.containsKey(tableName) || materializedViewCache.containsKey(tableName)) { + if (!cacheTableMetadata || viewCache.asMap().containsKey(tableName) || materializedViewCache.asMap().containsKey(tableName)) { return Optional.empty(); } @@ -677,6 +692,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -689,6 +705,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -752,6 +769,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(session, schemaTableName); + invalidateTableCache(schemaTableName); } private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session,
SchemaTableName schemaTableName) @@ -796,6 +814,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa createTable(to.getSchemaName(), tableInput); newTableCreated = true; deleteTable(from.getSchemaName(), from.getTableName()); + invalidateTableCache(from); } catch (RuntimeException e) { if (newTableCreated) { @@ -824,37 +843,41 @@ private Optional getTableAndCacheMetada String tableType = getTableType(table); Map parameters = getTableParameters(table); - if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) { - if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) { + if (isIcebergTable(parameters) && !tableMetadataCache.asMap().containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || materializedViewCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view"); } String metadataLocation = parameters.get(METADATA_LOCATION_PROP); try { // Cache the TableMetadata while we have the Table retrieved anyway - tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation); } } else if (isTrinoMaterializedView(tableType, parameters)) { - if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view"); } try { - ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); - materializedViewCache.put(schemaTableName, new MaterializedViewData( - materializedView, - Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)))); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(materializedViewCache, schemaTableName, () -> { + ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); + return new MaterializedViewData( + materializedView, + Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP))); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName); } } - else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) { - if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(schemaTableName)) { + if (materializedViewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. 
View cannot also be a materialized view or table"); } @@ -864,7 +887,10 @@ else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTabl tableType, parameters, Optional.ofNullable(table.getOwner())) - .ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition)); + .ifPresent(viewDefinition -> { + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(viewCache, schemaTableName, () -> viewDefinition); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache view from %s", schemaTableName); @@ -953,7 +979,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT try { com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - viewCache.remove(source); + viewCache.invalidate(source); TableInput viewTableInput = getViewTableInput( target.getTableName(), existingView.getViewOriginalText(), @@ -992,7 +1018,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName) } try { - viewCache.remove(schemaViewName); + viewCache.invalidate(schemaViewName); deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); } catch (AmazonServiceException e) { @@ -1027,12 +1053,12 @@ public List listViews(ConnectorSession session, Optional getView(ConnectorSession session, SchemaTableName viewName) { - ConnectorViewDefinition cachedView = viewCache.get(viewName); + ConnectorViewDefinition cachedView = viewCache.getIfPresent(viewName); if (cachedView != null) { return Optional.of(cachedView); } - if (tableMetadataCache.containsKey(viewName) || materializedViewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || materializedViewCache.asMap().containsKey(viewName)) { // Entries in these caches are not views return Optional.empty(); } @@ -1253,7 +1279,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName()); } - materializedViewCache.remove(viewName); + materializedViewCache.invalidate(viewName); dropStorageTable(session, view); deleteTable(view.getDatabaseName(), view.getName()); } @@ -1277,12 +1303,12 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g @Override protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData != null) { return Optional.of(materializedViewData.connectorMaterializedViewDefinition); } - if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || viewCache.asMap().containsKey(viewName)) { // Entries in these caches are not materialized views. 
return Optional.empty(); } @@ -1376,7 +1402,7 @@ private ConnectorMaterializedViewDefinition createMaterializedViewDefinition( public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) { String storageMetadataLocation; - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData == null) { Optional maybeTable = getTableAndCacheMetadata(session, viewName); if (maybeTable.isEmpty()) { @@ -1419,7 +1445,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, { requireNonNull(storageTableName, "storageTableName is null"); requireNonNull(storageMetadataLocation, "storageMetadataLocation is null"); - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { TrinoFileSystem fileSystem = fileSystemFactory.create(session); return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); }); @@ -1432,7 +1458,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou try { com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - materializedViewCache.remove(source); + materializedViewCache.invalidate(source); Map tableParameters = getTableParameters(glueTable); if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source); @@ -1486,6 +1512,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.invalidate(schemaTableName); + } + com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches) { if (invalidateCaches) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index c3bcde1a1b73..39ef750ec7de 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.hms; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -69,14 +72,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static 
io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; @@ -86,7 +90,6 @@ import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; -import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; import static io.trino.plugin.hive.ViewReaderUtil.isSomeKindOfAView; import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView; import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; @@ -125,6 +128,7 @@ public class TrinoHiveCatalog extends AbstractTrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; public static final String DEPENDS_ON_TABLES = "dependsOnTables"; // Value should be ISO-8601 formatted time instant public static final String TRINO_QUERY_START_TIME = "trino-query-start-time"; @@ -136,7 +140,9 @@ public class TrinoHiveCatalog private final boolean deleteSchemaLocationsFallback; private final boolean hideMaterializedViewStorageTable; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoHiveCatalog( CatalogName catalogName, @@ -366,6 +372,7 @@ private static Optional getQueryId(io.trino.plugin.hive.metastore.Table public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(schemaTableName); + invalidateTableCache(schemaTableName); } @Override @@ -422,6 +429,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) log.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } @Override @@ -429,6 +437,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT { io.trino.plugin.hive.metastore.Table table = dropTableFromMetastore(schemaTableName); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableName schemaTableName) @@ -450,14 +459,23 @@ private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableN public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { metastore.renameTable(from.getSchemaName(), from.getTableName(), to.getSchemaName(), to.getTableName()); + invalidateTableCache(from); } @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + 
throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -480,16 +498,6 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName vi trinoViewHiveMetastore.updateViewColumnComment(session, viewName, columnName, comment); } - private void replaceView(ConnectorSession session, SchemaTableName viewName, io.trino.plugin.hive.metastore.Table view, ConnectorViewDefinition newDefinition) - { - io.trino.plugin.hive.metastore.Table.Builder viewBuilder = io.trino.plugin.hive.metastore.Table.builder(view) - .setViewOriginalText(Optional.of(encodeViewData(newDefinition))); - - PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser()); - - metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), viewBuilder.build(), principalPrivileges); - } - @Override public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) { @@ -845,7 +853,7 @@ public Optional getMaterializedViewStorageTable(ConnectorSession sess private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, io.trino.plugin.hive.metastore.Table materializedView) { - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP); checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + materializedView.getTableName()); TrinoFileSystem fileSystem = fileSystemFactory.create(session); @@ -898,4 +906,10 @@ public Optional redirectTable(ConnectorSession session, } return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.invalidate(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 52af89f0fbe0..fae4f5507cb1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.jdbc; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -50,13 +53,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Maps.transformValues; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; 
import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; @@ -75,11 +79,16 @@ public class TrinoJdbcCatalog { private static final Logger LOG = Logger.get(TrinoJdbcCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final JdbcCatalog jdbcCatalog; private final IcebergJdbcClient jdbcClient; private final TrinoFileSystemFactory fileSystemFactory; private final String defaultWarehouseDir; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoJdbcCatalog( CatalogName catalogName, @@ -273,6 +282,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -287,6 +297,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.get().replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -298,14 +309,23 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RuntimeException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to rename table from %s to %s".formatted(from, to), e); } + invalidateTableCache(from); } @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -447,6 +467,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.invalidate(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName table) { return TableIdentifier.of(table.getSchemaName(), table.getTableName()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 6d5badaa8380..ab0a40a3ef9a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -13,9 +13,12 @@ */ package io.trino.plugin.iceberg.catalog.nessie; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import 
com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -49,11 +52,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; @@ -67,11 +71,16 @@ public class TrinoNessieCatalog extends AbstractTrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final String warehouseLocation; private final NessieIcebergClient nessieClient; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final TrinoFileSystemFactory fileSystemFactory; + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + public TrinoNessieCatalog( CatalogName catalogName, TypeManager typeManager, @@ -184,18 +193,26 @@ public Optional> streamRelationComments( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table)).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata( this, @@ -218,6 +235,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) validateTableCanBeDropped(table); nessieClient.dropTable(toIdentifier(schemaTableName), true); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -230,6 +248,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { nessieClient.renameTable(toIdentifier(from), toIdentifier(to)); + invalidateTableCache(from); } @Override @@ -419,4 +438,10 @@ public Optional redirectTable(ConnectorSession session, { return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.invalidate(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java 
b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 74927abd75e9..9357a08c158a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,11 +13,14 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.jsonwebtoken.impl.DefaultJwtBuilder; import io.jsonwebtoken.jackson.io.JacksonSerializer; +import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergSchemaProperties; @@ -58,12 +61,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; @@ -76,13 +79,17 @@ public class TrinoRestCatalog implements TrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final RESTSessionCatalog restSessionCatalog; private final CatalogName catalogName; private final SessionType sessionType; private final String trinoVersion; private final boolean useUniqueTableLocation; - private final Map tableCache = new ConcurrentHashMap<>(); + private final Cache tableCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoRestCatalog( RESTSessionCatalog restSessionCatalog, @@ -266,6 +273,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName) if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) { throw new TableNotFoundException(tableName); } + invalidateTableCache(tableName); } @Override @@ -274,6 +282,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); } + invalidateTableCache(schemaTableName); } @Override @@ -293,25 +302,27 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); } + invalidateTableCache(from); } @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { try { - return tableCache.computeIfAbsent( - schemaTableName.toString(), - key -> { + return uncheckedCacheGet( + tableCache, + schemaTableName, + () -> { BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), 
quotedTableName(schemaTableName)); }); } - catch (NoSuchTableException e) { - throw new TableNotFoundException(schemaTableName, e); - } - catch (RuntimeException e) { - throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e); + catch (UncheckedExecutionException e) { + if (e.getCause() instanceof NoSuchTableException) { + throw new TableNotFoundException(schemaTableName, e.getCause()); + } + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e.getCause()); } } @@ -331,6 +342,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -508,6 +520,11 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) }; } + private void invalidateTableCache(SchemaTableName schemaTableName) + { + tableCache.invalidate(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName schemaTableName) { return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
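The pattern this patch repeats in every catalog - a bounded, query-scoped Guava cache populated through CacheUtils.uncheckedCacheGet, with UncheckedExecutionException unwrapped at the call site and an invalidateTableCache hook invoked after drop/rename/comment operations - can be illustrated outside Trino with a minimal sketch. The sketch below assumes only plain Guava (CacheBuilder stands in for Trino's EvictableCacheBuilder, which additionally supports safe invalidation); the class and method names TableMetadataLookup and readMetadata are hypothetical stand-ins for a catalog and its metadata loader.

// Minimal sketch (not Trino code): mirrors the caching pattern introduced by this patch
// using plain Guava. EvictableCacheBuilder is replaced by CacheBuilder here.
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;

import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class TableMetadataLookup
{
    // Bounded even though query-scoped: information_schema queries can touch many tables
    private static final int PER_QUERY_CACHE_SIZE = 1000;

    private final Cache<String, String> tableMetadataCache = CacheBuilder.newBuilder()
            .maximumSize(PER_QUERY_CACHE_SIZE)
            .build();

    public String loadTable(String tableName)
    {
        try {
            // Load-through read, like uncheckedCacheGet(tableMetadataCache, table, () -> ...)
            return uncheckedCacheGet(tableMetadataCache, tableName, () -> readMetadata(tableName));
        }
        catch (UncheckedExecutionException e) {
            // Surface the loader's own unchecked exception instead of the cache wrapper
            Throwables.throwIfUnchecked(e.getCause());
            throw e;
        }
    }

    public void invalidateTableCache(String tableName)
    {
        // Counterpart of the new invalidateTableCache hook, called after
        // drop/rename/comment operations so later reads reload fresh metadata
        tableMetadataCache.invalidate(tableName);
    }

    private static <K, V> V uncheckedCacheGet(Cache<K, V> cache, K key, Supplier<V> loader)
    {
        try {
            return cache.get(key, loader::get);
        }
        catch (ExecutionException e) {
            // The Supplier cannot throw checked exceptions, so this should not happen
            throw new RuntimeException(e.getCause());
        }
    }

    private static String readMetadata(String tableName)
    {
        // Placeholder for reading table metadata from storage
        return "metadata-for-" + tableName;
    }

    public static void main(String[] args)
    {
        TableMetadataLookup lookup = new TableMetadataLookup();
        System.out.println(lookup.loadTable("sales.orders"));   // loads and caches
        System.out.println(lookup.loadTable("sales.orders"));   // served from cache
        lookup.invalidateTableCache("sales.orders");            // e.g. after a rename or drop
        System.out.println(lookup.loadTable("sales.orders"));   // reloaded on next access
    }
}

The try/catch around uncheckedCacheGet mirrors the loadTable changes above, and the size bound echoes the existing "Even though this is query-scoped, this still needs to be bounded" comment. As the added "Note: this is racy from cache invalidation perspective" comments state, a load racing with invalidation may repopulate an entry, which is acceptable for this per-query metadata.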