From b268647f632041f8819dbcd8131e38803765047e Mon Sep 17 00:00:00 2001 From: pasdam Date: Sat, 6 Jun 2020 11:55:04 +0800 Subject: [PATCH] Refactor template to separate packages, extract request handlers and add REST api --- template/faas-flow/config/consul_dc.go | 13 + template/faas-flow/config/consul_url.go | 13 + template/faas-flow/config/gateway_url.go | 14 + .../config/parse_int_or_duration_value.go | 21 + template/faas-flow/config/read_timeout.go | 10 + template/faas-flow/config/trace_server.go | 14 + template/faas-flow/config/write_timeout.go | 10 + .../executor/execute_flow_handler.go | 65 +++ template/faas-flow/executor/executor.go | 8 + .../faas-flow/executor/flow_state_handler.go | 22 + .../faas-flow/executor/get_dag_handler.go | 22 + template/faas-flow/executor/handle_error.go | 13 + .../faas-flow/executor/init_data_store.go | 52 +++ .../faas-flow/executor/init_request_tracer.go | 46 ++ .../faas-flow/executor/init_state_store.go | 29 ++ .../executor/legacy_request_handler.go | 45 ++ .../executor/new_request_executor.go | 16 + .../executor/new_request_handler_wrapper.go | 29 ++ .../executor/open_faas_event_handler.go | 80 ++++ .../faas-flow/executor/open_faas_executor.go | 203 +++++++++ .../faas-flow/executor/pause_flow_handler.go | 21 + .../faas-flow/executor/resume_flow_handler.go | 21 + template/faas-flow/executor/router.go | 20 + template/faas-flow/executor/start_server.go | 37 ++ .../faas-flow/executor/stop_flow_handler.go | 21 + .../{tracer.go => executor/trace_handler.go} | 127 +----- template/faas-flow/executor/util.go | 79 ++++ template/faas-flow/go.mod | 1 + template/faas-flow/go.sum | 2 + template/faas-flow/handler.go | 404 ------------------ template/faas-flow/log/std_out_logger.go | 17 + template/faas-flow/main.go | 203 +-------- template/faas-flow/openfaas/read_secret.go | 26 ++ template/faas-flow/util.go | 109 ----- 34 files changed, 993 insertions(+), 820 deletions(-) create mode 100644 template/faas-flow/config/consul_dc.go create mode 100644 template/faas-flow/config/consul_url.go create mode 100644 template/faas-flow/config/gateway_url.go create mode 100644 template/faas-flow/config/parse_int_or_duration_value.go create mode 100644 template/faas-flow/config/read_timeout.go create mode 100644 template/faas-flow/config/trace_server.go create mode 100644 template/faas-flow/config/write_timeout.go create mode 100644 template/faas-flow/executor/execute_flow_handler.go create mode 100644 template/faas-flow/executor/executor.go create mode 100644 template/faas-flow/executor/flow_state_handler.go create mode 100644 template/faas-flow/executor/get_dag_handler.go create mode 100644 template/faas-flow/executor/handle_error.go create mode 100644 template/faas-flow/executor/init_data_store.go create mode 100644 template/faas-flow/executor/init_request_tracer.go create mode 100644 template/faas-flow/executor/init_state_store.go create mode 100644 template/faas-flow/executor/legacy_request_handler.go create mode 100644 template/faas-flow/executor/new_request_executor.go create mode 100644 template/faas-flow/executor/new_request_handler_wrapper.go create mode 100644 template/faas-flow/executor/open_faas_event_handler.go create mode 100644 template/faas-flow/executor/open_faas_executor.go create mode 100644 template/faas-flow/executor/pause_flow_handler.go create mode 100644 template/faas-flow/executor/resume_flow_handler.go create mode 100644 template/faas-flow/executor/router.go create mode 100644 template/faas-flow/executor/start_server.go create mode 100644 template/faas-flow/executor/stop_flow_handler.go rename template/faas-flow/{tracer.go => executor/trace_handler.go} (51%) create mode 100644 template/faas-flow/executor/util.go delete mode 100644 template/faas-flow/handler.go create mode 100644 template/faas-flow/log/std_out_logger.go create mode 100644 template/faas-flow/openfaas/read_secret.go delete mode 100644 template/faas-flow/util.go diff --git a/template/faas-flow/config/consul_dc.go b/template/faas-flow/config/consul_dc.go new file mode 100644 index 00000000..1a3a3548 --- /dev/null +++ b/template/faas-flow/config/consul_dc.go @@ -0,0 +1,13 @@ +package config + +import ( + "os" +) + +func ConsulDC() string { + val := os.Getenv("consul_dc") + if len(val) == 0 { + val = "dc1" + } + return val +} diff --git a/template/faas-flow/config/consul_url.go b/template/faas-flow/config/consul_url.go new file mode 100644 index 00000000..b90e3d09 --- /dev/null +++ b/template/faas-flow/config/consul_url.go @@ -0,0 +1,13 @@ +package config + +import ( + "os" +) + +func ConsulURL() string { + val := os.Getenv("consul_url") + if len(val) == 0 { + val = "consul.faasflow:8500" + } + return val +} diff --git a/template/faas-flow/config/gateway_url.go b/template/faas-flow/config/gateway_url.go new file mode 100644 index 00000000..615c37bd --- /dev/null +++ b/template/faas-flow/config/gateway_url.go @@ -0,0 +1,14 @@ +package config + +import ( + "os" +) + +// GatewayURL return the gateway address from env +func GatewayURL() string { + gateway := os.Getenv("gateway") + if gateway == "" { + gateway = "gateway.openfaas:8080" + } + return gateway +} diff --git a/template/faas-flow/config/parse_int_or_duration_value.go b/template/faas-flow/config/parse_int_or_duration_value.go new file mode 100644 index 00000000..ff143d75 --- /dev/null +++ b/template/faas-flow/config/parse_int_or_duration_value.go @@ -0,0 +1,21 @@ +package config + +import ( + "strconv" + "time" +) + +func parseIntOrDurationValue(val string, fallback time.Duration) time.Duration { + if len(val) > 0 { + parsedVal, parseErr := strconv.Atoi(val) + if parseErr == nil && parsedVal >= 0 { + return time.Duration(parsedVal) * time.Second + } + } + + duration, durationErr := time.ParseDuration(val) + if durationErr != nil { + return fallback + } + return duration +} diff --git a/template/faas-flow/config/read_timeout.go b/template/faas-flow/config/read_timeout.go new file mode 100644 index 00000000..e44bea20 --- /dev/null +++ b/template/faas-flow/config/read_timeout.go @@ -0,0 +1,10 @@ +package config + +import ( + "os" + "time" +) + +func ReadTimeout() time.Duration { + return parseIntOrDurationValue(os.Getenv("read_timeout"), 10*time.Second) +} diff --git a/template/faas-flow/config/trace_server.go b/template/faas-flow/config/trace_server.go new file mode 100644 index 00000000..a2b84709 --- /dev/null +++ b/template/faas-flow/config/trace_server.go @@ -0,0 +1,14 @@ +package config + +import ( + "os" +) + +// TraceServer get the traceserver address +func TraceServer() string { + traceServer := os.Getenv("trace_server") + if traceServer == "" { + traceServer = "jaeger.faasflow:5775" + } + return traceServer +} diff --git a/template/faas-flow/config/write_timeout.go b/template/faas-flow/config/write_timeout.go new file mode 100644 index 00000000..d799431e --- /dev/null +++ b/template/faas-flow/config/write_timeout.go @@ -0,0 +1,10 @@ +package config + +import ( + "os" + "time" +) + +func WriteTimeout() time.Duration { + return parseIntOrDurationValue(os.Getenv("write_timeout"), 10*time.Second) +} diff --git a/template/faas-flow/executor/execute_flow_handler.go b/template/faas-flow/executor/execute_flow_handler.go new file mode 100644 index 00000000..8ef38eb7 --- /dev/null +++ b/template/faas-flow/executor/execute_flow_handler.go @@ -0,0 +1,65 @@ +package executor + +import ( + "errors" + "fmt" + "io/ioutil" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" +) + +func executeFlowHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Printf("Requested %s %s\n", req.Method, req.URL) + log.Printf("Executing flow %s\n", id) + + var stateOption executor.ExecutionStateOption + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + return nil, err + } + + // TODO: fix this + openFaasEx := ex.(*openFaasExecutor) + + state := req.Header.Get("X-Faas-Flow-State") + callbackURL := req.Header.Get("X-Faas-Flow-Callback-Url") + openFaasEx.callbackURL = callbackURL + if state == "" { + rawRequest := &executor.RawRequest{} + rawRequest.Data = body + rawRequest.Query = req.URL.RawQuery + rawRequest.AuthSignature = req.Header.Get("X-Hub-Signature") + // Check if any request Id is passed + if id != "" { + rawRequest.RequestId = id + } + stateOption = executor.NewRequest(rawRequest) + + } else { + if id == "" { + return nil, errors.New("request ID not set in partial request") + } + + openFaasEx.openFaasEventHandler.header = req.Header + partialState, err := executor.DecodePartialReq(body) + if err != nil { + return nil, errors.New("failed to decode partial state") + } + stateOption = executor.PartialRequest(partialState) + } + + // Create a flow executor, OpenFaaSExecutor implements executor + flowExecutor := executor.CreateFlowExecutor(ex, nil) + resp, err := flowExecutor.Execute(stateOption) + if err != nil { + return nil, fmt.Errorf("failed to execute request. %s", err.Error()) + } + headers := w.Header() + headers["X-Faas-Flow-Reqid"] = []string{flowExecutor.GetReqId()} + headers["X-Faas-Flow-Callback-Url"] = []string{callbackURL} + + return resp, nil +} diff --git a/template/faas-flow/executor/executor.go b/template/faas-flow/executor/executor.go new file mode 100644 index 00000000..47768f8d --- /dev/null +++ b/template/faas-flow/executor/executor.go @@ -0,0 +1,8 @@ +package executor + +import "github.com/s8sg/faas-flow/sdk" + +var ( + stateStore sdk.StateStore + dataStore sdk.DataStore +) diff --git a/template/faas-flow/executor/flow_state_handler.go b/template/faas-flow/executor/flow_state_handler.go new file mode 100644 index 00000000..df67e7f1 --- /dev/null +++ b/template/faas-flow/executor/flow_state_handler.go @@ -0,0 +1,22 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" +) + +func flowStateHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Printf("Get flow state: %s\n", id) + + flowExecutor := executor.CreateFlowExecutor(ex, nil) + state, err := flowExecutor.GetState(id) + if err != nil { + log.Printf(err.Error()) + return nil, fmt.Errorf("failed to get request state for %s, check if request is active", id) + } + + return []byte(state), nil +} diff --git a/template/faas-flow/executor/get_dag_handler.go b/template/faas-flow/executor/get_dag_handler.go new file mode 100644 index 00000000..0cc6e7d2 --- /dev/null +++ b/template/faas-flow/executor/get_dag_handler.go @@ -0,0 +1,22 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" + "github.com/s8sg/faas-flow/sdk/exporter" +) + +func getDagHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Println("Exporting flow's DAG") + + flowExporter := exporter.CreateFlowExporter(ex) + resp, err := flowExporter.Export() + if err != nil { + return nil, fmt.Errorf("failed to export dag, error %v", err) + } + + return resp, nil +} diff --git a/template/faas-flow/executor/handle_error.go b/template/faas-flow/executor/handle_error.go new file mode 100644 index 00000000..73a992f5 --- /dev/null +++ b/template/faas-flow/executor/handle_error.go @@ -0,0 +1,13 @@ +package executor + +import ( + "fmt" + "net/http" +) + +func handleError(w http.ResponseWriter, message string) { + errorStr := fmt.Sprintf("[ Failed ] %v\n", message) + fmt.Printf(errorStr) + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(errorStr)) +} diff --git a/template/faas-flow/executor/init_data_store.go b/template/faas-flow/executor/init_data_store.go new file mode 100644 index 00000000..eb9e5e0f --- /dev/null +++ b/template/faas-flow/executor/init_data_store.go @@ -0,0 +1,52 @@ +package executor + +import ( + "log" + + "handler/function" + + minioDataStore "github.com/s8sg/faas-flow-minio-datastore" + "github.com/s8sg/faas-flow/sdk" +) + +func initDataStore() (dataStore sdk.DataStore, err error) { + dataStore, err = function.OverrideDataStore() + if err != nil { + return nil, err + } + if dataStore == nil { + + /* + minioUrl := os.Getenv("s3_url") + if len(minioUrl) == 0 { + minioUrl = "minio.faasflow:9000" + } + + minioRegion := os.Getenv("s3_region") + if len(minioRegion) == 0 { + minioUrl = "us-east-1" + } + + secretKeyName := os.Getenv("s3_secret_key_name") + if len(secretKeyName) == 0 { + secretKeyName = "s3-secret-key" + } + + accessKeyName := os.Getenv("s3_access_key_name") + if len(accessKeyName) == 0 { + accessKeyName = "s3-access-key" + } + + tlsEnabled := false + if connection := os.Getenv("s3_tls"); connection == "true" || connection == "1" { + tlsEnabled = true + } + + dataStore, err = minioDataStore.Init(minioUrl, minioRegion, secretKeyName, accessKeyName, tlsEnabled) + */ + dataStore, err = minioDataStore.InitFromEnv() + + log.Print("Using default data store (minio)") + } + return dataStore, err +} diff --git a/template/faas-flow/executor/init_request_tracer.go b/template/faas-flow/executor/init_request_tracer.go new file mode 100644 index 00000000..a3a127f7 --- /dev/null +++ b/template/faas-flow/executor/init_request_tracer.go @@ -0,0 +1,46 @@ +package executor + +import ( + "fmt" + "time" + + hconfig "handler/config" + + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go" + "github.com/uber/jaeger-client-go/config" +) + +// initRequestTracer init global trace with configuration +func initRequestTracer(flowName string) (*traceHandler, error) { + tracerObj := &traceHandler{} + + agentPort := hconfig.TraceServer() + + cfg := config.Configuration{ + ServiceName: flowName, + Sampler: &config.SamplerConfig{ + Type: "const", + Param: 1, + }, + Reporter: &config.ReporterConfig{ + LogSpans: true, + BufferFlushInterval: 1 * time.Second, + LocalAgentHostPort: agentPort, + }, + } + + opentracer, traceCloser, err := cfg.NewTracer( + config.Logger(jaeger.StdLogger), + ) + if err != nil { + return nil, fmt.Errorf("failed to init tracer, error %v", err.Error()) + } + + tracerObj.closer = traceCloser + tracerObj.tracer = opentracer + tracerObj.nodeSpans = make(map[string]opentracing.Span) + tracerObj.operationSpans = make(map[string]map[string]opentracing.Span) + + return tracerObj, nil +} diff --git a/template/faas-flow/executor/init_state_store.go b/template/faas-flow/executor/init_state_store.go new file mode 100644 index 00000000..d17ffed4 --- /dev/null +++ b/template/faas-flow/executor/init_state_store.go @@ -0,0 +1,29 @@ +package executor + +import ( + "log" + + "handler/config" + "handler/function" + + consulStateStore "github.com/s8sg/faas-flow-consul-statestore" + "github.com/s8sg/faas-flow/sdk" +) + +func initStateStore() (stateStore sdk.StateStore, err error) { + stateStore, err = function.OverrideStateStore() + if err != nil { + return nil, err + } + + if stateStore == nil { + log.Print("Using default state store (consul)") + + consulURL := config.ConsulURL() + consulDC := config.ConsulDC() + + stateStore, err = consulStateStore.GetConsulStateStore(consulURL, consulDC) + } + + return stateStore, err +} diff --git a/template/faas-flow/executor/legacy_request_handler.go b/template/faas-flow/executor/legacy_request_handler.go new file mode 100644 index 00000000..5ce5d04c --- /dev/null +++ b/template/faas-flow/executor/legacy_request_handler.go @@ -0,0 +1,45 @@ +package executor + +import ( + "net/http" + + "github.com/julienschmidt/httprouter" + "github.com/s8sg/faas-flow/sdk/executor" +) + +func legacyRequestHandler(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var handler func(http.ResponseWriter, *http.Request, string, executor.Executor) ([]byte, error) + id := "" + + switch { + case isDagExportRequest(r.URL.RawQuery): + handler = getDagHandler + + case getPauseRequestID(r.URL.RawQuery) != "": + id = getPauseRequestID(r.URL.RawQuery) + handler = pauseFlowHandler + + case getStopRequestID(r.URL.RawQuery) != "": + id = getStopRequestID(r.URL.RawQuery) + handler = stopFlowHandler + + case getResumeRequestID(r.URL.RawQuery) != "": + id = getResumeRequestID(r.URL.RawQuery) + handler = resumeFlowHandler + + case getStateRequestID(r.URL.RawQuery) != "": + id = getStateRequestID(r.URL.RawQuery) + handler = flowStateHandler + + default: + id = r.Header.Get("X-Faas-Flow-Reqid") + handler = executeFlowHandler + } + + p = append(p, httprouter.Param{ + Key: "id", + Value: id, + }) + + newRequestHandlerWrapper(handler)(w, r, p) +} diff --git a/template/faas-flow/executor/new_request_executor.go b/template/faas-flow/executor/new_request_executor.go new file mode 100644 index 00000000..2d444945 --- /dev/null +++ b/template/faas-flow/executor/new_request_executor.go @@ -0,0 +1,16 @@ +package executor + +import ( + "net/http" +) + +func newRequestExecutor(request *http.Request) (*openFaasExecutor, error) { + ex := &openFaasExecutor{stateStore: stateStore, dataStore: dataStore} + + err := ex.init(request) + if err != nil { + return nil, err + } + + return ex, nil +} diff --git a/template/faas-flow/executor/new_request_handler_wrapper.go b/template/faas-flow/executor/new_request_handler_wrapper.go new file mode 100644 index 00000000..f9781291 --- /dev/null +++ b/template/faas-flow/executor/new_request_handler_wrapper.go @@ -0,0 +1,29 @@ +package executor + +import ( + "fmt" + "net/http" + + "github.com/julienschmidt/httprouter" + "github.com/s8sg/faas-flow/sdk/executor" +) + +func newRequestHandlerWrapper(handler func(http.ResponseWriter, *http.Request, string, executor.Executor) ([]byte, error)) func(http.ResponseWriter, *http.Request, httprouter.Params) { + return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { + id := params.ByName("id") + + ex, err := newRequestExecutor(req) + if err != nil { + handleError(w, fmt.Sprintf("failed to execute request "+id)) + return + } + + body, err := handler(w, req, id, ex) + if err != nil { + handleError(w, fmt.Sprintf("request failed to be processed. "+err.Error())) + } + + w.WriteHeader(http.StatusOK) + w.Write(body) + } +} diff --git a/template/faas-flow/executor/open_faas_event_handler.go b/template/faas-flow/executor/open_faas_event_handler.go new file mode 100644 index 00000000..099db1e0 --- /dev/null +++ b/template/faas-flow/executor/open_faas_event_handler.go @@ -0,0 +1,80 @@ +package executor + +import ( + "fmt" + "net/http" +) + +// implements faasflow.EventHandler +type openFaasEventHandler struct { + currentNodeID string // used to inject current node id in tracer + tracer *traceHandler // handle traces with open-tracing + flowName string + header http.Header +} + +func (eh *openFaasEventHandler) Configure(flowName string, requestID string) { + eh.flowName = flowName +} + +func (eh *openFaasEventHandler) Init() error { + var err error + + // initialize trace server if tracing enabled + eh.tracer, err = initRequestTracer(eh.flowName) + if err != nil { + return fmt.Errorf("failed to init request tracer, error %v", err) + } + return nil +} + +func (eh *openFaasEventHandler) ReportRequestStart(requestID string) { + eh.tracer.startReqSpan(requestID) +} + +func (eh *openFaasEventHandler) ReportRequestFailure(requestID string, err error) { + // TODO: add log + eh.tracer.stopReqSpan() +} + +func (eh *openFaasEventHandler) ReportExecutionForward(currentNodeID string, requestID string) { + eh.currentNodeID = currentNodeID +} + +func (eh *openFaasEventHandler) ReportExecutionContinuation(requestID string) { + eh.tracer.continueReqSpan(requestID, eh.header) +} + +func (eh *openFaasEventHandler) ReportRequestEnd(requestID string) { + eh.tracer.stopReqSpan() +} + +func (eh *openFaasEventHandler) ReportNodeStart(nodeID string, requestID string) { + eh.tracer.startNodeSpan(nodeID, requestID) +} + +func (eh *openFaasEventHandler) ReportNodeEnd(nodeID string, requestID string) { + eh.tracer.stopNodeSpan(nodeID) +} + +func (eh *openFaasEventHandler) ReportNodeFailure(nodeID string, requestID string, err error) { + // TODO: add log + eh.tracer.stopNodeSpan(nodeID) +} + +func (eh *openFaasEventHandler) ReportOperationStart(operationID string, nodeID string, requestID string) { + eh.tracer.startOperationSpan(nodeID, requestID, operationID) +} + +func (eh *openFaasEventHandler) ReportOperationEnd(operationID string, nodeID string, requestID string) { + eh.tracer.stopOperationSpan(nodeID, operationID) +} + +func (eh *openFaasEventHandler) ReportOperationFailure(operationID string, nodeID string, requestID string, err error) { + // TODO: add log + eh.tracer.stopOperationSpan(nodeID, operationID) +} + +func (eh *openFaasEventHandler) Flush() { + eh.tracer.flushTracer() +} diff --git a/template/faas-flow/executor/open_faas_executor.go b/template/faas-flow/executor/open_faas_executor.go new file mode 100644 index 00000000..6b70e9f0 --- /dev/null +++ b/template/faas-flow/executor/open_faas_executor.go @@ -0,0 +1,203 @@ +package executor + +import ( + "bytes" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "strings" + + "handler/config" + "handler/function" + hlog "handler/log" + "handler/openfaas" + + faasflow "github.com/s8sg/faas-flow" + "github.com/s8sg/faas-flow/sdk" + "github.com/s8sg/faas-flow/sdk/executor" +) + +// A signature of SHA265 equivalent of "github.com/s8sg/faas-flow" +const defaultHmacKey = "71F1D3011F8E6160813B4997BA29856744375A7F26D427D491E1CCABD4627E7C" + +// implements faasflow.Executor + RequestHandler +type openFaasExecutor struct { + gateway string + asyncURL string // the async URL of the flow + flowName string // the name of the function + reqID string // the request id + callbackURL string // the callback url + partialState []byte + rawRequest *executor.RawRequest + stateStore sdk.StateStore + dataStore sdk.DataStore + logger hlog.StdOutLogger + + openFaasEventHandler +} + +func (of *openFaasExecutor) HandleNextNode(partial *executor.PartialState) error { + + state, err := partial.Encode() + if err != nil { + return fmt.Errorf("failed to encode partial state, error %v", err) + } + + // build url for calling the flow in async + httpreq, _ := http.NewRequest(http.MethodPost, of.asyncURL, bytes.NewReader(state)) + httpreq.Header.Add("Accept", "application/json") + httpreq.Header.Add("Content-Type", "application/json") + httpreq.Header.Add("X-Faas-Flow-Reqid", of.reqID) + httpreq.Header.Set("X-Faas-Flow-State", "partial") + httpreq.Header.Set("X-Faas-Flow-Callback-Url", of.callbackURL) + + // extend req span for async call + if of.MonitoringEnabled() { + of.tracer.extendReqSpan(of.reqID, of.openFaasEventHandler.currentNodeID, + of.asyncURL, httpreq) + } + + client := &http.Client{} + res, resErr := client.Do(httpreq) + if resErr != nil { + return resErr + } + + defer res.Body.Close() + resdata, _ := ioutil.ReadAll(res.Body) + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted { + return fmt.Errorf("%d: %s", res.StatusCode, string(resdata)) + } + return nil +} + +func (of *openFaasExecutor) GetExecutionOption(operation sdk.Operation) map[string]interface{} { + options := make(map[string]interface{}) + options["gateway"] = of.gateway + options["request-id"] = of.reqID + + return options +} + +func (of *openFaasExecutor) HandleExecutionCompletion(data []byte) error { + if of.callbackURL == "" { + return nil + } + + log.Printf("calling callback url (%s) with result", of.callbackURL) + httpreq, _ := http.NewRequest(http.MethodPost, of.callbackURL, bytes.NewReader(data)) + httpreq.Header.Add("X-Faas-Flow-ReqiD", of.reqID) + client := &http.Client{} + + res, resErr := client.Do(httpreq) + if resErr != nil { + return resErr + } + defer res.Body.Close() + resdata, _ := ioutil.ReadAll(res.Body) + + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted { + return fmt.Errorf("failed to call callback %d: %s", res.StatusCode, string(resdata)) + } + + return nil +} + +func (of *openFaasExecutor) Configure(requestID string) { + of.reqID = requestID +} + +func (of *openFaasExecutor) GetFlowName() string { + return of.flowName +} + +func (of *openFaasExecutor) GetFlowDefinition(pipeline *sdk.Pipeline, context *sdk.Context) error { + workflow := faasflow.GetWorkflow(pipeline) + faasflowContext := (*faasflow.Context)(context) + err := function.Define(workflow, faasflowContext) + return err +} + +func (of *openFaasExecutor) ReqValidationEnabled() bool { + status := true + hmacStatus := os.Getenv("validate_request") + if strings.ToUpper(hmacStatus) == "FALSE" { + status = false + } + return status +} + +func (of *openFaasExecutor) GetValidationKey() (string, error) { + key, keyErr := openfaas.ReadSecret("faasflow-hmac-secret") + if keyErr != nil { + key = defaultHmacKey + } + return key, nil +} + +func (of *openFaasExecutor) ReqAuthEnabled() bool { + status := false + verifyStatus := os.Getenv("authenticate_request") + if strings.ToUpper(verifyStatus) == "TRUE" { + status = true + } + return status +} + +func (of *openFaasExecutor) GetReqAuthKey() (string, error) { + key, keyErr := openfaas.ReadSecret("faasflow-hmac-secret") + return key, keyErr +} + +func (of *openFaasExecutor) MonitoringEnabled() bool { + tracing := os.Getenv("enable_tracing") + if strings.ToUpper(tracing) == "TRUE" { + return true + } + return false +} + +func (of *openFaasExecutor) GetEventHandler() (sdk.EventHandler, error) { + return &of.openFaasEventHandler, nil +} + +func (of *openFaasExecutor) LoggingEnabled() bool { + return true +} + +func (of *openFaasExecutor) GetLogger() (sdk.Logger, error) { + return &of.logger, nil +} + +func (of *openFaasExecutor) GetStateStore() (sdk.StateStore, error) { + return of.stateStore, nil +} + +func (of *openFaasExecutor) GetDataStore() (sdk.DataStore, error) { + return of.dataStore, nil +} + +// internal + +func (of *openFaasExecutor) init(req *http.Request) error { + of.gateway = config.GatewayURL() + of.flowName = getWorkflowNameFromHost(req.Host) + if of.flowName == "" { + return fmt.Errorf("failed to parse workflow name from host") + } + of.asyncURL = buildURL("http://"+of.gateway, "async-function", of.flowName) + + if of.MonitoringEnabled() { + var err error + // initialize trace server if tracing enabled + of.openFaasEventHandler.tracer, err = initRequestTracer(of.flowName) + if err != nil { + return fmt.Errorf("failed to init request tracer, error %v", err) + } + } + + return nil +} diff --git a/template/faas-flow/executor/pause_flow_handler.go b/template/faas-flow/executor/pause_flow_handler.go new file mode 100644 index 00000000..e6d1785b --- /dev/null +++ b/template/faas-flow/executor/pause_flow_handler.go @@ -0,0 +1,21 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" +) + +func pauseFlowHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Printf("Pausing flow %s\n", id) + + flowExecutor := executor.CreateFlowExecutor(ex, nil) + err := flowExecutor.Pause(id) + if err != nil { + return nil, fmt.Errorf("failed to pause request %s, check if request is active", id) + } + + return []byte("Successfully paused request " + id), nil +} diff --git a/template/faas-flow/executor/resume_flow_handler.go b/template/faas-flow/executor/resume_flow_handler.go new file mode 100644 index 00000000..646ed8ac --- /dev/null +++ b/template/faas-flow/executor/resume_flow_handler.go @@ -0,0 +1,21 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" +) + +func resumeFlowHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Printf("Resuming flow %s\n", id) + + flowExecutor := executor.CreateFlowExecutor(ex, nil) + err := flowExecutor.Resume(id) + if err != nil { + return nil, fmt.Errorf("failed to resume request %s, check if request is active", id) + } + + return []byte("Successfully resumed request " + id), nil +} diff --git a/template/faas-flow/executor/router.go b/template/faas-flow/executor/router.go new file mode 100644 index 00000000..d99f2705 --- /dev/null +++ b/template/faas-flow/executor/router.go @@ -0,0 +1,20 @@ +package executor + +import ( + "net/http" + + "github.com/julienschmidt/httprouter" +) + +func router() http.Handler { + router := httprouter.New() + // router.POST("/flow/execute", newRequestHandlerWrapper(executeFlowHandler)) + router.POST("/flow/:id/execute", newRequestHandlerWrapper(executeFlowHandler)) + router.POST("/flow/:id/pause", newRequestHandlerWrapper(pauseFlowHandler)) + router.POST("/flow/:id/resume", newRequestHandlerWrapper(resumeFlowHandler)) + router.POST("/flow/:id/stop", newRequestHandlerWrapper(stopFlowHandler)) + router.GET("/flow/:id/state", newRequestHandlerWrapper(flowStateHandler)) + router.POST("/", legacyRequestHandler) + router.GET("/", legacyRequestHandler) + return router +} diff --git a/template/faas-flow/executor/start_server.go b/template/faas-flow/executor/start_server.go new file mode 100644 index 00000000..1f661e03 --- /dev/null +++ b/template/faas-flow/executor/start_server.go @@ -0,0 +1,37 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "handler/config" +) + +// StartServer starts the flow function +func StartServer() { + readTimeout := config.ReadTimeout() + writeTimeout := config.WriteTimeout() + + var err error + + stateStore, err = initStateStore() + if err != nil { + log.Fatalf("Failed to initialize the StateStore, %v", err) + } + + dataStore, err = initDataStore() + if err != nil { + log.Fatalf("Failed to initialize the StateStore, %v", err) + } + + s := &http.Server{ + Addr: fmt.Sprintf(":%d", 8082), + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + Handler: router(), + MaxHeaderBytes: 1 << 20, // Max header of 1MB + } + + log.Fatal(s.ListenAndServe()) +} diff --git a/template/faas-flow/executor/stop_flow_handler.go b/template/faas-flow/executor/stop_flow_handler.go new file mode 100644 index 00000000..246274ae --- /dev/null +++ b/template/faas-flow/executor/stop_flow_handler.go @@ -0,0 +1,21 @@ +package executor + +import ( + "fmt" + "log" + "net/http" + + "github.com/s8sg/faas-flow/sdk/executor" +) + +func stopFlowHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) { + log.Printf("Pausing flow %s\n", id) + + flowExecutor := executor.CreateFlowExecutor(ex, nil) + err := flowExecutor.Stop(id) + if err != nil { + return nil, fmt.Errorf("failed to stop request %s, check if request is active", id) + } + + return []byte("Successfully stopped request " + id), nil +} diff --git a/template/faas-flow/tracer.go b/template/faas-flow/executor/trace_handler.go similarity index 51% rename from template/faas-flow/tracer.go rename to template/faas-flow/executor/trace_handler.go index 9299d494..4e1191ae 100644 --- a/template/faas-flow/tracer.go +++ b/template/faas-flow/executor/trace_handler.go @@ -1,16 +1,13 @@ -package main +package executor import ( "fmt" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" - //"github.com/opentracing/opentracing-go/log" - "github.com/uber/jaeger-client-go" - "github.com/uber/jaeger-client-go/config" + "io" "net/http" - "os" - "time" ) type traceHandler struct { @@ -24,97 +21,15 @@ type traceHandler struct { operationSpans map[string]map[string]opentracing.Span } -// CustomHeadersCarrier satisfies both TextMapWriter and TextMapReader -type CustomHeadersCarrier struct { - envMap map[string]string -} - -// buildCustomHeadersCarrier builds a CustomHeadersCarrier from env -func buildCustomHeadersCarrier(header http.Header) *CustomHeadersCarrier { - carrier := &CustomHeadersCarrier{} - carrier.envMap = make(map[string]string) - - for k, v := range header { - if k == "Uber-Trace-Id" && len(v) > 0 { - key := "uber-trace-id" - carrier.envMap[key] = v[0] - break - } - } - - return carrier -} - -// ForeachKey conforms to the TextMapReader interface -func (c *CustomHeadersCarrier) ForeachKey(handler func(key, val string) error) error { - for key, value := range c.envMap { - err := handler(key, value) - if err != nil { - fmt.Fprintf(os.Stderr, "ForeachKey key %s value %s, error %v", - key, value, err) - return err - } - } - return nil -} - -// Set conforms to the TextMapWriter interface -func (c *CustomHeadersCarrier) Set(key, val string) { - c.envMap[key] = val -} - -// getTraceServer get the traceserver address -func getTraceServer() string { - traceServer := os.Getenv("trace_server") - if traceServer == "" { - traceServer = "jaeger.faasflow:5775" - } - return traceServer -} - -// initRequestTracer init global trace with configuration -func initRequestTracer(flowName string) (*traceHandler, error) { - tracerObj := &traceHandler{} - - agentPort := getTraceServer() - - cfg := config.Configuration{ - ServiceName: flowName, - Sampler: &config.SamplerConfig{ - Type: "const", - Param: 1, - }, - Reporter: &config.ReporterConfig{ - LogSpans: true, - BufferFlushInterval: 1 * time.Second, - LocalAgentHostPort: agentPort, - }, - } - - opentracer, traceCloser, err := cfg.NewTracer( - config.Logger(jaeger.StdLogger), - ) - if err != nil { - return nil, fmt.Errorf("failed to init tracer, error %v", err.Error()) - } - - tracerObj.closer = traceCloser - tracerObj.tracer = opentracer - tracerObj.nodeSpans = make(map[string]opentracing.Span) - tracerObj.operationSpans = make(map[string]map[string]opentracing.Span) - - return tracerObj, nil -} - // startReqSpan starts a request span -func (tracerObj *traceHandler) startReqSpan(reqId string) { - tracerObj.reqSpan = tracerObj.tracer.StartSpan(reqId) - tracerObj.reqSpan.SetTag("request", reqId) +func (tracerObj *traceHandler) startReqSpan(reqID string) { + tracerObj.reqSpan = tracerObj.tracer.StartSpan(reqID) + tracerObj.reqSpan.SetTag("request", reqID) tracerObj.reqSpanCtx = tracerObj.reqSpan.Context() } // continueReqSpan continue request span -func (tracerObj *traceHandler) continueReqSpan(reqId string, header http.Header) { +func (tracerObj *traceHandler) continueReqSpan(reqID string, header http.Header) { var err error tracerObj.reqSpanCtx, err = tracerObj.tracer.Extract( @@ -122,7 +37,7 @@ func (tracerObj *traceHandler) continueReqSpan(reqId string, header http.Header) opentracing.HTTPHeadersCarrier(header), ) if err != nil { - fmt.Printf("[Request %s] failed to continue req span for tracing, error %v\n", reqId, err) + fmt.Printf("[Request %s] failed to continue req span for tracing, error %v\n", reqID, err) return } @@ -135,7 +50,7 @@ func (tracerObj *traceHandler) continueReqSpan(reqId string, header http.Header) // extendReqSpan extend req span over a request // func extendReqSpan(url string, req *http.Request) { -func (tracerObj *traceHandler) extendReqSpan(reqId string, lastNode string, url string, req *http.Request) { +func (tracerObj *traceHandler) extendReqSpan(reqID string, lastNode string, url string, req *http.Request) { // TODO: as requestSpan can't be regenerated with the span context we // forward the nodes SpanContext // span := reqSpan @@ -153,11 +68,11 @@ func (tracerObj *traceHandler) extendReqSpan(reqId string, lastNode string, url opentracing.HTTPHeadersCarrier(req.Header), ) if err != nil { - fmt.Printf("[Request %s] failed to extend req span for tracing, error %v\n", reqId, err) + fmt.Printf("[Request %s] failed to extend req span for tracing, error %v\n", reqID, err) } if req.Header.Get("Uber-Trace-Id") == "" { fmt.Printf("[Request %s] failed to extend req span for tracing, error Uber-Trace-Id not set\n", - reqId) + reqID) } } @@ -171,7 +86,7 @@ func (tracerObj *traceHandler) stopReqSpan() { } // startNodeSpan starts a node span -func (tracerObj *traceHandler) startNodeSpan(node string, reqId string) { +func (tracerObj *traceHandler) startNodeSpan(node string, reqID string) { tracerObj.nodeSpans[node] = tracerObj.tracer.StartSpan( node, ext.RPCServerOption(tracerObj.reqSpanCtx)) @@ -182,7 +97,7 @@ func (tracerObj *traceHandler) startNodeSpan(node string, reqId string) { */ tracerObj.nodeSpans[node].SetTag("async", "true") - tracerObj.nodeSpans[node].SetTag("request", reqId) + tracerObj.nodeSpans[node].SetTag("request", reqID) tracerObj.nodeSpans[node].SetTag("node", node) } @@ -193,7 +108,7 @@ func (tracerObj *traceHandler) stopNodeSpan(node string) { } // startOperationSpan starts an operation span -func (tracerObj *traceHandler) startOperationSpan(node string, reqId string, operationId string) { +func (tracerObj *traceHandler) startOperationSpan(node string, reqID string, operationID string) { if tracerObj.nodeSpans[node] == nil { return @@ -206,23 +121,23 @@ func (tracerObj *traceHandler) startOperationSpan(node string, reqId string, ope } nodeContext := tracerObj.nodeSpans[node].Context() - operationSpans[operationId] = tracerObj.tracer.StartSpan( - operationId, opentracing.ChildOf(nodeContext)) + operationSpans[operationID] = tracerObj.tracer.StartSpan( + operationID, opentracing.ChildOf(nodeContext)) - operationSpans[operationId].SetTag("request", reqId) - operationSpans[operationId].SetTag("node", node) - operationSpans[operationId].SetTag("operation", operationId) + operationSpans[operationID].SetTag("request", reqID) + operationSpans[operationID].SetTag("node", node) + operationSpans[operationID].SetTag("operation", operationID) } // stopOperationSpan stops an operation span -func (tracerObj *traceHandler) stopOperationSpan(node string, operationId string) { +func (tracerObj *traceHandler) stopOperationSpan(node string, operationID string) { if tracerObj.nodeSpans[node] == nil { return } operationSpans := tracerObj.operationSpans[node] - operationSpans[operationId].Finish() + operationSpans[operationID].Finish() } // flushTracer flush all pending traces diff --git a/template/faas-flow/executor/util.go b/template/faas-flow/executor/util.go new file mode 100644 index 00000000..ae0a1b0d --- /dev/null +++ b/template/faas-flow/executor/util.go @@ -0,0 +1,79 @@ +package executor + +import ( + "net/url" + "path" + "regexp" + "strings" +) + +var re = regexp.MustCompile(`(?m)^[^:.]+\s*`) + +// buildURL builds execution url for the flow +func buildURL(gateway, rPath, function string) string { + u, _ := url.Parse(gateway) + u.Path = path.Join(u.Path, rPath, function) + return u.String() +} + +// getWorkflowNameFromHostFromHost returns the flow name from env +func getWorkflowNameFromHost(host string) string { + matches := re.FindAllString(host, -1) + if matches[0] != "" { + return matches[0] + } + return "" +} + +// isDagExportRequest check if dag export request +func isDagExportRequest(query string) bool { + values, err := url.ParseQuery(query) + if err != nil { + return false + } + + if strings.ToUpper(values.Get("export-dag")) == "TRUE" { + return true + } + return false +} + +// getStateRequestID check if state request and return the requestID +func getStateRequestID(query string) string { + values, err := url.ParseQuery(query) + if err != nil { + return "" + } + + return values.Get("state") +} + +// getStopRequestID check if stop request and return the requestID +func getStopRequestID(query string) string { + values, err := url.ParseQuery(query) + if err != nil { + return "" + } + + return values.Get("stop-flow") +} + +// getPauseRequestID check if pause request and return the requestID +func getPauseRequestID(query string) string { + values, err := url.ParseQuery(query) + if err != nil { + return "" + } + + return values.Get("pause-flow") +} + +// getResumeRequestID check if resume request and return the requestID +func getResumeRequestID(query string) string { + values, err := url.ParseQuery(query) + if err != nil { + return "" + } + + return values.Get("resume-flow") +} diff --git a/template/faas-flow/go.mod b/template/faas-flow/go.mod index 156448d4..0c7d9c3e 100644 --- a/template/faas-flow/go.mod +++ b/template/faas-flow/go.mod @@ -6,6 +6,7 @@ require ( github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/go-ini/ini v1.56.0 // indirect github.com/hashicorp/consul/api v1.4.0 // indirect + github.com/julienschmidt/httprouter v1.3.0 github.com/minio/minio-go v6.0.14+incompatible // indirect github.com/opentracing/opentracing-go v1.1.0 github.com/pkg/errors v0.9.1 // indirect diff --git a/template/faas-flow/go.sum b/template/faas-flow/go.sum index 9990ae1d..ab70fb10 100644 --- a/template/faas-flow/go.sum +++ b/template/faas-flow/go.sum @@ -54,6 +54,8 @@ github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/template/faas-flow/handler.go b/template/faas-flow/handler.go deleted file mode 100644 index 395263b2..00000000 --- a/template/faas-flow/handler.go +++ /dev/null @@ -1,404 +0,0 @@ -package main - -import ( - "bytes" - "fmt" - "handler/function" - "io/ioutil" - "log" - "net/http" - "os" - "regexp" - "strings" - - faasflow "github.com/s8sg/faas-flow" - - sdk "github.com/s8sg/faas-flow/sdk" - executor "github.com/s8sg/faas-flow/sdk/executor" - exporter "github.com/s8sg/faas-flow/sdk/exporter" -) - -const ( - // A signature of SHA265 equivalent of "github.com/s8sg/faas-flow" - defaultHmacKey = "71F1D3011F8E6160813B4997BA29856744375A7F26D427D491E1CCABD4627E7C" - counterUpdateRetryCount = 10 -) - -var ( - re = regexp.MustCompile(`(?m)^[^:.]+\s*`) -) - -// implements faasflow.EventHandler -type openFaasEventHandler struct { - currentNodeId string // used to inject current node id in tracer - tracer *traceHandler // handle traces with open-tracing - flowName string - header http.Header -} - -// implements faasflow.Logger -type openFaasLogger struct{} - -// implements faasflow.Executor + RequestHandler -type openFaasExecutor struct { - gateway string - asyncUrl string // the async URL of the flow - flowName string // the name of the function - reqId string // the request id - callbackUrl string // the callback url - partialState []byte - rawRequest *executor.RawRequest - stateStore sdk.StateStore - dataStore sdk.DataStore - - openFaasEventHandler - openFaasLogger -} - -// Logger -func (logger *openFaasLogger) Configure(flowName string, requestId string) {} -func (logger *openFaasLogger) Init() error { - return nil -} -func (logger *openFaasLogger) Log(str string) { - fmt.Print(str) -} - -// EventHandler - -func (eh *openFaasEventHandler) Configure(flowName string, requestId string) { - eh.flowName = flowName -} - -func (eh *openFaasEventHandler) Init() error { - var err error - - // initialize trace server if tracing enabled - eh.tracer, err = initRequestTracer(eh.flowName) - if err != nil { - return fmt.Errorf("failed to init request tracer, error %v", err) - } - return nil -} - -func (eh *openFaasEventHandler) ReportRequestStart(requestId string) { - eh.tracer.startReqSpan(requestId) -} - -func (eh *openFaasEventHandler) ReportRequestFailure(requestId string, err error) { - // TODO: add log - eh.tracer.stopReqSpan() -} - -func (eh *openFaasEventHandler) ReportExecutionForward(currentNodeId string, requestId string) { - eh.currentNodeId = currentNodeId -} - -func (eh *openFaasEventHandler) ReportExecutionContinuation(requestId string) { - eh.tracer.continueReqSpan(requestId, eh.header) -} - -func (eh *openFaasEventHandler) ReportRequestEnd(requestId string) { - eh.tracer.stopReqSpan() -} - -func (eh *openFaasEventHandler) ReportNodeStart(nodeId string, requestId string) { - eh.tracer.startNodeSpan(nodeId, requestId) -} - -func (eh *openFaasEventHandler) ReportNodeEnd(nodeId string, requestId string) { - eh.tracer.stopNodeSpan(nodeId) -} - -func (eh *openFaasEventHandler) ReportNodeFailure(nodeId string, requestId string, err error) { - // TODO: add log - eh.tracer.stopNodeSpan(nodeId) -} - -func (eh *openFaasEventHandler) ReportOperationStart(operationId string, nodeId string, requestId string) { - eh.tracer.startOperationSpan(nodeId, requestId, operationId) -} - -func (eh *openFaasEventHandler) ReportOperationEnd(operationId string, nodeId string, requestId string) { - eh.tracer.stopOperationSpan(nodeId, operationId) -} - -func (eh *openFaasEventHandler) ReportOperationFailure(operationId string, nodeId string, requestId string, err error) { - // TODO: add log - eh.tracer.stopOperationSpan(nodeId, operationId) -} - -func (eh *openFaasEventHandler) Flush() { - eh.tracer.flushTracer() -} - -// ExecutionRuntime - -func (of *openFaasExecutor) HandleNextNode(partial *executor.PartialState) error { - - state, err := partial.Encode() - if err != nil { - return fmt.Errorf("failed to encode partial state, error %v", err) - } - - // build url for calling the flow in async - httpreq, _ := http.NewRequest(http.MethodPost, of.asyncUrl, bytes.NewReader(state)) - httpreq.Header.Add("Accept", "application/json") - httpreq.Header.Add("Content-Type", "application/json") - httpreq.Header.Add("X-Faas-Flow-Reqid", of.reqId) - httpreq.Header.Set("X-Faas-Flow-State", "partial") - httpreq.Header.Set("X-Faas-Flow-Callback-Url", of.callbackUrl) - - // extend req span for async call - if of.MonitoringEnabled() { - of.tracer.extendReqSpan(of.reqId, of.openFaasEventHandler.currentNodeId, - of.asyncUrl, httpreq) - } - - client := &http.Client{} - res, resErr := client.Do(httpreq) - if resErr != nil { - return resErr - } - - defer res.Body.Close() - resdata, _ := ioutil.ReadAll(res.Body) - - if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted { - return fmt.Errorf("%d: %s", res.StatusCode, string(resdata)) - } - return nil -} - -func (of *openFaasExecutor) GetExecutionOption(operation sdk.Operation) map[string]interface{} { - options := make(map[string]interface{}) - options["gateway"] = of.gateway - options["request-id"] = of.reqId - - return options -} - -func (of *openFaasExecutor) HandleExecutionCompletion(data []byte) error { - if of.callbackUrl == "" { - return nil - } - - log.Printf("calling callback url (%s) with result", of.callbackUrl) - httpreq, _ := http.NewRequest(http.MethodPost, of.callbackUrl, bytes.NewReader(data)) - httpreq.Header.Add("X-Faas-Flow-Reqid", of.reqId) - client := &http.Client{} - - res, resErr := client.Do(httpreq) - if resErr != nil { - return resErr - } - defer res.Body.Close() - resdata, _ := ioutil.ReadAll(res.Body) - - if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAccepted { - return fmt.Errorf("failed to call callback %d: %s", res.StatusCode, string(resdata)) - } - - return nil -} - -// Executor - -func (of *openFaasExecutor) Configure(requestId string) { - of.reqId = requestId -} - -func (of *openFaasExecutor) GetFlowName() string { - return of.flowName -} - -func (of *openFaasExecutor) GetFlowDefinition(pipeline *sdk.Pipeline, context *sdk.Context) error { - workflow := faasflow.GetWorkflow(pipeline) - faasflowContext := (*faasflow.Context)(context) - err := function.Define(workflow, faasflowContext) - return err -} - -func (of *openFaasExecutor) ReqValidationEnabled() bool { - status := true - hmacStatus := os.Getenv("validate_request") - if strings.ToUpper(hmacStatus) == "FALSE" { - status = false - } - return status -} - -func (of *openFaasExecutor) GetValidationKey() (string, error) { - key, keyErr := readSecret("faasflow-hmac-secret") - if keyErr != nil { - key = defaultHmacKey - } - return key, nil -} - -func (of *openFaasExecutor) ReqAuthEnabled() bool { - status := false - verifyStatus := os.Getenv("authenticate_request") - if strings.ToUpper(verifyStatus) == "TRUE" { - status = true - } - return status -} - -func (of *openFaasExecutor) GetReqAuthKey() (string, error) { - key, keyErr := readSecret("faasflow-hmac-secret") - return key, keyErr -} - -func (of *openFaasExecutor) MonitoringEnabled() bool { - tracing := os.Getenv("enable_tracing") - if strings.ToUpper(tracing) == "TRUE" { - return true - } - return false -} - -func (of *openFaasExecutor) GetEventHandler() (sdk.EventHandler, error) { - return &of.openFaasEventHandler, nil -} - -func (of *openFaasExecutor) LoggingEnabled() bool { - return true -} - -func (of *openFaasExecutor) GetLogger() (sdk.Logger, error) { - return &of.openFaasLogger, nil -} - -func (of *openFaasExecutor) GetStateStore() (sdk.StateStore, error) { - return of.stateStore, nil -} - -func (of *openFaasExecutor) GetDataStore() (sdk.DataStore, error) { - return of.dataStore, nil -} - -// internal - -func (of *openFaasExecutor) init(req *HttpRequest) error { - - of.gateway = getGateway() - of.flowName = getWorkflowNameFromHost(req.Host) - if of.flowName == "" { - return fmt.Errorf("failed to parse workflow name from host") - } - of.asyncUrl = buildURL("http://"+of.gateway, "async-function", of.flowName) - - if of.MonitoringEnabled() { - var err error - // initialize trace server if tracing enabled - of.openFaasEventHandler.tracer, err = initRequestTracer(of.flowName) - if err != nil { - return fmt.Errorf("failed to init request tracer, error %v", err) - } - } - - return nil -} - -// Handle handle requests to flow function -func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) error { - - err := of.init(req) - if err != nil { - return err - } - - switch { - case isDagExportRequest(req): - flowExporter := exporter.CreateFlowExporter(of) - resp, err := flowExporter.Export() - if err != nil { - return fmt.Errorf("failed to export dag, error %v", err) - } - response.Body = resp - - case getStopRequestId(req) != "": - requestId := getStopRequestId(req) - flowExecutor := executor.CreateFlowExecutor(of, nil) - err := flowExecutor.Stop(requestId) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to stop request " + requestId + ", check if request is active") - } - response.Body = []byte("Successfully stopped request " + requestId) - - case getPauseRequestId(req) != "": - requestId := getPauseRequestId(req) - flowExecutor := executor.CreateFlowExecutor(of, nil) - err := flowExecutor.Pause(requestId) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to pause request " + requestId + ", check if request is active") - } - response.Body = []byte("Successfully paused request " + requestId) - - case getResumeRequestId(req) != "": - requestId := getResumeRequestId(req) - flowExecutor := executor.CreateFlowExecutor(of, nil) - err := flowExecutor.Resume(requestId) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to resume request " + requestId + ", check if request is active") - } - response.Body = []byte("Successfully resumed request " + requestId) - - case getStateRequestId(req) != "": - requestId := getStateRequestId(req) - flowExecutor := executor.CreateFlowExecutor(of, nil) - state, err := flowExecutor.GetState(requestId) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to get request state for " + requestId + ", check if request is active") - } - response.Body = []byte(state) - - default: - var stateOption executor.ExecutionStateOption - - requestId := req.Header.Get("X-Faas-Flow-Reqid") - state := req.Header.Get("X-Faas-Flow-State") - of.callbackUrl = req.Header.Get("X-Faas-Flow-Callback-Url") - if state == "" { - rawRequest := &executor.RawRequest{} - rawRequest.Data = req.Body - rawRequest.Query = req.QueryString - rawRequest.AuthSignature = req.Header.Get("X-Hub-Signature") - // Check if any request Id is passed - if requestId != "" { - rawRequest.RequestId = requestId - } - stateOption = executor.NewRequest(rawRequest) - } else { - if requestId == "" { - return fmt.Errorf("request ID not set in partial request") - } - of.openFaasEventHandler.header = req.Header - partialState, err := executor.DecodePartialReq(req.Body) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to decode partial state") - } - stateOption = executor.PartialRequest(partialState) - } - - // Create a flow executor, OpenFaaSExecutor implements executor - flowExecutor := executor.CreateFlowExecutor(of, nil) - resp, err := flowExecutor.Execute(stateOption) - if err != nil { - log.Printf(err.Error()) - return fmt.Errorf("failed to execute request") - } - response.Body = resp - response.Header.Set("X-Faas-Flow-Reqid", of.reqId) - response.Header.Set("X-Faas-Flow-Callback-Url", of.callbackUrl) - } - - response.StatusCode = http.StatusOK - return nil -} diff --git a/template/faas-flow/log/std_out_logger.go b/template/faas-flow/log/std_out_logger.go new file mode 100644 index 00000000..693d27da --- /dev/null +++ b/template/faas-flow/log/std_out_logger.go @@ -0,0 +1,17 @@ +package log + +import ( + "fmt" +) + +// implements faasflow.Logger +type StdOutLogger struct{} + +func (l *StdOutLogger) Configure(flowName string, requestId string) {} + +func (l *StdOutLogger) Init() error { + return nil +} +func (l *StdOutLogger) Log(str string) { + fmt.Print(str) +} diff --git a/template/faas-flow/main.go b/template/faas-flow/main.go index b2c2bff6..7d8acf52 100644 --- a/template/faas-flow/main.go +++ b/template/faas-flow/main.go @@ -1,208 +1,9 @@ package main import ( - "fmt" - "handler/function" - "io/ioutil" - "log" - "net/http" - "os" - "strconv" - "time" - - consulStateStore "github.com/s8sg/faas-flow-consul-statestore" - minioDataStore "github.com/s8sg/faas-flow-minio-datastore" - sdk "github.com/s8sg/faas-flow/sdk" -) - -// HttpResponse of function call -type HttpResponse struct { - - // Body the body will be written back - Body []byte - - // StatusCode needs to be populated with value such as http.StatusOK - StatusCode int - - // Header is optional and contains any additional headers the function response should set - Header http.Header -} - -// HttpRequest of function call -type HttpRequest struct { - Body []byte - Header http.Header - QueryString string - Method string - Host string -} - -// FunctionHandler used for a serverless Go method invocation -type FunctionHandler interface { - Handle(req *HttpRequest, response *HttpResponse) (err error) -} - -var ( - stateStore sdk.StateStore - dataStore sdk.DataStore + "handler/executor" ) -func makeRequestHandler() func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - var input []byte - - if r.Body != nil { - defer r.Body.Close() - - bodyBytes, bodyErr := ioutil.ReadAll(r.Body) - - if bodyErr != nil { - fmt.Printf("Error reading body from request.") - } - - input = bodyBytes - } - - req := &HttpRequest{ - Body: input, - Header: r.Header, - Method: r.Method, - QueryString: r.URL.RawQuery, - Host: r.Host, - } - - response := &HttpResponse{} - response.Header = make(map[string][]string) - - openfaasExecutor := &openFaasExecutor{stateStore: stateStore, dataStore: dataStore} - - responseErr := openfaasExecutor.Handle(req, response) - - for k, v := range response.Header { - w.Header()[k] = v - } - - if responseErr != nil { - errorStr := fmt.Sprintf("[ Failed ] %v\n", responseErr) - fmt.Printf(errorStr) - w.Write([]byte(errorStr)) - w.WriteHeader(http.StatusInternalServerError) - } else { - if response.StatusCode == 0 { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(response.StatusCode) - } - } - - w.Write(response.Body) - } -} - -func parseIntOrDurationValue(val string, fallback time.Duration) time.Duration { - if len(val) > 0 { - parsedVal, parseErr := strconv.Atoi(val) - if parseErr == nil && parsedVal >= 0 { - return time.Duration(parsedVal) * time.Second - } - } - - duration, durationErr := time.ParseDuration(val) - if durationErr != nil { - return fallback - } - return duration -} - -func initStateStore() (err error) { - stateStore, err = function.OverrideStateStore() - if err != nil { - return err - } - if stateStore == nil { - - consulUrl := os.Getenv("consul_url") - if len(consulUrl) == 0 { - consulUrl = "consul.faasflow:8500" - } - - consulDc := os.Getenv("consul_dc") - if len(consulDc) == 0 { - consulDc = "dc1" - } - - stateStore, err = consulStateStore.GetConsulStateStore(consulUrl, consulDc) - - log.Print("Using default state store (consul)") - } - return err -} - -func initDataStore() (err error) { - dataStore, err = function.OverrideDataStore() - if err != nil { - return err - } - if dataStore == nil { - - /* - minioUrl := os.Getenv("s3_url") - if len(minioUrl) == 0 { - minioUrl = "minio.faasflow:9000" - } - - minioRegion := os.Getenv("s3_region") - if len(minioRegion) == 0 { - minioUrl = "us-east-1" - } - - secretKeyName := os.Getenv("s3_secret_key_name") - if len(secretKeyName) == 0 { - secretKeyName = "s3-secret-key" - } - - accessKeyName := os.Getenv("s3_access_key_name") - if len(accessKeyName) == 0 { - accessKeyName = "s3-access-key" - } - - tlsEnabled := false - if connection := os.Getenv("s3_tls"); connection == "true" || connection == "1" { - tlsEnabled = true - } - - dataStore, err = minioDataStore.Init(minioUrl, minioRegion, secretKeyName, accessKeyName, tlsEnabled) - */ - dataStore, err = minioDataStore.InitFromEnv() - - log.Print("Using default data store (minio)") - } - return err -} - func main() { - readTimeout := parseIntOrDurationValue(os.Getenv("read_timeout"), 10*time.Second) - writeTimeout := parseIntOrDurationValue(os.Getenv("write_timeout"), 10*time.Second) - - var err error - - err = initStateStore() - if err != nil { - log.Fatalf("Failed to initialize the StateStore, %v", err) - } - - err = initDataStore() - if err != nil { - log.Fatalf("Failed to initialize the StateStore, %v", err) - } - - s := &http.Server{ - Addr: fmt.Sprintf(":%d", 8082), - ReadTimeout: readTimeout, - WriteTimeout: writeTimeout, - MaxHeaderBytes: 1 << 20, // Max header of 1MB - } - - http.HandleFunc("/", makeRequestHandler()) - log.Fatal(s.ListenAndServe()) + executor.StartServer() } diff --git a/template/faas-flow/openfaas/read_secret.go b/template/faas-flow/openfaas/read_secret.go new file mode 100644 index 00000000..ab3c30e2 --- /dev/null +++ b/template/faas-flow/openfaas/read_secret.go @@ -0,0 +1,26 @@ +package openfaas + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "strings" +) + +// ReadSecret reads a secret from /var/openfaas/secrets or from +// env-var 'secret_mount_path' if set. +func ReadSecret(key string) (string, error) { + basePath := "/var/openfaas/secrets/" + if len(os.Getenv("secret_mount_path")) > 0 { + basePath = os.Getenv("secret_mount_path") + } + + readPath := path.Join(basePath, key) + secretBytes, readErr := ioutil.ReadFile(readPath) + if readErr != nil { + return "", fmt.Errorf("unable to read secret: %s, error: %s", readPath, readErr) + } + val := strings.TrimSpace(string(secretBytes)) + return val, nil +} diff --git a/template/faas-flow/util.go b/template/faas-flow/util.go deleted file mode 100644 index c173a001..00000000 --- a/template/faas-flow/util.go +++ /dev/null @@ -1,109 +0,0 @@ -package main - -import ( - "fmt" - "io/ioutil" - "net/url" - "os" - "path" - "strings" -) - -// buildURL builds execution url for the flow -func buildURL(gateway, rPath, function string) string { - u, _ := url.Parse(gateway) - u.Path = path.Join(u.Path, rPath+"/"+function) - return u.String() -} - -// readSecret reads a secret from /var/openfaas/secrets or from -// env-var 'secret_mount_path' if set. -func readSecret(key string) (string, error) { - basePath := "/var/openfaas/secrets/" - if len(os.Getenv("secret_mount_path")) > 0 { - basePath = os.Getenv("secret_mount_path") - } - - readPath := path.Join(basePath, key) - secretBytes, readErr := ioutil.ReadFile(readPath) - if readErr != nil { - return "", fmt.Errorf("unable to read secret: %s, error: %s", readPath, readErr) - } - val := strings.TrimSpace(string(secretBytes)) - return val, nil -} - -// getGateway return the gateway address from env -func getGateway() string { - gateway := os.Getenv("gateway") - if gateway == "" { - gateway = "gateway.openfaas:8080" - } - return gateway -} - -// getWorkflowNameFromHostFromHost returns the flow name from env -func getWorkflowNameFromHost(host string) string { - matches := re.FindAllString(host, -1) - if matches[0] != "" { - return matches[0] - } - return "" -} - -// isDagExportRequest check if dag export request -func isDagExportRequest(req *HttpRequest) bool { - values, err := url.ParseQuery(req.QueryString) - if err != nil { - return false - } - - if strings.ToUpper(values.Get("export-dag")) == "TRUE" { - return true - } - return false -} - -// getStopRequestId check if stop request and return the requestID -func getStopRequestId(req *HttpRequest) string { - values, err := url.ParseQuery(req.QueryString) - if err != nil { - return "" - } - - reqId := values.Get("stop-flow") - return reqId -} - -// getPauseRequestId check if pause request and return the requestID -func getPauseRequestId(req *HttpRequest) string { - values, err := url.ParseQuery(req.QueryString) - if err != nil { - return "" - } - - reqId := values.Get("pause-flow") - return reqId -} - -// getResumeRequestId check if resume request and return the requestID -func getResumeRequestId(req *HttpRequest) string { - values, err := url.ParseQuery(req.QueryString) - if err != nil { - return "" - } - - reqId := values.Get("resume-flow") - return reqId -} - -// getStateRequestId check if state request and return the requestID -func getStateRequestId(req *HttpRequest) string { - values, err := url.ParseQuery(req.QueryString) - if err != nil { - return "" - } - - reqId := values.Get("state") - return reqId -}