Skip to content

Commit

Permalink
Merge pull request #63 from s8sg/dynamic-braching-fix
Browse files Browse the repository at this point in the history
Add fix for dynamic branch issue
  • Loading branch information
s8sg authored Jul 21, 2019
2 parents 201ba99 + 7947b02 commit a511c01
Show file tree
Hide file tree
Showing 4 changed files with 475 additions and 345 deletions.
204 changes: 102 additions & 102 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@

## Overview

Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.
Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals

```go
import faasflow "github.com/s8sg/faas-flow"

func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
flow.SyncNode().Apply("yourFunc1").Apply("yourFunc2")
flow.SyncNode().Apply("Func1").Apply("Func2")
return
}
```
After building and deploying, it will give you a function that orchestrates calling `yourFunc2` with the output of `yourFunc1`
After building and deploying, it will give you a openfaas function that orchestrates calling `Func2` with the output of `Func1`

## Pipeline Definition
Expand All @@ -37,85 +35,101 @@ The above pipelines can be achieved with little, but powerfull code:
> SYNC Chain
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
flow.SyncNode().Apply("func1").Apply("func2").
Modify(func(data []byte) ([]byte, error) {
// Do something
return data, nil
})
return
flow.SyncNode().Apply("func1").Apply("func2").
Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
return
}
```
> ASYNC Chain
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := flow.Dag()
dag.Node("n1").Apply("func1")
dag.Node("n2").Apply("func2").
Modify(func(data []byte) ([]byte, error) {
// Do something
return data, nil
})
dag.Node("n3").Callback("storage.io/bucket?id=3345612358265349126&file=result.dat")
dag.Edge("n1", "n2")
dag.Edge("n2", "n3")
return
dag := flow.Dag()
dag.Node("n1").Apply("func1")
dag.Node("n2").Apply("func2").
Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n3").Callback("http://gateway:8080/function/fake-storage")
dag.Edge("n1", "n2")
dag.Edge("n2", "n3")
return
}
```
> PARALLEL Branching
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := flow.Dag()
dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n2").Apply("func1")
dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n4").Callback("storage.io/bucket?id=3345612358265349126&file=result")
dag.Edge("n1", "n2")
dag.Edge("n1", "n3")
dag.Edge("n2", "n4")
dag.Edge("n3", "n4")
return
dag := flow.Dag()
dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n2").Apply("func1")
dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n4", faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
// aggregate branch result data["n2"] and data["n3"]
return []byte(""), nil
})).Callback("http://gateway:8080/function/fake-storage")

