Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Accumulator Example #355

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ These samples demonstrate some common control flow patterns using Temporal's Go
- [**Await for signal processing**](./await-signals): Demonstrates how
to process out of order signals processing using `Await` and `AwaitWithTimeout`.

- [**Accumulate Signals**](./accumulator): Demonstrates how
to process many signals using `AwaitWithTimeout`, continue as new, and activities.

- [**Worker-specific Task Queues**](./worker-specific-task-queues): Use a unique task queue per Worker to have certain Activities only run on that specific Worker. For instance for a file processing Workflow, where one Activity downloads a file and subsequent Activities need to operate on that file. (If multiple Workers were on the same queue, subsequent Activities may get run on different machines that don't have the downloaded file.)

- [**Nexus**](./nexus): Demonstrates how to use the Nexus APIs to facilitate cross namespace calls.
Expand Down
36 changes: 36 additions & 0 deletions accumulator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Accumulator
This sample demonstrates how to accumulate many signals (events) over a time period.
This sample implements the Accumulator Pattern: collect many meaningful things that need to be collected and worked on together, such as all payments for an account, or all account updates by account.

This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end.

A new workflow is created per grouping.
A sample activity at the end is given, and you could add an activity to
process individual events in the processGreeting() method.

Because Temporal Workflows cannot have an unlimited size, Continue As New is used to process more signals that may come in.
You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit.

You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function.
This example supports exiting early with an exit signal. Pending greetings are still collected after exit signal is sent.


### Steps to run this sample:

1) You need a Temporal service running. See details in repo's README.md
2) Run the following command to start the worker

```
go run accumulator/worker/main.go
```

3) Run the following command to start the workflow and send signals in random order

```
go run accumulator/starter/main.go
```

You can also run tests with
```
go test accumulator/accumulate_signals_workflow_test.go
```
23 changes: 23 additions & 0 deletions accumulator/accumulate_signals_activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package accumulator

import (
"context"
"fmt"
"go.temporal.io/sdk/activity"
)

// this activity will process all of the signals together
func ComposeGreeting(ctx context.Context, s []AccumulateGreeting) (string, error) {
log := activity.GetLogger(ctx)
if len(s) == 0 {
log.Warn("No greetings found when trying to Compose Greetings.")
}

words := fmt.Sprintf("Hello (%v) Robots", len(s))
for _, v := range s {
words += ", " + v.GreetingText
}
words += "!"
return words, nil

}
168 changes: 168 additions & 0 deletions accumulator/accumulate_signals_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package accumulator

import (
"strconv"
"time"

"go.temporal.io/sdk/workflow"
)

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably don't need to duplicate readme content in the file

* This sample demonstrates how to accumulate many signals (business events) over a time period.
* This sample implements the Accumulator Pattern: collect many meaningful things that
* need to be collected and worked on together, such as all payments for an account, or
* all account updates by account.
* This sample models robots being created throughout the time period,
* groups them by what color they are, and greets all the robots of a color at the end.
*
* A new workflow is created per grouping. Workflows continue as new as needed.
* A sample activity at the end is given, and you could add an activity to
* process individual events in the processGreeting() method.
*
* Because Temporal Workflows cannot have an unlimited size, Continue As New is used
* to process more signals that may come in.
* You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit.
* You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first
* signal with the GetNextTimeout() function.
*/

// signalToSignalTimeout is them maximum time between signals
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// signalToSignalTimeout is them maximum time between signals
// signalToSignalTimeout is the maximum time between signals

const signalToSignalTimeout = 30 * time.Second

// fromStartTimeout is the maximum time to receive all signals
const fromStartTimeout = 60 * time.Second

// exitTimeout is the time to wait after exit is requested to catch any last few signals
const exitTimeout = 1 * time.Second

type AccumulateGreeting struct {
GreetingText string
Bucket string
GreetingKey string
}

type GreetingsInfo struct {
BucketKey string
GreetingsList []AccumulateGreeting
UniqueGreetingKeys map[string]bool
startTime time.Time
}

