Skip to content

Commit

Permalink
[flink] fix unstable test: testAsyncCompactionFileDeletedWhenShutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed Nov 6, 2023
1 parent 4a656d9 commit da35cbe
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void processElement(StreamRecord<AppendOnlyCompactionTask> element) throw
result.add(workerExecutor().submit(() -> task.doCompact(write)));
}

private ExecutorService workerExecutor() {
@VisibleForTesting
ExecutorService workerExecutor() {
if (lazyCompactExecutor == null) {
lazyCompactExecutor =
Executors.newSingleThreadScheduledExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void testAsyncCompactionWorks() throws Exception {
workerOperator.prepareCommit(false, Long.MAX_VALUE));

Long now = System.currentTimeMillis();
if (timeStart - now > timeout && committables.size() != 4) {
if (now - timeStart > timeout && committables.size() != 4) {
throw new RuntimeException(
"Timeout waiting for compaction, maybe some error happens in "
+ AppendOnlyTableCompactionWorkerOperator.class
Expand All @@ -105,6 +106,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
AppendOnlyTableCompactionWorkerOperator workerOperator =
new AppendOnlyTableCompactionWorkerOperator(
(AppendOnlyFileStoreTable) getTableDefault(), "user");
ExecutorService workerExecutor = workerOperator.workerExecutor();

// write 200 files
List<CommitMessage> commitMessages = writeDataDefault(200, 40);
Expand Down Expand Up @@ -151,7 +153,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
workerOperator.shutdown();

// wait the last runnable in thread pool to stop
Thread.sleep(2_000);
workerExecutor.awaitTermination(120, TimeUnit.SECONDS);

for (Future<CommitMessage> f : workerOperator.result()) {
try {
Expand Down

0 comments on commit da35cbe

Please sign in to comment.