diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java index 21a5a4236..1dd6ac566 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncOperationTimeoutTest.java @@ -61,7 +61,7 @@ public static class TestNexus implements TestWorkflows.TestWorkflow1 { public String execute(String input) { NexusOperationOptions options = NexusOperationOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofSeconds(1)) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) .build(); NexusServiceOptions serviceOptions = NexusServiceOptions.newBuilder().setOperationOptions(options).build(); @@ -74,14 +74,22 @@ public String execute(String input) { @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { + int attempt = 0; + @OperationImpl public OperationHandler operation() { // Implemented inline return OperationHandler.sync( (ctx, details, name) -> { - // Simulate a long running operation + // Fail the first attempt with a retry-able exception. This tests + // the schedule-to-close timeout applies across attempts. + attempt += 1; + if (attempt == 1) { + throw new RuntimeException("test exception"); + } + // Simulate a long-running operation try { - Thread.sleep(2000); + Thread.sleep(6000); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 84f5791fa..910b987e2 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -62,6 +62,7 @@ import io.temporal.internal.testservice.StateMachines.ActivityTaskData; import io.temporal.internal.testservice.StateMachines.CancelExternalData; import io.temporal.internal.testservice.StateMachines.ChildWorkflowData; +import io.temporal.internal.testservice.StateMachines.NexusOperationData; import io.temporal.internal.testservice.StateMachines.SignalExternalData; import io.temporal.internal.testservice.StateMachines.State; import io.temporal.internal.testservice.StateMachines.TimerData; @@ -782,14 +783,18 @@ private void processScheduleNexusOperation( nexusOperations.put(scheduleEventId, operation); operation.action(Action.INITIATE, ctx, attr, workflowTaskCompletedId); + // Record the current attempt of this request to be used in the timeout handler + // of this request to make sure we are timing out the correct request. + int attempt = operation.getData().getAttempt(); ctx.addTimer( ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout), - () -> - timeoutNexusRequest( - scheduleEventId, "StartNexusOperation", operation.getData().getAttempt()), + () -> timeoutNexusRequest(scheduleEventId, "StartNexusOperation", attempt), "StartNexusOperation request timeout"); if (attr.hasScheduleToCloseTimeout() && Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) { + // ScheduleToCloseTimeout is the total time from the start of the operation to the end of the + // operation + // so the attempt is not relevant here. ctx.addTimer( ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()), () -> @@ -978,6 +983,7 @@ private void processScheduleActivityTask( ActivityTaskScheduledEventAttributes scheduledEvent = activityStateMachine.getData().scheduledEvent; int attempt = activityStateMachine.getData().getAttempt(); + // TODO(quinn) If the first attempt fails, it is not clear this timer will work as expected ctx.addTimer( ProtobufTimeUtils.toJavaDuration(scheduledEvent.getScheduleToCloseTimeout()), () ->