Skip to content

Commit

Permalink
workflow examples: remove use of deprecated functions (#640)
Browse files Browse the repository at this point in the history
* workflow examples: remove use of deprecated functions

Signed-off-by: Fabian Martinez <[email protected]>

* fix example tests

Signed-off-by: Fabian Martinez <[email protected]>

* Update README.md

Signed-off-by: Fabian Martinez <[email protected]>

* Update Makefile

Signed-off-by: Fabian Martinez <[email protected]>

---------

Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Oct 30, 2024
1 parent 516684c commit dd9a2d5
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 172 deletions.
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.0 // indirect
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
Expand Down
9 changes: 1 addition & 8 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,9 @@ expected_stdout_lines:
- '== APP == workflow purged'
- '== APP == stage: 2'
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow status: RUNNING'
- '== APP == workflow terminated'
- '== APP == workflow purged'
- '== APP == workflow client test'
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == [wfclient] workflow status: RUNNING'
- '== APP == [wfclient] stage: 1'
- '== APP == [wfclient] event raised'
- '== APP == [wfclient] stage: 2'
- '== APP == [wfclient] workflow terminated'
- '== APP == [wfclient] workflow purged'
- '== APP == workflow worker successfully shutdown'
background: true
Expand Down
188 changes: 28 additions & 160 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ import (
"log"
"time"

"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)

var stage = 0

const (
workflowComponent = "dapr"
)

func main() {
w, err := workflow.NewWorker()
if err != nil {
Expand All @@ -54,70 +49,49 @@ func main() {
}
fmt.Println("runner started")

daprClient, err := client.NewClient()
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("failed to intialise client: %v", err)
}
defer daprClient.Close()
defer wfClient.Close()
ctx := context.Background()

// Start workflow test
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
fmt.Printf("workflow started with id: %v\n", instanceID)

// Pause workflow test
err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = wfClient.SuspendWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}

respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
log.Fatalf("failed to fetch workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
if respFetch.RuntimeStatus != workflow.StatusSuspended {
log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus)
}

fmt.Printf("workflow paused\n")

// Resume workflow test
err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

err = wfClient.ResumeWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

if respGet.RuntimeStatus != workflow.StatusRunning.String() {
if respFetch.RuntimeStatus != workflow.StatusRunning {
log.Fatalf("workflow not running")
}

Expand All @@ -127,14 +101,7 @@ func main() {

// Raise Event Test

err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
EventName: "testEvent",
EventData: "testData",
SendRawData: false,
})

err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
Expand All @@ -145,152 +112,53 @@ func main() {

fmt.Printf("stage: %d\n", stage)

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}

fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)

// Purge workflow test
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
err = wfClient.PurgeWorkflow(ctx, instanceID)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil && respGet != nil {
log.Fatal("failed to purge workflow")
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

fmt.Printf("stage: %d\n", stage)

// Terminate workflow test
respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)

fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)

err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
metadata, err := wfClient.WaitForWorkflowStart(ctx, id)
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
err = wfClient.TerminateWorkflow(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
log.Fatal("failed to terminate workflow")
log.Fatalf("failed to terminate workflow: %v", err)
}

fmt.Println("workflow terminated")

err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})

respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err == nil || respGet != nil {
err = wfClient.PurgeWorkflow(ctx, id)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}

fmt.Println("workflow purged")

// WFClient
// TODO: Expand client validation

stage = 0
fmt.Println("workflow client test")

wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}

id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}

fmt.Printf("[wfclient] started workflow with id: %s\n", id)

metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}

fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())

if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// TODO: WaitForWorkflowStart
// TODO: WaitForWorkflowCompletion

// raise event

if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}

fmt.Println("[wfclient] event raised")

// Sleep to allow the workflow to advance
time.Sleep(time.Second)

if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
}

fmt.Println("[wfclient] workflow terminated")

if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
}

fmt.Println("[wfclient] workflow purged")

// stop workflow runtime
if err := w.Shutdown(); err != nil {
log.Fatalf("failed to shutdown runtime: %v", err)
Expand Down
10 changes: 9 additions & 1 deletion workflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
durabletaskclient "github.com/microsoft/durabletask-go/client"
"google.golang.org/grpc"

dapr "github.com/dapr/go-sdk/client"
)

type Client struct {
conn *grpc.ClientConn
taskHubClient *durabletaskclient.TaskHubGrpcClient
}

Expand Down Expand Up @@ -143,9 +145,11 @@ func NewClient(opts ...clientOption) (*Client, error) {
return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %v", err)
}

taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
conn := daprClient.GrpcClientConn()
taskHubClient := durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger())

return &Client{
conn: conn,
taskHubClient: taskHubClient,
}, nil
}
Expand Down Expand Up @@ -241,3 +245,7 @@ func (c *Client) PurgeWorkflow(ctx context.Context, id string, opts ...api.Purge
}
return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id), opts...)
}

func (c *Client) Close() {
_ = c.conn.Close()
}

0 comments on commit dd9a2d5

Please sign in to comment.