From d412e40bd31c00bf63ffa111043e248abf38f9d8 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Tue, 3 Dec 2024 11:11:21 +0800 Subject: [PATCH] [FLINK-35156][Runtime] Rework: Make operators of DataStream V2 integrate with async state processing framework --- .../datastream/api/context/StateManager.java | 10 +-- .../context/DefaultPartitionedContext.java | 8 +- .../impl/context/DefaultStateManager.java | 78 +++++++++---------- .../impl/operators/KeyedProcessOperator.java | 28 ++----- ...KeyedTwoInputBroadcastProcessOperator.java | 28 ++----- ...edTwoInputNonBroadcastProcessOperator.java | 33 +++----- .../KeyedTwoOutputProcessOperator.java | 29 ++----- .../impl/operators/ProcessOperator.java | 12 ++- .../TwoInputBroadcastProcessOperator.java | 12 ++- .../TwoInputNonBroadcastProcessOperator.java | 12 ++- .../operators/TwoOutputProcessOperator.java | 14 +++- .../DefaultNonPartitionedContextTest.java | 13 +++- .../impl/context/DefaultStateManagerTest.java | 10 ++- ...ultTwoOutputNonPartitionedContextTest.java | 13 +++- .../MockFreqCountProcessFunction.java | 2 +- ...MockGlobalListAppenderProcessFunction.java | 2 +- .../MockListAppenderProcessFunction.java | 2 +- .../MockMultiplierProcessFunction.java | 2 +- ...MockRecudingMultiplierProcessFunction.java | 2 +- .../MockSumAggregateProcessFunction.java | 2 +- .../AbstractAsyncStateStreamOperator.java | 8 +- .../collect/utils/MockOperatorStateStore.java | 28 ++++++- 22 files changed, 184 insertions(+), 164 deletions(-) diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java index e92de2856b0d0d..c7919286c69cf5 100644 --- a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/StateManager.java @@ -19,18 +19,18 @@ package org.apache.flink.datastream.api.context; import org.apache.flink.annotation.Experimental; -import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDeclaration; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.BroadcastStateDeclaration; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDeclaration; -import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDeclaration; -import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDeclaration; -import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ValueState; import java.util.Optional; diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java index b3c7e4b632309b..02f3838ddac7c2 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java @@ -18,16 +18,16 @@ package org.apache.flink.datastream.impl.context; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.datastream.api.context.JobInfo; import org.apache.flink.datastream.api.context.PartitionedContext; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.RuntimeContext; import org.apache.flink.datastream.api.context.TaskInfo; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Supplier; /** The default implementation of {@link PartitionedContext}. */ @@ -41,14 +41,14 @@ public class DefaultPartitionedContext implements PartitionedContext { public DefaultPartitionedContext( RuntimeContext context, Supplier currentKeySupplier, - Consumer currentKeySetter, + BiConsumer processorWithKey, ProcessingTimeManager processingTimeManager, StreamingRuntimeContext operatorContext, OperatorStateStore operatorStateStore) { this.context = context; this.stateManager = new DefaultStateManager( - currentKeySupplier, currentKeySetter, operatorContext, operatorStateStore); + currentKeySupplier, processorWithKey, operatorContext, operatorStateStore); this.processingTimeManager = processingTimeManager; } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java index bcc7452040386d..d4074d1cfdcd3a 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java @@ -18,31 +18,32 @@ package org.apache.flink.datastream.impl.context; -import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDeclaration; -import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.BroadcastStateDeclaration; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDeclaration; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDeclaration; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDeclaration; -import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.StateDeclaration; -import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDeclaration; -import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.state.v2.AggregatingState; +import org.apache.flink.api.common.state.v2.ListState; +import org.apache.flink.api.common.state.v2.MapState; +import org.apache.flink.api.common.state.v2.ReducingState; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.datastream.api.context.StateManager; +import org.apache.flink.runtime.state.v2.AggregatingStateDescriptor; +import org.apache.flink.runtime.state.v2.ListStateDescriptor; +import org.apache.flink.runtime.state.v2.MapStateDescriptor; +import org.apache.flink.runtime.state.v2.OperatorStateStore; +import org.apache.flink.runtime.state.v2.ReducingStateDescriptor; +import org.apache.flink.runtime.state.v2.ValueStateDescriptor; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.util.Preconditions; import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import java.util.function.Supplier; /** @@ -51,13 +52,10 @@ */ public class DefaultStateManager implements StateManager { - /** - * Retrieve the current key. When {@link #currentKeySetter} receives a key, this must return - * that key until it is reset. - */ + /** Retrieve the current key. */ private final Supplier currentKeySupplier; - private final Consumer currentKeySetter; + private final BiConsumer processorWithKey; protected final StreamingRuntimeContext operatorContext; @@ -65,11 +63,11 @@ public class DefaultStateManager implements StateManager { public DefaultStateManager( Supplier currentKeySupplier, - Consumer currentKeySetter, + BiConsumer processorWithKey, StreamingRuntimeContext operatorContext, OperatorStateStore operatorStateStore) { this.currentKeySupplier = currentKeySupplier; - this.currentKeySetter = currentKeySetter; + this.processorWithKey = processorWithKey; this.operatorContext = Preconditions.checkNotNull(operatorContext); this.operatorStateStore = Preconditions.checkNotNull(operatorStateStore); } @@ -86,8 +84,9 @@ public Optional> getState(ValueStateDeclaration stateDeclar ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor<>( stateDeclaration.getName(), - stateDeclaration.getTypeDescriptor().getTypeClass()); - return Optional.ofNullable(operatorContext.getState(valueStateDescriptor)); + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); + return Optional.ofNullable(operatorContext.getValueState(valueStateDescriptor)); } @Override @@ -97,7 +96,8 @@ public Optional> getState(ListStateDeclaration stateDeclarat ListStateDescriptor listStateDescriptor = new ListStateDescriptor<>( stateDeclaration.getName(), - stateDeclaration.getTypeDescriptor().getTypeClass()); + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); if (stateDeclaration.getRedistributionMode() == StateDeclaration.RedistributionMode.REDISTRIBUTABLE) { @@ -119,8 +119,10 @@ public Optional> getState(MapStateDeclaration stateD MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>( stateDeclaration.getName(), - stateDeclaration.getKeyTypeDescriptor().getTypeClass(), - stateDeclaration.getValueTypeDescriptor().getTypeClass()); + TypeExtractor.createTypeInfo( + stateDeclaration.getKeyTypeDescriptor().getTypeClass()), + TypeExtractor.createTypeInfo( + stateDeclaration.getValueTypeDescriptor().getTypeClass())); return Optional.ofNullable(operatorContext.getMapState(mapStateDescriptor)); } @@ -131,7 +133,8 @@ public Optional> getState(ReducingStateDeclaration state new ReducingStateDescriptor<>( stateDeclaration.getName(), stateDeclaration.getReduceFunction(), - stateDeclaration.getTypeDescriptor().getTypeClass()); + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); return Optional.ofNullable(operatorContext.getReducingState(reducingStateDescriptor)); } @@ -142,7 +145,8 @@ public Optional> getState( new AggregatingStateDescriptor<>( stateDeclaration.getName(), stateDeclaration.getAggregateFunction(), - stateDeclaration.getTypeDescriptor().getTypeClass()); + TypeExtractor.createTypeInfo( + stateDeclaration.getTypeDescriptor().getTypeClass())); return Optional.ofNullable(operatorContext.getAggregatingState(aggregatingStateDescriptor)); } @@ -152,8 +156,10 @@ public Optional> getState( MapStateDescriptor mapStateDescriptor = new MapStateDescriptor<>( stateDeclaration.getName(), - stateDeclaration.getKeyTypeDescriptor().getTypeClass(), - stateDeclaration.getValueTypeDescriptor().getTypeClass()); + TypeExtractor.createTypeInfo( + stateDeclaration.getKeyTypeDescriptor().getTypeClass()), + TypeExtractor.createTypeInfo( + stateDeclaration.getValueTypeDescriptor().getTypeClass())); return Optional.ofNullable(operatorStateStore.getBroadcastState(mapStateDescriptor)); } @@ -162,20 +168,6 @@ public Optional> getState( * key must be reset after the block is executed. */ public void executeInKeyContext(Runnable runnable, Object key) { - final Object oldKey = currentKeySupplier.get(); - setCurrentKey(key); - try { - runnable.run(); - } finally { - resetCurrentKey(oldKey); - } - } - - private void setCurrentKey(Object key) { - currentKeySetter.accept(key); - } - - private void resetCurrentKey(Object oldKey) { - currentKeySetter.accept(oldKey); + processorWithKey.accept(runnable, key); } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java index 5c44276e09c165..0ffa1e03533503 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java @@ -40,8 +40,6 @@ import java.util.HashSet; import java.util.Set; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */ public class KeyedProcessOperator extends ProcessOperator implements Triggerable { @@ -91,16 +89,8 @@ public void onEventTime(InternalTimer timer) throws Exceptio @Override public void onProcessingTime(InternalTimer timer) throws Exception { - // align the key context with the registered timer. - partitionedContext - .getStateManager() - .executeInKeyContext( - () -> - userFunction.onProcessingTimer( - timer.getTimestamp(), - getOutputCollector(), - partitionedContext), - timer.getKey()); + userFunction.onProcessingTimer( + timer.getTimestamp(), getOutputCollector(), partitionedContext); } @Override @@ -115,16 +105,14 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) public void setKeyContextElement1(StreamRecord record) throws Exception { - setKeyContextElement(record, getStateKeySelector1()); + super.setKeyContextElement1(record); + keySet.add(getCurrentKey()); } - private void setKeyContextElement(StreamRecord record, KeySelector selector) - throws Exception { - checkNotNull(selector); - Object key = selector.getKey(record.getValue()); - setCurrentKey(key); - keySet.add(key); + @Override + public boolean isAsyncStateProcessingEnabled() { + return true; } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java index 6b0cb3cca5e63f..ad9af9cbecbf5d 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java @@ -95,16 +95,8 @@ public void onEventTime(InternalTimer timer) throws Exceptio @Override public void onProcessingTime(InternalTimer timer) throws Exception { - // align the key context with the registered timer. - partitionedContext - .getStateManager() - .executeInKeyContext( - () -> - userFunction.onProcessingTimer( - timer.getTimestamp(), - getOutputCollector(), - partitionedContext), - timer.getKey()); + userFunction.onProcessingTimer( + timer.getTimestamp(), getOutputCollector(), partitionedContext); } @Override @@ -114,19 +106,15 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) // Only element from input1 should be considered as the other side is broadcast input. public void setKeyContextElement1(StreamRecord record) throws Exception { - setKeyContextElement(record, getStateKeySelector1()); + super.setKeyContextElement1(record); + keySet.add(getCurrentKey()); } - private void setKeyContextElement(StreamRecord record, KeySelector selector) - throws Exception { - if (selector == null) { - return; - } - Object key = selector.getKey(record.getValue()); - setCurrentKey(key); - keySet.add(key); + @Override + public boolean isAsyncStateProcessingEnabled() { + return true; } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java index def513e491b7cc..28770de1b21340 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java @@ -97,16 +97,8 @@ public void onEventTime(InternalTimer timer) throws Exceptio @Override public void onProcessingTime(InternalTimer timer) throws Exception { - // align the key context with the registered timer. - partitionedContext - .getStateManager() - .executeInKeyContext( - () -> - userFunction.onProcessingTimer( - timer.getTimestamp(), - getOutputCollector(), - partitionedContext), - timer.getKey()); + userFunction.onProcessingTimer( + timer.getTimestamp(), getOutputCollector(), partitionedContext); } @Override @@ -116,24 +108,21 @@ protected NonPartitionedContext getNonPartitionedContext() { } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) public void setKeyContextElement1(StreamRecord record) throws Exception { - setKeyContextElement(record, getStateKeySelector1()); + super.setKeyContextElement1(record); + keySet.add(getCurrentKey()); } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) public void setKeyContextElement2(StreamRecord record) throws Exception { - setKeyContextElement(record, getStateKeySelector2()); + super.setKeyContextElement2(record); + keySet.add(getCurrentKey()); } - private void setKeyContextElement(StreamRecord record, KeySelector selector) - throws Exception { - if (selector == null) { - return; - } - Object key = selector.getKey(record.getValue()); - setCurrentKey(key); - keySet.add(key); + @Override + public boolean isAsyncStateProcessingEnabled() { + return true; } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java index cffae609986d81..36c44b9f84c66f 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java @@ -41,8 +41,6 @@ import java.util.HashSet; import java.util.Set; -import static org.apache.flink.util.Preconditions.checkNotNull; - /** */ public class KeyedTwoOutputProcessOperator extends TwoOutputProcessOperator @@ -120,17 +118,8 @@ public void onEventTime(InternalTimer timer) throws Exceptio @Override public void onProcessingTime(InternalTimer timer) throws Exception { - // align the key context with the registered timer. - partitionedContext - .getStateManager() - .executeInKeyContext( - () -> - userFunction.onProcessingTimer( - timer.getTimestamp(), - getMainCollector(), - getSideCollector(), - partitionedContext), - timer.getKey()); + userFunction.onProcessingTimer( + timer.getTimestamp(), getMainCollector(), getSideCollector(), partitionedContext); } @Override @@ -140,16 +129,14 @@ protected TwoOutputNonPartitionedContext getNonPartitionedCo } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) public void setKeyContextElement1(StreamRecord record) throws Exception { - setKeyContextElement(record, getStateKeySelector1()); + super.setKeyContextElement1(record); + keySet.add(getCurrentKey()); } - private void setKeyContextElement(StreamRecord record, KeySelector selector) - throws Exception { - checkNotNull(selector); - Object key = selector.getKey(record.getValue()); - setCurrentKey(key); - keySet.add(key); + @Override + public boolean isAsyncStateProcessingEnabled() { + return true; } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java index 28eeb84afc2c38..9fff9729a1b6b7 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java @@ -28,7 +28,7 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -36,7 +36,7 @@ /** Operator for {@link OneInputStreamProcessFunction}. */ public class ProcessOperator - extends AbstractUdfStreamOperator> + extends AbstractAsyncStateUdfStreamOperator> implements OneInputStreamOperator, BoundedOneInput { protected transient DefaultRuntimeContext context; @@ -70,7 +70,7 @@ public void open() throws Exception { new DefaultPartitionedContext( context, this::currentKey, - this::setCurrentKey, + (r, k) -> asyncProcessWithKey(k, r::run), getProcessingTimeManager(), operatorContext, getOperatorStateBackend()); @@ -112,4 +112,10 @@ public void close() throws Exception { super.close(); userFunction.close(); } + + @Override + public boolean isAsyncStateProcessingEnabled() { + // For non-keyed operators, we disable async state processing. + return false; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java index 8c066145cd47a6..7d2babbeb4ad8a 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java @@ -28,7 +28,7 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -38,7 +38,7 @@ /** Operator for {@link TwoInputBroadcastStreamProcessFunction}. */ public class TwoInputBroadcastProcessOperator - extends AbstractUdfStreamOperator< + extends AbstractAsyncStateUdfStreamOperator< OUT, TwoInputBroadcastStreamProcessFunction> implements TwoInputStreamOperator, BoundedMultiInput { @@ -75,7 +75,7 @@ public void open() throws Exception { new DefaultPartitionedContext( context, this::currentKey, - this::setCurrentKey, + (r, k) -> asyncProcessWithKey(k, r::run), getProcessingTimeManager(), operatorContext, getOperatorStateBackend()); @@ -129,4 +129,10 @@ public void close() throws Exception { super.close(); userFunction.close(); } + + @Override + public boolean isAsyncStateProcessingEnabled() { + // For non-keyed operators, we disable async state processing. + return false; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java index b4ca7cac2e1018..8afee9e3a3b295 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java @@ -28,8 +28,8 @@ import org.apache.flink.datastream.impl.context.DefaultPartitionedContext; import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -39,7 +39,7 @@ /** Operator for {@link TwoInputNonBroadcastStreamProcessFunction}. */ public class TwoInputNonBroadcastProcessOperator - extends AbstractUdfStreamOperator< + extends AbstractAsyncStateUdfStreamOperator< OUT, TwoInputNonBroadcastStreamProcessFunction> implements TwoInputStreamOperator, BoundedMultiInput { @@ -78,7 +78,7 @@ public void open() throws Exception { new DefaultPartitionedContext( context, this::currentKey, - this::setCurrentKey, + (r, k) -> asyncProcessWithKey(k, r::run), getProcessingTimeManager(), operatorContext, operatorStateBackend); @@ -132,4 +132,10 @@ public void close() throws Exception { super.close(); userFunction.close(); } + + @Override + public boolean isAsyncStateProcessingEnabled() { + // For non-keyed operators, we disable async state processing. + return false; + } } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java index a3c7d348ab0d2d..0ae208ae3a42de 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java @@ -19,7 +19,6 @@ package org.apache.flink.datastream.impl.operators; import org.apache.flink.api.common.TaskInfo; -import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.datastream.api.context.ProcessingTimeManager; import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext; import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; @@ -29,7 +28,8 @@ import org.apache.flink.datastream.impl.context.DefaultRuntimeContext; import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext; import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator; +import org.apache.flink.runtime.state.v2.OperatorStateStore; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -43,7 +43,7 @@ *

We support the second output via flink side-output mechanism. */ public class TwoOutputProcessOperator - extends AbstractUdfStreamOperator< + extends AbstractAsyncStateUdfStreamOperator< OUT_MAIN, TwoOutputStreamProcessFunction> implements OneInputStreamOperator, BoundedOneInput { protected transient TimestampCollector mainCollector; @@ -87,7 +87,7 @@ public void open() throws Exception { new DefaultPartitionedContext( context, this::currentKey, - this::setCurrentKey, + (r, k) -> asyncProcessWithKey(k, r::run), getProcessingTimeManager(), operatorContext, operatorStateStore); @@ -157,4 +157,10 @@ public void close() throws Exception { super.close(); userFunction.close(); } + + @Override + public boolean isAsyncStateProcessingEnabled() { + // For non-keyed operators, we disable async state processing. + return false; + } } diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContextTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContextTest.java index e5e36937a4db1e..e42fa5ad72c820 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContextTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultNonPartitionedContextTest.java @@ -65,7 +65,11 @@ void testApplyToAllPartitions() throws Exception { new DefaultPartitionedContext( runtimeContext, Optional::empty, - (key) -> cf.complete(null), + (r, k) -> { + cf.complete(null); + r.run(); + cf.complete(null); + }, UnsupportedProcessingTimeManager.INSTANCE, ContextTestUtils.createStreamingRuntimeContext(), new MockOperatorStateStore()), @@ -116,7 +120,12 @@ void testKeyedApplyToAllPartitions() throws Exception { new DefaultPartitionedContext( runtimeContext, currentKey::get, - (key) -> currentKey.set((Integer) key), + (r, k) -> { + Integer oldKey = currentKey.get(); + currentKey.set((Integer) k); + r.run(); + currentKey.set(oldKey); + }, UnsupportedProcessingTimeManager.INSTANCE, ContextTestUtils.createStreamingRuntimeContext(), new MockOperatorStateStore()), diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java index 9be18a26768573..5366fca7962813 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java @@ -50,7 +50,7 @@ void testGetCurrentKey() { DefaultStateManager stateManager = new DefaultStateManager( () -> key, - ignore -> {}, + (r, k) -> r.run(), new MockStreamingRuntimeContext(false, 1, 0), new MockOperatorStateStore()); assertThat((String) stateManager.getCurrentKey()).isEqualTo(key); @@ -63,7 +63,7 @@ void testErrorInGetCurrentKey() { () -> { throw new RuntimeException("Expected Error"); }, - ignore -> {}, + (r, k) -> r.run(), new MockStreamingRuntimeContext(false, 1, 0), new MockOperatorStateStore()); assertThatThrownBy(stateManager::getCurrentKey) @@ -80,7 +80,11 @@ void testExecuteInKeyContext() { DefaultStateManager stateManager = new DefaultStateManager( () -> oldKey, - k -> setKey.set((Integer) k), + (r, k) -> { + setKey.set((Integer) k); + r.run(); + setKey.set(oldKey); + }, new MockStreamingRuntimeContext(false, 1, 0), new MockOperatorStateStore()); stateManager.executeInKeyContext(() -> assertThat(setKey).hasValue(newKey), newKey); diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContextTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContextTest.java index cd101bfbb10d75..d23a211c474170 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContextTest.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultTwoOutputNonPartitionedContextTest.java @@ -69,7 +69,11 @@ void testApplyToAllPartitions() throws Exception { new DefaultPartitionedContext( runtimeContext, Optional::empty, - (key) -> cf.complete(null), + (r, k) -> { + cf.complete(null); + r.run(); + cf.complete(null); + }, UnsupportedProcessingTimeManager.INSTANCE, ContextTestUtils.createStreamingRuntimeContext(), new MockOperatorStateStore()), @@ -129,7 +133,12 @@ void testKeyedApplyToAllPartitions() throws Exception { new DefaultPartitionedContext( runtimeContext, currentKey::get, - (key) -> currentKey.set((Integer) key), + (r, k) -> { + Integer oldKey = currentKey.get(); + currentKey.set((Integer) k); + r.run(); + currentKey.set(oldKey); + }, UnsupportedProcessingTimeManager.INSTANCE, ContextTestUtils.createStreamingRuntimeContext(), new MockOperatorStateStore()), diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java index 8bbd1e6d9b6524..10ec3576c9d4a9 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockFreqCountProcessFunction.java @@ -18,10 +18,10 @@ package org.apache.flink.datastream.impl.operators; -import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.v2.MapState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java index 413af0827ebaca..53e214a9dcc3c2 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockGlobalListAppenderProcessFunction.java @@ -18,10 +18,10 @@ package org.apache.flink.datastream.impl.operators; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java index eca3b295598b00..35c775768757eb 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockListAppenderProcessFunction.java @@ -18,10 +18,10 @@ package org.apache.flink.datastream.impl.operators; -import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.v2.ListState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java index 050844ba2f9632..20f8aadb7a4ee4 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockMultiplierProcessFunction.java @@ -20,8 +20,8 @@ import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; -import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ValueState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java index f777c8a09e0c2f..ca9d312b056379 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockRecudingMultiplierProcessFunction.java @@ -19,10 +19,10 @@ package org.apache.flink.datastream.impl.operators; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.v2.ReducingState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java index 7447a000809ea9..8a995dc010a956 100644 --- a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/operators/MockSumAggregateProcessFunction.java @@ -19,10 +19,10 @@ package org.apache.flink.datastream.impl.operators; import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.api.common.state.AggregatingState; import org.apache.flink.api.common.state.AggregatingStateDeclaration; import org.apache.flink.api.common.state.StateDeclaration; import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.v2.AggregatingState; import org.apache.flink.api.common.typeinfo.TypeDescriptors; import org.apache.flink.datastream.api.common.Collector; import org.apache.flink.datastream.api.context.PartitionedContext; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java index 8680d9c620ea91..d9c96cfcfd1a18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java @@ -411,12 +411,16 @@ RecordContext getCurrentProcessingContext() { @Override public void finish() throws Exception { super.finish(); - asyncExecutionController.drainInflightRecords(0); + if (isAsyncStateProcessingEnabled()) { + asyncExecutionController.drainInflightRecords(0); + } } @Override public void close() throws Exception { super.close(); - asyncExecutionController.close(); + if (isAsyncStateProcessingEnabled()) { + asyncExecutionController.close(); + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java index 5ed2001dc300f9..14752d53a437fa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/utils/MockOperatorStateStore.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.runtime.state.v2.StateDescriptorUtils; +import org.apache.flink.runtime.state.v2.adaptor.OperatorListStateAdaptor; import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils; import java.util.Collections; @@ -31,7 +33,8 @@ /** An {@link OperatorStateStore} for testing purpose. */ @SuppressWarnings("rawtypes") -public class MockOperatorStateStore implements OperatorStateStore { +public class MockOperatorStateStore + implements OperatorStateStore, org.apache.flink.runtime.state.v2.OperatorStateStore { private final Map> historyStateMap; @@ -65,6 +68,29 @@ public ListState getUnionListState(ListStateDescriptor stateDescriptor throw new UnsupportedOperationException(); } + @Override + public BroadcastState getBroadcastState( + org.apache.flink.runtime.state.v2.MapStateDescriptor stateDescriptor) + throws Exception { + return getBroadcastState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor)); + } + + @Override + public org.apache.flink.api.common.state.v2.ListState getListState( + org.apache.flink.runtime.state.v2.ListStateDescriptor stateDescriptor) + throws Exception { + return new OperatorListStateAdaptor<>( + getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); + } + + @Override + public org.apache.flink.api.common.state.v2.ListState getUnionListState( + org.apache.flink.runtime.state.v2.ListStateDescriptor stateDescriptor) + throws Exception { + return new OperatorListStateAdaptor<>( + getListState(StateDescriptorUtils.transformFromV2ToV1(stateDescriptor))); + } + @Override public Set getRegisteredStateNames() { throw new UnsupportedOperationException();