From cf02ffac4329141b30bca265cafb9987f64f6cc4 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 14 Aug 2024 01:27:32 +0200 Subject: [PATCH] AWS, Core, Hive: Extract FileIO closing into separate FileIOTracker class (#10893) --- .../apache/iceberg/aws/glue/GlueCatalog.java | 31 ++------ .../org/apache/iceberg/io/FileIOTracker.java | 65 +++++++++++++++++ .../iceberg/rest/RESTSessionCatalog.java | 27 ++----- .../java/org/apache/iceberg/TestTables.java | 21 +++++- .../apache/iceberg/io/TestFileIOTracker.java | 72 +++++++++++++++++++ .../org/apache/iceberg/hive/HiveCatalog.java | 28 ++------ 6 files changed, 173 insertions(+), 71 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/io/FileIOTracker.java create mode 100644 core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java diff --git a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java index c6b157bb5c79..47807a2b9f37 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java +++ b/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.aws.glue; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; import java.util.List; import java.util.Map; @@ -51,7 +48,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; -import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOTracker; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Strings; @@ -97,7 +94,7 @@ public class GlueCatalog extends BaseMetastoreCatalog private LockManager lockManager; private CloseableGroup closeableGroup; private Map catalogProperties; - private Cache fileIOCloser; + private FileIOTracker fileIOTracker; // Attempt to set versionId if available on the path private static final DynMethods.UnboundMethod SET_VERSION_ID = @@ -194,11 +191,12 @@ void initialize( this.lockManager = lock; this.closeableGroup = new CloseableGroup(); + this.fileIOTracker = new FileIOTracker(); closeableGroup.addCloseable(glue); closeableGroup.addCloseable(lockManager); closeableGroup.addCloseable(metricsReporter()); + closeableGroup.addCloseable(fileIOTracker); closeableGroup.setSuppressCloseFailure(true); - this.fileIOCloser = newFileIOCloser(); } @Override @@ -243,7 +241,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { tableSpecificCatalogPropertiesBuilder.buildOrThrow(), hadoopConf, tableIdentifier); - fileIOCloser.put(glueTableOperations, glueTableOperations.io()); + fileIOTracker.track(glueTableOperations); return glueTableOperations; } @@ -256,7 +254,7 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { catalogProperties, hadoopConf, tableIdentifier); - fileIOCloser.put(glueTableOperations, glueTableOperations.io()); + fileIOTracker.track(glueTableOperations); return glueTableOperations; } @@ -634,10 +632,6 @@ public String name() { @Override public void close() throws IOException { closeableGroup.close(); - if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); - } } @Override @@ -649,17 +643,4 @@ public void setConf(Configuration conf) { protected Map properties() { return catalogProperties == null ? ImmutableMap.of() : catalogProperties; } - - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } } diff --git a/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java new file mode 100644 index 000000000000..9d8630e79b14 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/io/FileIOTracker.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import java.io.Closeable; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Keeps track of the {@link FileIO} instance of the given {@link TableOperations} instance and + * closes the {@link FileIO} when {@link FileIOTracker#close()} gets called + */ +public class FileIOTracker implements Closeable { + private final Cache tracker; + + public FileIOTracker() { + this.tracker = + Caffeine.newBuilder() + .weakKeys() + .removalListener( + (RemovalListener) + (ops, fileIO, cause) -> { + if (null != fileIO) { + fileIO.close(); + } + }) + .build(); + } + + public void track(TableOperations ops) { + Preconditions.checkArgument(null != ops, "Invalid table ops: null"); + tracker.put(ops, ops.io()); + } + + @VisibleForTesting + Cache tracker() { + return tracker; + } + + @Override + public void close() { + tracker.invalidateAll(); + tracker.cleanUp(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 1c607e3b0220..53ce45bb0a3f 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,7 +47,6 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; -import org.apache.iceberg.TableOperations; import org.apache.iceberg.Transaction; import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.BaseViewSessionCatalog; @@ -63,6 +62,7 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOTracker; import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.metrics.MetricsReporters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -136,7 +136,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private final BiFunction, FileIO> ioBuilder; private Cache sessions = null; private Cache tableSessions = null; - private Cache fileIOCloser; + private FileIOTracker fileIOTracker = null; private AuthSession catalogAuth = null; private boolean keepTokenRefreshed = true; private RESTClient client = null; @@ -268,10 +268,11 @@ public void initialize(String name, Map unresolved) { this.io = newFileIO(SessionContext.createEmpty(), mergedProps); - this.fileIOCloser = newFileIOCloser(); + this.fileIOTracker = new FileIOTracker(); this.closeables = new CloseableGroup(); this.closeables.addCloseable(this.io); this.closeables.addCloseable(this.client); + this.closeables.addCloseable(fileIOTracker); this.closeables.setSuppressCloseFailure(true); this.snapshotMode = @@ -465,7 +466,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { private void trackFileIO(RESTTableOperations ops) { if (io != ops.io()) { - fileIOCloser.put(ops, ops.io()); + fileIOTracker.track(ops); } } @@ -641,11 +642,6 @@ public void close() throws IOException { if (closeables != null) { closeables.close(); } - - if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); - } } private void shutdownRefreshExecutor() { @@ -1088,19 +1084,6 @@ private static Cache newSessionCache(Map pr .build(); } - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIO, cause) -> { - if (null != fileIO) { - fileIO.close(); - } - }) - .build(); - } - public void commitTransaction(SessionContext context, List commits) { List tableChanges = Lists.newArrayListWithCapacity(commits.size()); diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java index de05e85c3c77..eeff5db8e5a6 100644 --- a/core/src/test/java/org/apache/iceberg/TestTables.java +++ b/core/src/test/java/org/apache/iceberg/TestTables.java @@ -216,6 +216,7 @@ public static class TestTableOperations implements TableOperations { private final String tableName; private final File metadata; + private final FileIO fileIO; private TableMetadata current = null; private long lastSnapshotId = 0; private int failCommits = 0; @@ -223,6 +224,22 @@ public static class TestTableOperations implements TableOperations { public TestTableOperations(String tableName, File location) { this.tableName = tableName; this.metadata = new File(location, "metadata"); + this.fileIO = new LocalFileIO(); + metadata.mkdirs(); + refresh(); + if (current != null) { + for (Snapshot snap : current.snapshots()) { + this.lastSnapshotId = Math.max(lastSnapshotId, snap.snapshotId()); + } + } else { + this.lastSnapshotId = 0; + } + } + + public TestTableOperations(String tableName, File location, FileIO fileIO) { + this.tableName = tableName; + this.metadata = new File(location, "metadata"); + this.fileIO = fileIO; metadata.mkdirs(); refresh(); if (current != null) { @@ -277,7 +294,7 @@ public void commit(TableMetadata base, TableMetadata updatedMetadata) { @Override public FileIO io() { - return new LocalFileIO(); + return fileIO; } @Override @@ -300,7 +317,7 @@ public long newSnapshotId() { } } - static class LocalFileIO implements FileIO { + public static class LocalFileIO implements FileIO { @Override public InputFile newInputFile(String path) { diff --git a/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java new file mode 100644 index 000000000000..e6225d886cee --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/io/TestFileIOTracker.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.io; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.times; + +import java.io.File; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.TestTables; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.mockito.Mockito; + +public class TestFileIOTracker { + + @TempDir private File tableDir; + + @SuppressWarnings("resource") + @Test + public void nullTableOps() { + assertThatThrownBy(() -> new FileIOTracker().track(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid table ops: null"); + } + + @SuppressWarnings("unchecked") + @Test + public void fileIOGetsClosed() throws NoSuchFieldException, IllegalAccessException { + FileIOTracker fileIOTracker = new FileIOTracker(); + + FileIO firstFileIO = Mockito.spy(new TestTables.LocalFileIO()); + TestTables.TestTableOperations firstOps = + new TestTables.TestTableOperations("x", tableDir, firstFileIO); + fileIOTracker.track(firstOps); + assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(1); + + FileIO secondFileIO = Mockito.spy(new TestTables.LocalFileIO()); + TestTables.TestTableOperations secondOps = + new TestTables.TestTableOperations("y", tableDir, secondFileIO); + fileIOTracker.track(secondOps); + assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(2); + + fileIOTracker.close(); + Awaitility.await("FileIO gets closed") + .atMost(5, TimeUnit.SECONDS) + .untilAsserted( + () -> { + assertThat(fileIOTracker.tracker().estimatedSize()).isEqualTo(0); + Mockito.verify(firstFileIO, times(1)).close(); + Mockito.verify(secondFileIO, times(1)).close(); + }); + } +} diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java index 8944cf93947b..5c58222f0c01 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalog.java @@ -18,9 +18,6 @@ */ package org.apache.iceberg.hive; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; import java.util.List; import java.util.Map; @@ -53,6 +50,7 @@ import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.FileIOTracker; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -83,7 +81,7 @@ public class HiveCatalog extends BaseMetastoreCatalog implements SupportsNamespa private ClientPool clients; private boolean listAllTables = false; private Map catalogProperties; - private Cache fileIOCloser; + private FileIOTracker fileIOTracker; public HiveCatalog() {} @@ -116,20 +114,7 @@ public void initialize(String inputName, Map properties) { : CatalogUtil.loadFileIO(fileIOImpl, properties, conf); this.clients = new CachedClientPool(conf, properties); - this.fileIOCloser = newFileIOCloser(); - } - - private Cache newFileIOCloser() { - return Caffeine.newBuilder() - .weakKeys() - .removalListener( - (RemovalListener) - (ops, fileIOInstance, cause) -> { - if (null != fileIOInstance) { - fileIOInstance.close(); - } - }) - .build(); + this.fileIOTracker = new FileIOTracker(); } @Override @@ -533,7 +518,7 @@ public TableOperations newTableOps(TableIdentifier tableIdentifier) { String tableName = tableIdentifier.name(); HiveTableOperations ops = new HiveTableOperations(conf, clients, fileIO, name, dbName, tableName); - fileIOCloser.put(ops, ops.io()); + fileIOTracker.track(ops); return ops; } @@ -661,9 +646,8 @@ protected Map properties() { @Override public void close() throws IOException { super.close(); - if (fileIOCloser != null) { - fileIOCloser.invalidateAll(); - fileIOCloser.cleanUp(); + if (fileIOTracker != null) { + fileIOTracker.close(); } }