diff --git a/README.md b/README.md
index 978fc27d..d805c3af 100644
--- a/README.md
+++ b/README.md
@@ -16,17 +16,15 @@
 ## Overview
 
-Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.
+Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about the internals.
 
 ```go
-import faasflow "github.com/s8sg/faas-flow"
-
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-	flow.SyncNode().Apply("yourFunc1").Apply("yourFunc2")
+	flow.SyncNode().Apply("Func1").Apply("Func2")
 	return
 }
 ```
-After building and deploying, it will give you a function that orchestrates calling `yourFunc2` with the output of `yourFunc1`
+After building and deploying, it will give you an OpenFaaS function that orchestrates calling `Func2` with the output of `Func1`.
 
 ## Pipeline Definition
 
@@ -37,85 +35,101 @@ The above pipelines can be achieved with little but powerful code:
 > SYNC Chain
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-	flow.SyncNode().Apply("func1").Apply("func2").
-		Modify(func(data []byte) ([]byte, error) {
-			// Do something
-			return data, nil
-		})
-	return
+	flow.SyncNode().Apply("func1").Apply("func2").
+		Modify(func(data []byte) ([]byte, error) {
+			// do something
+			return data, nil
+		})
+	return
 }
 ```
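+
+A `Modify` step is a plain `func([]byte) ([]byte, error)` that transforms the data flowing through the chain. A minimal sketch (the upper-casing logic and the `bytes` import are illustrative assumptions, not part of faas-flow):
+```go
+flow.SyncNode().Apply("func1").Apply("func2").
+	Modify(func(data []byte) ([]byte, error) {
+		// upper-case the output of func2 (illustrative only)
+		return bytes.ToUpper(data), nil
+	})
+```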
 > ASYNC Chain
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-	dag := flow.Dag()
-	dag.Node("n1").Apply("func1")
-	dag.Node("n2").Apply("func2").
-		Modify(func(data []byte) ([]byte, error) {
-			// Do something
-			return data, nil
-		})
-	dag.Node("n3").Callback("storage.io/bucket?id=3345612358265349126&file=result.dat")
-	dag.Edge("n1", "n2")
-	dag.Edge("n2", "n3")
-	return
+	dag := flow.Dag()
+	dag.Node("n1").Apply("func1")
+	dag.Node("n2").Apply("func2").
+		Modify(func(data []byte) ([]byte, error) {
+			// do something
+			return data, nil
+		})
+	dag.Node("n3").Callback("http://gateway:8080/function/fake-storage")
+	dag.Edge("n1", "n2")
+	dag.Edge("n2", "n3")
+	return
 }
 ```
 > PARALLEL Branching
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-	dag := flow.Dag()
-	dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
-		// do something
-		return data, nil
-	})
-	dag.Node("n2").Apply("func1")
-	dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
-		// do something
-		return data, nil
-	})
-	dag.Node("n4").Callback("storage.io/bucket?id=3345612358265349126&file=result")
-	dag.Edge("n1", "n2")
-	dag.Edge("n1", "n3")
-	dag.Edge("n2", "n4")
-	dag.Edge("n3", "n4")
-	return
+	dag := flow.Dag()
+	dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+		// do something
+		return data, nil
+	})
+	dag.Node("n2").Apply("func1")
+	dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
+		// do something
+		return data, nil
+	})
+	dag.Node("n4", faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
+		// aggregate branch results data["n2"] and data["n3"]
+		return []byte(""), nil
+	})).Callback("http://gateway:8080/function/fake-storage")
+
+	dag.Edge("n1", "n2")
+	dag.Edge("n1", "n3")
+	dag.Edge("n2", "n4")
+	dag.Edge("n3", "n4")
+	return
 }
 ```
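+
+The `Aggregator` of a node receives the results of all inbound branches as a map keyed by the branch node's id. A minimal sketch for `n4` above (the newline-join logic and the `bytes` import are illustrative assumptions):
+```go
+faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
+	// join the results of branches n2 and n3 (illustrative only)
+	return bytes.Join([][]byte{data["n2"], data["n3"]}, []byte("\n")), nil
+})
+```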
dag.Edge("C", "n2") } -``` +``` +Full implementions of the above examples are available [here](https://github.com/s8sg/faasflow-example) ## Faas-flow Design The current design consideration are made based on the below goals > 1. Leverage the openfaas platform @@ -127,42 +141,44 @@ Faas-flow is deployed and provisioned just like any other openfaas function. It ![alt its a function](https://github.com/s8sg/faas-flow/blob/master/doc/design/complete-faas.jpg) #### Adapter pattern for zero intrumenttaion in code -Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handle the calls to functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is seperated from the functions and is implemented in the adapter. Compositions need no code instrumentations, making functions completly independent of the compositions details +Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handle the calls to the functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is seperated from the functions and is implemented in the adapter. Compositions need no code instrumentations, making functions completly independent of the compositions details ![alt function is independent of composition](https://github.com/s8sg/faas-flow/blob/master/doc/design/adapter-pattern.jpg) #### Aggregate pattern as chaining -Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forward to next nodes. Faas-flow fuses the adapter pattern and aggregate pattern to perform more complex usecases +Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forward to next nodes. Faas-flow fuses the adapter pattern and aggregate pattern to support more complex usecases ![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/aggregate-pattern.jpg) #### Event driven iteration -Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow uses openfaas platform. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes has completed. Event carries the execution state, and identifies the next node to execute. With events faas-flow asynchronously carry-on execution of nodes by iterating itself over and over till all nodes are executed -![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg) +Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow leverages openfaas platform. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes have completed. 
+Full implementations of the above examples are available [here](https://github.com/s8sg/faasflow-example).
 
 ## Faas-flow Design
 The current design considerations are made based on the below goals
 > 1. Leverage the openfaas platform
@@ -127,42 +141,44 @@ Faas-flow is deployed and provisioned just like any other openfaas function. It
 ![alt its a function](https://github.com/s8sg/faas-flow/blob/master/doc/design/complete-faas.jpg)
 
 #### Adapter pattern for zero instrumentation in code
-Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handle the calls to functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is seperated from the functions and is implemented in the adapter. Compositions need no code instrumentations, making functions completly independent of the compositions details
+The faas-flow function follows the adapter pattern. Here the adaptees are the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handles the calls to the functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is separated from the functions and is implemented in the adapter. Compositions need no code instrumentation, making functions completely independent of the composition details.
 ![alt function is independent of composition](https://github.com/s8sg/faas-flow/blob/master/doc/design/adapter-pattern.jpg)
 
 #### Aggregate pattern as chaining
-Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forward to next nodes. Faas-flow fuses the adapter pattern and aggregate pattern to perform more complex usecases
+Aggregation of separate function calls is done as chaining. Multiple functions can be called from a single node, with the order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forwards it to the next nodes. Faas-flow fuses the adapter pattern and the aggregate pattern to support more complex use cases.
 ![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/aggregate-pattern.jpg)
 
 #### Event driven iteration
-Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow uses openfaas platform. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes has completed. Event carries the execution state, and identifies the next node to execute. With events faas-flow asynchronously carry-on execution of nodes by iterating itself over and over till all nodes are executed
-![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)
+Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow leverages the openfaas platform. Node execution in `faas-flow` is started by a completion event of one or more previous nodes. A completion event denotes that all the dependent previous nodes have completed. The event carries the execution state and identifies the next node to execute. With events, faas-flow asynchronously carries on the execution of nodes by iterating itself over and over till all nodes are executed.
+![alt iteration](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)
 
 #### 3rd party KV store for coordination
-When executing branches, one node is dependent of multiple predecessor nodes. In that scenario the event for completion is genearted by coordination of earlier nodes. Like any distributed system the coordination are achived via a centralized service. Faas-flow keeps the logic of the coordination controller inside of faas-flow implementation and let user use any external synchronous kv store by implementing [`StateStore`](https://godoc.org/github.com/s8sg/faas-flow#StateStore)
+When executing branches, one node can depend on more than one predecessor node. In that scenario, the completion event is generated by coordination of the earlier nodes. Like in any distributed system, the coordination is achieved via a centralized service. Faas-flow keeps the logic of the coordination controller inside the faas-flow implementation and lets the user plug in any external synchronous KV store by implementing [`StateStore`](https://godoc.org/github.com/s8sg/faas-flow#StateStore).
 ![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-statestore.jpg)
 
 #### 3rd party Storage for intermediate data
-Results from function execution and intermidiate data can be handled by the user manually. Faas-flow provides data-store for intermediate result storage. It automatically initialize, store, retrive and remove data between nodes. This fits great for data processing applications. Faas-flow keeps the logic of storage controller inside of faas-flow implementation and let user use any external object storage by implementing [`DataStore`](https://godoc.org/github.com/s8sg/faas-flow#DataStore)
-![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)
+Results from function executions and intermediate data can be handled by the user manually. Faas-flow provides a data-store for intermediate result storage. It automatically initializes, stores, retrieves, and removes data between nodes. This fits well for data processing applications. Faas-flow keeps the logic of the storage controller inside the faas-flow implementation and lets the user plug in any external object storage by implementing [`DataStore`](https://godoc.org/github.com/s8sg/faas-flow#DataStore).
+![alt storage](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)
 
-Faas-flow design is not fixed and like any good design its evolving. Please contribute to make it better.
+The Faas-flow design is not fixed and, like any good design, it is evolving. Please contribute to make it better.
 ## Getting Started
 
 This example implements a very simple flow to `Greet`
 
-#### Get the `faas-flow` template with `faas-cli`
+#### Get template
+Pull the `faas-flow` template with `faas-cli`
 ```
 faas template pull https://github.com/s8sg/faas-flow
 ```
 
-#### Create a new `func` with `faas-flow` template
+#### Create new flow function
+Create a new function using the `faas-flow` template
 ```bash
 faas new greet --lang faas-flow
 ```
 
-#### Edit `greet.yml`
+#### Edit stack
 Edit function stack file `greet.yml`
 ```yaml
   greet:
@@ -174,22 +190,22 @@ Edit function stack file `greet.yml`
       write_timeout: 120 # A value larger than `max` of all execution times of Nodes
       write_debug: true
       combine_output: false
+      workflow_name: "greet" # The name of the flow function; faas-flow uses this to forward the completion event
     environment_file:
       - flow.yml
 ```
 
-#### Add `flow.yml`
+#### Add configuration
 Add a separate file `flow.yml` with the faas-flow related configuration.
 ```yaml
 environment:
-  workflow_name: "greet" # The name of the flow function, faasflow use this to forward completion event
   gateway: "gateway:8080" # The address of the openfaas gateway; faas-flow uses this to forward the completion event
-  # gateway: "gateway.openfaas:8080" # For K8
+  # gateway: "gateway.openfaas:8080" # For K8s
   enable_tracing: false # tracing allows tracing internal node execution with opentracing
-  enable_hmac: false # hmac adds extra layer of security by validating the event source
+  enable_hmac: true # hmac adds an extra layer of security by validating the event source
 ```
 
-#### Edit Defnition
+#### Edit function definition
 Edit `greet/handler.go` and update `Define()`
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
@@ -212,8 +228,8 @@
 faas deploy
 ```
 > ```
 > Modify("name") -> Hello name
 > ```
-All calls will be performed in one single execution of the function, and result will be returned to the callee
-> Note: For flow that has more than one nodes, faasflow doesn't return any response. External storage or `callback` can be used to retrive async result
+All calls will be performed in one single execution of the flow function, and the result will be returned to the caller.
+> Note: For a flow that has more than one node, faas-flow doesn't return any response. External storage or a `callback` can be used to retrieve the async result.
 
 #### Invoke
 ```
@@ -221,17 +237,17 @@
 echo "Adam" | faas invoke greet
 ```
 
 ## Request Tracking by ID
-For each new request faas-flow generates a unique `RequestId` for the flow. The same Id is used when logging
+For each new request, faas-flow generates a unique `Request Id` for the flow. The same id is used when logging.
 ```bash
-2018/08/13 07:51:59 [request `bdojh7oi7u6bl8te4r0g`] Created
+2018/08/13 07:51:59 [Request `bdojh7oi7u6bl8te4r0g`] Created
 2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received
 ```
-The assigned requestId can be retrived from faas-flow response, `X-Faas-Flow-Reqid` header
+The assigned request id is set on the response header `X-Faas-Flow-Reqid`.
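+
+For programmatic callers, the id can be read from that header. A minimal sketch (assuming the gateway is reachable at `gateway:8080`, the `greet` flow is deployed, and the usual `net/http`, `strings`, `fmt`, and `log` imports):
+```go
+resp, err := http.Post("http://gateway:8080/function/greet", "text/plain", strings.NewReader("Adam"))
+if err != nil {
+	log.Fatal(err)
+}
+defer resp.Body.Close()
+fmt.Println(resp.Header.Get("X-Faas-Flow-Reqid")) // request id assigned by faas-flow
+```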
 
 ## Request Tracing by Open-Tracing
 
-Request tracing can be enabled by providing the `trace_server` and enabling tracing. Tracing is the best way to monitor flows and execution status of each nodes for each requests
+Request traces can be retrieved from the `trace_server` once tracing is enabled. Tracing is the best way to monitor flows and the execution status of each node for each request.
 
 #### Edit `flow.yml`
 Enable tracing and add the trace server as:
@@ -324,26 +340,10 @@ func DefineStateStore() (faasflow.StateStore, error) {
 }
 ```
 
+#### Available state-stores:
 * **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
 * **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statestore implementation with **etcd**
-
-### Geting Http Query to Workflow:
-Http Query to flow can be used from context as
-```go
-   flow.SyncNode().Apply("myfunc",
-        Query("auth-token", context.Query.Get("token"))). // pass as a function query
-        Modify(func(data []byte) {
-            token = context.Query.Get("token") // get query inside modifier
-        })
-```
-
-### Other from context:
-Node, requestId, State is provided by the `context`
-#### Available state-store:
-1. **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
-2. **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statewtore implementation with **etcd**
-
 ## External `DataStore` for storage controller
 Faas-flow uses the `DataStore` to store partially completed data between nodes and request context data. Faas-flow implements a storage controller to handle storage, which allows the user to use any external object store. A custom data-store can be defined with the `DataStore` interface.
 ```go
@@ -373,12 +373,12 @@ func DefineDataStore() (faasflow.DataStore, error) {
 }
 ```
 
-#### Available data-store:
-1. **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**
+#### Available data-stores:
+* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows storing data in **amazon s3** or a local **minio DB**
 
 ## Cleanup with `Finally()`
-Finally provides a efficient way to perform post execution steps of the flow. If specified `Finally()` invokes in case of both failure and success. A Finally method can be set as:
+Finally provides an efficient way to perform post-execution steps of the flow. If specified, `Finally()` is invoked on both failure and success of the flow. A `Finally` method can be set as:
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
 	// Define flow
diff --git a/sdk/pipeline.go b/sdk/pipeline.go
index 7d2eda15..5e51212d 100644
--- a/sdk/pipeline.go
+++ b/sdk/pipeline.go
@@ -62,6 +62,35 @@ func (pipeline *Pipeline) GetInitialNodeId() string {
 	return "0"
 }
 
+// GetNodeExecutionUniqueId provides an id for a node that is unique within an execution.
+// It prefixes the node's unique id with the dynamic options chosen along the current
+// execution path, so the same node gets a distinct id per dynamic branch.
+func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string {
+	depth := 0
+	dag := pipeline.Dag
+	depthStr := ""
+	optionStr := ""
+	for depth < pipeline.ExecutionDepth {
+		depthStr = fmt.Sprintf("%d", depth)
+		node := dag.GetNode(pipeline.ExecutionPosition[depthStr])
+		option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
+		if node.subDag != nil {
+			dag = node.subDag
+		} else {
+			dag = node.conditionalDags[option]
+		}
+		if optionStr == "" {
+			optionStr = option
+		} else {
+			optionStr = option + "--" + optionStr
+		}
+
+		depth++
+	}
+	if optionStr == "" {
+		return node.GetUniqueId()
+	}
+	return optionStr + "--" + node.GetUniqueId()
+}
+
 // GetCurrentNodeDag returns the current node and current dag based on execution position
 func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag) {
 	depth := 0
@@ -70,10 +99,10 @@ func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag) {
 	for depth < pipeline.ExecutionDepth {
 		depthStr = fmt.Sprintf("%d", depth)
 		node := dag.GetNode(pipeline.ExecutionPosition[depthStr])
+		option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
 		if node.subDag != nil {
 			dag = node.subDag
 		} else {
-			option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
 			dag = node.conditionalDags[option]
 		}
 		depth++
diff --git a/template/faas-flow/handler.go b/template/faas-flow/handler.go
index fb304814..a7600be9 100644
--- a/template/faas-flow/handler.go
+++ b/template/faas-flow/handler.go
@@ -66,16 +66,6 @@ func (fhandler *flowHandler) GetRequestState() (bool, error) {
 	return false, err
 }
 
-func (fhandler *flowHandler) InitVertexIndegreeCounter(vertexs []string) error {
-	for _, vertex := range vertexs {
-		err := fhandler.stateStore.Set(vertex, "0")
-		if err != nil {
-			return fmt.Errorf("failed to create counter for vertex %s, error %v", vertex, err)
-		}
-	}
-	return nil
-}
-
 func (fhandler *flowHandler) SetDynamicBranchOptions(nodeUniqueId string, options []string) error {
 	encoded, err := json.Marshal(options)
 	if err != nil {
@@ -348,7 +338,7 @@ func buildWorkflow(data []byte, queryString string,
 
 	// Generate request Id
 	requestId = xid.New().String()
-	fmt.Printf("[Request `%s`] Created", requestId)
+	fmt.Printf("[Request `%s`] Created\n", requestId)
 
 	// create flow properties
 	dataStore := createDataStore()
@@ -470,34 +460,39 @@ func executeCallback(pipeline *sdk.Pipeline, operation *sdk.Operation, data []by
 
 }
 
-// execute executes a node on a faas-flow dag
-func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
-	var result []byte
-	var err error
-
-	pipeline := fhandler.getPipeline()
-
-	currentNode, currentDag := pipeline.GetCurrentNodeDag()
+// findCurrentNodeToExecute finds the right node to execute based on the state
+func findCurrentNodeToExecute(fhandler *flowHandler) {
+	currentNode, currentDag := fhandler.getPipeline().GetCurrentNodeDag()
 
 	fmt.Printf("[Request `%s`] Executing node %s\n", fhandler.id, currentNode.GetUniqueId())
 
 	// recurse to the subdag - if a node is dynamic stop to evaluate it
 	for true {
+		// break if the node is dynamic
 		if currentNode.Dynamic() {
-			// trace node - mark as start of node
-			fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(),
-				fhandler.id)
-			return request, nil
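+			// note: the span for a dynamic node is now started later, in executeDynamic()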
+			return
 		}
 		subdag := currentNode.SubDag()
 		if subdag == nil {
 			break
 		}
+		// trace node - mark as start of the parent node
+		fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
 		currentDag = subdag
 		currentNode = currentDag.GetInitialNode()
 		fmt.Printf("[Request `%s`] Executing node %s\n", fhandler.id, currentNode.GetUniqueId())
-		pipeline.UpdatePipelineExecutionPosition(sdk.DEPTH_INCREMENT, currentNode.Id)
+		fhandler.getPipeline().UpdatePipelineExecutionPosition(sdk.DEPTH_INCREMENT, currentNode.Id)
 	}
+}
+
+// execute executes a node on a faas-flow dag
+func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
+	var result []byte
+	var err error
+
+	pipeline := fhandler.getPipeline()
+
+	currentNode, _ := pipeline.GetCurrentNodeDag()
 
 	// trace node - mark as start of node
 	fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
@@ -624,12 +619,16 @@ func forwardAsync(fhandler *flowHandler, currentNodeId string, result []byte) ([
 	return resdata, nil
 }
 
-func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result []byte) ([]byte, error) {
+func executeDynamic(fhandler *flowHandler, context *faasflow.Context, result []byte) ([]byte, error) {
 
 	// get pipeline
 	pipeline := fhandler.getPipeline()
 
 	currentNode, _ := pipeline.GetCurrentNodeDag()
 
+	// trace node - mark as start of the dynamic node
+	fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(),
+		fhandler.id)
+
 	currentNodeUniqueId := currentNode.GetUniqueId()
 	defer fhandler.tracer.stopNodeSpan(currentNodeUniqueId)
@@ -643,10 +642,9 @@ func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result
 	condition := currentNode.GetCondition()
 	foreach := currentNode.GetForEach()
 
-	branchCount := 0
-
 	switch {
 	case condition != nil:
+		fmt.Printf("[Request `%s`] Executing condition\n", fhandler.id)
 		conditions := condition(result)
 		if conditions == nil {
 			panic(fmt.Sprintf("Condition function at %s returned nil, failed to proceed",
@@ -664,9 +662,9 @@
 			}
 			subresults[conditionKey] = result
 			options = append(options, conditionKey)
-			branchCount = branchCount + 1
 		}
 	case foreach != nil:
+		fmt.Printf("[Request `%s`] Executing foreach\n", fhandler.id)
 		foreachResults := foreach(result)
 		if foreachResults == nil {
 			panic(fmt.Sprintf("Foreach function at %s returned nil, failed to proceed",
@@ -680,55 +678,53 @@
 			subdags[foreachKey] = currentNode.SubDag()
 			subresults[foreachKey] = foreachResult
 			options = append(options, foreachKey)
-			branchCount = branchCount + 1
 		}
 	}
 
+	branchCount := len(options)
 	if branchCount == 0 {
 		return nil, fmt.Errorf("[Request `%s`] Dynamic Node %s, failed to execute as condition/foreach returned no option",
 			fhandler.id, currentNodeUniqueId)
 	}
 
-	// Increment/Set the no of dynamic branch count for the next nodes
-	for _, cnode := range currentNode.Children() {
-		indegree, err := fhandler.IncrementCounter(
-			cnode.GetUniqueId()+"-dynamic-indegree", branchCount-1)
-		if err != nil {
-			return nil, fmt.Errorf("[Request `%s`] Failed to store/increment dynamic indegree count of %s, err %v",
-				fhandler.id, cnode.GetUniqueId(), err)
-		}
-		fmt.Printf("[Request `%s`] Dynamic indegree count for %s set to %d\n",
-			fhandler.id, cnode.GetUniqueId(), indegree)
+	// Set the number of branch completions for the current dynamic node
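+	// the counter key is scoped by the node's execution-unique id, so nested
+	// dynamic branches of the same node get distinct counters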
+	key := pipeline.GetNodeExecutionUniqueId(currentNode) + "-branch-completion"
+	_, err := fhandler.IncrementCounter(key, 0)
+	if err != nil {
+		return nil, fmt.Errorf("[Request `%s`] Failed to initiate dynamic indegree count for %s, err %v",
+			fhandler.id, key, err)
 	}
+	fmt.Printf("[Request `%s`] Dynamic indegree count initiated as %s\n",
+		fhandler.id, key)
+
 	// Set all the dynamic options for the current dynamic node
-	err := fhandler.SetDynamicBranchOptions(currentNodeUniqueId+"-dynamic-branch-options", options)
+	key = pipeline.GetNodeExecutionUniqueId(currentNode) + "-dynamic-branch-options"
+	err = fhandler.SetDynamicBranchOptions(key, options)
 	if err != nil {
 		return nil, fmt.Errorf("[Request `%s`] Dynamic Node %s, failed to store dynamic options",
 			fhandler.id, currentNodeUniqueId)
 	}
+	fmt.Printf("[Request `%s`] Dynamic options initiated as %s\n",
+		fhandler.id, key)
 
-	for dynamicKey, subdag := range subdags {
+	for option, subdag := range subdags {
 
-		subNode := subdag.GetInitialNode()
-		intermediateData := subresults[dynamicKey]
+		subNode := subdag.GetInitialNode()
+		intermediateData := subresults[option]
 
-		err := fhandler.InitVertexIndegreeCounter(subdag.GetNodes(dynamicKey))
-		if err != nil {
-			return nil, fmt.Errorf("[Request `%s`] Dynamic DAG %s key %s state can not be initiated at StateStore, %v",
-				fhandler.id, subdag.Id, dynamicKey, err)
-		}
 
 		// If forwarder is not nil its not an execution flow
 		if currentNode.GetForwarder("dynamic") != nil {
-			//