From ae28d45cf2e52a2e4f06a64c858cccdb82a7e751 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:37:41 +0100 Subject: [PATCH 1/4] Remove unused method --- .../plugin/iceberg/catalog/hms/TrinoHiveCatalog.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index c3bcde1a1b73..baa6c0d1d058 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -86,7 +86,6 @@ import static io.trino.plugin.hive.TableType.EXTERNAL_TABLE; import static io.trino.plugin.hive.TableType.VIRTUAL_VIEW; import static io.trino.plugin.hive.ViewReaderUtil.ICEBERG_MATERIALIZED_VIEW_COMMENT; -import static io.trino.plugin.hive.ViewReaderUtil.encodeViewData; import static io.trino.plugin.hive.ViewReaderUtil.isSomeKindOfAView; import static io.trino.plugin.hive.ViewReaderUtil.isTrinoMaterializedView; import static io.trino.plugin.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; @@ -480,16 +479,6 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName vi trinoViewHiveMetastore.updateViewColumnComment(session, viewName, columnName, comment); } - private void replaceView(ConnectorSession session, SchemaTableName viewName, io.trino.plugin.hive.metastore.Table view, ConnectorViewDefinition newDefinition) - { - io.trino.plugin.hive.metastore.Table.Builder viewBuilder = io.trino.plugin.hive.metastore.Table.builder(view) - .setViewOriginalText(Optional.of(encodeViewData(newDefinition))); - - PrincipalPrivileges principalPrivileges = isUsingSystemSecurity ? NO_PRIVILEGES : buildInitialPrivilegeSet(session.getUser()); - - metastore.replaceTable(viewName.getSchemaName(), viewName.getTableName(), viewBuilder.build(), principalPrivileges); - } - @Override public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName) { From 7789b7a4f490eb72ad2c2dc0346a5d94cb14174e Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:38:07 +0100 Subject: [PATCH 2/4] Fix REST Iceberg catalog for names with dots Let it differentiate between "a"."b.c" and "a.b"."c" tables. --- .../trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 74927abd75e9..d3a095281e93 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -82,7 +82,7 @@ public class TrinoRestCatalog private final String trinoVersion; private final boolean useUniqueTableLocation; - private final Map tableCache = new ConcurrentHashMap<>(); + private final Map tableCache = new ConcurrentHashMap<>(); public TrinoRestCatalog( RESTSessionCatalog restSessionCatalog, @@ -300,7 +300,7 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName { try { return tableCache.computeIfAbsent( - schemaTableName.toString(), + schemaTableName, key -> { BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names From 53072955546064115155bb967b2dd2e0fe5117fa Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:26:45 +0100 Subject: [PATCH 3/4] Evict session level cache when dropping Iceberg table --- .../plugin/iceberg/catalog/AbstractTrinoCatalog.java | 4 ++++ .../plugin/iceberg/catalog/glue/TrinoGlueCatalog.java | 10 ++++++++++ .../plugin/iceberg/catalog/hms/TrinoHiveCatalog.java | 10 ++++++++++ .../plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java | 9 +++++++++ .../iceberg/catalog/nessie/TrinoNessieCatalog.java | 8 ++++++++ .../plugin/iceberg/catalog/rest/TrinoRestCatalog.java | 9 +++++++++ 6 files changed, 50 insertions(+) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index dc518c966dd3..922d5293f5f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -149,6 +149,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -156,6 +157,7 @@ public void updateColumnComment(ConnectorSession session, SchemaTableName schema { Table icebergTable = loadTable(session, schemaTableName); icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + invalidateTableCache(schemaTableName); } @Override @@ -471,6 +473,8 @@ protected Map createMaterializedViewProperties(ConnectorSession .buildOrThrow(); } + protected abstract void invalidateTableCache(SchemaTableName schemaTableName); + protected static class MaterializedViewMayBeBeingRemovedException extends RuntimeException { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 995bdb127587..300a831f1ac9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -677,6 +677,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -689,6 +690,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -752,6 +754,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(session, schemaTableName); + invalidateTableCache(schemaTableName); } private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName) @@ -796,6 +799,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa createTable(to.getSchemaName(), tableInput); newTableCreated = true; deleteTable(from.getSchemaName(), from.getTableName()); + invalidateTableCache(from); } catch (RuntimeException e) { if (newTableCreated) { @@ -1486,6 +1490,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } + com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches) { if (invalidateCaches) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index baa6c0d1d058..fa227a3827f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -365,6 +365,7 @@ private static Optional getQueryId(io.trino.plugin.hive.metastore.Table public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(schemaTableName); + invalidateTableCache(schemaTableName); } @Override @@ -421,6 +422,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) log.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } @Override @@ -428,6 +430,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT { io.trino.plugin.hive.metastore.Table table = dropTableFromMetastore(schemaTableName); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableName schemaTableName) @@ -449,6 +452,7 @@ private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableN public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { metastore.renameTable(from.getSchemaName(), from.getTableName(), to.getSchemaName(), to.getTableName()); + invalidateTableCache(from); } @Override @@ -887,4 +891,10 @@ public Optional redirectTable(ConnectorSession session, } return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 52af89f0fbe0..6f4090954b84 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -273,6 +273,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -287,6 +288,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.get().replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -298,6 +300,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RuntimeException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to rename table from %s to %s".formatted(from, to), e); } + invalidateTableCache(from); } @Override @@ -447,6 +450,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName table) { return TableIdentifier.of(table.getSchemaName(), table.getTableName()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 6d5badaa8380..1a3c3a2176b5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -218,6 +218,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) validateTableCanBeDropped(table); nessieClient.dropTable(toIdentifier(schemaTableName), true); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -230,6 +231,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { nessieClient.renameTable(toIdentifier(from), toIdentifier(to)); + invalidateTableCache(from); } @Override @@ -419,4 +421,10 @@ public Optional redirectTable(ConnectorSession session, { return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index d3a095281e93..06d174f709d5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -266,6 +266,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName) if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) { throw new TableNotFoundException(tableName); } + invalidateTableCache(tableName); } @Override @@ -274,6 +275,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); } + invalidateTableCache(schemaTableName); } @Override @@ -293,6 +295,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); } + invalidateTableCache(from); } @Override @@ -331,6 +334,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -508,6 +512,11 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) }; } + private void invalidateTableCache(SchemaTableName schemaTableName) + { + tableCache.remove(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName schemaTableName) { return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName()); From ad51682a587d0d3a97e3262726389ff4e3b0d2be Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:25:04 +0100 Subject: [PATCH 4/4] Limit session state during metadata queries in Iceberg Metadata queries such as `information_schema.columns`, `system.jdbc.columns` or `system.metadata.table_comments` may end up loading arbitrary number of relations within single query (transaction). It is important to bound memory usage for such queries. In case of Iceberg Hive metastore based catalog, this is already done in `TrinoHiveCatalogFactory` bu means of configuring per-query `CachingHiveMetastore`. However, catalogs with explicit caching need something similar. --- .../main/java/io/trino/cache/CacheUtils.java | 7 ++ .../catalog/glue/TrinoGlueCatalog.java | 102 +++++++++++------- .../iceberg/catalog/hms/TrinoHiveCatalog.java | 29 +++-- .../catalog/jdbc/TrinoJdbcCatalog.java | 29 +++-- .../catalog/nessie/TrinoNessieCatalog.java | 47 +++++--- .../catalog/rest/TrinoRestCatalog.java | 28 +++-- 6 files changed, 164 insertions(+), 78 deletions(-) diff --git a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java index 614ebaa210f2..313eca6ca3c1 100644 --- a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java +++ b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java @@ -14,6 +14,8 @@ package io.trino.cache; import com.google.common.cache.Cache; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.List; import java.util.concurrent.ExecutionException; @@ -26,6 +28,11 @@ public final class CacheUtils { private CacheUtils() {} + /** + * @throws UncheckedExecutionException when {@code loader} throws an unchecked exception + * @throws ExecutionError when {@code loader} throws an {@link Error} + * @throws RuntimeException when{@code loader} throws a checked exception (which should not happen) + */ public static V uncheckedCacheGet(Cache cache, K key, Supplier loader) { try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 300a831f1ac9..6761c555c00c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -95,7 +95,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.UnaryOperator; @@ -105,6 +104,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -180,9 +180,16 @@ public class TrinoGlueCatalog // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables. .maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE)) .build(); - private final Map tableMetadataCache = new ConcurrentHashMap<>(); - private final Map viewCache = new ConcurrentHashMap<>(); - private final Map materializedViewCache = new ConcurrentHashMap<>(); + + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache viewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache materializedViewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoGlueCatalog( CatalogName catalogName, @@ -558,22 +565,30 @@ private void getCommentsFromIcebergMetadata( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - if (viewCache.containsKey(table) || materializedViewCache.containsKey(table)) { + if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) { throw new TableNotFoundException(table); } - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata( this, @@ -612,7 +627,7 @@ public Map> tryGetColumnMetadata(Connector private Optional> getCachedColumnMetadata(SchemaTableName tableName) { - if (!cacheTableMetadata || viewCache.containsKey(tableName) || materializedViewCache.containsKey(tableName)) { + if (!cacheTableMetadata || viewCache.asMap().containsKey(tableName) || materializedViewCache.asMap().containsKey(tableName)) { return Optional.empty(); } @@ -828,37 +843,41 @@ private Optional getTableAndCacheMetada String tableType = getTableType(table); Map parameters = getTableParameters(table); - if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) { - if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) { + if (isIcebergTable(parameters) && !tableMetadataCache.asMap().containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || materializedViewCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Table cannot also be a view/materialized view"); } String metadataLocation = parameters.get(METADATA_LOCATION_PROP); try { // Cache the TableMetadata while we have the Table retrieved anyway - tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation); } } else if (isTrinoMaterializedView(tableType, parameters)) { - if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. Materialized View cannot also be a table or view"); } try { - ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); - materializedViewCache.put(schemaTableName, new MaterializedViewData( - materializedView, - Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)))); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(materializedViewCache, schemaTableName, () -> { + ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); + return new MaterializedViewData( + materializedView, + Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP))); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName); } } - else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) { - if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(schemaTableName)) { + if (materializedViewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. View cannot also be a materialized view or table"); } @@ -868,7 +887,10 @@ else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTabl tableType, parameters, Optional.ofNullable(table.getOwner())) - .ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition)); + .ifPresent(viewDefinition -> { + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(viewCache, schemaTableName, () -> viewDefinition); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache view from %s", schemaTableName); @@ -957,7 +979,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT try { com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - viewCache.remove(source); + viewCache.invalidate(source); TableInput viewTableInput = getViewTableInput( target.getTableName(), existingView.getViewOriginalText(), @@ -996,7 +1018,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName) } try { - viewCache.remove(schemaViewName); + viewCache.invalidate(schemaViewName); deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); } catch (AmazonServiceException e) { @@ -1031,12 +1053,12 @@ public List listViews(ConnectorSession session, Optional getView(ConnectorSession session, SchemaTableName viewName) { - ConnectorViewDefinition cachedView = viewCache.get(viewName); + ConnectorViewDefinition cachedView = viewCache.getIfPresent(viewName); if (cachedView != null) { return Optional.of(cachedView); } - if (tableMetadataCache.containsKey(viewName) || materializedViewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || materializedViewCache.asMap().containsKey(viewName)) { // Entries in these caches are not views return Optional.empty(); } @@ -1257,7 +1279,7 @@ public void dropMaterializedView(ConnectorSession session, SchemaTableName viewN if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName()); } - materializedViewCache.remove(viewName); + materializedViewCache.invalidate(viewName); dropStorageTable(session, view); deleteTable(view.getDatabaseName(), view.getName()); } @@ -1281,12 +1303,12 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g @Override protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData != null) { return Optional.of(materializedViewData.connectorMaterializedViewDefinition); } - if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || viewCache.asMap().containsKey(viewName)) { // Entries in these caches are not materialized views. return Optional.empty(); } @@ -1380,7 +1402,7 @@ private ConnectorMaterializedViewDefinition createMaterializedViewDefinition( public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) { String storageMetadataLocation; - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData == null) { Optional maybeTable = getTableAndCacheMetadata(session, viewName); if (maybeTable.isEmpty()) { @@ -1423,7 +1445,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, { requireNonNull(storageTableName, "storageTableName is null"); requireNonNull(storageMetadataLocation, "storageMetadataLocation is null"); - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { TrinoFileSystem fileSystem = fileSystemFactory.create(session); return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); }); @@ -1436,7 +1458,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou try { com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - materializedViewCache.remove(source); + materializedViewCache.invalidate(source); Map tableParameters = getTableParameters(glueTable); if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source); @@ -1493,7 +1515,7 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index fa227a3827f3..39ef750ec7de 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.hms; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -69,14 +72,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; @@ -124,6 +128,7 @@ public class TrinoHiveCatalog extends AbstractTrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; public static final String DEPENDS_ON_TABLES = "dependsOnTables"; // Value should be ISO-8601 formatted time instant public static final String TRINO_QUERY_START_TIME = "trino-query-start-time"; @@ -135,7 +140,9 @@ public class TrinoHiveCatalog private final boolean deleteSchemaLocationsFallback; private final boolean hideMaterializedViewStorageTable; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoHiveCatalog( CatalogName catalogName, @@ -458,9 +465,17 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -838,7 +853,7 @@ public Optional getMaterializedViewStorageTable(ConnectorSession sess private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, io.trino.plugin.hive.metastore.Table materializedView) { - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP); checkState(storageMetadataLocation != null, "Storage location missing in definition of materialized view " + materializedView.getTableName()); TrinoFileSystem fileSystem = fileSystemFactory.create(session); @@ -895,6 +910,6 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 6f4090954b84..fae4f5507cb1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.jdbc; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -50,13 +53,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Maps.transformValues; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; @@ -75,11 +79,16 @@ public class TrinoJdbcCatalog { private static final Logger LOG = Logger.get(TrinoJdbcCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final JdbcCatalog jdbcCatalog; private final IcebergJdbcClient jdbcClient; private final TrinoFileSystemFactory fileSystemFactory; private final String defaultWarehouseDir; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoJdbcCatalog( CatalogName catalogName, @@ -306,9 +315,17 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -453,7 +470,7 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } private static TableIdentifier toIdentifier(SchemaTableName table) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 1a3c3a2176b5..ab0a40a3ef9a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -13,9 +13,12 @@ */ package io.trino.plugin.iceberg.catalog.nessie; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -49,11 +52,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; @@ -67,11 +71,16 @@ public class TrinoNessieCatalog extends AbstractTrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final String warehouseLocation; private final NessieIcebergClient nessieClient; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final TrinoFileSystemFactory fileSystemFactory; + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + public TrinoNessieCatalog( CatalogName catalogName, TypeManager typeManager, @@ -184,18 +193,26 @@ public Optional> streamRelationComments( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table)).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata( this, @@ -425,6 +442,6 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 06d174f709d5..9357a08c158a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,11 +13,14 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.jsonwebtoken.impl.DefaultJwtBuilder; import io.jsonwebtoken.jackson.io.JacksonSerializer; +import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergSchemaProperties; @@ -58,12 +61,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; @@ -76,13 +79,17 @@ public class TrinoRestCatalog implements TrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final RESTSessionCatalog restSessionCatalog; private final CatalogName catalogName; private final SessionType sessionType; private final String trinoVersion; private final boolean useUniqueTableLocation; - private final Map tableCache = new ConcurrentHashMap<>(); + private final Cache tableCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoRestCatalog( RESTSessionCatalog restSessionCatalog, @@ -302,19 +309,20 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { try { - return tableCache.computeIfAbsent( + return uncheckedCacheGet( + tableCache, schemaTableName, - key -> { + () -> { BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); }); } - catch (NoSuchTableException e) { - throw new TableNotFoundException(schemaTableName, e); - } - catch (RuntimeException e) { - throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e); + catch (UncheckedExecutionException e) { + if (e.getCause() instanceof NoSuchTableException) { + throw new TableNotFoundException(schemaTableName, e.getCause()); + } + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e.getCause()); } } @@ -514,7 +522,7 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) private void invalidateTableCache(SchemaTableName schemaTableName) { - tableCache.remove(schemaTableName); + tableCache.invalidate(schemaTableName); } private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)