Skip to content

Commit

Permalink
Merge pull request #107 from s8sg/inmemory-store
Browse files Browse the repository at this point in the history
In-memory store
  • Loading branch information
s8sg authored Aug 27, 2019
2 parents e5e6805 + ce502ed commit b92863d
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 22 deletions.
13 changes: 6 additions & 7 deletions sdk/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) {
func (fexec *FlowExecutor) isActive() bool {
state, err := fexec.getRequestState()
if err != nil {
fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id)
fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err)
return false
}

Expand Down Expand Up @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) {
if !fexec.partial {

// For a new dag pipeline that has edges Create the vertex in stateStore
if fexec.hasEdge {
serr := fexec.setRequestState(STATE_RUNNING)
if serr != nil {
return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr)
}
fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id)
serr := fexec.setRequestState(STATE_RUNNING)
if serr != nil {
return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr)
}
fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id)


// set the execution position to initial node
// On the 0th depth set the initial node as the current execution position
Expand Down
64 changes: 64 additions & 0 deletions template/faas-flow/default_state_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"fmt"
"sync"
)

type DefaultStateStore struct {
flowName string
requestId string
cache map[string]string
mux sync.Mutex
}

// Configure the StateStore with flow name and request ID
func (ss *DefaultStateStore) Configure(flowName string, requestId string) {
}

// Initialize the StateStore (called only once in a request span)
func (ss *DefaultStateStore) Init() error {
ss.cache = make(map[string]string)
return nil
}

// Set a value (override existing, or create one)
func (ss *DefaultStateStore) Set(key string, value string) error {
ss.mux.Lock()
ss.cache[key] = value
ss.mux.Unlock()
return nil
}

// Get a value
func (ss *DefaultStateStore) Get(key string) (string, error) {
ss.mux.Lock()
value, ok := ss.cache[key]
ss.mux.Unlock()
if !ok {
return "", fmt.Errorf("key not found")
}
return value, nil
}

// Compare and Update a value
func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string) error {
ss.mux.Lock()
value, ok := ss.cache[key]
if !ok {
ss.mux.Unlock()
return fmt.Errorf("key not found")
}
if value != oldValue {
ss.mux.Unlock()
return fmt.Errorf("value doesn't match")
}
ss.cache[key] = newValue
ss.mux.Unlock()
return nil
}

// Cleanup all the resources in StateStore (called only once in a request span)
func (ss *DefaultStateStore) Cleanup() error {
return nil
}
5 changes: 4 additions & 1 deletion template/faas-flow/function/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
faasflow "github.com/s8sg/faas-flow"
)

// Define provide definiton of the workflow
// Define provide definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
flow.SyncNode().Modify(func(data []byte) ([]byte, error) {
return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
Expand All @@ -15,6 +15,9 @@ func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
// NOTE: By default FaaS-Flow use a DefaultStateStore
// It stores request state in memory,
// for a distributed flow use external synchronous KV store (e.g. ETCD)
return nil, nil
}

Expand Down
21 changes: 15 additions & 6 deletions template/faas-flow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,20 @@ func (of *openFaasExecutor) GetLogger() (sdk.Logger, error) {
}

func (of *openFaasExecutor) GetStateStore() (sdk.StateStore, error) {
return function.DefineStateStore()
stateStore, err := function.DefineStateStore()
if err != nil {
return stateStore, err
}
if stateStore == nil {
log.Print("using DefaultStateStore, distributed request may fail")
stateStore = &DefaultStateStore{}
}
return stateStore, nil
}

func (of *openFaasExecutor) GetDataStore() (sdk.DataStore, error) {
return function.DefineDataStore()
stateStore, err := function.DefineDataStore()
return stateStore, err
}

// internal
Expand Down Expand Up @@ -272,7 +281,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err

err := of.init(req)
if err != nil {
panic(err.Error())
return err
}

switch {
Expand All @@ -290,7 +299,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Stop(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to stop request " + requestId + ", check if request is active")
return fmt.Errorf("failed to stop request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully stopped request " + requestId)

Expand All @@ -300,7 +309,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Pause(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to pause request " + requestId + ", check if request is active")
return fmt.Errorf("failed to pause request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully paused request " + requestId)

Expand All @@ -310,7 +319,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Resume(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to resume request " + requestId + ", check if request is active")
return fmt.Errorf("failed to resume request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully resumed request " + requestId)

Expand Down
4 changes: 3 additions & 1 deletion template/faas-flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func makeRequestHandler() func(http.ResponseWriter, *http.Request) {
}

if responseErr != nil {
fmt.Printf("[ Failed ] %v\n", responseErr)
errorStr := fmt.Sprintf("[ Failed ] %v\n", responseErr)
fmt.Printf(errorStr)
w.Write([]byte(errorStr))
w.WriteHeader(http.StatusInternalServerError)
} else {
if response.StatusCode == 0 {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit b92863d

Please sign in to comment.