Merge branch 'master' of https://github.com/s8sg/faasflow
s8sg committed Nov 27, 2018
2 parents 9f395cb + 3ce85b4 commit 8c44197
Showing 1 changed file with 156 additions and 60 deletions.
216 changes: 156 additions & 60 deletions README.md
Expand Up @@ -8,7 +8,7 @@
> - [x] **Pure**              **`FaaS`** with **`openfaas`**
> - [x] **Fast**               build with **`go`**
> - [x] **Secured**        with **`HMAC`**
> - [x] **Stateless**      by **`design`** (with optional 3rd party integration)
> - [x] **Stateless**      by **`design`** (DAG needs external `StateStore` and `DataStore`)
> - [x] **Tracing**         with **`open-tracing`**
> - [x] **Available**       as **`faas-flow`** template
Expand All @@ -34,41 +34,89 @@ By supplying a number of pipeline operators, complex compostion can be achieved
![alt overview](https://github.com/s8sg/faas-flow/blob/master/doc/overview.jpg)

The above pipeline can be achieved with little but powerful code:

> SYNC-Call
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
flow.Apply("func1", faasflow.Sync).
Apply("func2", faasflow.Sync).
Modify(func(data []byte) ([]byte, error) {
// Do something
return data, nil
})
return nil
}
```
> ASYNC-Call
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

// use any 3rd party to maintain state
context.SetDataStore(myMinioStatemanager)

flow.Modify(func(data []byte) ([]byte, error) {
// Set value in context with DataStore
context.Set("raw-image", data)
return data
}).
Apply("facedetect", Header("method","post")).
flow.
Apply("func1").
Apply("func2").
Modify(func(data []byte) ([]byte, error) {
// perform check
// ...
// and replay data
data, _ := context.GetBytes("raw-image")
// do modification if needed
return data
// Do something
return data, nil
}).
Apply("compress", Header("method","post")).
Apply("colorify", Header("method","post")).
Callback("storage.io/bucket?id=3345612358265349126").
Callback("storage.io/bucket?id=3345612358265349126&file=" + context.Query.Get("filename")).
OnFailure(func(err error) {
// failure handler
}).
Finally(func(state string) {
// success - state.State = StateSuccess
// failure - state.State = StateFailure
// cleanup code
context.del("raw-image")
})

return nil
}
```
> DAG-Call
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

dag := faasflow.CreateDag()
dag.CreateModifierVertex("mod1", func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.CreateFunctionVertex("func1", "function_1_name")
dag.CreateFunctionVertex("func2", "function_2_name")
dag.CreateModifierVertex("mod2", func(data []byte) ([]byte, error) {
// do something
return data, nil
})
dag.CreateCallbackVertex("callback",
"storage.io/bucket?id=3345612358265349126&file=" + context.Query.Get("filename"),
faasflow.Serializer(func(inputs map[string][]byte) ([]byte, error) {
mod2Data := inputs["mod2"]
func2Data := inputs["func2"]
// Serialize input for callback (here: concatenate both inputs)
data := append(append([]byte{}, mod2Data...), func2Data...)
return data, nil
}))


dag.AddEdge("mod1", "func1")
dag.AddEdge("mod1", "func2")
dag.AddEdge("func1", "mod2")
dag.AddEdge("func2", "callback")
dag.AddEdge("mod2", "callback")

flow.ExecuteDag(dag)

return nil
}

func DefineStateStore() (faasflow.StateStore, error) {
// use consul StateStore
consulss, err := consulStateStore.GetConsulStateStore()
return consulss, err
}

func DefineDataStore() (faasflow.DataStore, error) {
// use minio DataStore
miniods, err := minioDataStore.InitFromEnv()
return miniods, err
}
```
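
The edges in the DAG example above imply an execution order: a vertex can run only after every vertex with an edge into it has finished. The following is a minimal, self-contained sketch of that idea using indegree counting. The `executionOrder` helper is hypothetical and not part of `faasflow`, and the library's actual scheduling may differ.

```go
package main

import "fmt"

// executionOrder returns one valid order in which vertices can run: a vertex
// becomes runnable once all of its incoming edges have completed.
// It assumes every edge refers to a vertex listed in vertices.
func executionOrder(vertices []string, edges [][2]string) ([]string, error) {
	indegree := make(map[string]int, len(vertices))
	children := make(map[string][]string, len(vertices))
	for _, v := range vertices {
		indegree[v] = 0
	}
	for _, e := range edges {
		children[e[0]] = append(children[e[0]], e[1])
		indegree[e[1]]++
	}

	// Start with the vertices that have no incoming edges.
	var ready, order []string
	for _, v := range vertices {
		if indegree[v] == 0 {
			ready = append(ready, v)
		}
	}
	for len(ready) > 0 {
		v := ready[0]
		ready = ready[1:]
		order = append(order, v)
		// Completing v satisfies one incoming edge of each child.
		for _, c := range children[v] {
			indegree[c]--
			if indegree[c] == 0 {
				ready = append(ready, c)
			}
		}
	}
	if len(order) != len(vertices) {
		return nil, fmt.Errorf("graph has a cycle")
	}
	return order, nil
}

func main() {
	vertices := []string{"mod1", "func1", "func2", "mod2", "callback"}
	edges := [][2]string{
		{"mod1", "func1"}, {"mod1", "func2"}, {"func1", "mod2"},
		{"func2", "callback"}, {"mod2", "callback"},
	}
	order, err := executionOrder(vertices, edges)
	if err != nil {
		panic(err)
	}
	fmt.Println(order) // [mod1 func1 func2 mod2 callback]
}
```

Here `callback` has two incoming edges (`func2` and `mod2`), so it is the last vertex to become runnable.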


## Sync or Async

Expand All @@ -77,11 +125,15 @@ Faasflow supports sync and async function call. By default all call are async. T
flow.Apply("function", faasflow.Sync)
```

**One or more `Async` function calls result in a pipeline with multiple phases**
![alt single phase](https://github.com/s8sg/faas-flow/blob/master/doc/asynccall.jpg)

**If all calls are `Sync`, pipeline will have one phase and return the result to the caller**
![alt multi phase](https://github.com/s8sg/faas-flow/blob/master/doc/synccall.jpg)
**If all calls are `Sync`, pipeline will have one Node (Vertex) and return the result to the caller**
![alt single node](https://github.com/s8sg/faas-flow/blob/master/doc/synccall.jpg)

**One or more `Async` function calls result in a pipeline with multiple Nodes (Vertices) as a `chain`**
![alt multi node](https://github.com/s8sg/faas-flow/blob/master/doc/asynccall.jpg)

**If pipeline is created as a `dag`, the pipeline will have multiple Nodes(Vertex)**
![alt multi node dag](https://github.com/s8sg/faas-flow/blob/master/doc/asyncdag.jpg)


| Acronyms | description |
Expand All @@ -92,7 +144,7 @@ Faasflow supports sync and async function call. By default all call are async. T
| Callback | A URL that will be called with the final/partial result. `flow.Callback(url)` |
| Handler | A failure handler registered as `flow.OnFailure(func(err error){})`. If registered, it is called when an error occurs |
| Finally | A cleanup handler registered as `flow.Finally(func(){})`. If registered, it is called at the end with state `StateFailure` if the flow failed, otherwise `StateSuccess` |
| Phase | Segment of a pipeline definition which consists of one or more calls to a `Function` in Sync, a `Modifier` or a `Callback`. A pipeline definition has one or more phases. An async call to `Apply()` results in a new phase. |
| Node | A vertex that represents a segment of a pipeline definition consisting of one or more calls to an `Operation`. A pipeline definition has one or more nodes. An async call results in a new node in a chain. A dag is a composition of multiple nodes |
| Context | The request context holds the state of the request. It abstracts the `StateHandler` and provides an API to manage the state of the request. The `StateHandler{}` interface can be set by the user to use 3rd party storage to manage state. |
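
The rule in the `Node` row ("an async call results in a new node in a chain") can be sketched as a small grouping function. This is an illustration only: the `Op` type and `splitIntoNodes` are hypothetical names, and the rule that the very first operation never opens an extra node is an assumption inferred from the examples in this README, not taken from the `faasflow` source.

```go
package main

import "fmt"

// Op is a hypothetical description of one pipeline operation.
type Op struct {
	Kind string // "apply", "modify" or "callback"
	Name string
	Sync bool // only meaningful for "apply"
}

// splitIntoNodes groups operations into nodes: an async Apply opens a new
// node, and the first operation always opens the first node.
func splitIntoNodes(ops []Op) [][]Op {
	var nodes [][]Op
	for i, op := range ops {
		startsNode := i == 0 || (op.Kind == "apply" && !op.Sync)
		if startsNode {
			nodes = append(nodes, nil)
		}
		last := len(nodes) - 1
		nodes[last] = append(nodes[last], op)
	}
	return nodes
}

func main() {
	async := []Op{
		{Kind: "apply", Name: "yourFunc1"},
		{Kind: "modify"},
		{Kind: "apply", Name: "yourFunc2"},
		{Kind: "callback"},
	}
	for i, node := range splitIntoNodes(async) {
		fmt.Printf("Node %d: %d operations\n", i+1, len(node))
	}
}
```

With every `Apply` marked `Sync: true`, the same function returns a single node, which matches the all-sync case described above.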

## Internal
Expand All @@ -103,9 +155,9 @@ Faasflow runs four major steps to define and run the pipeline
| Step | description |
| ---- | ----- |
| Build Workflow | Identify a request and build a flow. An incoming request could be a partially finished pipeline or a fresh raw request. For a partial request, `faas-flow` parses the state of the pipeline from the incoming request |
| Get Definition | FaasWorkflow creates a simple **pipeline-definition** with one or more phases based on the flow defined in the `Define()` function in `handler.go`. A **pipeline-definition** consists of multiple `phases`. Each `Phase` includes one or more `Function Call`, `Modifier` or `Callback`. A single `phase` is always executed in a single invocation of the flow. The same flow always produces the same pipeline definition, which allows `faas-flow` to be completely `stateless`|
| Execute | Execute runs a `Phase` by calling the `Modifier`, `Functions` or `Callback` based on how the user defines the pipeline. Only one `Phase` gets executed in a single execution of the `faas-flow function`. |
| Repeat Or Response | If the pipeline is not yet completed, FaasWorkflow forwards the remaining pipeline with the `partial execution state` and the `partial result` to the same `flow function` via the `gateway`. If the pipeline has only one phase or is completed, `faas-flow` returns the output to the gateway; otherwise it returns `empty`|
| Get Definition | FaasWorkflow creates a simple **pipeline-definition** with one or more nodes based on the flow defined in the `Define()` function in `handler.go`. A **pipeline-definition** consists of multiple `nodes`. Each `Node` includes one or more `Function Call`, `Modifier` or `Callback`. A single `node` is always executed in a single invocation of the flow. The same flow always produces the same pipeline definition, which allows `faas-flow` to be completely `stateless`|
| Execute | Execute runs a `Node` by calling the `Modifier`, `Functions` or `Callback` based on how the user defines the pipeline. Only one `Node` gets executed in a single execution of the `faas-flow function`. |
| Repeat Or Response | If the pipeline is not yet completed, FaasWorkflow forwards the remaining pipeline with the `partial execution state` and the `partial result` to the same `flow function` via the `gateway`. If the pipeline has only one node or is completed, `faas-flow` returns the output to the gateway; otherwise it returns `empty`|
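
The "Execute" and "Repeat Or Response" steps can be pictured as a loop in which each invocation runs exactly one node and then either forwards a partial request or returns the final output. The sketch below simulates that loop in a single process. `partialRequest`, `node` and `invoke` are hypothetical names, and in a real deployment each iteration would be a separate invocation of the flow function through the gateway.

```go
package main

import "fmt"

// partialRequest is a hypothetical stand-in for what gets forwarded:
// the execution position (next node) and the partial result.
type partialRequest struct {
	NextNode int
	Data     string
}

// node is a simplified unit of work that transforms the data.
type node func(data string) string

// invoke runs exactly one node. It returns the request to forward, or nil
// together with the final output when the pipeline is complete.
func invoke(pipeline []node, req partialRequest) (*partialRequest, string) {
	result := pipeline[req.NextNode](req.Data)
	if req.NextNode+1 < len(pipeline) {
		return &partialRequest{NextNode: req.NextNode + 1, Data: result}, ""
	}
	return nil, result
}

func main() {
	pipeline := []node{
		func(d string) string { return d + " -> node1" },
		func(d string) string { return d + " -> node2" },
	}
	req := &partialRequest{Data: "input"}
	var output string
	// Each iteration stands for one invocation of the flow function.
	for req != nil {
		req, output = invoke(pipeline, *req)
	}
	fmt.Println(output) // input -> node1 -> node2
}
```

Because the pipeline definition is rebuilt identically on every invocation, the forwarded request only needs to carry the position and the partial data.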


## Example
Expand Down Expand Up @@ -138,8 +190,8 @@ faas-cli new test-flow --lang faas-flow
environment_file:
- flow.yml
```
> `read_timeout` : A value larger than `max` phase execution time.
> `write_timeout` : A value larger than `max` phase execution time.
> `read_timeout` : A value larger than `max` node execution time.
> `write_timeout` : A value larger than `max` node execution time.
> `write_debug`: It enables the debug msg in logs.
> `combine_output` : It allows debug msg to be excluded from `output`.

Expand All @@ -163,7 +215,7 @@ environment:
> # k8
> gateway: "gateway.openfaas:8080"
> ```
> `enable_tracing` : It enables opentracing for requests and their phases.
> `enable_tracing` : It enables opentracing for requests and their nodes.
> `trace_server` : The address of opentracing backend jaeger.
> `enable_hmac` : Enable hmac to add extra layer of security for partial request forward.
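
As a rough illustration of what HMAC adds when a partial request is forwarded, the sketch below signs a request body with a shared secret and verifies it on the receiving side. The hash function (SHA-256), the hex encoding and the `sign`/`verify` helpers are assumptions made for this example; the scheme actually used by `faas-flow` is not described here and may differ.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sign returns the hex-encoded HMAC-SHA256 of body under key.
func sign(key, body []byte) string {
	mac := hmac.New(sha256.New, key)
	mac.Write(body)
	return hex.EncodeToString(mac.Sum(nil))
}

// verify recomputes the HMAC and compares it in constant time.
func verify(key, body []byte, signature string) bool {
	expected, err := hex.DecodeString(signature)
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, key)
	mac.Write(body)
	return hmac.Equal(mac.Sum(nil), expected)
}

func main() {
	key := []byte("shared-secret")
	body := []byte(`{"partial":"state"}`)
	sig := sign(key, body)
	fmt.Println(verify(key, body, sig))               // true
	fmt.Println(verify(key, []byte("tampered"), sig)) // false
}
```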

Expand All @@ -178,12 +230,12 @@ environment:
Callback("http://gateway:8080/function/send2slack",
Header("method", "post"), Query("authtoken", os.Getenv(token)))
```
> This function will generate two phases as:
> This function will generate two nodes as:
> ```
> Phase 1 :
> Node 1 :
> Apply("yourFunc1")
> Modify()
> Phase 2:
> Node 2:
> Apply("yourFunc2")
> Callback()
> ```
Expand All @@ -208,17 +260,17 @@ Function submitted asynchronously.
```
#### Convert with Sync function
> Edit the `test-flow/handler.go`
> Edit the `function/handler.go`
```go
flow.Apply("yourFunc1", Header("method","post"), faasflow.Sync).
Modify(func(data []byte) ([]byte, error) {
// Check, update/customize data, replay data ...
return []byte(fmt.Sprintf("{ \"data\" : \"%s\" }", string(data))), nil
}).Apply("yourFunc2", Header("method", "post"), faasflow.Sync)
```
> This function will generate one phase as:
> This function will generate one node as:
> ```
> Phase 1 :
> Node 1 :
> Apply("yourFunc1")
> Modify()
> Apply("yourFunc2")
Expand Down Expand Up @@ -255,9 +307,9 @@ docker service create --constraint="node.role==manager" --detach=true \
jaegertracing/all-in-one:latest
```

Below is an example of tracing for an async request with 3 phases
Below is an example of tracing for an async request with 3 Nodes

![alt multi phase](https://github.com/s8sg/faas-flow/blob/master/doc/tracing.png)
![alt multi node](https://github.com/s8sg/faas-flow/blob/master/doc/tracing.png)



Expand All @@ -266,30 +318,40 @@ Request context provide verious function such as:
**DataStore** to store data,
**HttpQuery** to retrieve request query,
**State** to get flow state,
**phase** to execution phase
**Node** to get current node
etc.

### Manage Data Across Phases with `DataStore`
The main state in faas-flow is the **`execution-position` (next-phase)** and the **`partially`** completed data.
### Manage Data Across Nodes with `DataStore`
The main state in a faas-flow chain is the **`execution-position` (next-Node)** and the **`partially`** completed data.
Apart from that, faas-flow allows the user to define state with the `DataStore` interface.
```go
type DataStore interface {
Init(flowName string, requestId string) error
Set(key string, value string) error
Get(key string) (string, error)
Del(key string) error
// Configure the DataStore with flow name and request ID
Configure(flowName string, requestId string)
// Initialize the DataStore (called only once in a request span)
Init() error
// Set stores a value for key; on failure returns an error
Set(key string, value string) error
// Get retrieves a value by key; on failure returns an error
Get(key string) (string, error)
// Del deletes a value by key
Del(key string) error
// Cleanup all the resources in DataStore
Cleanup() error
}
```
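
To make the shape of the new interface concrete, here is a minimal in-memory sketch that provides the six methods listed above (`Configure`, `Init`, `Set`, `Get`, `Del`, `Cleanup`). `memoryDataStore` is a hypothetical type written for illustration. Since each node may run in a separate function invocation, a process-local map like this would not share data between nodes in a real deployment; an external store is needed there.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryDataStore is a hypothetical in-memory store, for illustration only.
type memoryDataStore struct {
	mu     sync.Mutex
	prefix string
	values map[string]string
}

// Configure scopes all keys to one flow and one request.
func (s *memoryDataStore) Configure(flowName string, requestId string) {
	s.prefix = flowName + "-" + requestId + "-"
}

// Init allocates the underlying map.
func (s *memoryDataStore) Init() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values = make(map[string]string)
	return nil
}

// Set stores a value and fails if Init was not called.
func (s *memoryDataStore) Set(key string, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.values == nil {
		return fmt.Errorf("data store not initialized")
	}
	s.values[s.prefix+key] = value
	return nil
}

// Get returns the stored value or an error for a missing key.
func (s *memoryDataStore) Get(key string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	value, ok := s.values[s.prefix+key]
	if !ok {
		return "", fmt.Errorf("key %q not found", key)
	}
	return value, nil
}

// Del removes a key; deleting a missing key is not an error.
func (s *memoryDataStore) Del(key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.values, s.prefix+key)
	return nil
}

// Cleanup drops everything stored for this request.
func (s *memoryDataStore) Cleanup() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.values = nil
	return nil
}

func main() {
	store := &memoryDataStore{}
	store.Configure("test-flow", "request-1")
	if err := store.Init(); err != nil {
		panic(err)
	}
	if err := store.Set("commitsha", "8c44197"); err != nil {
		panic(err)
	}
	value, err := store.Get("commitsha")
	fmt.Println(value, err)
}
```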

State manager can be implemented and set by user with request context in faas-flow `Define()`:
A `DataStore` can be implemented and set by the user in `DefineDataStore()` in `function/handler.go`:
```go
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
// initialize my custom DataStore as minioDataStore
context.SetDataStore(minioDataStore)
// DefineDataStore provides the override of the default DataStore
func DefineDataStore() (faasflow.DataStore, error) {
// initialize minio DataStore
miniods, err := minioDataStore.InitFromEnv()
return miniods, err
}
```

Once a state manager is set it can be used by calling `Get()` and `Set()` from `context`:
Once a DataStore is set it can be used by calling `Get()` and `Set()` from `context`:
```
flow.Modify(func(data []byte) {
// parse data and set to be used later
Expand All @@ -302,12 +364,11 @@ Once a state manager is set it can be used by calling `Get()` and `Set()` from `
// use the query
})
```
* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store state in **amazon s3** or local **minio DB**
* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**

> **Default `requestEmbedDataStore`:**
> By default the faas-flow template uses `requestEmbedDataStore`, which embeds the state data along with the request for the next phase. For bigger values it is recommended to pass them with a custom `DataStore`.
> By default the faas-flow template uses `requestEmbedDataStore`, which embeds the state data along with the request for the next node. For bigger values it is recommended to pass them with a custom `DataStore`.


Once `DataStore` is overridden, all calls to `Set()`, `Get()` and `del()` will go to the provided `DataStore`

### Use **DataStore** to store intermediate result
Expand All @@ -317,6 +378,41 @@ By default **`partially`** completed data gets forwarded along with the async re
```
Due to the **nats** `1mb` storage limitation, an async call may fail. In such a scenario, using `intermediate_storage` is recommended

### Manage State of Pipeline in a DAG with `StateStore`
In a `faas-flow` DAG execution, the state does not depend only on the execution position. Because DAG execution happens on shared state, a 3rd party **synchronous KV store** can be used as a `StateStore`.
`StateStore` provides the below interface:
```go
type StateStore interface {
// Configure the StateStore with flow name and request ID
Configure(flowName string, requestId string)
// Initialize the StateStore (called only once in a request span)
Init() error
// create Vertexes for request
// creates a map[<vertexId>]<Indegree Completion Count>
Create(vertexs []string) error
// Increment Vertex Indegree Completion
// synchronously increment map[<vertexId>] Indegree Completion Count by 1 and return updated count
IncrementCounter(vertex string) (int, error)
// Set state of pipeline
SetState(state bool) error
// Get State of pipeline
GetState() (bool, error)
// Cleanup all the resources in StateStore (called only once in a request span)
Cleanup() error
}
```
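
The key requirement in this interface is that `IncrementCounter` is atomic: when two parent vertices finish at the same time, exactly one of them must observe the final count and trigger the child. The following single-process sketch shows that contract with a mutex. `memoryStateStore` is a hypothetical type for illustration only; real DAG execution spans several function invocations and therefore needs a shared store such as the consul-based one linked below.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryStateStore is a hypothetical single-process StateStore sketch.
type memoryStateStore struct {
	mu       sync.Mutex
	key      string
	counters map[string]int
	state    bool
}

func (s *memoryStateStore) Configure(flowName string, requestId string) {
	s.key = flowName + "-" + requestId
}

func (s *memoryStateStore) Init() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counters = make(map[string]int)
	return nil
}

// Create registers every vertex with an indegree completion count of 0.
func (s *memoryStateStore) Create(vertexs []string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.counters == nil {
		return fmt.Errorf("state store not initialized")
	}
	for _, v := range vertexs {
		s.counters[v] = 0
	}
	return nil
}

// IncrementCounter adds 1 under the lock and returns the updated count,
// so two concurrent callers never receive the same value.
func (s *memoryStateStore) IncrementCounter(vertex string) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.counters[vertex]; !ok {
		return 0, fmt.Errorf("unknown vertex %q", vertex)
	}
	s.counters[vertex]++
	return s.counters[vertex], nil
}

func (s *memoryStateStore) SetState(state bool) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = state
	return nil
}

func (s *memoryStateStore) GetState() (bool, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state, nil
}

func (s *memoryStateStore) Cleanup() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counters = nil
	return nil
}

func main() {
	store := &memoryStateStore{}
	store.Configure("test-flow", "request-1")
	if err := store.Init(); err != nil {
		panic(err)
	}
	if err := store.Create([]string{"mod2", "callback"}); err != nil {
		panic(err)
	}
	// "callback" has two incoming edges, so it may run only at count 2.
	const callbackIndegree = 2
	for _, parent := range []string{"func2", "mod2"} {
		count, err := store.IncrementCounter("callback")
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s done, callback ready: %v\n", parent, count == callbackIndegree)
	}
}
```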
A `StateStore` can be implemented with any KV Store that provides `Synchronization`. The implemented `StateStore` can be set with `DefineStateStore()` at `function/handler.go`:
```go
// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
consulss, err := consulStateStore.GetConsulStateStore(os.Getenv("consul_url"), os.Getenv("consul_dc"))
return consulss, err
}
```

* **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** manages state in **consul** for dag execution


### Getting Http Query to Workflow:
Http Query to flow can be used from context as
```go
Expand All @@ -327,9 +423,9 @@ Http Query to flow can be used from context as
```

### Other from context:
Phase, requestId and State are provided by the `context`
Node, requestId and State are provided by the `context`
```go
currentPhase := context.GetPhase()
currentNode := context.GetNode()
requestId := context.GetRequestId()
state := context.State
```
Expand All @@ -352,7 +448,7 @@ func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
context.Set("commitsha", req.Sha)
}).Apply("myfunc").
Modify(func(data []byte) {
// retrieve the data in a different phase from context
// retrieve the data in a different node from context
commitsha, _ = context.GetString("commitsha")
}).Finally(func() {
// delete the state resource