diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index 7059348a7b78e5..9bf2f2f8859ba6 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -117,6 +117,13 @@ under the License. ${flink.markBundledAsOptional} + + org.apache.flink + flink-statebackend-forst + ${project.version} + ${flink.markBundledAsOptional} + + org.apache.flink flink-statebackend-changelog diff --git a/flink-dist/src/main/resources/META-INF/NOTICE b/flink-dist/src/main/resources/META-INF/NOTICE index 72512500a6c5e4..bc71c1c33816fe 100644 --- a/flink-dist/src/main/resources/META-INF/NOTICE +++ b/flink-dist/src/main/resources/META-INF/NOTICE @@ -9,6 +9,7 @@ This project bundles the following dependencies under the Apache Software Licens - com.google.code.findbugs:jsr305:1.3.9 - com.twitter:chill-java:0.7.6 - com.ververica:frocksdbjni:8.10.0-ververica-beta-1.0 +- com.ververica:forstjni:jar:0.1.0-beta - commons-cli:commons-cli:1.5.0 - commons-collections:commons-collections:3.2.2 - commons-io:commons-io:2.15.1 diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java index 0084dba711ac7e..e9278626d5e141 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java @@ -62,6 +62,10 @@ public class StateBackendLoader { private static final String ROCKSDB_STATE_BACKEND_FACTORY = "org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory"; + /** Used for loading ForStStateBackend. */ + private static final String FORST_STATE_BACKEND_FACTORY = + "org.apache.flink.state.forst.ForStStateBackendFactory"; + // ------------------------------------------------------------------------ // Configuration shortcut names // ------------------------------------------------------------------------ @@ -77,6 +81,8 @@ public class StateBackendLoader { /** The shortcut configuration name for the RocksDB State Backend. */ public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb"; + public static final String FORST_STATE_BACKEND_NAME = "forst"; + // ------------------------------------------------------------------------ // Loading the state backend from a configuration // ------------------------------------------------------------------------ @@ -144,38 +150,45 @@ public static StateBackend loadStateBackendFromConfig( case ROCKSDB_STATE_BACKEND_NAME: factoryClassName = ROCKSDB_STATE_BACKEND_FACTORY; - // fall through to the 'default' case that uses reflection to load the backend + // fall through to the case that uses reflection to load the backend // that way we can keep RocksDB in a separate module + break; - default: - if (logger != null) { - logger.info("Loading state backend via factory {}", factoryClassName); - } + case FORST_STATE_BACKEND_NAME: + factoryClassName = FORST_STATE_BACKEND_FACTORY; - StateBackendFactory factory; - try { - @SuppressWarnings("rawtypes") - Class clazz = - Class.forName(factoryClassName, false, classLoader) - .asSubclass(StateBackendFactory.class); - - factory = clazz.newInstance(); - } catch (ClassNotFoundException e) { - throw new DynamicCodeLoadingException( - "Cannot find configured state backend factory class: " + backendName, - e); - } catch (ClassCastException | InstantiationException | IllegalAccessException e) { - throw new DynamicCodeLoadingException( - "The class configured under '" - + StateBackendOptions.STATE_BACKEND.key() - + "' is not a valid state backend factory (" - + backendName - + ')', - e); - } + // fall through to the case that uses reflection to load the backend + // that way we can keep ForSt in a separate module + break; + } - return factory.createFromConfig(config, classLoader); + // The reflection loading path + if (logger != null) { + logger.info("Loading state backend via factory {}", factoryClassName); } + + StateBackendFactory factory; + try { + @SuppressWarnings("rawtypes") + Class clazz = + Class.forName(factoryClassName, false, classLoader) + .asSubclass(StateBackendFactory.class); + + factory = clazz.newInstance(); + } catch (ClassNotFoundException e) { + throw new DynamicCodeLoadingException( + "Cannot find configured state backend factory class: " + backendName, e); + } catch (ClassCastException | InstantiationException | IllegalAccessException e) { + throw new DynamicCodeLoadingException( + "The class configured under '" + + StateBackendOptions.STATE_BACKEND.key() + + "' is not a valid state backend factory (" + + backendName + + ')', + e); + } + + return factory.createFromConfig(config, classLoader); } /** diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendFactoryTest.java index effcd2be793dd4..e799f8f85fb05e 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendFactoryTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStStateBackendFactoryTest.java @@ -68,20 +68,27 @@ public void testLoadForStStateBackend() throws Exception { final String localDirs = localDir1 + File.pathSeparator + localDir2; final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - // TODO: Support short name of backendKey - final Configuration config1 = new Configuration(); - config1.setString(backendKey, ForStStateBackendFactory.class.getName()); + config1.setString(backendKey, "forst"); config1.set(ForStOptions.LOCAL_DIRECTORIES, localDirs); config1.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); + final Configuration config2 = new Configuration(); + config2.setString(backendKey, ForStStateBackendFactory.class.getName()); + config2.set(ForStOptions.LOCAL_DIRECTORIES, localDirs); + config2.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); assertTrue(backend1 instanceof ForStStateBackend); + assertTrue(backend2 instanceof ForStStateBackend); ForStStateBackend fs1 = (ForStStateBackend) backend1; + ForStStateBackend fs2 = (ForStStateBackend) backend1; checkPaths(fs1.getLocalDbStoragePaths(), localDir1, localDir2); + checkPaths(fs2.getLocalDbStoragePaths(), localDir1, localDir2); } /**