From 665a5670abfccf34d99792f4e782d824e01f030d Mon Sep 17 00:00:00 2001 From: Zakelly Date: Thu, 19 Dec 2024 17:16:45 +0800 Subject: [PATCH] [FLINK-36944][Example] Introduce async state version of wordcount --- .../common_state_backends_section.html | 2 +- .../state_backend_configuration.html | 2 +- .../configuration/StateBackendOptions.java | 3 ++- .../flink-examples-streaming/pom.xml | 6 ++++++ .../examples/wordcount/WordCount.java | 19 ++++++++++++++++-- .../examples/wordcount/util/CLI.java | 20 ++++++++++++++++--- .../test/StreamingExamplesITCase.java | 19 ++++++++++++++++++ .../transformations/ReduceTransformation.java | 2 +- 8 files changed, 64 insertions(+), 9 deletions(-) diff --git a/docs/layouts/shortcodes/generated/common_state_backends_section.html b/docs/layouts/shortcodes/generated/common_state_backends_section.html index dd6106e728718..228571fdcf7dc 100644 --- a/docs/layouts/shortcodes/generated/common_state_backends_section.html +++ b/docs/layouts/shortcodes/generated/common_state_backends_section.html @@ -12,7 +12,7 @@
state.backend.type
"hashmap" String - The state backend to be used to store state.
The implementation can be specified either via their shortcut name, or via the class name of a StateBackendFactory. If a factory is specified it is instantiated via its zero argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'hashmap' and 'rocksdb'. + The state backend to be used to store state.
The implementation can be specified either via their shortcut name, or via the class name of a StateBackendFactory. If a factory is specified it is instantiated via its zero argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'. diff --git a/docs/layouts/shortcodes/generated/state_backend_configuration.html b/docs/layouts/shortcodes/generated/state_backend_configuration.html index dd6106e728718..228571fdcf7dc 100644 --- a/docs/layouts/shortcodes/generated/state_backend_configuration.html +++ b/docs/layouts/shortcodes/generated/state_backend_configuration.html @@ -12,7 +12,7 @@
state.backend.type
"hashmap" String - The state backend to be used to store state.
The implementation can be specified either via their shortcut name, or via the class name of a StateBackendFactory. If a factory is specified it is instantiated via its zero argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'hashmap' and 'rocksdb'. + The state backend to be used to store state.
The implementation can be specified either via their shortcut name, or via the class name of a StateBackendFactory. If a factory is specified it is instantiated via its zero argument constructor and its StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader) method is called.
Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'. diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java index ce1ff2e4bdd17..193ca06aa1713 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java @@ -62,6 +62,7 @@ public class StateBackendOptions { TextElement.code( "StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)")) .linebreak() - .text("Recognized shortcut names are 'hashmap' and 'rocksdb'.") + .text( + "Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'.") .build()); } diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index 91de05f9ba11b..6dd7859104945 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -92,6 +92,12 @@ under the License. ${project.version} + + org.apache.flink + flink-statebackend-forst + ${project.version} + + diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java index 1c545b6fdc618..dae32922859e2 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java @@ -21,11 +21,14 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.connector.file.sink.FileSink; import org.apache.flink.connector.file.src.FileSource; import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.examples.wordcount.util.CLI; @@ -34,6 +37,8 @@ import java.time.Duration; +import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME; + /** * Implements the "WordCount" program that computes a simple word occurrence histogram over text * files. This Job can be executed in both streaming and batch execution modes. @@ -76,6 +81,11 @@ public static void main(String[] args) throws Exception { // to building a Flink application. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // For async state, by default we will use the forst state backend. + Configuration config = Configuration.fromMap(env.getConfiguration().toMap()); + config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME); + env.configure(config); + // Apache Flinkā€™s unified approach to stream and batch processing means that a DataStream // application executed over bounded input will produce the same final results regardless // of the configured execution mode. It is important to note what final means here: a job @@ -118,7 +128,7 @@ public static void main(String[] args) throws Exception { text = env.fromData(WordCountData.WORDS).name("in-memory-input"); } - DataStream> counts = + KeyedStream, String> keyedStream = // The text lines read from the source are split into words // using a user-defined function. The tokenizer, implemented below, // will output each word as a (2-tuple) containing (word, 1) @@ -128,7 +138,12 @@ public static void main(String[] args) throws Exception { // Using a keyBy allows performing aggregations and other // stateful transformations over data on a per-key basis. // This is similar to a GROUP BY clause in a SQL query. - .keyBy(value -> value.f0) + .keyBy(value -> value.f0); + if (params.isAsyncState()) { + keyedStream.enableAsyncState(); + } + DataStream> counts = + keyedStream // For each key, we perform a simple sum of the "1" field, the count. // If the input data stream is bounded, sum will output a final count for // each word. If it is unbounded, it will continuously output updates diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java index 7d9e831402f83..beb65fabbdde1 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/util/CLI.java @@ -41,6 +41,7 @@ public class CLI extends ExecutionConfig.GlobalJobParameters { public static final String OUTPUT_KEY = "output"; public static final String DISCOVERY_INTERVAL = "discovery-interval"; public static final String EXECUTION_MODE = "execution-mode"; + public static final String ASYNC_STATE = "async-state"; public static CLI fromArgs(String[] args) throws Exception { MultipleParameterTool params = MultipleParameterTool.fromArgs(args); @@ -72,7 +73,12 @@ public static CLI fromArgs(String[] args) throws Exception { executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase()); } - return new CLI(inputs, output, watchInterval, executionMode, params); + boolean asyncState = false; + if (params.has(ASYNC_STATE)) { + asyncState = true; + } + + return new CLI(inputs, output, watchInterval, executionMode, params, asyncState); } private final Path[] inputs; @@ -80,18 +86,21 @@ public static CLI fromArgs(String[] args) throws Exception { private final Duration discoveryInterval; private final RuntimeExecutionMode executionMode; private final MultipleParameterTool params; + private final boolean asyncState; private CLI( Path[] inputs, Path output, Duration discoveryInterval, RuntimeExecutionMode executionMode, - MultipleParameterTool params) { + MultipleParameterTool params, + boolean asyncState) { this.inputs = inputs; this.output = output; this.discoveryInterval = discoveryInterval; this.executionMode = executionMode; this.params = params; + this.asyncState = asyncState; } public Optional getInputs() { @@ -110,6 +119,10 @@ public RuntimeExecutionMode getExecutionMode() { return executionMode; } + public boolean isAsyncState() { + return asyncState; + } + public OptionalInt getInt(String key) { if (params.has(key)) { return OptionalInt.of(params.getInt(key)); @@ -137,7 +150,8 @@ public boolean equals(Object o) { CLI cli = (CLI) o; return Arrays.equals(inputs, cli.inputs) && Objects.equals(output, cli.output) - && Objects.equals(discoveryInterval, cli.discoveryInterval); + && Objects.equals(discoveryInterval, cli.discoveryInterval) + && asyncState == cli.asyncState; } @Override diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java index ac5d6b010b987..c40f8632728ab 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java @@ -141,6 +141,25 @@ public void testWordCount() throws Exception { compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath); } + @Test + public void testWordCountWithAsyncState() throws Exception { + final String textPath = createTempFile("text.txt", WordCountData.TEXT); + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.wordcount.WordCount.main( + new String[] { + "--input", + textPath, + "--output", + resultPath, + "--execution-mode", + "streaming", + "--async-state" + }); + + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + /** * This {@link WatermarkStrategy} assigns the current system time as the event-time timestamp. * In a real use case you should use proper timestamps and an appropriate {@link diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java index 10e8fb4c34512..fd98029e5be5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/ReduceTransformation.java @@ -44,7 +44,7 @@ public final class ReduceTransformation extends PhysicalTransformation keySelector; private final TypeInformation keyTypeInfo; private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY; - private boolean isEnableAsyncState; + private boolean isEnableAsyncState = false; public ReduceTransformation( String name,