Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early-return sample #366

Merged
merged 13 commits into from
Oct 4, 2024
17 changes: 17 additions & 0 deletions early-return/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
### Early-Return Sample

This sample demonstrates an early-return from a workflow.

By utilizing Update-with-Start, a client can start a new workflow and synchronously receive
a response mid-workflow, while the workflow continues to run to completion.

### Steps to run this sample:
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run the following command to start the worker
```
go run early-return/worker/main.go
```
3) Run the following command to start the example
```
go run early-return/starter/main.go
```
33 changes: 33 additions & 0 deletions early-return/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package earlyreturn

import (
"context"
"errors"

"go.temporal.io/sdk/activity"
)

func InitTransaction(ctx context.Context, transactionId, fromAccount, toAccount string, amount float64) error {
logger := activity.GetLogger(ctx)
if fromAccount == "" {
return errors.New("invalid fromAccount")
}
if toAccount == "" {
return errors.New("invalid toAccount")
}
if amount == 0 {
return errors.New("invalid amount")
}
logger.Info("Transaction initialized")
return nil
}

func CancelTransaction(ctx context.Context, transactionId string) {
stephanos marked this conversation as resolved.
Show resolved Hide resolved
logger := activity.GetLogger(ctx)
logger.Info("Transaction cancelled")
}

func CompleteTransaction(ctx context.Context, transactionId string) {
logger := activity.GetLogger(ctx)
logger.Info("Transaction completed")
}
56 changes: 56 additions & 0 deletions early-return/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"context"
"log"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
)

func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

updateOperation := client.NewUpdateWithStartWorkflowOperation(
client.UpdateWorkflowOptions{
UpdateName: earlyreturn.UpdateName,
WaitForStage: client.WorkflowUpdateStageCompleted,
})

txId := uuid.New()
workflowOptions := client.StartWorkflowOptions{
ID: "early-return-workflow-ID-" + txId,
TaskQueue: earlyreturn.TaskQueueName,
WithStartOperation: updateOperation,
}
we, err := c.ExecuteWorkflow(ctxWithTimeout, workflowOptions, earlyreturn.Workflow, txId, "bob", "alice", 100.0)
if err != nil {
log.Fatalln("Error executing workflow:", err)
}

log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())

updateHandle, err := updateOperation.Get(ctxWithTimeout)
if err != nil {
log.Fatalln("Error obtaining update handle:", err)
}

err = updateHandle.Get(ctxWithTimeout, nil)
if err != nil {
// NOTE: If the error is retryable, a retry attempt must use a unique workflow ID.
log.Fatalln("Error obtaining update result:", err)
}

log.Println("Transaction completed successfully")
stephanos marked this conversation as resolved.
Show resolved Hide resolved

// The workflow will continue running, either completing or cancelling the transaction.
}
27 changes: 27 additions & 0 deletions early-return/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"log"

"github.com/temporalio/samples-go/early-return"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

w := worker.New(c, earlyreturn.TaskQueueName, worker.Options{})

w.RegisterWorkflow(earlyreturn.Workflow)

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
77 changes: 77 additions & 0 deletions early-return/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package earlyreturn

import (
"errors"
"fmt"
"time"

"go.temporal.io/sdk/workflow"
)

const (
UpdateName = "early-return"
TaskQueueName = "early-return-tq"
)

var (
stephanos marked this conversation as resolved.
Show resolved Hide resolved
activityTimeout = 2 * time.Second
updateTimeout = 5 * time.Second
)

