Skip to content

Commit

Permalink
Merge branch 'main' into batch-sliding-window
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev authored Aug 30, 2023
2 parents 37d669d + 6b93fe4 commit 714af64
Show file tree
Hide file tree
Showing 17 changed files with 877 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ jobs:
submodules: recursive
- uses: actions/setup-go@v2
with:
go-version: "1.18"
go-version: "1.19"
- name: CI Build
run: make ci-build
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ Each sample demonstrates one feature of the SDK, together with tests.
execute a Child Workflow from a Parent Workflow Execution. A Child Workflow Execution only returns to the Parent
Workflow Execution after completing its last Run.

- [**Child Workflow with
ContinueAsNew**](./child-workflow-continue-as-new): Demonstrates
- [**Child Workflow with ContinueAsNew**](./child-workflow-continue-as-new): Demonstrates
that the call to Continue-As-New, by a Child Workflow Execution, is *not visible to the a parent*. The Parent Workflow
Execution receives a notification only when a Child Workflow Execution completes, fails or times out. This is a useful
feature when there is a need to **process a large set of data**. The child can iterate over the data set calling
Expand Down Expand Up @@ -135,6 +134,9 @@ Each sample demonstrates one feature of the SDK, together with tests.
- [**Interceptors**](./interceptor): Demonstrates how to use
interceptors to intercept calls, in this case for adding context to the logger.

- [**Update**](./update): Demonstrates how to create a workflow that reacts
to workflow update requests.

### Dynamic Workflow logic examples

These samples demonstrate some common control flow patterns using Temporal's Go SDK API.
Expand Down Expand Up @@ -218,6 +220,9 @@ resource waiting its successful completion
- [**Request/Response with Response Queries**](./reqrespquery):
Demonstrates how to accept requests via signals and use queries to poll for responses.

- [**Request/Response with Response Updates**](./reqrespupdate):
Demonstrates how to accept requests and responsond via updates.

### Pending examples

Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/
Expand Down
23 changes: 23 additions & 0 deletions build-id-versioning/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Build ID Based Versioning
This sample illustrates how to use Build ID based versioning to help you appropriately roll out
incompatible and compatible changes to workflow and activity code for the same task queue.

## Description
The sample shows you how to roll out both a compatible change and an incompatible change to a
workflow.

## Running
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
2) Run
```
go run build-id-versioning/worker/main.go
```
to start the appropriate workers. It will print a task queue name to the console, which you
will need to copy and paste when running the next step. This is to allow running the sample
repeatedly without encountering issues due to Build IDs already existing on the queue.

3) Run
```
go run build-id-versioning/starter/main.go <task queue name>
```
to start the workflows.
155 changes: 155 additions & 0 deletions build-id-versioning/starter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package main

import (
"context"
"log"
"os"
"time"

"go.temporal.io/api/workflowservice/v1"

"github.com/pborman/uuid"

"go.temporal.io/sdk/client"
)

func main() {
ctx := context.Background()

// Get task queue name from CLI arg
taskQueue := os.Args[1]
if taskQueue == "" {
log.Fatalln("Must provide task queue name as first and only argument")
}
log.Println("Using task queue", taskQueue)

// The client is a heavyweight object that should be created once per process.
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

// First, let's make the task queue use the build id versioning feature by adding an initial
// default version to the queue:
err = c.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: taskQueue,
Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{
BuildID: "1.0",
},
})
if err != nil {
log.Fatalln("Unable to update worker build id compatibility", err)
}

firstWorkflowID := "build-id-versioning-first_" + uuid.New()
firstWorkflowOptions := client.StartWorkflowOptions{
ID: firstWorkflowID,
TaskQueue: taskQueue,
WorkflowExecutionTimeout: 5 * time.Minute,
}
firstExecution, err := c.ExecuteWorkflow(ctx, firstWorkflowOptions, "SampleChangingWorkflow")
if err != nil {
log.Fatalln("Unable to start workflow", err)
}
log.Println("Started first workflow",
"WorkflowID", firstExecution.GetID(), "RunID", firstExecution.GetRunID())

// Signal this workflow a few times to drive it
for i := 0; i < 3; i++ {
err = c.SignalWorkflow(ctx, firstExecution.GetID(), firstExecution.GetRunID(),
"do-next-signal", "do-activity")
if err != nil {
log.Fatalln("Unable to signal workflow", err)
}
}

// Give a chance for these signals to be processed by the 1.0 worker
time.Sleep(5 * time.Second)

// Now, let's update the task queue with a new compatible version:
err = c.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: taskQueue,
Operation: &client.BuildIDOpAddNewCompatibleVersion{
BuildID: "1.1",
ExistingCompatibleBuildID: "1.0",
},
})
if err != nil {
log.Fatalln("Unable to update build id compatability", err)
}

