Skip to content

Commit

Permalink
[FLINK-36838][state] Join background threads when ForSt state backend…
Browse files Browse the repository at this point in the history
… quit
  • Loading branch information
Zakelly committed Dec 3, 2024
1 parent 22cc255 commit b66c90c
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.forstdb.FlinkEnv;
import org.forstdb.IndexType;
import org.forstdb.PlainTableConfig;
import org.forstdb.Priority;
import org.forstdb.ReadOptions;
import org.forstdb.Statistics;
import org.forstdb.TableFormatConfig;
Expand Down Expand Up @@ -76,6 +77,8 @@ public final class ForStResourceContainer implements AutoCloseable {
// the filename length limit is 255 on most operating systems
private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length();

@Nullable private FlinkEnv flinkEnv = null;

@Nullable private final Path remoteBasePath;

@Nullable private final Path remoteForStPath;
Expand Down Expand Up @@ -187,7 +190,8 @@ public DBOptions getDbOptions() {
// configured,
// fallback to local directory currently temporarily.
if (remoteForStPath != null) {
opt.setEnv(new FlinkEnv(remoteForStPath.toString(), forstFileSystem));
flinkEnv = new FlinkEnv(remoteForStPath.toString(), forstFileSystem);
opt.setEnv(flinkEnv);
}

return opt;
Expand Down Expand Up @@ -408,6 +412,15 @@ public void close() throws Exception {
sharedResources.close();
}
cleanRelocatedDbLogs();
if (flinkEnv != null) {
// There is something wrong with the FlinkEnv, the background threads won't quit during
// the disposal of DB. We explicit shrink the thread pool here until the ForSt repo
// fixes that.
flinkEnv.setBackgroundThreads(0, Priority.LOW);
flinkEnv.setBackgroundThreads(0, Priority.HIGH);
flinkEnv.close();
flinkEnv = null;
}
}

/**
Expand Down

0 comments on commit b66c90c

Please sign in to comment.