Skip to content

Commit

Permalink
Test server support for bidi links (#2258)
Browse files Browse the repository at this point in the history
* Test server support for bidi links

* typo

* license

* feedback

* link validation

* describe fields

* link validation
  • Loading branch information
pdoerner authored Oct 10, 2024
1 parent d1dc2e1 commit 393045d
Show file tree
Hide file tree
Showing 5 changed files with 573 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.internal.testservice;

import io.temporal.api.common.v1.Link;
import io.temporal.api.enums.v1.EventType;
import java.net.URI;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.StringTokenizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LinkConverter {

private static final Logger log = LoggerFactory.getLogger(StateMachines.class);

private static final String linkPathFormat = "temporal:///namespaces/%s/workflows/%s/%s/history";

public static io.temporal.api.nexus.v1.Link workflowEventToNexusLink(Link.WorkflowEvent we) {
try {
String url =
String.format(
linkPathFormat,
URLEncoder.encode(we.getNamespace(), StandardCharsets.UTF_8.toString()),
URLEncoder.encode(we.getWorkflowId(), StandardCharsets.UTF_8.toString()),
URLEncoder.encode(we.getRunId(), StandardCharsets.UTF_8.toString()));

if (we.hasEventRef()) {
url += "?";
if (we.getEventRef().getEventId() > 0) {
url += "eventID=" + we.getEventRef().getEventId() + "&";
}
url +=
"eventType="
+ URLEncoder.encode(
we.getEventRef().getEventType().name(), StandardCharsets.UTF_8.toString())
+ "&";
url += "referenceType=EventReference";
}

return io.temporal.api.nexus.v1.Link.newBuilder()
.setUrl(url)
.setType(we.getDescriptorForType().getFullName())
.build();
} catch (Exception e) {
log.error("Failed to encode Nexus link URL", e);
}
return null;
}

public static Link nexusLinkToWorkflowEvent(io.temporal.api.nexus.v1.Link nexusLink) {
Link.Builder link = Link.newBuilder();
try {
URI uri = new URI(nexusLink.getUrl());

if (!uri.getScheme().equals("temporal")) {
log.error("Failed to parse Nexus link URL: invalid scheme: {}", uri.getScheme());
return null;
}

StringTokenizer st = new StringTokenizer(uri.getRawPath(), "/");
if (!st.nextToken().equals("namespaces")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String namespace = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.nextToken().equals("workflows")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}
String workflowID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
String runID = URLDecoder.decode(st.nextToken(), StandardCharsets.UTF_8.toString());
if (!st.hasMoreTokens() || !st.nextToken().equals("history")) {
log.error("Failed to parse Nexus link URL: invalid path: {}", uri.getRawPath());
return null;
}

Link.WorkflowEvent.Builder we =
Link.WorkflowEvent.newBuilder()
.setNamespace(namespace)
.setWorkflowId(workflowID)
.setRunId(runID);

if (uri.getQuery() != null) {
Link.WorkflowEvent.EventReference.Builder eventRef =
Link.WorkflowEvent.EventReference.newBuilder();
String query = URLDecoder.decode(uri.getQuery(), StandardCharsets.UTF_8.toString());
st = new StringTokenizer(query, "&");
while (st.hasMoreTokens()) {
String[] param = st.nextToken().split("=");
switch (param[0]) {
case "eventID":
eventRef.setEventId(Long.parseLong(param[1]));
continue;
case "eventType":
eventRef.setEventType(EventType.valueOf(param[1]));
}
}
we.setEventRef(eventRef);
link.setWorkflowEvent(we);
}
} catch (Exception e) {
// Swallow un-parsable links since they are not critical to processing
log.error("Failed to parse Nexus link URL", e);
return null;
}
return link.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.temporal.internal.testservice;

import static io.temporal.internal.testservice.LinkConverter.*;
import static io.temporal.internal.testservice.StateMachines.Action.CANCEL;
import static io.temporal.internal.testservice.StateMachines.Action.COMPLETE;
import static io.temporal.internal.testservice.StateMachines.Action.CONTINUE_AS_NEW;
Expand Down Expand Up @@ -60,6 +61,7 @@
import io.temporal.api.failure.v1.TimeoutFailureInfo;
import io.temporal.api.history.v1.*;
import io.temporal.api.nexus.v1.*;
import io.temporal.api.nexus.v1.Link;
import io.temporal.api.protocol.v1.Message;
import io.temporal.api.query.v1.WorkflowQueryResult;
import io.temporal.api.taskqueue.v1.StickyExecutionAttributes;
Expand Down Expand Up @@ -347,11 +349,13 @@ static final class NexusOperationData {
RetryPolicy retryPolicy = defaultNexusRetryPolicy();

long scheduledEventId = NO_EVENT_ID;
Timestamp cancelRequestedTime;

TestServiceRetryState retryState;
long lastAttemptCompleteTime;
boolean isBackingOff = false;
Duration nextBackoffInterval;
long nextAttemptScheduleTime;
Timestamp lastAttemptCompleteTime;
Timestamp nextAttemptScheduleTime;
String identity;

public NexusOperationData(Endpoint endpoint) {
Expand Down Expand Up @@ -685,6 +689,18 @@ private static void scheduleNexusOperation(
NexusOperationRef ref = new NexusOperationRef(ctx.getExecutionId(), scheduledEventId);
NexusTaskToken taskToken = new NexusTaskToken(ref, data.getAttempt(), false);

Link link =
workflowEventToNexusLink(
io.temporal.api.common.v1.Link.WorkflowEvent.newBuilder()
.setNamespace(ctx.getNamespace())
.setWorkflowId(ctx.getExecution().getWorkflowId())
.setRunId(ctx.getExecution().getRunId())
.setEventRef(
io.temporal.api.common.v1.Link.WorkflowEvent.EventReference.newBuilder()
.setEventId(scheduledEventId)
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_SCHEDULED))
.build());

PollNexusTaskQueueResponse.Builder pollResponse =
PollNexusTaskQueueResponse.newBuilder()
.setTaskToken(taskToken.toBytes())
Expand All @@ -697,6 +713,7 @@ private static void scheduleNexusOperation(
.setService(attr.getService())
.setOperation(attr.getOperation())
.setPayload(attr.getInput())
.addLinks(link)
.setCallback("http://test-env/operations")
// The test server uses this to lookup the operation
.putCallbackHeader(
Expand Down Expand Up @@ -725,15 +742,24 @@ private static void startNexusOperation(
NexusOperationData data,
StartOperationResponse.Async resp,
long notUsed) {
ctx.addEvent(
HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_NEXUS_OPERATION_STARTED)
.setNexusOperationStartedEventAttributes(
NexusOperationStartedEventAttributes.newBuilder()
.setOperationId(resp.getOperationId())
.setScheduledEventId(data.scheduledEventId)
.setRequestId(data.scheduledEvent.getRequestId()))
.build());
.setRequestId(data.scheduledEvent.getRequestId()));

for (Link l : resp.getLinksList()) {
if (!l.getType()
.equals(io.temporal.api.common.v1.Link.WorkflowEvent.getDescriptor().getFullName())) {
continue;
}
event.addLinks(nexusLinkToWorkflowEvent(l));
}

ctx.addEvent(event.build());
ctx.onCommit(historySize -> data.operationId = resp.getOperationId());
}

Expand Down Expand Up @@ -846,7 +872,10 @@ private static RetryState attemptNexusOperationRetry(
ctx.onCommit(
(historySize) -> {
data.retryState = nextAttempt;
data.nextAttemptScheduleTime = ctx.currentTime().getSeconds();
data.isBackingOff = true;
data.lastAttemptCompleteTime = ctx.currentTime();
data.nextAttemptScheduleTime =
Timestamps.add(ProtobufTimeUtils.getCurrentProtoTime(), data.nextBackoffInterval);
task.setTaskToken(
new NexusTaskToken(
ctx.getExecutionId(),
Expand Down Expand Up @@ -899,7 +928,12 @@ private static void requestCancelNexusOperation(
// Test server only supports worker targets, so just push directly to Nexus task queue without
// invoking Nexus client.
ctx.addNexusTask(cancelTask);
ctx.onCommit(historySize -> data.nexusTask = cancelTask);
ctx.onCommit(
historySize -> {
data.nexusTask = cancelTask;
data.cancelRequestedTime = ctx.currentTime();
data.isBackingOff = false;
});
}

private static void reportNexusOperationCancellation(
Expand Down Expand Up @@ -1238,12 +1272,14 @@ private static void startWorkflow(
a.setParentWorkflowNamespace(parentExecutionId.getNamespace());
a.setParentWorkflowExecution(parentExecutionId.getExecution());
}
HistoryEvent event =
HistoryEvent.Builder event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED)
.setWorkflowExecutionStartedEventAttributes(a)
.build();
ctx.addEvent(event);
.setWorkflowExecutionStartedEventAttributes(a);
if (request.getLinksCount() > 0) {
event.addAllLinks(request.getLinksList());
}
ctx.addEvent(event.build());
}

private static void completeWorkflow(
Expand Down
Loading

0 comments on commit 393045d

Please sign in to comment.