Skip to content

Commit

Permalink
[FLINK-34986][Runtime/State] More tests for reference counting of Con…
Browse files Browse the repository at this point in the history
…textStateFutureImpl
  • Loading branch information
Zakelly committed Apr 7, 2024
1 parent e4c56c1 commit 4c6a6b3
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public <U, V> StateFuture<V> thenCombine(
}

/**
* Make a new future based on context of this future.
* Make a new future based on context of this future. Subclasses need to overload this method to
* generate their own instances (if needed).
*
* @return the new created future.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
* <li>2. -1 when future completed.
* <li>3. +1 when callback registered.
* <li>4. -1 when callback finished.
* <li>Please refer to {@code ContextStateFutureImplTest} where the reference counting is carefully
* tested.
*/
public class ContextStateFutureImpl<T> extends StateFutureImpl<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ void testBasicRun() {
* An AsyncExecutionController for testing purpose, which integrates with basic buffer
* mechanism.
*/
class TestAsyncExecutionController<R, K> extends AsyncExecutionController<R, K> {
static class TestAsyncExecutionController<R, K> extends AsyncExecutionController<R, K> {

LinkedList<StateRequest<K, ?, ?>> activeBuffer;

Expand Down Expand Up @@ -216,7 +216,7 @@ void migrateBlockingToActive() {
}

/** Simulate the underlying state that is actually used to execute the request. */
class TestUnderlyingState {
static class TestUnderlyingState {

private HashMap<String, Integer> hashMap;

Expand All @@ -233,7 +233,7 @@ public void update(String key, Integer val) {
}
}

class TestValueState implements ValueState<Integer> {
static class TestValueState implements ValueState<Integer> {

private AsyncExecutionController<String, String> asyncExecutionController;

Expand Down Expand Up @@ -271,7 +271,7 @@ public StateFuture<Void> asyncUpdate(Integer value) {
* A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC
* and StateExecutor.
*/
class TestStateExecutor implements StateExecutor {
static class TestStateExecutor implements StateExecutor {

public TestStateExecutor() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.core.state.StateFutureUtils;

import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ContextStateFutureImpl}. */
public class ContextStateFutureImplTest {

@Test
public void testThenApply() {
SingleStepRunner runner = new SingleStepRunner();
KeyAccountingUnit<String, String> keyAccountingUnit = new KeyAccountingUnit<>();
RecordContext<String, String> recordContext =
new RecordContext<>(keyAccountingUnit, "a", "b");

// validate
ContextStateFutureImpl<Void> future =
new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.thenApply((v) -> 1L);
future.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate completion before callback
future = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.complete(null);
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
future.thenApply((v) -> 1L);
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
assertThat(runner.runThrough()).isFalse();
}

@Test
public void testThenAccept() {
SingleStepRunner runner = new SingleStepRunner();
KeyAccountingUnit<String, String> keyAccountingUnit = new KeyAccountingUnit<>();
RecordContext<String, String> recordContext =
new RecordContext<>(keyAccountingUnit, "a", "b");

// validate
ContextStateFutureImpl<Void> future =
new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.thenAccept((v) -> {});
future.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate completion before callback
future = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.complete(null);
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
future.thenAccept((v) -> {});
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
assertThat(runner.runThrough()).isFalse();
}

@Test
public void testThenCompose() {
SingleStepRunner runner = new SingleStepRunner();
KeyAccountingUnit<String, String> keyAccountingUnit = new KeyAccountingUnit<>();
RecordContext<String, String> recordContext =
new RecordContext<>(keyAccountingUnit, "a", "b");

// validate
ContextStateFutureImpl<Void> future =
new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.thenCompose((v) -> StateFutureUtils.completedFuture(1L));
future.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate completion before callback
future = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(1);
future.complete(null);
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
future.thenCompose((v) -> StateFutureUtils.completedFuture(1L));
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
assertThat(runner.runThrough()).isFalse();
}

@Test
public void testThenCombine() {
SingleStepRunner runner = new SingleStepRunner();
KeyAccountingUnit<String, String> keyAccountingUnit = new KeyAccountingUnit<>();
RecordContext<String, String> recordContext =
new RecordContext<>(keyAccountingUnit, "a", "b");

// validate
ContextStateFutureImpl<Void> future1 =
new ContextStateFutureImpl<>(runner::submit, recordContext);
ContextStateFutureImpl<Void> future2 =
new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(2);
future1.thenCombine(future2, (v1, v2) -> 1L);
future1.complete(null);
future2.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate future1 completion before callback
future1 = new ContextStateFutureImpl<>(runner::submit, recordContext);
future2 = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(2);
future1.complete(null);
future1.thenCombine(future2, (v1, v2) -> 1L);
assertThat(recordContext.getReferenceCount()).isGreaterThan(1);
future2.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate future2 completion before callback
future1 = new ContextStateFutureImpl<>(runner::submit, recordContext);
future2 = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(2);
future2.complete(null);
future1.thenCombine(future2, (v1, v2) -> 1L);
assertThat(recordContext.getReferenceCount()).isGreaterThan(1);
future1.complete(null);
assertThat(runner.runThrough()).isTrue();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);

// validate both future1 and future2 completion before callback
future1 = new ContextStateFutureImpl<>(runner::submit, recordContext);
future2 = new ContextStateFutureImpl<>(runner::submit, recordContext);
assertThat(recordContext.getReferenceCount()).isEqualTo(2);
future1.complete(null);
future2.complete(null);
future1.thenCombine(future2, (v1, v2) -> 1L);
assertThat(runner.runThrough()).isFalse();
assertThat(recordContext.getReferenceCount()).isEqualTo(0);
}

@Test
public void testComplex() {
SingleStepRunner runner = new SingleStepRunner();
KeyAccountingUnit<String, String> keyAccountingUnit = new KeyAccountingUnit<>();
RecordContext<String, String> recordContext =
new RecordContext<>(keyAccountingUnit, "a", "b");

for (int i = 0; i < 32; i++) { // 2^5 for completion status combination
// Each bit of 'i' represents the complete status when the user code executes.
// 1 stands for the completion of corresponding future before the user code execution.
// While 0, on the contrary, represents the scenario where the user code is executed
// first and then the future completes.
ArrayList<ContextStateFutureImpl<Void>> futures = new ArrayList<>(6);
for (int j = 0; j < 5; j++) {
ContextStateFutureImpl<Void> future =
new ContextStateFutureImpl<>(runner::submit, recordContext);
futures.add(future);
// Complete future before user code.
if (((i >>> j) & 1) == 1) {
future.complete(null);
}
}

// Simulate user code logic.
StateFutureUtils.combineAll(
Arrays.asList(futures.get(0), futures.get(1), futures.get(2)))
.thenCombine(
futures.get(3),
(a, b) -> {
return 1L;
})
.thenCompose(
(a) -> {
return futures.get(4);
})
.thenApply(
(e) -> {
return 2L;
})
.thenAccept((b) -> {});

// Complete unfinished future.
for (int j = 0; j < 5; j++) {
if (((i >>> j) & 1) == 0) {
futures.get(j).complete(null);
}
}

if (i == 31) {
// All completed before user code.
assertThat(recordContext.getReferenceCount())
.withFailMessage("The reference counted tests fail for profile id %d", i)
.isEqualTo(0);
assertThat(runner.runThrough())
.withFailMessage("The reference counted tests fail for profile id %d", i)
.isFalse();
} else {
assertThat(recordContext.getReferenceCount())
.withFailMessage("The reference counted tests fail for profile id %d", i)
.isGreaterThan(0);
assertThat(runner.runThrough())
.withFailMessage("The reference counted tests fail for profile id %d", i)
.isTrue();
assertThat(recordContext.getReferenceCount())
.withFailMessage("The reference counted tests fail for profile id %d", i)
.isEqualTo(0);
}
}
}

/** A runner that performs single-step debugging. */
public static class SingleStepRunner {
private final LinkedList<Runnable> runnables = new LinkedList<>();

public void submit(Runnable runnable) {
runnables.add(runnable);
}

public boolean runThrough() {
boolean run = false;
while (!runnables.isEmpty()) {
runnables.poll().run();
run = true;
}
return run;
}
}
}

0 comments on commit 4c6a6b3

Please sign in to comment.