From 2c6e7e5107048c628a13f4520034f940a23f2fab Mon Sep 17 00:00:00 2001 From: Zakelly Date: Thu, 12 Dec 2024 16:39:37 +0800 Subject: [PATCH] [FLINK-36892][Test] Use record processor in operator test harness --- .../AbstractBroadcastStreamOperatorTestHarness.java | 7 +++---- .../util/MultiInputStreamOperatorTestHarness.java | 6 ++++-- .../util/OneInputStreamOperatorTestHarness.java | 10 ++++++---- .../util/TwoInputStreamOperatorTestHarness.java | 13 +++++++++---- 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java index f3ae6524fa3145..0402ca2c2e73a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java @@ -21,6 +21,7 @@ import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** Base class for broadcast stream operator test harnesses. */ @@ -38,8 +39,7 @@ public TwoInputStreamOperator getTwoInputOperator() { } public void processElement(StreamRecord element) throws Exception { - getTwoInputOperator().setKeyContextElement1(element); - getTwoInputOperator().processElement1(element); + RecordProcessorUtils.getRecordProcessor1(getTwoInputOperator()).accept(element); } public void processElement(IN1 value, long timestamp) throws Exception { @@ -47,8 +47,7 @@ public void processElement(IN1 value, long timestamp) throws Exception { } public void processBroadcastElement(StreamRecord element) throws Exception { - getTwoInputOperator().setKeyContextElement2(element); - getTwoInputOperator().processElement2(element); + RecordProcessorUtils.getRecordProcessor2(getTwoInputOperator()).accept(element); } public void processBroadcastElement(IN2 value, long timestamp) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java index 454e0e60d5a5a6..7585652277feee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/MultiInputStreamOperatorTestHarness.java @@ -22,9 +22,11 @@ import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.function.ThrowingConsumer; /** * A test harness for testing a {@link MultipleInputStreamOperator}. @@ -52,8 +54,8 @@ public MultiInputStreamOperatorTestHarness( public void processElement(int idx, StreamRecord element) throws Exception { Input input = getCastedOperator().getInputs().get(idx); - input.setKeyContextElement(element); - input.processElement(element); + ((ThrowingConsumer) RecordProcessorUtils.getRecordProcessor(input)) + .accept(element); } public void processWatermark(int idx, Watermark mark) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index f7c86e3e65deac..12f16c5046b226 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -29,10 +29,12 @@ import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingConsumer; import java.util.ArrayList; import java.util.Collection; @@ -213,13 +215,13 @@ public void processElement(IN value, long timestamp) throws Exception { public void processElement(StreamRecord element) throws Exception { if (inputs.isEmpty()) { - operator.setKeyContextElement1(element); - getOneInputOperator().processElement(element); + RecordProcessorUtils.getRecordProcessor(getOneInputOperator()).accept(element); } else { checkState(inputs.size() == 1); Input input = inputs.get(0); - input.setKeyContextElement(element); - input.processElement(element); + ((ThrowingConsumer) + RecordProcessorUtils.getRecordProcessor(input)) + .accept(element); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 3bc0cdeab715cd..9bd99dd0a4b904 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -21,9 +21,11 @@ import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.util.function.ThrowingConsumer; /** * A test harness for testing a {@link TwoInputStreamOperator}. @@ -37,6 +39,9 @@ public class TwoInputStreamOperatorTestHarness private final TwoInputStreamOperator twoInputOperator; + private final ThrowingConsumer, Exception> processor1; + private final ThrowingConsumer, Exception> processor2; + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator operator) throws Exception { this(operator, 1, 1, 0); @@ -51,11 +56,12 @@ public TwoInputStreamOperatorTestHarness( super(operator, maxParallelism, numSubtasks, subtaskIndex); this.twoInputOperator = operator; + processor1 = RecordProcessorUtils.getRecordProcessor1(twoInputOperator); + processor2 = RecordProcessorUtils.getRecordProcessor2(twoInputOperator); } public void processElement1(StreamRecord element) throws Exception { - twoInputOperator.setKeyContextElement1(element); - twoInputOperator.processElement1(element); + processor1.accept(element); } public void processElement1(IN1 value, long timestamp) throws Exception { @@ -63,8 +69,7 @@ public void processElement1(IN1 value, long timestamp) throws Exception { } public void processElement2(StreamRecord element) throws Exception { - twoInputOperator.setKeyContextElement2(element); - twoInputOperator.processElement2(element); + processor2.accept(element); } public void processElement2(IN2 value, long timestamp) throws Exception {