Skip to content

Commit

Permalink
Run spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Dec 3, 2024
1 parent 80669ba commit 4f883e1
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,23 @@

package io.temporal.workflow.nexus;

import static io.temporal.testing.internal.SDKTestWorkflowRule.NAMESPACE;

import com.google.common.collect.ImmutableMap;
import com.uber.m3.tally.RootScopeBuilder;
import com.uber.m3.util.Duration;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.client.WorkflowStub;
import io.temporal.common.reporter.TestStatsReporter;
import io.temporal.failure.ApplicationFailure;
import io.temporal.nexus.Nexus;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.worker.MetricsType;
import io.temporal.worker.WorkerMetricsTag;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestNexusServices;
import java.time.Duration;
import java.util.Map;
import org.junit.Assert;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.Rule;
import org.junit.Test;

Expand All @@ -50,93 +46,44 @@ public class SyncClientOperationTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setWorkflowTypes(TestNexus.class, TestSignalWorkflowImpl.class)
.setMetricsScope(
new RootScopeBuilder()
.reporter(reporter)
.reportEvery(com.uber.m3.util.Duration.ofMillis(10)))
new RootScopeBuilder().reporter(reporter).reportEvery(Duration.ofMillis(10)))
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void syncClientOperationSuccess() {
TestUpdatedWorkflow workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class);
Assert.assertTrue(workflowStub.execute(false).startsWith("Update ID:"));

// Test metrics all tasks should have
Map<String, String> nexusWorkerTags =
ImmutableMap.<String, String>builder()
.putAll(MetricsTag.defaultTags(NAMESPACE))
.put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.NEXUS_WORKER.getValue())
.put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue())
.buildKeepingLast();
reporter.assertTimer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, nexusWorkerTags);
Map<String, String> operationTags =
ImmutableMap.<String, String>builder()
.putAll(nexusWorkerTags)
.put(MetricsTag.NEXUS_SERVICE, "TestNexusService1")
.put(MetricsTag.NEXUS_OPERATION, "operation")
.buildKeepingLast();
reporter.assertTimer(MetricsType.NEXUS_EXEC_LATENCY, operationTags);
reporter.assertTimer(MetricsType.NEXUS_TASK_E2E_LATENCY, operationTags);
// Test our custom metric
reporter.assertCounter("operation", operationTags, 1);
}
public void syncClientOperationSuccess() throws ExecutionException, InterruptedException {
TestWorkflows.TestSignaledWorkflow signalWorkflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestSignaledWorkflow.class);
CompletableFuture<String> result = WorkflowClient.execute(signalWorkflowStub::execute);

@Test
public void syncClientOperationFail() {
TestUpdatedWorkflow workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestUpdatedWorkflow.class);
Assert.assertThrows(WorkflowFailedException.class, () -> workflowStub.execute(true));

// Test metrics all failed tasks should have
Map<String, String> nexusWorkerTags =
ImmutableMap.<String, String>builder()
.putAll(MetricsTag.defaultTags(NAMESPACE))
.put(MetricsTag.WORKER_TYPE, WorkerMetricsTag.WorkerType.NEXUS_WORKER.getValue())
.put(MetricsTag.TASK_QUEUE, testWorkflowRule.getTaskQueue())
.buildKeepingLast();
reporter.assertTimer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY, nexusWorkerTags);
Map<String, String> operationTags =
ImmutableMap.<String, String>builder()
.putAll(nexusWorkerTags)
.put(MetricsTag.NEXUS_SERVICE, "TestNexusService1")
.put(MetricsTag.NEXUS_OPERATION, "operation")
.buildKeepingLast();
reporter.assertTimer(MetricsType.NEXUS_EXEC_LATENCY, operationTags);
reporter.assertTimer(MetricsType.NEXUS_TASK_E2E_LATENCY, operationTags);
Map<String, String> execFailedTags =
ImmutableMap.<String, String>builder()
.putAll(operationTags)
.put(MetricsTag.TASK_FAILURE_TYPE, "handler_error_BAD_REQUEST")
.buildKeepingLast();
reporter.assertCounter(MetricsType.NEXUS_EXEC_FAILED_COUNTER, execFailedTags, 1);
workflowStub.execute(WorkflowStub.fromTyped(signalWorkflowStub).getExecution().getWorkflowId());
result.get();
}

@WorkflowInterface
public interface TestUpdatedWorkflow {

@WorkflowMethod
String execute(boolean fail);
String execute(String id);

@UpdateMethod
String update(String arg);
}

public static class TestNexus implements TestUpdatedWorkflow {
@Override
public String execute(boolean fail) {
NexusOperationOptions options =
NexusOperationOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(1))
.build();
public String execute(String id) {
NexusOperationOptions options = NexusOperationOptions.getDefaultInstance();
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder().setOperationOptions(options).build();
// Try to call a synchronous operation in a blocking way
TestNexusServices.TestNexusService1 serviceStub =
Workflow.newNexusServiceStub(TestNexusServices.TestNexusService1.class, serviceOptions);
return serviceStub.operation(fail ? "" : Workflow.getInfo().getWorkflowId());
return serviceStub.operation(id);
}

@Override
Expand All @@ -145,20 +92,35 @@ public String update(String arg) {
}
}

public static class TestSignalWorkflowImpl implements TestWorkflows.TestSignaledWorkflow {
private boolean signaled = false;

@Override
public String execute() {
Workflow.await(() -> signaled);
return "";
}

@Override
public void signal(String arg) {
signaled = true;
}
}

@ServiceImpl(service = TestNexusServices.TestNexusService1.class)
public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<String, String> operation() {
// Implemented inline
return WorkflowClientOperationHandlers.sync(
(ctx, details, client, id) -> {
if (id.isEmpty()) {
throw ApplicationFailure.newNonRetryableFailure("Invalid ID", "TestError");
}
Nexus.getOperationContext().getMetricsScope().counter("operation").inc(1);
return client
.newWorkflowStub(TestUpdatedWorkflow.class, id)
.update("Update from operation");
client
.newWorkflowStub(
TestWorkflows.TestSignaledWorkflow.class,
WorkflowOptions.newBuilder().setWorkflowId(id).setTaskQueue("").build())
.signal("Signaled from operation");
return "";
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ private void processScheduleNexusOperation(
"StartNexusOperation request timeout");
if (attr.hasScheduleToCloseTimeout()
&& Durations.toMillis(attr.getScheduleToCloseTimeout()) > 0) {
// ScheduleToCloseTimeout is the total time from the start of the operation to the end of the operation
// ScheduleToCloseTimeout is the total time from the start of the operation to the end of the
// operation
// so the attempt is not relevant here.
ctx.addTimer(
ProtobufTimeUtils.toJavaDuration(attr.getScheduleToCloseTimeout()),
Expand Down

0 comments on commit 4f883e1

Please sign in to comment.