From 53072955546064115155bb967b2dd2e0fe5117fa Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 15 Nov 2023 17:26:45 +0100 Subject: [PATCH] Evict session level cache when dropping Iceberg table --- .../plugin/iceberg/catalog/AbstractTrinoCatalog.java | 4 ++++ .../plugin/iceberg/catalog/glue/TrinoGlueCatalog.java | 10 ++++++++++ .../plugin/iceberg/catalog/hms/TrinoHiveCatalog.java | 10 ++++++++++ .../plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java | 9 +++++++++ .../iceberg/catalog/nessie/TrinoNessieCatalog.java | 8 ++++++++ .../plugin/iceberg/catalog/rest/TrinoRestCatalog.java | 9 +++++++++ 6 files changed, 50 insertions(+) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java index dc518c966dd3..922d5293f5f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java @@ -149,6 +149,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -156,6 +157,7 @@ public void updateColumnComment(ConnectorSession session, SchemaTableName schema { Table icebergTable = loadTable(session, schemaTableName); icebergTable.updateSchema().updateColumnDoc(columnIdentity.getName(), comment.orElse(null)).commit(); + invalidateTableCache(schemaTableName); } @Override @@ -471,6 +473,8 @@ protected Map createMaterializedViewProperties(ConnectorSession .buildOrThrow(); } + protected abstract void invalidateTableCache(SchemaTableName schemaTableName); + protected static class MaterializedViewMayBeBeingRemovedException extends RuntimeException { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java index 995bdb127587..300a831f1ac9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/TrinoGlueCatalog.java @@ -677,6 +677,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -689,6 +690,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -752,6 +754,7 @@ public void registerTable(ConnectorSession session, SchemaTableName schemaTableN public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(session, schemaTableName); + invalidateTableCache(schemaTableName); } private com.amazonaws.services.glue.model.Table dropTableFromMetastore(ConnectorSession session, SchemaTableName schemaTableName) @@ -796,6 +799,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa createTable(to.getSchemaName(), tableInput); newTableCreated = true; deleteTable(from.getSchemaName(), from.getTableName()); + invalidateTableCache(from); } catch (RuntimeException e) { if (newTableCreated) { @@ -1486,6 +1490,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } + com.amazonaws.services.glue.model.Table getTable(SchemaTableName tableName, boolean invalidateCaches) { if (invalidateCaches) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index baa6c0d1d058..fa227a3827f3 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -365,6 +365,7 @@ private static Optional getQueryId(io.trino.plugin.hive.metastore.Table public void unregisterTable(ConnectorSession session, SchemaTableName schemaTableName) { dropTableFromMetastore(schemaTableName); + invalidateTableCache(schemaTableName); } @Override @@ -421,6 +422,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) log.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, metastoreTable.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } @Override @@ -428,6 +430,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT { io.trino.plugin.hive.metastore.Table table = dropTableFromMetastore(schemaTableName); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.getStorage().getLocation()); + invalidateTableCache(schemaTableName); } private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableName schemaTableName) @@ -449,6 +452,7 @@ private io.trino.plugin.hive.metastore.Table dropTableFromMetastore(SchemaTableN public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { metastore.renameTable(from.getSchemaName(), from.getTableName(), to.getSchemaName(), to.getTableName()); + invalidateTableCache(from); } @Override @@ -887,4 +891,10 @@ public Optional redirectTable(ConnectorSession session, } return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java index 52af89f0fbe0..6f4090954b84 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalog.java @@ -273,6 +273,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) LOG.warn(e, "Failed to delete table data referenced by metadata"); } deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -287,6 +288,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT } String tableLocation = metadataLocation.get().replaceFirst("/metadata/[^/]*$", ""); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, tableLocation); + invalidateTableCache(schemaTableName); } @Override @@ -298,6 +300,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RuntimeException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, "Failed to rename table from %s to %s".formatted(from, to), e); } + invalidateTableCache(from); } @Override @@ -447,6 +450,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName table) { return TableIdentifier.of(table.getSchemaName(), table.getTableName()); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java index 6d5badaa8380..1a3c3a2176b5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java @@ -218,6 +218,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) validateTableCanBeDropped(table); nessieClient.dropTable(toIdentifier(schemaTableName), true); deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location()); + invalidateTableCache(schemaTableName); } @Override @@ -230,6 +231,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to) { nessieClient.renameTable(toIdentifier(from), toIdentifier(to)); + invalidateTableCache(from); } @Override @@ -419,4 +421,10 @@ public Optional redirectTable(ConnectorSession session, { return Optional.empty(); } + + @Override + protected void invalidateTableCache(SchemaTableName schemaTableName) + { + tableMetadataCache.remove(schemaTableName); + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index d3a095281e93..06d174f709d5 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -266,6 +266,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName) if (!restSessionCatalog.dropTable(convert(session), toIdentifier(tableName))) { throw new TableNotFoundException(tableName); } + invalidateTableCache(tableName); } @Override @@ -274,6 +275,7 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName) if (!restSessionCatalog.purgeTable(convert(session), toIdentifier(schemaTableName))) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName)); } + invalidateTableCache(schemaTableName); } @Override @@ -293,6 +295,7 @@ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTa catch (RESTException e) { throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e); } + invalidateTableCache(from); } @Override @@ -331,6 +334,7 @@ public void updateTableComment(ConnectorSession session, SchemaTableName schemaT else { icebergTable.updateProperties().set(TABLE_COMMENT, comment.get()).commit(); } + invalidateTableCache(schemaTableName); } @Override @@ -508,6 +512,11 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) }; } + private void invalidateTableCache(SchemaTableName schemaTableName) + { + tableCache.remove(schemaTableName); + } + private static TableIdentifier toIdentifier(SchemaTableName schemaTableName) { return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());