From a31524207516772ca3b74287c21ac30b69fb8af8 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Wed, 29 May 2024 14:53:36 -0400 Subject: [PATCH 01/10] initial work --- accumulator/README.md | 56 +++++ accumulator/accumulate_signals_activities.go | 33 +++ accumulator/accumulate_signals_workflow.go | 234 +++++++++++++++++++ accumulator/await_signals_workflow_test.go | 61 +++++ accumulator/starter/main.go | 193 +++++++++++++++ accumulator/worker/main.go | 31 +++ 6 files changed, 608 insertions(+) create mode 100644 accumulator/README.md create mode 100644 accumulator/accumulate_signals_activities.go create mode 100644 accumulator/accumulate_signals_workflow.go create mode 100644 accumulator/await_signals_workflow_test.go create mode 100644 accumulator/starter/main.go create mode 100644 accumulator/worker/main.go diff --git a/accumulator/README.md b/accumulator/README.md new file mode 100644 index 00000000..dfb7f6f0 --- /dev/null +++ b/accumulator/README.md @@ -0,0 +1,56 @@ +* The sample demonstrates how to deal with multiple signals that can come out of order and require actions +* if a certain signal not received in a specified time interval. + +This specific sample receives three signals: Signal1, Signal2, Signal3. They have to be processed in the +sequential order, but they can be received out of order. +There are two timeouts to enforce. +The first one is the maximum time between signals. +The second limits the total time since the first signal received. + +A naive implementation of such use case would use a single loop that contains a Selector to listen on three +signals and a timer. Something like: + + for { + selector := workflow.NewSelector(ctx) + selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) { + // Process signal1 + }) + selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) { + // Process signal2 + } + selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) { + // Process signal3 + } + cCtx, cancel := workflow.WithCancel(ctx) + timer := workflow.NewTimer(cCtx, timeToNextSignal) + selector.AddFuture(timer, func(f workflow.Future) { + // Process timeout + }) + selector.Select(ctx) + cancel() + // break out of the loop on certain condition + } + +The above implementation works. But it quickly becomes pretty convoluted if the number of signals +and rules around order of their arrivals and timeouts increases. + +The following example demonstrates an alternative approach. It receives signals in a separate goroutine. +Each signal handler just updates a correspondent shared variable with the signal data. +The main workflow function awaits the next step using `workflow.AwaitWithTimeout` using condition composed of +the shared variables. This makes the main workflow method free from signal callbacks and makes the business logic +clear. + +### Steps to run this sample: + +1) You need a Temporal service running. See details in README.md +2) Run the following command to start the worker + +``` +go run await-signals/worker/main.go +``` + +3) Run the following command to start the workflow and send signals in random order + +``` +go run await-signals/starter/main.go +``` diff --git a/accumulator/accumulate_signals_activities.go b/accumulator/accumulate_signals_activities.go new file mode 100644 index 00000000..37cc44f1 --- /dev/null +++ b/accumulator/accumulate_signals_activities.go @@ -0,0 +1,33 @@ +package accumulator + +import ( + "context" + "fmt" + + "go.temporal.io/sdk/activity" +) + +// this activity will process all of the signals together +func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) { + log := activity.GetLogger(ctx) + log.Info("Compose Greetings Activity started. ") + fmt.Printf("greetings slice info: len=%d cap=%d %v\n", len(s), cap(s), s) + if(len(s) < 1) { + log.Warn("No greetings found when trying to Compose Greetings. ") + return "", nil + } + + words := "Hello" + for _, v:= range s { + words += fmt.Sprintf(", " + v.GreetingText ) + } + + words += "!" + return words, nil + + /* List greetingList = + greetings.stream().map(u -> u.greetingText).collect(Collectors.toList()); + return "Hello (" + greetingList.size() + ") robots: " + greetingList + "!"; + } + */ +} diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go new file mode 100644 index 00000000..adece083 --- /dev/null +++ b/accumulator/accumulate_signals_workflow.go @@ -0,0 +1,234 @@ +package accumulator + +import ( + "fmt" + "strconv" + "time" + + + //"go.temporal.io/sdk/temporal" + + "go.temporal.io/sdk/workflow" +) + +/** + * The sample demonstrates how to deal with multiple signals that can come out of order and require actions + * if a certain signal not received in a specified time interval. + * + * This specific sample receives three signals: Signal1, Signal2, Signal3. They have to be processed in the + * sequential order, but they can be received out of order. + * There are two timeouts to enforce. + * The first one is the maximum time between signals. + * The second limits the total time since the first signal received. + * + * A naive implementation of such use case would use a single loop that contains a Selector to listen on three + * signals and a timer. Something like: + + * for { + * selector := workflow.NewSelector(ctx) + * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) { + * // Process signal1 + * }) + * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) { + * // Process signal2 + * } + * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) { + * // Process signal3 + * } + * cCtx, cancel := workflow.WithCancel(ctx) + * timer := workflow.NewTimer(cCtx, timeToNextSignal) + * selector.AddFuture(timer, func(f workflow.Future) { + * // Process timeout + * }) + * selector.Select(ctx) + * cancel() + * // break out of the loop on certain condition + * } + * + * The above implementation works. But it quickly becomes pretty convoluted if the number of signals + * and rules around order of their arrivals and timeouts increases. + * + * The following example demonstrates an alternative approach. It receives signals in a separate goroutine. + * Each signal handler just updates a correspondent shared variable with the signal data. + * The main workflow function awaits the next step using `workflow.AwaitWithTimeout` using condition composed of + * the shared variables. This makes the main workflow method free from signal callbacks and makes the business logic + * clear. + */ + +// SignalToSignalTimeout is them maximum time between signals +var SignalToSignalTimeout = 30 * time.Second + +// FromFirstSignalTimeout is the maximum time to receive all signals +var FromFirstSignalTimeout = 60 * time.Second + +type AccumulateGreeting struct { + GreetingText string + Bucket string + GreetingKey string +} + +/* todo section +[x] listen for signals +[x] add to slice +[x] take fisrtsignaltime and exitrequested out of the struct +[x] test exit signal +[x] signal with start +[x] starter like java +[ ] tests like java +[x] consider checking for multiple messages in the signal wait loop +[x] "process" each greeting as they come in -- activity? no +[x] activity to combine all greetings +[ ] make GetNextTimeout not be a func on the struct +[ ] fix the extra listen +[ ] continue as new check and doing it +[ ] decide to use a separate goroutine function or keep the one you have +[ ] for fun race vs java +[ ] update readme +*/ + +// Listen to signals - greetings and exit +func Listen(ctx workflow.Context, a AccumulateGreeting, ExitRequested bool, FirstSignalTime time.Time) { + log := workflow.GetLogger(ctx) + for { + selector := workflow.NewSelector(ctx) + selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &a) + log.Info("Signal Received") + }) + selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + ExitRequested = true + log.Info("Exit Signal Received") + }) + selector.Select(ctx) + if FirstSignalTime.IsZero() { + FirstSignalTime = workflow.Now(ctx) + } + } +} + +// GetNextTimeout returns the maximum time allowed to wait for the next signal. +func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time ) (time.Duration, error) { + if firstSignalTime.IsZero() { + firstSignalTime = workflow.Now(ctx) + } + total := workflow.Now(ctx).Sub(firstSignalTime) + totalLeft := FromFirstSignalTimeout - total + if totalLeft <= 0 { + return 0, nil + } + if SignalToSignalTimeout < totalLeft { + return SignalToSignalTimeout, nil + } + return totalLeft, nil +} + +// AccumulateSignalsWorkflow workflow definition +// func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetings []AccumulateGreeting, greetingKeys map[string]bool) (greeting string, err error) { +func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string) (allGreetings string, err error) { + log := workflow.GetLogger(ctx) + var a AccumulateGreeting + greetings := []AccumulateGreeting{} + unprocessedGreetings := []AccumulateGreeting{} + uniqueGreetingKeysMap := make(map[string]bool) + var FirstSignalTime time.Time + ExitRequested := false + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + printGreetings(greetings) + // Listen to signals in a different goroutine + workflow.Go(ctx, func(gCtx workflow.Context) { + for { + log.Info("in workflow.go signals goroutine for{} \n") + selector := workflow.NewSelector(gCtx) + selector.AddReceive(workflow.GetSignalChannel(gCtx, "greeting"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(gCtx, &a) + unprocessedGreetings = append(unprocessedGreetings, a) + log.Info("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more) + "\n") + // initialize a + a = AccumulateGreeting{} + }) + selector.AddReceive(workflow.GetSignalChannel(gCtx, "exit"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(gCtx, nil) + ExitRequested = true + log.Info("Exit Signal Received\n") + }) + //log.Info("before select, greeting text: " + a.GreetingText + "\n") + selector.Select(gCtx) + //log.Info("after select, greeting text: " + a.GreetingText + "\n") + if FirstSignalTime.IsZero() { + FirstSignalTime = workflow.Now(gCtx) + } + } + }) + + for ; !workflow.GetInfo(ctx).GetContinueAsNewSuggested() ;{ + // Wait for Signal or timeout + timeout, err := a.GetNextTimeout(ctx, ExitRequested, FirstSignalTime) + + if err != nil { + log.Warn("error awaiting signal\n") + return "", err + } + if timeout <= 0 { + // do final processing? or just check for signal one more time + log.Info("no time left for signals, checking one more time\n") + } + + printGreetings(greetings) + log.Info("Awaiting for " + timeout.String() + "\n") + gotSignalBeforeTimeout, err := workflow.AwaitWithTimeout(ctx, timeout, func() bool { + return len(unprocessedGreetings) > 0 || ExitRequested + }) + printGreetings(greetings) + + // timeout + if len(unprocessedGreetings) == 0 { + log.Info("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(ExitRequested) +"\n") + // do final processing + //printGreetings(greetings) + allGreetings = "" + // get token - retryable like normal, it's failure-prone and idempotent + err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings).Get(ctx, &allGreetings) + if err != nil { + log.Error("ComposeGreeting activity failed.", "Error", err) + return allGreetings, err + } + return allGreetings, nil + } + + // process latest signal + // Here is where we can process individual signals as they come in. + // It's ok to call activities here. + // This also validates an individual greeting: + // - check for duplicates + // - check for correct bucket + + for len(unprocessedGreetings) > 0 { + ug := unprocessedGreetings[0] + unprocessedGreetings = unprocessedGreetings[1:] + //fmt.Printf("greetings slice info for unprocessedGreetings after taking out ug: len=%d cap=%d %v\n", len(unprocessedGreetings), cap(unprocessedGreetings), unprocessedGreetings) + if ug.Bucket != bucketKey { + log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + bucketKey +"], greeting bucket: [" + ug.Bucket + "]"); + } else if(uniqueGreetingKeysMap[ug.GreetingKey]) { + log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey +"]"); + } else { + uniqueGreetingKeysMap[ug.GreetingKey] = true + greetings = append(greetings, ug) + //log.Info("Adding Greeting. Key: [" + ug.GreetingKey +"], Text: [" + ug.GreetingText + "]"); + } + } + + //a = AccumulateGreeting{} + } + + return a.GreetingText, nil +} + +func printGreetings(s []AccumulateGreeting) { + fmt.Printf("greetings slice info: len=%d cap=%d %v\n", len(s), cap(s), s) +} \ No newline at end of file diff --git a/accumulator/await_signals_workflow_test.go b/accumulator/await_signals_workflow_test.go new file mode 100644 index 00000000..e6098fe2 --- /dev/null +++ b/accumulator/await_signals_workflow_test.go @@ -0,0 +1,61 @@ +package accumulator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} + +func (s *UnitTestSuite) Test_WorkflowTimeout() { + env := s.NewTestWorkflowEnvironment() + env.ExecuteWorkflow(AccumulateSignalsWorkflow) + + s.True(env.IsWorkflowCompleted()) + // Workflow times out + s.Error(env.GetWorkflowError()) +} + +func (s *UnitTestSuite) Test_SignalsInOrder() { + env := s.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal1", nil) + }, time.Hour) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal2", nil) + }, time.Hour+time.Second) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal3", nil) + }, time.Hour+3*time.Second) + env.ExecuteWorkflow(AccumulateSignalsWorkflow) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} + +func (s *UnitTestSuite) Test_SignalsInReverseOrder() { + env := s.NewTestWorkflowEnvironment() + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal3", nil) + }, time.Hour) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal2", nil) + }, time.Hour+time.Second) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("Signal1", nil) + }, time.Hour+3*time.Second) + env.ExecuteWorkflow(AccumulateSignalsWorkflow) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) +} diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go new file mode 100644 index 00000000..b547f943 --- /dev/null +++ b/accumulator/starter/main.go @@ -0,0 +1,193 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "math/rand" + + accumulator "github.com/temporalio/samples-go/accumulator" + "go.temporal.io/sdk/client" +) + +var WorkflowIDPrefix = "accumulate" + +var TaskQueue = "accumulate_greetings"; + +func main() { + // The client is a heavyweight object that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + // setup which tests to run + // by default it will run an accumulation with a few (20) signals + // to a set of 4 buckets with Signal To Start + triggerContinueAsNewWarning := false; + + testSignalEdgeCases := true; + // configure signal edge cases to test + testSignalAfterWorkflowExit := false; + testSignalAfterExitSignal := !testSignalAfterWorkflowExit; + testDuplicate := true; + testIgnoreBadBucket := true; + + // setup to send signals + bucket := "blue"; + workflowId := WorkflowIDPrefix + "-" + bucket; + buckets := []string{"red", "blue", "green", "yellow"} + names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} + workflowOptions := client.StartWorkflowOptions{ + ID: workflowId, + TaskQueue: TaskQueue, + } + + max_signals := 20 + if triggerContinueAsNewWarning { + max_signals = 10000 + } + + for i := 0; i < max_signals; i++ { + bucketIndex := rand.Intn(len(buckets)) + bucket = buckets[bucketIndex] + nameIndex := rand.Intn(len(names)) + + greeting := accumulator.AccumulateGreeting{ + GreetingText: names[nameIndex], + Bucket: bucket, + GreetingKey: "key-" + fmt.Sprint(i), + } + time.Sleep(20 * time.Millisecond) + + workflowId = WorkflowIDPrefix + "-" + bucket + workflowOptions = client.StartWorkflowOptions{ + ID: workflowId, + TaskQueue: TaskQueue, + } + we, err := c.SignalWithStartWorkflow(context.Background(), workflowId, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket) + if err != nil { + log.Fatalln("Unable to signal with start workflow", err) + } + log.Println("Signaled/Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "signal:", greeting.GreetingText) + + } + + // skip further testing + if (!testSignalEdgeCases) { + return + } + + // now we will try sending a signals near time of workflow exit + bucket = "purple" + workflowId = WorkflowIDPrefix + "-" + bucket + + workflowOptions = client.StartWorkflowOptions{ + ID: workflowId, + TaskQueue: TaskQueue, + } + + suzieGreeting := new(accumulator.AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + + // start the workflow async and then signal it + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket) + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + + // After start for AccumulateSignalsWorkflow returns, the workflow is guaranteed to be + // started, so we can send a signal to it using the workflow ID and Run ID + // This workflow keeps receiving signals until exit is called or the timer finishes with no signals + + // When the workflow is started the accumulateGreetings will block for the + // previously defined conditions + // Send the first workflow signal + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", suzieGreeting) + if err != nil { + log.Fatalln("Unable to signal workflow", err) + } + log.Println("Sent " + suzieGreeting.GreetingText + " to " + we.GetID()) + + + // This test signals exit, then waits for the workflow to end + // signals after this will error, as the workflow execution already completed + if (testSignalAfterWorkflowExit) { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "" ) + if err != nil { + log.Fatalln("Unable to signal workflow", err) + } + log.Println(we.GetID() + ":Sent exit") + var exitSignalResults string + we.Get(context.Background(), &exitSignalResults) + log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) + } + + // This test sends an exit signal, does not wait for workflow to exit, then sends a signal + // this demonstrates Temporal history rollback + // see https://community.temporal.io/t/continueasnew-signals/1008/7 + if (testSignalAfterExitSignal) { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "" ) + if err != nil { + log.Fatalln("Unable to signal workflow " + we.GetID(), err) + } + log.Println(we.GetID() + ": Sent exit") + } + + // Test sending more signals after workflow exit + + janeGreeting := new(accumulator.AccumulateGreeting) + janeGreeting.GreetingText = "Jane Robot" + janeGreeting.Bucket = bucket + janeGreeting.GreetingKey = "112358132134" + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + } + log.Println("Sent " + janeGreeting.GreetingText + " to " + we.GetID()) + + if (testIgnoreBadBucket) { + // send a third signal with an incorrect bucket - this will be ignored + // can use workflow update to validate and reject a request if needed + badBucketGreeting := new(accumulator.AccumulateGreeting) + badBucketGreeting.GreetingText = "Ozzy Robot" + badBucketGreeting.Bucket = "taupe" + badBucketGreeting.GreetingKey = "112358132134" + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", badBucketGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + } + log.Println("Sent " + badBucketGreeting.GreetingText + " to " + we.GetID()) + } + + if (testDuplicate) { + // intentionally send a duplicate signal + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) + if err != nil { + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + } + log.Println("Sent Duplicate " + janeGreeting.GreetingText + " to " + we.GetID()) + } + + if (!testSignalAfterWorkflowExit) { + // wait for results if we haven't waited for them yet + var exitSignalResults string + we.Get(context.Background(), &exitSignalResults) + log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) + } + + + + return + + +} diff --git a/accumulator/worker/main.go b/accumulator/worker/main.go new file mode 100644 index 00000000..4f727787 --- /dev/null +++ b/accumulator/worker/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + + accumulator "github.com/temporalio/samples-go/accumulator" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{ + HostPort: client.DefaultHostPort, + }) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "accumulate_greetings", worker.Options{}) + + w.RegisterWorkflow(accumulator.AccumulateSignalsWorkflow) + w.RegisterActivity(accumulator.ComposeGreeting) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} From a15cfd5e27bcdfc5f3519729dfb486230330e3ca Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Fri, 14 Jun 2024 15:03:53 -0400 Subject: [PATCH 02/10] code complete, tests complete, tests pass --- accumulator/README.md | 65 ++-- accumulator/accumulate_signals_activities.go | 12 +- accumulator/accumulate_signals_workflow.go | 154 +++------ .../accumulate_signals_workflow_test.go | 304 ++++++++++++++++++ accumulator/await_signals_workflow_test.go | 61 ---- 5 files changed, 370 insertions(+), 226 deletions(-) create mode 100644 accumulator/accumulate_signals_workflow_test.go delete mode 100644 accumulator/await_signals_workflow_test.go diff --git a/accumulator/README.md b/accumulator/README.md index dfb7f6f0..f15f4a01 100644 --- a/accumulator/README.md +++ b/accumulator/README.md @@ -1,44 +1,18 @@ -* The sample demonstrates how to deal with multiple signals that can come out of order and require actions -* if a certain signal not received in a specified time interval. - -This specific sample receives three signals: Signal1, Signal2, Signal3. They have to be processed in the -sequential order, but they can be received out of order. -There are two timeouts to enforce. -The first one is the maximum time between signals. -The second limits the total time since the first signal received. - -A naive implementation of such use case would use a single loop that contains a Selector to listen on three -signals and a timer. Something like: - - for { - selector := workflow.NewSelector(ctx) - selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) { - // Process signal1 - }) - selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) { - // Process signal2 - } - selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) { - // Process signal3 - } - cCtx, cancel := workflow.WithCancel(ctx) - timer := workflow.NewTimer(cCtx, timeToNextSignal) - selector.AddFuture(timer, func(f workflow.Future) { - // Process timeout - }) - selector.Select(ctx) - cancel() - // break out of the loop on certain condition - } - -The above implementation works. But it quickly becomes pretty convoluted if the number of signals -and rules around order of their arrivals and timeouts increases. - -The following example demonstrates an alternative approach. It receives signals in a separate goroutine. -Each signal handler just updates a correspondent shared variable with the signal data. -The main workflow function awaits the next step using `workflow.AwaitWithTimeout` using condition composed of -the shared variables. This makes the main workflow method free from signal callbacks and makes the business logic -clear. +# Accumulator +This sample demonstrates how to accumulate many signals (events) over a time period. +This sample implements the Accumulator Pattern: collect many meaningful things that need to be collected and worked on together, such as all payments for an account, or all account updates by account. + +This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end. + +A new workflow is created per grouping. Workflows continue as new as needed. +A sample activity at the end is given, and you could add an activity to +process individual events in the processGreeting() method. + +Because Temporal Workflows cannot have an unlimited size, Continue As New is used to process more signals that may come in. +You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. + +You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function. + ### Steps to run this sample: @@ -46,11 +20,16 @@ clear. 2) Run the following command to start the worker ``` -go run await-signals/worker/main.go +go run accumulator/worker/main.go ``` 3) Run the following command to start the workflow and send signals in random order ``` -go run await-signals/starter/main.go +go run accumulator/starter/main.go +``` + +You can always run tests with +``` +go test accumulator/accumulate_signals_workflow_test.go ``` diff --git a/accumulator/accumulate_signals_activities.go b/accumulator/accumulate_signals_activities.go index 37cc44f1..37b3db96 100644 --- a/accumulator/accumulate_signals_activities.go +++ b/accumulator/accumulate_signals_activities.go @@ -3,21 +3,18 @@ package accumulator import ( "context" "fmt" - + "strconv" "go.temporal.io/sdk/activity" ) // this activity will process all of the signals together func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) { log := activity.GetLogger(ctx) - log.Info("Compose Greetings Activity started. ") - fmt.Printf("greetings slice info: len=%d cap=%d %v\n", len(s), cap(s), s) if(len(s) < 1) { log.Warn("No greetings found when trying to Compose Greetings. ") - return "", nil } - words := "Hello" + words := "Hello (" + strconv.Itoa(len(s)) + ") Robots" for _, v:= range s { words += fmt.Sprintf(", " + v.GreetingText ) } @@ -25,9 +22,4 @@ func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error words += "!" return words, nil - /* List greetingList = - greetings.stream().map(u -> u.greetingText).collect(Collectors.toList()); - return "Hello (" + greetingList.size() + ") robots: " + greetingList + "!"; - } - */ } diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index adece083..7e5f010c 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -4,55 +4,26 @@ import ( "fmt" "strconv" "time" - - - //"go.temporal.io/sdk/temporal" - "go.temporal.io/sdk/workflow" ) /** - * The sample demonstrates how to deal with multiple signals that can come out of order and require actions - * if a certain signal not received in a specified time interval. - * - * This specific sample receives three signals: Signal1, Signal2, Signal3. They have to be processed in the - * sequential order, but they can be received out of order. - * There are two timeouts to enforce. - * The first one is the maximum time between signals. - * The second limits the total time since the first signal received. - * - * A naive implementation of such use case would use a single loop that contains a Selector to listen on three - * signals and a timer. Something like: - - * for { - * selector := workflow.NewSelector(ctx) - * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal1"), func(c workflow.ReceiveChannel, more bool) { - * // Process signal1 - * }) - * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal2"), func(c workflow.ReceiveChannel, more bool) { - * // Process signal2 - * } - * selector.AddReceive(workflow.GetSignalChannel(ctx, "Signal3"), func(c workflow.ReceiveChannel, more bool) { - * // Process signal3 - * } - * cCtx, cancel := workflow.WithCancel(ctx) - * timer := workflow.NewTimer(cCtx, timeToNextSignal) - * selector.AddFuture(timer, func(f workflow.Future) { - * // Process timeout - * }) - * selector.Select(ctx) - * cancel() - * // break out of the loop on certain condition - * } - * - * The above implementation works. But it quickly becomes pretty convoluted if the number of signals - * and rules around order of their arrivals and timeouts increases. - * - * The following example demonstrates an alternative approach. It receives signals in a separate goroutine. - * Each signal handler just updates a correspondent shared variable with the signal data. - * The main workflow function awaits the next step using `workflow.AwaitWithTimeout` using condition composed of - * the shared variables. This makes the main workflow method free from signal callbacks and makes the business logic - * clear. + * This sample demonstrates how to accumulate many signals (events) over a time period. + * This sample implements the Accumulator Pattern: collect many meaningful things that + * need to be collected and worked on together, such as all payments for an account, or + * all account updates by account. + * This sample models robots being created throughout the time period, + * groups them by what color they are, and greets all the robots of a color at the end. + * + * A new workflow is created per grouping. Workflows continue as new as needed. + * A sample activity at the end is given, and you could add an activity to + * process individual events in the processGreeting() method. + * + * Because Temporal Workflows cannot have an unlimited size, Continue As New is used + * to process more signals that may come in. + * You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. + * You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first + * signal with the GetNextTimeout() function. */ // SignalToSignalTimeout is them maximum time between signals @@ -67,45 +38,6 @@ type AccumulateGreeting struct { GreetingKey string } -/* todo section -[x] listen for signals -[x] add to slice -[x] take fisrtsignaltime and exitrequested out of the struct -[x] test exit signal -[x] signal with start -[x] starter like java -[ ] tests like java -[x] consider checking for multiple messages in the signal wait loop -[x] "process" each greeting as they come in -- activity? no -[x] activity to combine all greetings -[ ] make GetNextTimeout not be a func on the struct -[ ] fix the extra listen -[ ] continue as new check and doing it -[ ] decide to use a separate goroutine function or keep the one you have -[ ] for fun race vs java -[ ] update readme -*/ - -// Listen to signals - greetings and exit -func Listen(ctx workflow.Context, a AccumulateGreeting, ExitRequested bool, FirstSignalTime time.Time) { - log := workflow.GetLogger(ctx) - for { - selector := workflow.NewSelector(ctx) - selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) { - c.Receive(ctx, &a) - log.Info("Signal Received") - }) - selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) { - c.Receive(ctx, nil) - ExitRequested = true - log.Info("Exit Signal Received") - }) - selector.Select(ctx) - if FirstSignalTime.IsZero() { - FirstSignalTime = workflow.Now(ctx) - } - } -} // GetNextTimeout returns the maximum time allowed to wait for the next signal. func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time ) (time.Duration, error) { @@ -124,13 +56,16 @@ func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit boo } // AccumulateSignalsWorkflow workflow definition -// func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetings []AccumulateGreeting, greetingKeys map[string]bool) (greeting string, err error) { -func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string) (allGreetings string, err error) { +func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetingsCAN []AccumulateGreeting, uniqueGreetingKeysMapCAN map[string]bool) (allGreetings string, err error) { log := workflow.GetLogger(ctx) var a AccumulateGreeting greetings := []AccumulateGreeting{} + greetings = append(greetings, greetingsCAN...) unprocessedGreetings := []AccumulateGreeting{} uniqueGreetingKeysMap := make(map[string]bool) + for k, v := range uniqueGreetingKeysMapCAN { + uniqueGreetingKeysMap[k] = v + } var FirstSignalTime time.Time ExitRequested := false @@ -139,27 +74,26 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string) (allGreet } ctx = workflow.WithActivityOptions(ctx, ao) - printGreetings(greetings) // Listen to signals in a different goroutine workflow.Go(ctx, func(gCtx workflow.Context) { for { - log.Info("in workflow.go signals goroutine for{} \n") + log.Debug("in workflow.go signals goroutine for{} \n") selector := workflow.NewSelector(gCtx) selector.AddReceive(workflow.GetSignalChannel(gCtx, "greeting"), func(c workflow.ReceiveChannel, more bool) { c.Receive(gCtx, &a) unprocessedGreetings = append(unprocessedGreetings, a) - log.Info("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more) + "\n") - // initialize a + log.Debug("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more) + "\n") + a = AccumulateGreeting{} }) selector.AddReceive(workflow.GetSignalChannel(gCtx, "exit"), func(c workflow.ReceiveChannel, more bool) { c.Receive(gCtx, nil) ExitRequested = true - log.Info("Exit Signal Received\n") + log.Debug("Exit Signal Received\n") }) - //log.Info("before select, greeting text: " + a.GreetingText + "\n") + log.Debug("before select, greeting text: " + a.GreetingText + "\n") selector.Select(gCtx) - //log.Info("after select, greeting text: " + a.GreetingText + "\n") + log.Debug("after select, greeting text: " + a.GreetingText + "\n") if FirstSignalTime.IsZero() { FirstSignalTime = workflow.Now(gCtx) } @@ -176,42 +110,39 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string) (allGreet } if timeout <= 0 { // do final processing? or just check for signal one more time - log.Info("no time left for signals, checking one more time\n") + log.Debug("No time left for signals, checking one more time\n") } - printGreetings(greetings) - log.Info("Awaiting for " + timeout.String() + "\n") + log.Debug("Awaiting for " + timeout.String() + "\n") gotSignalBeforeTimeout, err := workflow.AwaitWithTimeout(ctx, timeout, func() bool { return len(unprocessedGreetings) > 0 || ExitRequested }) - printGreetings(greetings) - // timeout + // timeout happened without a signal coming in, so let's process the greetings and wrap it up! if len(unprocessedGreetings) == 0 { - log.Info("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(ExitRequested) +"\n") + log.Debug("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(ExitRequested) +"\n") // do final processing - //printGreetings(greetings) allGreetings = "" - // get token - retryable like normal, it's failure-prone and idempotent err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings).Get(ctx, &allGreetings) if err != nil { log.Error("ComposeGreeting activity failed.", "Error", err) return allGreetings, err } + return allGreetings, nil } - // process latest signal - // Here is where we can process individual signals as they come in. - // It's ok to call activities here. - // This also validates an individual greeting: - // - check for duplicates - // - check for correct bucket - + /* process latest signal + * Here is where we can process individual signals as they come in. + * It's ok to call activities here. + * This also validates an individual greeting: + * - check for duplicates + * - check for correct bucket + */ for len(unprocessedGreetings) > 0 { ug := unprocessedGreetings[0] unprocessedGreetings = unprocessedGreetings[1:] - //fmt.Printf("greetings slice info for unprocessedGreetings after taking out ug: len=%d cap=%d %v\n", len(unprocessedGreetings), cap(unprocessedGreetings), unprocessedGreetings) + if ug.Bucket != bucketKey { log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + bucketKey +"], greeting bucket: [" + ug.Bucket + "]"); } else if(uniqueGreetingKeysMap[ug.GreetingKey]) { @@ -219,14 +150,13 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string) (allGreet } else { uniqueGreetingKeysMap[ug.GreetingKey] = true greetings = append(greetings, ug) - //log.Info("Adding Greeting. Key: [" + ug.GreetingKey +"], Text: [" + ug.GreetingText + "]"); } } - //a = AccumulateGreeting{} } - return a.GreetingText, nil + log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings)) + " greetings.") + return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, bucketKey, greetings, uniqueGreetingKeysMap) } func printGreetings(s []AccumulateGreeting) { diff --git a/accumulator/accumulate_signals_workflow_test.go b/accumulator/accumulate_signals_workflow_test.go new file mode 100644 index 00000000..9c5d545c --- /dev/null +++ b/accumulator/accumulate_signals_workflow_test.go @@ -0,0 +1,304 @@ +package accumulator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} + +// test 0: send nothing, verify it times out but is successful +func (s *UnitTestSuite) Test_WorkflowTimeout() { + env := s.NewTestWorkflowEnvironment() + env.RegisterActivity(ComposeGreeting) + env.ExecuteWorkflow(AccumulateSignalsWorkflow, "blue", nil, nil) + + s.True(env.IsWorkflowCompleted()) + // Workflow times out + s.NoError(env.GetWorkflowError()) + + var result string + + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello") + s.Contains( result, "(0)") +} + +// test 1: start workflow, send one signal, make sure one is accepted +func (s *UnitTestSuite) Test_Signal() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.Contains( result, "Suzie Robot") +} + +// test 2: just send an exit signal, should end quickly and return empty string +func (s *UnitTestSuite) Test_Exit() { + env := s.NewTestWorkflowEnvironment() + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("exit", "") + }, time.Second*5) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello") + s.Contains( result, "(0)") +} + +// test 3: send multiple greetings, should get them all +func (s *UnitTestSuite) Test_Multiple_Signals() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "11235" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (2)") + s.Contains( result, "Suzie Robot") + s.Contains( result, "Hezekiah Robot") +} + +// test 4: send greetings with duplicate keys, should only get one +func (s *UnitTestSuite) Test_Duplicate_Signals() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Suzie Robot" + suzieGreeting.Bucket = bucket + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.Contains( result, "Suzie Robot") + s.NotContains( result, "Hezekiah Robot") +} + +// test 5: test sent with a bad bucket key +func (s *UnitTestSuite) Test_Bad_Bucket() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + suzieGreeting := new(AccumulateGreeting) + suzieGreeting.GreetingText = "Jake Robot" + suzieGreeting.Bucket = "blue" + suzieGreeting.GreetingKey = "11235813" + env.SignalWorkflow("greeting", suzieGreeting) + }, time.Second*5) + env.RegisterDelayedCallback(func() { + hezekiahGreeting := new(AccumulateGreeting) + hezekiahGreeting.GreetingText = "Hezekiah Robot" + hezekiahGreeting.Bucket = bucket + hezekiahGreeting.GreetingKey = "112358" + env.SignalWorkflow("greeting", hezekiahGreeting) + }, time.Second*6) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.NotContains( result, "Jake Robot") + s.Contains( result, "Hezekiah Robot") +} + +// test 6: test signal with start +func (s *UnitTestSuite) Test_Signal_With_Start() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + env.RegisterDelayedCallback(func() { + androssGreeting := new(AccumulateGreeting) + androssGreeting.GreetingText = "Andross Robot" + androssGreeting.Bucket = bucket + androssGreeting.GreetingKey = "1123" + env.SignalWorkflow("greeting", androssGreeting) + }, time.Second*0) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.Contains( result, "Andross Robot") +} + +// test 7: signal with start, wait too long for first workflow so it times out, send another signal, should be just one greeting +func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, FromFirstSignalTimeout + time.Second*1) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.Contains( result, "John Robot") +} +// test 8: signal with start, don't wait too long for first workflow so it times out, send another signal, should be two greetings +func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Short() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, SignalToSignalTimeout - time.Second*1) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (2)") + s.Contains( result, "John Robot") + s.Contains( result, "Targe Robot") +} + +// test 9: signal with start, send exit, then signal with start, should get two signals +func (s *UnitTestSuite) Test_Signal_After_Exit() { + env := s.NewTestWorkflowEnvironment() + + bucket := "purple" + env.RegisterActivity(ComposeGreeting) + + env.RegisterDelayedCallback(func() { + johnGreeting := new(AccumulateGreeting) + johnGreeting.GreetingText = "John Robot" + johnGreeting.Bucket = bucket + johnGreeting.GreetingKey = "112" + env.SignalWorkflow("greeting", johnGreeting) + }, time.Second*0) + + env.RegisterDelayedCallback(func() { + env.SignalWorkflow("exit", "") + }, time.Second*5) + + env.RegisterDelayedCallback(func() { + targeGreeting := new(AccumulateGreeting) + targeGreeting.GreetingText = "Targe Robot" + targeGreeting.Bucket = bucket + targeGreeting.GreetingKey = "11" + env.SignalWorkflow("greeting", targeGreeting) + }, time.Second*5+time.Millisecond*1) + + env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + + + s.True(env.IsWorkflowCompleted()) + s.NoError(env.GetWorkflowError()) + var result string + s.NoError(env.GetWorkflowResult(&result)) + s.Contains( result, "Hello (1)") + s.Contains( result, "John Robot") + s.NotContains( result, "Targe Robot") +} diff --git a/accumulator/await_signals_workflow_test.go b/accumulator/await_signals_workflow_test.go deleted file mode 100644 index e6098fe2..00000000 --- a/accumulator/await_signals_workflow_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package accumulator - -import ( - "testing" - "time" - - "github.com/stretchr/testify/suite" - "go.temporal.io/sdk/testsuite" -) - -type UnitTestSuite struct { - suite.Suite - testsuite.WorkflowTestSuite -} - -func TestUnitTestSuite(t *testing.T) { - suite.Run(t, new(UnitTestSuite)) -} - -func (s *UnitTestSuite) Test_WorkflowTimeout() { - env := s.NewTestWorkflowEnvironment() - env.ExecuteWorkflow(AccumulateSignalsWorkflow) - - s.True(env.IsWorkflowCompleted()) - // Workflow times out - s.Error(env.GetWorkflowError()) -} - -func (s *UnitTestSuite) Test_SignalsInOrder() { - env := s.NewTestWorkflowEnvironment() - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal1", nil) - }, time.Hour) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal2", nil) - }, time.Hour+time.Second) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal3", nil) - }, time.Hour+3*time.Second) - env.ExecuteWorkflow(AccumulateSignalsWorkflow) - - s.True(env.IsWorkflowCompleted()) - s.NoError(env.GetWorkflowError()) -} - -func (s *UnitTestSuite) Test_SignalsInReverseOrder() { - env := s.NewTestWorkflowEnvironment() - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal3", nil) - }, time.Hour) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal2", nil) - }, time.Hour+time.Second) - env.RegisterDelayedCallback(func() { - env.SignalWorkflow("Signal1", nil) - }, time.Hour+3*time.Second) - env.ExecuteWorkflow(AccumulateSignalsWorkflow) - - s.True(env.IsWorkflowCompleted()) - s.NoError(env.GetWorkflowError()) -} From 885fce7f5c94a1385a282b00e0fabb380dcb788b Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Fri, 14 Jun 2024 15:08:49 -0400 Subject: [PATCH 03/10] updated main readme to add accumulator --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 5fd224b4..345065c4 100644 --- a/README.md +++ b/README.md @@ -191,6 +191,9 @@ These samples demonstrate some common control flow patterns using Temporal's Go - [**Await for signal processing**](./await-signals): Demonstrates how to process out of order signals processing using `Await` and `AwaitWithTimeout`. +- [**Accumulate Signals**](./accumulator): Demonstrates how + to process many signals using `AwaitWithTimeout`, continue as new, and activities. + - [**Worker-specific Task Queues**](./worker-specific-task-queues): Use a unique task queue per Worker to have certain Activities only run on that specific Worker. For instance for a file processing Workflow, where one Activity downloads a file and subsequent Activities need to operate on that file. (If multiple Workers were on the same queue, subsequent Activities may get run on different machines that don't have the downloaded file.) ### Scenario based examples From ef04dedccda69f39af6a240af41faaacb67429bd Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Fri, 14 Jun 2024 15:19:27 -0400 Subject: [PATCH 04/10] cleaning up unused variables and functions, fixing starter --- accumulator/accumulate_signals_workflow.go | 2 +- accumulator/starter/main.go | 17 ++++------------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index 7e5f010c..c56b117e 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -114,7 +114,7 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetings } log.Debug("Awaiting for " + timeout.String() + "\n") - gotSignalBeforeTimeout, err := workflow.AwaitWithTimeout(ctx, timeout, func() bool { + gotSignalBeforeTimeout, _ := workflow.AwaitWithTimeout(ctx, timeout, func() bool { return len(unprocessedGreetings) > 0 || ExitRequested }) diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go index b547f943..19f1619e 100644 --- a/accumulator/starter/main.go +++ b/accumulator/starter/main.go @@ -43,10 +43,6 @@ func main() { workflowId := WorkflowIDPrefix + "-" + bucket; buckets := []string{"red", "blue", "green", "yellow"} names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} - workflowOptions := client.StartWorkflowOptions{ - ID: workflowId, - TaskQueue: TaskQueue, - } max_signals := 20 if triggerContinueAsNewWarning { @@ -66,11 +62,11 @@ func main() { time.Sleep(20 * time.Millisecond) workflowId = WorkflowIDPrefix + "-" + bucket - workflowOptions = client.StartWorkflowOptions{ + workflowOptions := client.StartWorkflowOptions{ ID: workflowId, TaskQueue: TaskQueue, } - we, err := c.SignalWithStartWorkflow(context.Background(), workflowId, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket) + we, err := c.SignalWithStartWorkflow(context.Background(), workflowId, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket, nil, nil) if err != nil { log.Fatalln("Unable to signal with start workflow", err) } @@ -87,7 +83,7 @@ func main() { bucket = "purple" workflowId = WorkflowIDPrefix + "-" + bucket - workflowOptions = client.StartWorkflowOptions{ + workflowOptions := client.StartWorkflowOptions{ ID: workflowId, TaskQueue: TaskQueue, } @@ -98,7 +94,7 @@ func main() { suzieGreeting.GreetingKey = "11235813" // start the workflow async and then signal it - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket) + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket, nil, nil) if err != nil { log.Fatalln("Unable to execute workflow", err) } @@ -185,9 +181,4 @@ func main() { log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) } - - - return - - } From 73a8a94b9b284adc743716d7f25b86fc6eb9ecee Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Fri, 14 Jun 2024 15:21:50 -0400 Subject: [PATCH 05/10] removing unused function --- accumulator/accumulate_signals_workflow.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index c56b117e..69ab8d79 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -1,7 +1,6 @@ package accumulator import ( - "fmt" "strconv" "time" "go.temporal.io/sdk/workflow" @@ -158,7 +157,3 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetings log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings)) + " greetings.") return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, bucketKey, greetings, uniqueGreetingKeysMap) } - -func printGreetings(s []AccumulateGreeting) { - fmt.Printf("greetings slice info: len=%d cap=%d %v\n", len(s), cap(s), s) -} \ No newline at end of file From 03a5f3ded00f54de90f8b615da7924087bcc2dfe Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Fri, 14 Jun 2024 15:24:56 -0400 Subject: [PATCH 06/10] removing unused variable --- accumulator/starter/main.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go index 19f1619e..ac699c1b 100644 --- a/accumulator/starter/main.go +++ b/accumulator/starter/main.go @@ -40,7 +40,6 @@ func main() { // setup to send signals bucket := "blue"; - workflowId := WorkflowIDPrefix + "-" + bucket; buckets := []string{"red", "blue", "green", "yellow"} names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} @@ -61,7 +60,7 @@ func main() { } time.Sleep(20 * time.Millisecond) - workflowId = WorkflowIDPrefix + "-" + bucket + workflowId := WorkflowIDPrefix + "-" + bucket workflowOptions := client.StartWorkflowOptions{ ID: workflowId, TaskQueue: TaskQueue, @@ -81,7 +80,7 @@ func main() { // now we will try sending a signals near time of workflow exit bucket = "purple" - workflowId = WorkflowIDPrefix + "-" + bucket + workflowId := WorkflowIDPrefix + "-" + bucket workflowOptions := client.StartWorkflowOptions{ ID: workflowId, From 963cce0e6f443c07c8f448e136e02a7659f0d989 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Tue, 18 Jun 2024 12:57:41 -0400 Subject: [PATCH 07/10] lots of improvements from code review --- accumulator/accumulate_signals_activities.go | 14 +- accumulator/accumulate_signals_workflow.go | 128 +++++++------- .../accumulate_signals_workflow_test.go | 158 ++++++++++++------ accumulator/starter/main.go | 154 +++++++++-------- 4 files changed, 261 insertions(+), 193 deletions(-) diff --git a/accumulator/accumulate_signals_activities.go b/accumulator/accumulate_signals_activities.go index 37b3db96..4c16028a 100644 --- a/accumulator/accumulate_signals_activities.go +++ b/accumulator/accumulate_signals_activities.go @@ -3,23 +3,21 @@ package accumulator import ( "context" "fmt" - "strconv" "go.temporal.io/sdk/activity" ) // this activity will process all of the signals together func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) { log := activity.GetLogger(ctx) - if(len(s) < 1) { - log.Warn("No greetings found when trying to Compose Greetings. ") + if len(s) == 0 { + log.Warn("No greetings found when trying to Compose Greetings.") } - words := "Hello (" + strconv.Itoa(len(s)) + ") Robots" - for _, v:= range s { - words += fmt.Sprintf(", " + v.GreetingText ) + words := fmt.Sprintf("Hello (%v) Robots", len(s)) + for _, v := range s { + words += ", " + v.GreetingText } - words += "!" return words, nil - + } diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index 69ab8d79..18baf149 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -3,70 +3,76 @@ package accumulator import ( "strconv" "time" + "go.temporal.io/sdk/workflow" ) /** - * This sample demonstrates how to accumulate many signals (events) over a time period. + * This sample demonstrates how to accumulate many signals (events) over a time period. * This sample implements the Accumulator Pattern: collect many meaningful things that - * need to be collected and worked on together, such as all payments for an account, or + * need to be collected and worked on together, such as all payments for an account, or * all account updates by account. - * This sample models robots being created throughout the time period, + * This sample models robots being created throughout the time period, * groups them by what color they are, and greets all the robots of a color at the end. - * + * * A new workflow is created per grouping. Workflows continue as new as needed. * A sample activity at the end is given, and you could add an activity to * process individual events in the processGreeting() method. - * - * Because Temporal Workflows cannot have an unlimited size, Continue As New is used + * + * Because Temporal Workflows cannot have an unlimited size, Continue As New is used * to process more signals that may come in. * You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. - * You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first + * You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first * signal with the GetNextTimeout() function. */ -// SignalToSignalTimeout is them maximum time between signals -var SignalToSignalTimeout = 30 * time.Second +// signalToSignalTimeout is them maximum time between signals +const signalToSignalTimeout = 30 * time.Second -// FromFirstSignalTimeout is the maximum time to receive all signals -var FromFirstSignalTimeout = 60 * time.Second +// fromFirstSignalTimeout is the maximum time to receive all signals +const fromFirstSignalTimeout = 60 * time.Second type AccumulateGreeting struct { - GreetingText string - Bucket string - GreetingKey string + GreetingText string + Bucket string + GreetingKey string } +type GreetingsInfo struct { + BucketKey string + GreetingsList []AccumulateGreeting + UniqueGreetingKeys map[string]bool +} // GetNextTimeout returns the maximum time allowed to wait for the next signal. -func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time ) (time.Duration, error) { +func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time) (time.Duration, error) { if firstSignalTime.IsZero() { firstSignalTime = workflow.Now(ctx) } total := workflow.Now(ctx).Sub(firstSignalTime) - totalLeft := FromFirstSignalTimeout - total + totalLeft := fromFirstSignalTimeout - total if totalLeft <= 0 { return 0, nil } - if SignalToSignalTimeout < totalLeft { - return SignalToSignalTimeout, nil + if signalToSignalTimeout < totalLeft { + return signalToSignalTimeout, nil } return totalLeft, nil } // AccumulateSignalsWorkflow workflow definition -func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetingsCAN []AccumulateGreeting, uniqueGreetingKeysMapCAN map[string]bool) (allGreetings string, err error) { +func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (allGreetings string, err error) { log := workflow.GetLogger(ctx) var a AccumulateGreeting - greetings := []AccumulateGreeting{} - greetings = append(greetings, greetingsCAN...) - unprocessedGreetings := []AccumulateGreeting{} - uniqueGreetingKeysMap := make(map[string]bool) - for k, v := range uniqueGreetingKeysMapCAN { - uniqueGreetingKeysMap[k] = v + if greetings.GreetingsList == nil { + greetings.GreetingsList = []AccumulateGreeting{} } - var FirstSignalTime time.Time - ExitRequested := false + if greetings.UniqueGreetingKeys == nil { + greetings.UniqueGreetingKeys = make(map[string]bool) + } + var unprocessedGreetings []AccumulateGreeting + var firstSignalTime time.Time + exitRequested := false ao := workflow.ActivityOptions{ StartToCloseTimeout: 10 * time.Second, @@ -76,84 +82,84 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, bucketKey string, greetings // Listen to signals in a different goroutine workflow.Go(ctx, func(gCtx workflow.Context) { for { - log.Debug("in workflow.go signals goroutine for{} \n") + log.Debug("In workflow.go signals goroutine for{}") selector := workflow.NewSelector(gCtx) selector.AddReceive(workflow.GetSignalChannel(gCtx, "greeting"), func(c workflow.ReceiveChannel, more bool) { c.Receive(gCtx, &a) unprocessedGreetings = append(unprocessedGreetings, a) - log.Debug("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more) + "\n") - - a = AccumulateGreeting{} + log.Debug("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more)) + + a = AccumulateGreeting{} }) selector.AddReceive(workflow.GetSignalChannel(gCtx, "exit"), func(c workflow.ReceiveChannel, more bool) { c.Receive(gCtx, nil) - ExitRequested = true - log.Debug("Exit Signal Received\n") + exitRequested = true + log.Debug("Exit Signal Received") }) - log.Debug("before select, greeting text: " + a.GreetingText + "\n") + log.Debug("Before select greeting text: " + a.GreetingText) selector.Select(gCtx) - log.Debug("after select, greeting text: " + a.GreetingText + "\n") - if FirstSignalTime.IsZero() { - FirstSignalTime = workflow.Now(gCtx) + log.Debug("After select, greeting text: " + a.GreetingText) + if firstSignalTime.IsZero() { + firstSignalTime = workflow.Now(gCtx) } } }) - for ; !workflow.GetInfo(ctx).GetContinueAsNewSuggested() ;{ + for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() { // Wait for Signal or timeout - timeout, err := a.GetNextTimeout(ctx, ExitRequested, FirstSignalTime) + timeout, err := a.GetNextTimeout(ctx, exitRequested, firstSignalTime) if err != nil { - log.Warn("error awaiting signal\n") + log.Warn("Error awaiting signal") return "", err } if timeout <= 0 { // do final processing? or just check for signal one more time - log.Debug("No time left for signals, checking one more time\n") + log.Debug("No time left for signals, checking one more time") } - log.Debug("Awaiting for " + timeout.String() + "\n") + log.Debug("Awaiting for " + timeout.String()) gotSignalBeforeTimeout, _ := workflow.AwaitWithTimeout(ctx, timeout, func() bool { - return len(unprocessedGreetings) > 0 || ExitRequested + return len(unprocessedGreetings) > 0 || exitRequested }) // timeout happened without a signal coming in, so let's process the greetings and wrap it up! if len(unprocessedGreetings) == 0 { - log.Debug("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(ExitRequested) +"\n") + log.Debug("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(exitRequested)) // do final processing allGreetings = "" - err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings).Get(ctx, &allGreetings) + err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings) if err != nil { log.Error("ComposeGreeting activity failed.", "Error", err) return allGreetings, err } - + return allGreetings, nil } - + /* process latest signal * Here is where we can process individual signals as they come in. * It's ok to call activities here. * This also validates an individual greeting: * - check for duplicates - * - check for correct bucket + * - check for correct bucket */ - for len(unprocessedGreetings) > 0 { - ug := unprocessedGreetings[0] - unprocessedGreetings = unprocessedGreetings[1:] - - if ug.Bucket != bucketKey { - log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + bucketKey +"], greeting bucket: [" + ug.Bucket + "]"); - } else if(uniqueGreetingKeysMap[ug.GreetingKey]) { - log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey +"]"); + toProcess := unprocessedGreetings + unprocessedGreetings = []AccumulateGreeting{} + + for _, ug := range toProcess { + if ug.Bucket != greetings.BucketKey { + log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + greetings.BucketKey + "], greeting bucket: [" + ug.Bucket + "]") + } else if greetings.UniqueGreetingKeys[ug.GreetingKey] { + log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey + "]") } else { - uniqueGreetingKeysMap[ug.GreetingKey] = true - greetings = append(greetings, ug) + greetings.UniqueGreetingKeys[ug.GreetingKey] = true + greetings.GreetingsList = append(greetings.GreetingsList, ug) } } - + } - log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings)) + " greetings.") - return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, bucketKey, greetings, uniqueGreetingKeysMap) + log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings.GreetingsList)) + " greetings.") + return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, greetings) } diff --git a/accumulator/accumulate_signals_workflow_test.go b/accumulator/accumulate_signals_workflow_test.go index 9c5d545c..98de1ac0 100644 --- a/accumulator/accumulate_signals_workflow_test.go +++ b/accumulator/accumulate_signals_workflow_test.go @@ -21,23 +21,29 @@ func TestUnitTestSuite(t *testing.T) { func (s *UnitTestSuite) Test_WorkflowTimeout() { env := s.NewTestWorkflowEnvironment() env.RegisterActivity(ComposeGreeting) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, "blue", nil, nil) + bucket := "blue" + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) // Workflow times out s.NoError(env.GetWorkflowError()) - + var result string - + s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello") - s.Contains( result, "(0)") + s.Contains(result, "Hello") + s.Contains(result, "(0)") } // test 1: start workflow, send one signal, make sure one is accepted func (s *UnitTestSuite) Test_Signal() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { @@ -48,39 +54,49 @@ func (s *UnitTestSuite) Test_Signal() { env.SignalWorkflow("greeting", suzieGreeting) }, time.Second*5) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.Contains( result, "Suzie Robot") + s.Contains(result, "Hello (1)") + s.Contains(result, "Suzie Robot") } // test 2: just send an exit signal, should end quickly and return empty string func (s *UnitTestSuite) Test_Exit() { env := s.NewTestWorkflowEnvironment() - bucket := "purple" + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { env.SignalWorkflow("exit", "") }, time.Second*5) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello") - s.Contains( result, "(0)") + s.Contains(result, "Hello") + s.Contains(result, "(0)") } // test 3: send multiple greetings, should get them all func (s *UnitTestSuite) Test_Multiple_Signals() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { @@ -98,21 +114,26 @@ func (s *UnitTestSuite) Test_Multiple_Signals() { env.SignalWorkflow("greeting", hezekiahGreeting) }, time.Second*6) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (2)") - s.Contains( result, "Suzie Robot") - s.Contains( result, "Hezekiah Robot") + s.Contains(result, "Hello (2)") + s.Contains(result, "Suzie Robot") + s.Contains(result, "Hezekiah Robot") } // test 4: send greetings with duplicate keys, should only get one func (s *UnitTestSuite) Test_Duplicate_Signals() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { @@ -130,21 +151,26 @@ func (s *UnitTestSuite) Test_Duplicate_Signals() { env.SignalWorkflow("greeting", hezekiahGreeting) }, time.Second*6) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.Contains( result, "Suzie Robot") - s.NotContains( result, "Hezekiah Robot") + s.Contains(result, "Hello (1)") + s.Contains(result, "Suzie Robot") + s.NotContains(result, "Hezekiah Robot") } // test 5: test sent with a bad bucket key func (s *UnitTestSuite) Test_Bad_Bucket() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { @@ -162,21 +188,26 @@ func (s *UnitTestSuite) Test_Bad_Bucket() { env.SignalWorkflow("greeting", hezekiahGreeting) }, time.Second*6) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.NotContains( result, "Jake Robot") - s.Contains( result, "Hezekiah Robot") + s.Contains(result, "Hello (1)") + s.NotContains(result, "Jake Robot") + s.Contains(result, "Hezekiah Robot") } // test 6: test signal with start func (s *UnitTestSuite) Test_Signal_With_Start() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) env.RegisterDelayedCallback(func() { @@ -187,24 +218,28 @@ func (s *UnitTestSuite) Test_Signal_With_Start() { env.SignalWorkflow("greeting", androssGreeting) }, time.Second*0) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) - + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.Contains( result, "Andross Robot") + s.Contains(result, "Hello (1)") + s.Contains(result, "Andross Robot") } // test 7: signal with start, wait too long for first workflow so it times out, send another signal, should be just one greeting func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) - + env.RegisterDelayedCallback(func() { johnGreeting := new(AccumulateGreeting) johnGreeting.GreetingText = "John Robot" @@ -218,25 +253,30 @@ func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() { targeGreeting.Bucket = bucket targeGreeting.GreetingKey = "11" env.SignalWorkflow("greeting", targeGreeting) - }, FromFirstSignalTimeout + time.Second*1) - - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + }, fromFirstSignalTimeout+time.Second*1) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.Contains( result, "John Robot") + s.Contains(result, "Hello (1)") + s.Contains(result, "John Robot") } + // test 8: signal with start, don't wait too long for first workflow so it times out, send another signal, should be two greetings func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Short() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) - + env.RegisterDelayedCallback(func() { johnGreeting := new(AccumulateGreeting) johnGreeting.GreetingText = "John Robot" @@ -250,27 +290,31 @@ func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Short() { targeGreeting.Bucket = bucket targeGreeting.GreetingKey = "11" env.SignalWorkflow("greeting", targeGreeting) - }, SignalToSignalTimeout - time.Second*1) - - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) + }, signalToSignalTimeout-time.Second*1) + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (2)") - s.Contains( result, "John Robot") - s.Contains( result, "Targe Robot") + s.Contains(result, "Hello (2)") + s.Contains(result, "John Robot") + s.Contains(result, "Targe Robot") } // test 9: signal with start, send exit, then signal with start, should get two signals func (s *UnitTestSuite) Test_Signal_After_Exit() { env := s.NewTestWorkflowEnvironment() - + bucket := "purple" env.RegisterActivity(ComposeGreeting) - + env.RegisterDelayedCallback(func() { johnGreeting := new(AccumulateGreeting) johnGreeting.GreetingText = "John Robot" @@ -291,14 +335,18 @@ func (s *UnitTestSuite) Test_Signal_After_Exit() { env.SignalWorkflow("greeting", targeGreeting) }, time.Second*5+time.Millisecond*1) - env.ExecuteWorkflow(AccumulateSignalsWorkflow, bucket, nil, nil) - + greetings := GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + env.ExecuteWorkflow(AccumulateSignalsWorkflow, greetings) s.True(env.IsWorkflowCompleted()) s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains( result, "Hello (1)") - s.Contains( result, "John Robot") - s.NotContains( result, "Targe Robot") + s.Contains(result, "Hello (1)") + s.Contains(result, "John Robot") + s.NotContains(result, "Targe Robot") } diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go index ac699c1b..807b68a8 100644 --- a/accumulator/starter/main.go +++ b/accumulator/starter/main.go @@ -14,7 +14,7 @@ import ( var WorkflowIDPrefix = "accumulate" -var TaskQueue = "accumulate_greetings"; +var TaskQueue = "accumulate_greetings" func main() { // The client is a heavyweight object that should be created once per process. @@ -27,118 +27,131 @@ func main() { defer c.Close() // setup which tests to run - // by default it will run an accumulation with a few (20) signals - // to a set of 4 buckets with Signal To Start - triggerContinueAsNewWarning := false; + // by default it will run an accumulation with a few (20) signals + // to a set of 4 buckets with Signal To Start + triggerContinueAsNewWarning := false - testSignalEdgeCases := true; - // configure signal edge cases to test - testSignalAfterWorkflowExit := false; - testSignalAfterExitSignal := !testSignalAfterWorkflowExit; - testDuplicate := true; - testIgnoreBadBucket := true; + testSignalEdgeCases := true + // configure signal edge cases to test + testSignalAfterWorkflowExit := false + testSignalAfterExitSignal := !testSignalAfterWorkflowExit + testDuplicate := true + testIgnoreBadBucket := true // setup to send signals - bucket := "blue"; - buckets := []string{"red", "blue", "green", "yellow"} - names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} + bucket := "blue" + buckets := []string{"red", "blue", "green", "yellow"} + names := []string{"Genghis Khan", "Missy", "Bill", "Ted", "Rufus", "Abe"} - max_signals := 20 + maxSignals := 20 if triggerContinueAsNewWarning { - max_signals = 10000 + maxSignals = 10000 } - for i := 0; i < max_signals; i++ { + for i := 0; i < maxSignals; i++ { bucketIndex := rand.Intn(len(buckets)) bucket = buckets[bucketIndex] nameIndex := rand.Intn(len(names)) - + greeting := accumulator.AccumulateGreeting{ GreetingText: names[nameIndex], - Bucket: bucket, - GreetingKey: "key-" + fmt.Sprint(i), + Bucket: bucket, + GreetingKey: "key-" + fmt.Sprint(i), + } + + greetings := accumulator.GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []accumulator.AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), } time.Sleep(20 * time.Millisecond) - workflowId := WorkflowIDPrefix + "-" + bucket + WorkflowID := WorkflowIDPrefix + "-" + bucket workflowOptions := client.StartWorkflowOptions{ - ID: workflowId, + ID: WorkflowID, TaskQueue: TaskQueue, } - we, err := c.SignalWithStartWorkflow(context.Background(), workflowId, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket, nil, nil) + we, err := c.SignalWithStartWorkflow(context.Background(), WorkflowID, "greeting", greeting, workflowOptions, accumulator.AccumulateSignalsWorkflow, greetings) if err != nil { log.Fatalln("Unable to signal with start workflow", err) } log.Println("Signaled/Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "signal:", greeting.GreetingText) - } - + } + // skip further testing - if (!testSignalEdgeCases) { + if !testSignalEdgeCases { return } // now we will try sending a signals near time of workflow exit bucket = "purple" - workflowId := WorkflowIDPrefix + "-" + bucket - + WorkflowID := WorkflowIDPrefix + "-" + bucket + workflowOptions := client.StartWorkflowOptions{ - ID: workflowId, + ID: WorkflowID, TaskQueue: TaskQueue, } - suzieGreeting := new(accumulator.AccumulateGreeting) + suzieGreeting := new(accumulator.AccumulateGreeting) suzieGreeting.GreetingText = "Suzie Robot" suzieGreeting.Bucket = bucket suzieGreeting.GreetingKey = "11235813" + greetings := accumulator.GreetingsInfo{ + BucketKey: bucket, + GreetingsList: []accumulator.AccumulateGreeting{}, + UniqueGreetingKeys: make(map[string]bool), + } + // start the workflow async and then signal it - we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, bucket, nil, nil) + we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, accumulator.AccumulateSignalsWorkflow, greetings) if err != nil { log.Fatalln("Unable to execute workflow", err) } log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) - - // After start for AccumulateSignalsWorkflow returns, the workflow is guaranteed to be - // started, so we can send a signal to it using the workflow ID and Run ID - // This workflow keeps receiving signals until exit is called or the timer finishes with no signals + // After start for AccumulateSignalsWorkflow returns, the workflow is guaranteed to be + // started, so we can send a signal to it using the workflow ID and Run ID + // This workflow keeps receiving signals until exit is called or the timer finishes with no signals - // When the workflow is started the accumulateGreetings will block for the - // previously defined conditions - // Send the first workflow signal + // When the workflow is started the accumulateGreetings will block for the + // previously defined conditions + // Send the first workflow signal err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", suzieGreeting) if err != nil { log.Fatalln("Unable to signal workflow", err) } log.Println("Sent " + suzieGreeting.GreetingText + " to " + we.GetID()) - - // This test signals exit, then waits for the workflow to end + // This test signals exit, then waits for the workflow to end // signals after this will error, as the workflow execution already completed - if (testSignalAfterWorkflowExit) { - err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "" ) + if testSignalAfterWorkflowExit { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "") if err != nil { log.Fatalln("Unable to signal workflow", err) } log.Println(we.GetID() + ":Sent exit") - var exitSignalResults string - we.Get(context.Background(), &exitSignalResults) + var exitSignalResults string + err = we.Get(context.Background(), &exitSignalResults) + if err != nil { + log.Fatalln("Unable to get workflow results", err) + } log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) - } + } - // This test sends an exit signal, does not wait for workflow to exit, then sends a signal - // this demonstrates Temporal history rollback - // see https://community.temporal.io/t/continueasnew-signals/1008/7 - if (testSignalAfterExitSignal) { - err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "" ) + // This test sends an exit signal, does not wait for workflow to exit, then sends a signal + // this demonstrates Temporal history rollback + // see https://community.temporal.io/t/continueasnew-signals/1008/7 + if testSignalAfterExitSignal { + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "exit", "") if err != nil { - log.Fatalln("Unable to signal workflow " + we.GetID(), err) + log.Fatalln("Unable to signal workflow "+we.GetID(), err) } log.Println(we.GetID() + ": Sent exit") - } + } - // Test sending more signals after workflow exit + // Test sending more signals after workflow exit janeGreeting := new(accumulator.AccumulateGreeting) janeGreeting.GreetingText = "Jane Robot" @@ -146,38 +159,41 @@ func main() { janeGreeting.GreetingKey = "112358132134" err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) if err != nil { - log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) } log.Println("Sent " + janeGreeting.GreetingText + " to " + we.GetID()) - if (testIgnoreBadBucket) { - // send a third signal with an incorrect bucket - this will be ignored - // can use workflow update to validate and reject a request if needed + if testIgnoreBadBucket { + // send a third signal with an incorrect bucket - this will be ignored + // can use workflow update to validate and reject a request if needed badBucketGreeting := new(accumulator.AccumulateGreeting) badBucketGreeting.GreetingText = "Ozzy Robot" badBucketGreeting.Bucket = "taupe" badBucketGreeting.GreetingKey = "112358132134" err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", badBucketGreeting) if err != nil { - log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) } log.Println("Sent " + badBucketGreeting.GreetingText + " to " + we.GetID()) - } + } - if (testDuplicate) { - // intentionally send a duplicate signal - err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) + if testDuplicate { + // intentionally send a duplicate signal + err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), "greeting", janeGreeting) if err != nil { - log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()); + log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) } log.Println("Sent Duplicate " + janeGreeting.GreetingText + " to " + we.GetID()) - } + } - if (!testSignalAfterWorkflowExit) { - // wait for results if we haven't waited for them yet - var exitSignalResults string - we.Get(context.Background(), &exitSignalResults) - log.Println(we.GetID() + "-" + exitSignalResults + ": execution results: " + exitSignalResults) - } + if !testSignalAfterWorkflowExit { + // wait for results if we haven't waited for them yet + var exitSignalResults string + err = we.Get(context.Background(), &exitSignalResults) + if err != nil { + log.Fatalln("Unable to get workflow results", err) + } + log.Println(we.GetID() + ": Execution results: " + exitSignalResults) + } } From ac53d5be31c75dafa4c7106a55e0edd75878d3a4 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Tue, 18 Jun 2024 14:53:38 -0400 Subject: [PATCH 08/10] more changes from code review --- accumulator/accumulate_signals_workflow.go | 4 +++- accumulator/starter/main.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index 18baf149..558dbc9f 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -44,7 +44,9 @@ type GreetingsInfo struct { UniqueGreetingKeys map[string]bool } -// GetNextTimeout returns the maximum time allowed to wait for the next signal. +// GetNextTimeout returns the maximum time for a workflow to wait for the next signal. +// fromFirstSignalTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time +// This resets with Continue As New func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time) (time.Duration, error) { if firstSignalTime.IsZero() { firstSignalTime = workflow.Now(ctx) diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go index 807b68a8..d854b624 100644 --- a/accumulator/starter/main.go +++ b/accumulator/starter/main.go @@ -45,7 +45,7 @@ func main() { maxSignals := 20 if triggerContinueAsNewWarning { - maxSignals = 10000 + maxSignals = 5000 } for i := 0; i < maxSignals; i++ { @@ -64,7 +64,7 @@ func main() { GreetingsList: []accumulator.AccumulateGreeting{}, UniqueGreetingKeys: make(map[string]bool), } - time.Sleep(20 * time.Millisecond) + time.Sleep(5 * time.Millisecond) WorkflowID := WorkflowIDPrefix + "-" + bucket workflowOptions := client.StartWorkflowOptions{ From 488ec1de3f806b0414c2b877de17dfde206b6cc5 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Thu, 27 Jun 2024 14:58:39 -0400 Subject: [PATCH 09/10] changing test comment --- accumulator/accumulate_signals_workflow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/accumulator/accumulate_signals_workflow_test.go b/accumulator/accumulate_signals_workflow_test.go index 98de1ac0..5270d5ae 100644 --- a/accumulator/accumulate_signals_workflow_test.go +++ b/accumulator/accumulate_signals_workflow_test.go @@ -308,7 +308,7 @@ func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Short() { s.Contains(result, "Targe Robot") } -// test 9: signal with start, send exit, then signal with start, should get two signals +// test 9: signal with start, send exit, then signal with start, should get one signal func (s *UnitTestSuite) Test_Signal_After_Exit() { env := s.NewTestWorkflowEnvironment() From aeb08fbf0f1e597a2503d1402dbb69dd0a58c8a8 Mon Sep 17 00:00:00 2001 From: Joshua Smith Date: Mon, 15 Jul 2024 13:20:39 -0400 Subject: [PATCH 10/10] changes from review: not using a separate goroutine for signals & catching signals at the end --- accumulator/README.md | 7 +- accumulator/accumulate_signals_workflow.go | 109 +++++++++--------- .../accumulate_signals_workflow_test.go | 6 +- accumulator/starter/main.go | 6 +- 4 files changed, 66 insertions(+), 62 deletions(-) diff --git a/accumulator/README.md b/accumulator/README.md index f15f4a01..e7eeae31 100644 --- a/accumulator/README.md +++ b/accumulator/README.md @@ -4,7 +4,7 @@ This sample implements the Accumulator Pattern: collect many meaningful things t This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end. -A new workflow is created per grouping. Workflows continue as new as needed. +A new workflow is created per grouping. A sample activity at the end is given, and you could add an activity to process individual events in the processGreeting() method. @@ -12,11 +12,12 @@ Because Temporal Workflows cannot have an unlimited size, Continue As New is use You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function. +This example supports exiting early with an exit signal. Pending greetings are still collected after exit signal is sent. ### Steps to run this sample: -1) You need a Temporal service running. See details in README.md +1) You need a Temporal service running. See details in repo's README.md 2) Run the following command to start the worker ``` @@ -29,7 +30,7 @@ go run accumulator/worker/main.go go run accumulator/starter/main.go ``` -You can always run tests with +You can also run tests with ``` go test accumulator/accumulate_signals_workflow_test.go ``` diff --git a/accumulator/accumulate_signals_workflow.go b/accumulator/accumulate_signals_workflow.go index 558dbc9f..e0ba91c9 100644 --- a/accumulator/accumulate_signals_workflow.go +++ b/accumulator/accumulate_signals_workflow.go @@ -8,7 +8,7 @@ import ( ) /** - * This sample demonstrates how to accumulate many signals (events) over a time period. + * This sample demonstrates how to accumulate many signals (business events) over a time period. * This sample implements the Accumulator Pattern: collect many meaningful things that * need to be collected and worked on together, such as all payments for an account, or * all account updates by account. @@ -29,8 +29,11 @@ import ( // signalToSignalTimeout is them maximum time between signals const signalToSignalTimeout = 30 * time.Second -// fromFirstSignalTimeout is the maximum time to receive all signals -const fromFirstSignalTimeout = 60 * time.Second +// fromStartTimeout is the maximum time to receive all signals +const fromStartTimeout = 60 * time.Second + +// exitTimeout is the time to wait after exit is requested to catch any last few signals +const exitTimeout = 1 * time.Second type AccumulateGreeting struct { GreetingText string @@ -42,21 +45,26 @@ type GreetingsInfo struct { BucketKey string GreetingsList []AccumulateGreeting UniqueGreetingKeys map[string]bool + startTime time.Time } // GetNextTimeout returns the maximum time for a workflow to wait for the next signal. -// fromFirstSignalTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time +// This waits for the greater of the remaining fromStartTimeout and signalToSignalTimeout +// fromStartTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time as desired // This resets with Continue As New -func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time) (time.Duration, error) { - if firstSignalTime.IsZero() { - firstSignalTime = workflow.Now(ctx) +func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, startTime time.Time, exitRequested bool) (time.Duration, error) { + if exitRequested { + return exitTimeout, nil + } + if startTime.IsZero() { + startTime = workflow.GetInfo(ctx).WorkflowStartTime // if you want to start from the time of the first signal, customize this } - total := workflow.Now(ctx).Sub(firstSignalTime) - totalLeft := fromFirstSignalTimeout - total + total := workflow.Now(ctx).Sub(startTime) + totalLeft := fromStartTimeout - total if totalLeft <= 0 { return 0, nil } - if signalToSignalTimeout < totalLeft { + if signalToSignalTimeout > totalLeft { return signalToSignalTimeout, nil } return totalLeft, nil @@ -73,62 +81,50 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (a greetings.UniqueGreetingKeys = make(map[string]bool) } var unprocessedGreetings []AccumulateGreeting - var firstSignalTime time.Time + if greetings.startTime.IsZero() { + greetings.startTime = workflow.Now(ctx) + } exitRequested := false ao := workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 100 * time.Second, } ctx = workflow.WithActivityOptions(ctx, ao) - // Listen to signals in a different goroutine - workflow.Go(ctx, func(gCtx workflow.Context) { - for { - log.Debug("In workflow.go signals goroutine for{}") - selector := workflow.NewSelector(gCtx) - selector.AddReceive(workflow.GetSignalChannel(gCtx, "greeting"), func(c workflow.ReceiveChannel, more bool) { - c.Receive(gCtx, &a) - unprocessedGreetings = append(unprocessedGreetings, a) - log.Debug("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more)) - - a = AccumulateGreeting{} - }) - selector.AddReceive(workflow.GetSignalChannel(gCtx, "exit"), func(c workflow.ReceiveChannel, more bool) { - c.Receive(gCtx, nil) - exitRequested = true - log.Debug("Exit Signal Received") - }) - log.Debug("Before select greeting text: " + a.GreetingText) - selector.Select(gCtx) - log.Debug("After select, greeting text: " + a.GreetingText) - if firstSignalTime.IsZero() { - firstSignalTime = workflow.Now(gCtx) - } - } - }) - for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() { - // Wait for Signal or timeout - timeout, err := a.GetNextTimeout(ctx, exitRequested, firstSignalTime) + + timeout, err := a.GetNextTimeout(ctx, greetings.startTime, exitRequested) + childCtx, cancelHandler := workflow.WithCancel(ctx) + selector := workflow.NewSelector(ctx) if err != nil { - log.Warn("Error awaiting signal") + log.Error("Error calculating timeout") return "", err } - if timeout <= 0 { - // do final processing? or just check for signal one more time - log.Debug("No time left for signals, checking one more time") - } - log.Debug("Awaiting for " + timeout.String()) - gotSignalBeforeTimeout, _ := workflow.AwaitWithTimeout(ctx, timeout, func() bool { - return len(unprocessedGreetings) > 0 || exitRequested + selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &a) + unprocessedGreetings = append(unprocessedGreetings, a) + log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n") + cancelHandler() // cancel timer future + a = AccumulateGreeting{} + }) + selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + exitRequested = true + cancelHandler() // cancel timer future + log.Debug("Exit Signal Received, more?: " + strconv.FormatBool(more) + "\n") + }) + + timerFuture := workflow.NewTimer(childCtx, timeout) + selector.AddFuture(timerFuture, func(f workflow.Future) { + log.Debug("Timer fired \n") }) - // timeout happened without a signal coming in, so let's process the greetings and wrap it up! - if len(unprocessedGreetings) == 0 { - log.Debug("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(exitRequested)) - // do final processing + selector.Select(ctx) + + if len(unprocessedGreetings) == 0 { // timeout without a signal coming in, so let's process the greetings and wrap it up! + log.Debug("Into final processing, received greetings count: " + strconv.Itoa(len(greetings.GreetingsList)) + "\n") allGreetings = "" err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings) if err != nil { @@ -136,15 +132,20 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (a return allGreetings, err } - return allGreetings, nil + if !selector.HasPending() { // in case a signal came in while activity was running, check again + return allGreetings, nil + } else { + log.Info("Received a signal while processing ComposeGreeting activity.") + } } - /* process latest signal + /* process latest signals * Here is where we can process individual signals as they come in. * It's ok to call activities here. * This also validates an individual greeting: * - check for duplicates * - check for correct bucket + * Using update validation could improve this in the future */ toProcess := unprocessedGreetings unprocessedGreetings = []AccumulateGreeting{} diff --git a/accumulator/accumulate_signals_workflow_test.go b/accumulator/accumulate_signals_workflow_test.go index 5270d5ae..116c6b82 100644 --- a/accumulator/accumulate_signals_workflow_test.go +++ b/accumulator/accumulate_signals_workflow_test.go @@ -253,7 +253,7 @@ func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() { targeGreeting.Bucket = bucket targeGreeting.GreetingKey = "11" env.SignalWorkflow("greeting", targeGreeting) - }, fromFirstSignalTimeout+time.Second*1) + }, fromStartTimeout+time.Second*1) greetings := GreetingsInfo{ BucketKey: bucket, @@ -346,7 +346,7 @@ func (s *UnitTestSuite) Test_Signal_After_Exit() { s.NoError(env.GetWorkflowError()) var result string s.NoError(env.GetWorkflowResult(&result)) - s.Contains(result, "Hello (1)") + s.Contains(result, "Hello (2)") s.Contains(result, "John Robot") - s.NotContains(result, "Targe Robot") + s.Contains(result, "Targe Robot") } diff --git a/accumulator/starter/main.go b/accumulator/starter/main.go index d854b624..9ff06a10 100644 --- a/accumulator/starter/main.go +++ b/accumulator/starter/main.go @@ -149,9 +149,11 @@ func main() { log.Fatalln("Unable to signal workflow "+we.GetID(), err) } log.Println(we.GetID() + ": Sent exit") + // Signals after this test sending more signals after workflow exit + time.Sleep(5 * time.Millisecond) } - // Test sending more signals after workflow exit + janeGreeting := new(accumulator.AccumulateGreeting) janeGreeting.GreetingText = "Jane Robot" @@ -174,7 +176,7 @@ func main() { if err != nil { log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error()) } - log.Println("Sent " + badBucketGreeting.GreetingText + " to " + we.GetID()) + log.Println("Sent invalid bucket signal " + badBucketGreeting.GreetingText + ", " + badBucketGreeting.Bucket + " to " + we.GetID()) } if testDuplicate {