Skip to content

Commit

Permalink
Refactor template to separate packages, extract request handlers and …
Browse files Browse the repository at this point in the history
…add REST api
  • Loading branch information
pasdam authored and s8sg committed Jun 9, 2020
1 parent 30543ee commit b268647
Show file tree
Hide file tree
Showing 34 changed files with 993 additions and 820 deletions.
13 changes: 13 additions & 0 deletions template/faas-flow/config/consul_dc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package config

import (
"os"
)

func ConsulDC() string {
val := os.Getenv("consul_dc")
if len(val) == 0 {
val = "dc1"
}
return val
}
13 changes: 13 additions & 0 deletions template/faas-flow/config/consul_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package config

import (
"os"
)

func ConsulURL() string {
val := os.Getenv("consul_url")
if len(val) == 0 {
val = "consul.faasflow:8500"
}
return val
}
14 changes: 14 additions & 0 deletions template/faas-flow/config/gateway_url.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

import (
"os"
)

// GatewayURL return the gateway address from env
func GatewayURL() string {
gateway := os.Getenv("gateway")
if gateway == "" {
gateway = "gateway.openfaas:8080"
}
return gateway
}
21 changes: 21 additions & 0 deletions template/faas-flow/config/parse_int_or_duration_value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package config

import (
"strconv"
"time"
)

func parseIntOrDurationValue(val string, fallback time.Duration) time.Duration {
if len(val) > 0 {
parsedVal, parseErr := strconv.Atoi(val)
if parseErr == nil && parsedVal >= 0 {
return time.Duration(parsedVal) * time.Second
}
}

duration, durationErr := time.ParseDuration(val)
if durationErr != nil {
return fallback
}
return duration
}
10 changes: 10 additions & 0 deletions template/faas-flow/config/read_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

import (
"os"
"time"
)

func ReadTimeout() time.Duration {
return parseIntOrDurationValue(os.Getenv("read_timeout"), 10*time.Second)
}
14 changes: 14 additions & 0 deletions template/faas-flow/config/trace_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

import (
"os"
)

// TraceServer get the traceserver address
func TraceServer() string {
traceServer := os.Getenv("trace_server")
if traceServer == "" {
traceServer = "jaeger.faasflow:5775"
}
return traceServer
}
10 changes: 10 additions & 0 deletions template/faas-flow/config/write_timeout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package config

import (
"os"
"time"
)

func WriteTimeout() time.Duration {
return parseIntOrDurationValue(os.Getenv("write_timeout"), 10*time.Second)
}
65 changes: 65 additions & 0 deletions template/faas-flow/executor/execute_flow_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package executor

import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"

"github.com/s8sg/faas-flow/sdk/executor"
)

func executeFlowHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) {
log.Printf("Requested %s %s\n", req.Method, req.URL)
log.Printf("Executing flow %s\n", id)

var stateOption executor.ExecutionStateOption

body, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}

// TODO: fix this
openFaasEx := ex.(*openFaasExecutor)

state := req.Header.Get("X-Faas-Flow-State")
callbackURL := req.Header.Get("X-Faas-Flow-Callback-Url")
openFaasEx.callbackURL = callbackURL
if state == "" {
rawRequest := &executor.RawRequest{}
rawRequest.Data = body
rawRequest.Query = req.URL.RawQuery
rawRequest.AuthSignature = req.Header.Get("X-Hub-Signature")
// Check if any request Id is passed
if id != "" {
rawRequest.RequestId = id
}
stateOption = executor.NewRequest(rawRequest)

} else {
if id == "" {
return nil, errors.New("request ID not set in partial request")
}

openFaasEx.openFaasEventHandler.header = req.Header
partialState, err := executor.DecodePartialReq(body)
if err != nil {
return nil, errors.New("failed to decode partial state")
}
stateOption = executor.PartialRequest(partialState)
}

// Create a flow executor, OpenFaaSExecutor implements executor
flowExecutor := executor.CreateFlowExecutor(ex, nil)
resp, err := flowExecutor.Execute(stateOption)
if err != nil {
return nil, fmt.Errorf("failed to execute request. %s", err.Error())
}
headers := w.Header()
headers["X-Faas-Flow-Reqid"] = []string{flowExecutor.GetReqId()}
headers["X-Faas-Flow-Callback-Url"] = []string{callbackURL}

return resp, nil
}
8 changes: 8 additions & 0 deletions template/faas-flow/executor/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package executor

import "github.com/s8sg/faas-flow/sdk"

var (
stateStore sdk.StateStore
dataStore sdk.DataStore
)
22 changes: 22 additions & 0 deletions template/faas-flow/executor/flow_state_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package executor

