diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java index b807e4d6c4b9b..377496d40c860 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -39,6 +39,7 @@ import org.forstdb.FlinkEnv; import org.forstdb.IndexType; import org.forstdb.PlainTableConfig; +import org.forstdb.Priority; import org.forstdb.ReadOptions; import org.forstdb.Statistics; import org.forstdb.TableFormatConfig; @@ -76,6 +77,8 @@ public final class ForStResourceContainer implements AutoCloseable { // the filename length limit is 255 on most operating systems private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); + @Nullable private FlinkEnv flinkEnv = null; + @Nullable private final Path remoteBasePath; @Nullable private final Path remoteForStPath; @@ -187,7 +190,8 @@ public DBOptions getDbOptions() { // configured, // fallback to local directory currently temporarily. if (remoteForStPath != null) { - opt.setEnv(new FlinkEnv(remoteForStPath.toString(), forstFileSystem)); + flinkEnv = new FlinkEnv(remoteForStPath.toString(), forstFileSystem); + opt.setEnv(flinkEnv); } return opt; @@ -408,6 +412,15 @@ public void close() throws Exception { sharedResources.close(); } cleanRelocatedDbLogs(); + if (flinkEnv != null) { + // There is something wrong with the FlinkEnv, the background threads won't quit during + // the disposal of DB. We explicit shrink the thread pool here until the ForSt repo + // fixes that. + flinkEnv.setBackgroundThreads(0, Priority.LOW); + flinkEnv.setBackgroundThreads(0, Priority.HIGH); + flinkEnv.close(); + flinkEnv = null; + } } /**