Skip to content

Commit

Permalink
changes from review: not using a separate goroutine for signals & cat…
Browse files Browse the repository at this point in the history
…ching signals at the end
  • Loading branch information
joshmsmith committed Jul 15, 2024
1 parent 488ec1d commit aeb08fb
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 62 deletions.
7 changes: 4 additions & 3 deletions accumulator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ This sample implements the Accumulator Pattern: collect many meaningful things t

This sample models robots being created throughout the time period, groups them by what color they are, and greets all the robots of a color at the end.

A new workflow is created per grouping. Workflows continue as new as needed.
A new workflow is created per grouping.
A sample activity at the end is given, and you could add an activity to
process individual events in the processGreeting() method.

Because Temporal Workflows cannot have an unlimited size, Continue As New is used to process more signals that may come in.
You could create as many groupings as desired, as Temporal Workflows scale out to many workflows without limit.

You could vary the time that the workflow waits for other signals, say for a day, or a variable time from first signal with the GetNextTimeout() function.
This example supports exiting early with an exit signal. Pending greetings are still collected after exit signal is sent.


### Steps to run this sample:

1) You need a Temporal service running. See details in README.md
1) You need a Temporal service running. See details in repo's README.md
2) Run the following command to start the worker

```
Expand All @@ -29,7 +30,7 @@ go run accumulator/worker/main.go
go run accumulator/starter/main.go
```

You can always run tests with
You can also run tests with
```
go test accumulator/accumulate_signals_workflow_test.go
```
109 changes: 55 additions & 54 deletions accumulator/accumulate_signals_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