import (
"fmt"
"log"
"net/http"

"github.com/s8sg/faas-flow/sdk/executor"
)

func flowStateHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) {
log.Printf("Get flow state: %s\n", id)

flowExecutor := executor.CreateFlowExecutor(ex, nil)
state, err := flowExecutor.GetState(id)
if err != nil {
log.Printf(err.Error())
return nil, fmt.Errorf("failed to get request state for %s, check if request is active", id)
}

return []byte(state), nil
}
22 changes: 22 additions & 0 deletions template/faas-flow/executor/get_dag_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package executor

import (
"fmt"
"log"
"net/http"

"github.com/s8sg/faas-flow/sdk/executor"
"github.com/s8sg/faas-flow/sdk/exporter"
)

func getDagHandler(w http.ResponseWriter, req *http.Request, id string, ex executor.Executor) ([]byte, error) {
log.Println("Exporting flow's DAG")

flowExporter := exporter.CreateFlowExporter(ex)
resp, err := flowExporter.Export()
if err != nil {
return nil, fmt.Errorf("failed to export dag, error %v", err)
}

return resp, nil
}
13 changes: 13 additions & 0 deletions template/faas-flow/executor/handle_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package executor

import (
"fmt"
"net/http"
)

func handleError(w http.ResponseWriter, message string) {
errorStr := fmt.Sprintf("[ Failed ] %v\n", message)
fmt.Printf(errorStr)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(errorStr))
}
52 changes: 52 additions & 0 deletions template/faas-flow/executor/init_data_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package executor

import (
"log"

"handler/function"

minioDataStore "github.com/s8sg/faas-flow-minio-datastore"
"github.com/s8sg/faas-flow/sdk"
)

func initDataStore() (dataStore sdk.DataStore, err error) {
dataStore, err = function.OverrideDataStore()
if err != nil {
return nil, err
}
if dataStore == nil {

/*
minioUrl := os.Getenv("s3_url")
if len(minioUrl) == 0 {
minioUrl = "minio.faasflow:9000"
}
minioRegion := os.Getenv("s3_region")
if len(minioRegion) == 0 {
minioUrl = "us-east-1"
}
secretKeyName := os.Getenv("s3_secret_key_name")
if len(secretKeyName) == 0 {
secretKeyName = "s3-secret-key"
}
accessKeyName := os.Getenv("s3_access_key_name")
if len(accessKeyName) == 0 {
accessKeyName = "s3-access-key"
}
tlsEnabled := false
if connection := os.Getenv("s3_tls"); connection == "true" || connection == "1" {
tlsEnabled = true
}
dataStore, err = minioDataStore.Init(minioUrl, minioRegion, secretKeyName, accessKeyName, tlsEnabled)
*/
dataStore, err = minioDataStore.InitFromEnv()

log.Print("Using default data store (minio)")
}
return dataStore, err
}
46 changes: 46 additions & 0 deletions template/faas-flow/executor/init_request_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package executor

import (
"fmt"
"time"

hconfig "handler/config"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"
)

// initRequestTracer init global trace with configuration
func initRequestTracer(flowName string) (*traceHandler, error) {
tracerObj := &traceHandler{}

agentPort := hconfig.TraceServer()

cfg := config.Configuration{
ServiceName: flowName,
Sampler: &config.SamplerConfig{
Type: "const",
Param: 1,
},
Reporter: &config.ReporterConfig{
LogSpans: true,
BufferFlushInterval: 1 * time.Second,
LocalAgentHostPort: agentPort,
},
}

opentracer, traceCloser, err := cfg.NewTracer(
config.Logger(jaeger.StdLogger),
)
if err != nil {
return nil, fmt.Errorf("failed to init tracer, error %v", err.Error())
}

tracerObj.closer = traceCloser
tracerObj.tracer = opentracer
tracerObj.nodeSpans = make(map[string]opentracing.Span)
tracerObj.operationSpans = make(map[string]map[string]opentracing.Span)

return tracerObj, nil
}
29 changes: 29 additions & 0 deletions template/faas-flow/executor/init_state_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package executor

import (
"log"

"handler/config"
"handler/function"

consulStateStore "github.com/s8sg/faas-flow-consul-statestore"
"github.com/s8sg/faas-flow/sdk"
)

func initStateStore() (stateStore sdk.StateStore, err error) {
stateStore, err = function.OverrideStateStore()
if err != nil {
return nil, err
}

if stateStore == nil {
log.Print("Using default state store (consul)")

consulURL := config.ConsulURL()
consulDC := config.ConsulDC()

stateStore, err = consulStateStore.GetConsulStateStore(consulURL, consulDC)
}

return stateStore, err
}
Loading

0 comments on commit b268647

Please sign in to comment.