diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
index ce1ff2e4bdd179..193ca06aa17135 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
@@ -62,6 +62,7 @@ public class StateBackendOptions {
TextElement.code(
"StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
.linebreak()
- .text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
+ .text(
+ "Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'.")
.build());
}
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 91de05f9ba11bb..6dd7859104945f 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -92,6 +92,12 @@ under the License.
${project.version}
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-forst</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
index 1c545b6fdc6183..dae32922859e25 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
@@ -21,11 +21,14 @@
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.wordcount.util.CLI;
@@ -34,6 +37,8 @@
import java.time.Duration;
+import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;
+
/**
* Implements the "WordCount" program that computes a simple word occurrence histogram over text
* files. This Job can be executed in both streaming and batch execution modes.
@@ -76,6 +81,11 @@ public static void main(String[] args) throws Exception {
// to building a Flink application.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ // For async state, by default we will use the forst state backend.
+ Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
+ config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
+ env.configure(config);
+
// Apache Flink's unified approach to stream and batch processing means that a DataStream
// application executed over bounded input will produce the same final results regardless
// of the configured execution mode. It is important to note what final means here: a job
@@ -118,7 +128,7 @@ public static void main(String[] args) throws Exception {
text = env.fromData(WordCountData.WORDS).name("in-memory-input");
}
- DataStream<Tuple2<String, Integer>> counts =
+ KeyedStream<Tuple2<String, Integer>, String> keyedStream =
// The text lines read from the source are split into words
// using a user-defined function. The tokenizer, implemented below,
// will output each word as a (2-tuple) containing (word, 1)
@@ -128,7 +138,12 @@ public static void main(String[] args) throws Exception {
// Using a keyBy allows performing aggregations and other
// stateful transformations over data on a per-key basis.
// This is similar to a GROUP BY clause in a SQL query.
- .keyBy(value -> value.f0)
+ .keyBy(value -> value.f0);
+ if (params.isAsyncState()) {
+ keyedStream.enableAsyncState();
+ }
+ DataStream<Tuple2<String, Integer>> counts =
+ keyedStream
// For each key, we perform a simple sum of the "1" field, the count.
// If the input data stream is bounded, sum will output a final count for
// each word. If it is unbounded, it will continuously output updates
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
index 7d9e831402f83a..beb65fabbdde19 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java
@@ -41,6 +41,7 @@ public class CLI extends ExecutionConfig.GlobalJobParameters {
public static final String OUTPUT_KEY = "output";
public static final String DISCOVERY_INTERVAL = "discovery-interval";
public static final String EXECUTION_MODE = "execution-mode";
+ public static final String ASYNC_STATE = "async-state";
public static CLI fromArgs(String[] args) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
@@ -72,7 +73,12 @@ public static CLI fromArgs(String[] args) throws Exception {
executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase());
}
- return new CLI(inputs, output, watchInterval, executionMode, params);
+ boolean asyncState = false;
+ if (params.has(ASYNC_STATE)) {
+ asyncState = true;
+ }
+
+ return new CLI(inputs, output, watchInterval, executionMode, params, asyncState);
}
private final Path[] inputs;
@@ -80,18 +86,21 @@ public static CLI fromArgs(String[] args) throws Exception {
private final Duration discoveryInterval;
private final RuntimeExecutionMode executionMode;
private final MultipleParameterTool params;
+ private final boolean asyncState;
private CLI(
Path[] inputs,
Path output,
Duration discoveryInterval,
RuntimeExecutionMode executionMode,
- MultipleParameterTool params) {
+ MultipleParameterTool params,
+ boolean asyncState) {
this.inputs = inputs;
this.output = output;
this.discoveryInterval = discoveryInterval;
this.executionMode = executionMode;
this.params = params;
+ this.asyncState = asyncState;
}
public Optional<Path[]> getInputs() {
@@ -110,6 +119,10 @@ public RuntimeExecutionMode getExecutionMode() {
return executionMode;
}
+ public boolean isAsyncState() {
+ return asyncState;
+ }
+
public OptionalInt getInt(String key) {
if (params.has(key)) {
return OptionalInt.of(params.getInt(key));
@@ -137,7 +150,8 @@ public boolean equals(Object o) {
CLI cli = (CLI) o;
return Arrays.equals(inputs, cli.inputs)
&& Objects.equals(output, cli.output)
- && Objects.equals(discoveryInterval, cli.discoveryInterval);
+ && Objects.equals(discoveryInterval, cli.discoveryInterval)
+ && asyncState == cli.asyncState;
}
@Override
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index ac5d6b010b987a..c40f8632728abc 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -141,6 +141,25 @@ public void testWordCount() throws Exception {
compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
}
+ @Test
+ public void testWordCountWithAsyncState() throws Exception {
+ final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+ final String resultPath = getTempDirPath("result");
+
+ org.apache.flink.streaming.examples.wordcount.WordCount.main(
+ new String[] {
+ "--input",
+ textPath,
+ "--output",
+ resultPath,
+ "--execution-mode",
+ "streaming",
+ "--async-state"
+ });
+
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+ }
+
/**
* This {@link WatermarkStrategy} assigns the current system time as the event-time timestamp.
* In a real use case you should use proper timestamps and an appropriate {@link
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
index 10e8fb4c34512a..fd98029e5be5d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java
@@ -44,7 +44,7 @@ public final class ReduceTransformation<IN, K> extends PhysicalTransformation<IN> {
private final KeySelector<IN, K> keySelector;
private final TypeInformation<K> keyTypeInfo;
private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
- private boolean isEnableAsyncState;
+ private boolean isEnableAsyncState = false;
public ReduceTransformation(
String name,