Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add session failure sample #303

Merged
merged 2 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions session-failure/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
This sample workflow demos how to recover from a session failure inside a workflow

The workflow first creates a session then starts a short activity meant to simulate preparing the session session, then it starts a long running activity on the session worker. If the session worker goes down for any reason the session will fail to heartbeat and be marked as failed. This will cause any activities running on the session to be cancelled and the workflow to retry the whole sequence on a new session after a timeout.

### Note on session failure:

Workflows detect a session worker has gone down through heartbeats by the session worker, so the workflow has a stale view of the session workers state. This is important to consider if your
workflow schedules any activities on a session that can fail due to a timeout. It is possible that when a session worker fails, if your activities timeout is shorter than twice the session heartbeat timeout, your activity may fail with a timeout error and the session state will not be failed yet.

It is also worth noting if a session worker is restarted then it is considered a new session worker and will not pick up any activities scheduled on the old session worker. If you want to be able to keep scheduling activities on the same host after restart look at ../activities-sticky-queues

Steps to run this sample:
1) You need a Temporal service running. See details in README.md
2) Run the following command multiple times on different console window. This is to simulate running workers on multiple different machines.
```
go run session-failure/worker/main.go
```
1) Run the following command to submit a start request for this session failure workflow.
```
go run session-failure/starter/main.go
```
1) If you want to observe the workflow recover from a failed session you can restart
the worker you launched in step 2).

You should see that all activities for one particular workflow execution are scheduled to run on one console window.
42 changes: 42 additions & 0 deletions session-failure/activities.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package fileprocessing

import (
"context"
"time"

"go.temporal.io/sdk/activity"
)

/**
* Sample activities used by session failure sample workflow.
*/

type Activities struct {
}

func (a *Activities) PrepareWorkerActivity(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("Preparing session worker")
return nil
}

func (a *Activities) LongRunningActivity(ctx context.Context) error {
logger := activity.GetLogger(ctx)
logger.Info("Started running long running activity.")

hbTicker := time.NewTicker(20 * time.Second)
defer hbTicker.Stop()
// Create a 5 minute timer to simulate an activity doing some long work
timer := time.NewTimer(5 * time.Minute)
defer timer.Stop()
for {
select {
case <-hbTicker.C:
activity.RecordHeartbeat(ctx)
case <-timer.C:
return ctx.Err()
case <-ctx.Done():
return ctx.Err()
}
}
}
33 changes: 33 additions & 0 deletions session-failure/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"context"
"log"

"github.com/pborman/uuid"
"go.temporal.io/sdk/client"

sessionfailure "github.com/temporalio/samples-go/session-failure"
)

func main() {
// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "session_failure_" + uuid.New(),
TaskQueue: "session-failure",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, sessionfailure.SampleSessionFailureRecoveryWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
}
34 changes: 34 additions & 0 deletions session-failure/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

sessionfailure "github.com/temporalio/samples-go/session-failure"
)

func main() {
// The client and worker are heavyweight objects that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

workerOptions := worker.Options{
EnableSessionWorker: true, // Important for a worker to participate in the session
}
w := worker.New(c, "session-failure", workerOptions)

w.RegisterWorkflow(sessionfailure.SampleSessionFailureRecoveryWorkflow)
w.RegisterActivity(&sessionfailure.Activities{})

err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
85 changes: 85 additions & 0 deletions session-failure/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package fileprocessing

import (
"errors"
"time"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

var (
ErrSessionHostDown = errors.New("session host down")
)

// SampleSessionFailureRecoveryWorkflow workflow definition
func SampleSessionFailureRecoveryWorkflow(ctx workflow.Context) (err error) {
for retryNum := 0; retryNum < 10; retryNum++ {
if err = runSession(ctx); errors.Is(err, ErrSessionHostDown) {
if sleepErr := workflow.Sleep(ctx, 5*time.Minute); sleepErr != nil {
return sleepErr
}
continue
}
if err != nil {
workflow.GetLogger(ctx).Error("Workflow failed.", "Error", err.Error())
} else {
workflow.GetLogger(ctx).Info("Workflow completed.")
}
return
}
workflow.GetLogger(ctx).Error("Workflow failed after multiple session retries.", "Error", err.Error())
return
}

func runSession(ctx workflow.Context) (err error) {

so := &workflow.SessionOptions{
CreationTimeout: time.Minute,
ExecutionTimeout: 20 * time.Minute,
}
sessionCtx, err := workflow.CreateSession(ctx, so)
if err != nil {
// In a production application you may want to distinguish between not being able to create
// a session and a host going down.
if temporal.IsTimeoutError(err) {
workflow.GetLogger(ctx).Error("Session failed", "Error", err.Error())
err = ErrSessionHostDown
}
return err
}

defer func() {
workflow.CompleteSession(sessionCtx)
// If the session host fails any scheduled activity started on the host will be cancelled.
//
// Note: SessionState is inherently a stale view of the session state see the README.md of
// this sample for more details
if workflow.GetSessionInfo(sessionCtx).SessionState == workflow.SessionStateFailed {
err = ErrSessionHostDown
}
}()

ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Minute,
// When running an activity in a session you don't need to specify a heartbeat timeout to
// detect the host going down, the session heartbeat timeout will handle that for you.
// You may still want to specify a heartbeat timeout if the activity can get stuck or
// you want to record progress with the heartbeat details.
HeartbeatTimeout: 40 * time.Second,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
},
}
sessionCtx = workflow.WithActivityOptions(sessionCtx, ao)

var a *Activities
err = workflow.ExecuteActivity(sessionCtx, a.PrepareWorkerActivity).Get(sessionCtx, nil)
if err != nil {
return err
}

return workflow.ExecuteActivity(sessionCtx, a.LongRunningActivity).Get(sessionCtx, nil)
}
40 changes: 40 additions & 0 deletions session-failure/workflow_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package fileprocessing

import (
"testing"

"github.com/stretchr/testify/mock"
"go.temporal.io/sdk/worker"

"github.com/stretchr/testify/suite"
"go.temporal.io/sdk/testsuite"
)

type UnitTestSuite struct {
suite.Suite
testsuite.WorkflowTestSuite
}

func TestUnitTestSuite(t *testing.T) {
suite.Run(t, new(UnitTestSuite))
}

func (s *UnitTestSuite) Test_SampleFileProcessingWorkflow() {
env := s.NewTestWorkflowEnvironment()
env.SetWorkerOptions(worker.Options{
EnableSessionWorker: true, // Important for a worker to participate in the session
})
var a *Activities

env.OnActivity(a.PrepareWorkerActivity, mock.Anything).Return(nil)
env.OnActivity(a.LongRunningActivity, mock.Anything).Return(nil)

env.RegisterActivity(a)

env.ExecuteWorkflow(SampleSessionFailureRecoveryWorkflow)

s.True(env.IsWorkflowCompleted())
s.NoError(env.GetWorkflowError())

env.AssertExpectations(s.T())
}
Loading