Skip to content

Commit

Permalink
Rename Sticky Activities sample (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
lorensr authored Nov 13, 2023
1 parent 27f21a3 commit 359c36a
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 117 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,7 @@ These samples demonstrate some common control flow patterns using Temporal's Go
- [**Await for signal processing**](./await-signals): Demonstrates how
to process out of order signals processing using `Await` and `AwaitWithTimeout`.

- [**Sticky task queue for activities**](./activities-sticky-queues): Demonstrates how
to create a sticky task queue to run certain activities on the same host.
- [**Worker-specific Task Queues**](./worker-specific-task-queues): Use a unique task queue per Worker to have certain Activities only run on that specific Worker. For instance for a file processing Workflow, where one Activity downloads a file and subsequent Activities need to operate on that file. (If multiple Workers were on the same queue, subsequent Activities may get run on different machines that don't have the downloaded file.)

### Scenario based examples

Expand Down
25 changes: 0 additions & 25 deletions activities-sticky-queues/README.md

This file was deleted.

14 changes: 0 additions & 14 deletions activities-sticky-queues/sticky_task_queue.go

This file was deleted.

59 changes: 0 additions & 59 deletions activities-sticky-queues/worker/main.go

This file was deleted.

2 changes: 1 addition & 1 deletion session-failure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The workflow first creates a session then starts a short activity meant to simul
Workflows detect a session worker has gone down through heartbeats by the session worker, so the workflow has a stale view of the session workers state. This is important to consider if your
workflow schedules any activities on a session that can fail due to a timeout. It is possible that when a session worker fails, if your activities timeout is shorter than twice the session heartbeat timeout, your activity may fail with a timeout error and the session state will not be failed yet.

It is also worth noting if a session worker is restarted then it is considered a new session worker and will not pick up any activities scheduled on the old session worker. If you want to be able to keep scheduling activities on the same host after restart look at ../activities-sticky-queues
It is also worth noting if a session worker is restarted then it is considered a new session worker and will not pick up any activities scheduled on the old session worker. If you want to be able to keep scheduling activities on the same host after restart look at ../worker-specific-task-queues

Steps to run this sample:
1) You need a Temporal service running. See details in README.md
Expand Down
30 changes: 30 additions & 0 deletions worker-specific-task-queues/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Worker-Specific Task Queues

*[Sessions](https://docs.temporal.io/dev-guide/go/features#worker-sessions) are an alternative to Worker-specific Tasks Queues.*

Use a unique Task Queue for each Worker in order to have certain Activities run on a specific Worker.

This is useful in scenarios where multiple Activities need to run in the same process or on the same host, for example to share memory or disk. This sample has a file processing Workflow, where one Activity downloads the file to disk and other Activities process it and clean it up.

The strategy is:

- Each Worker process creates two `worker` instances:
- One instance listens on the `shared-task-queue` Task Queue.
- Another instance listens on a uniquely generated Task Queue (in this case, `uuid` is used, but you can inject smart logic here to uniquely identify the Worker, [as Netflix did](https://community.temporal.io/t/using-dynamic-task-queues-for-traffic-routing/3045)).
- The Workflow and the first Activity are run on `shared-task-queue`.
- The first Activity returns one of the uniquely generated Task Queues (that only one Worker is listening on—i.e. the **Worker-specific Task Queue**).
- The rest of the Activities do the file processing and are run on the Worker-specific Task Queue.

Activities have been artificially slowed with `time.Sleep(3 * time.Second)` to simulate slow activities.

### Running this sample

```bash
go run worker-specific-task-queues/worker/main.go
```

Start the Workflow Execution:

```bash
go run worker-specific-task-queues/starter/main.go
```
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package activities_sticky_queues
package worker_specific_task_queues

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"go.temporal.io/sdk/client"

activities_sticky_queues "github.com/temporalio/samples-go/activities-sticky-queues"
worker_specific_task_queues "github.com/temporalio/samples-go/worker-specific-task-queues"
)

func main() {
Expand All @@ -18,11 +18,11 @@ func main() {
defer c.Close()

workflowOptions := client.StartWorkflowOptions{
ID: "activities_sticky_queues_WorkflowID",
TaskQueue: "activities-sticky-queues",
ID: "worker_specific_task_queues_WorkflowID",
TaskQueue: "shared-task-queue",
}

we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, activities_sticky_queues.FileProcessingWorkflow)
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, worker_specific_task_queues.FileProcessingWorkflow)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
Expand Down
60 changes: 60 additions & 0 deletions worker-specific-task-queues/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"log"
"sync"

worker_specific_task_queues "github.com/temporalio/samples-go/worker-specific-task-queues"

"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"

"github.com/google/uuid"
)

func main() {
// The client and worker are heavyweight objects that should generally be created once per process.
// In this case, we create a single client but two workers since we need to handle Activities on multiple task queues.
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
uniqueTaskQueue := worker_specific_task_queues.WorkerSpecificTaskQueue{
TaskQueue: uuid.New().String(),
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
w := worker.New(c, "shared-task-queue", worker.Options{})
w.RegisterWorkflow(worker_specific_task_queues.FileProcessingWorkflow)

w.RegisterActivityWithOptions(uniqueTaskQueue.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
// Create a new worker listening on the unique queue
uniqueTaskQueueWorker := worker.New(c, uniqueTaskQueue.TaskQueue, worker.Options{})

uniqueTaskQueueWorker.RegisterActivity(worker_specific_task_queues.DownloadFile)
uniqueTaskQueueWorker.RegisterActivity(worker_specific_task_queues.ProcessFile)
uniqueTaskQueueWorker.RegisterActivity(worker_specific_task_queues.DeleteFile)

err = uniqueTaskQueueWorker.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}()
// Wait for both workers to close
wg.Wait()
}
14 changes: 14 additions & 0 deletions worker-specific-task-queues/worker_specific_task_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package worker_specific_task_queues

import (
"context"
)

type WorkerSpecificTaskQueue struct {
TaskQueue string
}

// GetWorkerSpecificTaskQueue is an activity to get a hosts unique task queue.
func (q WorkerSpecificTaskQueue) GetWorkerSpecificTaskQueue(ctx context.Context) (string, error) {
return q.TaskQueue, nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package activities_sticky_queues
package worker_specific_task_queues

import (
"path/filepath"
Expand All @@ -8,15 +8,15 @@ import (
"go.temporal.io/sdk/workflow"
)

// FileProcessingWorkflow is a workflow that uses stick activity queues to process files
// on a consistent host.
// FileProcessingWorkflow is a workflow that uses Worker-specific Task Queues to run multiple Activities on a consistent
// host.
func FileProcessingWorkflow(ctx workflow.Context) (err error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Minute,
}
ctx = workflow.WithActivityOptions(ctx, ao)
var stickyTaskQueue string
err = workflow.ExecuteActivity(ctx, "GetStickyTaskQueue").Get(ctx, &stickyTaskQueue)
var WorkerSpecificTaskQueue string
err = workflow.ExecuteActivity(ctx, "GetWorkerSpecificTaskQueue").Get(ctx, &WorkerSpecificTaskQueue)
if err != nil {
return
}
Expand All @@ -27,7 +27,7 @@ func FileProcessingWorkflow(ctx workflow.Context) (err error) {
// activities to progress.
ScheduleToCloseTimeout: time.Minute,

TaskQueue: stickyTaskQueue,
TaskQueue: WorkerSpecificTaskQueue,
}
ctx = workflow.WithActivityOptions(ctx, ao)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package activities_sticky_queues
package worker_specific_task_queues

import (
"testing"
Expand All @@ -15,11 +15,11 @@ func Test_Workflow(t *testing.T) {
env := testSuite.NewTestWorkflowEnvironment()

// Mock activity implementation
var a StickyTaskQueue
env.RegisterActivityWithOptions(a.GetStickyTaskQueue, activity.RegisterOptions{
Name: "GetStickyTaskQueue",
var a WorkerSpecificTaskQueue
env.RegisterActivityWithOptions(a.GetWorkerSpecificTaskQueue, activity.RegisterOptions{
Name: "GetWorkerSpecificTaskQueue",
})
env.OnActivity("GetStickyTaskQueue", mock.Anything).Return("unique-sticky-task-queue", nil)
env.OnActivity("GetWorkerSpecificTaskQueue", mock.Anything).Return("unique-task-queue", nil)
env.OnActivity(DownloadFile, mock.Anything, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(ProcessFile, mock.Anything, mock.Anything).Return(nil)
env.OnActivity(DeleteFile, mock.Anything, mock.Anything).Return(nil)
Expand Down

0 comments on commit 359c36a

Please sign in to comment.