Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unbalanced locks in test server for Nexus #2341

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static class TestNexus implements TestWorkflows.TestWorkflow1 {
public String execute(String input) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
.build();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
Expand All @@ -74,14 +74,22 @@ public String execute(String input) {

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
int attempt = 0;

@OperationImpl
public OperationHandler<String, String> operation() {
// Implemented inline
return OperationHandler.sync(
(ctx, details, name) -> {
// Simulate a long running operation
// Fail the first attempt with a retry-able exception. This tests
// the schedule-to-close timeout applies across attempts.
attempt += 1;
if (attempt == 1) {
throw new RuntimeException("test exception");
}
// Simulate a long-running operation
try {
Thread.sleep(2000);
Thread.sleep(6000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.temporal.internal.testservice.StateMachines.ActivityTaskData;
import io.temporal.internal.testservice.StateMachines.CancelExternalData;
import io.temporal.internal.testservice.StateMachines.ChildWorkflowData;
import io.temporal.internal.testservice.StateMachines.NexusOperationData;
import io.temporal.internal.testservice.StateMachines.SignalExternalData;
import io.temporal.internal.testservice.StateMachines.State;
import io.temporal.internal.testservice.StateMachines.TimerData;
Expand Down Expand Up @@ -782,14 +783,18 @@ private void processScheduleNexusOperation(
nexusOperations.put(scheduleEventId, operation);

operation.action(Action.INITIATE, ctx, attr, workflowTaskCompletedId);
// Record the current attempt of this request to be used in the timeout handler
// of this request to make sure we are timing out the correct request.
int attempt = operation.getData().getAttempt();
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(operation.getData().requestTimeout),
() ->
timeoutNexusRequest(
scheduleEventId, "StartNexusOperation", operation.getData().getAttempt()),
() -> timeoutNexusRequest(scheduleEventId, "StartNexusOperation", attempt),
"StartNexusOperation request timeout");
if (attr.hasScheduleToCloseTimeout()
&& Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) {
// ScheduleToCloseTimeout is the total time from the start of the operation to the end of the
// operation
// so the attempt is not relevant here.
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()),
() ->
Expand Down Expand Up @@ -978,6 +983,7 @@ private void processScheduleActivityTask(
ActivityTaskScheduledEventAttributes scheduledEvent =
activityStateMachine.getData().scheduledEvent;
int attempt = activityStateMachine.getData().getAttempt();
// TODO(quinn) If the first attempt fails, it is not clear this timer will work as expected
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(scheduledEvent.getScheduleToCloseTimeout()),
() ->
Expand Down
Loading