From 0ece64ad708f5ee91618197bb6626cf134fd7663 Mon Sep 17 00:00:00 2001 From: Nate Mortensen Date: Fri, 1 Nov 2024 16:00:01 -0700 Subject: [PATCH] Fix flakiness in ManualActivityCompletionWorkflowTest The operations here are flaky because we're starting an activity and then attempting to cancel/complete/fail it in parallel. We need the second activity to block until the first one has started, and there's no way to orchestrate that within the workflow. The best we can do is make the acitivites block until the ActivityTask is available. --- .../ManualActivityCompletionWorkflowTest.java | 51 ++++++++++++++----- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java index 9eb15b5cf..19c32e3e8 100644 --- a/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/ManualActivityCompletionWorkflowTest.java @@ -17,7 +17,6 @@ package com.uber.cadence.workflow; -import com.google.common.base.Preconditions; import com.uber.cadence.WorkflowExecution; import com.uber.cadence.activity.Activity; import com.uber.cadence.activity.ActivityMethod; @@ -48,6 +47,9 @@ public interface ManualCompletionActivities { @ActivityMethod String asyncActivity(); + @ActivityMethod + void reset(); + @ActivityMethod void completeAsyncActivity(String result); @@ -73,48 +75,61 @@ private class ManualCompletionActivitiesImpl implements ManualCompletionActiviti @Override public synchronized String asyncActivity() { openTask = Activity.getTask(); + notifyAll(); Activity.doNotCompleteOnReturn(); return null; } + @Override + public synchronized void reset() { + openTask = null; + } + @Override public synchronized void completeAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(openTask.getTaskToken(), details); + getClient().complete(getOpenTask().getTaskToken(), details); } @Override public synchronized void completeAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().complete(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient().complete(getCurrentWorkflow(), getOpenTask().getActivityId(), details); } @Override public synchronized void failAsyncActivity(String details) { - Preconditions.checkState(openTask != null); getClient() - .completeExceptionally(openTask.getTaskToken(), new ExceptionWithDetaills(details)); + .completeExceptionally(getOpenTask().getTaskToken(), new ExceptionWithDetaills(details)); } @Override public synchronized void failAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); getClient() .completeExceptionally( - getCurrentWorkflow(), openTask.getActivityId(), new ExceptionWithDetaills(details)); + getCurrentWorkflow(), + getOpenTask().getActivityId(), + new ExceptionWithDetaills(details)); } @Override public synchronized void cancelAsyncActivity(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(openTask.getTaskToken(), details); + getClient().reportCancellation(getOpenTask().getTaskToken(), details); } @Override public synchronized void cancelAsyncActivityById(String details) { - Preconditions.checkState(openTask != null); - getClient().reportCancellation(getCurrentWorkflow(), openTask.getActivityId(), details); + getClient().reportCancellation(getCurrentWorkflow(), getOpenTask().getActivityId(), details); + } + + private synchronized ActivityTask getOpenTask() { + while (openTask == null) { + try { + wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return openTask; } private WorkflowExecution getCurrentWorkflow() { @@ -146,21 +161,29 @@ public void run() { expectSuccess("1", result); expectFailure(() -> activities.completeAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.completeAsyncActivityById("2"); expectSuccess("2", result); expectFailure(() -> activities.completeAsyncActivityById("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivity("3"); expectFailureWithDetails(result, "3"); expectFailure(() -> activities.failAsyncActivity("again")); + activities.reset(); + result = Async.function(activities::asyncActivity); activities.failAsyncActivityById("4"); expectFailureWithDetails(result, "4"); expectFailure(() -> activities.failAsyncActivityById("again")); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise = Workflow.newPromise(); CancellationScope scope = @@ -178,6 +201,8 @@ public void run() { activities.cancelAsyncActivity("5"); expectCancelled(result); + activities.reset(); + // Need to request cancellation, then the activity can respond with the cancel CompletablePromise completablePromise2 = Workflow.newPromise(); scope =