From af6b88dd09599e31e3e707bbe25962b7d9032ab0 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Mon, 15 Apr 2024 20:53:47 +0800 Subject: [PATCH] [FLINK-35025][Runtime/State] Split `AsyncStateProcessing` and introduce element order for state processing --- .../AsyncExecutionController.java | 21 +++ .../asyncprocessing/StateRequestBuffer.java | 8 +- .../AsyncExecutionControllerTest.java | 44 ++++++ .../api/operators/AbstractInput.java | 31 ++--- .../AbstractAsyncStateStreamOperator.java | 44 +++--- .../AbstractAsyncStateStreamOperatorV2.java | 19 ++- .../asyncprocessing/AsyncStateProcessing.java | 45 ++++-- .../AsyncStateProcessingOperator.java | 54 ++++++++ .../asyncprocessing/ElementOrder.java | 42 ++++++ .../AbstractAsyncStateStreamOperatorTest.java | 108 ++++++++++++++- ...bstractAsyncStateStreamOperatorV2Test.java | 130 ++++++++++++++++-- .../input/FirstInputOfTwoInput.java | 14 -- .../multipleinput/input/OneInput.java | 14 -- .../input/SecondInputOfTwoInput.java | 14 -- 14 files changed, 475 insertions(+), 113 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java index 8dd8d16e7215f8..1964ec3f378e9b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -21,7 +21,9 @@ import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.state.v2.State; import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.ThrowingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,4 +250,23 @@ private void seizeCapacity() { setCurrentContext(storedContext); inFlightRecordNum.incrementAndGet(); } + + /** + * A helper to request a {@link StateRequestType#SYNC_POINT} and run a callback if it finishes + * (once the record is not blocked). + * + * @param callback the callback to run if it finishes (once the record is not blocked). + */ + public void syncPointRequestWithCallback(ThrowingRunnable callback) { + handleRequest(null, StateRequestType.SYNC_POINT, null) + .thenAccept( + v -> { + try { + callback.run(); + } catch (Exception e) { + // TODO: Properly handle the exception and fail the entire job. + throw new FlinkRuntimeException("Unexpected runtime exception", e); + } + }); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java index 0eea7bbcacc42b..9fa10bfee5b960 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java @@ -60,7 +60,11 @@ public StateRequestBuffer() { } void enqueueToActive(StateRequest request) { - activeQueue.add(request); + if (request.getRequestType() == StateRequestType.SYNC_POINT) { + request.getFuture().complete(null); + } else { + activeQueue.add(request); + } } void enqueueToBlocking(StateRequest request) { @@ -83,7 +87,7 @@ RecordContext tryActivateOneByKey(K key) { } StateRequest stateRequest = blockingQueue.get(key).removeFirst(); - activeQueue.add(stateRequest); + enqueueToActive(stateRequest); if (blockingQueue.get(key).isEmpty()) { blockingQueue.remove(key); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java index d07c706c6bb450..05237e238f2a6a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -296,6 +296,50 @@ void testInFlightRecordControl() { } } + @Test + public void testSyncPoint() { + AtomicInteger counter = new AtomicInteger(0); + + // Test the sync point processing without a key occupied. + RecordContext recordContext = aec.buildContext("record", "key"); + aec.setCurrentContext(recordContext); + recordContext.retain(); + aec.syncPointRequestWithCallback(counter::incrementAndGet); + assertThat(counter.get()).isEqualTo(1); + assertThat(recordContext.getReferenceCount()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + recordContext.release(); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + + counter.set(0); + // Test the sync point processing with a key occupied. + RecordContext recordContext1 = aec.buildContext("record1", "occupied"); + aec.setCurrentContext(recordContext1); + userCode.run(); + + RecordContext recordContext2 = aec.buildContext("record2", "occupied"); + aec.setCurrentContext(recordContext2); + aec.syncPointRequestWithCallback(counter::incrementAndGet); + recordContext2.retain(); + assertThat(counter.get()).isEqualTo(0); + assertThat(recordContext2.getReferenceCount()).isGreaterThan(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); + aec.triggerIfNeeded(true); + assertThat(counter.get()).isEqualTo(0); + assertThat(recordContext2.getReferenceCount()).isGreaterThan(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(1); + aec.triggerIfNeeded(true); + assertThat(counter.get()).isEqualTo(1); + assertThat(recordContext2.getReferenceCount()).isEqualTo(1); + assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(0); + assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0); + recordContext2.release(); + } + /** Simulate the underlying state that is actually used to execute the request. */ static class TestUnderlyingState { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java index 20c59fd3e0cfd9..20c3006d83827e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java @@ -23,6 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -94,32 +95,16 @@ public boolean hasKeyContext() { @Internal @Override public final boolean isAsyncStateProcessingEnabled() { - return (owner instanceof AsyncStateProcessing) - && ((AsyncStateProcessing) owner).isAsyncStateProcessingEnabled(); + return (owner instanceof AsyncStateProcessingOperator) + && ((AsyncStateProcessingOperator) owner).isAsyncStateProcessingEnabled(); } @Internal @Override - @SuppressWarnings("unchecked") - public final void setAsyncKeyedContextElement( - StreamRecord record, KeySelector keySelector) throws Exception { - ((AsyncStateProcessing) owner).setAsyncKeyedContextElement(record, keySelector); - } - - @Internal - @Override - public final void postProcessElement() { - ((AsyncStateProcessing) owner).postProcessElement(); - } - - @Internal - @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public final ThrowingConsumer getRecordProcessor(int inputId) { - return record -> { - setAsyncKeyedContextElement(record, stateKeySelector); - processElement(record); - postProcessElement(); - }; + public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { + return AsyncStateProcessing.makeRecordProcessor( + (AsyncStateProcessingOperator) owner, + (KeySelector) stateKeySelector, + this::processElement); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java index 61aa4a608871d2..e57f8b6678125d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; /** * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to @@ -41,7 +42,7 @@ @Internal @SuppressWarnings("rawtypes") public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator - implements AsyncStateProcessing { + implements AsyncStateProcessingOperator { private AsyncExecutionController asyncExecutionController; @@ -65,6 +66,11 @@ public boolean isAsyncStateProcessingEnabled() { return true; } + @Override + public ElementOrder getElementOrder() { + return ElementOrder.RECORD_ORDER; + } + @Override @SuppressWarnings("unchecked") public final void setAsyncKeyedContextElement( @@ -91,6 +97,12 @@ public final void postProcessElement() { currentProcessingContext.release(); } + @Override + @SuppressWarnings("unchecked") + public final void preserveRecordOrderAndProcess(ThrowingRunnable processing) { + asyncExecutionController.syncPointRequestWithCallback(processing); + } + @Override @SuppressWarnings("unchecked") public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { @@ -99,26 +111,21 @@ public final ThrowingConsumer, Exception> getRecordProcessor if (this instanceof TwoInputStreamOperator) { switch (inputId) { case 1: - return record -> { - setAsyncKeyedContextElement(record, (KeySelector) stateKeySelector1); - ((TwoInputStreamOperator) this).processElement1(record); - postProcessElement(); - }; + return AsyncStateProcessing.makeRecordProcessor( + this, + (KeySelector) stateKeySelector1, + ((TwoInputStreamOperator) this)::processElement1); case 2: - return record -> { - setAsyncKeyedContextElement(record, (KeySelector) stateKeySelector2); - ((TwoInputStreamOperator) this).processElement2(record); - postProcessElement(); - }; + return AsyncStateProcessing.makeRecordProcessor( + this, + (KeySelector) stateKeySelector2, + ((TwoInputStreamOperator) this)::processElement2); default: break; } } else if (this instanceof Input && inputId == 1) { - return record -> { - setAsyncKeyedContextElement(record, (KeySelector) stateKeySelector1); - ((Input) this).processElement(record); - postProcessElement(); - }; + return AsyncStateProcessing.makeRecordProcessor( + this, (KeySelector) stateKeySelector1, ((Input) this)::processElement); } throw new IllegalArgumentException( String.format( @@ -130,4 +137,9 @@ public final ThrowingConsumer, Exception> getRecordProcessor AsyncExecutionController getAsyncExecutionController() { return asyncExecutionController; } + + @VisibleForTesting + RecordContext getCurrentProcessingContext() { + return currentProcessingContext; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java index cf57b95aca0f47..fd591db7eecab0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; /** * This operator is an abstract class that give the {@link AbstractStreamOperatorV2} the ability to @@ -38,7 +39,7 @@ @Internal @SuppressWarnings("rawtypes") public abstract class AbstractAsyncStateStreamOperatorV2 extends AbstractStreamOperatorV2 - implements AsyncStateProcessing { + implements AsyncStateProcessingOperator { private AsyncExecutionController asyncExecutionController; @@ -65,6 +66,11 @@ public boolean isAsyncStateProcessingEnabled() { return true; } + @Override + public ElementOrder getElementOrder() { + return ElementOrder.RECORD_ORDER; + } + @Override @SuppressWarnings("unchecked") public final void setAsyncKeyedContextElement( @@ -91,6 +97,12 @@ public final void postProcessElement() { currentProcessingContext.release(); } + @Override + @SuppressWarnings("unchecked") + public final void preserveRecordOrderAndProcess(ThrowingRunnable processing) { + asyncExecutionController.syncPointRequestWithCallback(processing); + } + @Override public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { // The real logic should be in First/SecondInputOfTwoInput#getRecordProcessor. @@ -103,4 +115,9 @@ public final ThrowingConsumer, Exception> getRecordProcessor AsyncExecutionController getAsyncExecutionController() { return asyncExecutionController; } + + @VisibleForTesting + RecordContext getCurrentProcessingContext() { + return currentProcessingContext; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java index 71f66872ff0dfc..563f3fd1ad83db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java @@ -34,19 +34,6 @@ public interface AsyncStateProcessing { */ boolean isAsyncStateProcessingEnabled(); - /** - * Set key context for async state processing. - * - * @param record the record. - * @param keySelector the key selector to select a key from record. - * @param the type of the record. - */ - void setAsyncKeyedContextElement(StreamRecord record, KeySelector keySelector) - throws Exception; - - /** A callback that will be triggered after an element finishes {@code processElement}. */ - void postProcessElement(); - /** * Get the record processor that could process record from input, which is the only entry for * async processing. @@ -56,4 +43,36 @@ void setAsyncKeyedContextElement(StreamRecord record, KeySelector k * there is multiple inputs for the instance. */ ThrowingConsumer, Exception> getRecordProcessor(int inputId); + + /** + * Static method helper to make a record processor with given infos. + * + * @param asyncOperator the operator that can process state asynchronously. + * @param keySelector the key selector. + * @param processor the record processing logic. + * @return the built record processor that can returned by {@link #getRecordProcessor(int)}. + */ + static ThrowingConsumer, Exception> makeRecordProcessor( + AsyncStateProcessingOperator asyncOperator, + KeySelector keySelector, + ThrowingConsumer, Exception> processor) { + switch (asyncOperator.getElementOrder()) { + case RECORD_ORDER: + return (record) -> { + asyncOperator.setAsyncKeyedContextElement(record, keySelector); + asyncOperator.preserveRecordOrderAndProcess(() -> processor.accept(record)); + asyncOperator.postProcessElement(); + }; + case FIRST_STATE_ORDER: + return (record) -> { + asyncOperator.setAsyncKeyedContextElement(record, keySelector); + processor.accept(record); + asyncOperator.postProcessElement(); + }; + default: + throw new UnsupportedOperationException( + "Unknown element order for async processing:" + + asyncOperator.getElementOrder()); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java new file mode 100644 index 00000000000000..3ac449d3f3dda8 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * A more detailed interface based on {@link AsyncStateProcessing}, which gives the essential + * methods for an operator to perform async state processing. + */ +public interface AsyncStateProcessingOperator extends AsyncStateProcessing { + + /** Get the {@link ElementOrder} of this operator. */ + ElementOrder getElementOrder(); + + /** + * Set key context for async state processing. + * + * @param record the record. + * @param keySelector the key selector to select a key from record. + * @param the type of the record. + */ + void setAsyncKeyedContextElement(StreamRecord record, KeySelector keySelector) + throws Exception; + + /** A callback that will be triggered after an element finishes {@code processElement}. */ + void postProcessElement(); + + /** + * Check the order of same-key record, and then process the record. Mainly used when the {@link + * #getElementOrder()} returns {@link ElementOrder#RECORD_ORDER}. + * + * @param processing the record processing logic. + */ + void preserveRecordOrderAndProcess(ThrowingRunnable processing); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java new file mode 100644 index 00000000000000..2d7b2a63c12ded --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/ElementOrder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +/** + * This enum defines the element order of being processed. Only the elements with the same key + * should be considered here. We should keep this internal and away from API module for now, until + * we could see the concrete need for {@link #FIRST_STATE_ORDER} from average users. + */ +@Internal +public enum ElementOrder { + /** + * Treat the record processing as a whole, meaning that any {@code processElement} call for the + * elements with same key should follow the order of record arrival AND no parallel run is + * allowed. + */ + RECORD_ORDER, + + /** + * The {@code processElement} call will be invoked on record arrival, but may be blocked at the + * first state accessing if there is a preceding same-key record under processing. + */ + FIRST_STATE_ORDER, +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java index 20c01c8bc4ffb2..a82a0d2968b8de 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java @@ -25,20 +25,27 @@ import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + import static org.assertj.core.api.Assertions.assertThat; /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ public class AbstractAsyncStateStreamOperatorTest { protected KeyedOneInputStreamOperatorTestHarness, String> - createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex) + createTestHarness( + int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception { - TestOperator testOperator = new TestOperator(); + TestOperator testOperator = new TestOperator(elementOrder); return new KeyedOneInputStreamOperatorTestHarness<>( testOperator, new TestKeySelector(), @@ -51,7 +58,7 @@ public class AbstractAsyncStateStreamOperatorTest { @Test public void testCreateAsyncExecutionController() throws Exception { try (KeyedOneInputStreamOperatorTestHarness, String> - testHarness = createTestHarness(128, 1, 0)) { + testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); assertThat(testHarness.getOperator()) .isInstanceOf(AbstractAsyncStateStreamOperator.class); @@ -62,6 +69,68 @@ public void testCreateAsyncExecutionController() throws Exception { } } + @Test + public void testRecordProcessorWithFirstStateOrder() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER)) { + testHarness.open(); + TestOperator testOperator = (TestOperator) testHarness.getOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator); + ExecutorService anotherThread = Executors.newSingleThreadExecutor(); + // Trigger the processor + anotherThread.execute( + () -> { + try { + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + } catch (Exception e) { + } + }); + + Thread.sleep(1000); + assertThat(testOperator.getProcessed()).isEqualTo(1); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1); + + // Proceed processing + testOperator.proceed(); + anotherThread.shutdown(); + Thread.sleep(1000); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); + } + } + + @Test + public void testRecordProcessorWithRecordOrder() throws Exception { + try (KeyedOneInputStreamOperatorTestHarness, String> + testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { + testHarness.open(); + TestOperator testOperator = (TestOperator) testHarness.getOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator); + ExecutorService anotherThread = Executors.newSingleThreadExecutor(); + // Trigger the processor + anotherThread.execute( + () -> { + try { + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + } catch (Exception e) { + } + }); + + Thread.sleep(1000); + assertThat(testOperator.getProcessed()).isEqualTo(1); + // Why greater than 1: +1 when enter the processor; +1 when handle the SYNC_POINT + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()) + .isGreaterThan(1); + + // Proceed processing + testOperator.proceed(); + anotherThread.shutdown(); + Thread.sleep(1000); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); + } + } + /** A simple testing operator. */ private static class TestOperator extends AbstractAsyncStateStreamOperator implements OneInputStreamOperator, String>, @@ -69,14 +138,33 @@ private static class TestOperator extends AbstractAsyncStateStreamOperator> element) - throws Exception {} + public ElementOrder getElementOrder() { + return elementOrder; + } + + @Override + public void processElement(StreamRecord> element) throws Exception { + processed.incrementAndGet(); + synchronized (objectToWait) { + objectToWait.wait(); + } + } @Override public void onEventTime(InternalTimer timer) throws Exception {} @@ -84,6 +172,16 @@ public void onEventTime(InternalTimer timer) throws Exce @Override public void onProcessingTime(InternalTimer timer) throws Exception {} + + public int getProcessed() { + return processed.get(); + } + + public void proceed() { + synchronized (objectToWait) { + objectToWait.notify(); + } + } } /** {@link KeySelector} for tests. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java index 088729b5067897..5cbb5d71c3ee37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java @@ -32,13 +32,18 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.io.RecordProcessorUtils; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.function.ThrowingConsumer; import org.junit.jupiter.api.Test; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; @@ -46,10 +51,11 @@ public class AbstractAsyncStateStreamOperatorV2Test { protected KeyedOneInputStreamOperatorV2TestHarness, String> - createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex) + createTestHarness( + int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception { return new KeyedOneInputStreamOperatorV2TestHarness<>( - new TestOperatorFactory(), + new TestOperatorFactory(elementOrder), new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), BasicTypeInfo.INT_TYPE_INFO, maxParalelism, @@ -60,7 +66,7 @@ public class AbstractAsyncStateStreamOperatorV2Test { @Test public void testCreateAsyncExecutionController() throws Exception { try (KeyedOneInputStreamOperatorV2TestHarness, String> - testHarness = createTestHarness(128, 1, 0)) { + testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { testHarness.open(); assertThat(testHarness.getBaseOperator()) .isInstanceOf(AbstractAsyncStateStreamOperatorV2.class); @@ -71,6 +77,70 @@ public void testCreateAsyncExecutionController() throws Exception { } } + @Test + public void testRecordProcessorWithFirstStateOrder() throws Exception { + try (KeyedOneInputStreamOperatorV2TestHarness, String> + testHarness = createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER)) { + testHarness.open(); + SingleInputTestOperator testOperator = + (SingleInputTestOperator) testHarness.getBaseOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); + ExecutorService anotherThread = Executors.newSingleThreadExecutor(); + // Trigger the processor + anotherThread.execute( + () -> { + try { + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + } catch (Exception e) { + } + }); + + Thread.sleep(1000); + assertThat(testOperator.getProcessed()).isEqualTo(1); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1); + + // Proceed processing + testOperator.proceed(); + anotherThread.shutdown(); + Thread.sleep(1000); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); + } + } + + @Test + public void testRecordProcessorWithRecordOrder() throws Exception { + try (KeyedOneInputStreamOperatorV2TestHarness, String> + testHarness = createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER)) { + testHarness.open(); + SingleInputTestOperator testOperator = + (SingleInputTestOperator) testHarness.getBaseOperator(); + ThrowingConsumer>, Exception> processor = + RecordProcessorUtils.getRecordProcessor(testOperator.getInputs().get(0)); + ExecutorService anotherThread = Executors.newSingleThreadExecutor(); + // Trigger the processor + anotherThread.execute( + () -> { + try { + processor.accept(new StreamRecord<>(Tuple2.of(5, "5"))); + } catch (Exception e) { + } + }); + + Thread.sleep(1000); + assertThat(testOperator.getProcessed()).isEqualTo(1); + // Why greater than 1: +1 when enter the processor; +1 when handle the SYNC_POINT + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()) + .isGreaterThan(1); + + // Proceed processing + testOperator.proceed(); + anotherThread.shutdown(); + Thread.sleep(1000); + assertThat(testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0); + } + } + private static class KeyedOneInputStreamOperatorV2TestHarness extends KeyedOneInputStreamOperatorTestHarness { public KeyedOneInputStreamOperatorV2TestHarness( @@ -90,10 +160,17 @@ public StreamOperator getBaseOperator() { } private static class TestOperatorFactory extends AbstractStreamOperatorFactory { + + private final ElementOrder elementOrder; + + TestOperatorFactory(ElementOrder elementOrder) { + this.elementOrder = elementOrder; + } + @Override public > T createStreamOperator( StreamOperatorParameters parameters) { - return (T) new SingleInputTestOperator(parameters); + return (T) new SingleInputTestOperator(parameters, elementOrder); } @Override @@ -111,8 +188,29 @@ private static class SingleInputTestOperator extends AbstractAsyncStateStreamOpe private static final long serialVersionUID = 1L; - public SingleInputTestOperator(StreamOperatorParameters parameters) { + final AtomicInteger processed = new AtomicInteger(0); + + private final ElementOrder elementOrder; + + final Object objectToWait = new Object(); + + final Input input; + + public SingleInputTestOperator( + StreamOperatorParameters parameters, ElementOrder elementOrder) { super(parameters, 1); + this.elementOrder = elementOrder; + input = + new AbstractInput, String>(this, 1) { + @Override + public void processElement(StreamRecord> element) + throws Exception { + processed.incrementAndGet(); + synchronized (objectToWait) { + objectToWait.wait(); + } + } + }; } @Override @@ -122,12 +220,12 @@ public void open() throws Exception { @Override public List getInputs() { - return Collections.singletonList( - new AbstractInput, String>(this, 1) { - @Override - public void processElement(StreamRecord> element) - throws Exception {} - }); + return Collections.singletonList(input); + } + + @Override + public ElementOrder getElementOrder() { + return elementOrder; } @Override @@ -136,5 +234,15 @@ public void onEventTime(InternalTimer timer) throws Exce @Override public void onProcessingTime(InternalTimer timer) throws Exception {} + + public int getProcessed() { + return processed.get(); + } + + public void proceed() { + synchronized (objectToWait) { + objectToWait.notify(); + } + } } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java index e318b8159d382d..4f090c290088d1 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.multipleinput.input; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -66,19 +65,6 @@ public final boolean isAsyncStateProcessingEnabled() { && ((AsyncStateProcessing) operator).isAsyncStateProcessingEnabled(); } - @Internal - @Override - public final void setAsyncKeyedContextElement( - StreamRecord record, KeySelector keySelector) throws Exception { - ((AsyncStateProcessing) operator).setAsyncKeyedContextElement(record, keySelector); - } - - @Internal - @Override - public final void postProcessElement() { - ((AsyncStateProcessing) operator).postProcessElement(); - } - @Internal @Override public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/OneInput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/OneInput.java index 907063f5a97e47..f4992249e7d07f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/OneInput.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/OneInput.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.multipleinput.input; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -66,19 +65,6 @@ public final boolean isAsyncStateProcessingEnabled() { && ((AsyncStateProcessing) operator).isAsyncStateProcessingEnabled(); } - @Internal - @Override - public final void setAsyncKeyedContextElement( - StreamRecord record, KeySelector keySelector) throws Exception { - ((AsyncStateProcessing) operator).setAsyncKeyedContextElement(record, keySelector); - } - - @Internal - @Override - public final void postProcessElement() { - ((AsyncStateProcessing) operator).postProcessElement(); - } - @Internal @Override public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/SecondInputOfTwoInput.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/SecondInputOfTwoInput.java index ca06f7395d12c8..7158bd115277a8 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/SecondInputOfTwoInput.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/SecondInputOfTwoInput.java @@ -19,7 +19,6 @@ package org.apache.flink.table.runtime.operators.multipleinput.input; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -66,19 +65,6 @@ public final boolean isAsyncStateProcessingEnabled() { && ((AsyncStateProcessing) operator).isAsyncStateProcessingEnabled(); } - @Internal - @Override - public final void setAsyncKeyedContextElement( - StreamRecord record, KeySelector keySelector) throws Exception { - ((AsyncStateProcessing) operator).setAsyncKeyedContextElement(record, keySelector); - } - - @Internal - @Override - public final void postProcessElement() { - ((AsyncStateProcessing) operator).postProcessElement(); - } - @Internal @Override public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) {