Skip to content

Commit

Permalink
Core: Close the MetricsReporter when Catalog is closed (apache#9353)
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng2018 authored and adnanhemani committed Jan 30, 2024
1 parent 42d027a commit ff441bf
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
*/
package org.apache.iceberg.metrics;

import java.io.Closeable;
import java.util.Map;

/** This interface defines the basic API for reporting metrics for operations to a Table. */
@FunctionalInterface
public interface MetricsReporter {
public interface MetricsReporter extends Closeable {

/**
* A custom MetricsReporter implementation must have a no-arg constructor, which will be called
Expand All @@ -40,4 +41,7 @@ default void initialize(Map<String, String> properties) {}
* @param report The {@link MetricsReport} to report.
*/
void report(MetricsReport report);

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.aws.dynamodb;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -87,7 +86,7 @@

/** DynamoDB implementation of Iceberg catalog */
public class DynamoDbCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable {
implements SupportsNamespaces, Configurable {

private static final Logger LOG = LoggerFactory.getLogger(DynamoDbCatalog.class);
private static final int CATALOG_TABLE_CREATION_WAIT_ATTEMPTS_MAX = 5;
Expand Down Expand Up @@ -143,6 +142,7 @@ void initialize(
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(dynamo);
closeableGroup.addCloseable(fileIO);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);

ensureCatalogTableExistsOrCreate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -85,7 +84,7 @@
import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;

public class GlueCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable<Configuration> {
implements SupportsNamespaces, Configurable<Configuration> {

private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);

Expand Down Expand Up @@ -197,6 +196,7 @@ void initialize(
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(glue);
closeableGroup.addCloseable(lockManager);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
this.fileIOCloser = newFileIOCloser();
}
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
Expand All @@ -34,7 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseMetastoreCatalog implements Catalog {
public abstract class BaseMetastoreCatalog implements Catalog, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class);

private MetricsReporter metricsReporter;
Expand Down Expand Up @@ -305,11 +307,18 @@ protected static String fullTableName(String catalogName, TableIdentifier identi
return sb.toString();
}

private MetricsReporter metricsReporter() {
protected MetricsReporter metricsReporter() {
if (metricsReporter == null) {
metricsReporter = CatalogUtil.loadMetricsReporter(properties());
}

return metricsReporter;
}

@Override
public void close() throws IOException {
if (metricsReporter != null) {
metricsReporter.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.hadoop;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -78,7 +77,7 @@
* <p>Note: The HadoopCatalog requires that the underlying file system supports atomic rename.
*/
public class HadoopCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable {
implements SupportsNamespaces, Configurable {

private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalog.class);

Expand Down Expand Up @@ -122,6 +121,7 @@ public void initialize(String name, Map<String, String> properties) {

this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(lockManager);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);

this.suppressPermissionError =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
Expand All @@ -68,6 +69,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
private FileIO io;
private String catalogName;
private String warehouseLocation;
private CloseableGroup closeableGroup;

public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
Expand All @@ -87,6 +89,9 @@ public void initialize(String name, Map<String, String> properties) {
String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
this.warehouseLocation = warehouse.replaceAll("/*$", "");
this.io = new InMemoryFileIO();
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}

@Override
Expand Down Expand Up @@ -302,6 +307,7 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac

@Override
public void close() throws IOException {
closeableGroup.close();
namespaces.clear();
tables.clear();
views.clear();
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/
package org.apache.iceberg.jdbc;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -50,6 +51,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -62,7 +64,7 @@
import org.slf4j.LoggerFactory;

public class JdbcCatalog extends BaseMetastoreCatalog
implements Configurable<Object>, SupportsNamespaces, Closeable {
implements Configurable<Object>, SupportsNamespaces {

public static final String PROPERTY_PREFIX = "jdbc.";
private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
Expand All @@ -78,6 +80,7 @@ public class JdbcCatalog extends BaseMetastoreCatalog
private final Function<Map<String, String>, FileIO> ioBuilder;
private final Function<Map<String, String>, JdbcClientPool> clientPoolBuilder;
private final boolean initializeCatalogTables;
private CloseableGroup closeableGroup;

public JdbcCatalog() {
this(null, null, true);
Expand Down Expand Up @@ -140,6 +143,10 @@ public void initialize(String name, Map<String, String> properties) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
}
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(metricsReporter());
closeableGroup.addCloseable(connections);
closeableGroup.setSuppressCloseFailure(true);
}

private void initializeCatalogTables() throws InterruptedException, SQLException {
Expand Down Expand Up @@ -482,7 +489,13 @@ public boolean removeProperties(Namespace namespace, Set<String> properties)

@Override
public void close() {
connections.close();
if (closeableGroup != null) {
try {
closeableGroup.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.emc.object.s3.bean.S3Object;
import com.emc.object.s3.request.ListObjectsRequest;
import com.emc.object.s3.request.PutObjectRequest;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -64,7 +63,7 @@
import org.slf4j.LoggerFactory;

public class EcsCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable<Object> {
implements SupportsNamespaces, Configurable<Object> {

/** Suffix of table metadata object */
private static final String TABLE_OBJECT_SUFFIX = ".table";
Expand Down Expand Up @@ -111,6 +110,7 @@ public void initialize(String name, Map<String, String> properties) {
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(client::destroy);
closeableGroup.addCloseable(fileIO);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

/** Nessie implementation of Iceberg Catalog. */
public class NessieCatalog extends BaseMetastoreViewCatalog
implements AutoCloseable, SupportsNamespaces, Configurable<Object> {
implements SupportsNamespaces, Configurable<Object> {

private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
Expand Down Expand Up @@ -176,6 +176,7 @@ public void initialize(
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(client);
closeableGroup.addCloseable(fileIO);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.snowflake;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand All @@ -45,7 +44,7 @@
import org.slf4j.LoggerFactory;

public class SnowflakeCatalog extends BaseMetastoreCatalog
implements Closeable, SupportsNamespaces, Configurable<Object> {
implements SupportsNamespaces, Configurable<Object> {
private static final String DEFAULT_CATALOG_NAME = "snowflake_catalog";
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
// Specifies the name of a Snowflake's partner application to connect through JDBC.
Expand Down Expand Up @@ -157,6 +156,7 @@ void initialize(
this.catalogProperties = properties;
this.closeableGroup = new CloseableGroup();
closeableGroup.addCloseable(snowflakeClient);
closeableGroup.addCloseable(metricsReporter());
closeableGroup.setSuppressCloseFailure(true);
}

Expand Down

0 comments on commit ff441bf

Please sign in to comment.