// GetNextTimeout returns the maximum time for a workflow to wait for the next signal.
// This waits for the greater of the remaining fromStartTimeout and signalToSignalTimeout
// fromStartTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time as desired
// This resets with Continue As New
func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, startTime time.Time, exitRequested bool) (time.Duration, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a method on AccumulateGreeting struct? And why exported?

if exitRequested {
return exitTimeout, nil
}
if startTime.IsZero() {
startTime = workflow.GetInfo(ctx).WorkflowStartTime // if you want to start from the time of the first signal, customize this
}
total := workflow.Now(ctx).Sub(startTime)
totalLeft := fromStartTimeout - total
if totalLeft <= 0 {
return 0, nil
}
if signalToSignalTimeout > totalLeft {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you mean:

Suggested change
if signalToSignalTimeout > totalLeft {
if totalLeft > signalToSignalTimeout {

Since signalToSignalTimeout says it's the "maximum"?

return signalToSignalTimeout, nil
}
return totalLeft, nil
}

// AccumulateSignalsWorkflow workflow definition
func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (allGreetings string, err error) {
log := workflow.GetLogger(ctx)
var a AccumulateGreeting
if greetings.GreetingsList == nil {
greetings.GreetingsList = []AccumulateGreeting{}
}
Comment on lines +77 to +79
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if greetings.GreetingsList == nil {
greetings.GreetingsList = []AccumulateGreeting{}
}

This is not needed

if greetings.UniqueGreetingKeys == nil {
greetings.UniqueGreetingKeys = make(map[string]bool)
}
var unprocessedGreetings []AccumulateGreeting
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way the code is currently written, this will only ever have one value in it, so why make it a slice?

if greetings.startTime.IsZero() {
greetings.startTime = workflow.Now(ctx)
}
exitRequested := false

ao := workflow.ActivityOptions{
StartToCloseTimeout: 100 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() {

timeout, err := a.GetNextTimeout(ctx, greetings.startTime, exitRequested)
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)

if err != nil {
log.Error("Error calculating timeout")
return "", err
}
log.Debug("Awaiting for " + timeout.String())
selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &a)
unprocessedGreetings = append(unprocessedGreetings, a)
log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n")
log.Debug("Signal Received", "Text", a.GreetingText, "More", more)

Loggers are expected to take key-value state. Also, don't append newlines to log statements.

cancelHandler() // cancel timer future
a = AccumulateGreeting{}
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
exitRequested = true
cancelHandler() // cancel timer future
log.Debug("Exit Signal Received, more?: " + strconv.FormatBool(more) + "\n")
})

timerFuture := workflow.NewTimer(childCtx, timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should drain the signals before waiting. What happens if you get 100 signals at once? You are starting 100 timers erroneously. You should only wait when there are no signals in the channel.

selector.AddFuture(timerFuture, func(f workflow.Future) {
log.Debug("Timer fired \n")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a workflow cancel error, it should not be swallowed

})

selector.Select(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you help me understand the goal here? Are you wanting to wait for the first signal or timer fired? Or do you want to collect all signals until timer reached and then process them? You need to loop on the select until timer fired if you want the latter and not cancel the timer on each signal.


if len(unprocessedGreetings) == 0 { // timeout without a signal coming in, so let's process the greetings and wrap it up!
log.Debug("Into final processing, received greetings count: " + strconv.Itoa(len(greetings.GreetingsList)) + "\n")
allGreetings = ""
err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings)
if err != nil {
log.Error("ComposeGreeting activity failed.", "Error", err)
return allGreetings, err
}

if !selector.HasPending() { // in case a signal came in while activity was running, check again
return allGreetings, nil
} else {
log.Info("Received a signal while processing ComposeGreeting activity.")
}
}

/* process latest signals
* Here is where we can process individual signals as they come in.
* It's ok to call activities here.
* This also validates an individual greeting:
* - check for duplicates
* - check for correct bucket
* Using update validation could improve this in the future
*/
toProcess := unprocessedGreetings
unprocessedGreetings = []AccumulateGreeting{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
unprocessedGreetings = []AccumulateGreeting{}
unprocessedGreetings = nil


for _, ug := range toProcess {
if ug.Bucket != greetings.BucketKey {
log.Warn("Wrong bucket, something is wrong with your signal processing. WF Bucket: [" + greetings.BucketKey + "], greeting bucket: [" + ug.Bucket + "]")
} else if greetings.UniqueGreetingKeys[ug.GreetingKey] {
log.Warn("Duplicate Greeting Key. Key: [" + ug.GreetingKey + "]")
} else {
greetings.UniqueGreetingKeys[ug.GreetingKey] = true
greetings.GreetingsList = append(greetings.GreetingsList, ug)
}
}

}

log.Debug("Accumulate workflow starting new run with " + strconv.Itoa(len(greetings.GreetingsList)) + " greetings.")
return "Continued As New.", workflow.NewContinueAsNewError(ctx, AccumulateSignalsWorkflow, greetings)
}
Loading