diff --git a/README.md b/README.md
index 2285cd52..d805c3af 100644
--- a/README.md
+++ b/README.md
@@ -1,34 +1,30 @@
-# Faas-flow - Function Composition for Openfaas
+# Faas-flow - Function Composition for [Openfaas](https://github.com/openfaas/faas)
 [![Build Status](https://travis-ci.org/s8sg/faas-flow.svg?branch=master)](https://travis-ci.org/s8sg/faas-flow) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) [![GoDoc](https://godoc.org/github.com/s8sg/faas-flow?status.svg)](https://godoc.org/github.com/s8sg/faas-flow) [![OpenTracing Badge](https://img.shields.io/badge/OpenTracing-enabled-blue.svg)](http://opentracing.io) [![OpenFaaS](https://img.shields.io/badge/openfaas-serverless-blue.svg)](https://www.openfaas.com)
 
-> - [x] **Pure**              **`FaaS`** with **`openfaas`**
-> - [x] **Fast**               build with **`go`**
-> - [x] **Secured**        with **`HMAC`**
-> - [x] **Stateless**      by **`design`**
-> - [x] **Tracing**         with **`open-tracing`**
-> - [x] **Available**       as **`faas-flow`** template
-
-**FYI**: Faasflow is into conceptual state and API which may change and under active development
+> - [x] **Pure**              FaaS with [Openfaas](https://github.com/openfaas/faas)
+> - [x] **Fast**               Built with `Go`
+> - [x] **Secured**        With `HMAC`
+> - [x] **Stateless**      By design
+> - [x] **Tracing**         With `open-tracing`
+> - [x] **Available**      As the `faas-flow` template
 
-**Dashboard:** Faasflow comes with a dashboard for visualizing dag generated from flow functions.
-Available as https://github.com/s8sg/faas-flow-tower
+[**Faas-flow tower**](https://github.com/s8sg/faas-flow-tower) visualizes and monitors flow functions
 
 ## Overview
 
-faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about internals.
+Faas-flow allows you to realize OpenFaaS function composition with ease. By defining a simple pipeline, you can orchestrate multiple functions without having to worry about the internals.
 
 ```go
-import faasflow "github.com/s8sg/faas-flow"
-
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    flow.SyncNode().Apply("yourFunc1").Apply("yourFunc2")
+    flow.SyncNode().Apply("Func1").Apply("Func2")
+    return
 }
 ```
-After building and deploying, it will give you a function that orchestrates calling `yourFunc2` with the output of `yourFunc1`
+After building and deploying, it will give you an OpenFaaS function that orchestrates calling `Func2` with the output of `Func1`.
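+
+Once deployed, the resulting flow is invoked like any other OpenFaaS function. For instance (a sketch; the flow name `myflow` here is only illustrative):
+```bash
+# Func1 runs first, its output is passed to Func2, and Func2's output is returned
+echo "some input" | faas invoke myflow
+```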
 
 ## Pipeline Definition
@@ -39,92 +35,103 @@ The above pipelines can be achieved with little, but powerfull code:
 > SYNC Chain
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    flow.SyncNode().Apply("func1").Apply("func2").
-        Modify(func(data []byte) ([]byte, error) {
-            // Do something
-            return data, nil
-        })
-    return nil
+    flow.SyncNode().Apply("func1").Apply("func2").
+        Modify(func(data []byte) ([]byte, error) {
+            // do something
+            return data, nil
+        })
+    return
 }
 ```
 > ASYNC Chain
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    dag := flow.Dag()
-    dag.Node("n1").Apply("func1")
-    dag.Node("n2").Apply("func2").
-        Modify(func(data []byte) ([]byte, error) {
-            // Do something
-            return data
-        })
-    dag.Node("n3").callback("storage.io/bucket?id=3345612358265349126&file=result.dat")
-    dag.Edge("n1", "n2")
-    dag.Edge("n2", "n3")
-    flow.OnFailure(func(err error) {
-        // failure handler
-    }).
-    Finally(func(state string) {
-        // cleanup code
-    })
-
-    return nil
+    dag := flow.Dag()
+    dag.Node("n1").Apply("func1")
+    dag.Node("n2").Apply("func2").
+        Modify(func(data []byte) ([]byte, error) {
+            // do something
+            return data, nil
+        })
+    dag.Node("n3").Callback("http://gateway:8080/function/fake-storage")
+    dag.Edge("n1", "n2")
+    dag.Edge("n2", "n3")
+    return
 }
 ```
 > PARALLEL Branching
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    dag := flow.Dag()
-    dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
-        // do something
-        return data, nil
-    })
-    dag.Node("n2").Apply("func1")
-    dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
-        // do something
-        return data, nil
-    })
-    dag.Node("n4").Callback("storage.io/bucket?id=3345612358265349126&file=result")
-    dag.Edge("n1", "n2")
-    dag.Edge("n1", "n3")
-    dag.Edge("n2", "n4")
-    dag.Edge("n3", "n4")
+    dag := flow.Dag()
+    dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+        // do something
+        return data, nil
+    })
+    dag.Node("n2").Apply("func1")
+    dag.Node("n3").Apply("func2").Modify(func(data []byte) ([]byte, error) {
+        // do something
+        return data, nil
+    })
+    dag.Node("n4", faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
+        // aggregate the branch results data["n2"] and data["n3"]
+        return []byte(""), nil
+    })).Callback("http://gateway:8080/function/fake-storage")
+
+    dag.Edge("n1", "n2")
+    dag.Edge("n1", "n3")
+    dag.Edge("n2", "n4")
+    dag.Edge("n3", "n4")
+    return
 }
 ```
 > DYNAMIC Branching
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    dag := flow.Dag()
-    dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+    dag := flow.Dag()
+    dag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+        // do something
        return data, nil
-    })
-    conditionalDags := dag.ConditionalBranch("C",
+    })
+    conditionalDags := dag.ConditionalBranch("C",
        []string{"c1", "c2"}, // possible conditions
-        func(response []byte) []string {
+        func(response []byte) []string {
           // for each returned condition the corresponding branch will execute
           // this function executes in the runtime of condition C
           return []string{"c1", "c2"}
        },
-    )
-    conditionalDags["c2"].Node("n1").Apply("func").Modify(func(data []byte) ([]byte, error) {
-        return data, nil
-    })
-    foreachDag := conditionalDags["c1"].ForEachBranch("F",
-        func(data []byte) map[string][]byte {
-            // for each returned key in the hashmap a new branch will be executed
+        faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
+            // aggregate the results of all dynamic branches
+            return []byte(""), nil
+        }),
+    )
+
+    conditionalDags["c2"].Node("n1").Apply("func1").Modify(func(data []byte) ([]byte, error) {
+        // do something
+        return data, nil
+    })
+    foreachDag := conditionalDags["c1"].ForEachBranch("F",
+        func(data []byte) map[string][]byte {
+            // for each returned key in the hashmap a new branch will be executed
            // this function executes in the runtime of foreach F
-            return map[string][]byte{ "f1": data, "f2": data }
+            return map[string][]byte{"f1": data, "f2": data}
        },
-    )
-    foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+        faasflow.Aggregator(func(data map[string][]byte) ([]byte, error) {
+            // aggregate the results of all dynamic branches
+            return []byte(""), nil
+        }),
+    )
+    foreachDag.Node("n1").Modify(func(data []byte) ([]byte, error) {
+        // do something
        return data, nil
-    })
-    dag.Node("n2").Callback("storage.io/bucket?id=3345612358265349126&file=result")
-    dag.Edge("n1", "C")
-    dag.Edge("C", "n2")
+    })
+    dag.Node("n2").Callback("http://gateway:8080/function/fake-storage")
+    dag.Edge("n1", "C")
+    dag.Edge("C", "n2")
+    return
 }
-```
+```
+Full implementations of the above examples are available [here](https://github.com/s8sg/faasflow-example)
 
 ## Faas-flow Design
-Faas-flow design is not fixed and like any good design its evolving. The current design consideration are made based on the below goals
+The current design considerations are based on the following goals
 > 1. Leverage the openfaas platform
 > 2. Not to violate the notions of function
 > 3. Provide flexibility, scalability and adaptability
 
@@ -134,40 +141,44 @@ Faas-flow is deployed and provisioned just like any other openfaas function. It
 ![alt its a function](https://github.com/s8sg/faas-flow/blob/master/doc/design/complete-faas.jpg)
 
 #### Adapter pattern for zero instrumentation in code
-Faas-flow function follow the adapter pattern. Here the adaptee is the functions and the adapter is `faas-flow` itself. For each **node** execution, `faas-flow` handle the calls to functions. Once the execution is over, it forwards the call to itself. This way the arrangement logic is seperated from functions and implemented in the adapter. As no code instrumentation is needed, functions becomes completly independent of the composition logic
+The faas-flow function follows the adapter pattern. Here the adaptees are the functions and the adapter is `faas-flow`. For each node execution, `faas-flow` handles the calls to the functions. Once the execution is over, it forwards an event to itself. This way the arrangement logic is separated from the functions and implemented in the adapter. Compositions need no code instrumentation, making functions completely independent of the composition details.
 
 ![alt function is independent of composition](https://github.com/s8sg/faas-flow/blob/master/doc/design/adapter-pattern.jpg)
 
 #### Aggregate pattern as chaining
-Aggregatation of seperate function calls are done as chaining. Multiple functions can be called from a single node with order maintained as per the chain. This way one execution node is capable to be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client. Faas-flow fuses the adapter pattern and aggregate pattern to achive more complex usecases
+Aggregation of separate function calls is done via chaining. Multiple functions can be called from a single node, with order maintained as per the chain. This way one execution node can be implemented as an aggregator function that invokes multiple functions, collects the results, optionally applies business logic, and returns a consolidated response to the client or forwards it to the next nodes. Faas-flow fuses the adapter pattern and the aggregate pattern to support more complex use cases.
 
 ![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/aggregate-pattern.jpg)
 
 #### Event driven iteration
-Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow uses openfaas platform without depending on other external system. Node execution in `faas-flow` starts by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes has completed. Event carries the execution state, which is used to identify the next node to execute. With events faas-flow asynchronously carry-on execution of nodes by iterating itself over and over till all nodes has been executed
-![alt aggregation](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)
+Openfaas uses [Nats](https://nats.io) for event delivery and faas-flow leverages the openfaas platform. Node execution in `faas-flow` is triggered by a completion event of one or more previous nodes. A completion event denotes that all the previous dependent nodes have completed. The event carries the execution state and identifies the next node to execute. Using these events, faas-flow asynchronously carries on the execution of nodes by iterating over itself until all nodes are executed.
+![alt iteration](https://github.com/s8sg/faas-flow/blob/master/doc/design/event-driven-iteration.jpg)
 
 #### 3rd party KV store for coordination
-When executing branches, one node is dependent of multiple predecessor nodes. In that scenario the event for completion is genearted by coordination of earlier nodes. Like any distributed system the coordination are achived via a centralized service. Faas-flow keeps the logic of the coordination controller inside of faas-flow function and let user use any external 3rd party solution. User can implement `StateStore` and use any synchronous KV stores
+When executing branches, one node can depend on more than one predecessor node. In that scenario, the completion event is generated by coordination of the earlier nodes. Like any distributed system, the coordination is achieved via a centralized service. Faas-flow keeps the logic of the coordination controller inside the faas-flow implementation and lets the user plug in any external synchronous KV store by implementing [`StateStore`](https://godoc.org/github.com/s8sg/faas-flow#StateStore).
 ![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-statestore.jpg)
 
 #### 3rd party Storage for intermediate data
-Results from function execution and intermidiate data can be handled by the user manually. Although `faas-flow` provides state-store for intermediate result storage which automatically initialize, store, retrive and remove data between nodes. This fits great for data pipelines. Faas-flow keeps the logic of storage controller inside of faas-flow function and let user use any external 3rd party object storage. User can implement `DataStore` and use any object stores
-![alt coordination](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)
-
+Results from function execution and intermediate data can be handled by the user manually. Faas-flow provides a data-store for intermediate result storage; it automatically initializes, stores, retrieves, and removes data between nodes. This fits well for data processing applications. Faas-flow keeps the logic of the storage controller inside the faas-flow implementation and lets the user plug in any external object storage by implementing [`DataStore`](https://godoc.org/github.com/s8sg/faas-flow#DataStore).
+![alt storage](https://github.com/s8sg/faas-flow/blob/master/doc/design/3rd-party-storage.jpg)
+
+
+The faas-flow design is not fixed; like any good design, it is evolving. Please contribute to make it better.
 
 ## Getting Started
-This example implements a flow to `Greet`
+This example implements a very simple flow to `Greet`
 
-#### Get the `faas-flow` template with `faas-cli`
+#### Get template
+Pull the `faas-flow` template with the `faas-cli`
 ```
 faas template pull https://github.com/s8sg/faas-flow
 ```
-#### Create a new `func` with `faas-flow` template
+#### Create new flow function
+Create a new function using the `faas-flow` template
 ```bash
 faas new greet --lang faas-flow
 ```
-#### Edit `greet.yml`
+#### Edit stack
 Edit the function stack file `greet.yml`
 ```yaml
   greet:
@@ -179,22 +190,22 @@ Edit function stack file `greet.yml`
     write_timeout: 120 # A value larger than `max` of all execution times of Nodes
     write_debug: true
     combine_output: false
+    workflow_name: "greet" # The name of the flow function, faas-flow uses this to forward the completion event
     environment_file:
      - flow.yml
 ```
-#### Add `flow.yml`
+#### Add configuration
 Add a separate file `flow.yml` with the faas-flow related configuration.
 ```yaml
 environment:
-  workflow_name: "greet" # The name of the flow function, faasflow use this to forward completion event
  gateway: "gateway:8080" # The address of the openfaas gateway, faas-flow uses this to forward the completion event
-  # gateway: "gateway.openfaas:8080" # For K8
+  # gateway: "gateway.openfaas:8080" # For K8s
  enable_tracing: false # tracing allows tracing internal node executions with opentracing
-  enable_hmac: false # hmac adds extra layer of security by validating the event source
+  enable_hmac: true # hmac adds an extra layer of security by validating the event source
 ```
 
-#### Edit Defnition
+#### Edit function definition
 Edit `greet/handler.go` and update `Define()`
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
@@ -206,7 +217,6 @@ func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
     return nil
 }
 ```
-
 #### Build and Deploy
 
 Build and deploy
@@ -214,11 +224,12 @@ Build and deploy
 faas build
 faas deploy
 ```
-> This function will generate one Synchronous node as:
+> This function will generate one Synchronous node
> ```
> Modify("name") -> Hello name
> ```
-All calls will be performed in one single execution of the function, and result will be returned to the callee
+All calls will be performed in one single execution of the flow function, and the result will be returned to the caller.
+> Note: For a flow that has more than one node, faas-flow doesn't return any response. External storage or a `callback` can be used to retrieve the async result.
 
 #### Invoke
 ```
@@ -226,72 +237,48 @@ echo "Adam" | faas invoke greet
 ```
 
 ## Request Tracking by ID
-Request can be traced from the log by `RequestId`. For each new Request a unique `RequestId` is generated.
+For each new request faas-flow generates a unique `Request Id` for the flow. The same Id is used in the logs.
 ```bash
-2018/08/13 07:51:59 [request `bdojh7oi7u6bl8te4r0g`] Created
+2018/08/13 07:51:59 [Request `bdojh7oi7u6bl8te4r0g`] Created
 2018/08/13 07:52:03 [Request `bdojh7oi7u6bl8te4r0g`] Received
 ```
+The assigned requestId is set on the response header `X-Faas-Flow-Reqid`.
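+
+For example, when the flow is invoked over HTTP, the request Id can be read back from the response headers (a sketch; the gateway address and the Id value shown are illustrative):
+```bash
+curl -i -d "Adam" http://127.0.0.1:8080/function/greet
+# HTTP/1.1 200 OK
+# X-Faas-Flow-Reqid: bdojh7oi7u6bl8te4r0g
+```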
+
 ## Request Tracing by Open-Tracing
-Request tracing can be enabled by providing the `trace_server` and enabling tracing.
-Edit `flow.yml` as:
+Request tracing can be retrieved from the `trace_server` once enabled. Tracing is the best way to monitor flows and the execution status of each node for each request.
+
+#### Edit `flow.yml`
+Enable tracing and add the trace server as:
 ```yaml
 enable_tracing: true
 trace_server: "jaegertracing:5775"
 ```
 
-### Start The Trace Server
-`jaeger` (opentracing-1.x) used for traceing
+#### Start The Trace Server
+`jaeger` (opentracing-1.x) is used as the tracing backend.
 Quick start with jaegertracing: https://www.jaegertracing.io/docs/1.8/getting-started/
 
-Below is an example of tracing for: https://github.com/s8sg/branching-in-faas-flow
-
-![alt multi node](https://github.com/s8sg/faas-flow/blob/master/doc/tracing.png)
+#### Use [faas-flow-tower](https://github.com/s8sg/faas-flow-tower)
+Retrieve the requestID from the `X-Faas-Flow-Reqid` header of the response.
+
+Below is an example of tracing information for [example-branching-in-faas-flow](https://github.com/s8sg/branching-in-faas-flow) in [faas-flow-tower](https://github.com/s8sg/faas-flow-tower)
+![alt monitoring](https://github.com/s8sg/faas-flow-tower/blob/master/doc/monitoring.png)
 
-## Using context
-Context provide verious function such as:
- **DataStore** to store data,
- **HttpQuery** to retrivbe original request queries,
- **State*** to get flow state,
- **Node** to get current node
-etc.
+## Use of context
 
-### Manage Data Accross Node with `DataStore`
-Faas-flow uses the `DataStore` to store partially completed data and request context data. In faas-flow any dag that forwards data between two nodes need `DataStore`.
-faas-flow allow user to define custom datastore with `DataStore` interface.
-```go
- type DataStore interface {
-    // Configure the DaraStore with flow name and request ID
-    Configure(flowName string, requestId string)
-    // Initialize the DataStore (called only once in a request span)
-    Init() error
-    // Set store a value for key, in failure returns error
-    Set(key string, value string) error
-    // Get retrives a value by key, if failure returns error
-    Get(key string) (string, error)
-    // Del delets a value by a key
-    Del(key string) error
-    // Cleanup all the resorces in DataStore
-    Cleanup() error
- }
-```
-
-Data Store can be implemented and set by user at the `DefineDataStore()` at `function/handler.go`:
-```go
-// ProvideDataStore provides the override of the default DataStore
-func DefineDataStore() (faasflow.DataStore, error) {
-    // initialize minio DataStore
-    miniods, err := minioDataStore.InitFromEnv()
-    return miniods, err
-}
-```
-Once a `DataStore` is set `faas-flow` uses the same to store intermidiate result inbetween nodes
-
+Context can be used inside the definition for different use cases. Context provides various information such as:
+ **HttpQuery** to retrieve the original request queries
+ **State** to get the flow state
+ **Node** to get the current node
+Along with that, it wraps the **DataStore** to store data.
+
+#### Store data in context with `DataStore`
 Context uses the `DataStore` to store/retrieve data. The user can do the same by
-calling `Get()` and `Set()` from `context`:
+calling `Get()`, `Set()` and `Del()` from `context`:
 ```go
     flow.SyncNode().
     Modify(func(data []byte) {
@@ -306,12 +293,28 @@ calling `Get()` and `Set()` from `context`:
          // use the query
     })
 ```
-* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**
+
+#### Getting Http Query to Workflow:
+The Http query to the flow can be used from the context as
+```go
+    flow.SyncNode().Apply("myfunc", Query("auth-token", context.Query.Get("token"))). // pass as a function query
+        Modify(func(data []byte) {
+            token = context.Query.Get("token") // get the query inside a modifier
+        })
+```
+
+#### Other information from context:
+Node, requestId, and State are provided by the `context`
+```go
+    currentNode := context.GetNode()
+    requestId := context.GetRequestId()
+    state := context.State
+```
+For more details check the [faas-flow GoDoc](https://godoc.org/github.com/s8sg/faas-flow)
 
-### Manage State of Pipeline in a DAG with `StateStore`
-Any DAG which has a branch needs external statestore which can be a 3rd party **Synchoronous KV store**. `Faas-flow` uses `StateStore` to maintain state of Node execution in a dag which has branches.
-Faas-flow allows user to define custom statestore with `StateStore` interface.
+## External `StateStore` for coordination controller
+Any DAG that has a branch needs coordination for node completion events. Faas-flow implements a coordination controller which allows the user to use any external synchronous KV store. The user can define a custom state-store with the `StateStore` interface.
 ```go
 type StateStore interface {
    // Configure the StateStore with flow name and request ID
@@ -328,7 +331,6 @@ type StateStore interface {
    Cleanup() error
 }
 ```
-
 A `StateStore` can be implemented with any KV store that provides `Synchronization`. The implemented `StateStore` can be set with `DefineStateStore()` at `function/handler.go`:
 ```go
 // DefineStateStore provides the override of the default StateStore
@@ -338,50 +340,60 @@ func DefineStateStore() (faasflow.StateStore, error) {
 }
 ```
 
+#### Available state-stores:
 * **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
 * **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statestore implementation with **etcd**
 
-### Geting Http Query to Workflow:
-Http Query to flow can be used from context as
-```go
-    flow.SyncNode().Apply("myfunc", Query("auth-token", context.Query.Get("token"))). // pass as a function query
-        Modify(func(data []byte) {
-            token = context.Query.Get("token") // get query inside modifier
-        })
-```
-
-### Other from context:
-Node, requestId, State is provided by the `context`
+## External `DataStore` for storage controller
+Faas-flow uses the `DataStore` to store partially completed data between nodes and request context data. Faas-flow implements a storage controller that allows the user to use any external object store. The user can define a custom data-store with the `DataStore` interface.
 ```go
-    currentNode := context.GetNode()
-    requestId := context.GetRequestId()
-    state := context.State
+    type DataStore interface {
+        // Configure the DataStore with flow name and request ID
+        Configure(flowName string, requestId string)
+        // Initialize the DataStore (called only once in a request span)
+        Init() error
+        // Set stores a value for a key, returns error on failure
+        Set(key string, value string) error
+        // Get retrieves a value by key, returns error on failure
+        Get(key string) (string, error)
+        // Del deletes a value by key
+        Del(key string) error
+        // Cleanup all the resources in DataStore
+        Cleanup() error
+    }
 ```
-for more details check `[faas-flow-GoDoc](https://godoc.org/github.com/s8sg/faas-flow)
-
+A `DataStore` can be implemented and set by the user with `DefineDataStore()` at `function/handler.go`:
+```go
+// DefineDataStore provides the override of the default DataStore
+func DefineDataStore() (faasflow.DataStore, error) {
+    // initialize the minio DataStore
+    miniods, err := minioDataStore.InitFromEnv()
+    return miniods, err
+}
+```
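+
+For illustration of the contract only, a minimal in-memory sketch could look like the following (hypothetical; an in-process map would not survive across the separate invocations a flow makes, so a real `DataStore` must be backed by an external store such as minio):
+```go
+// memDataStore is a hypothetical in-memory DataStore (requires the "fmt" import).
+type memDataStore struct {
+    prefix string
+    kv     map[string]string
+}
+
+// Configure scopes all keys by flow name and request Id
+func (s *memDataStore) Configure(flowName string, requestId string) {
+    s.prefix = flowName + "-" + requestId + "-"
+}
+
+// Init is called only once in a request span
+func (s *memDataStore) Init() error {
+    s.kv = make(map[string]string)
+    return nil
+}
+
+func (s *memDataStore) Set(key string, value string) error {
+    s.kv[s.prefix+key] = value
+    return nil
+}
+
+func (s *memDataStore) Get(key string) (string, error) {
+    value, ok := s.kv[s.prefix+key]
+    if !ok {
+        return "", fmt.Errorf("key %s not found", key)
+    }
+    return value, nil
+}
+
+func (s *memDataStore) Del(key string) error {
+    delete(s.kv, s.prefix+key)
+    return nil
+}
+
+func (s *memDataStore) Cleanup() error {
+    s.kv = nil
+    return nil
+}
+```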
+
+#### Available data-stores:
+* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows storing data in **Amazon S3** or a local **minio DB**
+
 ## Cleanup with `Finally()`
-Finally provides a way to cleanup context and other resources and do post completion work of the pipeline.
-A Finally method can be used on flow as:
+Finally provides an efficient way to perform post-execution steps of the flow. If specified, `Finally()` is invoked on both failure and success of the flow. A Finally method can be set as:
 ```go
 func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
-    // initialize my custom DataStore as myDataStore
-    context.SetDataStore(myDataStore)
-
    // Define flow
    flow.SyncNode().Modify(func(data []byte) {
        // parse data and set to be used later
        // json.Unmarshal(&req, data)
        context.Set("commitsha", req.Sha)
    }).
-    Apply("myfunc").
-    Modify(func(data []byte) {
+    Apply("myfunc").Modify(func(data []byte) {
        // retrieve the data in a different node from the context
        commitsha, _ = context.GetString("commitsha")
    })
-
+    flow.OnFailure(func(err error) {
+        // failure handler
+    })
    flow.Finally(func() {
        // delete the state resource
        context.Del("commitsha")
diff --git a/sdk/pipeline.go b/sdk/pipeline.go
index 31c45776..5e51212d 100644
--- a/sdk/pipeline.go
+++ b/sdk/pipeline.go
@@ -23,9 +23,7 @@ type Pipeline struct {
    ExecutionPosition map[string]string `json:"pipeline-execution-position"` // Denotes the node that is executing now
    ExecutionDepth    int               `json:"pipeline-execution-depth"`    // Denotes the depth of subgraph its executing
 
-    CurrentDynamicOption   map[string]string   `json:"pipeline-dynamic-option"`           // Denotes the current dynamic option mapped against the dynamic Node UQ id
-    AllDynamicOption       map[string][]string `json:"pipeline-all-dynamic-options"`      // Denotes all options mapped against the dynamic Node UQ id
-    DynamicDependencyCount map[string]int      `json:"pipeline-dynamic-dependency-count"` // Denotes the no of dependency for a nodes unique Id
+    CurrentDynamicOption map[string]string `json:"pipeline-dynamic-option"` // Denotes the current dynamic option mapped against the dynamic Node UQ id
 
    FailureHandler PipelineErrorHandler `json:"-"`
    Finally        PipelineHandler      `json:"-"`
@@ -37,12 +35,9 @@ func CreatePipeline() *Pipeline {
    pipeline.Dag = NewDag()
 
    pipeline.ExecutionPosition = make(map[string]string, 0)
-
+    pipeline.ExecutionDepth = 0
    pipeline.CurrentDynamicOption = make(map[string]string, 0)
-    pipeline.AllDynamicOption = make(map[string][]string, 0)
-    pipeline.DynamicDependencyCount = make(map[string]int, 0)
-    pipeline.ExecutionDepth = 0
 
    return pipeline
 }
@@ -67,24 +62,53 @@ func (pipeline *Pipeline) GetInitialNodeId() string {
    return "0"
 }
 
+// GetNodeExecutionUniqueId provides an ID that is unique in an execution
+func (pipeline *Pipeline) GetNodeExecutionUniqueId(node *Node) string {
+    depth := 0
+    dag := pipeline.Dag
+    depthStr := ""
+    optionStr := ""
+    for depth < pipeline.ExecutionDepth {
+        depthStr = fmt.Sprintf("%d", depth)
+        node := dag.GetNode(pipeline.ExecutionPosition[depthStr])
+        option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
+        if node.subDag != nil {
+            dag = node.subDag
+        } else {
+            dag = node.conditionalDags[option]
+        }
+        if optionStr == "" {
+            optionStr = option
+        } else {
+            optionStr = option + "--" + optionStr
+        }
+
+        depth++
+    }
+    if optionStr == "" {
+        return node.GetUniqueId()
+    }
+    return optionStr + "--" + node.GetUniqueId()
+}
+
 // GetCurrentNodeDag returns the current node and current dag based on execution position
 func (pipeline *Pipeline) GetCurrentNodeDag() (*Node, *Dag) {
-    index := 0
+    depth := 0
    dag := pipeline.Dag
-    indexStr := ""
-    for index < pipeline.ExecutionDepth {
-        indexStr = fmt.Sprintf("%d", index)
-        node := dag.GetNode(pipeline.ExecutionPosition[indexStr])
+    depthStr := ""
+    for depth < pipeline.ExecutionDepth {
+        depthStr = fmt.Sprintf("%d", depth)
+        node := dag.GetNode(pipeline.ExecutionPosition[depthStr])
+        option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
        if node.subDag != nil {
            dag = node.subDag
        } else {
-            option := pipeline.CurrentDynamicOption[node.GetUniqueId()]
            dag = node.conditionalDags[option]
        }
-        index++
+        depth++
    }
-    indexStr = fmt.Sprintf("%d", index)
-    node := dag.GetNode(pipeline.ExecutionPosition[indexStr])
+    depthStr = fmt.Sprintf("%d", depth)
+    node := dag.GetNode(pipeline.ExecutionPosition[depthStr])
 
    return node, dag
 }
@@ -122,8 +146,5 @@ func (pipeline *Pipeline) ApplyState(state string) {
    temp, _ := decodePipeline([]byte(state))
    pipeline.ExecutionDepth = temp.ExecutionDepth
    pipeline.ExecutionPosition = temp.ExecutionPosition
-    pipeline.CurrentDynamicOption = temp.CurrentDynamicOption
-    pipeline.AllDynamicOption = temp.AllDynamicOption
-    pipeline.DynamicDependencyCount = temp.DynamicDependencyCount
 }
diff --git a/template/faas-flow/Dockerfile b/template/faas-flow/Dockerfile
index eb35102f..9f2098d5 100644
--- a/template/faas-flow/Dockerfile
+++ b/template/faas-flow/Dockerfile
@@ -1,11 +1,10 @@
-FROM golang:1.10.5-alpine3.7 as builder
+FROM openfaas/of-watchdog:0.5.3 as watchdog
+FROM golang:1.10.4-alpine3.8 as build
 
-RUN apk --no-cache add curl \
-    && echo "Pulling watchdog binary from Github." \
-    && curl -sSL https://github.com/openfaas/faas/releases/download/0.8.0/fwatchdog > /usr/bin/fwatchdog \
-    && chmod +x /usr/bin/fwatchdog \
-    && apk del curl --no-cache
+COPY --from=watchdog /fwatchdog /usr/bin/fwatchdog
+RUN chmod +x /usr/bin/fwatchdog
 
+RUN mkdir -p /go/src/handler
 WORKDIR /go/src/handler
 COPY . .
 
@@ -16,26 +15,25 @@ RUN CGO_ENABLED=0 GOOS=linux \
    go build --ldflags "-s -w" -a -installsuffix cgo -o handler . && \
    go test $(go list ./... | grep -v /vendor/) -cover
 
-FROM alpine:3.7
-RUN apk --no-cache add ca-certificates
-
-# Add non root user
-RUN addgroup -S app && adduser -S -g app app
-RUN mkdir -p /home/app
+FROM alpine:3.8
+# Add non root user and certs
+RUN apk --no-cache add ca-certificates \
+    && addgroup -S app && adduser -S -g app app \
+    && mkdir -p /home/app \
+    && chown app /home/app
 
 WORKDIR /home/app
 
-COPY --from=builder /usr/bin/fwatchdog .
-
-COPY --from=builder /go/src/handler/function/ .
-COPY --from=builder /go/src/handler/handler .
+COPY --from=build /go/src/handler/handler .
+COPY --from=build /usr/bin/fwatchdog .
+COPY --from=build /go/src/handler/function/ .
-RUN chown app /home/app
+RUN chown -R app /home/app
 
 USER app
 
 ENV fprocess="./handler"
-
-HEALTHCHECK --interval=2s CMD [ -e /tmp/.lock ] || exit 1
+ENV mode="http"
+ENV upstream_url="http://127.0.0.1:8082"
 
 CMD ["./fwatchdog"]
diff --git a/template/faas-flow/handler.go b/template/faas-flow/handler.go
index 69ec8738..a7600be9 100644
--- a/template/faas-flow/handler.go
+++ b/template/faas-flow/handler.go
@@ -5,19 +5,21 @@ import (
    "encoding/hex"
    "encoding/json"
    "fmt"
-    hmac "github.com/alexellis/hmac"
-    "github.com/rs/xid"
-    faasflow "github.com/s8sg/faas-flow"
-    sdk "github.com/s8sg/faas-flow/sdk"
    "handler/function"
    "io/ioutil"
-    "log"
    "net/http"
    "net/url"
    "os"
    "path"
    "strconv"
    "strings"
+
+    hmac "github.com/alexellis/hmac"
+    xid "github.com/rs/xid"
+    faasflow "github.com/s8sg/faas-flow"
+    sdk "github.com/s8sg/faas-flow/sdk"
+
+    gosdk "github.com/openfaas-incubator/go-function-sdk"
 )
 
 const (
@@ -46,6 +48,8 @@ type flowHandler struct {
    partial  bool // denotes the flow is in partial execution state
    finished bool // denotes the flow has finished execution
 
+    tracer *traceHandler // handles tracing
+
    stateStore faasflow.StateStore // the state store
    dataStore  faasflow.DataStore  // the data store
 }
@@ -62,16 +66,6 @@ func (fhandler *flowHandler) GetRequestState() (bool, error) {
    return false, err
 }
 
-func (fhandler *flowHandler) InitVertexIndegreeCounter(vertexs []string) error {
-    for _, vertex := range vertexs {
-        err := fhandler.stateStore.Set(vertex, "0")
-        if err != nil {
-            return fmt.Errorf("failed to create counter for vertex %s, error %v", vertex, err)
-        }
-    }
-    return nil
-}
-
 func (fhandler *flowHandler) SetDynamicBranchOptions(nodeUniqueId string, options []string) error {
    encoded, err := json.Marshal(options)
    if err != nil {
@@ -213,7 +207,6 @@ func verifyRequest() bool {
 func getHmacKey() string {
    key, keyErr := readSecret("faasflow-hmac-secret")
    if keyErr != nil {
-        log.Printf("Failed to load faasflow-hmac-secret using default")
        key = defaultHmacKey
    }
    return key
@@ -310,18 +303,21 @@ func createContext(fhandler *flowHandler) *faasflow.Context {
 }
 
 // buildWorkflow builds a flow and context from raw request or partial completed request
-func buildWorkflow(data []byte) (fhandler *flowHandler, requestData []byte) {
+func buildWorkflow(data []byte, queryString string,
+    header http.Header,
+    tracer *traceHandler) (fhandler *flowHandler, requestData []byte) {
    requestId := ""
    var validateErr error
 
    if hmacEnabled() {
-        digest := os.Getenv("Http_X_Hub_Signature")
+        digest := header.Get("X-Hub-Signature")
        key := getHmacKey()
        if len(digest) > 0 {
            validateErr = hmac.Validate(data, digest, key)
        } else {
-            validateErr = fmt.Errorf("Http_X_Hub_Signature is not set")
+            validateErr = fmt.Errorf("X-Hub-Signature is not set")
        }
    }
@@ -335,22 +331,21 @@ func buildWorkflow(data []byte) (fhandler *flowHandler, requestData []byte) {
            panic(fmt.Sprintf("Failed to verify incoming request with Hmac, %v",
                validateErr.Error()))
        } else {
-            log.Printf("Incoming request verified successfully")
+            fmt.Printf("Incoming request verified successfully\n")
        }
    }
 
    // Generate request Id
    requestId = xid.New().String()
-    log.Printf("[Request `%s`] Created", requestId)
+    fmt.Printf("[Request `%s`] Created\n", requestId)
 
    // create flow properties
-    query := os.Getenv("Http_Query")
    dataStore := createDataStore()
 
    // Create fhandler
    fhandler = newWorkflowHandler("http://"+getGateway(), flowName,
-        requestId, query, dataStore)
+        requestId, queryString, dataStore)
 
    // set request data
    requestData = data
@@ -359,19 +354,19 @@ func buildWorkflow(data []byte) (fhandler *flowHandler, requestData []byte) {
        fhandler.partial = false
 
        // trace req - mark as start of req
-        startReqSpan(requestId)
+        tracer.startReqSpan(requestId)
    default:
        // Partial Request
        // Get the request ID
        requestId = request.getID()
-        log.Printf("[Request `%s`] Received", requestId)
+        fmt.Printf("[Request `%s`] Received\n", requestId)
 
        if hmacEnabled() {
            if validateErr != nil {
                panic(fmt.Sprintf("[Request `%s`] Invalid Hmac, %v",
                    requestId, validateErr.Error()))
            } else {
-                log.Printf("[Request `%s`] Valid Hmac", requestId)
+                fmt.Printf("[Request `%s`] Valid Hmac\n", requestId)
            }
        }
@@ -392,9 +387,9 @@ func buildWorkflow(data []byte) (fhandler *flowHandler, requestData []byte) {
        fhandler.partial = true
 
        // Continue request span
-        continueReqSpan(requestId)
+        tracer.continueReqSpan(requestId, header)
    }
-
+    fhandler.tracer = tracer
    return
 }
@@ -465,36 +460,42 @@ func executeCallback(pipeline *sdk.Pipeline, operation *sdk.Operation, data []by
 }
 
-// execute executes a node on a faas-flow dag
-func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
-    var result []byte
-    var err error
-
-    pipeline := fhandler.getPipeline()
-
-    currentNode, currentDag := pipeline.GetCurrentNodeDag()
+// findCurrentNodeToExecute finds the right node to execute based on state
+func findCurrentNodeToExecute(fhandler *flowHandler) {
+    currentNode, currentDag := fhandler.getPipeline().GetCurrentNodeDag()
 
-    log.Printf("[Request `%s`] Executing node %s", fhandler.id, currentNode.GetUniqueId())
+    fmt.Printf("[Request `%s`] Executing node %s\n", fhandler.id, currentNode.GetUniqueId())
 
    // recurse to the subdag - if a node is dynamic stop to evaluate it
    for true {
+        // break if the current node is dynamic
        if currentNode.Dynamic() {
-            // trace node - mark as start of node
-            startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
-            return request, nil
+            return
        }
        subdag := currentNode.SubDag()
        if subdag == nil {
            break
        }
+        // trace node - mark as start of the parent node
+        fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
        currentDag = subdag
        currentNode = currentDag.GetInitialNode()
-        log.Printf("[Request `%s`] Executing node %s", fhandler.id, currentNode.GetUniqueId())
-        pipeline.UpdatePipelineExecutionPosition(sdk.DEPTH_INCREMENT, currentNode.Id)
+        fmt.Printf("[Request `%s`] Executing node %s\n", fhandler.id, currentNode.GetUniqueId())
+        fhandler.getPipeline().UpdatePipelineExecutionPosition(sdk.DEPTH_INCREMENT, currentNode.Id)
    }
+}
+
+// execute executes a node on a faas-flow dag
+func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
+    var result []byte
+    var err error
+
+    pipeline := fhandler.getPipeline()
+
+    currentNode, _ := pipeline.GetCurrentNodeDag()
 
    // trace node - mark as start of node
-    startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
+    fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(), fhandler.id)
 
    // Execute all operation
    for _, operation := range currentNode.Operations() {
@@ -502,7 +503,7 @@ func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
        switch {
        // If function
        case operation.Function != "":
-            log.Printf("[Request `%s`] Executing function `%s`",
+            fmt.Printf("[Request `%s`] Executing function `%s`\n",
                fhandler.id, operation.Function)
            if result == nil {
                result, err = executeFunction(pipeline, operation, request)
@@ -521,7 +522,7 @@ func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
            }
        // If callback
        case operation.CallbackUrl != "":
-            log.Printf("[Request `%s`] Executing callback `%s`",
+            fmt.Printf("[Request `%s`] Executing callback `%s`\n",
                fhandler.id, operation.CallbackUrl)
            if result == nil {
                err = executeCallback(pipeline, operation, request)
@@ -541,7 +542,7 @@ func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
 
        // If modifier
        default:
-            log.Printf("[Request `%s`] Executing modifier", fhandler.id)
+            fmt.Printf("[Request `%s`] Executing modifier\n", fhandler.id)
            if result == nil {
                result, err = operation.Mod(request)
            } else {
@@ -558,7 +559,7 @@ func execute(fhandler *flowHandler, request []byte) ([]byte, error) {
        }
    }
 
-    log.Printf("[Request `%s`] Completed execution of Node %s", fhandler.id, currentNode.GetUniqueId())
+    fmt.Printf("[Request `%s`] Completed execution of Node %s\n", fhandler.id, currentNode.GetUniqueId())
 
    return result, nil
 }
@@ -602,7 +603,8 @@ func forwardAsync(fhandler *flowHandler, currentNodeId string, result []byte) ([
    }
 
    // extend req span for async call (TODO : Get the value)
-    extendReqSpan(currentNodeId, fhandler.asyncUrl, httpreq)
+    fhandler.tracer.extendReqSpan(fhandler.id, currentNodeId,
+        fhandler.asyncUrl, httpreq)
 
    client := &http.Client{}
    res, resErr := client.Do(httpreq)
@@ -617,16 +619,20 @@ func forwardAsync(fhandler *flowHandler, currentNodeId string, result []byte) ([
    return resdata, nil
 }
 
-func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result []byte) ([]byte, error) {
+func executeDynamic(fhandler *flowHandler, context *faasflow.Context, result []byte) ([]byte, error) {
    // get pipeline
    pipeline := fhandler.getPipeline()
 
    currentNode, _ := pipeline.GetCurrentNodeDag()
 
+    // trace node - mark as start of the dynamic node
+    fhandler.tracer.startNodeSpan(currentNode.GetUniqueId(),
+        fhandler.id)
+
    currentNodeUniqueId := currentNode.GetUniqueId()
-    defer stopNodeSpan(currentNodeUniqueId)
+    defer fhandler.tracer.stopNodeSpan(currentNodeUniqueId)
 
-    log.Printf("[Request `%s`] Processing dynamic node %s", fhandler.id, currentNodeUniqueId)
+    fmt.Printf("[Request `%s`] Processing dynamic node %s\n", fhandler.id, currentNodeUniqueId)
 
    // subresults and subdags
    subresults := make(map[string][]byte)
@@ -636,10 +642,9 @@ func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result
    condition := currentNode.GetCondition()
    foreach := currentNode.GetForEach()
 
-    branchCount := 0
-
    switch {
    case condition != nil:
+        fmt.Printf("[Request `%s`] Executing condition\n", fhandler.id)
        conditions := condition(result)
        if conditions == nil {
            panic(fmt.Sprintf("Condition function at %s returned nil, failed to proceed",
@@ -657,9 +662,9 @@ func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result
        }
        subresults[conditionKey] = result
        options = append(options, conditionKey)
-        branchCount = branchCount + 1
    }
    case foreach != nil:
+        fmt.Printf("[Request `%s`] Executing foreach\n", fhandler.id)
        foreachResults := foreach(result)
        if foreachResults == nil {
            panic(fmt.Sprintf("Foreach function at %s returned nil, failed to proceed",
@@ -673,55 +678,53 @@ func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result
        subdags[foreachKey] = currentNode.SubDag()
        subresults[foreachKey] = foreachResult
        options = append(options, foreachKey)
-        branchCount = branchCount + 1
    }
    }
 
+    branchCount := len(options)
    if branchCount == 0 {
        return nil, fmt.Errorf("[Request `%s`] Dynamic Node %s, failed to execute as condition/foreach returned no option",
            fhandler.id, currentNodeUniqueId)
    }
 
-    // Increment/Set the no of dynamic branch count for the next nodes
-    for _, cnode := range
currentNode.Children() {
-        indegree, err := fhandler.IncrementCounter(
-            cnode.GetUniqueId()+"-dynamic-indegree", branchCount-1)
-        if err != nil {
-            return nil, fmt.Errorf("[Request `%s`] Failed to store/increment dynamic indegree count of %s, err %v",
-                fhandler.id, cnode.GetUniqueId(), err)
-        }
-        log.Printf("[Request `%s`] Dynamic indegree count for %s set to %d",
-            fhandler.id, cnode.GetUniqueId(), indegree)
+    // Set the no of branch completion for the current dynamic node
+    key := pipeline.GetNodeExecutionUniqueId(currentNode) + "-branch-completion"
+    _, err := fhandler.IncrementCounter(key, 0)
+    if err != nil {
+        return nil, fmt.Errorf("[Request `%s`] Failed to initiate dynamic indegree count for %s, err %v",
+            fhandler.id, key, err)
    }
+    fmt.Printf("[Request `%s`] Dynamic indegree count initiated as %s\n",
+        fhandler.id, key)
+
    // Set all the dynamic options for the current dynamic node
-    err := fhandler.SetDynamicBranchOptions(currentNodeUniqueId+"-dynamic-branch-options", options)
+    key = pipeline.GetNodeExecutionUniqueId(currentNode) + "-dynamic-branch-options"
+    err = fhandler.SetDynamicBranchOptions(key, options)
    if err != nil {
        return nil, fmt.Errorf("[Request `%s`] Dynamic Node %s, failed to store dynamic options",
            fhandler.id, currentNodeUniqueId)
    }
-    for dynamicKey, subdag := range subdags {
+    fmt.Printf("[Request `%s`] Dynamic options initiated as %s\n",
+        fhandler.id, key)
 
-        subNode := subdag.GetInitialNode()
-        intermediateData := subresults[dynamicKey]
+    for option, subdag := range subdags {
 
-        err := fhandler.InitVertexIndegreeCounter(subdag.GetNodes(dynamicKey))
-        if err != nil {
-            return nil, fmt.Errorf("[Request `%s`] Dynamic DAG %s key %s state can not be initiated at StateStore, %v",
-                fhandler.id, subdag.Id, dynamicKey, err)
-        }
+        subNode := subdag.GetInitialNode()
+        intermediateData := subresults[option]
 
        // If forwarder is not nil its not an execution flow
        if currentNode.GetForwarder("dynamic") != nil {
-//