Skip to content

Commit

Permalink
Fix proto decoding in a Nexus Operation (#2281)
Browse files Browse the repository at this point in the history
Fix proto decoding in a Nexus Operation
  • Loading branch information
Quinn-With-Two-Ns authored Oct 24, 2024
1 parent 93e30d7 commit b8c4b7b
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ ext {
// Platforms
grpcVersion = '1.54.1' // [1.38.0,) Needed for io.grpc.protobuf.services.HealthStatusManager
jacksonVersion = '2.14.2' // [2.9.0,)
nexusVersion = '0.2.0-alpha'
nexusVersion = '0.2.1-alpha'
// we don't upgrade to 1.10.x because it requires kotlin 1.6. Users may use 1.10.x in their environments though.
micrometerVersion = project.hasProperty("edgeDepsTest") ? '1.13.6' : '1.9.9' // [1.0.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.nexusrpc.Serializer;
import io.temporal.api.common.v1.Payload;
import io.temporal.common.converter.DataConverter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -52,7 +53,14 @@ public Content serialize(@Nullable Object o) {
public @Nullable Object deserialize(Content content, Type type) {
try {
Payload payload = Payload.parseFrom(content.getData());
return dataConverter.fromPayload(payload, type.getClass(), type);
if ((type instanceof Class)) {
return dataConverter.fromPayload(payload, (Class<?>) type, type);
} else if (type instanceof ParameterizedType) {
return dataConverter.fromPayload(
payload, (Class<?>) ((ParameterizedType) type).getRawType(), type);
} else {
throw new IllegalArgumentException("Unsupported type: " + type);
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public OperationStartResult<R> start(
io.temporal.api.nexus.v1.Link nexusLink = workflowEventToNexusLink(workflowEventLink);
try {
OperationStartResult.Builder<R> result =
OperationStartResult.<R>newBuilder().setAsyncOperationId(workflowExec.getWorkflowId());
OperationStartResult.<R>newAsyncBuilder(workflowExec.getWorkflowId());
if (nexusLink != null) {
result.addLink(nexusProtoLinkToLink(nexusLink));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@

package io.temporal.internal.nexus;

import com.google.common.reflect.TypeToken;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.common.converter.DataConverter;
import io.temporal.common.converter.DefaultDataConverter;
import io.temporal.common.converter.EncodedValuesTest;
import java.util.Collections;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -39,7 +44,40 @@ public void testPayload() {
@Test
public void testNull() {
PayloadSerializer.Content content = payloadSerializer.serialize(null);
payloadSerializer.deserialize(content, String.class);
Assert.assertEquals(null, payloadSerializer.deserialize(content, String.class));
}

@Test
public void testInteger() {
PayloadSerializer.Content content = payloadSerializer.serialize(1);
Assert.assertEquals(1, payloadSerializer.deserialize(content, Integer.class));
}

@Test
public void testArray() {
String[] cars = {"test", "nexus", "serialization"};
PayloadSerializer.Content content = payloadSerializer.serialize(cars);
Assert.assertArrayEquals(
cars, (String[]) payloadSerializer.deserialize(content, String[].class));
}

@Test
public void testHashMap() {
Map<String, EncodedValuesTest.Pair> map =
Collections.singletonMap("key", new EncodedValuesTest.Pair(1, "hello"));
PayloadSerializer.Content content = payloadSerializer.serialize(map);
Map<String, EncodedValuesTest.Pair> newMap =
(Map<String, EncodedValuesTest.Pair>)
payloadSerializer.deserialize(
content, (new TypeToken<Map<String, EncodedValuesTest.Pair>>() {}).getType());
Assert.assertTrue(newMap.get("key") instanceof EncodedValuesTest.Pair);
}

@Test
public void testProto() {
WorkflowExecution exec =
WorkflowExecution.newBuilder().setWorkflowId("id").setRunId("runId").build();
PayloadSerializer.Content content = payloadSerializer.serialize(exec);
Assert.assertEquals(exec, payloadSerializer.deserialize(content, WorkflowExecution.class));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import io.nexusrpc.Operation;
import io.nexusrpc.Service;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.common.converter.EncodedValuesTest;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.Collections;
import java.util.List;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

// Test an operation that takes and returns a List type with a non-primitive element type
public class GenericListOperationTest {
@ClassRule
public static SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void testOperation() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.assertEquals("hello", result);
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName())
.build();
// Try to call with the typed stub
TestNexusService serviceStub =
Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions);
List<EncodedValuesTest.Pair> arg =
Collections.singletonList(new EncodedValuesTest.Pair(1, "hello"));
return serviceStub.operation(arg).get(0).getS();
}
}

@Service
public interface TestNexusService {
@Operation
List<EncodedValuesTest.Pair> operation(List<EncodedValuesTest.Pair> input);
}

@ServiceImpl(service = TestNexusService.class)
public static class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<List<EncodedValuesTest.Pair>, List<EncodedValuesTest.Pair>>
operation() {
return WorkflowClientOperationHandlers.sync(
(context, details, client, input) -> {
return input;
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import io.nexusrpc.Operation;
import io.nexusrpc.Service;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionRequest;
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.*;
import io.temporal.workflow.shared.TestWorkflows;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

// Test an operation that takes and returns a protobuf message
public class ProtoOperationTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void testDescribeWorkflowOperation() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.assertEquals(testWorkflowRule.getTaskQueue(), result);
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
TestNexusService serviceStub = Workflow.newNexusServiceStub(TestNexusService.class);

WorkflowExecution exec =
WorkflowExecution.newBuilder()
.setWorkflowId(Workflow.getInfo().getWorkflowId())
.setRunId(Workflow.getInfo().getRunId())
.build();
return serviceStub
.describeWorkflow(
DescribeWorkflowExecutionRequest.newBuilder()
.setNamespace(Workflow.getInfo().getNamespace())
.setExecution(exec)
.build())
.getExecutionConfig()
.getTaskQueue()
.getName();
}
}

@Service
public interface TestNexusService {
@Operation
DescribeWorkflowExecutionResponse describeWorkflow(DescribeWorkflowExecutionRequest input);
}

@ServiceImpl(service = TestNexusService.class)
public class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<DescribeWorkflowExecutionRequest, DescribeWorkflowExecutionResponse>
describeWorkflow() {
return WorkflowClientOperationHandlers.sync(
(context, details, client, input) ->
client.getWorkflowServiceStubs().blockingStub().describeWorkflowExecution(input));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.workflow.nexus;

import io.nexusrpc.Operation;
import io.nexusrpc.Service;
import io.nexusrpc.handler.OperationHandler;
import io.nexusrpc.handler.OperationImpl;
import io.nexusrpc.handler.ServiceImpl;
import io.temporal.nexus.WorkflowClientOperationHandlers;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.NexusServiceOptions;
import io.temporal.workflow.NexusServiceStub;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflows;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

// Test an operation that takes and returns a void type
public class VoidOperationTest {
@ClassRule
public static SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestNexus.class)
.setNexusServiceImplementation(new TestNexusServiceImpl())
.build();

@Test
public void testVoidOperation() {
TestWorkflows.TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestWorkflow1.class);
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.assertEquals("success", result);
}

public static class TestNexus implements TestWorkflows.TestWorkflow1 {
@Override
public String execute(String input) {
NexusServiceOptions serviceOptions =
NexusServiceOptions.newBuilder()
.setEndpoint(testWorkflowRule.getNexusEndpoint().getSpec().getName())
.build();
// Try to call with the typed stub
TestNexusService serviceStub =
Workflow.newNexusServiceStub(TestNexusService.class, serviceOptions);
serviceStub.noop();
// Try to call with an untyped stub
NexusServiceStub untypedServiceStub =
Workflow.newUntypedNexusServiceStub("TestNexusService", serviceOptions);
untypedServiceStub.execute("noop", Void.class, null);
untypedServiceStub.execute("noop", Void.class, Void.class, null);
return "success";
}
}

@Service
public interface TestNexusService {
@Operation
void noop();
}

@ServiceImpl(service = TestNexusService.class)
public static class TestNexusServiceImpl {
@OperationImpl
public OperationHandler<Void, Void> noop() {
return WorkflowClientOperationHandlers.sync((context, details, client, input) -> null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3180,9 +3180,13 @@ private static PendingNexusOperationInfo constructPendingNexusOperationInfo(
.setScheduledEventId(data.scheduledEventId)
.setScheduleToCloseTimeout(data.scheduledEvent.getScheduleToCloseTimeout())
.setState(convertNexusOperationState(sm.getState(), data))
.setAttempt(data.getAttempt())
.setLastAttemptCompleteTime(data.lastAttemptCompleteTime)
.setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
.setAttempt(data.getAttempt());
if (data.lastAttemptCompleteTime != null) {
builder.setLastAttemptCompleteTime(data.lastAttemptCompleteTime);
}
if (data.nextAttemptScheduleTime != null) {
builder.setNextAttemptScheduleTime(data.nextAttemptScheduleTime);
}

data.retryState.getPreviousRunFailure().ifPresent(builder::setLastAttemptFailure);

Expand Down

0 comments on commit b8c4b7b

Please sign in to comment.