diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java index 2dc2febfd40c8..8d3046557a94f 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java @@ -46,6 +46,7 @@ import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.concurrent.FutureUtils; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; @@ -65,6 +66,9 @@ import java.util.Optional; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.apache.flink.configuration.description.TextElement.text; @@ -312,7 +316,7 @@ public ForStKeyedStateBackend createAsyncKeyedStateBackend( // first, make sure that the ForSt JNI library is loaded // we do this explicitly here to have better error handling String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath(); - ensureForStIsLoaded(tempDir); + ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool()); // replace all characters that are not legal for filenames with underscore String fileCompatibleIdentifier = @@ -373,7 +377,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath(); - ensureForStIsLoaded(tempDir); + ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool()); // replace all characters that are not legal for filenames with underscore String fileCompatibleIdentifier = @@ -696,8 +700,8 @@ public String toString() { // ------------------------------------------------------------------------ @VisibleForTesting - static void ensureForStIsLoaded(String tempDirectory) throws IOException { - ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance); + static void ensureForStIsLoaded(String tempDirectory, Executor executor) throws IOException { + ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance, executor); } @VisibleForTesting @@ -707,7 +711,9 @@ static void setForStInitialized(boolean initialized) { @VisibleForTesting static void ensureForStIsLoaded( - String tempDirectory, Supplier nativeLibraryLoaderSupplier) + String tempDirectory, + Supplier nativeLibraryLoaderSupplier, + Executor executor) throws IOException { synchronized (ForStStateBackend.class) { if (!forStInitialized) { @@ -719,7 +725,7 @@ static void ensureForStIsLoaded( Throwable lastException = null; for (int attempt = 1; attempt <= FORST_LIB_LOADING_ATTEMPTS; attempt++) { - File rocksLibFolder = null; + AtomicReference rocksLibFolder = new AtomicReference<>(null); try { // when multiple instances of this class and ForSt exist in different // class loaders, then we can see the following exception: @@ -734,22 +740,38 @@ static void ensureForStIsLoaded( // loaders, but // apparently not when coming from the same file path, so there we go) - rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID()); - - // make sure the temp path exists - LOG.debug( - "Attempting to create ForSt native library folder {}", - rocksLibFolder); - // noinspection ResultOfMethodCallIgnored - rocksLibFolder.mkdirs(); - - // explicitly load the JNI dependency if it has not been loaded before - nativeLibraryLoaderSupplier - .get() - .loadLibrary(rocksLibFolder.getAbsolutePath()); - - // this initialization here should validate that the loading succeeded - RocksDB.loadLibrary(); + // We use an async procedure to load the library, to make current thread be + // able to interrupt for a fast quit. + CompletableFuture future = + FutureUtils.runAsync( + () -> { + File libFolder = + new File( + tempDirParent, + "rocksdb-lib-" + new AbstractID()); + rocksLibFolder.set(libFolder); + + // make sure the temp path exists + LOG.debug( + "Attempting to create ForSt native library folder {}", + libFolder); + // noinspection ResultOfMethodCallIgnored + libFolder.mkdirs(); + + // explicitly load the JNI dependency if it has not been + // loaded before + nativeLibraryLoaderSupplier + .get() + .loadLibrary(libFolder.getAbsolutePath()); + + // this initialization here should validate that the + // loading succeeded + RocksDB.loadLibrary(); + }, + executor); + + // wait for finish or be interrupted. + future.get(); // seems to have worked LOG.info("Successfully loaded ForSt native library"); @@ -768,7 +790,7 @@ static void ensureForStIsLoaded( tt); } - FileUtils.deleteDirectoryQuietly(rocksLibFolder); + FileUtils.deleteDirectoryQuietly(rocksLibFolder.get()); } } diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java index 37868ffa2ac2d..a23762d016960 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStInitITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; +import org.apache.flink.util.concurrent.Executors; import org.junit.Assert; import org.junit.Rule; @@ -52,7 +53,8 @@ public void testTempLibFolderDeletedOnFail() throws Exception { tempFolder.getAbsolutePath(), () -> { throw new ExpectedTestException(); - }); + }, + Executors.directExecutor()); fail("Not throwing expected exception."); } catch (IOException ignored) { // ignored diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMultiClassLoaderTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMultiClassLoaderTest.java index 9465cc0e3f346..86aa8ecae1564 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMultiClassLoaderTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMultiClassLoaderTest.java @@ -19,6 +19,7 @@ package org.apache.flink.state.forst; import org.apache.flink.util.FlinkUserCodeClassLoaders; +import org.apache.flink.util.concurrent.Executors; import org.junit.Rule; import org.junit.Test; @@ -27,6 +28,7 @@ import java.lang.reflect.Method; import java.net.URL; +import java.util.concurrent.Executor; import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER; import static org.junit.Assert.assertNotEquals; @@ -76,13 +78,15 @@ public void testTwoSeparateClassLoaders() throws Exception { final String tempDir = tmp.newFolder().getAbsolutePath(); - final Method meth1 = clazz1.getDeclaredMethod("ensureForStIsLoaded", String.class); - final Method meth2 = clazz2.getDeclaredMethod("ensureForStIsLoaded", String.class); + final Method meth1 = + clazz1.getDeclaredMethod("ensureForStIsLoaded", String.class, Executor.class); + final Method meth2 = + clazz2.getDeclaredMethod("ensureForStIsLoaded", String.class, Executor.class); meth1.setAccessible(true); meth2.setAccessible(true); // if all is well, these methods can both complete successfully - meth1.invoke(instance1, tempDir); - meth2.invoke(instance2, tempDir); + meth1.invoke(instance1, tempDir, Executors.directExecutor()); + meth2.invoke(instance2, tempDir, Executors.directExecutor()); } }