Add WorkflowStartDelay support to StartChildWorkflowExecution commands.

The command validator rejects commands that set both CronSchedule and
WorkflowStartDelay, checks the delay with timer.ValidateAndCapTimer, and only
overrides the run/task timeouts when the command sets them. The delay is copied
into the StartChildWorkflowExecutionInitiated event and into the child start
request built by the transfer queue executor. Unit and functional tests follow.

diff --git a/service/history/api/respondworkflowtaskcompleted/command_checker.go b/service/history/api/respondworkflowtaskcompleted/command_checker.go
index 17c14623d7b..934465cd2a9 100644
--- a/service/history/api/respondworkflowtaskcompleted/command_checker.go
+++ b/service/history/api/respondworkflowtaskcompleted/command_checker.go
@@ -782,6 +782,14 @@ func (v *commandAttrValidator) validateStartChildExecutionAttributes(
 		return failedCause, fmt.Errorf("invalid WorkflowRetryPolicy on StartChildWorkflowExecutionCommand: %w. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns)
 	}
 
+	if len(attributes.GetCronSchedule()) > 0 && attributes.GetWorkflowStartDelay() != nil {
+		return failedCause, fmt.Errorf("CronSchedule and WorkflowStartDelay may not be used together on StartChildWorkflowExecutionCommand. WorkflowId=%s WorkflowType=%s Namespace=%s", wfID, wfType, ns)
+	}
+
+	if err := timer.ValidateAndCapTimer(attributes.GetWorkflowStartDelay()); err != nil {
+		return failedCause, serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid WorkflowStartDelay on StartChildWorkflowExecutionCommand: %v. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns))
+	}
+
 	if err := backoff.ValidateSchedule(attributes.GetCronSchedule()); err != nil {
 		return failedCause, fmt.Errorf("invalid CronSchedule on StartChildWorkflowExecutionCommand: %w. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns)
 	}
@@ -800,9 +808,13 @@ func (v *commandAttrValidator) validateStartChildExecutionAttributes(
 	// workflow execution timeout is left as is
 	// if workflow execution timeout == 0 -> infinity
 
-	attributes.WorkflowRunTimeout = durationpb.New(common.OverrideWorkflowRunTimeout(attributes.GetWorkflowRunTimeout().AsDuration(), attributes.GetWorkflowExecutionTimeout().AsDuration()))
+	if attributes.GetWorkflowRunTimeout() != nil {
+		attributes.WorkflowRunTimeout = durationpb.New(common.OverrideWorkflowRunTimeout(attributes.GetWorkflowRunTimeout().AsDuration(), attributes.GetWorkflowExecutionTimeout().AsDuration()))
+	}
 
-	attributes.WorkflowTaskTimeout = durationpb.New(common.OverrideWorkflowTaskTimeout(targetNamespace.String(), attributes.GetWorkflowTaskTimeout().AsDuration(), attributes.GetWorkflowRunTimeout().AsDuration(), defaultWorkflowTaskTimeoutFn))
+	if attributes.GetWorkflowTaskTimeout() != nil {
+		attributes.WorkflowTaskTimeout = durationpb.New(common.OverrideWorkflowTaskTimeout(targetNamespace.String(), attributes.GetWorkflowTaskTimeout().AsDuration(), attributes.GetWorkflowRunTimeout().AsDuration(), defaultWorkflowTaskTimeoutFn))
+	}
 
 	return enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNSPECIFIED, nil
 }
diff --git a/service/history/api/respondworkflowtaskcompleted/command_checker_test.go b/service/history/api/respondworkflowtaskcompleted/command_checker_test.go
index 8e9bbe1f1ac..77ba92c0e62 100644
--- a/service/history/api/respondworkflowtaskcompleted/command_checker_test.go
+++ b/service/history/api/respondworkflowtaskcompleted/command_checker_test.go
@@ -267,6 +267,55 @@ func (s *commandAttrValidatorSuite) TestValidateContinueAsNewWorkflowExecutionAt
 	s.Equal(common.MaxWorkflowTaskStartToCloseTimeout, attributes.GetWorkflowTaskTimeout().AsDuration())
 }
 
