Skip to content

Commit

Permalink
[FLINK-36598][state/ForSt] Provide FileSystem instance in initialization
Browse files (browse the repository at this point in the history)
  • Loading branch information
Zakelly committed Nov 11, 2024
1 parent 98835f2 commit c59b44b
Show file tree
Hide file tree
Showing 15 changed files with 112 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -522,7 +521,7 @@ public void dispose() {
}

@VisibleForTesting
File getLocalBasePath() {
Path getLocalBasePath() {
return optionsContainer.getLocalBasePath();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -62,7 +63,6 @@
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -301,10 +301,10 @@ private ForStRestoreOperation getForStRestoreOperation(
// working dir. We will implement this in ForStDB later, but before that, we achieved this
// by setting the dbPath to "/" when the dfs directory existed.
// TODO: use localForStPath as dbPath after ForSt Support mixing local-dir and remote-dir
File instanceForStPath =
Path instanceForStPath =
optionsContainer.getRemoteForStPath() == null
? optionsContainer.getLocalForStPath()
: new File("/");
: new Path("/");

if (CollectionUtil.isEmptyOrAllElementsNull(restoreStateHandles)) {
return new ForStNoneRestoreOperation(
Expand Down Expand Up @@ -377,12 +377,7 @@ private ForStRestoreOperation getForStRestoreOperation(

ForStSnapshotStrategyBase<K, ?> snapshotStrategy;

ForStFlinkFileSystem forStFs =
optionsContainer.getRemoteForStPath() != null
? (ForStFlinkFileSystem)
ForStFlinkFileSystem.get(
optionsContainer.getRemoteForStPath().toUri())
: null;
ForStFlinkFileSystem forStFs = optionsContainer.getFileSystem();
ForStStateDataTransfer stateTransfer =
new ForStStateDataTransfer(ForStStateDataTransfer.DEFAULT_THREAD_NUM, forStFs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ public final class ForStResourceContainer implements AutoCloseable {

@Nullable private final Path remoteForStPath;

@Nullable private final File localBasePath;
@Nullable private final Path localBasePath;

@Nullable private final File localForStPath;
@Nullable private final Path localForStPath;

@Nullable private Path cacheBasePath;

Expand All @@ -97,7 +97,7 @@ public final class ForStResourceContainer implements AutoCloseable {
@Nullable private final ForStOptionsFactory optionsFactory;

/** The ForSt file system. Null when remote dir is not set. */
@Nullable private FileSystem forstFileSystem;
@Nullable private ForStFlinkFileSystem forstFileSystem;

/**
* The shared resource among ForSt instances. This resource is not part of the 'handlesToClose',
Expand Down Expand Up @@ -134,7 +134,7 @@ public ForStResourceContainer(
ReadableConfig configuration,
@Nullable ForStOptionsFactory optionsFactory,
@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources,
@Nullable File localBasePath,
@Nullable Path localBasePath,
@Nullable Path remoteBasePath,
boolean enableStatistics) {

Expand All @@ -143,7 +143,7 @@ public ForStResourceContainer(
this.sharedResources = sharedResources;

this.localBasePath = localBasePath;
this.localForStPath = localBasePath != null ? new File(localBasePath, DB_DIR_STRING) : null;
this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null;
this.remoteBasePath = remoteBasePath;
this.remoteForStPath =
remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
Expand Down Expand Up @@ -266,12 +266,12 @@ public ReadOptions getReadOptions() {
}

@Nullable
public File getLocalBasePath() {
public Path getLocalBasePath() {
return localBasePath;
}

@Nullable
public File getLocalForStPath() {
public Path getLocalForStPath() {
return localForStPath;
}

Expand All @@ -289,15 +289,15 @@ public Path getBasePath() {
if (remoteBasePath != null) {
return remoteBasePath;
} else {
return Path.fromLocalFile(localBasePath);
return localBasePath;
}
}

public Path getDbPath() {
if (remoteForStPath != null) {
return remoteForStPath;
} else {
return Path.fromLocalFile(localForStPath);
return localForStPath;
}
}

Expand Down Expand Up @@ -331,25 +331,21 @@ public void prepareDirectories() throws Exception {
new Path(localBasePath.getPath()), new Path(localForStPath.getPath()));
}
if (remoteForStPath != null && localForStPath != null) {
ForStFlinkFileSystem.setupLocalBasePath(
remoteForStPath.toString(), localForStPath.toString());
}
if (cacheReservedSize > 0 || cacheCapacity > 0) {
if (cacheBasePath == null && localBasePath != null) {
cacheBasePath = new Path(localBasePath.getPath(), "cache");
LOG.info(
"Cache base path is not configured, set to local base path: {}",
cacheBasePath);
}
ForStFlinkFileSystem.configureCache(cacheBasePath, cacheCapacity, cacheReservedSize);
}
if (remoteForStPath != null) {
forstFileSystem = ForStFlinkFileSystem.get(remoteForStPath.toUri());
forstFileSystem =
ForStFlinkFileSystem.get(
remoteForStPath.toUri(),
localForStPath,
ForStFlinkFileSystem.getFileBasedCache(
cacheBasePath, cacheCapacity, cacheReservedSize));
} else {
forstFileSystem = null;
}
}

/**
 * Returns the ForSt file system held by this container.
 *
 * <p>The backing field is declared nullable and is explicitly reset to {@code null} by
 * {@code prepareDirectories()} when no remote ForSt path is configured, so callers must be
 * prepared for a {@code null} result.
 *
 * @return the ForSt file system instance, or {@code null} when none has been set up.
 */
public ForStFlinkFileSystem getFileSystem() {
    return this.forstFileSystem;
}

private static void prepareDirectories(Path basePath, Path dbPath) throws IOException {
FileSystem fileSystem = basePath.getFileSystem();
if (fileSystem.exists(basePath)) {
Expand Down Expand Up @@ -377,10 +373,9 @@ private static void prepareDirectories(Path basePath, Path dbPath) throws IOExce
public void clearDirectories() throws Exception {
if (remoteBasePath != null) {
clearDirectories(remoteBasePath);
ForStFlinkFileSystem.unregisterLocalBasePath(remoteForStPath.toString());
}
if (localBasePath != null) {
clearDirectories(new Path(localBasePath.getPath()));
clearDirectories(localBasePath);
}
}

Expand Down Expand Up @@ -464,7 +459,7 @@ private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions)
String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
if (logDir == null || logDir.isEmpty()) {
if (localForStPath == null
|| localForStPath.getAbsolutePath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
|| localForStPath.getPath().length() <= INSTANCE_PATH_LENGTH_LIMIT) {
relocateDefaultDbLogDir(currentOptions);
} else {
// disable log relocate when instance path length exceeds limit to prevent ForSt
Expand Down Expand Up @@ -583,7 +578,7 @@ private void setLocalForStPathAsLogDir(DBOptions dbOptions) {
// issues, so the db log dir is temporarily set explicitly here.
// TODO: remove this method after ForSt deal log dir well
if (localForStPath != null) {
this.relocatedDbLogBaseDir = localForStPath.toPath();
this.relocatedDbLogBaseDir = java.nio.file.Path.of(localForStPath.toUri().toString());
dbOptions.setDbLogDir(localForStPath.getPath());
}
}
Expand All @@ -604,8 +599,7 @@ private void cleanRelocatedDbLogs() {
if (localForStPath != null && relocatedDbLogBaseDir != null) {
LOG.info("Cleaning up relocated ForSt logs: {}.", relocatedDbLogBaseDir);

String relocatedDbLogPrefix =
resolveRelocatedDbLogPrefix(localForStPath.getAbsolutePath());
String relocatedDbLogPrefix = resolveRelocatedDbLogPrefix(localForStPath.getPath());
try {
Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
"op_%s_attempt_%s",
fileCompatibleIdentifier, env.getTaskInfo().getAttemptNumber());

File localBasePath =
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath);
Path localBasePath =
new Path(
new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
.getAbsolutePath());
Path remoteBasePath =
remoteForStDirectory != null
? new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath)
Expand Down Expand Up @@ -391,15 +393,17 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(

lazyInitializeForJob(env, fileCompatibleIdentifier);

File instanceBasePath =
new File(
getNextStoragePath(),
"job_"
+ jobId
+ "_op_"
+ fileCompatibleIdentifier
+ "_uuid_"
+ UUID.randomUUID());
Path instanceBasePath =
new Path(
new File(
getNextStoragePath(),
"job_"
+ jobId
+ "_op_"
+ fileCompatibleIdentifier
+ "_uuid_"
+ UUID.randomUUID())
.getAbsolutePath());

LocalRecoveryConfig localRecoveryConfig =
env.getTaskStateManager().createLocalRecoveryConfig();
Expand Down Expand Up @@ -671,14 +675,14 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
}

@VisibleForTesting
ForStResourceContainer createOptionsAndResourceContainer(@Nullable File localBasePath) {
ForStResourceContainer createOptionsAndResourceContainer(@Nullable Path localBasePath) {
return createOptionsAndResourceContainer(null, localBasePath, null, false);
}

@VisibleForTesting
private ForStResourceContainer createOptionsAndResourceContainer(
@Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources,
@Nullable File localBasePath,
@Nullable Path localBasePath,
@Nullable Path remoteBasePath,
boolean enableStatistics) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ public class ForStFlinkFileSystem extends FileSystem {

private static final Map<String, String> remoteLocalMapping = new ConcurrentHashMap<>();
private static final Function<String, Boolean> miscFileFilter = s -> !s.endsWith(".sst");
private static Path cacheBase = null;
private static long cacheCapacity = Long.MAX_VALUE;
private static long cacheReservedSize = 0;

private final FileSystem localFS;
private final FileSystem delegateFS;
Expand All @@ -86,19 +83,6 @@ public ForStFlinkFileSystem(
this.fileBasedCache = fileBasedCache;
}

/**
* Configure cache for ForStFlinkFileSystem.
*
* @param path the cache base path.
* @param cacheCap the cache capacity.
* @param reserveSize the cache reserved size.
*/
public static void configureCache(Path path, long cacheCap, long reserveSize) {
cacheBase = path;
cacheCapacity = cacheCap;
cacheReservedSize = reserveSize;
}

/**
* Returns a reference to the {@link FileSystem} instance for accessing the file system
* identified by the given {@link URI}.
Expand All @@ -108,14 +92,20 @@ public static void configureCache(Path path, long cacheCap, long reserveSize) {
* identified by the given {@link URI}.
* @throws IOException thrown if a reference to the file system instance could not be obtained.
*/
public static FileSystem get(URI uri) throws IOException {
String localBase = remoteLocalMapping.get(uri.toString());
Preconditions.checkNotNull(localBase, "localBase is null, remote uri:" + uri);
public static ForStFlinkFileSystem get(URI uri) throws IOException {
return new ForStFlinkFileSystem(
FileSystem.get(uri), uri.toString(), localBase, getFileBasedCache());
FileSystem.get(uri), uri.toString(), System.getProperty("java.io.tmpdir"), null);
}

private static FileBasedCache getFileBasedCache() throws IOException {
/**
 * Creates a {@link ForStFlinkFileSystem} for the given remote {@link URI}, using an explicitly
 * supplied local base path and file-based cache.
 *
 * <p>The underlying file system is resolved through {@link FileSystem#get(URI)}; the string
 * forms of {@code uri} and {@code localBase} are handed to the constructor together with the
 * cache.
 *
 * @param uri the {@link URI} identifying the remote file system.
 * @param localBase the local base path; must not be {@code null} (checked via {@code
 *     Preconditions.checkNotNull} before anything else is resolved).
 * @param fileBasedCache the cache passed through to the new instance; may be {@code null}, as
 *     {@code getFileBasedCache} returns {@code null} when caching is not configured.
 * @return a new {@link ForStFlinkFileSystem} instance.
 * @throws IOException declared for the underlying file system lookup.
 */
public static ForStFlinkFileSystem get(URI uri, Path localBase, FileBasedCache fileBasedCache)
        throws IOException {
    Preconditions.checkNotNull(localBase, "localBase is null, remote uri: %s.", uri);

    // Resolve the pieces in the same left-to-right order the constructor call would.
    final FileSystem resolvedFs = FileSystem.get(uri);
    final String remoteBaseString = uri.toString();
    final String localBaseString = localBase.toString();

    return new ForStFlinkFileSystem(
            resolvedFs, remoteBaseString, localBaseString, fileBasedCache);
}

public static FileBasedCache getFileBasedCache(
Path cacheBase, long cacheCapacity, long cacheReservedSize) throws IOException {
if (cacheBase == null || cacheCapacity <= 0 && cacheReservedSize <= 0) {
return null;
}
Expand All @@ -139,25 +129,6 @@ private static FileBasedCache getFileBasedCache() throws IOException {
Integer.MAX_VALUE, cacheLimitPolicy, cacheBase.getFileSystem(), cacheBase);
}

/**
* Setup local base path for corresponding remote base path.
*
* @param remoteBasePath the remote base path.
* @param localBasePath the local base path.
*/
public static void setupLocalBasePath(String remoteBasePath, String localBasePath) {
remoteLocalMapping.put(remoteBasePath, localBasePath);
}

/**
* Unregister local base path for corresponding remote base path.
*
* @param remoteBasePath the remote base path.
*/
public static void unregisterLocalBasePath(String remoteBasePath) {
remoteLocalMapping.remove(remoteBasePath);
}

/**
* Create ByteBufferWritableFSDataOutputStream from specific path which supports to write data
* to ByteBuffer with {@link org.apache.flink.core.fs.FileSystem.WriteMode#OVERWRITE} mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.state.forst.restore;

import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
Expand All @@ -36,7 +37,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -62,13 +62,13 @@ class ForStHandle implements AutoCloseable {

protected ForStHandle(
Map<String, ForStKvStateInfo> kvStateInformation,
File instanceRocksDBPath,
Path instanceRocksDBPath,
DBOptions dbOptions,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
ForStNativeMetricOptions nativeMetricOptions,
MetricGroup metricGroup) {
this.kvStateInformation = kvStateInformation;
this.dbPath = instanceRocksDBPath.getAbsolutePath();
this.dbPath = instanceRocksDBPath.getPath();
this.dbOptions = dbOptions;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
this.nativeMetricOptions = nativeMetricOptions;
Expand Down
Loading

0 comments on commit c59b44b

Please sign in to comment.