From da35cbe626d23a29e89a1d9237ad694f9bbb720d Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Mon, 6 Nov 2023 14:21:38 +0800 Subject: [PATCH] [flink] fix unstable test: testAsyncCompactionFileDeletedWhenShutdown --- .../flink/sink/AppendOnlyTableCompactionWorkerOperator.java | 3 ++- .../sink/AppendOnlyTableCompactionWorkerOperatorTest.java | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java index 4ed939fdc66c..dc49e32a4f58 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java @@ -109,7 +109,8 @@ public void processElement(StreamRecord element) throw result.add(workerExecutor().submit(() -> task.doCompact(write))); } - private ExecutorService workerExecutor() { + @VisibleForTesting + ExecutorService workerExecutor() { if (lazyCompactExecutor == null) { lazyCompactExecutor = Executors.newSingleThreadScheduledExecutor( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java index aafba3384ec5..89d4ae424a21 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -78,7 +79,7 @@ public void testAsyncCompactionWorks() throws Exception { workerOperator.prepareCommit(false, Long.MAX_VALUE)); Long now = System.currentTimeMillis(); - if (timeStart - now > timeout && committables.size() != 4) { + if (now - timeStart > timeout && committables.size() != 4) { throw new RuntimeException( "Timeout waiting for compaction, maybe some error happens in " + AppendOnlyTableCompactionWorkerOperator.class @@ -105,6 +106,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { AppendOnlyTableCompactionWorkerOperator workerOperator = new AppendOnlyTableCompactionWorkerOperator( (AppendOnlyFileStoreTable) getTableDefault(), "user"); + ExecutorService workerExecutor = workerOperator.workerExecutor(); // write 200 files List commitMessages = writeDataDefault(200, 40); @@ -151,7 +153,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { workerOperator.shutdown(); // wait the last runnable in thread pool to stop - Thread.sleep(2_000); + workerExecutor.awaitTermination(120, TimeUnit.SECONDS); for (Future f : workerOperator.result()) { try {