Skip to content

Commit

Permalink
Merge pull request #106 from s8sg/execute-operation
Browse files Browse the repository at this point in the history
Add Execute method for operation with options
  • Loading branch information
s8sg authored Aug 26, 2019
2 parents 9b40dc2 + a93c63b commit e5e6805
Show file tree
Hide file tree
Showing 10 changed files with 477 additions and 220 deletions.
209 changes: 209 additions & 0 deletions faas_operation.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package faasflow

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strings"
)

Expand Down Expand Up @@ -107,6 +113,209 @@ func (operation *FaasOperation) GetId() string {
return id
}

func (operation *FaasOperation) Encode() []byte {
return []byte("")
}

// buildURL builds OpenFaaS function execution url for the flow
func buildURL(gateway, rPath, function string) string {
u, _ := url.Parse(gateway)
u.Path = path.Join(u.Path, rPath+"/"+function)
return u.String()
}

// makeQueryStringFromParam create query string from provided query
func makeQueryStringFromParam(params map[string][]string) string {
if params == nil {
return ""
}
result := ""
for key, array := range params {
for _, value := range array {
keyVal := fmt.Sprintf("%s-%s", key, value)
if result == "" {
result = "?" + keyVal
} else {
result = result + "&" + keyVal
}
}
}
return result
}

// buildHttpRequest build upstream request for function
func buildHttpRequest(url string, method string, data []byte, params map[string][]string,
headers map[string]string) (*http.Request, error) {

queryString := makeQueryStringFromParam(params)
if queryString != "" {
url = url + queryString
}

httpReq, err := http.NewRequest(method, url, bytes.NewReader(data))
if err != nil {
return nil, err
}

for key, value := range headers {
httpReq.Header.Add(key, value)
}

return httpReq, nil
}

// executeFunction executes a function call
func executeFunction(gateway string, operation *FaasOperation, data []byte) ([]byte, error) {
var err error
var result []byte

name := operation.Function
params := operation.GetParams()
headers := operation.GetHeaders()

funcUrl := buildURL("http://"+gateway, "function", name)

method := os.Getenv("default-method")
if method == "" {
method = "POST"
}

if m, ok := headers["method"]; ok {
method = m
}

httpReq, err := buildHttpRequest(funcUrl, method, data, params, headers)
if err != nil {
return []byte{}, fmt.Errorf("cannot connect to Function on URL: %s", funcUrl)
}

if operation.Requesthandler != nil {
operation.Requesthandler(httpReq)
}

client := &http.Client{}
resp, err := client.Do(httpReq)
if err != nil {
return []byte{}, err
}

defer resp.Body.Close()
if operation.OnResphandler != nil {
result, err = operation.OnResphandler(resp)
} else {
if resp.StatusCode < 200 || resp.StatusCode > 299 {
err = fmt.Errorf("invalid return status %d while connecting %s", resp.StatusCode, funcUrl)
result, _ = ioutil.ReadAll(resp.Body)
} else {
result, err = ioutil.ReadAll(resp.Body)
}
}

return result, err
}

// executeCallback executes a callback
func executeCallback(operation *FaasOperation, data []byte) error {
var err error

cbUrl := operation.CallbackUrl
params := operation.GetParams()
headers := operation.GetHeaders()

method := os.Getenv("default-method")
if method == "" {
method = "POST"
}

if m, ok := headers["method"]; ok {
method = m
}

httpReq, err := buildHttpRequest(cbUrl, method, data, params, headers)
if err != nil {
return fmt.Errorf("cannot connect to Function on URL: %s", cbUrl)
}

if operation.Requesthandler != nil {
operation.Requesthandler(httpReq)
}

client := &http.Client{}
resp, err := client.Do(httpReq)
if err != nil {
return err
}

defer resp.Body.Close()
if operation.OnResphandler != nil {
_, err = operation.OnResphandler(resp)
} else {
if resp.StatusCode < 200 || resp.StatusCode > 299 {
cbResult, _ := ioutil.ReadAll(resp.Body)
err := fmt.Errorf("%v:%s", err, string(cbResult))
return err
}
}
return err

}

func (operation *FaasOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error) {
var result []byte
var err error

reqId := fmt.Sprintf("%v", option["request-id"])
gateway := fmt.Sprintf("%v", option["gateway"])

switch {
// If function
case operation.Function != "":
fmt.Printf("[Request `%s`] Executing function `%s`\n",
reqId, operation.Function)
result, err = executeFunction(gateway, operation, data)
if err != nil {
err = fmt.Errorf("Function(%s), error: function execution failed, %v",
operation.Function, err)
if operation.FailureHandler != nil {
err = operation.FailureHandler(err)
}
if err != nil {
return nil, err
}
}

// If callback
case operation.CallbackUrl != "":
fmt.Printf("[Request `%s`] Executing callback `%s`\n",
reqId, operation.CallbackUrl)
err = executeCallback(operation, data)
if err != nil {
err = fmt.Errorf("Callback(%s), error: callback failed, %v",
operation.CallbackUrl, err)
if operation.FailureHandler != nil {
err = operation.FailureHandler(err)
}
if err != nil {
return nil, err
}
}

// If modifier
default:
fmt.Printf("[Request `%s`] Executing modifier\n", reqId)
result, err = operation.Mod(data)
if err != nil {
err = fmt.Errorf("error: Failed at modifier, %v", err)
return nil, err
}
if result == nil {
result = []byte("")
}
}

return result, nil
}

