From c96f8d61168d14a5a6e612750ac614b1b57b8d46 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 30 Oct 2024 17:09:48 -0700 Subject: [PATCH] Add workflow metadata query (#2301) Add workflow metadata query --- .../io/temporal/client/WorkflowClient.java | 3 + .../WorkflowOutboundCallsInterceptor.java | 70 ++++++++ .../common/metadata/POJOWorkflowMethod.java | 17 +- .../metadata/POJOWorkflowMethodMetadata.java | 6 + .../internal/sync/QueryDispatcher.java | 26 ++- .../internal/sync/SignalDispatcher.java | 22 +++ .../temporal/internal/sync/SyncWorkflow.java | 8 + .../internal/sync/SyncWorkflowContext.java | 49 ++++++ .../internal/sync/UpdateDispatcher.java | 22 +++ .../internal/sync/WorkflowInternal.java | 12 ++ .../workflow/DynamicQueryHandler.java | 5 + .../workflow/DynamicSignalHandler.java | 5 + .../workflow/DynamicUpdateHandler.java | 5 + .../io/temporal/workflow/QueryMethod.java | 3 + .../io/temporal/workflow/SignalMethod.java | 3 + .../io/temporal/workflow/UpdateMethod.java | 3 + .../java/io/temporal/workflow/Workflow.java | 21 +++ .../CleanActivityWorkerShutdownTest.java | 1 - .../CleanNexusWorkerShutdownTest.java | 4 +- .../workflow/WorkflowMetadataTest.java | 164 ++++++++++++++++++ .../internal/TracingWorkerInterceptor.java | 1 + 21 files changed, 443 insertions(+), 7 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/WorkflowMetadataTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java index 75d04118a..80c8044b2 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClient.java @@ -115,6 +115,9 @@ public interface WorkflowClient { /** Use this constant as a query type to get a workflow stack trace. */ String QUERY_TYPE_STACK_TRACE = "__stack_trace"; + /** Use this constant as a query type to get the workflow metadata. */ + String QUERY_TYPE_WORKFLOW_METADATA = "__temporal_workflow_metadata"; + /** Replays workflow to the current state and returns empty result or error if replay failed. */ String QUERY_TYPE_REPLAY_ONLY = "__replay_only"; diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 22e339bcc..4361b182e 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -452,6 +452,7 @@ public Header getHeader() { final class SignalRegistrationRequest { private final String signalType; + private final String description; private final HandlerUnfinishedPolicy unfinishedPolicy; private final Class[] argTypes; private final Type[] genericArgTypes; @@ -464,19 +465,37 @@ public SignalRegistrationRequest( Type[] genericArgTypes, Functions.Proc1 callback) { this.signalType = signalType; + this.description = ""; this.unfinishedPolicy = HandlerUnfinishedPolicy.WARN_AND_ABANDON; this.argTypes = argTypes; this.genericArgTypes = genericArgTypes; this.callback = callback; } + // Kept for backward compatibility + public SignalRegistrationRequest( + String signalType, + HandlerUnfinishedPolicy unfinishedPolicy, + Class[] argTypes, + Type[] genericArgTypes, + Functions.Proc1 callback) { + this.signalType = signalType; + this.description = ""; + this.unfinishedPolicy = unfinishedPolicy; + this.argTypes = argTypes; + this.genericArgTypes = genericArgTypes; + this.callback = callback; + } + public SignalRegistrationRequest( String signalType, + String description, HandlerUnfinishedPolicy unfinishedPolicy, Class[] argTypes, Type[] genericArgTypes, Functions.Proc1 callback) { this.signalType = signalType; + this.description = description; this.unfinishedPolicy = unfinishedPolicy; this.argTypes = argTypes; this.genericArgTypes = genericArgTypes; @@ -487,6 +506,11 @@ public String getSignalType() { return signalType; } + @Experimental + public String getDescription() { + return description; + } + public HandlerUnfinishedPolicy getUnfinishedPolicy() { return unfinishedPolicy; } @@ -519,20 +543,40 @@ public List getRequests() { @Experimental final class UpdateRegistrationRequest { private final String updateName; + private final String description; private final HandlerUnfinishedPolicy unfinishedPolicy; private final Class[] argTypes; private final Type[] genericArgTypes; private final Functions.Func1 executeCallback; private final Functions.Proc1 validateCallback; + // Kept for backward compatibility + public UpdateRegistrationRequest( + String updateName, + HandlerUnfinishedPolicy unfinishedPolicy, + Class[] argTypes, + Type[] genericArgTypes, + Functions.Proc1 validateCallback, + Functions.Func1 executeCallback) { + this.updateName = updateName; + this.description = ""; + this.unfinishedPolicy = unfinishedPolicy; + this.argTypes = argTypes; + this.genericArgTypes = genericArgTypes; + this.validateCallback = validateCallback; + this.executeCallback = executeCallback; + } + public UpdateRegistrationRequest( String updateName, + String description, HandlerUnfinishedPolicy unfinishedPolicy, Class[] argTypes, Type[] genericArgTypes, Functions.Proc1 validateCallback, Functions.Func1 executeCallback) { this.updateName = updateName; + this.description = description; this.unfinishedPolicy = unfinishedPolicy; this.argTypes = argTypes; this.genericArgTypes = genericArgTypes; @@ -544,6 +588,11 @@ public String getUpdateName() { return updateName; } + @Experimental + public String getDescription() { + return description; + } + public HandlerUnfinishedPolicy getUnfinishedPolicy() { return unfinishedPolicy; } @@ -580,16 +629,32 @@ public List getRequests() { final class RegisterQueryInput { private final String queryType; + private final String description; private final Class[] argTypes; private final Type[] genericArgTypes; private final Functions.Func1 callback; + // Kept for backward compatibility + public RegisterQueryInput( + String queryType, + Class[] argTypes, + Type[] genericArgTypes, + Functions.Func1 callback) { + this.queryType = queryType; + this.description = ""; + this.argTypes = argTypes; + this.genericArgTypes = genericArgTypes; + this.callback = callback; + } + public RegisterQueryInput( String queryType, + String description, Class[] argTypes, Type[] genericArgTypes, Functions.Func1 callback) { this.queryType = queryType; + this.description = description; this.argTypes = argTypes; this.genericArgTypes = genericArgTypes; this.callback = callback; @@ -599,6 +664,11 @@ public String getQueryType() { return queryType; } + @Experimental + public String getDescription() { + return description; + } + public Class[] getArgTypes() { return argTypes; } diff --git a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java index f6cfba9af..a4c5bf67a 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java +++ b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethod.java @@ -31,6 +31,7 @@ final class POJOWorkflowMethod { private final WorkflowMethodType type; private final Method method; private final Optional nameFromAnnotation; + private final Optional descriptionFromAnnotation; POJOWorkflowMethod(Method method) { this.method = Objects.requireNonNull(method); @@ -43,6 +44,7 @@ final class POJOWorkflowMethod { int count = 0; WorkflowMethodType type = null; String name = null; + String description = null; if (workflowMethod != null) { type = WorkflowMethodType.WORKFLOW; count++; @@ -56,6 +58,7 @@ final class POJOWorkflowMethod { } count++; name = signalMethod.name(); + description = signalMethod.description(); } if (queryMethod != null) { type = WorkflowMethodType.QUERY; @@ -65,11 +68,13 @@ final class POJOWorkflowMethod { } count++; name = queryMethod.name(); + description = queryMethod.description(); } if (updateMethod != null) { type = WorkflowMethodType.UPDATE; count++; name = updateMethod.name(); + description = updateMethod.description(); } if (updateValidatorMethod != null) { type = WorkflowMethodType.UPDATE_VALIDATOR; @@ -86,13 +91,19 @@ final class POJOWorkflowMethod { throw new IllegalArgumentException( method + " must contain exactly one annotation " - + "of @WorkflowMethod, @QueryMethod or @SignalMethod"); + + "of @WorkflowMethod, @QueryMethod @UpdateMethod or @SignalMethod"); } if (Strings.isNullOrEmpty(name)) { this.nameFromAnnotation = Optional.empty(); } else { this.nameFromAnnotation = Optional.of(name); } + + if (Strings.isNullOrEmpty(description)) { + this.descriptionFromAnnotation = Optional.empty(); + } else { + this.descriptionFromAnnotation = Optional.of(description); + } this.type = Objects.requireNonNull(type); } @@ -108,6 +119,10 @@ public Optional getNameFromAnnotation() { return nameFromAnnotation; } + public Optional getDescriptionFromAnnotation() { + return descriptionFromAnnotation; + } + /** Compare and hash on method only. */ @Override public boolean equals(Object o) { diff --git a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java index a412a444f..e58f2edf7 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java +++ b/temporal-sdk/src/main/java/io/temporal/common/metadata/POJOWorkflowMethodMetadata.java @@ -29,6 +29,7 @@ public final class POJOWorkflowMethodMetadata { private final POJOWorkflowMethod workflowMethod; private final String name; + private final String description; private final Class workflowInterface; POJOWorkflowMethodMetadata(POJOWorkflowMethod methodMetadata, Class workflowInterface) { @@ -47,6 +48,7 @@ public final class POJOWorkflowMethodMetadata { } else { this.name = nameFromAnnotation.orElse(methodMetadata.getMethod().getName()); } + this.description = workflowMethod.getDescriptionFromAnnotation().orElse(""); } public WorkflowMethodType getType() { @@ -62,6 +64,10 @@ public String getName() { return name; } + public String getDescription() { + return description; + } + public Method getWorkflowMethod() { return workflowMethod.getMethod(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java index 891addf8c..ac0597a0d 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java @@ -21,15 +21,14 @@ package io.temporal.internal.sync; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; import io.temporal.common.interceptors.Header; import io.temporal.common.interceptors.WorkflowInboundCallsInterceptor; import io.temporal.common.interceptors.WorkflowOutboundCallsInterceptor; import io.temporal.workflow.DynamicQueryHandler; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,4 +102,25 @@ public void registerDynamicQueryHandler( WorkflowOutboundCallsInterceptor.RegisterDynamicQueryHandlerInput input) { dynamicQueryHandler = input.getHandler(); } + + public List getQueryHandlers() { + List handlers = new ArrayList<>(queryCallbacks.size() + 1); + for (Map.Entry entry : + queryCallbacks.entrySet()) { + WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = entry.getValue(); + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setName(handler.getQueryType()) + .setDescription(handler.getDescription()) + .build()); + } + if (dynamicQueryHandler != null) { + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setDescription(dynamicQueryHandler.getDescription()) + .build()); + } + handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName)); + return handlers; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java index 5900cf051..6a0b5c1cf 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java @@ -21,6 +21,7 @@ package io.temporal.internal.sync; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.DataConverterException; import io.temporal.common.converter.EncodedValues; @@ -160,6 +161,27 @@ private void logSerializationException( Workflow.getMetricsScope().counter(MetricsType.CORRUPTED_SIGNALS_COUNTER).inc(1); } + public List getSignalHandlers() { + List handlers = new ArrayList<>(signalCallbacks.size() + 1); + for (Map.Entry entry : + signalCallbacks.entrySet()) { + WorkflowOutboundCallsInterceptor.SignalRegistrationRequest handler = entry.getValue(); + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setName(handler.getSignalType()) + .setDescription(handler.getDescription()) + .build()); + } + if (dynamicSignalHandler != null) { + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setDescription(dynamicSignalHandler.getDescription()) + .build()); + } + handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName)); + return handlers; + } + private static class SignalData { private final String signalName; private final Optional payload; diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java index d12d765a2..dc32ba359 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java @@ -39,6 +39,7 @@ import io.temporal.internal.statemachines.UpdateProtocolCallback; import io.temporal.internal.worker.WorkflowExecutionException; import io.temporal.internal.worker.WorkflowExecutorCache; +import io.temporal.payload.context.WorkflowSerializationContext; import io.temporal.worker.WorkflowImplementationOptions; import io.temporal.workflow.UpdateInfo; import java.util.List; @@ -69,6 +70,7 @@ class SyncWorkflow implements ReplayWorkflow { private WorkflowExecutionHandler workflowProc; private DeterministicRunner runner; private DataConverter dataConverter; + private DataConverter dataConverterWithWorkflowContext; public SyncWorkflow( String namespace, @@ -92,6 +94,9 @@ public SyncWorkflow( this.cache = cache; this.defaultDeadlockDetectionTimeout = defaultDeadlockDetectionTimeout; this.dataConverter = dataConverter; + this.dataConverterWithWorkflowContext = + dataConverter.withContext( + new WorkflowSerializationContext(namespace, workflowExecution.getWorkflowId())); this.workflowContext = new SyncWorkflowContext( namespace, @@ -238,6 +243,9 @@ public Optional query(WorkflowQuery query) { // converter return DefaultDataConverter.STANDARD_INSTANCE.toPayloads(runner.stackTrace()); } + if (WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA.equals(query.getQueryType())) { + return dataConverterWithWorkflowContext.toPayloads(workflowContext.getWorkflowMetadata()); + } Optional args = query.hasQueryArgs() ? Optional.of(query.getQueryArgs()) : Optional.empty(); return workflowProc.handleQuery(query.getQueryType(), query.getHeader(), args); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 30da4227d..dbc77b144 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -20,6 +20,8 @@ package io.temporal.internal.sync; +import static io.temporal.client.WorkflowClient.QUERY_TYPE_STACK_TRACE; +import static io.temporal.client.WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA; import static io.temporal.internal.common.HeaderUtils.intoPayloadMap; import static io.temporal.internal.common.HeaderUtils.toHeaderGrpc; import static io.temporal.internal.common.RetryOptionsUtils.toRetryPolicy; @@ -43,6 +45,9 @@ import io.temporal.api.failure.v1.Failure; import io.temporal.api.history.v1.HistoryEvent; import io.temporal.api.sdk.v1.UserMetadata; +import io.temporal.api.sdk.v1.WorkflowDefinition; +import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; +import io.temporal.api.sdk.v1.WorkflowMetadata; import io.temporal.api.taskqueue.v1.TaskQueue; import io.temporal.api.workflowservice.v1.PollActivityTaskQueueResponse; import io.temporal.client.WorkflowException; @@ -124,6 +129,7 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private Map runningUpdateHandlers = new HashMap<>(); // Map of all running signal handlers. Key is the event Id of the signal event. private Map runningSignalHandlers = new HashMap<>(); + @Nullable private String currentDetails; public SyncWorkflowContext( @Nonnull String namespace, @@ -372,6 +378,40 @@ public boolean isEveryHandlerFinished() { && signalDispatcher.getRunningSignalHandlers().isEmpty(); } + public WorkflowMetadata getWorkflowMetadata() { + WorkflowMetadata.Builder workflowMetadata = WorkflowMetadata.newBuilder(); + WorkflowDefinition.Builder workflowDefinition = WorkflowDefinition.newBuilder(); + // Set the workflow type + if (replayContext.getWorkflowType() != null + && replayContext.getWorkflowType().getName() != null) { + workflowDefinition.setType(replayContext.getWorkflowType().getName()); + } + // Set built in queries + workflowDefinition.addQueryDefinitions( + WorkflowInteractionDefinition.newBuilder() + .setName(QUERY_TYPE_STACK_TRACE) + .setDescription("Current stack trace") + .build()); + workflowDefinition.addQueryDefinitions( + WorkflowInteractionDefinition.newBuilder() + .setName(QUERY_TYPE_WORKFLOW_METADATA) + .setDescription("Metadata about the workflow") + .build()); + // Add user defined queries + workflowDefinition.addAllQueryDefinitions(queryDispatcher.getQueryHandlers()); + // Add user defined signals + workflowDefinition.addAllSignalDefinitions(signalDispatcher.getSignalHandlers()); + // Add user defined update handlers + workflowDefinition.addAllUpdateDefinitions(updateDispatcher.getUpdateHandlers()); + // Set the workflow definition + workflowMetadata.setDefinition(workflowDefinition.build()); + // Add the current workflow details + if (currentDetails != null) { + workflowMetadata.setCurrentDetails(currentDetails); + } + return workflowMetadata.build(); + } + private class ActivityCallback { private final CompletablePromise> result = Workflow.newPromise(); @@ -1438,6 +1478,15 @@ public void setCurrentUpdateInfo(UpdateInfo updateInfo) { currentUpdateInfo.set(updateInfo); } + public void setCurrentDetails(String details) { + currentDetails = details; + } + + @Nullable + public String getCurrentDetails() { + return currentDetails; + } + public Optional getCurrentUpdateInfo() { return Optional.ofNullable(currentUpdateInfo.get()); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java index 9edec581e..f4328aa7c 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java @@ -21,6 +21,7 @@ package io.temporal.internal.sync; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; import io.temporal.common.interceptors.Header; @@ -173,4 +174,25 @@ public UpdateOutput handleInterceptedExecuteUpdate(UpdateInput input) { public Map getRunningUpdateHandlers() { return runningUpdateHandlers; } + + public List getUpdateHandlers() { + List handlers = new ArrayList<>(); + for (Map.Entry entry : + updateCallbacks.entrySet()) { + UpdateRegistrationRequest handler = entry.getValue(); + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setName(handler.getUpdateName()) + .setDescription(handler.getDescription()) + .build()); + } + if (dynamicUpdateHandler != null) { + handlers.add( + WorkflowInteractionDefinition.newBuilder() + .setDescription(dynamicUpdateHandler.getDescription()) + .build()); + } + handlers.sort(Comparator.comparing(WorkflowInteractionDefinition::getName)); + return handlers; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 7a9d3b953..55444d50e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -175,6 +175,7 @@ public static void registerListener(Object implementation) { .registerQuery( new WorkflowOutboundCallsInterceptor.RegisterQueryInput( methodMetadata.getName(), + methodMetadata.getDescription(), method.getParameterTypes(), method.getGenericParameterTypes(), (args) -> { @@ -192,6 +193,7 @@ public static void registerListener(Object implementation) { requests.add( new WorkflowOutboundCallsInterceptor.SignalRegistrationRequest( methodMetadata.getName(), + methodMetadata.getDescription(), signalMethod.unfinishedPolicy(), method.getParameterTypes(), method.getGenericParameterTypes(), @@ -250,6 +252,7 @@ public static void registerListener(Object implementation) { updateRequests.add( new WorkflowOutboundCallsInterceptor.UpdateRegistrationRequest( methodMetadata.getName(), + methodMetadata.getDescription(), updateMethod.unfinishedPolicy(), method.getParameterTypes(), method.getGenericParameterTypes(), @@ -825,6 +828,15 @@ public static NexusOperationHandle startNexusOperation(Functions.Func return StartNexusCallInternal.startNexusOperation(() -> operation.apply()); } + public static void setCurrentDetails(String details) { + getRootWorkflowContext().setCurrentDetails(details); + } + + @Nullable + public static String getCurrentDetails() { + return getRootWorkflowContext().getCurrentDetails(); + } + static WorkflowOutboundCallsInterceptor getWorkflowOutboundInterceptor() { return getRootWorkflowContext().getWorkflowOutboundInterceptor(); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicQueryHandler.java b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicQueryHandler.java index e7ee15df7..a243a58f2 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicQueryHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicQueryHandler.java @@ -37,4 +37,9 @@ */ public interface DynamicQueryHandler { Object handle(String queryType, EncodedValues args); + + /** Short description of the Query handler. */ + default String getDescription() { + return "Dynamic query handler"; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicSignalHandler.java b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicSignalHandler.java index 9f2850230..9510512fc 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicSignalHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicSignalHandler.java @@ -42,4 +42,9 @@ public interface DynamicSignalHandler { default HandlerUnfinishedPolicy getUnfinishedPolicy(String signalName) { return HandlerUnfinishedPolicy.WARN_AND_ABANDON; } + + /** Short description of the Update handler. */ + default String getDescription() { + return "Dynamic signal handler"; + } } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicUpdateHandler.java b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicUpdateHandler.java index fbe542985..6434f85cb 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/DynamicUpdateHandler.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/DynamicUpdateHandler.java @@ -41,6 +41,11 @@ default void handleValidate(String updateName, EncodedValues args) {} Object handleExecute(String updateName, EncodedValues args); + /** Short description of the Update handler. */ + default String getDescription() { + return "Dynamic update handler"; + } + /** Returns the actions taken if a workflow exits with a running instance of this handler. */ default HandlerUnfinishedPolicy getUnfinishedPolicy(String updateName) { return HandlerUnfinishedPolicy.WARN_AND_ABANDON; diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/QueryMethod.java b/temporal-sdk/src/main/java/io/temporal/workflow/QueryMethod.java index e1ea4f661..c1ceb78d3 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/QueryMethod.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/QueryMethod.java @@ -43,4 +43,7 @@ * tags. And systems like prometheus ignore metrics which have tags with unsupported characters. */ String name() default ""; + + /** Short description of the query type. Default is an empty string. */ + String description() default ""; } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/SignalMethod.java b/temporal-sdk/src/main/java/io/temporal/workflow/SignalMethod.java index e57c87e9c..b3839720a 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/SignalMethod.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/SignalMethod.java @@ -60,6 +60,9 @@ */ String name() default ""; + /** Short description of the signal type. Default is to an empty string. */ + String description() default ""; + /** Sets the actions taken if a workflow exits with a running instance of this handler. */ HandlerUnfinishedPolicy unfinishedPolicy() default HandlerUnfinishedPolicy.WARN_AND_ABANDON; } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/UpdateMethod.java b/temporal-sdk/src/main/java/io/temporal/workflow/UpdateMethod.java index 4c52cc3e6..66e403e0e 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/UpdateMethod.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/UpdateMethod.java @@ -44,6 +44,9 @@ */ String name() default ""; + /** Short description of the update handler. Default is an empty string. */ + String description() default ""; + /** Sets the actions taken if a workflow exits with a running instance of this handler. */ HandlerUnfinishedPolicy unfinishedPolicy() default HandlerUnfinishedPolicy.WARN_AND_ABANDON; } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 94528e913..7da164e17 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -1364,6 +1364,27 @@ public static NexusOperationHandle startNexusOperation(Functions.Func return WorkflowInternal.startNexusOperation(operation); } + /** + * Sets the current workflows details. + * + * @param details details to set + */ + @Experimental + public static void setCurrentDetails(String details) { + WorkflowInternal.setCurrentDetails(details); + } + + /** + * Get the current workflows details. + * + * @return details of the current workflow + */ + @Experimental + @Nullable + public static String getCurrentDetails() { + return WorkflowInternal.getCurrentDetails(); + } + /** Prohibit instantiation. */ private Workflow() {} } diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanActivityWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanActivityWorkerShutdownTest.java index 40090e966..3f856a1e9 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanActivityWorkerShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanActivityWorkerShutdownTest.java @@ -96,7 +96,6 @@ public void testShutdownNow() throws InterruptedException { .getExecutionHistory(execution.getWorkflowId()) .getHistory() .getEventsList(); - events.forEach(System.out::println); boolean found = false; for (HistoryEvent e : events) { if (e.getEventType() == EventType.EVENT_TYPE_ACTIVITY_TASK_COMPLETED) { diff --git a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java index e9e8e260a..5a36611fa 100644 --- a/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java +++ b/temporal-sdk/src/test/java/io/temporal/worker/shutdown/CleanNexusWorkerShutdownTest.java @@ -63,7 +63,7 @@ public class CleanNexusWorkerShutdownTest { .setWorkerOptions(WorkerOptions.newBuilder().setLocalActivityWorkerOnly(true).build()) .build(); - @Test + @Test(timeout = 20000) public void testShutdown() throws InterruptedException { TestWorkflow1 workflow = testWorkflowRule.newWorkflowStub(TestWorkflow1.class); WorkflowExecution execution = WorkflowClient.start(workflow::execute, null); @@ -88,7 +88,7 @@ public void testShutdown() throws InterruptedException { assertTrue("Contains NexusOperationCompleted", found); } - @Test + @Test(timeout = 20000) public void testShutdownNow() throws InterruptedException { TestWorkflow1 workflow = testWorkflowRule.newWorkflowStub(TestWorkflow1.class); WorkflowExecution execution = WorkflowClient.start(workflow::execute, "now"); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowMetadataTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowMetadataTest.java new file mode 100644 index 000000000..c2a4a8aaf --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowMetadataTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow; + +import io.temporal.api.sdk.v1.WorkflowDefinition; +import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; +import io.temporal.api.sdk.v1.WorkflowMetadata; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowMetadataTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestWorkflowWithMetadataImpl.class).build(); + + @Test + public void testGetMetadata() { + TestWorkflowWithMetadata workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflowWithMetadata.class); + String result = workflowStub.execute("current details"); + Assert.assertEquals("current details", result); + WorkflowMetadata metadata = + WorkflowStub.fromTyped(workflowStub) + .query(WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA, WorkflowMetadata.class); + Assert.assertEquals("current details", metadata.getCurrentDetails()); + WorkflowDefinition definition = metadata.getDefinition(); + Assert.assertEquals("TestWorkflowWithMetadata", definition.getType()); + // Check query definitions and order + Assert.assertEquals(5, definition.getQueryDefinitionsCount()); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder() + .setName(WorkflowClient.QUERY_TYPE_STACK_TRACE) + .setDescription("Current stack trace") + .build(), + definition.getQueryDefinitions(0)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder() + .setName(WorkflowClient.QUERY_TYPE_WORKFLOW_METADATA) + .setDescription("Metadata about the workflow") + .build(), + definition.getQueryDefinitions(1)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setDescription("Dynamic query handler").build(), + definition.getQueryDefinitions(2)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setName("query").build(), + definition.getQueryDefinitions(3)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder() + .setName("queryWithDescription") + .setDescription("queryWithDescription description") + .build(), + definition.getQueryDefinitions(4)); + // Check signal definitions and order + Assert.assertEquals(3, definition.getSignalDefinitionsCount()); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setDescription("Dynamic signal handler").build(), + definition.getSignalDefinitions(0)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setName("signal").build(), + definition.getSignalDefinitions(1)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder() + .setName("signalWithDescription") + .setDescription("signalWithDescription description") + .build(), + definition.getSignalDefinitions(2)); + // Check update definitions and order + Assert.assertEquals(3, definition.getUpdateDefinitionsCount()); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setDescription("Dynamic update handler").build(), + definition.getUpdateDefinitions(0)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder().setName("update").build(), + definition.getUpdateDefinitions(1)); + Assert.assertEquals( + WorkflowInteractionDefinition.newBuilder() + .setName("updateWithDescription") + .setDescription("updateWithDescription description") + .build(), + definition.getUpdateDefinitions(2)); + } + + @WorkflowInterface + public interface TestWorkflowWithMetadata { + @WorkflowMethod + String execute(String arg); + + @SignalMethod + void signal(String value); + + @QueryMethod + String query(); + + @UpdateMethod + void update(String value); + + @SignalMethod(description = "signalWithDescription description") + void signalWithDescription(String value); + + @QueryMethod(description = "queryWithDescription description") + String queryWithDescription(); + + @UpdateMethod(description = "updateWithDescription description") + void updateWithDescription(String value); + } + + public static class TestWorkflowWithMetadataImpl implements TestWorkflowWithMetadata { + + @Override + public String execute(String details) { + Workflow.setCurrentDetails(details); + Workflow.registerListener((DynamicSignalHandler) (signalName, encodedArgs) -> {}); + Workflow.registerListener((DynamicQueryHandler) (queryType, encodedArgs) -> null); + Workflow.registerListener((DynamicUpdateHandler) (updateType, encodedArgs) -> null); + return Workflow.getCurrentDetails(); + } + + @Override + public void signal(String value) {} + + @Override + public String query() { + return null; + } + + @Override + public void update(String value) {} + + @Override + public void signalWithDescription(String value) {} + + @Override + public String queryWithDescription() { + return null; + } + + @Override + public void updateWithDescription(String value) {} + } +} diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 88bba8b45..30c32ddbd 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -308,6 +308,7 @@ public void registerQuery(RegisterQueryInput input) { next.registerQuery( new RegisterQueryInput( queryType, + input.getDescription(), input.getArgTypes(), input.getGenericArgTypes(), (args) -> {