From 7c726e930194f9ea03a6cf80acb35c4627315656 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 10:54:48 -0700 Subject: [PATCH 01/12] Early-return sample --- early-return/README.md | 17 ++++++++ early-return/activity.go | 33 ++++++++++++++++ early-return/starter/main.go | 56 ++++++++++++++++++++++++++ early-return/worker/main.go | 27 +++++++++++++ early-return/workflow.go | 77 ++++++++++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+) create mode 100644 early-return/README.md create mode 100644 early-return/activity.go create mode 100644 early-return/starter/main.go create mode 100644 early-return/worker/main.go create mode 100644 early-return/workflow.go diff --git a/early-return/README.md b/early-return/README.md new file mode 100644 index 00000000..ace156e9 --- /dev/null +++ b/early-return/README.md @@ -0,0 +1,17 @@ +### Early-Return Sample + +This sample demonstrates an early-return from a workflow. + +By utilizing Update-with-Start, a client can start a new workflow and synchronously receive +a response mid-workflow, while the workflow continues to run to completion. + +### Steps to run this sample: +1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use). +2) Run the following command to start the worker +``` +go run early-return/worker/main.go +``` +3) Run the following command to start the example +``` +go run early-return/starter/main.go +``` diff --git a/early-return/activity.go b/early-return/activity.go new file mode 100644 index 00000000..0a0025f5 --- /dev/null +++ b/early-return/activity.go @@ -0,0 +1,33 @@ +package earlyreturn + +import ( + "context" + "errors" + + "go.temporal.io/sdk/activity" +) + +func InitTransaction(ctx context.Context, transactionId, fromAccount, toAccount string, amount float64) error { + logger := activity.GetLogger(ctx) + if fromAccount == "" { + return errors.New("invalid fromAccount") + } + if toAccount == "" { + return errors.New("invalid toAccount") + } + if amount == 0 { + return errors.New("invalid amount") + } + logger.Info("Transaction initialized") + return nil +} + +func CancelTransaction(ctx context.Context, transactionId string) { + logger := activity.GetLogger(ctx) + logger.Info("Transaction cancelled") +} + +func CompleteTransaction(ctx context.Context, transactionId string) { + logger := activity.GetLogger(ctx) + logger.Info("Transaction completed") +} diff --git a/early-return/starter/main.go b/early-return/starter/main.go new file mode 100644 index 00000000..2358e52a --- /dev/null +++ b/early-return/starter/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "log" + "time" + + "github.com/pborman/uuid" + "github.com/temporalio/samples-go/early-return" + "go.temporal.io/sdk/client" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + updateOperation := client.NewUpdateWithStartWorkflowOperation( + client.UpdateWorkflowOptions{ + UpdateName: earlyreturn.UpdateName, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + + txId := uuid.New() + workflowOptions := client.StartWorkflowOptions{ + ID: "early-return-workflow-ID-" + txId, + TaskQueue: earlyreturn.TaskQueueName, + WithStartOperation: updateOperation, + } + we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, txId, "bob", "alice", 100.0) + if err != nil { + log.Fatalln("Error executing workflow:", err) + } + + log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID()) + + updateHandle, err := updateOperation.Get(ctxWithTimeout) + if err != nil { + log.Fatalln("Error obtaining update handle:", err) + } + + err = updateHandle.Get(ctxWithTimeout, nil) + if err != nil { + // NOTE: If the error is retryable, a retry attempt must use a unique workflow ID. + log.Fatalln("Error obtaining update result:", err) + } + + log.Println("Transaction completed successfully") + + // The workflow will continue running, either completing or cancelling the transaction. +} diff --git a/early-return/worker/main.go b/early-return/worker/main.go new file mode 100644 index 00000000..1c8520ef --- /dev/null +++ b/early-return/worker/main.go @@ -0,0 +1,27 @@ +package main + +import ( + "log" + + "github.com/temporalio/samples-go/early-return" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + // The client and worker are heavyweight objects that should be created once per process. + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{}) + + w.RegisterWorkflow(earlyreturn.Workflow) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} diff --git a/early-return/workflow.go b/early-return/workflow.go new file mode 100644 index 00000000..f81a681b --- /dev/null +++ b/early-return/workflow.go @@ -0,0 +1,77 @@ +package earlyreturn + +import ( + "errors" + "fmt" + "time" + + "go.temporal.io/sdk/workflow" +) + +const ( + UpdateName = "early-return" + TaskQueueName = "early-return-tq" +) + +var ( + activityTimeout = 2 * time.Second + updateTimeout = 5 * time.Second +) + +// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, +// it proceeds to completion. However, if initialization fails - due to validation errors or transient +// issues (e.g., network connectivity problems) - the transaction is cancelled. +// +// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of +// the initialization in a single round trip, even before the transaction processing completes. The remainder +// of the transaction is then processed asynchronously. +func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string, amount float64) error { + var initErr error + var initDone bool + logger := workflow.GetLogger(ctx) + + if err := workflow.SetUpdateHandler( + ctx, + UpdateName, + func(ctx workflow.Context) error { + condition := func() bool { return initDone } + if completed, err := workflow.AwaitWithTimeout(ctx, updateTimeout, condition); err != nil { + return fmt.Errorf("update cancelled: %w", err) + } else if !completed { + return errors.New("update timed out") + } + return initErr + }, + ); err != nil { + return err + } + + // Phase 1: Initialize the transaction synchronously. + // + // By using a local activity, an additional server roundtrip is avoided. + // See https://docs.temporal.io/activities#local-activity for more details. + + activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: activityTimeout, + }) + initErr = workflow.ExecuteLocalActivity( + activityOptions, InitTransaction, transactionId, fromAccount, toAccount, amount, + ).Get(ctx, nil) + initDone = true + + // Phase 2: Complete or cancel the transaction asychronously. + activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + }) + if initErr != nil { + logger.Info("cancelling transaction due to error: %v", initErr) + + // Transaction failed to be initialized or not quickly enough; cancel the transaction. + return workflow.ExecuteActivity(activityCtx, CancelTransaction, transactionId).Get(ctx, nil) + } + + logger.Info("completing transaction") + + // Transaction was initialized successfully; complete the transaction. + return workflow.ExecuteActivity(activityCtx, CompleteTransaction, transactionId).Get(ctx, nil) +} From 509449f63eeff263e400a5bb79101f6719708705 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 13:44:00 -0700 Subject: [PATCH 02/12] rename timeout --- early-return/workflow.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/early-return/workflow.go b/early-return/workflow.go index f81a681b..e62358a1 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -14,8 +14,8 @@ const ( ) var ( - activityTimeout = 2 * time.Second - updateTimeout = 5 * time.Second + activityTimeout = 2 * time.Second + earlyReturnTimeout = 5 * time.Second ) // Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, @@ -35,7 +35,7 @@ func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string UpdateName, func(ctx workflow.Context) error { condition := func() bool { return initDone } - if completed, err := workflow.AwaitWithTimeout(ctx, updateTimeout, condition); err != nil { + if completed, err := workflow.AwaitWithTimeout(ctx, earlyReturnTimeout, condition); err != nil { return fmt.Errorf("update cancelled: %w", err) } else if !completed { return errors.New("update timed out") From 3565530178c1ff239faf3357283d79443e168e6a Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 20:13:51 -0700 Subject: [PATCH 03/12] use struct --- early-return/activity.go | 12 ++++++------ early-return/starter/main.go | 6 +++--- early-return/workflow.go | 17 +++++++++++------ 3 files changed, 20 insertions(+), 15 deletions(-) diff --git a/early-return/activity.go b/early-return/activity.go index 0a0025f5..d86b500d 100644 --- a/early-return/activity.go +++ b/early-return/activity.go @@ -7,27 +7,27 @@ import ( "go.temporal.io/sdk/activity" ) -func InitTransaction(ctx context.Context, transactionId, fromAccount, toAccount string, amount float64) error { +func InitTransaction(ctx context.Context, tx Transaction) error { logger := activity.GetLogger(ctx) - if fromAccount == "" { + if tx.FromAccount == "" { return errors.New("invalid fromAccount") } - if toAccount == "" { + if tx.ToAccount == "" { return errors.New("invalid toAccount") } - if amount == 0 { + if tx.Amount == 0 { return errors.New("invalid amount") } logger.Info("Transaction initialized") return nil } -func CancelTransaction(ctx context.Context, transactionId string) { +func CancelTransaction(ctx context.Context, tx Transaction) { logger := activity.GetLogger(ctx) logger.Info("Transaction cancelled") } -func CompleteTransaction(ctx context.Context, transactionId string) { +func CompleteTransaction(ctx context.Context, tx Transaction) { logger := activity.GetLogger(ctx) logger.Info("Transaction completed") } diff --git a/early-return/starter/main.go b/early-return/starter/main.go index 2358e52a..1e756708 100644 --- a/early-return/starter/main.go +++ b/early-return/starter/main.go @@ -26,13 +26,13 @@ func main() { WaitForStage: client.WorkflowUpdateStageCompleted, }) - txId := uuid.New() + tx := earlyreturn.Transaction{ID: uuid.New(), FromAccount: "Bob", ToAccount: "Alice", Amount: 100.0} workflowOptions := client.StartWorkflowOptions{ - ID: "early-return-workflow-ID-" + txId, + ID: "early-return-workflow-ID-" + tx.ID, TaskQueue: earlyreturn.TaskQueueName, WithStartOperation: updateOperation, } - we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, txId, "bob", "alice", 100.0) + we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, tx) if err != nil { log.Fatalln("Error executing workflow:", err) } diff --git a/early-return/workflow.go b/early-return/workflow.go index e62358a1..89dcfdae 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -18,6 +18,13 @@ var ( earlyReturnTimeout = 5 * time.Second ) +type Transaction struct { + ID string + FromAccount string + ToAccount string + Amount float64 +} + // Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, // it proceeds to completion. However, if initialization fails - due to validation errors or transient // issues (e.g., network connectivity problems) - the transaction is cancelled. @@ -25,7 +32,7 @@ var ( // By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of // the initialization in a single round trip, even before the transaction processing completes. The remainder // of the transaction is then processed asynchronously. -func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string, amount float64) error { +func Workflow(ctx workflow.Context, tx Transaction) error { var initErr error var initDone bool logger := workflow.GetLogger(ctx) @@ -54,9 +61,7 @@ func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ ScheduleToCloseTimeout: activityTimeout, }) - initErr = workflow.ExecuteLocalActivity( - activityOptions, InitTransaction, transactionId, fromAccount, toAccount, amount, - ).Get(ctx, nil) + initErr = workflow.ExecuteLocalActivity(activityOptions, InitTransaction, tx).Get(ctx, nil) initDone = true // Phase 2: Complete or cancel the transaction asychronously. @@ -67,11 +72,11 @@ func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string logger.Info("cancelling transaction due to error: %v", initErr) // Transaction failed to be initialized or not quickly enough; cancel the transaction. - return workflow.ExecuteActivity(activityCtx, CancelTransaction, transactionId).Get(ctx, nil) + return workflow.ExecuteActivity(activityCtx, CancelTransaction, tx).Get(ctx, nil) } logger.Info("completing transaction") // Transaction was initialized successfully; complete the transaction. - return workflow.ExecuteActivity(activityCtx, CompleteTransaction, transactionId).Get(ctx, nil) + return workflow.ExecuteActivity(activityCtx, CompleteTransaction, tx).Get(ctx, nil) } From 9b4a47a4ee444943d006c78eda44cceb33cb7ef5 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 20:15:12 -0700 Subject: [PATCH 04/12] tweak log --- early-return/starter/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/early-return/starter/main.go b/early-return/starter/main.go index 1e756708..c995f242 100644 --- a/early-return/starter/main.go +++ b/early-return/starter/main.go @@ -50,7 +50,7 @@ func main() { log.Fatalln("Error obtaining update result:", err) } - log.Println("Transaction completed successfully") + log.Println("Transaction initialized successfully") // The workflow will continue running, either completing or cancelling the transaction. } From 733a0b6e721bc7ae98a7539fba0c30f00421d98f Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Wed, 2 Oct 2024 20:16:34 -0700 Subject: [PATCH 05/12] sleep in activities --- early-return/activity.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/early-return/activity.go b/early-return/activity.go index d86b500d..915d9a00 100644 --- a/early-return/activity.go +++ b/early-return/activity.go @@ -3,6 +3,7 @@ package earlyreturn import ( "context" "errors" + "time" "go.temporal.io/sdk/activity" ) @@ -24,10 +25,12 @@ func InitTransaction(ctx context.Context, tx Transaction) error { func CancelTransaction(ctx context.Context, tx Transaction) { logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) logger.Info("Transaction cancelled") } func CompleteTransaction(ctx context.Context, tx Transaction) { logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) logger.Info("Transaction completed") } From ecd78c89c2e449ad6803fff291e978ab2bb7487e Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 3 Oct 2024 08:29:20 -0700 Subject: [PATCH 06/12] make const --- early-return/workflow.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/early-return/workflow.go b/early-return/workflow.go index 89dcfdae..298fc8f0 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -9,11 +9,8 @@ import ( ) const ( - UpdateName = "early-return" - TaskQueueName = "early-return-tq" -) - -var ( + UpdateName = "early-return" + TaskQueueName = "early-return-tq" activityTimeout = 2 * time.Second earlyReturnTimeout = 5 * time.Second ) From 2097b865093d1d77681b939a81956f6708a8093a Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 3 Oct 2024 18:08:52 -0700 Subject: [PATCH 07/12] address comments --- early-return/activity.go | 36 ------------ early-return/starter/main.go | 7 ++- early-return/workflow.go | 87 +++++++++++++++++++--------- early-return/workflow_test.go | 103 ++++++++++++++++++++++++++++++++++ 4 files changed, 167 insertions(+), 66 deletions(-) delete mode 100644 early-return/activity.go create mode 100644 early-return/workflow_test.go diff --git a/early-return/activity.go b/early-return/activity.go deleted file mode 100644 index 915d9a00..00000000 --- a/early-return/activity.go +++ /dev/null @@ -1,36 +0,0 @@ -package earlyreturn - -import ( - "context" - "errors" - "time" - - "go.temporal.io/sdk/activity" -) - -func InitTransaction(ctx context.Context, tx Transaction) error { - logger := activity.GetLogger(ctx) - if tx.FromAccount == "" { - return errors.New("invalid fromAccount") - } - if tx.ToAccount == "" { - return errors.New("invalid toAccount") - } - if tx.Amount == 0 { - return errors.New("invalid amount") - } - logger.Info("Transaction initialized") - return nil -} - -func CancelTransaction(ctx context.Context, tx Transaction) { - logger := activity.GetLogger(ctx) - time.Sleep(1 * time.Second) - logger.Info("Transaction cancelled") -} - -func CompleteTransaction(ctx context.Context, tx Transaction) { - logger := activity.GetLogger(ctx) - time.Sleep(1 * time.Second) - logger.Info("Transaction completed") -} diff --git a/early-return/starter/main.go b/early-return/starter/main.go index c995f242..ad54eb5d 100644 --- a/early-return/starter/main.go +++ b/early-return/starter/main.go @@ -26,7 +26,7 @@ func main() { WaitForStage: client.WorkflowUpdateStageCompleted, }) - tx := earlyreturn.Transaction{ID: uuid.New(), FromAccount: "Bob", ToAccount: "Alice", Amount: 100.0} + tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} workflowOptions := client.StartWorkflowOptions{ ID: "early-return-workflow-ID-" + tx.ID, TaskQueue: earlyreturn.TaskQueueName, @@ -46,11 +46,12 @@ func main() { err = updateHandle.Get(ctxWithTimeout, nil) if err != nil { + // The workflow will continue running, cancelling the transaction. + // NOTE: If the error is retryable, a retry attempt must use a unique workflow ID. log.Fatalln("Error obtaining update result:", err) } log.Println("Transaction initialized successfully") - - // The workflow will continue running, either completing or cancelling the transaction. + // The workflow will continue running, completing the transaction. } diff --git a/early-return/workflow.go b/early-return/workflow.go index 298fc8f0..fee63833 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -1,25 +1,28 @@ package earlyreturn import ( + "context" "errors" "fmt" "time" + "go.temporal.io/sdk/activity" "go.temporal.io/sdk/workflow" ) const ( - UpdateName = "early-return" - TaskQueueName = "early-return-tq" - activityTimeout = 2 * time.Second - earlyReturnTimeout = 5 * time.Second + UpdateName = "early-return" + TaskQueueName = "early-return-tq" ) type Transaction struct { - ID string - FromAccount string - ToAccount string - Amount float64 + ID string + SourceAccount string + TargetAccount string + Amount int // in cents + + initErr error + initDone bool } // Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful, @@ -30,22 +33,12 @@ type Transaction struct { // the initialization in a single round trip, even before the transaction processing completes. The remainder // of the transaction is then processed asynchronously. func Workflow(ctx workflow.Context, tx Transaction) error { - var initErr error - var initDone bool logger := workflow.GetLogger(ctx) if err := workflow.SetUpdateHandler( ctx, UpdateName, - func(ctx workflow.Context) error { - condition := func() bool { return initDone } - if completed, err := workflow.AwaitWithTimeout(ctx, earlyReturnTimeout, condition); err != nil { - return fmt.Errorf("update cancelled: %w", err) - } else if !completed { - return errors.New("update timed out") - } - return initErr - }, + tx.ReturnInitResult, ); err != nil { return err } @@ -56,24 +49,64 @@ func Workflow(ctx workflow.Context, tx Transaction) error { // See https://docs.temporal.io/activities#local-activity for more details. activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - ScheduleToCloseTimeout: activityTimeout, + ScheduleToCloseTimeout: 10 * time.Second, }) - initErr = workflow.ExecuteLocalActivity(activityOptions, InitTransaction, tx).Get(ctx, nil) - initDone = true + tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil) + tx.initDone = true // Phase 2: Complete or cancel the transaction asychronously. + activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 10 * time.Second, + StartToCloseTimeout: 30 * time.Second, }) - if initErr != nil { - logger.Info("cancelling transaction due to error: %v", initErr) + if tx.initErr != nil { + logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr)) // Transaction failed to be initialized or not quickly enough; cancel the transaction. - return workflow.ExecuteActivity(activityCtx, CancelTransaction, tx).Get(ctx, nil) + if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil { + return fmt.Errorf("cancelling the transaction failed: %w", err) + } + + return tx.initErr } logger.Info("completing transaction") // Transaction was initialized successfully; complete the transaction. - return workflow.ExecuteActivity(activityCtx, CompleteTransaction, tx).Get(ctx, nil) + if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil { + return fmt.Errorf("completing the transaction failed: %w", err) + } + + return nil +} + +func (tx *Transaction) ReturnInitResult(ctx workflow.Context) error { + if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil { + return fmt.Errorf("transaction init cancelled: %w", err) + } + return tx.initErr +} + +func (tx *Transaction) InitTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + if tx.Amount <= 0 { + return errors.New("invalid Amount") + } + time.Sleep(500 * time.Millisecond) + logger.Info("Transaction initialized") + return nil +} + +func (tx *Transaction) CancelTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) + logger.Info("Transaction cancelled") + return nil +} + +func (tx *Transaction) CompleteTransaction(ctx context.Context) error { + logger := activity.GetLogger(ctx) + time.Sleep(1 * time.Second) + logger.Info("Transaction completed") + return nil } diff --git a/early-return/workflow_test.go b/early-return/workflow_test.go new file mode 100644 index 00000000..75ddf3d0 --- /dev/null +++ b/early-return/workflow_test.go @@ -0,0 +1,103 @@ +package earlyreturn + +import ( + "fmt" + "testing" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" +) + +func Test_CompleteTransaction(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CompleteTransaction) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + require.NoError(t, uc.completeErr) +} + +func Test_CompleteTransaction_Fails(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100} + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CompleteTransaction) + + env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash")) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, env.GetWorkflowError(), "crash") +} + +func Test_CancelTransaction(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CancelTransaction) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, uc.completeErr, "invalid Amount") + require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount") +} + +func Test_CancelTransaction_Fails(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid! + env.RegisterActivity(tx.InitTransaction) + env.RegisterActivity(tx.CancelTransaction) + + env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash")) + + uc := &updateCallback{} + env.RegisterDelayedCallback(func() { + env.UpdateWorkflow(UpdateName, uuid.New(), uc) + }, 0) + env.ExecuteWorkflow(Workflow, tx) + + require.True(t, env.IsWorkflowCompleted()) + require.ErrorContains(t, uc.completeErr, "invalid Amount") + require.ErrorContains(t, env.GetWorkflowError(), "crash") +} + +type updateCallback struct { + complete func(any, error) + completeErr error +} + +func (uc *updateCallback) Accept() {} + +func (uc *updateCallback) Reject(err error) {} + +func (uc *updateCallback) Complete(success interface{}, err error) { + uc.completeErr = err +} From a0d812c2e2275042813f6d00bebd9cae3f2d2dc3 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 3 Oct 2024 18:27:25 -0700 Subject: [PATCH 08/12] remove unused field --- early-return/workflow_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/early-return/workflow_test.go b/early-return/workflow_test.go index 75ddf3d0..ebc23cd9 100644 --- a/early-return/workflow_test.go +++ b/early-return/workflow_test.go @@ -90,7 +90,6 @@ func Test_CancelTransaction_Fails(t *testing.T) { } type updateCallback struct { - complete func(any, error) completeErr error } From d400b38455697f55e9f89368a8d39a3d83202943 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Thu, 3 Oct 2024 18:45:49 -0700 Subject: [PATCH 09/12] tweak ScheduleToCloseTimeout --- early-return/workflow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/early-return/workflow.go b/early-return/workflow.go index fee63833..6b098005 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -49,7 +49,7 @@ func Workflow(ctx workflow.Context, tx Transaction) error { // See https://docs.temporal.io/activities#local-activity for more details. activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ - ScheduleToCloseTimeout: 10 * time.Second, + ScheduleToCloseTimeout: 5 * time.Second, // short timeout to avoid another Workflow Task being scheduled }) tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil) tx.initDone = true From c509f54b990098d6be482aa8c4d22228b714765c Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 4 Oct 2024 09:46:55 -0700 Subject: [PATCH 10/12] add README entry --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 07dbb62b..81b1399e 100644 --- a/README.md +++ b/README.md @@ -230,6 +230,9 @@ resource waiting its successful completion - [**Request/Response with Response Updates**](./reqrespupdate): Demonstrates how to accept requests and responsond via updates. +- [**Early-Return**](./early-return): + Demonstrates how to receive a response mid-workflow, while the workflow continues to run to completion. + ### Pending examples Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/ From 3c68ea4d86bf80a3c2b20a932bc534b4a61b506a Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 4 Oct 2024 09:54:52 -0700 Subject: [PATCH 11/12] add RegisterDelayedCallback note --- early-return/workflow_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/early-return/workflow_test.go b/early-return/workflow_test.go index ebc23cd9..e47ee598 100644 --- a/early-return/workflow_test.go +++ b/early-return/workflow_test.go @@ -21,7 +21,7 @@ func Test_CompleteTransaction(t *testing.T) { uc := &updateCallback{} env.RegisterDelayedCallback(func() { env.UpdateWorkflow(UpdateName, uuid.New(), uc) - }, 0) + }, 0) // NOTE: zero delay ensures Update is delivered in first workflow task env.ExecuteWorkflow(Workflow, tx) require.True(t, env.IsWorkflowCompleted()) From eac75d8a5c6c7d5123c58d2dd09a69c3e2da3580 Mon Sep 17 00:00:00 2001 From: Stephan Behnke Date: Fri, 4 Oct 2024 10:01:52 -0700 Subject: [PATCH 12/12] run as method --- early-return/workflow.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/early-return/workflow.go b/early-return/workflow.go index 6b098005..2ea47762 100644 --- a/early-return/workflow.go +++ b/early-return/workflow.go @@ -33,12 +33,16 @@ type Transaction struct { // the initialization in a single round trip, even before the transaction processing completes. The remainder // of the transaction is then processed asynchronously. func Workflow(ctx workflow.Context, tx Transaction) error { + return run(ctx, tx) +} + +func run(ctx workflow.Context, tx Transaction) error { logger := workflow.GetLogger(ctx) if err := workflow.SetUpdateHandler( ctx, UpdateName, - tx.ReturnInitResult, + tx.returnInitResult, ); err != nil { return err } @@ -80,7 +84,7 @@ func Workflow(ctx workflow.Context, tx Transaction) error { return nil } -func (tx *Transaction) ReturnInitResult(ctx workflow.Context) error { +func (tx *Transaction) returnInitResult(ctx workflow.Context) error { if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil { return fmt.Errorf("transaction init cancelled: %w", err) }