/**
* This sample demonstrates how to accumulate many signals (events) over a time period.
* This sample demonstrates how to accumulate many signals (business events) over a time period.
* This sample implements the Accumulator Pattern: collect many meaningful things that
* need to be collected and worked on together, such as all payments for an account, or
* all account updates by account.
Expand All @@ -29,8 +29,11 @@ import (
// signalToSignalTimeout is them maximum time between signals
const signalToSignalTimeout = 30 * time.Second

// fromFirstSignalTimeout is the maximum time to receive all signals
const fromFirstSignalTimeout = 60 * time.Second
// fromStartTimeout is the maximum time to receive all signals
const fromStartTimeout = 60 * time.Second

// exitTimeout is the time to wait after exit is requested to catch any last few signals
const exitTimeout = 1 * time.Second

type AccumulateGreeting struct {
GreetingText string
Expand All @@ -42,21 +45,26 @@ type GreetingsInfo struct {
BucketKey string
GreetingsList []AccumulateGreeting
UniqueGreetingKeys map[string]bool
startTime time.Time
}

// GetNextTimeout returns the maximum time for a workflow to wait for the next signal.
// fromFirstSignalTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time
// This waits for the greater of the remaining fromStartTimeout and signalToSignalTimeout
// fromStartTimeout and signalToSignalTimeout can be adjusted to wait for the right amount of time as desired
// This resets with Continue As New
func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, timeToExit bool, firstSignalTime time.Time) (time.Duration, error) {
if firstSignalTime.IsZero() {
firstSignalTime = workflow.Now(ctx)
func (a *AccumulateGreeting) GetNextTimeout(ctx workflow.Context, startTime time.Time, exitRequested bool) (time.Duration, error) {
if exitRequested {
return exitTimeout, nil
}
if startTime.IsZero() {
startTime = workflow.GetInfo(ctx).WorkflowStartTime // if you want to start from the time of the first signal, customize this
}
total := workflow.Now(ctx).Sub(firstSignalTime)
totalLeft := fromFirstSignalTimeout - total
total := workflow.Now(ctx).Sub(startTime)
totalLeft := fromStartTimeout - total
if totalLeft <= 0 {
return 0, nil
}
if signalToSignalTimeout < totalLeft {
if signalToSignalTimeout > totalLeft {
return signalToSignalTimeout, nil
}
return totalLeft, nil
Expand All @@ -73,78 +81,71 @@ func AccumulateSignalsWorkflow(ctx workflow.Context, greetings GreetingsInfo) (a
greetings.UniqueGreetingKeys = make(map[string]bool)
}
var unprocessedGreetings []AccumulateGreeting
var firstSignalTime time.Time
if greetings.startTime.IsZero() {
greetings.startTime = workflow.Now(ctx)
}
exitRequested := false

ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
StartToCloseTimeout: 100 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

// Listen to signals in a different goroutine
workflow.Go(ctx, func(gCtx workflow.Context) {
for {
log.Debug("In workflow.go signals goroutine for{}")
selector := workflow.NewSelector(gCtx)
selector.AddReceive(workflow.GetSignalChannel(gCtx, "greeting"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(gCtx, &a)
unprocessedGreetings = append(unprocessedGreetings, a)
log.Debug("Signal Received with text: " + a.GreetingText + ", more: " + strconv.FormatBool(more))

a = AccumulateGreeting{}
})
selector.AddReceive(workflow.GetSignalChannel(gCtx, "exit"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(gCtx, nil)
exitRequested = true
log.Debug("Exit Signal Received")
})
log.Debug("Before select greeting text: " + a.GreetingText)
selector.Select(gCtx)
log.Debug("After select, greeting text: " + a.GreetingText)
if firstSignalTime.IsZero() {
firstSignalTime = workflow.Now(gCtx)
}
}
})

for !workflow.GetInfo(ctx).GetContinueAsNewSuggested() {
// Wait for Signal or timeout
timeout, err := a.GetNextTimeout(ctx, exitRequested, firstSignalTime)

timeout, err := a.GetNextTimeout(ctx, greetings.startTime, exitRequested)
childCtx, cancelHandler := workflow.WithCancel(ctx)
selector := workflow.NewSelector(ctx)

if err != nil {
log.Warn("Error awaiting signal")
log.Error("Error calculating timeout")
return "", err
}
if timeout <= 0 {
// do final processing? or just check for signal one more time
log.Debug("No time left for signals, checking one more time")
}

log.Debug("Awaiting for " + timeout.String())
gotSignalBeforeTimeout, _ := workflow.AwaitWithTimeout(ctx, timeout, func() bool {
return len(unprocessedGreetings) > 0 || exitRequested
selector.AddReceive(workflow.GetSignalChannel(ctx, "greeting"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, &a)
unprocessedGreetings = append(unprocessedGreetings, a)
log.Debug("Signal Received with text: " + a.GreetingText + ", more?: " + strconv.FormatBool(more) + "\n")
cancelHandler() // cancel timer future
a = AccumulateGreeting{}
})
selector.AddReceive(workflow.GetSignalChannel(ctx, "exit"), func(c workflow.ReceiveChannel, more bool) {
c.Receive(ctx, nil)
exitRequested = true
cancelHandler() // cancel timer future
log.Debug("Exit Signal Received, more?: " + strconv.FormatBool(more) + "\n")
})

timerFuture := workflow.NewTimer(childCtx, timeout)
selector.AddFuture(timerFuture, func(f workflow.Future) {
log.Debug("Timer fired \n")
})

// timeout happened without a signal coming in, so let's process the greetings and wrap it up!
if len(unprocessedGreetings) == 0 {
log.Debug("Into final processing, signal received? " + strconv.FormatBool(gotSignalBeforeTimeout) + ", exit requested: " + strconv.FormatBool(exitRequested))
// do final processing
selector.Select(ctx)

if len(unprocessedGreetings) == 0 { // timeout without a signal coming in, so let's process the greetings and wrap it up!
log.Debug("Into final processing, received greetings count: " + strconv.Itoa(len(greetings.GreetingsList)) + "\n")
allGreetings = ""
err := workflow.ExecuteActivity(ctx, ComposeGreeting, greetings.GreetingsList).Get(ctx, &allGreetings)
if err != nil {
log.Error("ComposeGreeting activity failed.", "Error", err)
return allGreetings, err
}

return allGreetings, nil
if !selector.HasPending() { // in case a signal came in while activity was running, check again
return allGreetings, nil
} else {
log.Info("Received a signal while processing ComposeGreeting activity.")
}
}

/* process latest signal
/* process latest signals
* Here is where we can process individual signals as they come in.
* It's ok to call activities here.
* This also validates an individual greeting:
* - check for duplicates
* - check for correct bucket
* Using update validation could improve this in the future
*/
toProcess := unprocessedGreetings
unprocessedGreetings = []AccumulateGreeting{}
Expand Down
6 changes: 3 additions & 3 deletions accumulator/accumulate_signals_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (s *UnitTestSuite) Test_Signal_With_Start_Wait_Too_Long() {
targeGreeting.Bucket = bucket
targeGreeting.GreetingKey = "11"
env.SignalWorkflow("greeting", targeGreeting)
}, fromFirstSignalTimeout+time.Second*1)
}, fromStartTimeout+time.Second*1)

greetings := GreetingsInfo{
BucketKey: bucket,
Expand Down Expand Up @@ -346,7 +346,7 @@ func (s *UnitTestSuite) Test_Signal_After_Exit() {
s.NoError(env.GetWorkflowError())
var result string
s.NoError(env.GetWorkflowResult(&result))
s.Contains(result, "Hello (1)")
s.Contains(result, "Hello (2)")
s.Contains(result, "John Robot")
s.NotContains(result, "Targe Robot")
s.Contains(result, "Targe Robot")
}
6 changes: 4 additions & 2 deletions accumulator/starter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ func main() {
log.Fatalln("Unable to signal workflow "+we.GetID(), err)
}
log.Println(we.GetID() + ": Sent exit")
// Signals after this test sending more signals after workflow exit
time.Sleep(5 * time.Millisecond)
}

// Test sending more signals after workflow exit


janeGreeting := new(accumulator.AccumulateGreeting)
janeGreeting.GreetingText = "Jane Robot"
Expand All @@ -174,7 +176,7 @@ func main() {
if err != nil {
log.Println("Workflow " + we.GetID() + " not found to signal - this is intentional: " + err.Error())
}
log.Println("Sent " + badBucketGreeting.GreetingText + " to " + we.GetID())
log.Println("Sent invalid bucket signal " + badBucketGreeting.GreetingText + ", " + badBucketGreeting.Bucket + " to " + we.GetID())
}

if testDuplicate {
Expand Down

0 comments on commit aeb08fb

Please sign in to comment.