Skip to content

Commit

Permalink
[FLINK-35156][Runtime] Make operators of DataStream V2 integrate with…
Browse files Browse the repository at this point in the history
… async state processing
  • Loading branch information
Zakelly committed Apr 18, 2024
1 parent 43a3d50 commit 503f010
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public KeyedProcessOperator(
this.outKeySelector = outKeySelector;
}

@Override
public boolean isAsyncStateProcessingEnabled() {
// For keyed operator, the async state processing should be enabled.
return true;
}

@Override
protected TimestampCollector<OUT> getOutputCollector() {
return outKeySelector != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Operator for {@link OneInputStreamProcessFunction}. */
public class ProcessOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, OneInputStreamProcessFunction<IN, OUT>>
extends AbstractAsyncStateUdfStreamOperator<OUT, OneInputStreamProcessFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

protected transient DefaultRuntimeContext context;
Expand All @@ -46,6 +46,12 @@ public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
chainingStrategy = ChainingStrategy.ALWAYS;
}

@Override
public boolean isAsyncStateProcessingEnabled() {
// For normal operator (without keyed context) the async state processing is unused.
return false;
}

@Override
public void open() throws Exception {
super.open();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.UserFunctionProvider;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;

import static java.util.Objects.requireNonNull;

/**
* This is used as the base class for operators that have a user-defined function. This class
* handles the opening and closing of the user-defined functions, as part of the operator life
* cycle. This class is nearly identical with {@link AbstractUdfStreamOperator}, but extending from
* {@link AbstractAsyncStateStreamOperator} to integrate with asynchronous state access. Another
* difference is this class is internal.
*
* @param <OUT> The output type of the operator
* @param <F> The type of the user function
*/
@Internal
public abstract class AbstractAsyncStateUdfStreamOperator<OUT, F extends Function>
extends AbstractAsyncStateStreamOperator<OUT>
implements OutputTypeConfigurable<OUT>, UserFunctionProvider<F> {

private static final long serialVersionUID = 1L;

/** The user function. */
protected final F userFunction;

public AbstractAsyncStateUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}

/**
* Gets the user function executed in this operator.
*
* @return The user function of this operator.
*/
public F getUserFunction() {
return userFunction;
}

// ------------------------------------------------------------------------
// operator life cycle
// ------------------------------------------------------------------------

@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, DefaultOpenContext.INSTANCE);
}

@Override
public void finish() throws Exception {
super.finish();
if (userFunction instanceof SinkFunction) {
((SinkFunction<?>) userFunction).finish();
}
}

@Override
public void close() throws Exception {
super.close();
FunctionUtils.closeFunction(userFunction);
}

// ------------------------------------------------------------------------
// checkpointing and recovery
// ------------------------------------------------------------------------

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
}
}

@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
super.notifyCheckpointAborted(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
}
}

// ------------------------------------------------------------------------
// Output type configuration
// ------------------------------------------------------------------------

@Override
public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
StreamingFunctionUtils.setOutputType(userFunction, outTypeInfo, executionConfig);
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

private void checkUdfCheckpointingPreconditions() {
if (userFunction instanceof CheckpointedFunction
&& userFunction instanceof ListCheckpointed) {

throw new IllegalStateException(
"User functions are not allowed to implement "
+ "CheckpointedFunction AND ListCheckpointed.");
}
}
}

0 comments on commit 503f010

Please sign in to comment.