From 4f883e177db05737e728b33537720a52b008db7b Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 3 Dec 2024 09:19:52 -0800 Subject: [PATCH] Run spotless --- .../nexus/SyncClientOperationTest.java | 118 ++++++------------ .../TestWorkflowMutableStateImpl.java | 3 +- 2 files changed, 42 insertions(+), 79 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java index adbb2c055..ab4f02d53 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/nexus/SyncClientOperationTest.java @@ -20,27 +20,23 @@ package io.temporal.workflow.nexus; -import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE; - -import com.google.common.collect.ImmutableMap; import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.util.Duration; import io.nexusrpc.handler.OperationHandler; import io.nexusrpc.handler.OperationImpl; import io.nexusrpc.handler.ServiceImpl; -import io.temporal.client.WorkflowFailedException; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; import io.temporal.common.reporter.TestStatsReporter; -import io.temporal.failure.ApplicationFailure; import io.temporal.nexus.Nexus; import io.temporal.nexus.WorkflowClientOperationHandlers; -import io.temporal.serviceclient.MetricsTag; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.worker.MetricsType; -import io.temporal.worker.WorkerMetricsTag; import io.temporal.workflow.*; import io.temporal.workflow.shared.TestNexusServices; -import java.time.Duration; -import java.util.Map; -import org.junit.Assert; +import io.temporal.workflow.shared.TestWorkflows; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.Rule; import org.junit.Test; @@ -50,75 +46,29 @@ public class SyncClientOperationTest { @Rule public SDKTestWorkflowRule testWorkflowRule = SDKTestWorkflowRule.newBuilder() - .setWorkflowTypes(TestNexus.class) + .setWorkflowTypes(TestNexus.class, TestSignalWorkflowImpl.class) .setMetricsScope( - new RootScopeBuilder() - .reporter(reporter) - .reportEvery(com.uber.m3.util.Duration.ofMillis(10))) + new RootScopeBuilder().reporter(reporter).reportEvery(Duration.ofMillis(10))) .setNexusServiceImplementation(new TestNexusServiceImpl()) .build(); @Test - public void syncClientOperationSuccess() { - TestUpdatedWorkflow workflowStub = - testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class); - Assert.assertTrue(workflowStub.execute(false).startsWith("Update ID:")); - - // Test metrics all tasks should have - Map nexusWorkerTags = - ImmutableMap.builder() - .putAll(MetricsTag.defaultTags(NAMESPACE)) - .put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.NEXUS_WORKER.getValue()) - .put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue()) - .buildKeepingLast(); - reporter.assertTimer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, nexusWorkerTags); - Map operationTags = - ImmutableMap.builder() - .putAll(nexusWorkerTags) - .put(MetricsTag.NEXUS_SERVICE, "TestNexusService1") - .put(MetricsTag.NEXUS_OPERATION, "operation") - .buildKeepingLast(); - reporter.assertTimer(MetricsType.NEXUS_EXEC_LATENCY, operationTags); - reporter.assertTimer(MetricsType.NEXUS_TASK_E2E_LATENCY, operationTags); - // Test our custom metric - reporter.assertCounter("operation", operationTags, 1); - } + public void syncClientOperationSuccess() throws ExecutionException, InterruptedException { + TestWorkflows.TestSignaledWorkflow signalWorkflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestSignaledWorkflow.class); + CompletableFuture result = WorkflowClient.execute(signalWorkflowStub::execute); - @Test - public void syncClientOperationFail() { TestUpdatedWorkflow workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class); - Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(true)); - - // Test metrics all failed tasks should have - Map nexusWorkerTags = - ImmutableMap.builder() - .putAll(MetricsTag.defaultTags(NAMESPACE)) - .put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.NEXUS_WORKER.getValue()) - .put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue()) - .buildKeepingLast(); - reporter.assertTimer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, nexusWorkerTags); - Map operationTags = - ImmutableMap.builder() - .putAll(nexusWorkerTags) - .put(MetricsTag.NEXUS_SERVICE, "TestNexusService1") - .put(MetricsTag.NEXUS_OPERATION, "operation") - .buildKeepingLast(); - reporter.assertTimer(MetricsType.NEXUS_EXEC_LATENCY, operationTags); - reporter.assertTimer(MetricsType.NEXUS_TASK_E2E_LATENCY, operationTags); - Map execFailedTags = - ImmutableMap.builder() - .putAll(operationTags) - .put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_BAD_REQUEST") - .buildKeepingLast(); - reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1); + workflowStub.execute(WorkflowStub.fromTyped(signalWorkflowStub).getExecution().getWorkflowId()); + result.get(); } @WorkflowInterface public interface TestUpdatedWorkflow { @WorkflowMethod - String execute(boolean fail); + String execute(String id); @UpdateMethod String update(String arg); @@ -126,17 +76,14 @@ public interface TestUpdatedWorkflow { public static class TestNexus implements TestUpdatedWorkflow { @Override - public String execute(boolean fail) { - NexusOperationOptions options = - NexusOperationOptions.newBuilder() - .setScheduleToCloseTimeout(Duration.ofSeconds(1)) - .build(); + public String execute(String id) { + NexusOperationOptions options = NexusOperationOptions.getDefaultInstance(); NexusServiceOptions serviceOptions = NexusServiceOptions.newBuilder().setOperationOptions(options).build(); // Try to call a synchronous operation in a blocking way TestNexusServices.TestNexusService1 serviceStub = Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions); - return serviceStub.operation(fail ? "" : Workflow.getInfo().getWorkflowId()); + return serviceStub.operation(id); } @Override @@ -145,6 +92,21 @@ public String update(String arg) { } } + public static class TestSignalWorkflowImpl implements TestWorkflows.TestSignaledWorkflow { + private boolean signaled = false; + + @Override + public String execute() { + Workflow.await(() -> signaled); + return ""; + } + + @Override + public void signal(String arg) { + signaled = true; + } + } + @ServiceImpl(service = TestNexusServices.TestNexusService1.class) public class TestNexusServiceImpl { @OperationImpl @@ -152,13 +114,13 @@ public OperationHandler operation() { // Implemented inline return WorkflowClientOperationHandlers.sync( (ctx, details, client, id) -> { - if (id.isEmpty()) { - throw ApplicationFailure.newNonRetryableFailure("Invalid ID", "TestError"); - } Nexus.getOperationContext().getMetricsScope().counter("operation").inc(1); - return client - .newWorkflowStub(TestUpdatedWorkflow.class, id) - .update("Update from operation"); + client + .newWorkflowStub( + TestWorkflows.TestSignaledWorkflow.class, + WorkflowOptions.newBuilder().setWorkflowId(id).setTaskQueue("").build()) + .signal("Signaled from operation"); + return ""; }); } } diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java index 0f5cf570d..910b987e2 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowMutableStateImpl.java @@ -792,7 +792,8 @@ private void processScheduleNexusOperation( "StartNexusOperation request timeout"); if (attr.hasScheduleToCloseTimeout() && Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) { - // ScheduleToCloseTimeout is the total time from the start of the operation to the end of the operation + // ScheduleToCloseTimeout is the total time from the start of the operation to the end of the + // operation // so the attempt is not relevant here. ctx.addTimer( ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()),