Skip to content

Commit

Permalink
[FLINK-36395][state/forst] Graceful quit for forst threads
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Sep 27, 2024
1 parent 82582b3 commit 8582cce
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@

import javax.annotation.Nonnull;

import java.io.Closeable;

/**
* An async keyed state backend provides methods supporting to access keyed state asynchronously and
* in batch.
Expand All @@ -42,7 +40,6 @@ public interface AsyncKeyedStateBackend<K>
extends Snapshotable<SnapshotResult<KeyedStateHandle>>,
InternalCheckpointListener,
Disposable,
Closeable,
AsyncExecutionController.SwitchContextListener<K> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import javax.annotation.concurrent.GuardedBy;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Set;
Expand Down Expand Up @@ -330,7 +329,9 @@ public void dispose() {
}
synchronized (lock) {
if (!closed) {
IOUtils.closeQuietly(this);
for (StateExecutor executor : managedStateExecutors) {
executor.shutdown();
}
}
}

Expand Down Expand Up @@ -382,19 +383,6 @@ Path getRemoteBasePath() {
return optionsContainer.getRemoteBasePath();
}

@Override
public void close() throws IOException {
synchronized (lock) {
if (closed) {
return;
}
closed = true;
for (StateExecutor executor : managedStateExecutors) {
executor.shutdown();
}
}
}

/** ForSt specific information about the k/v states. */
public static class ForStKvStateInfo implements AutoCloseable {
public final ColumnFamilyHandle columnFamilyHandle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,12 @@ private void checkState() {

@Override
public void shutdown() {
// Coordinator should be shutdown before others, since it submit jobs to others.
coordinatorThread.shutdown();
readThreads.shutdown();
if (!sharedWriteThread) {
writeThreads.shutdown();
}
coordinatorThread.shutdown();
LOG.info("Shutting down the ForStStateExecutor.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ public class ForStFlinkFileSystem extends FileSystem {

private static final Map<String, String> remoteLocalMapping = new ConcurrentHashMap<>();
private static final Function<String, Boolean> miscFileFilter = s -> !s.endsWith(".sst");
private static final FileSystem localFS = FileSystem.getLocalFileSystem();

private final FileSystem localFS = FileSystem.getLocalFileSystem();
private final FileSystem delegateFS;
private final String remoteBase;
private final Function<String, Boolean> localFileFilter;
Expand Down

0 comments on commit 8582cce

Please sign in to comment.