diff --git a/sdk/executor/executor.go b/sdk/executor/executor.go index 58c61c33..45aa6743 100644 --- a/sdk/executor/executor.go +++ b/sdk/executor/executor.go @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) { func (fexec *FlowExecutor) isActive() bool { state, err := fexec.getRequestState() if err != nil { - fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id) + fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err) return false } @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) { if !fexec.partial { // For a new dag pipeline that has edges Create the vertex in stateStore - if fexec.hasEdge { - serr := fexec.setRequestState(STATE_RUNNING) - if serr != nil { - return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) - } - fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + serr := fexec.setRequestState(STATE_RUNNING) + if serr != nil { + return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) } + fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + // set the execution position to initial node // On the 0th depth set the initial node as the current execution position diff --git a/template/faas-flow/default_state_store.go b/template/faas-flow/default_state_store.go new file mode 100644 index 00000000..49b0fffb --- /dev/null +++ b/template/faas-flow/default_state_store.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "sync" +) + +type DefaultStateStore struct { + flowName string + requestId string + cache map[string]string + mux sync.Mutex +} + +// Configure the StateStore with flow name and request ID +func (ss *DefaultStateStore) Configure(flowName string, requestId string) { +} + +// Initialize the StateStore (called only once in a request span) +func (ss *DefaultStateStore) Init() error { + ss.cache = make(map[string]string) + return nil +} + +// Set a value (override existing, or create one) +func (ss *DefaultStateStore) Set(key string, value string) error { + ss.mux.Lock() + ss.cache[key] = value + ss.mux.Unlock() + return nil +} + +// Get a value +func (ss *DefaultStateStore) Get(key string) (string, error) { + ss.mux.Lock() + value, ok := ss.cache[key] + ss.mux.Unlock() + if !ok { + return "", fmt.Errorf("key not found") + } + return value, nil +} + +// Compare and Update a value +func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string) error { + ss.mux.Lock() + value, ok := ss.cache[key] + if !ok { + ss.mux.Unlock() + return fmt.Errorf("key not found") + } + if value != oldValue { + ss.mux.Unlock() + return fmt.Errorf("value doesn't match") + } + ss.cache[key] = newValue + ss.mux.Unlock() + return nil +} + +// Cleanup all the resources in StateStore (called only once in a request span) +func (ss *DefaultStateStore) Cleanup() error { + return nil +} diff --git a/template/faas-flow/function/handler.go b/template/faas-flow/function/handler.go index 370bca9f..372c4824 100644 --- a/template/faas-flow/function/handler.go +++ b/template/faas-flow/function/handler.go @@ -5,7 +5,7 @@ import ( faasflow "github.com/s8sg/faas-flow" ) -// Define provide definiton of the workflow +// Define provide definition of the workflow func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) { flow.SyncNode().Modify(func(data []byte) ([]byte, error) { return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil @@ -15,6 +15,9 @@ func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) { // DefineStateStore provides the override of the default StateStore func DefineStateStore() (faasflow.StateStore, error) { + // NOTE: By default FaaS-Flow use a DefaultStateStore + // It stores request state in memory, + // for a distributed flow use external synchronous KV store (e.g. ETCD) return nil, nil } diff --git a/template/faas-flow/handler.go b/template/faas-flow/handler.go index c83f71bd..f50166c6 100644 --- a/template/faas-flow/handler.go +++ b/template/faas-flow/handler.go @@ -239,11 +239,20 @@ func (of *openFaasExecutor) GetLogger() (sdk.Logger, error) { } func (of *openFaasExecutor) GetStateStore() (sdk.StateStore, error) { - return function.DefineStateStore() + stateStore, err := function.DefineStateStore() + if err != nil { + return stateStore, err + } + if stateStore == nil { + log.Print("using DefaultStateStore, distributed request may fail") + stateStore = &DefaultStateStore{} + } + return stateStore, nil } func (of *openFaasExecutor) GetDataStore() (sdk.DataStore, error) { - return function.DefineDataStore() + stateStore, err := function.DefineDataStore() + return stateStore, err } // internal @@ -272,7 +281,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err err := of.init(req) if err != nil { - panic(err.Error()) + return err } switch { @@ -290,7 +299,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err err := flowExecutor.Stop(requestId) if err != nil { log.Printf(err.Error()) - return fmt.Errorf("Failed to stop request " + requestId + ", check if request is active") + return fmt.Errorf("failed to stop request " + requestId + ", check if request is active") } response.Body = []byte("Successfully stopped request " + requestId) @@ -300,7 +309,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err err := flowExecutor.Pause(requestId) if err != nil { log.Printf(err.Error()) - return fmt.Errorf("Failed to pause request " + requestId + ", check if request is active") + return fmt.Errorf("failed to pause request " + requestId + ", check if request is active") } response.Body = []byte("Successfully paused request " + requestId) @@ -310,7 +319,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err err := flowExecutor.Resume(requestId) if err != nil { log.Printf(err.Error()) - return fmt.Errorf("Failed to resume request " + requestId + ", check if request is active") + return fmt.Errorf("failed to resume request " + requestId + ", check if request is active") } response.Body = []byte("Successfully resumed request " + requestId) diff --git a/template/faas-flow/main.go b/template/faas-flow/main.go index 7c60eed8..a1a2c289 100644 --- a/template/faas-flow/main.go +++ b/template/faas-flow/main.go @@ -72,7 +72,9 @@ func makeRequestHandler() func(http.ResponseWriter, *http.Request) { } if responseErr != nil { - fmt.Printf("[ Failed ] %v\n", responseErr) + errorStr := fmt.Sprintf("[ Failed ] %v\n", responseErr) + fmt.Printf(errorStr) + w.Write([]byte(errorStr)) w.WriteHeader(http.StatusInternalServerError) } else { if response.StatusCode == 0 { diff --git a/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go b/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go index 58c61c33..45aa6743 100644 --- a/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go +++ b/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) { func (fexec *FlowExecutor) isActive() bool { state, err := fexec.getRequestState() if err != nil { - fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id) + fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err) return false } @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) { if !fexec.partial { // For a new dag pipeline that has edges Create the vertex in stateStore - if fexec.hasEdge { - serr := fexec.setRequestState(STATE_RUNNING) - if serr != nil { - return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) - } - fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + serr := fexec.setRequestState(STATE_RUNNING) + if serr != nil { + return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) } + fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + // set the execution position to initial node // On the 0th depth set the initial node as the current execution position