dag.Edge("n1", "n2")
dag.Edge("n1", "n3")
dag.Edge("n2", "n4")
dag.Edge("n3", "n4")
return
}
```
> DYNAMIC Branching
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
dag := flow.Dag()
dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
return data, nil
})
conditionalDags := dag.ConditionalBranch("C",
dag := flow.Dag()
dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
conditionalDags := dag.ConditionalBranch("C",
[]string{"c1", "c2"}, // possible conditions
func(response []byte) []string {
func(response []byte) []string {
// for each returned condition the corresponding branch will execute
// this function executes in the runtime of condition C
return []string{"c1", "c2"}
},
)
conditionalDags["c2"].Node("n1").Apply("func").Modify(func(data []byte) ([]byte, error) {
return data, nil
})
foreachDag := conditionalDags["c1"].ForEachBranch("F",
func(data []byte) map[string][]byte {
// for each returned key in the hashmap a new branch will be executed
faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
// aggregate all dynamic branches results
return []byte(""), nil
}),
)

conditionalDags["c2"].Node("n1").Apply("func1").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
foreachDag := conditionalDags["c1"].ForEachBranch("F",
func(data []byte) map[string][]byte {
// for each returned key in the hashmap a new branch will be executed
// this function executes in the runtime of foreach F
return map[string][]byte{ "f1": data, "f2": data }
return map[string][]byte{"f1": data, "f2": data}
},
)
foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
// aggregate all dynamic branches results
return []byte(""), nil
}),
)
foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.Node("n2").Callback("storage.io/bucket?id=3345612358265349126&file=result")
dag.Edge("n1", "C")
dag.Edge("C", "n2")
return
})
dag.Node("n2").Callback("http://gateway:8080/function/fake-storage")
dag.Edge("n1", "C")
dag.Edge("C", "n2")
}
```
```
Full implementions of the above examples are available [here](https://github.com/s8sg/faasflow-example)
## Faas-flow Design
The current design consideration are made based on the below goals
> 1. Leverage the openfaas platform
Expand All @@ -127,42 +141,44 @@ Faas-flow is deployed and provisioned just like any other openfaas function. It
![alt its a function](https://github.com/s8sg/faas-flow/blob/master/doc/design/complete-faas.jpg)

#### Adapter pattern for zero intrumenttaion in code
Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handle the calls to functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is seperated from the functions and is implemented in the adapter. Compositions need no code instrumentations, making functions completly independent of the compositions details
Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handle the calls to the functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is seperated from the functions and is implemented in the adapter. Compositions need no code instrumentations, making functions completly independent of the compositions details
![alt function is independent of composition](https://github.com/s8sg/faas-flow/blob/master/doc/design/adapter-pattern.jpg)

#### Aggregate pattern as chaining
Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forward to next nodes. Faas-flow fuses the adapter pattern and aggregate pattern to perform more complex usecases
Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forward to next nodes. Faas-flow fuses the adapter pattern and aggregate pattern to support more complex usecases
![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/aggregate-pattern.jpg)

#### Event driven iteration
Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow uses openfaas platform. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes has completed. Event carries the execution state, and identifies the next node to execute. With events faas-flow asynchronously carry-on execution of nodes by iterating itself over and over till all nodes are executed
![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)
Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow leverages openfaas platform. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes have completed. The event carries the execution state and identifies the next node to execute. With events faas-flow asynchronously carry-on execution of nodes by iterating itself over and over till all nodes are executed
![alt iteration](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)

#### 3rd party KV store for coordination
When executing branches, one node is dependent of multiple predecessor nodes. In that scenario the event for completion is genearted by coordination of earlier nodes. Like any distributed system the coordination are achived via a centralized service. Faas-flow keeps the logic of the coordination controller inside of faas-flow implementation and let user use any external synchronous kv store by implementing [`StateStore`](https://godoc.org/github.com/s8sg/faas-flow#StateStore)
When executing branches, one node is dependent on more than one predecessor nodes. In that scenario, the event for completion is generated by coordination of earlier nodes. Like any distributed system the coordination is achieved via a centralized service. Faas-flow keeps the logic of the coordination controller inside of faas-flow implementation and lets the user use any external synchronous KV store by implementing [`StateStore`](https://godoc.org/github.com/s8sg/faas-flow#StateStore)
![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-statestore.jpg)

#### 3rd party Storage for intermediate data
Results from function execution and intermidiate data can be handled by the user manually. Faas-flow provides data-store for intermediate result storage. It automatically initialize, store, retrive and remove data between nodes. This fits great for data processing applications. Faas-flow keeps the logic of storage controller inside of faas-flow implementation and let user use any external object storage by implementing [`DataStore`](https://godoc.org/github.com/s8sg/faas-flow#DataStore)
![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)
Results from function execution and intermediate data can be handled by the user manually. Faas-flow provides data-store for intermediate result storage. It automatically initializes, store, retrieve and remove data between nodes. This fits great for data processing applications. Faas-flow keeps the logic of storage controller inside of Faas-flow implementation and lets the user use any external object storage by implementing [`DataStore`](https://godoc.org/github.com/s8sg/faas-flow#DataStore)
![alt storage](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)


Faas-flow design is not fixed and like any good design its evolving. Please contribute to make it better.
Faas-flow design is not fixed and like any good design it is evolving. Please contribute to make it better.

## Getting Started
This example implements a very simple flow to `Greet`

#### Get the `faas-flow` template with `faas-cli`
#### Get template
Pull `faas-flow` template with the `faas-cli`
```
faas template pull https://github.com/s8sg/faas-flow
```

#### Create a new `func` with `faas-flow` template
#### Create new flow function
Create a new function using `faas-flow` template
```bash
faas new greet --lang faas-flow
```

#### Edit `greet.yml`
#### Edit stack
Edit function stack file `greet.yml`
```yaml
greet:
Expand All @@ -174,22 +190,22 @@ Edit function stack file `greet.yml`
write_timeout: 120 # A value larger than `max` of all execution times of Nodes
write_debug: true
combine_output: false
workflow_name: "greet" # The name of the flow function, faasflow use this to forward completion event
environment_file:
- flow.yml
```
#### Add `flow.yml`
#### Add configuration
Add a seperate file `flow.yml` with faas-flow related configuration.
```yaml
environment:
workflow_name: "greet" # The name of the flow function, faasflow use this to forward completion event
gateway: "gateway:8080" # The address of openfaas gateway, faasflow use this to forward completion event
# gateway: "gateway.openfaas:8080" # For K8
# gateway: "gateway.openfaas:8080" # For K8s
enable_tracing: false # tracing allow to trace internal node execution with opentracing
enable_hmac: false # hmac adds extra layer of security by validating the event source
enable_hmac: true # hmac adds extra layer of security by validating the event source
```

#### Edit Defnition
#### Edit function defnition
Edit `greet/handler.go` and Update `Define()`
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
Expand All @@ -212,26 +228,26 @@ faas deploy
> ```
> Modify("name") -> Hello name
> ```
All calls will be performed in one single execution of the function, and result will be returned to the callee
> Note: For flow that has more than one nodes, faasflow doesn't return any response. External storage or `callback` can be used to retrive async result
All calls will be performed in one single execution of the flow function and result will be returned to the callee
> Note: For flow that has more than one nodes, faas-flow doesn't return any response. External storage or `callback` can be used to retrive async result

#### Invoke
```
echo "Adam" | faas invoke greet
```
## Request Tracking by ID
For each new request faas-flow generates a unique `RequestId` for the flow. The same Id is used when logging
For each new request faas-flow generates a unique `Request Id` for the flow. The same Id is used when logging
```bash
2018/08/13 07:51:59 [request `bdojh7oi7u6bl8te4r0g`] Created
2018/08/13 07:51:59 [Request `bdojh7oi7u6bl8te4r0g`] Created
2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received
```
The assigned requestId can be retrived from faas-flow response, `X-Faas-Flow-Reqid` header
The assigned requestId is set on the response header `X-Faas-Flow-Reqid`


## Request Tracing by Open-Tracing

Request tracing can be enabled by providing the `trace_server` and enabling tracing. Tracing is the best way to monitor flows and execution status of each nodes for each requests
Request tracing can be retrived from `trace_server` once enabled. Tracing is the best way to monitor flows and execution status of each nodes for each requests

#### Edit `flow.yml`
Enable tracing and add trace server as:
Expand Down Expand Up @@ -324,26 +340,10 @@ func DefineStateStore() (faasflow.StateStore, error) {
}
```

