-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding Accumulator Example #355
base: main
Are you sure you want to change the base?
Changes from all commits
a315242
a15cfd5
d71fc8c
885fce7
ef04ded
73a8a94
03a5f3d
963cce0
58b5442
ac53d5b
e5acbf3
488ec1d
aeb08fb
789a06a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Accumulator | ||
This sample demonstrates how to accumulate many signals (events) over a time period. | ||
This sample implements the Accumulator Pattern: collect many meaningful things that need to be collected and worked on together, such as all payments for an account, or all account updates by account. | ||
|
||
This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end. | ||
|
||
A new workflow is created per grouping. | ||
A sample activity at the end is given, and you could add an activity to | ||
process individual events in the processGreeting() method. | ||
|
||
Because Temporal Workflows cannot have an unlimited size, Continue As New is used to process more signals that may come in. | ||
You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. | ||
|
||
You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function. | ||
This example supports exiting early with an exit signal. Pending greetings are still collected after exit signal is sent. | ||
|
||
|
||
### Steps to run this sample: | ||
|
||
1) You need a Temporal service running. See details in repo's README.md | ||
2) Run the following command to start the worker | ||
|
||
``` | ||
go run accumulator/worker/main.go | ||
``` | ||
|
||
3) Run the following command to start the workflow and send signals in random order | ||
|
||
``` | ||
go run accumulator/starter/main.go | ||
``` | ||
|
||
You can also run tests with | ||
``` | ||
go test accumulator/accumulate_signals_workflow_test.go | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package accumulator | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"go.temporal.io/sdk/activity" | ||
) | ||
|
||
// this activity will process all of the signals together | ||
func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) { | ||
log := activity.GetLogger(ctx) | ||
if len(s) == 0 { | ||
log.Warn("No greetings found when trying to Compose Greetings.") | ||
} | ||
|
||
words := fmt.Sprintf("Hello (%v) Robots", len(s)) | ||
for _, v := range s { | ||
words += ", " + v.GreetingText | ||
} | ||
words += "!" | ||
return words, nil | ||
|
||
} |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,168 @@ | ||||||||
package accumulator | ||||||||
|
||||||||
import ( | ||||||||
"strconv" | ||||||||
"time" | ||||||||
|
||||||||
"go.temporal.io/sdk/workflow" | ||||||||
) | ||||||||
|
||||||||
/** | ||||||||
* This sample demonstrates how to accumulate many signals (business events) over a time period. | ||||||||
* This sample implements the Accumulator Pattern: collect many meaningful things that | ||||||||
* need to be collected and worked on together, such as all payments for an account, or | ||||||||
* all account updates by account. | ||||||||
* This sample models robots being created throughout the time period, | ||||||||
* groups them by what color they are, and greets all the robots of a color at the end. | ||||||||
* | ||||||||
* A new workflow is created per grouping. Workflows continue as new as needed. | ||||||||
* A sample activity at the end is given, and you could add an activity to | ||||||||
* process individual events in the processGreeting() method. | ||||||||
* | ||||||||
* Because Temporal Workflows cannot have an unlimited size, Continue As New is used | ||||||||
* to process more signals that may come in. | ||||||||
* You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit. | ||||||||
* You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first | ||||||||
* signal with the GetNextTimeout() function. | ||||||||
*/ | ||||||||
|
||||||||
// signalToSignalTimeout is them maximum time between signals | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
const signalToSignalTimeout = 30 * time.Second | ||||||||
|
||||||||
// fromStartTimeout is the maximum time to receive all signals | ||||||||
const fromStartTimeout = 60 * time.Second | ||||||||
|
||||||||
// exitTimeout is the time to wait after exit is requested to catch any last few signals | ||||||||
const exitTimeout = 1 * time.Second | ||||||||
|
||||||||
type AccumulateGreeting struct { | ||||||||
GreetingText string | ||||||||
Bucket string | ||||||||
GreetingKey string | ||||||||
} | ||||||||
|
||||||||
type GreetingsInfo struct { | ||||||||
BucketKey string | ||||||||
GreetingsList []AccumulateGreeting | ||||||||
UniqueGreetingKeys map[string]bool | ||||||||
startTime time.Time | ||||||||
} | ||||||||
|
||||||||
// GetNextTimeout returns the maximum time for a workflow to wait for the next signal. | ||||||||
// This waits for the greater of the remaining fromStartTimeout and signalToSignalTimeout | ||||||||
// fromStartTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time as desired | ||||||||
// This resets with Continue As New | ||||||||
func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, startTime time.Time, exitRequested bool) (time.Duration, error) { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this a method on |
||||||||
if exitRequested { | ||||||||
return exitTimeout, nil | ||||||||
} | ||||||||
if startTime.IsZero() { | ||||||||
startTime = workflow.GetInfo(ctx).WorkflowStartTime // if you want to start from the time of the first signal, customize this | ||||||||
} | ||||||||
total := workflow.Now(ctx).Sub(startTime) | ||||||||
totalLeft := fromStartTimeout - total | ||||||||
if totalLeft <= 0 { | ||||||||
return 0, nil | ||||||||
} | ||||||||
if signalToSignalTimeout > totalLeft { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't you mean:
Suggested change
Since |
||||||||
return signalToSignalTimeout, nil | ||||||||
} | ||||||||
return totalLeft, nil | ||||||||
} | ||||||||
|
||||||||
// AccumulateSignalsWorkflow workflow definition | ||||||||
func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (allGreetings string, err error) { | ||||||||
log := workflow.GetLogger(ctx) | ||||||||
var a AccumulateGreeting | ||||||||
if greetings.GreetingsList == nil { | ||||||||
greetings.GreetingsList = []AccumulateGreeting{} | ||||||||
} | ||||||||
Comment on lines
+77
to
+79
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This is not needed |
||||||||
if greetings.UniqueGreetingKeys == nil { | ||||||||
greetings.UniqueGreetingKeys = make(map[string]bool) | ||||||||
} | ||||||||
var unprocessedGreetings []AccumulateGreeting | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The way the code is currently written, this will only ever have one value in it, so why make it a slice? |
||||||||
if greetings.startTime.IsZero() { | ||||||||
greetings.startTime = workflow.Now(ctx) | ||||||||
} | ||||||||
exitRequested := false | ||||||||
|
||||||||
ao := workflow.ActivityOptions{ | ||||||||
StartToCloseTimeout: 100 * time.Second, | ||||||||
} | ||||||||
ctx = workflow.WithActivityOptions(ctx, ao) | ||||||||
|
||||||||
for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() { | ||||||||
|
||||||||
timeout, err := a.GetNextTimeout(ctx, greetings.startTime, exitRequested) | ||||||||
childCtx, cancelHandler := workflow.WithCancel(ctx) | ||||||||
selector := workflow.NewSelector(ctx) | ||||||||
|
||||||||
if err != nil { | ||||||||
log.Error("Error calculating timeout") | ||||||||
return "", err | ||||||||
} | ||||||||
log.Debug("Awaiting for " + timeout.String()) | ||||||||
selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) { | ||||||||
c.Receive(ctx, &a) | ||||||||
unprocessedGreetings = append(unprocessedGreetings, a) | ||||||||
log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Loggers are expected to take key-value state. Also, don't append newlines to log statements. |
||||||||
cancelHandler() // cancel timer future | ||||||||
a = AccumulateGreeting{} | ||||||||
}) | ||||||||
selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) { | ||||||||
c.Receive(ctx, nil) | ||||||||
exitRequested = true | ||||||||
cancelHandler() // cancel timer future | ||||||||
log.Debug("Exit Signal Received, more?: " + strconv.FormatBool(more) + "\n") | ||||||||
}) | ||||||||
|
||||||||
timerFuture := workflow.NewTimer(childCtx, timeout) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should drain the signals before waiting. What happens if you get 100 signals at once? You are starting 100 timers erroneously. You should only wait when there are no signals in the channel. |
||||||||
selector.AddFuture(timerFuture, func(f workflow.Future) { | ||||||||
log.Debug("Timer fired \n") | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be a workflow cancel error, it should not be swallowed |
||||||||
}) | ||||||||
|
||||||||
selector.Select(ctx) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you help me understand the goal here? Are you wanting to wait for the first signal or timer fired? Or do you want to collect all signals until timer reached and then process them? You need to loop on the select until timer fired if you want the latter and not cancel the timer on each signal. |
||||||||
|
||||||||
if len(unprocessedGreetings) == 0 { // timeout without a signal coming in, so let's process the greetings and wrap it up! | ||||||||
log.Debug("Into final processing, received greetings count: " + strconv.Itoa(len(greetings.GreetingsList)) + "\n") | ||||||||
allGreetings = "" | ||||||||
err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings) | ||||||||
if err != nil { | ||||||||
log.Error("ComposeGreeting activity failed.", "Error", err) | ||||||||
return allGreetings, err | ||||||||
} | ||||||||
|
||||||||
if !selector.HasPending() { // in case a signal came in while activity was running, check again | ||||||||
return allGreetings, nil | ||||||||
} else { | ||||||||
log.Info("Received a signal while processing ComposeGreeting activity.") | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
/* process latest signals | ||||||||
* Here is where we can process individual signals as they come in. | ||||||||
* It's ok to call activities here. | ||||||||
* This also validates an individual greeting: | ||||||||
* - check for duplicates | ||||||||
* - check for correct bucket | ||||||||
* Using update validation could improve this in the future | ||||||||
*/ | ||||||||
toProcess := unprocessedGreetings | ||||||||
unprocessedGreetings = []AccumulateGreeting{} | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
for _, ug := range toProcess { | ||||||||
if ug.Bucket != greetings.BucketKey { | ||||||||
log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + greetings.BucketKey + "], greeting bucket: [" + ug.Bucket + "]") | ||||||||
} else if greetings.UniqueGreetingKeys[ug.GreetingKey] { | ||||||||
log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey + "]") | ||||||||
} else { | ||||||||
greetings.UniqueGreetingKeys[ug.GreetingKey] = true | ||||||||
greetings.GreetingsList = append(greetings.GreetingsList, ug) | ||||||||
} | ||||||||
} | ||||||||
|
||||||||
} | ||||||||
|
||||||||
log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings.GreetingsList)) + " greetings.") | ||||||||
return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, greetings) | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably don't need to duplicate readme content in the file