[FLINK-36944][Example] Introduce async state version of wordcount
Zakelly committed Dec 22, 2024
1 parent 202c145 commit 358d978
Showing 6 changed files with 62 additions and 7 deletions.
StateBackendOptions.java

@@ -62,6 +62,7 @@ public class StateBackendOptions {
                 TextElement.code(
                         "StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
                 .linebreak()
-                .text("Recognized shortcut names are 'hashmap' and 'rocksdb'.")
+                .text(
+                        "Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'.")
                 .build());
 }
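
For context, this shortcut is consumed through StateBackendOptions.STATE_BACKEND. A minimal sketch of selecting the new 'forst' backend from user code (the class name and job skeleton are illustrative, not part of this commit):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ForStShortcutSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // "forst" is the new shortcut this commit documents, alongside
        // "hashmap" and "rocksdb".
        config.set(StateBackendOptions.STATE_BACKEND, "forst");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... define and execute the job as usual ...
    }
}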
flink-examples/flink-examples-streaming/pom.xml (6 additions, 0 deletions)

@@ -92,6 +92,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-forst</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Dependencies for MatrixVectorMul. We exclude native libraries
 		because it is not available in all the operating systems and architectures. Moreover,
 		we also want to enable users to compile and run MatrixVectorMul in different runtime environments.-->
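The new dependency puts the ForSt state backend (Flink's disaggregated backend built for asynchronous state access) on the examples module's classpath, so the 'forst' shortcut configured below can actually be loaded at runtime.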
WordCount.java

@@ -21,11 +21,14 @@
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.wordcount.util.CLI;
@@ -34,6 +37,8 @@
 
 import java.time.Duration;
 
+import static org.apache.flink.runtime.state.StateBackendLoader.FORST_STATE_BACKEND_NAME;
+
 /**
  * Implements the "WordCount" program that computes a simple word occurrence histogram over text
  * files. This Job can be executed in both streaming and batch execution modes.
@@ -76,6 +81,11 @@ public static void main(String[] args) throws Exception {
         // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
+        // For async state, by default we will use the forst state backend.
+        Configuration config = Configuration.fromMap(env.getConfiguration().toMap());
+        config.set(StateBackendOptions.STATE_BACKEND, FORST_STATE_BACKEND_NAME);
+        env.configure(config);
+
         // Apache Flink’s unified approach to stream and batch processing means that a DataStream
         // application executed over bounded input will produce the same final results regardless
         // of the configured execution mode. It is important to note what final means here: a job
@@ -118,7 +128,7 @@ public static void main(String[] args) throws Exception {
             text = env.fromData(WordCountData.WORDS).name("in-memory-input");
         }
 
-        DataStream<Tuple2<String, Integer>> counts =
+        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                 // The text lines read from the source are split into words
                 // using a user-defined function. The tokenizer, implemented below,
                 // will output each word as a (2-tuple) containing (word, 1)
@@ -128,7 +138,12 @@
                         // Using a keyBy allows performing aggregations and other
                         // stateful transformations over data on a per-key basis.
                         // This is similar to a GROUP BY clause in a SQL query.
-                        .keyBy(value -> value.f0)
+                        .keyBy(value -> value.f0);
+        if (params.isAsyncState()) {
+            keyedStream.enableAsyncState();
+        }
+        DataStream<Tuple2<String, Integer>> counts =
+                keyedStream
                         // For each key, we perform a simple sum of the "1" field, the count.
                         // If the input data stream is bounded, sum will output a final count for
                         // each word. If it is unbounded, it will continuously output updates
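
Condensed into one runnable sketch, the example's new opt-in path looks roughly like this, assuming the flink-statebackend-forst artifact from the pom change is on the classpath (the class name and inline data are illustrative, not part of this commit):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncStateWordCountSketch {
    public static void main(String[] args) throws Exception {
        // Select the ForSt backend, as the example now does for async state.
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, "forst");
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        KeyedStream<Tuple2<String, Integer>, String> keyedStream =
                env.fromData(Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("hello", 1))
                        .keyBy(value -> value.f0);
        // Opt the downstream keyed operators into the async state API;
        // in the example this call is guarded by the --async-state flag.
        keyedStream.enableAsyncState();
        DataStream<Tuple2<String, Integer>> counts = keyedStream.sum(1);
        counts.print();

        env.execute("Async-state WordCount sketch");
    }
}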
CLI.java

@@ -41,6 +41,7 @@ public class CLI extends ExecutionConfig.GlobalJobParameters {
     public static final String OUTPUT_KEY = "output";
     public static final String DISCOVERY_INTERVAL = "discovery-interval";
     public static final String EXECUTION_MODE = "execution-mode";
+    public static final String ASYNC_STATE = "async-state";
 
     public static CLI fromArgs(String[] args) throws Exception {
         MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
@@ -72,26 +73,34 @@ public static CLI fromArgs(String[] args) throws Exception {
             executionMode = RuntimeExecutionMode.valueOf(params.get(EXECUTION_MODE).toUpperCase());
         }
 
-        return new CLI(inputs, output, watchInterval, executionMode, params);
+        boolean asyncState = false;
+        if (params.has(ASYNC_STATE)) {
+            asyncState = true;
+        }
+
+        return new CLI(inputs, output, watchInterval, executionMode, params, asyncState);
     }
 
     private final Path[] inputs;
     private final Path output;
     private final Duration discoveryInterval;
     private final RuntimeExecutionMode executionMode;
     private final MultipleParameterTool params;
+    private final boolean asyncState;
 
     private CLI(
             Path[] inputs,
             Path output,
             Duration discoveryInterval,
             RuntimeExecutionMode executionMode,
-            MultipleParameterTool params) {
+            MultipleParameterTool params,
+            boolean asyncState) {
         this.inputs = inputs;
         this.output = output;
         this.discoveryInterval = discoveryInterval;
         this.executionMode = executionMode;
         this.params = params;
+        this.asyncState = asyncState;
     }
 
     public Optional<Path[]> getInputs() {
@@ -110,6 +119,10 @@ public RuntimeExecutionMode getExecutionMode() {
         return executionMode;
     }
 
+    public boolean isAsyncState() {
+        return asyncState;
+    }
+
     public OptionalInt getInt(String key) {
         if (params.has(key)) {
             return OptionalInt.of(params.getInt(key));
@@ -137,7 +150,8 @@ public boolean equals(Object o) {
         CLI cli = (CLI) o;
         return Arrays.equals(inputs, cli.inputs)
                 && Objects.equals(output, cli.output)
-                && Objects.equals(discoveryInterval, cli.discoveryInterval);
+                && Objects.equals(discoveryInterval, cli.discoveryInterval)
+                && asyncState == cli.asyncState;
     }
 
     @Override
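
Note that --async-state is a presence-only flag: it takes no value. A small sketch of how MultipleParameterTool reports such a flag (the demo class is illustrative):

import org.apache.flink.api.java.utils.MultipleParameterTool;

public class AsyncStateFlagDemo {
    public static void main(String[] args) {
        MultipleParameterTool params =
                MultipleParameterTool.fromArgs(new String[] {"--async-state"});
        // has() is true whenever the flag was given, even without a value,
        // which is exactly what CLI.fromArgs checks.
        System.out.println(params.has("async-state")); // prints: true
    }
}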
WordCount example test

@@ -141,6 +141,25 @@ public void testWordCount() throws Exception {
         compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath);
     }
 
+    @Test
+    public void testWordCountWithAsyncState() throws Exception {
+        final String textPath = createTempFile("text.txt", WordCountData.TEXT);
+        final String resultPath = getTempDirPath("result");
+
+        org.apache.flink.streaming.examples.wordcount.WordCount.main(
+                new String[] {
+                    "--input",
+                    textPath,
+                    "--output",
+                    resultPath,
+                    "--execution-mode",
+                    "streaming",
+                    "--async-state"
+                });
+
+        compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
+    }
+
     /**
      * This {@link WatermarkStrategy} assigns the current system time as the event-time timestamp.
      * In a real use case you should use proper timestamps and an appropriate {@link
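A note on the expected output: this test pins --execution-mode to streaming, where sum emits a rolling update for each incoming record rather than one final total per word, which is why it compares against WordCountData.STREAMING_COUNTS_AS_TUPLES instead of the batch-mode COUNTS_AS_TUPLES used by testWordCount.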
ReduceTransformation.java

@@ -44,7 +44,7 @@ public final class ReduceTransformation<IN, K> extends PhysicalTransformation<IN>
     private final KeySelector<IN, K> keySelector;
     private final TypeInformation<K> keyTypeInfo;
     private ChainingStrategy chainingStrategy = ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
-    private boolean isEnableAsyncState;
+    private boolean isEnableAsyncState = false;
 
     public ReduceTransformation(
             String name,
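The ReduceTransformation change is behaviorally a no-op, since Java already initializes boolean fields to false; it only makes the default of the async-state switch explicit.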
