diff --git a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java index 031083747..8f66ec862 100644 --- a/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/activity/LocalActivityOptions.java @@ -21,6 +21,7 @@ package io.temporal.activity; import com.google.common.base.Objects; +import io.temporal.common.Experimental; import io.temporal.common.MethodRetry; import io.temporal.common.RetryOptions; import java.time.Duration; @@ -56,6 +57,7 @@ public static final class Builder { private Duration localRetryThreshold; private RetryOptions retryOptions; private Boolean doNotIncludeArgumentsIntoMarker; + private String summary; /** Copy Builder fields from the options. */ private Builder(LocalActivityOptions options) { @@ -68,6 +70,7 @@ private Builder(LocalActivityOptions options) { this.localRetryThreshold = options.getLocalRetryThreshold(); this.retryOptions = options.getRetryOptions(); this.doNotIncludeArgumentsIntoMarker = options.isDoNotIncludeArgumentsIntoMarker(); + this.summary = options.getSummary(); } /** @@ -178,6 +181,18 @@ public Builder setDoNotIncludeArgumentsIntoMarker(boolean doNotIncludeArgumentsI return this; } + /** + * Single-line fixed summary for this activity that will appear in UI/CLI. This can be in + * single-line Temporal Markdown format. + * + *

Default is none/empty. + */ + @Experimental + public Builder setSummary(String summary) { + this.summary = summary; + return this; + } + public Builder mergeActivityOptions(LocalActivityOptions override) { if (override == null) { return this; @@ -204,6 +219,7 @@ public Builder mergeActivityOptions(LocalActivityOptions override) { (override.doNotIncludeArgumentsIntoMarker != null) ? override.doNotIncludeArgumentsIntoMarker : this.doNotIncludeArgumentsIntoMarker; + this.summary = (override.summary == null) ? this.summary : override.summary; return this; } @@ -214,7 +230,8 @@ public LocalActivityOptions build() { scheduleToStartTimeout, localRetryThreshold, retryOptions, - doNotIncludeArgumentsIntoMarker); + doNotIncludeArgumentsIntoMarker, + summary); } public LocalActivityOptions validateAndBuildWithDefaults() { @@ -228,7 +245,8 @@ public LocalActivityOptions validateAndBuildWithDefaults() { scheduleToStartTimeout, localRetryThreshold, RetryOptions.newBuilder(retryOptions).validateBuildWithDefaults(), - doNotIncludeArgumentsIntoMarker); + doNotIncludeArgumentsIntoMarker, + summary); } } @@ -238,6 +256,7 @@ public LocalActivityOptions validateAndBuildWithDefaults() { private final Duration scheduleToStartTimeout; private final RetryOptions retryOptions; private final Boolean doNotIncludeArgumentsIntoMarker; + private final String summary; private LocalActivityOptions( Duration startToCloseTimeout, @@ -245,13 +264,15 @@ private LocalActivityOptions( Duration scheduleToStartTimeout, Duration localRetryThreshold, RetryOptions retryOptions, - Boolean doNotIncludeArgumentsIntoMarker) { + Boolean doNotIncludeArgumentsIntoMarker, + String summary) { this.scheduleToCloseTimeout = scheduleToCloseTimeout; this.startToCloseTimeout = startToCloseTimeout; this.scheduleToStartTimeout = scheduleToStartTimeout; this.localRetryThreshold = localRetryThreshold; this.retryOptions = retryOptions; this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker; + this.summary = summary; } public Duration getScheduleToCloseTimeout() { @@ -278,6 +299,11 @@ public boolean isDoNotIncludeArgumentsIntoMarker() { return doNotIncludeArgumentsIntoMarker != null && doNotIncludeArgumentsIntoMarker; } + @Experimental + public String getSummary() { + return summary; + } + public Builder toBuilder() { return new Builder(this); } @@ -292,7 +318,8 @@ public boolean equals(Object o) { && Objects.equal(startToCloseTimeout, that.startToCloseTimeout) && Objects.equal(scheduleToStartTimeout, that.scheduleToStartTimeout) && Objects.equal(localRetryThreshold, that.localRetryThreshold) - && Objects.equal(retryOptions, that.retryOptions); + && Objects.equal(retryOptions, that.retryOptions) + && Objects.equal(summary, that.summary); } @Override @@ -303,7 +330,8 @@ public int hashCode() { scheduleToStartTimeout, localRetryThreshold, retryOptions, - doNotIncludeArgumentsIntoMarker); + doNotIncludeArgumentsIntoMarker, + summary); } @Override @@ -319,6 +347,8 @@ public String toString() { + retryOptions + ", doNotIncludeArgumentsIntoMarker=" + isDoNotIncludeArgumentsIntoMarker() + + ", summary=" + + summary + '}'; } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java index ab767ff40..232661052 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/ExecuteLocalActivityParameters.java @@ -23,6 +23,7 @@ import io.temporal.api.common.v1.ActivityType; import io.temporal.api.common.v1.Payloads; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.internal.common.ProtobufTimeUtils; import io.temporal.workflow.Functions; @@ -51,6 +52,7 @@ public class ExecuteLocalActivityParameters { private final boolean doNotIncludeArgumentsIntoMarker; private final @Nullable Duration scheduleToStartTimeout; private @Nullable Functions.Proc onNewAttemptCallback; + private final UserMetadata metadata; public ExecuteLocalActivityParameters( @Nonnull PollActivityTaskQueueResponse.Builder activityTaskBuilder, @@ -58,7 +60,8 @@ public ExecuteLocalActivityParameters( long originalScheduledTimestamp, @Nullable Failure previousLocalExecutionFailure, boolean doNotIncludeArgumentsIntoMarker, - @Nonnull Duration localRetryThreshold) { + @Nonnull Duration localRetryThreshold, + UserMetadata metadata) { this.activityTaskBuilder = Objects.requireNonNull(activityTaskBuilder, "activityTaskBuilder"); this.scheduleToStartTimeout = scheduleToStartTimeout; this.originalScheduledTimestamp = originalScheduledTimestamp; @@ -66,6 +69,7 @@ public ExecuteLocalActivityParameters( this.doNotIncludeArgumentsIntoMarker = doNotIncludeArgumentsIntoMarker; this.localRetryThreshold = localRetryThreshold; this.onNewAttemptCallback = null; + this.metadata = metadata; } public String getActivityId() { @@ -136,4 +140,8 @@ public Functions.Proc getOnNewAttemptCallback() { public void setOnNewAttemptCallback(@Nonnull Functions.Proc onNewAttemptCallback) { this.onNewAttemptCallback = onNewAttemptCallback; } + + public UserMetadata getMetadata() { + return metadata; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java index 4feb781bc..420966967 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/statemachines/LocalActivityStateMachine.java @@ -31,6 +31,7 @@ import io.temporal.api.failure.v1.CanceledFailureInfo; import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.MarkerRecordedEventAttributes; +import io.temporal.api.sdk.v1.UserMetadata; import io.temporal.api.workflowservice.v1.RespondActivityTaskCanceledRequest; import io.temporal.api.workflowservice.v1.RespondActivityTaskCompletedRequest; import io.temporal.common.converter.DefaultDataConverter; @@ -63,6 +64,7 @@ final class LocalActivityStateMachine private final LocalActivityCallback callback; private ExecuteLocalActivityParameters localActivityParameters; + private @Nullable UserMetadata userMetadata; private final Functions.Func replaying; /** Accepts proposed current time. Returns accepted current time. */ @@ -211,6 +213,7 @@ private LocalActivityStateMachine( this.replaying = replaying; this.setCurrentTimeCallback = setCurrentTimeCallback; this.localActivityParameters = localActivityParameters; + this.userMetadata = localActivityParameters.getMetadata(); this.activityId = localActivityParameters.getActivityId(); this.activityType = localActivityParameters.getActivityType(); this.originalScheduledTimestamp = localActivityParameters.getOriginalScheduledTimestamp(); @@ -342,11 +345,15 @@ private void createMarker() { DefaultDataConverter.STANDARD_INSTANCE.toPayloads(localActivityMarkerMetadata).get()); markerAttributes.putAllDetails(details); } - addCommand( + Command.Builder command = Command.newBuilder() .setCommandType(CommandType.COMMAND_TYPE_RECORD_MARKER) - .setRecordMarkerCommandAttributes(markerAttributes.build()) - .build()); + .setRecordMarkerCommandAttributes(markerAttributes.build()); + if (userMetadata != null) { + command.setUserMetadata(userMetadata); + userMetadata = null; + } + addCommand(command.build()); } private void notifyResultFromEvent() { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 1fcbf59d0..4325ab632 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -680,13 +680,18 @@ private ExecuteLocalActivityParameters constructExecuteLocalActivityParameters( localRetryThreshold = replayContext.getWorkflowTaskTimeout().multipliedBy(3); } + @Nullable + UserMetadata userMetadata = + makeUserMetaData(options.getSummary(), null, dataConverterWithCurrentWorkflowContext); + return new ExecuteLocalActivityParameters( activityTask, options.getScheduleToStartTimeout(), originalScheduledTime, previousExecutionFailure, options.isDoNotIncludeArgumentsIntoMarker(), - localRetryThreshold); + localRetryThreshold, + userMetadata); } @Override diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java index 27aaed73b..9948072a6 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/OutdatedDirectQueryReplayWorkflowRunTaskHandlerTest.java @@ -159,6 +159,7 @@ private ReplayWorkflow createReplayWorkflow(WorkflowExecutionHistory workflowExe System.currentTimeMillis(), null, false, + null, null), (r, e) -> {}); return false; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java index 29ae017d6..f4ffd57c3 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java @@ -151,6 +151,7 @@ public void localActivityMeteringHelper() { 0, null, false, + null, null); laMeteringHelper.addNewLocalActivity(executeLA); laMeteringHelper.addNewLocalActivity( @@ -160,6 +161,7 @@ public void localActivityMeteringHelper() { 0, null, false, + null, null)); for (int i = 0; i < 5; i++) { executeLA.getOnNewAttemptCallback().apply(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java index b5fffb8bd..810273be4 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/statemachines/LocalActivityStateMachineTest.java @@ -102,6 +102,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { System.currentTimeMillis(), null, true, + null, null); ExecuteLocalActivityParameters parameters2 = new ExecuteLocalActivityParameters( @@ -112,6 +113,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { System.currentTimeMillis(), null, false, + null, null); ExecuteLocalActivityParameters parameters3 = new ExecuteLocalActivityParameters( @@ -122,6 +124,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { System.currentTimeMillis(), null, true, + null, null); builder @@ -303,6 +306,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { System.currentTimeMillis(), null, false, + null, null); builder ., LocalActivityCallback.LocalActivityFailedException>add2( @@ -368,6 +372,7 @@ protected void buildWorkflow(AsyncWorkflowBuilder builder) { System.currentTimeMillis(), null, false, + null, null); // TODO: This is a workaround for the lack of support for child workflow in the test // framework. diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java new file mode 100644 index 000000000..0e7b8ef66 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/LocalActivityMetadataTest.java @@ -0,0 +1,104 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.activityTests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeTrue; + +import io.temporal.activity.LocalActivityOptions; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; +import io.temporal.common.WorkflowExecutionHistory; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class LocalActivityMetadataTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestWorkflowImpl.class) + .setActivityImplementations(new TestActivities.TestActivitiesImpl()) + .build(); + + static final String localActivitySummary = "local-activity-summary"; + + @Before + public void checkRealServer() { + assumeTrue("skipping for test server", SDKTestWorkflowRule.useExternalService); + } + + @Test + public void testLocalActivityWithMetaData() { + TestWorkflow1 stub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + stub.execute(testWorkflowRule.getTaskQueue()); + + WorkflowExecution exec = WorkflowStub.fromTyped(stub).getExecution(); + WorkflowExecutionHistory workflowExecutionHistory = + testWorkflowRule.getWorkflowClient().fetchHistory(exec.getWorkflowId()); + List localActivityScheduledEvents = + workflowExecutionHistory.getEvents().stream() + .filter(HistoryEvent::hasMarkerRecordedEventAttributes) + .collect(Collectors.toList()); + assertEventMetadata(localActivityScheduledEvents.get(0), localActivitySummary, null); + } + + private void assertEventMetadata(HistoryEvent event, String summary, String details) { + if (summary != null) { + String describedSummary = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getSummary(), String.class, String.class); + assertEquals(summary, describedSummary); + } + if (details != null) { + String describedDetails = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + event.getUserMetadata().getDetails(), String.class, String.class); + assertEquals(details, describedDetails); + } + } + + public static class TestWorkflowImpl implements TestWorkflow1 { + + private final TestActivities.VariousTestActivities activities = + Workflow.newLocalActivityStub( + TestActivities.VariousTestActivities.class, + LocalActivityOptions.newBuilder() + .setSummary(localActivitySummary) + .setStartToCloseTimeout(Duration.ofSeconds(5)) + .build()); + + @Override + public String execute(String taskQueue) { + return activities.activity(); + } + } +}