// Continue driving the workflow. Take note that the new version of the workflow run by the
// 1.1 worker is the one that takes over! You might see a workflow task timeout, if the 1.0
// worker is processing a task as the version update happens. That's normal.
for i := 0; i < 3; i++ {
err = c.SignalWorkflow(ctx, firstExecution.GetID(), firstExecution.GetRunID(),
"do-next-signal", "do-activity")
if err != nil {
log.Fatalln("Unable to signal workflow", err)
}
}

// Add a new *incompatible* version to the task queue, which will become the new overall default
// for the queue.
err = c.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: taskQueue,
Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{
BuildID: "2.0",
},
})
if err != nil {
log.Fatalln("Unable to update build id compatability", err)
}

// Start a new workflow, note that it will run on the new 2.0 version, without the client
// invocation changing at all!
secondWorkflowID := "build-id-versioning-second_" + uuid.New()
secondWorkflowOptions := client.StartWorkflowOptions{
ID: secondWorkflowID,
TaskQueue: taskQueue,
WorkflowExecutionTimeout: 5 * time.Minute,
}
secondExecution, err := c.ExecuteWorkflow(ctx, secondWorkflowOptions, "SampleChangingWorkflow")
if err != nil {
log.Fatalln("Unable to start workflow", err)
}
log.Println("Started second workflow",
"WorkflowID", secondExecution.GetID(), "RunID", secondExecution.GetRunID())

// Drive the first workflow to completion, the second will finish on its own
err = c.SignalWorkflow(ctx, firstExecution.GetID(), firstExecution.GetRunID(),
"do-next-signal", "do-activity")
if err != nil {
log.Fatalln("Unable to signal workflow", err)
}
err = c.SignalWorkflow(ctx, firstExecution.GetID(), firstExecution.GetRunID(),
"do-next-signal", "finish")
if err != nil {
log.Fatalln("Unable to signal workflow", err)
}

// Lastly we'll demonstrate how you can use the gRPC api to determine if certain bulid IDs are
// ready to be retied. There's more information in the documentation, but here's a quick example
// that will show us that we can retire the 1.0 worker:
retirementInfo, err := c.WorkflowService().GetWorkerTaskReachability(ctx, &workflowservice.GetWorkerTaskReachabilityRequest{
Namespace: "default",
BuildIds: []string{"1.0"},
})
if err != nil {
log.Fatalln("Unable to get build id reachability", err)
}
reachabilityOf1Dot0 := retirementInfo.GetBuildIdReachability()[0]
noReachableQueues := true
for _, tq := range reachabilityOf1Dot0.GetTaskQueueReachability() {
if tq.GetReachability() != nil && len(tq.GetReachability()) > 0 {
noReachableQueues = false
}
}
if noReachableQueues {
log.Println("We have determined 1.0 is ready to be retired")
}

}
56 changes: 56 additions & 0 deletions build-id-versioning/worker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"github.com/pborman/uuid"
build_id_versioning "github.com/temporalio/samples-go/build-id-versioning"
"sync"

"log"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
)

func main() {
c, err := client.Dial(client.Options{
HostPort: client.DefaultHostPort,
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()

taskQueue := "build-id-versioning-" + uuid.New()
log.Println("Using Task Queue name: ", taskQueue, "(Copy this!)")

// We will start a handful of workers, each with a different build identifier.
wg := sync.WaitGroup{}
createAndRunWorker(c, taskQueue, "1.0", build_id_versioning.SampleChangingWorkflowV1, &wg)
createAndRunWorker(c, taskQueue, "1.1", build_id_versioning.SampleChangingWorkflowV1b, &wg)
createAndRunWorker(c, taskQueue, "2.0", build_id_versioning.SampleChangingWorkflowV2, &wg)
wg.Wait()
}

func createAndRunWorker(c client.Client, taskQueue, buildID string, workflowFunc func(ctx workflow.Context) error, wg *sync.WaitGroup) {
w := worker.New(c, taskQueue, worker.Options{
// Both of these options must be set to opt into the feature
BuildID: buildID,
UseBuildIDForVersioning: true,
})
// It's important that we register all the different implementations of the workflow using
// the same name. This allows us to demonstrate what would happen if you were making changes
// to this workflow code over time while keeping the same workflow name/type.
w.RegisterWorkflowWithOptions(workflowFunc, workflow.RegisterOptions{Name: "SampleChangingWorkflow"}) //workflowcheck:ignore
w.RegisterActivity(build_id_versioning.SomeActivity)
w.RegisterActivity(build_id_versioning.SomeIncompatibleActivity)

wg.Add(1)
go func() {
defer wg.Done()
err := w.Run(worker.InterruptCh())
if err != nil {
log.Fatalf("Unable to start %s worker: %v", buildID, err)
}
}()
}
Loading

0 comments on commit 714af64

Please sign in to comment.