Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early-return sample #366

Merged
merged 13 commits into from
Oct 4, 2024
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ resource waiting its successful completion
- [**Request/Response with Response Updates**](./reqrespupdate):
Demonstrates how to accept requests and responsond via updates.

- [**Early-Return**](./early-return):
Demonstrates how to receive a response mid-workflow, while the workflow continues to run to completion.

### Pending examples

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/
Expand Down
17 changes: 17 additions & 0 deletions early-return/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Early-Return Sample

This sample demonstrates an early-return from a workflow.

By utilizing Update-with-Start, a client can start a new workflow and synchronously receive
a response mid-workflow, while the workflow continues to run to completion.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run early-return/worker/main.go
```
3) Run the following command to start the example
```
go run early-return/starter/main.go
```
57 changes: 57 additions & 0 deletions early-return/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"log"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

updateOperation := client.NewUpdateWithStartWorkflowOperation(
client.UpdateWorkflowOptions{
UpdateName: earlyreturn.UpdateName,
WaitForStage: client.WorkflowUpdateStageCompleted,
})

tx := earlyreturn.Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
workflowOptions := client.StartWorkflowOptions{
ID: "early-return-workflow-ID-" + tx.ID,
TaskQueue: earlyreturn.TaskQueueName,
WithStartOperation: updateOperation,
}
we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, tx)
if err != nil {
log.Fatalln("Error executing workflow:", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

updateHandle, err := updateOperation.Get(ctxWithTimeout)
if err != nil {
log.Fatalln("Error obtaining update handle:", err)
}

err = updateHandle.Get(ctxWithTimeout, nil)
if err != nil {
// The workflow will continue running, cancelling the transaction.

// NOTE: If the error is retryable, a retry attempt must use a unique workflow ID.
log.Fatalln("Error obtaining update result:", err)
}

log.Println("Transaction initialized successfully")
// The workflow will continue running, completing the transaction.
}
27 changes: 27 additions & 0 deletions early-return/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"log"

"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{})

w.RegisterWorkflow(earlyreturn.Workflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
116 changes: 116 additions & 0 deletions early-return/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package earlyreturn

import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/workflow"
)

const (
UpdateName = "early-return"
TaskQueueName = "early-return-tq"
)

type Transaction struct {
ID string
SourceAccount string
TargetAccount string
Amount int // in cents

initErr error
initDone bool
}

// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful,
// it proceeds to completion. However, if initialization fails - due to validation errors or transient
// issues (e.g., network connectivity problems) - the transaction is cancelled.
//
// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of
// the initialization in a single round trip, even before the transaction processing completes. The remainder
// of the transaction is then processed asynchronously.
func Workflow(ctx workflow.Context, tx Transaction) error {
return run(ctx, tx)
}

func run(ctx workflow.Context, tx Transaction) error {
logger := workflow.GetLogger(ctx)

if err := workflow.SetUpdateHandler(
ctx,
UpdateName,
tx.returnInitResult,
); err != nil {
return err
}

// Phase 1: Initialize the transaction synchronously.
Copy link
Member

@cretz cretz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this logic could be flipped and users may prefer that in many scenarios. Can flip where the update is the init and the primary workflow waits for an init update before continuing. There are tradeoffs to both.

Copy link
Contributor Author

@stephanos stephanos Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all my examples until now, I've actually had it flipped. Drew convinced me to do it the other way around, but I'm not quite sure anymore why. What makes you say that the other way might be more preferable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since updates are not durable on admitted that is why we did it this way so this is the only safe way to write this type of workflow. if I remember correctly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't necessarily think it's more preferable, there are just tradeoffs. The main tradeoff is probably what you want the workflow to do when it's not called via update with start. If you want it to function normally, no problem, if you want it to wait for an update to get it moving, probably want logic flipped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't more preferable this is the only safe way to write it since update with start is not transactional

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main reasoning is what chad said: you want the workflow to function properly when the client doesn't call it with an update.
Quinn's reasoning makes sense too.

Copy link
Member

@cretz cretz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't more preferable this is the only safe way to write it since update with start is not transactional

Do we at least guarantee the update and the start are in the same task? If we don't, all latency bets are off anyways. But whether primary workflow waits on init from update or update waits on init from primary workflow is immaterial I'd think (except if the update can come in a separate task which would be a concern).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do guarantee it for Update-with-Start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Then yeah I think it's probably just semantics on which coroutine waits on the other and probably doesn't matter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another strong reason to do it this way is that, if the update did the init, then the workflow author has to make sure the workflow is correct in the face of multiple calls to the update handler, i.e. normal updates being sent subsequent to the update with start. But with all steps in the main workflow, multiple calls to the update handler are automatically correct.

//
// By using a local activity, an additional server roundtrip is avoided.
// See https://docs.temporal.io/activities#local-activity for more details.

activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt you'll want the default retry options with such an aggressive schedule to close of 2s

ScheduleToCloseTimeout: 5 * time.Second, // short timeout to avoid another Workflow Task being scheduled
})
tx.initErr = workflow.ExecuteLocalActivity(activityOptions, tx.InitTransaction).Get(ctx, nil)
tx.initDone = true

// Phase 2: Complete or cancel the transaction asychronously.

activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
})
if tx.initErr != nil {
logger.Error(fmt.Sprintf("cancelling transaction due to init error: %v", tx.initErr))

// Transaction failed to be initialized or not quickly enough; cancel the transaction.
if err := workflow.ExecuteActivity(activityCtx, tx.CancelTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("cancelling the transaction failed: %w", err)
}

return tx.initErr
}

logger.Info("completing transaction")

// Transaction was initialized successfully; complete the transaction.
if err := workflow.ExecuteActivity(activityCtx, tx.CompleteTransaction).Get(ctx, nil); err != nil {
return fmt.Errorf("completing the transaction failed: %w", err)
}

return nil
}

func (tx *Transaction) returnInitResult(ctx workflow.Context) error {
if err := workflow.Await(ctx, func() bool { return tx.initDone }); err != nil {
return fmt.Errorf("transaction init cancelled: %w", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, this is the only untested line of the workflow. I'm not quite sure how to use the Go SDK testing env to trigger this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would need to test cancellation using https://pkg.go.dev/go.temporal.io/[email protected]/internal#TestWorkflowEnvironment.CancelWorkflow no? Not saying you need to for this sample though.

Copy link
Contributor Author

@stephanos stephanos Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's the missing piece! Yeah, I agree, it's prob fine without.

}
return tx.initErr
}

func (tx *Transaction) InitTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
if tx.Amount <= 0 {
return errors.New("invalid Amount")
}
time.Sleep(500 * time.Millisecond)
logger.Info("Transaction initialized")
return nil
}

func (tx *Transaction) CancelTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction cancelled")
return nil
}

func (tx *Transaction) CompleteTransaction(ctx context.Context) error {
logger := activity.GetLogger(ctx)
time.Sleep(1 * time.Second)
logger.Info("Transaction completed")
return nil
}
102 changes: 102 additions & 0 deletions early-return/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package earlyreturn

import (
"fmt"
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.temporal.io/sdk/testsuite"
)

func Test_CompleteTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: would add a comment explaining this will guarantee the update is sent in the first WFT.

Copy link

@drewhoskins-temporal drewhoskins-temporal Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I'm confused even after Quinn's comment. Why is this test seemingly not using the normal interface for UwS?
I assume you have a good reason related to the Go test library and that there's nothing fixable in this PR. But I'm wondering if there's something we can improve as part of UwS public preview or GA.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not backed by the Java test service?

Copy link
Contributor Author

@stephanos stephanos Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct; Go SDK has it's own time-skipping test server, and it has its own APIs. We haven't added an UwS API since this approach here works, too. But I agree, it's not immediately obvious (at least wasn't for me, had to ask Quinn).

env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0) // NOTE: zero delay ensures Update is delivered in first workflow task
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.NoError(t, env.GetWorkflowError())
require.NoError(t, uc.completeErr)
}

func Test_CompleteTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: 100}
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CompleteTransaction)

env.OnActivity(tx.CompleteTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

func Test_CancelTransaction(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "invalid Amount")
}

func Test_CancelTransaction_Fails(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()

tx := Transaction{ID: uuid.New(), SourceAccount: "Bob", TargetAccount: "Alice", Amount: -1} // invalid!
env.RegisterActivity(tx.InitTransaction)
env.RegisterActivity(tx.CancelTransaction)

env.OnActivity(tx.CancelTransaction, mock.Anything).Return(fmt.Errorf("crash"))

uc := &updateCallback{}
env.RegisterDelayedCallback(func() {
env.UpdateWorkflow(UpdateName, uuid.New(), uc)
}, 0)
env.ExecuteWorkflow(Workflow, tx)

require.True(t, env.IsWorkflowCompleted())
require.ErrorContains(t, uc.completeErr, "invalid Amount")
require.ErrorContains(t, env.GetWorkflowError(), "crash")
}

type updateCallback struct {
completeErr error
}

func (uc *updateCallback) Accept() {}

func (uc *updateCallback) Reject(err error) {}

func (uc *updateCallback) Complete(success interface{}, err error) {
uc.completeErr = err
}