Skip to content

Commit

Permalink
Async update sample
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Mar 5, 2024
1 parent f25b9cb commit d20529b
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 42 deletions.
16 changes: 16 additions & 0 deletions async-update/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
### Async Update Sample

Here we show an example of a workflow representing a parallel job processor. The workflow accepts
jobs through update requests, allowing up to five parallel jobs, and uses the update validator to reject any
jobs over the limit. The workflow also demonstrates how to properly drain updates so all updates are processed before completing a workflow.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run async-update/worker/main.go
```
3) Run the following command to start the example
```
go run async-update/starter/main.go
```
94 changes: 94 additions & 0 deletions async-update/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"context"
"log"

"github.com/google/uuid"
async_update "github.com/temporalio/samples-go/async-update"
enumspb "go.temporal.io/api/enums/v1"
updatepb "go.temporal.io/api/update/v1"

"github.com/temporalio/samples-go/update"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "async-update-workflow-ID",
TaskQueue: "async-update",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, async_update.ProcessWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

// Send multiple updates to the workflow
var updates []client.WorkflowUpdateHandle
// ProcessWorkflow only allows 5 in progress jobs at a time, so we send 6 updates to test the validator.
for i := 0; i < 6; i++ {
updateID := uuid.New().String()
log.Println("Sending workflow update", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "UpdateID", updateID)
handle, err := c.UpdateWorkflowWithOptions(context.Background(), &client.UpdateWorkflowWithOptionsRequest{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateID: updateID,
UpdateName: async_update.ProcessUpdateName,
Args: []interface{}{"world"},
// WaitPolicy is a hint to return early if the update reaches a certain stage.
// By default the SDK will wait until the update is processed or the server sends back
// an empty response then the SDK can poll the update result later.
// Useful for short updates that can be completed with a single RPC.
WaitPolicy: &updatepb.WaitPolicy{
// LifecycleStage UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED means to wait until the update is accepted
// or the Temporal server returns an empty response.
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ACCEPTED,
},
})
if err != nil {
log.Fatalln("Unable to send update request", err)
}
updates = append(updates, handle)
}
for _, handle := range updates {
var updateOutput string
err = handle.Get(context.Background(), &updateOutput)
if err != nil {
log.Println("Update failed with error", err)
} else {
log.Println("Update result", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "UpdateID", handle.UpdateID(), "Result", updateOutput)
}
}
// You can also create a handle for a previously sent update using the update's ID.
newHandle := c.GetWorkflowUpdateHandle(client.GetWorkflowUpdateHandleOptions{
WorkflowID: we.GetID(),
RunID: we.GetRunID(),
UpdateID: updates[0].UpdateID(),
})
var updateOutput string
err = newHandle.Get(context.Background(), &updateOutput)
if err != nil {
log.Println("Get update result failed with error", err)
} else {
log.Println("Get update result", "WorkflowID", we.GetID(), "RunID", we.GetRunID(), "UpdateID", newHandle.UpdateID(), "Result", updateOutput)
}
// Signal the workflow to stop accepting new work.
if err = c.SignalWorkflow(context.Background(), we.GetID(), we.GetRunID(), update.Done, nil); err != nil {
log.Fatalf("failed to send %q signal to workflow: %v", update.Done, err)
}
// Get the result of the workflow this will block until all the updates are processed.
var wfResult int
if err = we.Get(context.Background(), &wfResult); err != nil {
log.Fatalf("unable get workflow result: %v", err)
}
log.Println("Updates processed:", wfResult)
}
28 changes: 28 additions & 0 deletions async-update/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package main

import (
"log"

async_update "github.com/temporalio/samples-go/async-update"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, "async-update", worker.Options{})

w.RegisterWorkflow(async_update.ProcessWorkflow)
w.RegisterActivity(async_update.Activity)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
75 changes: 75 additions & 0 deletions async-update/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package async_update

import (
"context"
"fmt"
"time"

"go.temporal.io/sdk/workflow"
)

const (
ProcessUpdateName = "process"
Done = "done"
)

func ProcessWorkflow(ctx workflow.Context) (int, error) {
logger := workflow.GetLogger(ctx)
// inProgressJobs is used to keep track of the number of jobs currently being processed.
inProgressJobs := 0
// processedJobs is used to keep track of the number of jobs processed so far.
processedJobs := 0
// closing is used to keep track of whether the workflow is closing.
closing := false

if err := workflow.SetUpdateHandlerWithOptions(
ctx,
ProcessUpdateName,
func(ctx workflow.Context, s string) (string, error) {
inProgressJobs++
processedJobs++
defer func() {
inProgressJobs--
}()
logger.Debug("Processing job", "job", s)
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var result string
err := workflow.ExecuteActivity(ctx, Activity, s).Get(ctx, &result)
logger.Debug("Processed job", "job", s)
return result, err
},
workflow.UpdateHandlerOptions{
Validator: func(s string) error {
logger.Debug("Validating job", "job", s, "inProgressJobs", inProgressJobs)
if inProgressJobs >= 5 {
return fmt.Errorf("too many in progress jobs: %d", inProgressJobs)
} else if closing {
return fmt.Errorf("workflow is closing")
}
return nil
},
},
); err != nil {
return 0, err
}

_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
logger.Debug("Closing workflow, draining in progress jobs", "inProgressJobs", inProgressJobs)
// set closing to true to indicate that the workflow is closing.
// no more new jobs are allowed, but the existing jobs will be processed.
closing = true
workflow.Await(ctx, func() bool {
return inProgressJobs == 0
})
logger.Debug("All jobs processed, workflow can now close")
return processedJobs, ctx.Err()
}

func Activity(ctx context.Context, name string) (string, error) {
// Simulate a long running activity
time.Sleep(5 * time.Second)
return "Hello " + name + "!", nil
}
87 changes: 87 additions & 0 deletions async-update/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package async_update_test

import (
"testing"
"time"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
async_update "github.com/temporalio/samples-go/async-update"
"go.temporal.io/sdk/testsuite"
)

type updateCallback struct {
accept func()
reject func(error)
complete func(interface{}, error)
}

func (uc *updateCallback) Accept() {
uc.accept()
}

func (uc *updateCallback) Reject(err error) {
uc.reject(err)
}

func (uc *updateCallback) Complete(success interface{}, err error) {
uc.complete(success, err)
}

func TestWorkflow(t *testing.T) {
// Create env
var suite testsuite.WorkflowTestSuite
env := suite.NewTestWorkflowEnvironment()
env.RegisterWorkflow(async_update.ProcessWorkflow)
env.OnActivity(async_update.Activity, mock.Anything, "world").After(5*time.Second).Return("hello world", nil)

// Use delayed callbacks to send multiple updates at the same time
for i := 0; i < 10; i++ {
i := i
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(async_update.ProcessUpdateName, "test id", &updateCallback{
accept: func() {
if i >= 5 {
require.Fail(t, "update should fail since we should exceed our max update limit")
}
},
reject: func(err error) {
if i < 5 {
require.Fail(t, "this update should not fail")
}
require.Error(t, err)
},
complete: func(response interface{}, err error) {
require.NoError(t, err)
require.Equal(t, "hello world", response)
},
}, "world")
}, 0)
}
// Use delayed callback to send signal
env.RegisterDelayedCallback(func() {
env.SignalWorkflow(async_update.Done, nil)
}, time.Second)

// Send an update after the workflow is signaled to close, expect the update to be rejected
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(async_update.ProcessUpdateName, "test id", &updateCallback{
accept: func() {
require.Fail(t, "update should fail since the workflow is closing")
},
reject: func(err error) {
require.Error(t, err)
},
complete: func(response interface{}, err error) {
},
}, "world")
}, 2*time.Second)

// Run workflow
env.ExecuteWorkflow(async_update.ProcessWorkflow)
require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
var result int
require.NoError(t, env.GetWorkflowResult(&result))
require.Equal(t, 5, result)
}
28 changes: 13 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ replace github.com/cactus/go-statsd-client => github.com/cactus/go-statsd-client
require (
github.com/golang/mock v1.7.0-rc.1
github.com/golang/snappy v0.0.4
github.com/google/uuid v1.4.0
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.4.5
github.com/opentracing/opentracing-go v1.2.0
github.com/pborman/uuid v1.2.1
Expand All @@ -20,14 +20,15 @@ require (
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.16.0
go.opentelemetry.io/otel/sdk v1.22.0
go.opentelemetry.io/otel/trace v1.22.0
go.temporal.io/api v1.26.0
go.temporal.io/sdk v1.25.1
go.temporal.io/api v1.29.0
go.temporal.io/sdk v1.26.0-rc.4
go.temporal.io/sdk/contrib/opentelemetry v0.3.0
go.temporal.io/sdk/contrib/opentracing v0.1.0
go.temporal.io/sdk/contrib/tally v0.2.0
go.temporal.io/sdk/contrib/tools/workflowcheck v0.0.0-20230612164027-11c2cb9e7d2d
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.24.0
google.golang.org/protobuf v1.32.0
gopkg.in/square/go-jose.v2 v2.6.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -42,12 +43,10 @@ require (
github.com/fatih/color v1.15.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect
github.com/hashicorp/yamux v0.1.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand All @@ -66,16 +65,15 @@ require (
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
go.opentelemetry.io/otel/metric v1.22.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/crypto v0.19.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240102182953-50ed04b92917 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
google.golang.org/genproto v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
google.golang.org/grpc v1.62.0 // indirect
)
Loading

0 comments on commit d20529b

Please sign in to comment.