+func (s *commandAttrValidatorSuite) TestValidateStartChildExecutionAttributes() {
+	const expectedFailedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES
+
+	namespaceID := namespace.ID("aaa-bbb")
+	namespace := namespace.Name("tests.Namespace")
+	var attributes *commandpb.StartChildWorkflowExecutionCommandAttributes
+	var parentInfo *persistencespb.WorkflowExecutionInfo
+
+	fc, err := s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.EqualError(err, "StartChildWorkflowExecutionCommandAttributes is not set on StartChildWorkflowExecutionCommand.")
+	s.Equal(expectedFailedCause, fc)
+
+	attributes = &commandpb.StartChildWorkflowExecutionCommandAttributes{}
+	parentInfo = &persistencespb.WorkflowExecutionInfo{}
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.ErrorContains(err, "Required field WorkflowId is not set on StartChildWorkflowExecutionCommand.")
+	s.Equal(expectedFailedCause, fc)
+
+	attributes.WorkflowId = "workflowId"
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.ErrorContains(err, "Required field WorkflowType is not set on StartChildWorkflowExecutionCommand.")
+	s.Equal(expectedFailedCause, fc)
+
+	attributes.WorkflowType = &commonpb.WorkflowType{
+		Name: "workflowType",
+	}
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.ErrorContains(err, "invalid TaskQueue on StartChildWorkflowExecutionCommand")
+	s.Equal(expectedFailedCause, fc)
+
+	attributes.TaskQueue = &taskqueuepb.TaskQueue{
+		Name: "task-queue",
+		Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
+	}
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.NoError(err)
+
+	// CronSchedule combined with WorkflowStartDelay must be rejected
+	attributes.CronSchedule = "0 0 * * *"
+	attributes.WorkflowStartDelay = durationpb.New(10 * time.Second)
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.ErrorContains(err, "CronSchedule and WorkflowStartDelay may not be used together on StartChildWorkflowExecutionCommand")
+	s.Equal(expectedFailedCause, fc)
+
+	attributes.CronSchedule = ""
+	fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
+	s.NoError(err)
+}
+
 func (s *commandAttrValidatorSuite) TestValidateModifyWorkflowProperties() {
 	namespace := namespace.Name("tests.Namespace")
 	var attributes *commandpb.ModifyWorkflowPropertiesCommandAttributes
diff --git a/service/history/historybuilder/event_factory.go b/service/history/historybuilder/event_factory.go
index 0da503e6578..a5f2ba1920c 100644
--- a/service/history/historybuilder/event_factory.go
+++ b/service/history/historybuilder/event_factory.go
@@ -820,6 +820,7 @@ func (b *EventFactory) CreateStartChildWorkflowExecutionInitiatedEvent(
 			SearchAttributes:         command.SearchAttributes,
 			ParentClosePolicy:        command.GetParentClosePolicy(),
 			InheritBuildId:           command.InheritBuildId,
+			WorkflowStartDelay:       command.WorkflowStartDelay,
 		},
 	}
 	return event
diff --git a/service/history/historybuilder/history_builder_test.go b/service/history/historybuilder/history_builder_test.go
index 349ecd8a366..1d5bebfe3e5 100644
--- a/service/history/historybuilder/history_builder_test.go
+++ b/service/history/historybuilder/history_builder_test.go
@@ -112,8 +112,11 @@ var (
 		MaximumInterval:        durationpb.New(time.Duration(rand.Int63())),
 		NonRetryableErrorTypes: []string{"test non retryable error type"},
 	}
