diff --git a/examples/go.mod b/examples/go.mod index 3a05d251..d504837f 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -26,7 +26,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/marusama/semaphore/v2 v2.5.0 // indirect - github.com/microsoft/durabletask-go v0.5.0 // indirect + github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 28f207fb..b3b2f562 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18= -github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 85e01e74..54b9e5fe 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -26,16 +26,9 @@ expected_stdout_lines: - '== APP == workflow purged' - '== APP == stage: 2' - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' + - '== APP == workflow status: RUNNING' - '== APP == workflow terminated' - '== APP == workflow purged' - - '== APP == workflow client test' - - '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - - '== APP == [wfclient] workflow status: RUNNING' - - '== APP == [wfclient] stage: 1' - - '== APP == [wfclient] event raised' - - '== APP == [wfclient] stage: 2' - - '== APP == [wfclient] workflow terminated' - - '== APP == [wfclient] workflow purged' - '== APP == workflow worker successfully shutdown' background: true diff --git a/examples/workflow/main.go b/examples/workflow/main.go index 99c16407..0e5677dd 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -20,16 +20,11 @@ import ( "log" "time" - "github.com/dapr/go-sdk/client" "github.com/dapr/go-sdk/workflow" ) var stage = 0 -const ( - workflowComponent = "dapr" -) - func main() { w, err := workflow.NewWorker() if err != nil { @@ -54,70 +49,49 @@ func main() { } fmt.Println("runner started") - daprClient, err := client.NewClient() + wfClient, err := workflow.NewClient() if err != nil { log.Fatalf("failed to intialise client: %v", err) } - defer daprClient.Close() + defer wfClient.Close() ctx := context.Background() // Start workflow test - respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - WorkflowName: "TestWorkflow", - Options: nil, - Input: 1, - SendRawInput: false, - }) + instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) if err != nil { log.Fatalf("failed to start workflow: %v", err) } - fmt.Printf("workflow started with id: %v\n", respStart.InstanceID) + fmt.Printf("workflow started with id: %v\n", instanceID) // Pause workflow test - err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - + err = wfClient.SuspendWorkflow(ctx, instanceID, "") if err != nil { log.Fatalf("failed to pause workflow: %v", err) } - respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { - log.Fatalf("failed to get workflow: %v", err) + log.Fatalf("failed to fetch workflow: %v", err) } - if respGet.RuntimeStatus != workflow.StatusSuspended.String() { - log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus) + if respFetch.RuntimeStatus != workflow.StatusSuspended { + log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus) } fmt.Printf("workflow paused\n") // Resume workflow test - err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - + err = wfClient.ResumeWorkflow(ctx, instanceID, "") if err != nil { log.Fatalf("failed to resume workflow: %v", err) } - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) } - if respGet.RuntimeStatus != workflow.StatusRunning.String() { + if respFetch.RuntimeStatus != workflow.StatusRunning { log.Fatalf("workflow not running") } @@ -127,14 +101,7 @@ func main() { // Raise Event Test - err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - EventName: "testEvent", - EventData: "testData", - SendRawData: false, - }) - + err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData")) if err != nil { fmt.Printf("failed to raise event: %v", err) } @@ -145,31 +112,22 @@ func main() { fmt.Printf("stage: %d\n", stage) - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) } - fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus) + fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus) // Purge workflow test - err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + err = wfClient.PurgeWorkflow(ctx, instanceID) if err != nil { log.Fatalf("failed to purge workflow: %v", err) } - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - if err != nil && respGet != nil { - log.Fatal("failed to purge workflow") + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) + if err == nil || respFetch != nil { + log.Fatalf("failed to purge workflow: %v", err) } fmt.Println("workflow purged") @@ -177,120 +135,30 @@ func main() { fmt.Printf("stage: %d\n", stage) // Terminate workflow test - respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - WorkflowName: "TestWorkflow", - Options: nil, - Input: 1, - SendRawInput: false, - }) + id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) if err != nil { log.Fatalf("failed to start workflow: %v", err) } + fmt.Printf("workflow started with id: %v\n", instanceID) - fmt.Printf("workflow started with id: %s\n", respStart.InstanceID) - - err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + metadata, err := wfClient.WaitForWorkflowStart(ctx, id) if err != nil { - log.Fatalf("failed to terminate workflow: %v", err) + log.Fatalf("failed to get workflow: %v", err) } + fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String()) - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + err = wfClient.TerminateWorkflow(ctx, id) if err != nil { - log.Fatalf("failed to get workflow: %v", err) - } - if respGet.RuntimeStatus != workflow.StatusTerminated.String() { - log.Fatal("failed to terminate workflow") + log.Fatalf("failed to terminate workflow: %v", err) } - fmt.Println("workflow terminated") - err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - if err == nil || respGet != nil { + err = wfClient.PurgeWorkflow(ctx, id) + if err != nil { log.Fatalf("failed to purge workflow: %v", err) } - fmt.Println("workflow purged") - // WFClient - // TODO: Expand client validation - - stage = 0 - fmt.Println("workflow client test") - - wfClient, err := workflow.NewClient() - if err != nil { - log.Fatalf("[wfclient] faield to initialize: %v", err) - } - - id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) - if err != nil { - log.Fatalf("[wfclient] failed to start workflow: %v", err) - } - - fmt.Printf("[wfclient] started workflow with id: %s\n", id) - - metadata, err := wfClient.FetchWorkflowMetadata(ctx, id) - if err != nil { - log.Fatalf("[wfclient] failed to get worfklow: %v", err) - } - - fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String()) - - if stage != 1 { - log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage) - } - - fmt.Printf("[wfclient] stage: %d\n", stage) - - // TODO: WaitForWorkflowStart - // TODO: WaitForWorkflowCompletion - - // raise event - - if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil { - log.Fatalf("[wfclient] failed to raise event: %v", err) - } - - fmt.Println("[wfclient] event raised") - - // Sleep to allow the workflow to advance - time.Sleep(time.Second) - - if stage != 2 { - log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage) - } - - fmt.Printf("[wfclient] stage: %d\n", stage) - - // stop workflow - if err := wfClient.TerminateWorkflow(ctx, id); err != nil { - log.Fatalf("[wfclient] failed to terminate workflow: %v", err) - } - - fmt.Println("[wfclient] workflow terminated") - - if err := wfClient.PurgeWorkflow(ctx, id); err != nil { - log.Fatalf("[wfclient] failed to purge workflow: %v", err) - } - - fmt.Println("[wfclient] workflow purged") - // stop workflow runtime if err := w.Shutdown(); err != nil { log.Fatalf("failed to shutdown runtime: %v", err) diff --git a/workflow/client.go b/workflow/client.go index 7566ce28..1cbe0229 100644 --- a/workflow/client.go +++ b/workflow/client.go @@ -23,11 +23,13 @@ import ( "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" durabletaskclient "github.com/microsoft/durabletask-go/client" + "google.golang.org/grpc" dapr "github.com/dapr/go-sdk/client" ) type Client struct { + conn *grpc.ClientConn taskHubClient *durabletaskclient.TaskHubGrpcClient } @@ -143,9 +145,11 @@ func NewClient(opts ...clientOption) (*Client, error) { return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %v", err) } - taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()) + conn := daprClient.GrpcClientConn() + taskHubClient := durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger()) return &Client{ + conn: conn, taskHubClient: taskHubClient, }, nil } @@ -241,3 +245,7 @@ func (c *Client) PurgeWorkflow(ctx context.Context, id string, opts ...api.Purge } return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id), opts...) } + +func (c *Client) Close() { + _ = c.conn.Close() +}