Skip to content

Commit

Permalink
[FLINK-36395][state/forst] Allow interrupt in forst library load
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Sep 29, 2024
1 parent c5b3730 commit 1ba3d39
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;

import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
Expand All @@ -65,6 +66,9 @@
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.apache.flink.configuration.description.TextElement.text;
Expand Down Expand Up @@ -312,7 +316,7 @@ public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
// first, make sure that the ForSt JNI library is loaded
// we do this explicitly here to have better error handling
String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
ensureForStIsLoaded(tempDir);
ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool());

// replace all characters that are not legal for filenames with underscore
String fileCompatibleIdentifier =
Expand Down Expand Up @@ -373,7 +377,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
String tempDir = env.getTaskManagerInfo().getTmpWorkingDirectory().getAbsolutePath();
ensureForStIsLoaded(tempDir);
ensureForStIsLoaded(tempDir, env.getAsyncOperationsThreadPool());

// replace all characters that are not legal for filenames with underscore
String fileCompatibleIdentifier =
Expand Down Expand Up @@ -696,8 +700,8 @@ public String toString() {
// ------------------------------------------------------------------------

@VisibleForTesting
static void ensureForStIsLoaded(String tempDirectory) throws IOException {
ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance);
static void ensureForStIsLoaded(String tempDirectory, Executor executor) throws IOException {
ensureForStIsLoaded(tempDirectory, NativeLibraryLoader::getInstance, executor);
}

@VisibleForTesting
Expand All @@ -707,7 +711,9 @@ static void setForStInitialized(boolean initialized) {

@VisibleForTesting
static void ensureForStIsLoaded(
String tempDirectory, Supplier<NativeLibraryLoader> nativeLibraryLoaderSupplier)
String tempDirectory,
Supplier<NativeLibraryLoader> nativeLibraryLoaderSupplier,
Executor executor)
throws IOException {
synchronized (ForStStateBackend.class) {
if (!forStInitialized) {
Expand All @@ -719,7 +725,7 @@ static void ensureForStIsLoaded(

Throwable lastException = null;
for (int attempt = 1; attempt <= FORST_LIB_LOADING_ATTEMPTS; attempt++) {
File rocksLibFolder = null;
AtomicReference<File> rocksLibFolder = new AtomicReference<>(null);
try {
// when multiple instances of this class and ForSt exist in different
// class loaders, then we can see the following exception:
Expand All @@ -734,22 +740,38 @@ static void ensureForStIsLoaded(
// loaders, but
// apparently not when coming from the same file path, so there we go)

rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID());

// make sure the temp path exists
LOG.debug(
"Attempting to create ForSt native library folder {}",
rocksLibFolder);
// noinspection ResultOfMethodCallIgnored
rocksLibFolder.mkdirs();

// explicitly load the JNI dependency if it has not been loaded before
nativeLibraryLoaderSupplier
.get()
.loadLibrary(rocksLibFolder.getAbsolutePath());

// this initialization here should validate that the loading succeeded
RocksDB.loadLibrary();
// We use an async procedure to load the library, to make current thread be
// able to interrupt for a fast quit.
CompletableFuture<Void> future =
FutureUtils.runAsync(
() -> {
File libFolder =
new File(
tempDirParent,
"rocksdb-lib-" + new AbstractID());
rocksLibFolder.set(libFolder);

// make sure the temp path exists
LOG.debug(
"Attempting to create ForSt native library folder {}",
libFolder);
// noinspection ResultOfMethodCallIgnored
libFolder.mkdirs();

// explicitly load the JNI dependency if it has not been
// loaded before
nativeLibraryLoaderSupplier
.get()
.loadLibrary(libFolder.getAbsolutePath());

// this initialization here should validate that the
// loading succeeded
RocksDB.loadLibrary();
},
executor);

// wait for finish or be interrupted.
future.get();

// seems to have worked
LOG.info("Successfully loaded ForSt native library");
Expand All @@ -768,7 +790,7 @@ static void ensureForStIsLoaded(
tt);
}

FileUtils.deleteDirectoryQuietly(rocksLibFolder);
FileUtils.deleteDirectoryQuietly(rocksLibFolder.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.state.forst;

import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.concurrent.Executors;

import org.junit.Assert;
import org.junit.Rule;
Expand Down Expand Up @@ -52,7 +53,8 @@ public void testTempLibFolderDeletedOnFail() throws Exception {
tempFolder.getAbsolutePath(),
() -> {
throw new ExpectedTestException();
});
},
Executors.directExecutor());
fail("Not throwing expected exception.");
} catch (IOException ignored) {
// ignored
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.state.forst;

import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.concurrent.Executors;

import org.junit.Rule;
import org.junit.Test;
Expand All @@ -27,6 +28,7 @@

import java.lang.reflect.Method;
import java.net.URL;
import java.util.concurrent.Executor;

import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.junit.Assert.assertNotEquals;
Expand Down Expand Up @@ -76,13 +78,15 @@ public void testTwoSeparateClassLoaders() throws Exception {

final String tempDir = tmp.newFolder().getAbsolutePath();

final Method meth1 = clazz1.getDeclaredMethod("ensureForStIsLoaded", String.class);
final Method meth2 = clazz2.getDeclaredMethod("ensureForStIsLoaded", String.class);
final Method meth1 =
clazz1.getDeclaredMethod("ensureForStIsLoaded", String.class, Executor.class);
final Method meth2 =
clazz2.getDeclaredMethod("ensureForStIsLoaded", String.class, Executor.class);
meth1.setAccessible(true);
meth2.setAccessible(true);

// if all is well, these methods can both complete successfully
meth1.invoke(instance1, tempDir);
meth2.invoke(instance2, tempDir);
meth1.invoke(instance1, tempDir, Executors.directExecutor());
meth2.invoke(instance2, tempDir, Executors.directExecutor());
}
}

0 comments on commit 1ba3d39

Please sign in to comment.