#### Available state-stores:
* **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
* **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statewtore implementation with **etcd**


### Geting Http Query to Workflow:
Http Query to flow can be used from context as
```go
flow.SyncNode().Apply("myfunc",
Query("auth-token", context.Query.Get("token"))). // pass as a function query
Modify(func(data []byte) {
token = context.Query.Get("token") // get query inside modifier
})
```

### Other from context:
Node, requestId, State is provided by the `context`
#### Available state-store:
1. **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
2. **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statewtore implementation with **etcd**

## External `DataStore` for storage controller
Faas-flow uses the `DataStore` to store partially completed data between nodes and request context data. Faas-flow implements storage controller to handle storage that allows user to use any external object store. User can define custom data-store with `DataStore` interface.
```go
Expand Down Expand Up @@ -373,12 +373,12 @@ func DefineDataStore() (faasflow.DataStore, error) {
}
```

#### Available data-store:
1. **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**
#### Available data-stores:
* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**


## Cleanup with `Finally()`
Finally provides a efficient way to perform post execution steps of the flow. If specified `Finally()` invokes in case of both failure and success. A Finally method can be set as:
Finally provides an efficient way to perform post execution steps of the flow. If specified `Finally()` invokes in case of both failure and success of the flow. A Finally method can be set as:
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
// Define flow
Expand Down
Loading

0 comments on commit a511c01

Please sign in to comment.