From ff441bf96790a2b63f5e9f644655b06760201a82 Mon Sep 17 00:00:00 2001 From: big face cat <731030576@qq.com> Date: Thu, 18 Jan 2024 15:31:40 +0800 Subject: [PATCH] Core: Close the MetricsReporter when Catalog is closed (#9353) --- .../iceberg/metrics/MetricsReporter.java | 6 +++++- .../iceberg/aws/dynamodb/DynamoDbCatalog.java | 4 ++-- .../apache/iceberg/aws/glue/GlueCatalog.java | 4 ++-- .../apache/iceberg/BaseMetastoreCatalog.java | 13 +++++++++++-- .../apache/iceberg/hadoop/HadoopCatalog.java | 4 ++-- .../iceberg/inmemory/InMemoryCatalog.java | 6 ++++++ .../org/apache/iceberg/jdbc/JdbcCatalog.java | 19 ++++++++++++++++--- .../apache/iceberg/dell/ecs/EcsCatalog.java | 4 ++-- .../apache/iceberg/nessie/NessieCatalog.java | 3 ++- .../iceberg/snowflake/SnowflakeCatalog.java | 4 ++-- 10 files changed, 50 insertions(+), 17 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java index 365f7f99d6f0..9958b75ca32c 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java @@ -18,11 +18,12 @@ */ package org.apache.iceberg.metrics; +import java.io.Closeable; import java.util.Map; /** This interface defines the basic API for reporting metrics for operations to a Table. */ @FunctionalInterface -public interface MetricsReporter { +public interface MetricsReporter extends Closeable { /** * A custom MetricsReporter implementation must have a no-arg constructor, which will be called @@ -40,4 +41,7 @@ default void initialize(Map properties) {} * @param report The {@link MetricsReport} to report. */ void report(MetricsReport report); + + @Override + default void close() {} } diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java index fc1479c3a007..0c991af75076 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbCatalog.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.aws.dynamodb; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -87,7 +86,7 @@ /** DynamoDB implementation of Iceberg catalog */ public class DynamoDbCatalog extends BaseMetastoreCatalog - implements Closeable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class); private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5; @@ -143,6 +142,7 @@ void initialize( this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(dynamo); closeableGroup.addCloseable(fileIO); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); ensureCatalogTableExistsOrCreate(); diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index 6e95379c1de5..bdc245273178 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -21,7 +21,6 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalListener; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -85,7 +84,7 @@ import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; public class GlueCatalog extends BaseMetastoreCatalog - implements Closeable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); @@ -197,6 +196,7 @@ void initialize( this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(glue); closeableGroup.addCloseable(lockManager); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); this.fileIOCloser = newFileIOCloser(); } diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index a683533473be..bb7d5a0ffd9d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -18,6 +18,8 @@ */ package org.apache.iceberg; +import java.io.Closeable; +import java.io.IOException; import java.util.Map; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -34,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class BaseMetastoreCatalog implements Catalog { +public abstract class BaseMetastoreCatalog implements Catalog, Closeable { private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class); private MetricsReporter metricsReporter; @@ -305,11 +307,18 @@ protected static String fullTableName(String catalogName, TableIdentifier identi return sb.toString(); } - private MetricsReporter metricsReporter() { + protected MetricsReporter metricsReporter() { if (metricsReporter == null) { metricsReporter = CatalogUtil.loadMetricsReporter(properties()); } return metricsReporter; } + + @Override + public void close() throws IOException { + if (metricsReporter != null) { + metricsReporter.close(); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java index e9ed4dcd280d..92ba25af0f1c 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopCatalog.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.hadoop; -import java.io.Closeable; import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; @@ -78,7 +77,7 @@ *

Note: The HadoopCatalog requires that the underlying file system supports atomic rename. */ public class HadoopCatalog extends BaseMetastoreCatalog - implements Closeable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalog.class); @@ -122,6 +121,7 @@ public void initialize(String name, Map properties) { this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(lockManager); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); this.suppressPermissionError = diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java index 51d242f93419..a880f94f4385 100644 --- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java +++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java @@ -42,6 +42,7 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Objects; @@ -68,6 +69,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog private FileIO io; private String catalogName; private String warehouseLocation; + private CloseableGroup closeableGroup; public InMemoryCatalog() { this.namespaces = Maps.newConcurrentMap(); @@ -87,6 +89,9 @@ public void initialize(String name, Map properties) { String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, ""); this.warehouseLocation = warehouse.replaceAll("/*$", ""); this.io = new InMemoryFileIO(); + this.closeableGroup = new CloseableGroup(); + closeableGroup.addCloseable(metricsReporter()); + closeableGroup.setSuppressCloseFailure(true); } @Override @@ -302,6 +307,7 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac @Override public void close() throws IOException { + closeableGroup.close(); namespaces.clear(); tables.clear(); views.clear(); diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 314595dd0255..0bab6ade4c85 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -18,7 +18,8 @@ */ package org.apache.iceberg.jdbc; -import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -50,6 +51,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.Configurable; +import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -62,7 +64,7 @@ import org.slf4j.LoggerFactory; public class JdbcCatalog extends BaseMetastoreCatalog - implements Configurable, SupportsNamespaces, Closeable { + implements Configurable, SupportsNamespaces { public static final String PROPERTY_PREFIX = "jdbc."; private static final String NAMESPACE_EXISTS_PROPERTY = "exists"; @@ -78,6 +80,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog private final Function, FileIO> ioBuilder; private final Function, JdbcClientPool> clientPoolBuilder; private final boolean initializeCatalogTables; + private CloseableGroup closeableGroup; public JdbcCatalog() { this(null, null, true); @@ -140,6 +143,10 @@ public void initialize(String name, Map properties) { Thread.currentThread().interrupt(); throw new UncheckedInterruptedException(e, "Interrupted in call to initialize"); } + this.closeableGroup = new CloseableGroup(); + closeableGroup.addCloseable(metricsReporter()); + closeableGroup.addCloseable(connections); + closeableGroup.setSuppressCloseFailure(true); } private void initializeCatalogTables() throws InterruptedException, SQLException { @@ -482,7 +489,13 @@ public boolean removeProperties(Namespace namespace, Set properties) @Override public void close() { - connections.close(); + if (closeableGroup != null) { + try { + closeableGroup.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } } @Override diff --git a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java index f951c8c937ea..07ad68365837 100644 --- a/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java +++ b/dell/src/main/java/org/apache/iceberg/dell/ecs/EcsCatalog.java @@ -26,7 +26,6 @@ import com.emc.object.s3.bean.S3Object; import com.emc.object.s3.request.ListObjectsRequest; import com.emc.object.s3.request.PutObjectRequest; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; @@ -64,7 +63,7 @@ import org.slf4j.LoggerFactory; public class EcsCatalog extends BaseMetastoreCatalog - implements Closeable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { /** Suffix of table metadata object */ private static final String TABLE_OBJECT_SUFFIX = ".table"; @@ -111,6 +110,7 @@ public void initialize(String name, Map properties) { this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(client::destroy); closeableGroup.addCloseable(fileIO); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 6a877893ef09..cce6fcf144ef 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -60,7 +60,7 @@ /** Nessie implementation of Iceberg Catalog. */ public class NessieCatalog extends BaseMetastoreViewCatalog - implements AutoCloseable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class); private static final Joiner SLASH = Joiner.on("/"); @@ -176,6 +176,7 @@ public void initialize( this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(client); closeableGroup.addCloseable(fileIO); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); } diff --git a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java index dd20c8ded9e2..06dacad185c6 100644 --- a/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java +++ b/snowflake/src/main/java/org/apache/iceberg/snowflake/SnowflakeCatalog.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.snowflake; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -45,7 +44,7 @@ import org.slf4j.LoggerFactory; public class SnowflakeCatalog extends BaseMetastoreCatalog - implements Closeable, SupportsNamespaces, Configurable { + implements SupportsNamespaces, Configurable { private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog"; private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO"; // Specifies the name of a Snowflake's partner application to connect through JDBC. @@ -157,6 +156,7 @@ void initialize( this.catalogProperties = properties; this.closeableGroup = new CloseableGroup(); closeableGroup.addCloseable(snowflakeClient); + closeableGroup.addCloseable(metricsReporter()); closeableGroup.setSuppressCloseFailure(true); }