diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java index 20c3006d83827e..40eda40b1b8a06 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -24,12 +24,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarativeProcessingInput; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.function.ThrowingConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import static org.apache.flink.util.Preconditions.checkArgument; @@ -41,6 +45,9 @@ @Experimental public abstract class AbstractInput implements Input, KeyContextHandler, AsyncStateProcessing { + + private static final Logger LOG = LoggerFactory.getLogger(AbstractInput.class); + /** * {@code KeySelector} for extracting a key from an element being processed. This is used to * scope keyed state to a key. This is null if the operator is not a keyed operator. @@ -101,10 +108,22 @@ public final boolean isAsyncStateProcessingEnabled() { @Internal @Override + @SuppressWarnings({"unchecked", "rawtypes"}) public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { - return AsyncStateProcessing.makeRecordProcessor( - (AsyncStateProcessingOperator) owner, - (KeySelector) stateKeySelector, - this::processElement); + if (this instanceof DeclarativeProcessingInput) { + LOG.info("declareProcess is overridden, build process is invoked."); + return AsyncStateProcessing.makeRecordProcessor( + (AsyncStateProcessingOperator) owner, + (KeySelector) stateKeySelector, + ((AsyncStateProcessingOperator) owner) + .getDeclarationManager() + .buildProcess( + ((DeclarativeProcessingInput) this)::declareProcess)); + } else { + return AsyncStateProcessing.makeRecordProcessor( + (AsyncStateProcessingOperator) owner, + (KeySelector) stateKeySelector, + this::processElement); + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index 6c1598c96c8317..901d5dc9eac964 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -39,11 +39,17 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarativeProcessingInput; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarativeProcessingTwoInputOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -56,6 +62,8 @@ @SuppressWarnings("rawtypes") public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator implements AsyncStateProcessingOperator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class); private AsyncExecutionController asyncExecutionController; @@ -63,6 +71,8 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre private Environment environment; + private DeclarationManager declarationManager; + /** Initialize necessary state components for {@link AbstractStreamOperator}. */ @Override public void initializeState(StreamTaskStateInitializer streamTaskStateManager) @@ -95,6 +105,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throw new UnsupportedOperationException( "Current State Backend doesn't support async access, AsyncExecutionController could not work"); } + this.declarationManager = new DeclarationManager(); } private void handleAsyncStateException(String message, Throwable exception) { @@ -143,6 +154,53 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable proc asyncExecutionController.syncPointRequestWithCallback(processing); } + @Override + public final DeclarationManager getDeclarationManager() { + return declarationManager; + } + + // --------------------------- Declaration part ---------------------------------------- + /** The ordinal identifying which process is declaring. */ + enum DeclareProcessOrdinal { + INPUT1, + INPUT2, + ONLY_ONE + } + + @SuppressWarnings("unchecked") + private ThrowingConsumer, Exception> declareOverrideOrDefault( + DeclareProcessOrdinal processOrdinal, + ThrowingConsumer, Exception> defaultProcess) { + switch (processOrdinal) { + case INPUT1: + if (this instanceof DeclarativeProcessingTwoInputOperator) { + LOG.info("declareProcess1 is provided, build process is invoked."); + return declarationManager.buildProcess( + ((DeclarativeProcessingTwoInputOperator) this) + ::declareProcess1); + } + break; + case INPUT2: + if (this instanceof DeclarativeProcessingTwoInputOperator) { + LOG.info("declareProcess2 is provided, build process is invoked."); + return declarationManager.buildProcess( + ((DeclarativeProcessingTwoInputOperator) this) + ::declareProcess2); + } + break; + case ONLY_ONE: + if (this instanceof DeclarativeProcessingInput) { + LOG.info("declareProcess is provided, build process is invoked."); + return declarationManager.buildProcess( + ((DeclarativeProcessingInput) this)::declareProcess); + } + break; + default: + break; + } + return defaultProcess; + } + @Override @SuppressWarnings("unchecked") public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { @@ -151,21 +209,28 @@ public final ThrowingConsumer, Exception> getRecordProcessor if (this instanceof TwoInputStreamOperator) { switch (inputId) { case 1: - return AsyncStateProcessing.makeRecordProcessor( + return AsyncStateProcessing.makeRecordProcessor( this, (KeySelector) stateKeySelector1, - ((TwoInputStreamOperator) this)::processElement1); + declareOverrideOrDefault( + DeclareProcessOrdinal.INPUT1, + ((TwoInputStreamOperator) this)::processElement1)); case 2: - return AsyncStateProcessing.makeRecordProcessor( + return AsyncStateProcessing.makeRecordProcessor( this, (KeySelector) stateKeySelector2, - ((TwoInputStreamOperator) this)::processElement2); + declareOverrideOrDefault( + DeclareProcessOrdinal.INPUT2, + ((TwoInputStreamOperator) this)::processElement2)); default: break; } } else if (this instanceof Input && inputId == 1) { - return AsyncStateProcessing.makeRecordProcessor( - this, (KeySelector) stateKeySelector1, ((Input) this)::processElement); + return AsyncStateProcessing.makeRecordProcessor( + this, + (KeySelector) stateKeySelector1, + declareOverrideOrDefault( + DeclareProcessOrdinal.ONLY_ONE, ((Input) this)::processElement)); } throw new IllegalArgumentException( String.format( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index 92a41eaeb06c74..20ec746a353f3d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingConsumer; import org.apache.flink.util.function.ThrowingRunnable; @@ -58,10 +59,13 @@ public abstract class AbstractAsyncStateStreamOperatorV2 extends AbstractSt private RecordContext currentProcessingContext; + private final DeclarationManager declarationManager; + public AbstractAsyncStateStreamOperatorV2( StreamOperatorParameters parameters, int numberOfInputs) { super(parameters, numberOfInputs); this.environment = parameters.getContainingTask().getEnvironment(); + this.declarationManager = new DeclarationManager(); } /** Initialize necessary state components for {@link AbstractStreamOperatorV2}. */ @@ -140,6 +144,11 @@ public final void preserveRecordOrderAndProcess(ThrowingRunnable proc asyncExecutionController.syncPointRequestWithCallback(processing); } + @Override + public final DeclarationManager getDeclarationManager() { + return declarationManager; + } + @Override public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { // The real logic should be in First/SecondInputOfTwoInput#getRecordProcessor. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java index e24344fece5f1f..14c2650c873370 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.declare.DeclarationManager; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingRunnable; @@ -53,4 +54,7 @@ void setAsyncKeyedContextElement(StreamRecord record, KeySelector k * @param processing the record processing logic. */ void preserveRecordOrderAndProcess(ThrowingRunnable processing); + + /** Get the declaration manager for user-logic declaring. */ + DeclarationManager getDeclarationManager(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java new file mode 100644 index 00000000000000..ba9ddf3256259f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; + +/** A context to declare parts of process in user-defined function/operator. */ +public class DeclarationContext { + + private final DeclarationManager manager; + + DeclarationContext(DeclarationManager manager) { + this.manager = manager; + } + + // ------------- Declaring Callback part ---------------- + + /** Declare a callback with a name. */ + public NamedConsumer declare(String name, Consumer callback) + throws DeclarationException { + return manager.register(new NamedConsumer<>(name, callback)); + } + + /** Declare a callback with a name. */ + public NamedFunction declare(String name, Function callback) + throws DeclarationException { + return manager.register(new NamedFunction<>(name, callback)); + } + + /** Declare a callback with a name. */ + public NamedBiFunction declare(String name, BiFunction callback) + throws DeclarationException { + return manager.register(new NamedBiFunction<>(name, callback)); + } + + /** Declare a callback with an automatically assigned name. */ + public NamedConsumer declare(Consumer callback) throws DeclarationException { + return declare(manager.nextAssignedName(), callback); + } + + /** Declare a callback with an automatically assigned name. */ + public NamedFunction declare(Function callback) throws DeclarationException { + return declare(manager.nextAssignedName(), callback); + } + + /** Declare a callback with an automatically assigned name. */ + public NamedBiFunction declare(BiFunction callback) + throws DeclarationException { + return declare(manager.nextAssignedName(), callback); + } + + DeclarationManager getManager() { + return manager; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java new file mode 100644 index 00000000000000..1a8e2542fd8f50 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +/** Exception thrown when something wrong with declaration happens. */ +public class DeclarationException extends Exception { + + DeclarationException(String message) { + super(message); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java new file mode 100644 index 00000000000000..8231b53a00f79d --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarationManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.HashMap; +import java.util.Map; + +/** The manager holds all the declaration information and manage the building procedure. */ +public class DeclarationManager { + + private final Map knownCallbacks; + + private int nextValidNameSequence = 0; + + public DeclarationManager() { + this.knownCallbacks = new HashMap<>(); + } + + T register(T knownCallback) throws DeclarationException { + if (knownCallbacks.put(knownCallback.getName(), knownCallback) != null) { + throw new DeclarationException("Duplicated key " + knownCallback.getName()); + } + return knownCallback; + } + + String nextAssignedName() { + String name; + do { + name = String.format("___%d___", nextValidNameSequence++); + } while (knownCallbacks.containsKey(name)); + return name; + } + + public ThrowingConsumer, Exception> buildProcess( + FunctionWithException< + DeclarationContext, + ThrowingConsumer, Exception>, + DeclarationException> + declaringMethod) { + final DeclarationContext context = new DeclarationContext(this); + try { + return declaringMethod.apply(context); + } catch (DeclarationException e) { + throw new FlinkRuntimeException(e); + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java new file mode 100644 index 00000000000000..a2478c969eaa3a --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingInput.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.ThrowingConsumer; + +/** Input that can declare the processing by a predefined function. */ +public interface DeclarativeProcessingInput extends Input { + + /** + * A hook for declaring the process in {@code processElement}. If subclass wants to define its + * {@code processElement} in declarative way, it should implement this class. If so, the {@code + * processElement} will not take effect and instead, the return value of this method will become + * the processing logic. This method will be called after {@code open()} of operator. + * + * @param context the context that provides useful methods to define named callbacks. + * @return the whole processing logic just like {@code processElement}. + */ + ThrowingConsumer, Exception> declareProcess(DeclarationContext context) + throws DeclarationException; +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java new file mode 100644 index 00000000000000..f124ebcae212bb --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/DeclarativeProcessingTwoInputOperator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * Operator with two inputs that can declare the processing of each input by a predefined function. + */ +public interface DeclarativeProcessingTwoInputOperator + extends TwoInputStreamOperator { + + /** + * A hook for declaring the process in {@code processElement1}. If subclass wants to define its + * {@code processElement1} in declarative way, it should implement this class. If so, the {@code + * processElement1} will not take effect and instead, the return value of this method will + * become the processing logic. This method will be called after {@code open()} of operator. + * + * @param context the context that provides useful methods to define named callbacks. + * @return the whole processing logic just like {@code processElement}. + */ + ThrowingConsumer, Exception> declareProcess1(DeclarationContext context) + throws DeclarationException; + + /** + * A hook for declaring the process in {@code processElement2}. If subclass wants to define its + * {@code processElement2} in declarative way, it should implement this class. If so, the {@code + * processElement2} will not take effect and instead, the return value of this method will + * become the processing logic. This method will be called after {@code open()} of operator. + * + * @param context the context that provides useful methods to define named callbacks. + * @return the whole processing logic just like {@code processElement}. + */ + ThrowingConsumer, Exception> declareProcess2(DeclarationContext context) + throws DeclarationException; +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java new file mode 100644 index 00000000000000..a9d9400da5c56c --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedBiFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import java.util.function.BiFunction; + +/** A named version of {@link BiFunction}. */ +public class NamedBiFunction extends NamedCallback implements BiFunction { + + BiFunction function; + + NamedBiFunction(String name, BiFunction function) { + super(name); + this.function = function; + } + + @Override + public V apply(T t, U u) { + return function.apply(t, u); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java new file mode 100644 index 00000000000000..97810454973f02 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedCallback.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +/** A named callback that can be identified and checkpoint. */ +public abstract class NamedCallback { + + private String name; + + NamedCallback(String name) { + this.name = name; + } + + /** Get the name of this callback. */ + public String getName() { + return name; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java new file mode 100644 index 00000000000000..999d4dd2f5d4ad --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedConsumer.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import java.util.function.Consumer; + +/** A named version of {@link Consumer}. */ +public class NamedConsumer extends NamedCallback implements Consumer { + + Consumer consumer; + + NamedConsumer(String name, Consumer consumer) { + super(name); + this.consumer = consumer; + } + + public void accept(T t) { + consumer.accept(t); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java new file mode 100644 index 00000000000000..42313dc007dabf --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/NamedFunction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import java.util.function.Function; + +/** A named version of {@link Function}. */ +public class NamedFunction extends NamedCallback implements Function { + + Function function; + + NamedFunction(String name, Function function) { + super(name); + this.function = function; + } + + @Override + public R apply(T t) { + return function.apply(t); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java new file mode 100644 index 00000000000000..99de4ebfeb9fa3 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/declare/AbstractAsyncStateStreamOperatorDeclarationTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing.declare; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperator; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ +public class AbstractAsyncStateStreamOperatorDeclarationTest { + + protected KeyedOneInputStreamOperatorTestHarness, String> + createTestHarness( + int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) + throws Exception { + TestDeclarationOperator testOperator = new TestDeclarationOperator(elementOrder); + return new KeyedOneInputStreamOperatorTestHarness<>( + testOperator, + new TestKeySelector(), + BasicTypeInfo.INT_TYPE_INFO, + maxParalelism, + numSubtasks, + subtaskIndex); + } + + @Test + public void testRecordProcessor() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { + testHarness.open(); + TestDeclarationOperator testOperator = + (TestDeclarationOperator) testHarness.getOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator); + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + assertThat(testOperator.getValue()).isEqualTo(12); + } + } + + /** A simple testing operator. */ + private static class TestDeclarationOperator extends AbstractAsyncStateStreamOperator + implements OneInputStreamOperator, String>, + Triggerable, + DeclarativeProcessingInput> { + + private static final long serialVersionUID = 1L; + + private final ElementOrder elementOrder; + + final AtomicInteger value = new AtomicInteger(0); + + TestDeclarationOperator(ElementOrder elementOrder) { + this.elementOrder = elementOrder; + } + + @Override + public void open() throws Exception { + super.open(); + } + + @Override + public ThrowingConsumer>, Exception> declareProcess( + DeclarationContext context) throws DeclarationException { + + Function> adder = + context.declare( + "adder", + (i) -> { + return StateFutureUtils.completedFuture(value.incrementAndGet()); + }); + Consumer doubler = + context.declare( + "doubler", + (v) -> { + value.addAndGet(v); + }); + assertThat(adder).isInstanceOf(NamedCallback.class); + assertThat(doubler).isInstanceOf(NamedCallback.class); + return (e) -> { + value.addAndGet(e.getValue().f0); + StateFutureUtils.completedVoidFuture().thenCompose(adder).thenAccept(doubler); + }; + } + + @Override + public ElementOrder getElementOrder() { + return elementOrder; + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + value.incrementAndGet(); + } + + @Override + public void onEventTime(InternalTimer timer) throws Exception {} + + @Override + public void onProcessingTime(InternalTimer timer) + throws Exception {} + + public int getValue() { + return value.get(); + } + } + + /** {@link KeySelector} for tests. */ + static class TestKeySelector implements KeySelector, Integer> { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Tuple2 value) { + return value.f0; + } + } +}