From aa3fda005c90ff0cb5d30d6478c1dd4ed3b8c7f8 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 11 Nov 2024 15:50:16 +0800 Subject: [PATCH] [FLINK-36598][state/ForSt] Provide FileSystem instance in initialization --- .../state/forst/ForStKeyedStateBackend.java | 3 +- .../forst/ForStKeyedStateBackendBuilder.java | 13 ++--- .../state/forst/ForStResourceContainer.java | 46 ++++++++-------- .../flink/state/forst/ForStStateBackend.java | 30 ++++++----- .../state/forst/fs/ForStFlinkFileSystem.java | 54 ++++--------------- .../state/forst/restore/ForStHandle.java | 6 +-- .../ForStHeapTimersFullRestoreOperation.java | 4 +- .../ForStIncrementalRestoreOperation.java | 12 ++--- .../restore/ForStNoneRestoreOperation.java | 4 +- .../sync/ForStSyncKeyedStateBackend.java | 9 ++-- .../ForStSyncKeyedStateBackendBuilder.java | 22 ++++---- .../flink/state/forst/ForStExtension.java | 9 ++-- .../forst/ForStResourceContainerTest.java | 12 ++--- .../forst/ForStStateBackendConfigTest.java | 25 +++++---- .../forst/fs/ForStFlinkFileSystemTest.java | 11 ++-- 15 files changed, 112 insertions(+), 148 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java index 61c629563136a1..974661f05b3789 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackend.java @@ -71,7 +71,6 @@ import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; -import java.io.File; import java.io.IOException; import java.util.HashSet; import java.util.LinkedHashMap; @@ -522,7 +521,7 @@ public void dispose() { } @VisibleForTesting - File getLocalBasePath() { + Path getLocalBasePath() { return optionsContainer.getLocalBasePath(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java index 7568dd637f82bc..1f0d5c5da72742 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStKeyedStateBackendBuilder.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.metrics.MetricGroup; @@ -62,7 +63,6 @@ import javax.annotation.Nonnegative; import javax.annotation.Nonnull; -import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -301,10 +301,10 @@ private ForStRestoreOperation getForStRestoreOperation( // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. // TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir - File instanceForStPath = + Path instanceForStPath = optionsContainer.getRemoteForStPath() == null ? optionsContainer.getLocalForStPath() - : new File("/"); + : new Path("/"); if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) { return new ForStNoneRestoreOperation( @@ -377,12 +377,7 @@ private ForStRestoreOperation getForStRestoreOperation( ForStSnapshotStrategyBase snapshotStrategy; - ForStFlinkFileSystem forStFs = - optionsContainer.getRemoteForStPath() != null - ? (ForStFlinkFileSystem) - ForStFlinkFileSystem.get( - optionsContainer.getRemoteForStPath().toUri()) - : null; + ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem(); ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs); diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index 36f022458c4172..b807e4d6c4b9b8 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -80,9 +80,9 @@ public final class ForStResourceContainer implements AutoCloseable { @Nullable private final Path remoteForStPath; - @Nullable private final File localBasePath; + @Nullable private final Path localBasePath; - @Nullable private final File localForStPath; + @Nullable private final Path localForStPath; @Nullable private Path cacheBasePath; @@ -97,7 +97,7 @@ public final class ForStResourceContainer implements AutoCloseable { @Nullable private final ForStOptionsFactory optionsFactory; /** The ForSt file system. Null when remote dir is not set. */ - @Nullable private FileSystem forstFileSystem; + @Nullable private ForStFlinkFileSystem forstFileSystem; /** * The shared resource among ForSt instances. This resource is not part of the 'handlesToClose', @@ -134,7 +134,7 @@ public ForStResourceContainer( ReadableConfig configuration, @Nullable ForStOptionsFactory optionsFactory, @Nullable OpaqueMemoryResource sharedResources, - @Nullable File localBasePath, + @Nullable Path localBasePath, @Nullable Path remoteBasePath, boolean enableStatistics) { @@ -143,7 +143,7 @@ public ForStResourceContainer( this.sharedResources = sharedResources; this.localBasePath = localBasePath; - this.localForStPath = localBasePath != null ? new File(localBasePath, DB_DIR_STRING) : null; + this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null; this.remoteBasePath = remoteBasePath; this.remoteForStPath = remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null; @@ -266,12 +266,12 @@ public ReadOptions getReadOptions() { } @Nullable - public File getLocalBasePath() { + public Path getLocalBasePath() { return localBasePath; } @Nullable - public File getLocalForStPath() { + public Path getLocalForStPath() { return localForStPath; } @@ -289,7 +289,7 @@ public Path getBasePath() { if (remoteBasePath != null) { return remoteBasePath; } else { - return Path.fromLocalFile(localBasePath); + return localBasePath; } } @@ -297,7 +297,7 @@ public Path getDbPath() { if (remoteForStPath != null) { return remoteForStPath; } else { - return Path.fromLocalFile(localForStPath); + return localForStPath; } } @@ -331,25 +331,27 @@ public void prepareDirectories() throws Exception { new Path(localBasePath.getPath()), new Path(localForStPath.getPath())); } if (remoteForStPath != null && localForStPath != null) { - ForStFlinkFileSystem.setupLocalBasePath( - remoteForStPath.toString(), localForStPath.toString()); - } - if (cacheReservedSize > 0 || cacheCapacity > 0) { if (cacheBasePath == null && localBasePath != null) { cacheBasePath = new Path(localBasePath.getPath(), "cache"); LOG.info( "Cache base path is not configured, set to local base path: {}", cacheBasePath); } - ForStFlinkFileSystem.configureCache(cacheBasePath, cacheCapacity, cacheReservedSize); - } - if (remoteForStPath != null) { - forstFileSystem = ForStFlinkFileSystem.get(remoteForStPath.toUri()); + forstFileSystem = + ForStFlinkFileSystem.get( + remoteForStPath.toUri(), + localForStPath, + ForStFlinkFileSystem.getFileBasedCache( + cacheBasePath, cacheCapacity, cacheReservedSize)); } else { forstFileSystem = null; } } + public ForStFlinkFileSystem getFileSystem() { + return forstFileSystem; + } + private static void prepareDirectories(Path basePath, Path dbPath) throws IOException { FileSystem fileSystem = basePath.getFileSystem(); if (fileSystem.exists(basePath)) { @@ -377,10 +379,9 @@ private static void prepareDirectories(Path basePath, Path dbPath) throws IOExce public void clearDirectories() throws Exception { if (remoteBasePath != null) { clearDirectories(remoteBasePath); - ForStFlinkFileSystem.unregisterLocalBasePath(remoteForStPath.toString()); } if (localBasePath != null) { - clearDirectories(new Path(localBasePath.getPath())); + clearDirectories(localBasePath); } } @@ -464,7 +465,7 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions) String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR); if (logDir == null || logDir.isEmpty()) { if (localForStPath == null - || localForStPath.getAbsolutePath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { + || localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { relocateDefaultDbLogDir(currentOptions); } else { // disable log relocate when instance path length exceeds limit to prevent ForSt @@ -583,7 +584,7 @@ private void setLocalForStPathAsLogDir(DBOptions dbOptions) { // issues, so the db log dir is temporarily set explicitly here. // TODO: remove this method after ForSt deal log dir well if (localForStPath != null) { - this.relocatedDbLogBaseDir = localForStPath.toPath(); + this.relocatedDbLogBaseDir = java.nio.file.Path.of(localForStPath.toUri().toString()); dbOptions.setDbLogDir(localForStPath.getPath()); } } @@ -604,8 +605,7 @@ private void cleanRelocatedDbLogs() { if (localForStPath != null && relocatedDbLogBaseDir != null) { LOG.info("Cleaning up relocated ForSt logs: {}.", relocatedDbLogBaseDir); - String relocatedDbLogPrefix = - resolveRelocatedDbLogPrefix(localForStPath.getAbsolutePath()); + String relocatedDbLogPrefix = resolveRelocatedDbLogPrefix(localForStPath.getPath()); try { Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir)) .filter( diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 0826b29ca21a2a..73a130128c305e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -329,8 +329,10 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( "op_%s_attempt_%s", fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber()); - File localBasePath = - new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath); + Path localBasePath = + new Path( + new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath) + .getAbsolutePath()); Path remoteBasePath = remoteForStDirectory != null ? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath) @@ -391,15 +393,17 @@ public AbstractKeyedStateBackend createKeyedStateBackend( lazyInitializeForJob(env, fileCompatibleIdentifier); - File instanceBasePath = - new File( - getNextStoragePath(), - "job_" - + jobId - + "_op_" - + fileCompatibleIdentifier - + "_uuid_" - + UUID.randomUUID()); + Path instanceBasePath = + new Path( + new File( + getNextStoragePath(), + "job_" + + jobId + + "_op_" + + fileCompatibleIdentifier + + "_uuid_" + + UUID.randomUUID()) + .getAbsolutePath()); LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig(); @@ -671,14 +675,14 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon } @VisibleForTesting - ForStResourceContainer createOptionsAndResourceContainer(@Nullable File localBasePath) { + ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) { return createOptionsAndResourceContainer(null, localBasePath, null, false); } @VisibleForTesting private ForStResourceContainer createOptionsAndResourceContainer( @Nullable OpaqueMemoryResource sharedResources, - @Nullable File localBasePath, + @Nullable Path localBasePath, @Nullable Path remoteBasePath, boolean enableStatistics) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index 4bf283e458d670..1cdafdcf4b1e01 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -40,8 +40,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -60,11 +58,7 @@ public class ForStFlinkFileSystem extends FileSystem { private static final long SST_FILE_SIZE = 1024 * 1024 * 64; - private static final Map remoteLocalMapping = new ConcurrentHashMap<>(); private static final Function miscFileFilter = s -> !s.endsWith(".sst"); - private static Path cacheBase = null; - private static long cacheCapacity = Long.MAX_VALUE; - private static long cacheReservedSize = 0; private final FileSystem localFS; private final FileSystem delegateFS; @@ -86,19 +80,6 @@ public ForStFlinkFileSystem( this.fileBasedCache = fileBasedCache; } - /** - * Configure cache for ForStFlinkFileSystem. - * - * @param path the cache base path. - * @param cacheCap the cache capacity. - * @param reserveSize the cache reserved size. - */ - public static void configureCache(Path path, long cacheCap, long reserveSize) { - cacheBase = path; - cacheCapacity = cacheCap; - cacheReservedSize = reserveSize; - } - /** * Returns a reference to the {@link FileSystem} instance for accessing the file system * identified by the given {@link URI}. @@ -108,14 +89,20 @@ public static void configureCache(Path path, long cacheCap, long reserveSize) { * identified by the given {@link URI}. * @throws IOException thrown if a reference to the file system instance could not be obtained. */ - public static FileSystem get(URI uri) throws IOException { - String localBase = remoteLocalMapping.get(uri.toString()); - Preconditions.checkNotNull(localBase, "localBase is null, remote uri:" + uri); + public static ForStFlinkFileSystem get(URI uri) throws IOException { return new ForStFlinkFileSystem( - FileSystem.get(uri), uri.toString(), localBase, getFileBasedCache()); + FileSystem.get(uri), uri.toString(), System.getProperty("java.io.tmpdir"), null); } - private static FileBasedCache getFileBasedCache() throws IOException { + public static ForStFlinkFileSystem get(URI uri, Path localBase, FileBasedCache fileBasedCache) + throws IOException { + Preconditions.checkNotNull(localBase, "localBase is null, remote uri: %s.", uri); + return new ForStFlinkFileSystem( + FileSystem.get(uri), uri.toString(), localBase.toString(), fileBasedCache); + } + + public static FileBasedCache getFileBasedCache( + Path cacheBase, long cacheCapacity, long cacheReservedSize) throws IOException { if (cacheBase == null || cacheCapacity <= 0 && cacheReservedSize <= 0) { return null; } @@ -139,25 +126,6 @@ private static FileBasedCache getFileBasedCache() throws IOException { Integer.MAX_VALUE, cacheLimitPolicy, cacheBase.getFileSystem(), cacheBase); } - /** - * Setup local base path for corresponding remote base path. - * - * @param remoteBasePath the remote base path. - * @param localBasePath the local base path. - */ - public static void setupLocalBasePath(String remoteBasePath, String localBasePath) { - remoteLocalMapping.put(remoteBasePath, localBasePath); - } - - /** - * Unregister local base path for corresponding remote base path. - * - * @param remoteBasePath the remote base path. - */ - public static void unregisterLocalBasePath(String remoteBasePath) { - remoteLocalMapping.remove(remoteBasePath); - } - /** * Create ByteBufferWritableFSDataOutputStream from specific path which supports to write data * to ByteBuffer with {@link org.apache.flink.core.fs.FileSystem.WriteMode#OVERWRITE} mode. diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java index 045324e2888ab5..ba556c7a5a9556 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHandle.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst.restore; +import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; @@ -36,7 +37,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -62,13 +62,13 @@ class ForStHandle implements AutoCloseable { protected ForStHandle( Map kvStateInformation, - File instanceRocksDBPath, + Path instanceRocksDBPath, DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup) { this.kvStateInformation = kvStateInformation; - this.dbPath = instanceRocksDBPath.getAbsolutePath(); + this.dbPath = instanceRocksDBPath.getPath(); this.dbOptions = dbOptions; this.columnFamilyOptionsFactory = columnFamilyOptionsFactory; this.nativeMetricOptions = nativeMetricOptions; diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java index aa673a69658f3a..c0ffb7bfc5d2ce 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStHeapTimersFullRestoreOperation.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst.restore; import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; @@ -53,7 +54,6 @@ import javax.annotation.Nonnull; import java.io.Closeable; -import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -85,7 +85,7 @@ public ForStHeapTimersFullRestoreOperation( LinkedHashMap> registeredPQStates, HeapPriorityQueueSetFactory priorityQueueFactory, StateSerializerProvider keySerializerProvider, - File instanceRocksDBPath, + Path instanceRocksDBPath, DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java index 616978e04f2071..7c835171ada4b1 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStIncrementalRestoreOperation.java @@ -42,7 +42,6 @@ import org.apache.flink.state.forst.ForStResourceContainer; import org.apache.flink.state.forst.ForStStateDataTransfer; import org.apache.flink.state.forst.StateHandleTransferSpec; -import org.apache.flink.state.forst.fs.ForStFlinkFileSystem; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.function.RunnableWithException; @@ -55,7 +54,6 @@ import javax.annotation.Nonnull; -import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -105,7 +103,7 @@ public ForStIncrementalRestoreOperation( StateSerializerProvider keySerializerProvider, ForStResourceContainer optionsContainer, Path forstBasePath, - File instanceRocksDBPath, + Path instanceRocksDBPath, DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, @@ -221,10 +219,10 @@ private void transferAllStateHandles( // TODO: Now not support rescale, so now ignore otherSpecs. Before implement transfer // otherSpecs, we may need reconsider the implementation of ForStFlinkFileSystem. - FileSystem forStFs = - optionsContainer.getRemoteForStPath() != null - ? ForStFlinkFileSystem.get(optionsContainer.getRemoteForStPath().toUri()) - : FileSystem.getLocalFileSystem(); + FileSystem forStFs = optionsContainer.getFileSystem(); + if (forStFs == null) { + forStFs = FileSystem.getLocalFileSystem(); + } try (ForStStateDataTransfer transfer = new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs)) { diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java index 49de3b7c99e6ee..161af424ce9e1f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/restore/ForStNoneRestoreOperation.java @@ -18,6 +18,7 @@ package org.apache.flink.state.forst.restore; +import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.state.forst.ForStKeyedStateBackend.ForStKvStateInfo; import org.apache.flink.state.forst.ForStNativeMetricOptions; @@ -25,7 +26,6 @@ import org.forstdb.ColumnFamilyOptions; import org.forstdb.DBOptions; -import java.io.File; import java.util.Map; import java.util.function.Function; @@ -35,7 +35,7 @@ public class ForStNoneRestoreOperation implements ForStRestoreOperation { public ForStNoneRestoreOperation( Map kvStateInformation, - File instanceRocksDBPath, + Path instanceRocksDBPath, DBOptions dbOptions, Function columnFamilyOptionsFactory, ForStNativeMetricOptions nativeMetricOptions, diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index eb9d6c0305298c..9e794adbb33fd9 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -180,7 +181,7 @@ IS updateState( private final ForStResourceContainer optionsContainer; /** Path where this configured instance stores its data directory. */ - private final File instanceBasePath; + private final Path instanceBasePath; /** * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call @@ -261,7 +262,7 @@ IS updateState( public ForStSyncKeyedStateBackend( ClassLoader userCodeClassLoader, - File instanceBasePath, + Path instanceBasePath, ForStResourceContainer optionsContainer, Function columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -530,7 +531,7 @@ private void cleanInstanceBasePath() { instanceBasePath); try { - FileUtils.deleteDirectory(instanceBasePath); + FileUtils.deleteDirectory(new File(instanceBasePath.getPath())); } catch (IOException ex) { LOG.warn("Could not delete RocksDB working directory: {}", instanceBasePath, ex); } @@ -905,7 +906,7 @@ private String stateNotSupportedMessage( } /** Only visible for testing, DO NOT USE. */ - File getInstanceBasePath() { + Path getInstanceBasePath() { return instanceBasePath; } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java index cb38b0f315b4a8..3f9a2d103e0843 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackendBuilder.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.Path; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; @@ -105,10 +106,10 @@ public class ForStSyncKeyedStateBackendBuilder extends AbstractKeyedStateBack private final ForStResourceContainer optionsContainer; /** Path where this configured instance stores its data directory. */ - private final File instanceBasePath; + private final Path instanceBasePath; /** Path where this configured instance stores its RocksDB database. */ - private final File instanceForStDBPath; + private final Path instanceForStDBPath; private final MetricGroup metricGroup; private final StateBackend.CustomInitializationMetrics customInitializationMetrics; @@ -136,7 +137,7 @@ public class ForStSyncKeyedStateBackendBuilder extends AbstractKeyedStateBack public ForStSyncKeyedStateBackendBuilder( String operatorIdentifier, ClassLoader userCodeClassLoader, - File instanceBasePath, + Path instanceBasePath, ForStResourceContainer optionsContainer, Function columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -186,7 +187,7 @@ public ForStSyncKeyedStateBackendBuilder( ForStSyncKeyedStateBackendBuilder( String operatorIdentifier, ClassLoader userCodeClassLoader, - File instanceBasePath, + Path instanceBasePath, ForStResourceContainer optionsContainer, Function columnFamilyOptionsFactory, TaskKvStateRegistry kvStateRegistry, @@ -240,8 +241,8 @@ public ForStSyncKeyedStateBackendBuilder setWriteBatchSize(long writeBatchSiz return this; } - public static File getInstanceRocksDBPath(File instanceBasePath) { - return new File(instanceBasePath, DB_INSTANCE_DIR_STRING); + public static Path getInstanceRocksDBPath(Path instanceBasePath) { + return new Path(instanceBasePath, DB_INSTANCE_DIR_STRING); } private static void checkAndCreateDirectory(File directory) throws IOException { @@ -353,7 +354,7 @@ public ForStSyncKeyedStateBackend build() throws BackendBuildingException { kvStateInformation.clear(); try { - FileUtils.deleteDirectory(instanceBasePath); + FileUtils.deleteDirectory(new File(instanceBasePath.getPath())); } catch (Exception ex) { logger.warn("Failed to delete base path for RocksDB: " + instanceBasePath, ex); } @@ -456,11 +457,12 @@ private HeapPriorityQueueSetFactory createHeapQueueFactory() { } private void prepareDirectories() throws IOException { - checkAndCreateDirectory(instanceBasePath); - if (instanceForStDBPath.exists()) { + File baseFile = new File(instanceBasePath.getPath()); + checkAndCreateDirectory(baseFile); + if (new File(instanceForStDBPath.getPath()).exists()) { // Clear the base directory when the backend is created // in case something crashed and the backend never reached dispose() - FileUtils.deleteDirectory(instanceBasePath); + FileUtils.deleteDirectory(baseFile); } } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java index db384692be3e1c..1268a6e7a0f292 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; @@ -168,7 +169,7 @@ public void before() throws Exception { new Configuration(), optionsFactory, null, - localWorkingDir, + new Path(localWorkingDir.getAbsolutePath()), null, enableStatistics); resourceContainer.prepareDirectories(); @@ -186,16 +187,16 @@ public void before() throws Exception { // working dir. We will implement this in ForStDB later, but before that, we achieved this // by setting the dbPath to "/" when the dfs directory existed. // TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir - File instanceForStPath = + Path instanceForStPath = resourceContainer.getRemoteForStPath() == null ? resourceContainer.getLocalForStPath() - : new File("/"); + : new Path("/"); this.columnFamilyHandles = new ArrayList<>(1); this.db = RocksDB.open( dbOptions, - instanceForStPath.getAbsolutePath(), + instanceForStPath.getPath(), Collections.singletonList( new ColumnFamilyDescriptor( "default".getBytes(), columnFamilyOptions)), diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java index dff951847b031a..5f7d0bffc5313c 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java @@ -306,33 +306,33 @@ public ColumnFamilyOptions createColumnOptions( @Test public void testDirectoryResources() throws Exception { - File localBasePath = TMP_FOLDER.newFolder(); + Path localBasePath = new Path(TMP_FOLDER.newFolder().getPath()); Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath()); try (final ForStResourceContainer optionsContainer = new ForStResourceContainer( new Configuration(), null, null, localBasePath, remoteBasePath, false)) { optionsContainer.prepareDirectories(); - assertTrue(localBasePath.exists()); + assertTrue(new File(localBasePath.getPath()).exists()); assertTrue(new File(remoteBasePath.getPath()).exists()); assertTrue(optionsContainer.getDbOptions().getEnv() instanceof FlinkEnv); optionsContainer.clearDirectories(); - assertFalse(localBasePath.exists()); + assertFalse(new File(localBasePath.getPath()).exists()); assertFalse(new File(remoteBasePath.getPath()).exists()); } } @Test public void testFileSystemInit() throws Exception { - File localBasePath = TMP_FOLDER.newFolder(); + Path localBasePath = new Path(TMP_FOLDER.newFolder().getPath()); Path remoteBasePath = new Path(TMP_FOLDER.newFolder().getPath()); ArrayList columnFamilyHandles = new ArrayList<>(1); ArrayList columnFamilyDescriptors = new ArrayList<>(1); columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); DBOptions dbOptions2 = new DBOptions().setCreateIfMissing(true).setAvoidFlushDuringShutdown(true); - ForStFlinkFileSystem.setupLocalBasePath(remoteBasePath.toString(), localBasePath.getPath()); - FileSystem forstFileSystem = ForStFlinkFileSystem.get(remoteBasePath.toUri()); + FileSystem forstFileSystem = + ForStFlinkFileSystem.get(remoteBasePath.toUri(), localBasePath, null); dbOptions2.setEnv(new FlinkEnv(remoteBasePath.toString(), forstFileSystem)); RocksDB db = RocksDB.open( diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java index e80f0b9f850ed4..b7b357f2ba7591 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendConfigTest.java @@ -109,7 +109,7 @@ public void testDefaultDbLogDir() throws Exception { } try (ForStResourceContainer container = backend.createOptionsAndResourceContainer( - new File(longInstanceBasePath.toString()))) { + new Path(longInstanceBasePath.toString()))) { assertTrue(container.getDbOptions().dbLogDir().isEmpty()); } finally { logFile.delete(); @@ -145,10 +145,9 @@ public void testSetDbPath() throws Exception { createKeyedStateBackend(forStStateBackend, env, IntSerializer.INSTANCE); try { - File instanceBasePath = keyedBackend.getLocalBasePath(); + Path instanceBasePath = keyedBackend.getLocalBasePath(); assertThat( - instanceBasePath.getAbsolutePath(), - anyOf(startsWith(testDir1), startsWith(testDir2))); + instanceBasePath.getPath(), anyOf(startsWith(testDir1), startsWith(testDir2))); //noinspection NullArgumentToVariableArgMethod forStStateBackend.setLocalDbStoragePaths(null); @@ -176,7 +175,8 @@ public void testConfigureForStCompressionPerLevel() throws Exception { forStStateBackend.configure(conf, Thread.currentThread().getContextClassLoader()); ForStResourceContainer resourceContainer = - forStStateBackend.createOptionsAndResourceContainer(tempFolder.newFile()); + forStStateBackend.createOptionsAndResourceContainer( + new Path(tempFolder.newFile().getAbsolutePath())); ColumnFamilyOptions columnFamilyOptions = resourceContainer.getColumnOptions(); assertArrayEquals(compressionTypes, columnFamilyOptions.compressionPerLevel().toArray()); @@ -233,9 +233,8 @@ private void testLocalDbPaths(String configuredPath, File expectedPath) throws E createKeyedStateBackend(forStBackend, env, IntSerializer.INSTANCE); try { - File instanceBasePath = keyedBackend.getLocalBasePath(); - assertThat( - instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath())); + Path instanceBasePath = keyedBackend.getLocalBasePath(); + assertThat(instanceBasePath.getPath(), startsWith(expectedPath.getAbsolutePath())); //noinspection NullArgumentToVariableArgMethod forStBackend.setLocalDbStoragePaths(null); @@ -289,11 +288,11 @@ public void testCleanRelocatedDbLogs() throws Exception { ForStKeyedStateBackend keyedBackend = createKeyedStateBackend(forStBackend, env, IntSerializer.INSTANCE); - File localBasePath = keyedBackend.getLocalBasePath(); - File localForStPath = new File(localBasePath, "db"); + Path localBasePath = keyedBackend.getLocalBasePath(); + Path localForStPath = new Path(localBasePath, "db"); // avoid tests without relocate. - Assume.assumeTrue(localForStPath.getAbsolutePath().length() <= 255 - "_LOG".length()); + Assume.assumeTrue(localForStPath.getPath().length() <= 255 - "_LOG".length()); java.nio.file.Path[] relocatedDbLogs; try { @@ -354,8 +353,8 @@ public void testUseTempDirectories() throws Exception { cancelStreamRegistry)); try { - File instanceBasePath = keyedBackend.getLocalBasePath(); - assertThat(instanceBasePath.getAbsolutePath(), startsWith(dir1.getAbsolutePath())); + Path instanceBasePath = keyedBackend.getLocalBasePath(); + assertThat(instanceBasePath.getPath(), startsWith(dir1.getAbsolutePath())); } finally { keyedBackend.dispose(); keyedBackend.close(); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java index fbe933096963cf..678fd4db491a23 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -76,16 +76,16 @@ public static List modes() { @Test void testReadAndWriteWithByteBuffer() throws Exception { - ForStFlinkFileSystem.setupLocalBasePath(tempDir.toString(), tempDir.toString()); ForStFlinkFileSystem fileSystem = - (ForStFlinkFileSystem) ForStFlinkFileSystem.get(new URI(tempDir.toString())); - fileSystem.setupLocalBasePath(tempDir.toString(), tempDir.toString()); + ForStFlinkFileSystem.get( + new URI(tempDir.toString()), + new org.apache.flink.core.fs.Path(tempDir.toString()), + null); testReadAndWriteWithByteBuffer(fileSystem); } @TestTemplate void testPositionedRead() throws Exception { - ForStFlinkFileSystem.setupLocalBasePath(tempDir.toString(), tempDir.toString()); ForStFlinkFileSystem fileSystem = new ForStFlinkFileSystem( new ByteBufferReadableLocalFileSystem(), @@ -165,7 +165,6 @@ private void testReadAndWriteWithByteBuffer(ForStFlinkFileSystem fileSystem) thr @TestTemplate void testReadExceedingFileSize() throws Exception { - ForStFlinkFileSystem.setupLocalBasePath(tempDir.toString(), tempDir.toString()); ForStFlinkFileSystem fileSystem = new ForStFlinkFileSystem( new ByteBufferReadableLocalFileSystem(), @@ -192,7 +191,6 @@ void testMiscFileInLocal() throws IOException { new org.apache.flink.core.fs.Path(tempDir.toString() + "/remote"); org.apache.flink.core.fs.Path localPath = new org.apache.flink.core.fs.Path(tempDir.toString() + "/local"); - ForStFlinkFileSystem.setupLocalBasePath(remotePath.toString(), localPath.toString()); ForStFlinkFileSystem fileSystem = new ForStFlinkFileSystem( new ByteBufferReadableLocalFileSystem(), @@ -223,7 +221,6 @@ void testSstFileInCache() throws IOException { new org.apache.flink.core.fs.Path(tempDir.toString() + "/local"); org.apache.flink.core.fs.Path cachePath = new org.apache.flink.core.fs.Path(tempDir.toString() + "/tmp-cache"); - ForStFlinkFileSystem.setupLocalBasePath(remotePath.toString(), localPath.toString()); BundledCacheLimitPolicy cacheLimitPolicy = new BundledCacheLimitPolicy( new SpaceBasedCacheLimitPolicy(new File(cachePath.toString()), 0, 0),