Skip to content

Commit

Permalink
[FLINK-35025][Runtime/State] Abstract stream operators for async stat…
Browse files Browse the repository at this point in the history
…e processing
  • Loading branch information
Zakelly committed Apr 12, 2024
1 parent f49d4c6 commit 19779de
Show file tree
Hide file tree
Showing 10 changed files with 595 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;

import javax.annotation.Nullable;

Expand All @@ -35,7 +38,8 @@
* AbstractStreamOperatorV2}.
*/
@Experimental
public abstract class AbstractInput<IN, OUT> implements Input<IN>, KeyContextHandler {
public abstract class AbstractInput<IN, OUT>
implements Input<IN>, KeyContextHandler, AsyncStateProcessing {
/**
* {@code KeySelector} for extracting a key from an element being processed. This is used to
* scope keyed state to a key. This is null if the operator is not a keyed operator.
Expand Down Expand Up @@ -86,4 +90,36 @@ public void processRecordAttributes(RecordAttributes recordAttributes) throws Ex
public boolean hasKeyContext() {
return stateKeySelector != null;
}

@Internal
@Override
public final boolean isAsyncStateProcessingEnabled() {
return (owner instanceof AsyncStateProcessing)
&& ((AsyncStateProcessing) owner).isAsyncStateProcessingEnabled();
}

@Internal
@Override
@SuppressWarnings("unchecked")
public final <T> void setAsyncKeyedContextElement(
StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
((AsyncStateProcessing) owner).setAsyncKeyedContextElement(record, keySelector);
}

@Internal
@Override
public final void postProcessElement() {
((AsyncStateProcessing) owner).postProcessElement();
}

@Internal
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public final ThrowingConsumer<StreamRecord, Exception> getRecordProcessor(InputOrdinal input) {
return record -> {
setAsyncKeyedContextElement(record, stateKeySelector);
processElement(record);
postProcessElement();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,15 @@ public abstract class AbstractStreamOperator<OUT>
*
* <p>This is for elements from the first input.
*/
private transient KeySelector<?, ?> stateKeySelector1;
protected transient KeySelector<?, ?> stateKeySelector1;

/**
* {@code KeySelector} for extracting a key from an element being processed. This is used to
* scope keyed state to a key. This is null if the operator is not a keyed operator.
*
* <p>This is for elements from the second input.
*/
private transient KeySelector<?, ?> stateKeySelector2;
protected transient KeySelector<?, ?> stateKeySelector2;

private transient StreamOperatorStateHandler stateHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public OperatorMetricGroup getMetricGroup() {
}

@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,4 +252,9 @@ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(
public boolean isCheckpointingEnabled() {
return streamConfig.isCheckpointingEnabled();
}

/** Return the task environment. */
public Environment getTaskEnvironment() {
return taskEnvironment;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingConsumer;
Expand Down Expand Up @@ -54,6 +55,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso

if (canOmitSetKeyContext) {
return input::processElement;
} else if (input instanceof AsyncStateProcessing
&& ((AsyncStateProcessing) input).isAsyncStateProcessingEnabled()) {
return ((AsyncStateProcessing) input)
.getRecordProcessor(AsyncStateProcessing.InputOrdinal.FIRST);
} else {
return record -> {
input.setKeyContextElement(record);
Expand Down Expand Up @@ -82,6 +87,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso

if (canOmitSetKeyContext) {
return operator::processElement1;
} else if (operator instanceof AsyncStateProcessing
&& ((AsyncStateProcessing) operator).isAsyncStateProcessingEnabled()) {
return ((AsyncStateProcessing) operator)
.getRecordProcessor(AsyncStateProcessing.InputOrdinal.FIRST);
} else {
return record -> {
operator.setKeyContextElement1(record);
Expand Down Expand Up @@ -110,6 +119,9 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso

if (canOmitSetKeyContext) {
return operator::processElement2;
} else if (operator instanceof AsyncStateProcessing) {
return ((AsyncStateProcessing) operator)
.getRecordProcessor(AsyncStateProcessing.InputOrdinal.SECOND);
} else {
return record -> {
operator.setKeyContextElement2(record);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.function.ThrowingConsumer;

import java.lang.reflect.ParameterizedType;

/**
* This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
* perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
* AbstractStreamOperator} could manipulate async state with only a change of base class.
*/
@Internal
@SuppressWarnings("rawtypes")
public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT>
implements AsyncStateProcessing {

private AsyncExecutionController asyncExecutionController;

private RecordContext lastProcessContext;

/** Initialize necessary state components for {@link AbstractStreamOperator}. */
@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
// TODO: properly read config and setup
final MailboxExecutor mailboxExecutor =
containingTask.getEnvironment().getMainMailboxExecutor();
this.asyncExecutionController =
new AsyncExecutionController(getTypeClassOfKey(), mailboxExecutor, null);
}

private Class<?> getTypeClassOfKey() {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
return (Class)
((ParameterizedType) keySerializer.getClass().getGenericSuperclass())
.getActualTypeArguments()[0];
}

@Override
public final boolean isAsyncStateProcessingEnabled() {
// TODO: Read from config
return true;
}

@Override
@SuppressWarnings("unchecked")
public final <T> void setAsyncKeyedContextElement(
StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
lastProcessContext =
asyncExecutionController.buildContext(
record.getValue(), keySelector.getKey(record.getValue()));
lastProcessContext.retain();
asyncExecutionController.setCurrentContext(lastProcessContext);
}

@Override
public final void postProcessElement() {
lastProcessContext.release();
}

@Override
@SuppressWarnings("unchecked")
public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(
InputOrdinal input) {
// Ideally, only TwoStreamInputOperator will invoke here.
if (this instanceof TwoInputStreamOperator) {
switch (input) {
case FIRST:
return record -> {
setAsyncKeyedContextElement(record, (KeySelector<T, ?>) stateKeySelector1);
((TwoInputStreamOperator) this).processElement1(record);
postProcessElement();
};
case SECOND:
return record -> {
setAsyncKeyedContextElement(record, (KeySelector<T, ?>) stateKeySelector2);
((TwoInputStreamOperator) this).processElement2(record);
postProcessElement();
};
default:
throw new IllegalArgumentException("Unsupported input ordinal.");
}
}
throw new UnsupportedOperationException(
"Unsupported operator type for async processing:" + getClass().getName());
}

@VisibleForTesting
AsyncExecutionController<?> getAsyncExecutionController() {
return asyncExecutionController;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.function.ThrowingConsumer;

import java.lang.reflect.ParameterizedType;

/**
* This operator is an abstract class that give the {@link AbstractStreamOperatorV2} the ability to
* perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
* AbstractStreamOperatorV2} could manipulate async state with only a change of base class.
*/
@Internal
@SuppressWarnings("rawtypes")
public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractStreamOperatorV2<OUT>
implements AsyncStateProcessing {

private AsyncExecutionController asyncExecutionController;

private RecordContext lastProcessContext;

public AbstractAsyncStateStreamOperatorV2(
StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
super(parameters, numberOfInputs);
}

/** Initialize necessary state components for {@link AbstractStreamOperatorV2}. */
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
// TODO: Read config and properly set.
final MailboxExecutor mailboxExecutor =
getRuntimeContext().getTaskEnvironment().getMainMailboxExecutor();
this.asyncExecutionController =
new AsyncExecutionController(getTypeClassOfKey(), mailboxExecutor, null);
}

private Class<?> getTypeClassOfKey() {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
return (Class)
((ParameterizedType) keySerializer.getClass().getGenericSuperclass())
.getActualTypeArguments()[0];
}

@Override
public final boolean isAsyncStateProcessingEnabled() {
// TODO: Read from config
return true;
}

@Override
@SuppressWarnings("unchecked")
public final <T> void setAsyncKeyedContextElement(
StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
lastProcessContext =
asyncExecutionController.buildContext(
record.getValue(), keySelector.getKey(record.getValue()));
lastProcessContext.retain();
asyncExecutionController.setCurrentContext(lastProcessContext);
}

@Override
public final void postProcessElement() {
lastProcessContext.release();
}

@Override
public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(
InputOrdinal input) {
// The real logic should be in AbstractInput#getRecordProcessor.
throw new UnsupportedOperationException(
"Never getRecordProcessor from AbstractAsyncStateStreamOperatorV2,"
+ " since this part is handled by the Input.");
}

@VisibleForTesting
AsyncExecutionController<?> getAsyncExecutionController() {
return asyncExecutionController;
}
}
Loading

0 comments on commit 19779de

Please sign in to comment.