// Workflow processes a transaction in two phases. First, the transaction is initialized, and if successful,
// it proceeds to completion. However, if initialization fails - due to validation errors or transient
// issues (e.g., network connectivity problems) - the transaction is cancelled.
//
// By utilizing Update-with-Start, the client can initiate the workflow and immediately receive the result of
// the initialization in a single round trip, even before the transaction processing completes. The remainder
// of the transaction is then processed asynchronously.
func Workflow(ctx workflow.Context, transactionId, fromAccount, toAccount string, amount float64) error {
stephanos marked this conversation as resolved.
Show resolved Hide resolved
var initErr error
var initDone bool
logger := workflow.GetLogger(ctx)

if err := workflow.SetUpdateHandler(
ctx,
UpdateName,
func(ctx workflow.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to make a struct with your state and your run as a method and your update handler as a method instead of all in one function. What is here is fine of course, but usually when workflows branch out to handlers and many anonymous functions, it is clearer to use traditional structs with method declarations.

Copy link
Contributor Author

@stephanos stephanos Oct 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea 👍 Only part I don't quite follow is the "run as a method". Is that possible in the Go SDK? I saw an error when trying to register the workflow method of the struct and couldn't find any example doing that (only for activities).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only part I don't quite follow is the "run as a method". Is that possible in the Go SDK?

You'd do the wrapping w/ a one-liner, so something like:

type myWorkflow struct { SomeState }

func MyWorkflow(ctx workflow.Context, someState SomeState) (*SomeResult, error) {
    return myWorkflow{ someState }.run(ctx)
}

func (m *myWorkflow) run(ctx workflow.Context) (*SomeResult, error) {
    // This kind of setup could go into a newMyWorkflow(...) call instead of in here
    if err := workflow.SetUpdateHandler(ctx, "myUpdate", m.myUpdate); err != nil {
        return nil, err
    }
    panic("TODO")
}

func (m *myWorkflow) myUpdate(ctx workflow.Context, someParam SomeParam) (*SomeUpdateResult, error) {
    panic("TODO")
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I thought I might have missed a trick to do it with a single method

condition := func() bool { return initDone }
if completed, err := workflow.AwaitWithTimeout(ctx, updateTimeout, condition); err != nil {
stephanos marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("update cancelled: %w", err)
} else if !completed {
return errors.New("update timed out")
}
return initErr
},
); err != nil {
return err
}

// Phase 1: Initialize the transaction synchronously.
Copy link
Member

@cretz cretz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arguably this logic could be flipped and users may prefer that in many scenarios. Can flip where the update is the init and the primary workflow waits for an init update before continuing. There are tradeoffs to both.

Copy link
Contributor Author

@stephanos stephanos Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In all my examples until now, I've actually had it flipped. Drew convinced me to do it the other way around, but I'm not quite sure anymore why. What makes you say that the other way might be more preferable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since updates are not durable on admitted that is why we did it this way so this is the only safe way to write this type of workflow. if I remember correctly

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't necessarily think it's more preferable, there are just tradeoffs. The main tradeoff is probably what you want the workflow to do when it's not called via update with start. If you want it to function normally, no problem, if you want it to wait for an update to get it moving, probably want logic flipped.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't more preferable this is the only safe way to write it since update with start is not transactional

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main reasoning is what chad said: you want the workflow to function properly when the client doesn't call it with an update.
Quinn's reasoning makes sense too.

Copy link
Member

@cretz cretz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It isn't more preferable this is the only safe way to write it since update with start is not transactional

Do we at least guarantee the update and the start are in the same task? If we don't, all latency bets are off anyways. But whether primary workflow waits on init from update or update waits on init from primary workflow is immaterial I'd think (except if the update can come in a separate task which would be a concern).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do guarantee it for Update-with-Start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Then yeah I think it's probably just semantics on which coroutine waits on the other and probably doesn't matter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another strong reason to do it this way is that, if the update did the init, then the workflow author has to make sure the workflow is correct in the face of multiple calls to the update handler, i.e. normal updates being sent subsequent to the update with start. But with all steps in the main workflow, multiple calls to the update handler are automatically correct.

//
// By using a local activity, an additional server roundtrip is avoided.
// See https://docs.temporal.io/activities#local-activity for more details.

activityOptions := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt you'll want the default retry options with such an aggressive schedule to close of 2s

ScheduleToCloseTimeout: activityTimeout,
})
initErr = workflow.ExecuteLocalActivity(
activityOptions, InitTransaction, transactionId, fromAccount, toAccount, amount,
).Get(ctx, nil)
initDone = true

// Phase 2: Complete or cancel the transaction asychronously.
activityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
Copy link
Member

@cretz cretz Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is confusing that the activityTimeout global is only used once and is for local activity timeout and this isn't even a global and is actually an activity timeout. Arguably there is no need for these single-use globals instead of just inlining, but if there is, consider consistently using globals for these.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right; I've just made them global to make it easier to see at a glance what the timeouts are without reading line-by-line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But you've only made some global and the global name is ambiguous because it's not general activity timeout (that's hardcoded right here), it's init transaction timeout.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you use a different, longer timeout here? since this is the async part, and I think 10s was used elsewhere? I think 30s is a fairly standard timeout for a rando activity.

})
if initErr != nil {
logger.Info("cancelling transaction due to error: %v", initErr)

// Transaction failed to be initialized or not quickly enough; cancel the transaction.
return workflow.ExecuteActivity(activityCtx, CancelTransaction, transactionId).Get(ctx, nil)
}

logger.Info("completing transaction")

// Transaction was initialized successfully; complete the transaction.
return workflow.ExecuteActivity(activityCtx, CompleteTransaction, transactionId).Get(ctx, nil)
}