From 4e9bc63cf450ab8a9d69ba74a3f35478626b63a7 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Fri, 10 May 2024 20:01:54 +0800 Subject: [PATCH] [FLINK-xxx2][Runtime] Declaring processing by chain --- .../declare/DeclarationChain.java | 185 ++++++++++++++++++ .../declare/DeclarationContext.java | 7 + ...yncStateStreamOperatorDeclarationTest.java | 51 +++++ 3 files changed, 243 insertions(+) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java new file mode 100644 index 00000000000000..fea494e6f9302c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationChain.java @@ -0,0 +1,185 @@ +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.Deque; +import java.util.LinkedList; + +public class DeclarationChain implements ThrowingConsumer { + + private final DeclarationContext context; + + private final FunctionWithException, Exception> first; + + private final Deque> transformations; + + private DeclarationStage currentStage; + + DeclarationChain( + DeclarationContext context, + FunctionWithException, Exception> first) { + this.context = context; + this.first = first; + this.transformations = new LinkedList<>(); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public void accept(IN in) throws Exception { + StateFuture future = first.apply(in); + for (Transformation trans : transformations) { + future = trans.apply(future); + } + } + + public DeclarationStage firstStage() throws DeclarationException { + if (currentStage != null) { + throw new DeclarationException( + "Diverged declaration. Please make sure you call firstStage() once."); + } + DeclarationStage declarationStage = new DeclarationStage<>(); + currentStage = declarationStage; + return declarationStage; + } + + Transformation getLastTransformation() { + return transformations.getLast(); + } + + public class DeclarationStage { + + private boolean afterThen = false; + + private void preCheck() throws DeclarationException { + if (afterThen) { + throw new DeclarationException( + "Double thenCompose called for single declaration block."); + } + if (currentStage != this) { + throw new DeclarationException( + "Diverged declaration. Please make sure you are declaring on the last point."); + } + afterThen = true; + } + + public DeclarationStage thenCompose( + FunctionWithException, Exception> action) + throws DeclarationException { + preCheck(); + DeclarationStage next = new DeclarationStage<>(); + ComposeTransformation trans = new ComposeTransformation<>(action, next); + transformations.add(trans); + currentStage = next; + getLastTransformation().declare(); + return next; + } + + public DeclarationStage thenAccept(ThrowingConsumer action) + throws DeclarationException { + preCheck(); + DeclarationStage next = new DeclarationStage<>(); + AcceptTransformation trans = new AcceptTransformation<>(action, next); + transformations.add(trans); + currentStage = next; + getLastTransformation().declare(); + return next; + } + + public DeclarationStage withName(String name) throws DeclarationException { + getLastTransformation().withName(name); + return this; + } + + public DeclarationChain finish() throws DeclarationException { + preCheck(); + getLastTransformation().declare(); + return DeclarationChain.this; + } + } + + interface Transformation { + StateFuture apply(StateFuture upstream) throws Exception; + + void withName(String name) throws DeclarationException; + + void declare() throws DeclarationException; + } + + abstract static class AbstractTransformation implements Transformation { + + String name = null; + + @Override + public void withName(String newName) throws DeclarationException { + if (name != null) { + throw new DeclarationException("Double naming"); + } + name = newName; + declare(); + } + } + + class ComposeTransformation extends AbstractTransformation { + + DeclarationStage to; + + FunctionWithException, ? extends Exception> action; + + NamedFunction> namedFunction; + + ComposeTransformation( + FunctionWithException, Exception> action, + DeclarationStage to) { + this.action = action; + this.to = to; + } + + @Override + public StateFuture apply(StateFuture upstream) throws Exception { + return upstream.thenCompose(namedFunction); + } + + @Override + public void declare() throws DeclarationException { + if (namedFunction == null) { + if (name == null) { + namedFunction = context.declare(action); + } else { + namedFunction = context.declare(name, action); + } + } + } + } + + class AcceptTransformation extends AbstractTransformation { + + DeclarationStage to; + + ThrowingConsumer action; + + NamedConsumer namedFunction; + + AcceptTransformation(ThrowingConsumer action, DeclarationStage to) { + this.action = action; + this.to = to; + } + + @Override + public StateFuture apply(StateFuture upstream) throws Exception { + return upstream.thenAccept(namedFunction); + } + + @Override + public void declare() throws DeclarationException { + if (namedFunction == null) { + if (name == null) { + namedFunction = context.declare(action); + } else { + namedFunction = context.declare(name, action); + } + } + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java index e8d0a4a53c58f3..97b5a57ff3f0e9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; +import org.apache.flink.api.common.state.v2.StateFuture; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.ThrowingConsumer; @@ -73,6 +74,12 @@ public NamedBiFunction declare( return declare(manager.nextAssignedName(), callback); } + public DeclarationChain.DeclarationStage declareChain( + FunctionWithException, Exception> first) + throws DeclarationException { + return new DeclarationChain<>(this, first).firstStage(); + } + DeclarationManager getManager() { return manager; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java index 41ab0125e7aa9b..873083aabc5aa0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java @@ -57,6 +57,20 @@ public class AbstractAsyncStateStreamOperatorDeclarationTest { subtaskIndex); } + protected KeyedOneInputStreamOperatorTestHarness, String> + createTestHarnessWithChain( + int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) + throws Exception { + TestDeclarationChainOperator testOperator = new TestDeclarationChainOperator(elementOrder); + return new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + } + @Test public void testRecordProcessor() throws Exception { try (KeyedOneInputStreamOperatorTestHarness, String> @@ -71,6 +85,20 @@ public void testRecordProcessor() throws Exception { } } + @Test + public void testRecordProcessorWithChain() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarnessWithChain(128, 1, 0, ElementOrder.RECORD_ORDER)) { + testHarness.open(); + TestDeclarationChainOperator testOperator = + (TestDeclarationChainOperator) testHarness.getOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator); + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + assertThat(testOperator.getValue()).isEqualTo(12); + } + } + /** A simple testing operator. */ private static class TestDeclarationOperator extends AbstractAsyncStateStreamOperator implements OneInputStreamOperator, String>, @@ -138,6 +166,29 @@ public int getValue() { } } + private static class TestDeclarationChainOperator extends TestDeclarationOperator { + + TestDeclarationChainOperator(ElementOrder elementOrder) { + super(elementOrder); + } + + @Override + public ThrowingConsumer>, Exception> declareProcess( + DeclarationContext context) throws DeclarationException { + + return context.>, Void>declareChain( + e -> { + value.addAndGet(e.getValue().f0); + return StateFutureUtils.completedVoidFuture(); + }) + .thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet())) + .withName("adder") + .thenAccept(value::addAndGet) + .withName("doubler") + .finish(); + } + } + /** {@link KeySelector} for tests. */ static class TestKeySelector implements KeySelector, Integer> { private static final long serialVersionUID = 1L;