From a1b031dde45d52599b4fbca35415e783ea49e598 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 3 Dec 2024 08:35:52 -0800 Subject: [PATCH 1/2] Fix unbalanced locks in test server for Nexus --- .../internal/testservice/TestWorkflowMutableStateImpl.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 84f5791fa..d14de9736 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -62,6 +62,7 @@ import io.temporal.internal.testservice.StateMachines.ActivityTaskData; import io.temporal.internal.testservice.StateMachines.CancelExternalData; import io.temporal.internal.testservice.StateMachines.ChildWorkflowData; +import io.temporal.internal.testservice.StateMachines.NexusOperationData; import io.temporal.internal.testservice.StateMachines.SignalExternalData; import io.temporal.internal.testservice.StateMachines.State; import io.temporal.internal.testservice.StateMachines.TimerData; @@ -782,11 +783,11 @@ private void processScheduleNexusOperation( nexusOperations.put(scheduleEventId, operation); operation.action(Action.INITIATE, ctx, attr, workflowTaskCompletedId); + // Record the current attempt of this + int attempt = operation.getData().getAttempt(); ctx.addTimer( ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout), - () -> - timeoutNexusRequest( - scheduleEventId, "StartNexusOperation", operation.getData().getAttempt()), + () -> timeoutNexusRequest(scheduleEventId, "StartNexusOperation", attempt), "StartNexusOperation request timeout"); if (attr.hasScheduleToCloseTimeout() && Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) { From 1b8ceea715a184b6343c4518a79ad0bc93123456 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 3 Dec 2024 09:47:56 -0800 Subject: [PATCH 2/2] Add more comments and test --- .../workflow/nexus/SyncOperationTimeoutTest.java | 14 +++++++++++--- .../testservice/TestWorkflowMutableStateImpl.java | 7 ++++++- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java index 21a5a4236..1dd6ac566 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java @@ -61,7 +61,7 @@ public static class TestNexus implements TestWorkflows.TestWorkflow1 { public String execute(String input) { NexusOperationOptions options = NexusOperationOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofSeconds(1)) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) .build(); NexusServiceOptions serviceOptions = NexusServiceOptions.newBuilder().setOperationOptions(options).build(); @@ -74,14 +74,22 @@ public String execute(String input) { @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { + int attempt = 0; + @OperationImpl public OperationHandler operation() { // Implemented inline return OperationHandler.sync( (ctx, details, name) -> { - // Simulate a long running operation + // Fail the first attempt with a retry-able exception. This tests + // the schedule-to-close timeout applies across attempts. + attempt += 1; + if (attempt == 1) { + throw new RuntimeException("test exception"); + } + // Simulate a long-running operation try { - Thread.sleep(2000); + Thread.sleep(6000); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index d14de9736..910b987e2 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -783,7 +783,8 @@ private void processScheduleNexusOperation( nexusOperations.put(scheduleEventId, operation); operation.action(Action.INITIATE, ctx, attr, workflowTaskCompletedId); - // Record the current attempt of this + // Record the current attempt of this request to be used in the timeout handler + // of this request to make sure we are timing out the correct request. int attempt = operation.getData().getAttempt(); ctx.addTimer( ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout), @@ -791,6 +792,9 @@ private void processScheduleNexusOperation( "StartNexusOperation request timeout"); if (attr.hasScheduleToCloseTimeout() && Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) { + // ScheduleToCloseTimeout is the total time from the start of the operation to the end of the + // operation + // so the attempt is not relevant here. ctx.addTimer( ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()), () -> @@ -979,6 +983,7 @@ private void processScheduleActivityTask( ActivityTaskScheduledEventAttributes scheduledEvent = activityStateMachine.getData().scheduledEvent; int attempt = activityStateMachine.getData().getAttempt(); + // TODO(quinn) If the first attempt fails, it is not clear this timer will work as expected ctx.addTimer( ProtobufTimeUtils.toJavaDuration(scheduledEvent.getScheduleToCloseTimeout()), () ->