-	testCronSchedule = "12 * * * *"
-	testMemo         = &commonpb.Memo{
+	testCronSchedule       = "12 * * * *"
+	testWorkflowStartDelay = &durationpb.Duration{
+		Seconds: 10,
+	}
+	testMemo = &commonpb.Memo{
 		Fields: map[string]*commonpb.Payload{
 			"random memo key": testPayload,
 		},
@@ -1349,6 +1352,7 @@ func (s *historyBuilderSuite) TestStartChildWorkflowExecutionInitiated() {
 		WorkflowIdReusePolicy:    workflowIdReusePolicy,
 		RetryPolicy:              testRetryPolicy,
 		CronSchedule:             testCronSchedule,
+		WorkflowStartDelay:       testWorkflowStartDelay,
 		Memo:                     testMemo,
 		SearchAttributes:         testSearchAttributes,
 		Header:                   testHeader,
@@ -1382,6 +1386,7 @@ func (s *historyBuilderSuite) TestStartChildWorkflowExecutionInitiated() {
 				WorkflowIdReusePolicy:    workflowIdReusePolicy,
 				RetryPolicy:              testRetryPolicy,
 				CronSchedule:             testCronSchedule,
+				WorkflowStartDelay:       testWorkflowStartDelay,
 				Memo:                     testMemo,
 				SearchAttributes:         testSearchAttributes,
 				Header:                   testHeader,
diff --git a/service/history/transfer_queue_active_task_executor.go b/service/history/transfer_queue_active_task_executor.go
index a88f02c9655..495fba49ece 100644
--- a/service/history/transfer_queue_active_task_executor.go
+++ b/service/history/transfer_queue_active_task_executor.go
@@ -1353,6 +1353,7 @@ func (t *transferQueueActiveTaskExecutor) startWorkflow(
 			WorkflowExecutionTimeout: attributes.WorkflowExecutionTimeout,
 			WorkflowRunTimeout:       attributes.WorkflowRunTimeout,
 			WorkflowTaskTimeout:      attributes.WorkflowTaskTimeout,
+			WorkflowStartDelay:       attributes.WorkflowStartDelay,
 
 			// Use the same request ID to dedupe StartWorkflowExecution calls
 			RequestId: childRequestID,
diff --git a/tests/child_workflow.go b/tests/child_workflow.go
index 8334b9c5c84..e53a8f53783 100644
--- a/tests/child_workflow.go
+++ b/tests/child_workflow.go
@@ -364,6 +364,190 @@ func (s *FunctionalSuite) TestChildWorkflowExecution() {
 	s.Equal("Child Done", s.decodePayloadsString(completedAttributes.GetResult()))
 }
 
+func (s *FunctionalSuite) TestWorkflowStartDelayChildWorkflowExecution() {
+	parentID := "functional-start-delay-child-workflow-test-parent"
+	childID := "functional-start-delay-child-workflow-test-child"
+	wtParent := "functional-start-delay-child-workflow-test-parent-type"
+	wtChild := "functional-start-delay-child-workflow-test-child-type"
+	tlParent := "functional-start-delay-child-workflow-test-parent-taskqueue"
+	tlChild := "functional-start-delay-child-workflow-test-child-taskqueue"
+	identity := "worker1"
+
+	startDelayDuration := 5 * time.Second
+
+	parentWorkflowType := &commonpb.WorkflowType{Name: wtParent}
+	childWorkflowType := &commonpb.WorkflowType{Name: wtChild}
+
+	taskQueueParent := &taskqueuepb.TaskQueue{Name: tlParent, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
+	taskQueueChild := &taskqueuepb.TaskQueue{Name: tlChild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
+
+	request := &workflowservice.StartWorkflowExecutionRequest{
+		RequestId:    uuid.New(),
+		Namespace:    s.namespace,
+		WorkflowId:   parentID,
+		WorkflowType: parentWorkflowType,
+
+		TaskQueue:           taskQueueParent,
+		Input:               nil,
+		WorkflowRunTimeout:  durationpb.New(100 * time.Second),
+		WorkflowTaskTimeout: durationpb.New(1 * time.Second),
+		Identity:            identity,
+	}
+
+	startParentWorkflowTS := time.Now().UTC()
+	we, err0 := s.client.StartWorkflowExecution(NewContext(), request)
+	s.NoError(err0)
+	s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))
+
+	// state shared with the workflow task handlers below
+	childExecutionStarted := false
+	seenChildStarted := false
+	var completedEvent *historypb.HistoryEvent
+	// Parent workflow logic
+	wtHandlerParent := func(
+		task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
+		s.Logger.Info("Processing workflow task for", tag.WorkflowID(task.WorkflowExecution.WorkflowId))
+
+		if !childExecutionStarted {
+			s.Logger.Info("Starting child execution")
+			childExecutionStarted = true
+			return []*commandpb.Command{{
+				CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
+				Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{
+					StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
+						WorkflowId:          childID,
+						WorkflowType:        childWorkflowType,
+						TaskQueue:           taskQueueChild,
+						Input:               nil,
+						WorkflowRunTimeout:  durationpb.New(200 * time.Second),
+						WorkflowTaskTimeout: durationpb.New(2 * time.Second),
+						Control:             "",
+						WorkflowStartDelay:  durationpb.New(startDelayDuration),
+					},
+				},
+			}}, nil
+		}
+		for _, event := range task.History.Events[task.PreviousStartedEventId:] {
+			if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED {
+				seenChildStarted = true
+			} else if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED {
+				completedEvent = event
+				// Close out parent workflow when child workflow completes
+				return []*commandpb.Command{{
+					CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
+					Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
+						CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
+							Result: payloads.EncodeString("Done"),
+						},
+					},
+				}}, nil
+			}
+		}
+		return nil, nil
+	}
+
+	var childStartedEvent *historypb.HistoryEvent
+	// Child workflow logic
+	wtHandlerChild := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
+		s.Logger.Info("Processing workflow task for Child", tag.WorkflowID(task.WorkflowExecution.WorkflowId))
+		childStartedEvent = task.History.Events[0]
+		return []*commandpb.Command{{
+			CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
+			Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
+				CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{},
+			},
+		}}, nil
+	}
+
+	pollerParent := &TaskPoller{
+		Client:              s.client,
+		Namespace:           s.namespace,
+		TaskQueue:           taskQueueParent,
+		Identity:            identity,
+		WorkflowTaskHandler: wtHandlerParent,
+		Logger:              s.Logger,
+		T:                   s.T(),
+	}
+
+	pollerChild := &TaskPoller{
+		Client:              s.client,
+		Namespace:           s.namespace,
+		TaskQueue:           taskQueueChild,
+		Identity:            identity,
+		WorkflowTaskHandler: wtHandlerChild,
+		Logger:              s.Logger,
+		T:                   s.T(),
+	}
+
+	// Make first workflow task to start child execution
+	_, err := pollerParent.PollAndProcessWorkflowTask()
+	s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
+	s.NoError(err)
+	s.True(childExecutionStarted)
+
+	// Process ChildExecution Started event
+	_, err = pollerParent.PollAndProcessWorkflowTask()
+	s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
+	s.NoError(err)
+	s.True(seenChildStarted)
+
+	// Poll child queue
+	_, err = pollerChild.PollAndProcessWorkflowTask()
+	s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
+	s.NoError(err)
+	s.NotNil(childStartedEvent)
+	childStartedEventAttrs := childStartedEvent.GetWorkflowExecutionStartedEventAttributes()
+	s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, childStartedEvent.GetEventType())
+	s.Equal(s.namespace, childStartedEventAttrs.GetParentWorkflowNamespace())
+	s.Equal(parentID, childStartedEventAttrs.ParentWorkflowExecution.GetWorkflowId())
+	s.Equal(we.GetRunId(), childStartedEventAttrs.ParentWorkflowExecution.GetRunId())
+	s.NotNil(childStartedEventAttrs.GetRootWorkflowExecution())
+	s.Equal(parentID, childStartedEventAttrs.RootWorkflowExecution.GetWorkflowId())
+	s.Equal(we.GetRunId(), childStartedEventAttrs.RootWorkflowExecution.GetRunId())
+	s.Equal(startDelayDuration, childStartedEventAttrs.GetFirstWorkflowTaskBackoff().AsDuration())
+	// clean up to make sure the next poll will update this var and assert correctly
+	childStartedEvent = nil
+
+	// Process child workflow completion event and complete parent execution
+	_, err = pollerParent.PollAndProcessWorkflowTask()
+	s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
+	s.NoError(err)
+	s.NotNil(completedEvent)
+	completedAttributes := completedEvent.GetChildWorkflowExecutionCompletedEventAttributes()
+	s.Equal(childID, completedAttributes.WorkflowExecution.WorkflowId)
+	s.Equal(wtChild, completedAttributes.WorkflowType.Name)
+
+	startFilter := &filterpb.StartTimeFilter{}
+	startFilter.EarliestTime = timestamppb.New(startParentWorkflowTS)
+	startFilter.LatestTime = timestamppb.New(time.Now().UTC())
+	var closedExecutions []*workflowpb.WorkflowExecutionInfo
+	for i := 0; i < 10; i++ {
+		resp, err := s.client.ListClosedWorkflowExecutions(NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{
+			Namespace:       s.namespace,
+			MaximumPageSize: 100,
+			StartTimeFilter: startFilter,
+		})
+		s.NoError(err)
+		if len(resp.GetExecutions()) == 2 {
+			closedExecutions = resp.GetExecutions()
+			break
+		}
+		time.Sleep(200 * time.Millisecond)
+	}
+	s.NotNil(closedExecutions)
+	sort.Slice(closedExecutions, func(i, j int) bool {
+		return closedExecutions[i].GetStartTime().AsTime().Before(closedExecutions[j].GetStartTime().AsTime())
+	})
+
+	// First execution is parent workflow execution, second is the delayed child
+	// workflow execution
+	s.Equal(2, len(closedExecutions))
+	parentExecution := closedExecutions[0]
+	childExecution := closedExecutions[1]
+	actualBackoff := childExecution.GetExecutionTime().AsTime().Sub(parentExecution.GetExecutionTime().AsTime())
+	s.True(actualBackoff >= startDelayDuration, "child execution started in advance of delay")
+}
+
 func (s *FunctionalSuite) TestCronChildWorkflowExecution() {
 	parentID := "functional-cron-child-workflow-test-parent"
 	childID := "functional-cron-child-workflow-test-child"
@@ -383,10 +567,11 @@ func (s *FunctionalSuite) TestCronChildWorkflowExecution() {
 	taskQueueChild := &taskqueuepb.TaskQueue{Name: tlChild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
 
 	request := &workflowservice.StartWorkflowExecutionRequest{
-		RequestId:           uuid.New(),
-		Namespace:           s.namespace,
-		WorkflowId:          parentID,
-		WorkflowType:        parentWorkflowType,
+		RequestId:    uuid.New(),
+		Namespace:    s.namespace,
+		WorkflowId:   parentID,
+		WorkflowType: parentWorkflowType,
+
 		TaskQueue:           taskQueueParent,
 		Input:               nil,
 		WorkflowRunTimeout:  durationpb.New(100 * time.Second),