func (operation *FaasOperation) GetProperties() map[string][]string {

result := make(map[string][]string)
Expand Down
13 changes: 8 additions & 5 deletions sdk/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func (req *PartialState) Encode() ([]byte, error) {
// ExecutionRuntime implements how operation executed and handle next nodes in async
type ExecutionRuntime interface {
// HandleNextNode handles execution of next nodes based on partial state
HandleNextNode(*PartialState) (err error)
// ExecuteOperation implements execution of an operation
ExecuteOperation(sdk.Operation, []byte) ([]byte, error)
HandleNextNode(state *PartialState) (err error)
// Provide an execution option that will be passed to the operation
GetExecutionOption(operation sdk.Operation) map[string]interface{}
}

// Executor implements a faas-flow executor
Expand Down Expand Up @@ -345,10 +345,13 @@ func (fexec *FlowExecutor) executeNode(request []byte) ([]byte, error) {
if fexec.executor.MonitoringEnabled() {
fexec.eventHandler.ReportOperationStart(operation.GetId(), currentNode.GetUniqueId(), fexec.id)
}

options := fexec.executor.GetExecutionOption(operation)

if result == nil {
result, err = fexec.executor.ExecuteOperation(operation, request)
result, err = operation.Execute(request, options)
} else {
result, err = fexec.executor.ExecuteOperation(operation, result)
result, err = operation.Execute(result, options)
}
if err != nil {
if fexec.executor.MonitoringEnabled() {
Expand Down
11 changes: 11 additions & 0 deletions sdk/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import ()

type Operation interface {
GetId() string
Encode() []byte
GetProperties() map[string][]string
// Execute executes an operation, executor can pass configuration
Execute([]byte, map[string]interface{}) ([]byte, error)
}

type BlankOperation struct {
Expand All @@ -14,6 +17,14 @@ func (ops *BlankOperation) GetId() string {
return "end"
}

func (ops *BlankOperation) Encode() []byte {
return []byte("")
}

func (ops *BlankOperation) GetProperties() map[string][]string {
return make(map[string][]string)
}

func (ops *BlankOperation) Execute(data []byte, option map[string]interface{}) ([]byte, error) {
return data, nil
}
69 changes: 10 additions & 59 deletions template/faas-flow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var (
// implements faasflow.EventHandler
type openFaasEventHandler struct {
currentNodeId string // used to inject current node id in tracer
tracer *traceHandler // handle traces with opentracing
tracer *traceHandler // handle traces with open-tracing
flowName string
header http.Header
}
Expand Down Expand Up @@ -129,63 +129,6 @@ func (eh *openFaasEventHandler) Flush() {

// ExecutionRuntime

func (of *openFaasExecutor) ExecuteOperation(operation sdk.Operation, data []byte) ([]byte, error) {
var result []byte
var err error

ofoperation, ok := operation.(*faasflow.FaasOperation)
if !ok {
return nil, fmt.Errorf("Operation is not of type faasflow.OpenfaasOperation")
}

switch {
// If function
case ofoperation.Function != "":
fmt.Printf("[Request `%s`] Executing function `%s`\n",
of.reqId, ofoperation.Function)
result, err = executeFunction(of.gateway, ofoperation, data)
if err != nil {
err = fmt.Errorf("Function(%s), error: function execution failed, %v",
ofoperation.Function, err)
if ofoperation.FailureHandler != nil {
err = ofoperation.FailureHandler(err)
}
if err != nil {
return nil, err
}
}
// If callback
case ofoperation.CallbackUrl != "":
fmt.Printf("[Request `%s`] Executing callback `%s`\n",
of.reqId, ofoperation.CallbackUrl)
err = executeCallback(ofoperation, data)
if err != nil {
err = fmt.Errorf("Callback(%s), error: callback failed, %v",
ofoperation.CallbackUrl, err)
if ofoperation.FailureHandler != nil {
err = ofoperation.FailureHandler(err)
}
if err != nil {
return nil, err
}
}

// If modifier
default:
fmt.Printf("[Request `%s`] Executing modifier\n", of.reqId)
result, err = ofoperation.Mod(data)
if err != nil {
err = fmt.Errorf("error: Failed at modifier, %v", err)
return nil, err
}
if result == nil {
result = []byte("")
}
}

return result, nil
}

func (of *openFaasExecutor) HandleNextNode(partial *executor.PartialState) error {

state, err := partial.Encode()
Expand All @@ -198,6 +141,7 @@ func (of *openFaasExecutor) HandleNextNode(partial *executor.PartialState) error
httpreq.Header.Add("Accept", "application/json")
httpreq.Header.Add("Content-Type", "application/json")
httpreq.Header.Add("X-Faas-Flow-Reqid", of.reqId)
httpreq.Header.Set("X-Faas-Flow-State", "partial")

// extend req span for async call
of.tracer.extendReqSpan(of.reqId, of.openFaasEventHandler.currentNodeId,
Expand All @@ -218,6 +162,14 @@ func (of *openFaasExecutor) HandleNextNode(partial *executor.PartialState) error
return nil
}

func (of *openFaasExecutor) GetExecutionOption(operation sdk.Operation) map[string]interface{} {
options := make(map[string]interface{})
options["gateway"] = of.gateway
options["request-id"] = of.reqId

return options
}

// Executor

func (of *openFaasExecutor) Configure(requestId string) {
Expand Down Expand Up @@ -399,7 +351,6 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
}
response.Body = resp
response.Header.Set("X-Faas-Flow-Reqid", of.reqId)
response.Header.Set("X-Faas-Flow-State", "partial")
}

response.StatusCode = http.StatusOK
Expand Down
Loading

0 comments on commit e5e6805

Please sign in to comment.