diff --git a/README.md b/README.md index 330a0df8..2914e2c7 100644 --- a/README.md +++ b/README.md @@ -191,6 +191,9 @@ These samples demonstrate some common control flow patterns using Temporal's Go - [**Await for signal processing**](./await-signals): Demonstrates how to process out of order signals processing using `Await` and `AwaitWithTimeout`. +- [**Accumulate Signals**](./accumulator): Demonstrates how + to process many signals using `AwaitWithTimeout`, continue as new, and activities. + - [**Worker-specific Task Queues**](./worker-specific-task-queues): Use a unique task queue per Worker to have certain Activities only run on that specific Worker. For instance for a file processing Workflow, where one Activity downloads a file and subsequent Activities need to operate on that file. (If multiple Workers were on the same queue, subsequent Activities may get run on different machines that don't have the downloaded file.) - [**Nexus**](./nexus): Demonstrates how to use the Nexus APIs to facilitate cross namespace calls. diff --git a/accumulator/README.md b/accumulator/README.md new file mode 100644 index 00000000..e7eeae31 --- /dev/null +++ b/accumulator/README.md @@ -0,0 +1,36 @@ +# Accumulator +This sample demonstrates how to accumulate many signals (events) over a time period. +This sample implements the Accumulator Pattern: collect many meaningful things that need to be collected and worked on together, such as all payments for an account, or all account updates by account. + +This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end. + +A new workflow is created per grouping. +A sample activity at the end is given, and you could add an activity to +process individual events in the processGreeting() method. + +Because Temporal Workflows cannot have an unlimited size, Continue As New is used to process more signals that may come in. +You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. + +You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function. +This example supports exiting early with an exit signal. Pending greetings are still collected after exit signal is sent. + + +### Steps to run this sample: + +1) You need a Temporal service running. See details in repo's README.md +2) Run the following command to start the worker + +``` +go run accumulator/worker/main.go +``` + +3) Run the following command to start the workflow and send signals in random order + +``` +go run accumulator/starter/main.go +``` + +You can also run tests with +``` +go test accumulator/accumulate_signals_workflow_test.go +``` diff --git a/accumulator/accumulate_signals_activities.go b/accumulator/accumulate_signals_activities.go new file mode 100644 index 00000000..4c16028a --- /dev/null +++ b/accumulator/accumulate_signals_activities.go @@ -0,0 +1,23 @@ +package accumulator + +import ( + "context" + "fmt" + "go.temporal.io/sdk/activity" +) + +// this activity will process all of the signals together +func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) { + log := activity.GetLogger(ctx) + if len(s) == 0 { + log.Warn("No greetings found when trying to Compose Greetings.") + } + + words := fmt.Sprintf("Hello (%v) Robots", len(s)) + for _, v := range s { + words += ", " + v.GreetingText + } + words += "!" + return words, nil + +} diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go new file mode 100644 index 00000000..e0ba91c9 --- /dev/null +++ b/accumulator/accumulate_signals_workflow.go @@ -0,0 +1,168 @@ +package accumulator + +import ( + "strconv" + "time" + + "go.temporal.io/sdk/workflow" +) + +/** + * This sample demonstrates how to accumulate many signals (business events) over a time period. + * This sample implements the Accumulator Pattern: collect many meaningful things that + * need to be collected and worked on together, such as all payments for an account, or + * all account updates by account. + * This sample models robots being created throughout the time period, + * groups them by what color they are, and greets all the robots of a color at the end. + * + * A new workflow is created per grouping. Workflows continue as new as needed. + * A sample activity at the end is given, and you could add an activity to + * process individual events in the processGreeting() method. + * + * Because Temporal Workflows cannot have an unlimited size, Continue As New is used + * to process more signals that may come in. + * You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. + * You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first + * signal with the GetNextTimeout() function. + */ + +// signalToSignalTimeout is them maximum time between signals +const signalToSignalTimeout = 30 * time.Second + +// fromStartTimeout is the maximum time to receive all signals +const fromStartTimeout = 60 * time.Second + +// exitTimeout is the time to wait after exit is requested to catch any last few signals +const exitTimeout = 1 * time.Second + +type AccumulateGreeting struct { + GreetingText string + Bucket string + GreetingKey string +} + +type GreetingsInfo struct { + BucketKey string + GreetingsList []AccumulateGreeting + UniqueGreetingKeys map[string]bool + startTime time.Time +} + +// GetNextTimeout returns the maximum time for a workflow to wait for the next signal. +// This waits for the greater of the remaining fromStartTimeout and signalToSignalTimeout +// fromStartTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time as desired +// This resets with Continue As New +func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, startTime time.Time, exitRequested bool) (time.Duration, error) { + if exitRequested { + return exitTimeout, nil + } + if startTime.IsZero() { + startTime = workflow.GetInfo(ctx).WorkflowStartTime // if you want to start from the time of the first signal, customize this + } + total := workflow.Now(ctx).Sub(startTime) + totalLeft := fromStartTimeout - total + if totalLeft <= 0 { + return 0, nil + } + if signalToSignalTimeout > totalLeft { + return signalToSignalTimeout, nil + } + return totalLeft, nil +} + +// AccumulateSignalsWorkflow workflow definition +func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (allGreetings string, err error) { + log := workflow.GetLogger(ctx) + var a AccumulateGreeting + if greetings.GreetingsList == nil { + greetings.GreetingsList = []AccumulateGreeting{} + } + if greetings.UniqueGreetingKeys == nil { + greetings.UniqueGreetingKeys = make(map[string]bool) + } + var unprocessedGreetings []AccumulateGreeting + if greetings.startTime.IsZero() { + greetings.startTime = workflow.Now(ctx) + } + exitRequested := false + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 100 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + + timeout, err := a.GetNextTimeout(ctx, greetings.startTime, exitRequested) + childCtx, cancelHandler := workflow.WithCancel(ctx) + selector := workflow.NewSelector(ctx) + + if err != nil { + log.Error("Error calculating timeout") + return "", err + } + log.Debug("Awaiting for " + timeout.String()) + selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &a) + unprocessedGreetings = append(unprocessedGreetings, a) + log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n") + cancelHandler() // cancel timer future + a = AccumulateGreeting{} + }) + selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + exitRequested = true + cancelHandler() // cancel timer future + log.Debug("Exit Signal Received, more?: " + strconv.FormatBool(more) + "\n") + }) + + timerFuture := workflow.NewTimer(childCtx, timeout) + selector.AddFuture(timerFuture, func(f workflow.Future) { + log.Debug("Timer fired \n") + }) + + selector.Select(ctx) + + if len(unprocessedGreetings) == 0 { // timeout without a signal coming in, so let's process the greetings and wrap it up! + log.Debug("Into final processing, received greetings count: " + strconv.Itoa(len(greetings.GreetingsList)) + "\n") + allGreetings = "" + err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings) + if err != nil { + log.Error("ComposeGreeting activity failed.", "Error", err) + return allGreetings, err + } + + if !selector.HasPending() { // in case a signal came in while activity was running, check again + return allGreetings, nil + } else { + log.Info("Received a signal while processing ComposeGreeting activity.") + } + } + + /* process latest signals + * Here is where we can process individual signals as they come in. + * It's ok to call activities here. + * This also validates an individual greeting: + * - check for duplicates + * - check for correct bucket + * Using update validation could improve this in the future + */ + toProcess := unprocessedGreetings + unprocessedGreetings = []AccumulateGreeting{} + + for _, ug := range toProcess { + if ug.Bucket != greetings.BucketKey { + log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + greetings.BucketKey + "], greeting bucket: [" + ug.Bucket + "]") + } else if greetings.UniqueGreetingKeys[ug.GreetingKey] { + log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey + "]") + } else { + greetings.UniqueGreetingKeys[ug.GreetingKey] = true + greetings.GreetingsList = append(greetings.GreetingsList, ug) + } + } + + } + + log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings.GreetingsList)) + " greetings.") + return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, greetings) +} diff --git a/accumulator/accumulate_signals_workflow_test.go b/accumulator/accumulate_signals_workflow_test.go new file mode 100644 index 00000000..116c6b82 --- /dev/null +++ b/accumulator/accumulate_signals_workflow_test.go @@ -0,0 +1,352 @@ +package accumulator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} + +// test 0: send nothing, verify it times out but is successful +func (s *UnitTestSuite) Test_WorkflowTimeout() { + env := s.NewTestWorkflowEnvironment() + env.RegisterActivity(ComposeGreeting) + bucket := "blue" + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + // Workflow times out + s.NoError(env.GetWorkflowError()) + + var result string + + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello") + s.Contains(result, "(0)") +} + +// test 1: start workflow, send one signal, make sure one is accepted +func (s *UnitTestSuite) Test_Signal() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (1)") + s.Contains(result, "Suzie Robot") +} + +// test 2: just send an exit signal, should end quickly and return empty string +func (s *UnitTestSuite) Test_Exit() { + env := s.NewTestWorkflowEnvironment() + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("exit", "") + }, time.Second*5) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello") + s.Contains(result, "(0)") +} + +// test 3: send multiple greetings, should get them all +func (s *UnitTestSuite) Test_Multiple_Signals() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "11235" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (2)") + s.Contains(result, "Suzie Robot") + s.Contains(result, "Hezekiah Robot") +} + +// test 4: send greetings with duplicate keys, should only get one +func (s *UnitTestSuite) Test_Duplicate_Signals() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (1)") + s.Contains(result, "Suzie Robot") + s.NotContains(result, "Hezekiah Robot") +} + +// test 5: test sent with a bad bucket key +func (s *UnitTestSuite) Test_Bad_Bucket() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Jake Robot" + suzieGreeting.Bucket = "blue" + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "112358" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (1)") + s.NotContains(result, "Jake Robot") + s.Contains(result, "Hezekiah Robot") +} + +// test 6: test signal with start +func (s *UnitTestSuite) Test_Signal_With_Start() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + androssGreeting := new(AccumulateGreeting) + androssGreeting.GreetingText = "Andross Robot" + androssGreeting.Bucket = bucket + androssGreeting.GreetingKey = "1123" + env.SignalWorkflow("greeting", androssGreeting) + }, time.Second*0) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (1)") + s.Contains(result, "Andross Robot") +} + +// test 7: signal with start, wait too long for first workflow so it times out, send another signal, should be just one greeting +func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, fromStartTimeout+time.Second*1) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (1)") + s.Contains(result, "John Robot") +} + +// test 8: signal with start, don't wait too long for first workflow so it times out, send another signal, should be two greetings +func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Short() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, signalToSignalTimeout-time.Second*1) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (2)") + s.Contains(result, "John Robot") + s.Contains(result, "Targe Robot") +} + +// test 9: signal with start, send exit, then signal with start, should get one signal +func (s *UnitTestSuite) Test_Signal_After_Exit() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("exit", "") + }, time.Second*5) + + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, time.Second*5+time.Millisecond*1) + + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains(result, "Hello (2)") + s.Contains(result, "John Robot") + s.Contains(result, "Targe Robot") +} diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go new file mode 100644 index 00000000..9ff06a10 --- /dev/null +++ b/accumulator/starter/main.go @@ -0,0 +1,201 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "math/rand" + + accumulator "github.com/temporalio/samples-go/accumulator" + "go.temporal.io/sdk/client" +) + +var WorkflowIDPrefix = "accumulate" + +var TaskQueue = "accumulate_greetings" + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + // setup which tests to run + // by default it will run an accumulation with a few (20) signals + // to a set of 4 buckets with Signal To Start + triggerContinueAsNewWarning := false + + testSignalEdgeCases := true + // configure signal edge cases to test + testSignalAfterWorkflowExit := false + testSignalAfterExitSignal := !testSignalAfterWorkflowExit + testDuplicate := true + testIgnoreBadBucket := true + + // setup to send signals + bucket := "blue" + buckets := []string{"red", "blue", "green", "yellow"} + names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} + + maxSignals := 20 + if triggerContinueAsNewWarning { + maxSignals = 5000 + } + + for i := 0; i < maxSignals; i++ { + bucketIndex := rand.Intn(len(buckets)) + bucket = buckets[bucketIndex] + nameIndex := rand.Intn(len(names)) + + greeting := accumulator.AccumulateGreeting{ + GreetingText: names[nameIndex], + Bucket: bucket, + GreetingKey: "key-" + fmt.Sprint(i), + } + + greetings := accumulator.GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []accumulator.AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + time.Sleep(5 * time.Millisecond) + + WorkflowID := WorkflowIDPrefix + "-" + bucket + workflowOptions := client.StartWorkflowOptions{ + ID: WorkflowID, + TaskQueue: TaskQueue, + } + we, err := c.SignalWithStartWorkflow(context.Background(), WorkflowID, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, greetings) + if err != nil { + log.Fatalln("Unable to signal with start workflow", err) + } + log.Println("Signaled/Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "signal:", greeting.GreetingText) + + } + + // skip further testing + if !testSignalEdgeCases { + return + } + + // now we will try sending a signals near time of workflow exit + bucket = "purple" + WorkflowID := WorkflowIDPrefix + "-" + bucket + + workflowOptions := client.StartWorkflowOptions{ + ID: WorkflowID, + TaskQueue: TaskQueue, + } + + suzieGreeting := new(accumulator.AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + + greetings := accumulator.GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []accumulator.AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + + // start the workflow async and then signal it + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, greetings) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + // After start for AccumulateSignalsWorkflow returns, the workflow is guaranteed to be + // started, so we can send a signal to it using the workflow ID and Run ID + // This workflow keeps receiving signals until exit is called or the timer finishes with no signals + + // When the workflow is started the accumulateGreetings will block for the + // previously defined conditions + // Send the first workflow signal + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", suzieGreeting) + if err != nil { + log.Fatalln("Unable to signal workflow", err) + } + log.Println("Sent " + suzieGreeting.GreetingText + " to " + we.GetID()) + + // This test signals exit, then waits for the workflow to end + // signals after this will error, as the workflow execution already completed + if testSignalAfterWorkflowExit { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "") + if err != nil { + log.Fatalln("Unable to signal workflow", err) + } + log.Println(we.GetID() + ":Sent exit") + var exitSignalResults string + err = we.Get(context.Background(), &exitSignalResults) + if err != nil { + log.Fatalln("Unable to get workflow results", err) + } + log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) + } + + // This test sends an exit signal, does not wait for workflow to exit, then sends a signal + // this demonstrates Temporal history rollback + // see https://community.temporal.io/t/continueasnew-signals/1008/7 + if testSignalAfterExitSignal { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "") + if err != nil { + log.Fatalln("Unable to signal workflow "+we.GetID(), err) + } + log.Println(we.GetID() + ": Sent exit") + // Signals after this test sending more signals after workflow exit + time.Sleep(5 * time.Millisecond) + } + + + + janeGreeting := new(accumulator.AccumulateGreeting) + janeGreeting.GreetingText = "Jane Robot" + janeGreeting.Bucket = bucket + janeGreeting.GreetingKey = "112358132134" + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) + } + log.Println("Sent " + janeGreeting.GreetingText + " to " + we.GetID()) + + if testIgnoreBadBucket { + // send a third signal with an incorrect bucket - this will be ignored + // can use workflow update to validate and reject a request if needed + badBucketGreeting := new(accumulator.AccumulateGreeting) + badBucketGreeting.GreetingText = "Ozzy Robot" + badBucketGreeting.Bucket = "taupe" + badBucketGreeting.GreetingKey = "112358132134" + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", badBucketGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) + } + log.Println("Sent invalid bucket signal " + badBucketGreeting.GreetingText + ", " + badBucketGreeting.Bucket + " to " + we.GetID()) + } + + if testDuplicate { + // intentionally send a duplicate signal + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) + } + log.Println("Sent Duplicate " + janeGreeting.GreetingText + " to " + we.GetID()) + } + + if !testSignalAfterWorkflowExit { + // wait for results if we haven't waited for them yet + var exitSignalResults string + err = we.Get(context.Background(), &exitSignalResults) + if err != nil { + log.Fatalln("Unable to get workflow results", err) + } + log.Println(we.GetID() + ": Execution results: " + exitSignalResults) + } + +} diff --git a/accumulator/worker/main.go b/accumulator/worker/main.go new file mode 100644 index 00000000..4f727787 --- /dev/null +++ b/accumulator/worker/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + + accumulator "github.com/temporalio/samples-go/accumulator" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "accumulate_greetings", worker.Options{}) + + w.RegisterWorkflow(accumulator.AccumulateSignalsWorkflow) + w.RegisterActivity(accumulator.ComposeGreeting) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +}