From ad51682a587d0d3a97e3262726389ff4e3b0d2be Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:25:04 +0100 Subject: [PATCH] Limit session state during metadata queries in Iceberg Metadata queries such as `information_schema.columns`, `system.jdbc.columns` or `system.metadata.table_comments` may end up loading an arbitrary number of relations within a single query (transaction). It is important to bound memory usage for such queries. In case of Iceberg Hive metastore based catalog, this is already done in `TrinoHiveCatalogFactory` by means of configuring per-query `CachingHiveMetastore`. However, catalogs with explicit caching need something similar. --- .../main/java/io/trino/cache/CacheUtils.java | 7 ++ .../catalog/glue/TrinoGlueCatalog.java | 102 +++++++++++------- .../iceberg/catalog/hms/TrinoHiveCatalog.java | 29 +++-- .../catalog/jdbc/TrinoJdbcCatalog.java | 29 +++-- .../catalog/nessie/TrinoNessieCatalog.java | 47 +++++--- .../catalog/rest/TrinoRestCatalog.java | 28 +++-- 6 files changed, 164 insertions(+), 78 deletions(-) diff --git a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java index 614ebaa210f2..313eca6ca3c1 100644 --- a/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java +++ b/lib/trino-cache/src/main/java/io/trino/cache/CacheUtils.java @@ -14,6 +14,8 @@ package io.trino.cache; import com.google.common.cache.Cache; +import com.google.common.util.concurrent.ExecutionError; +import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.List; import java.util.concurrent.ExecutionException; @@ -26,6 +28,11 @@ public final class CacheUtils { private CacheUtils() {} + /** + * @throws UncheckedExecutionException when {@code loader} throws an unchecked exception + * @throws ExecutionError when {@code loader} throws an {@link Error} + * @throws RuntimeException when {@code loader} throws a checked exception (which should not happen) + 
*/ public static V uncheckedCacheGet(Cache cache, K key, Supplier loader) { try { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 300a831f1ac9..6761c555c00c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -95,7 +95,6 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.UnaryOperator; @@ -105,6 +104,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Throwables.throwIfInstanceOf; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -180,9 +180,16 @@ public class TrinoGlueCatalog // Even though this is query-scoped, this still needs to be bounded. information_schema queries can access large number of tables. 
.maximumSize(Math.max(PER_QUERY_CACHE_SIZE, IcebergMetadata.GET_METADATA_BATCH_SIZE)) .build(); - private final Map tableMetadataCache = new ConcurrentHashMap<>(); - private final Map viewCache = new ConcurrentHashMap<>(); - private final Map materializedViewCache = new ConcurrentHashMap<>(); + + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache viewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + private final Cache materializedViewCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoGlueCatalog( CatalogName catalogName, @@ -558,22 +565,30 @@ private void getCommentsFromIcebergMetadata( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - if (viewCache.containsKey(table) || materializedViewCache.containsKey(table)) { + if (viewCache.asMap().containsKey(table) || materializedViewCache.asMap().containsKey(table)) { throw new TableNotFoundException(table); } - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table), TRINO_METRICS_REPORTER).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return 
getIcebergTableWithMetadata( this, @@ -612,7 +627,7 @@ public Map> tryGetColumnMetadata(Connector private Optional> getCachedColumnMetadata(SchemaTableName tableName) { - if (!cacheTableMetadata || viewCache.containsKey(tableName) || materializedViewCache.containsKey(tableName)) { + if (!cacheTableMetadata || viewCache.asMap().containsKey(tableName) || materializedViewCache.asMap().containsKey(tableName)) { return Optional.empty(); } @@ -828,37 +843,41 @@ private Optional getTableAndCacheMetada String tableType = getTableType(table); Map parameters = getTableParameters(table); - if (isIcebergTable(parameters) && !tableMetadataCache.containsKey(schemaTableName)) { - if (viewCache.containsKey(schemaTableName) || materializedViewCache.containsKey(schemaTableName)) { + if (isIcebergTable(parameters) && !tableMetadataCache.asMap().containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || materializedViewCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. 
Table cannot also be a view/materialized view"); } String metadataLocation = parameters.get(METADATA_LOCATION_PROP); try { // Cache the TableMetadata while we have the Table retrieved anyway - tableMetadataCache.put(schemaTableName, TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(tableMetadataCache, schemaTableName, () -> TableMetadataParser.read(new ForwardingFileIo(fileSystemFactory.create(session)), metadataLocation)); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache table metadata from table at %s", metadataLocation); } } else if (isTrinoMaterializedView(tableType, parameters)) { - if (viewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + if (viewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. 
Materialized View cannot also be a table or view"); } try { - ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); - materializedViewCache.put(schemaTableName, new MaterializedViewData( - materializedView, - Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP)))); + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(materializedViewCache, schemaTableName, () -> { + ConnectorMaterializedViewDefinition materializedView = createMaterializedViewDefinition(session, schemaTableName, table); + return new MaterializedViewData( + materializedView, + Optional.ofNullable(parameters.get(METADATA_LOCATION_PROP))); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache materialized view from %s", schemaTableName); } } - else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTableName)) { - if (materializedViewCache.containsKey(schemaTableName) || tableMetadataCache.containsKey(schemaTableName)) { + else if (isTrinoView(tableType, parameters) && !viewCache.asMap().containsKey(schemaTableName)) { + if (materializedViewCache.asMap().containsKey(schemaTableName) || tableMetadataCache.asMap().containsKey(schemaTableName)) { throw new TrinoException(GENERIC_INTERNAL_ERROR, "Glue table cache inconsistency. 
View cannot also be a materialized view or table"); } @@ -868,7 +887,10 @@ else if (isTrinoView(tableType, parameters) && !viewCache.containsKey(schemaTabl tableType, parameters, Optional.ofNullable(table.getOwner())) - .ifPresent(viewDefinition -> viewCache.put(schemaTableName, viewDefinition)); + .ifPresent(viewDefinition -> { + // Note: this is racy from cache invalidation perspective, but it should not matter here + uncheckedCacheGet(viewCache, schemaTableName, () -> viewDefinition); + }); } catch (RuntimeException e) { LOG.warn(e, "Failed to cache view from %s", schemaTableName); @@ -957,7 +979,7 @@ public void renameView(ConnectorSession session, SchemaTableName source, SchemaT try { com.amazonaws.services.glue.model.Table existingView = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - viewCache.remove(source); + viewCache.invalidate(source); TableInput viewTableInput = getViewTableInput( target.getTableName(), existingView.getViewOriginalText(), @@ -996,7 +1018,7 @@ public void dropView(ConnectorSession session, SchemaTableName schemaViewName) } try { - viewCache.remove(schemaViewName); + viewCache.invalidate(schemaViewName); deleteTable(schemaViewName.getSchemaName(), schemaViewName.getTableName()); } catch (AmazonServiceException e) { @@ -1031,12 +1053,12 @@ public List listViews(ConnectorSession session, Optional getView(ConnectorSession session, SchemaTableName viewName) { - ConnectorViewDefinition cachedView = viewCache.get(viewName); + ConnectorViewDefinition cachedView = viewCache.getIfPresent(viewName); if (cachedView != null) { return Optional.of(cachedView); } - if (tableMetadataCache.containsKey(viewName) || materializedViewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || materializedViewCache.asMap().containsKey(viewName)) { // Entries in these caches are not views return Optional.empty(); } @@ -1257,7 +1279,7 @@ public void 
dropMaterializedView(ConnectorSession session, SchemaTableName viewN if (!isTrinoMaterializedView(getTableType(view), getTableParameters(view))) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + view.getDatabaseName() + "." + view.getName()); } - materializedViewCache.remove(viewName); + materializedViewCache.invalidate(viewName); dropStorageTable(session, view); deleteTable(view.getDatabaseName(), view.getName()); } @@ -1281,12 +1303,12 @@ private void dropStorageTable(ConnectorSession session, com.amazonaws.services.g @Override protected Optional doGetMaterializedView(ConnectorSession session, SchemaTableName viewName) { - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData != null) { return Optional.of(materializedViewData.connectorMaterializedViewDefinition); } - if (tableMetadataCache.containsKey(viewName) || viewCache.containsKey(viewName)) { + if (tableMetadataCache.asMap().containsKey(viewName) || viewCache.asMap().containsKey(viewName)) { // Entries in these caches are not materialized views. 
return Optional.empty(); } @@ -1380,7 +1402,7 @@ private ConnectorMaterializedViewDefinition createMaterializedViewDefinition( public Optional getMaterializedViewStorageTable(ConnectorSession session, SchemaTableName viewName) { String storageMetadataLocation; - MaterializedViewData materializedViewData = materializedViewCache.get(viewName); + MaterializedViewData materializedViewData = materializedViewCache.getIfPresent(viewName); if (materializedViewData == null) { Optional maybeTable = getTableAndCacheMetadata(session, viewName); if (maybeTable.isEmpty()) { @@ -1423,7 +1445,7 @@ private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, { requireNonNull(storageTableName, "storageTableName is null"); requireNonNull(storageMetadataLocation, "storageMetadataLocation is null"); - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { TrinoFileSystem fileSystem = fileSystemFactory.create(session); return TableMetadataParser.read(new ForwardingFileIo(fileSystem), storageMetadataLocation); }); @@ -1436,7 +1458,7 @@ public void renameMaterializedView(ConnectorSession session, SchemaTableName sou try { com.amazonaws.services.glue.model.Table glueTable = getTableAndCacheMetadata(session, source) .orElseThrow(() -> new TableNotFoundException(source)); - materializedViewCache.remove(source); + materializedViewCache.invalidate(source); Map tableParameters = getTableParameters(glueTable); if (!isTrinoMaterializedView(getTableType(glueTable), tableParameters)) { throw new TrinoException(UNSUPPORTED_TABLE_TYPE, "Not a Materialized View: " + source); @@ -1493,7 +1515,7 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } com.amazonaws.services.glue.model.Table getTable(SchemaTableName 
tableName, boolean invalidateCaches) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index fa227a3827f3..39ef750ec7de 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.hms; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; @@ -69,14 +72,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; @@ -124,6 +128,7 @@ public class TrinoHiveCatalog extends AbstractTrinoCatalog { private static final Logger log = Logger.get(TrinoHiveCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; public 
static final String DEPENDS_ON_TABLES = "dependsOnTables"; // Value should be ISO-8601 formatted time instant public static final String TRINO_QUERY_START_TIME = "trino-query-start-time"; @@ -135,7 +140,9 @@ public class TrinoHiveCatalog private final boolean deleteSchemaLocationsFallback; private final boolean hideMaterializedViewStorageTable; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoHiveCatalog( CatalogName catalogName, @@ -458,9 +465,17 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -838,7 +853,7 @@ public Optional getMaterializedViewStorageTable(ConnectorSession sess private TableMetadata getMaterializedViewTableMetadata(ConnectorSession session, SchemaTableName storageTableName, io.trino.plugin.hive.metastore.Table materializedView) { - return tableMetadataCache.computeIfAbsent(storageTableName, ignored -> { + return uncheckedCacheGet(tableMetadataCache, storageTableName, () -> { String storageMetadataLocation = materializedView.getParameters().get(METADATA_LOCATION_PROP); checkState(storageMetadataLocation != null, "Storage location missing 
in definition of materialized view " + materializedView.getTableName()); TrinoFileSystem fileSystem = fileSystemFactory.create(session); @@ -895,6 +910,6 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 6f4090954b84..fae4f5507cb1 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -13,10 +13,13 @@ */ package io.trino.plugin.iceberg.catalog.jdbc; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.log.Logger; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -50,13 +53,14 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.Maps.transformValues; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import 
static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_INVALID_METADATA; @@ -75,11 +79,16 @@ public class TrinoJdbcCatalog { private static final Logger LOG = Logger.get(TrinoJdbcCatalog.class); + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final JdbcCatalog jdbcCatalog; private final IcebergJdbcClient jdbcClient; private final TrinoFileSystemFactory fileSystemFactory; private final String defaultWarehouseDir; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); + + private final Cache tableMetadataCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoJdbcCatalog( CatalogName catalogName, @@ -306,9 +315,17 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa @Override public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - schemaTableName, - ignore -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + schemaTableName, + () -> ((BaseTable) loadIcebergTable(this, tableOperationsProvider, session, schemaTableName)).operations().current()); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata(this, tableOperationsProvider, session, schemaTableName, metadata); } @@ -453,7 +470,7 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } private static TableIdentifier toIdentifier(SchemaTableName table) diff --git 
a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 1a3c3a2176b5..ab0a40a3ef9a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -13,9 +13,12 @@ */ package io.trino.plugin.iceberg.catalog.nessie; +import com.google.common.cache.Cache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; +import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog; @@ -49,11 +52,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY; import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata; @@ -67,11 +71,16 @@ public class TrinoNessieCatalog extends AbstractTrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final String warehouseLocation; private final NessieIcebergClient nessieClient; - private final Map tableMetadataCache = new ConcurrentHashMap<>(); private final TrinoFileSystemFactory fileSystemFactory; + private final Cache tableMetadataCache = 
EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); + public TrinoNessieCatalog( CatalogName catalogName, TypeManager typeManager, @@ -184,18 +193,26 @@ public Optional> streamRelationComments( @Override public Table loadTable(ConnectorSession session, SchemaTableName table) { - TableMetadata metadata = tableMetadataCache.computeIfAbsent( - table, - ignore -> { - TableOperations operations = tableOperationsProvider.createTableOperations( - this, - session, - table.getSchemaName(), - table.getTableName(), - Optional.empty(), - Optional.empty()); - return new BaseTable(operations, quotedTableName(table)).operations().current(); - }); + TableMetadata metadata; + try { + metadata = uncheckedCacheGet( + tableMetadataCache, + table, + () -> { + TableOperations operations = tableOperationsProvider.createTableOperations( + this, + session, + table.getSchemaName(), + table.getTableName(), + Optional.empty(), + Optional.empty()); + return new BaseTable(operations, quotedTableName(table)).operations().current(); + }); + } + catch (UncheckedExecutionException e) { + throwIfUnchecked(e.getCause()); + throw e; + } return getIcebergTableWithMetadata( this, @@ -425,6 +442,6 @@ public Optional redirectTable(ConnectorSession session, @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { - tableMetadataCache.remove(schemaTableName); + tableMetadataCache.invalidate(schemaTableName); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index 06d174f709d5..9357a08c158a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -13,11 +13,14 @@ */ package io.trino.plugin.iceberg.catalog.rest; +import com.google.common.cache.Cache; 
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.UncheckedExecutionException; import io.jsonwebtoken.impl.DefaultJwtBuilder; import io.jsonwebtoken.jackson.io.JacksonSerializer; +import io.trino.cache.EvictableCacheBuilder; import io.trino.plugin.base.CatalogName; import io.trino.plugin.iceberg.ColumnIdentity; import io.trino.plugin.iceberg.IcebergSchemaProperties; @@ -58,12 +61,12 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.function.UnaryOperator; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.cache.CacheUtils.uncheckedCacheGet; import static io.trino.filesystem.Locations.appendPath; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR; @@ -76,13 +79,17 @@ public class TrinoRestCatalog implements TrinoCatalog { + private static final int PER_QUERY_CACHE_SIZE = 1000; + private final RESTSessionCatalog restSessionCatalog; private final CatalogName catalogName; private final SessionType sessionType; private final String trinoVersion; private final boolean useUniqueTableLocation; - private final Map tableCache = new ConcurrentHashMap<>(); + private final Cache tableCache = EvictableCacheBuilder.newBuilder() + .maximumSize(PER_QUERY_CACHE_SIZE) + .build(); public TrinoRestCatalog( RESTSessionCatalog restSessionCatalog, @@ -302,19 +309,20 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName) { try { - return tableCache.computeIfAbsent( + return uncheckedCacheGet( + tableCache, schemaTableName, - key -> { + () 
-> { BaseTable baseTable = (BaseTable) restSessionCatalog.loadTable(convert(session), toIdentifier(schemaTableName)); // Creating a new base table is necessary to adhere to Trino's expectations for quoted table names return new BaseTable(baseTable.operations(), quotedTableName(schemaTableName)); }); } - catch (NoSuchTableException e) { - throw new TableNotFoundException(schemaTableName, e); - } - catch (RuntimeException e) { - throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e); + catch (UncheckedExecutionException e) { + if (e.getCause() instanceof NoSuchTableException) { + throw new TableNotFoundException(schemaTableName, e.getCause()); + } + throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to load table: %s", schemaTableName), e.getCause()); } } @@ -514,7 +522,7 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) private void invalidateTableCache(SchemaTableName schemaTableName) { - tableCache.remove(schemaTableName); + tableCache.invalidate(schemaTableName); } private static TableIdentifier toIdentifier(SchemaTableName schemaTableName)