Skip to content

Commit

Permalink
Add default state store
Browse files Browse the repository at this point in the history
Signed-off-by: s8sg <[email protected]>
  • Loading branch information
s8sg committed Aug 27, 2019
1 parent e5e6805 commit 18f2bae
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 8 deletions.
69 changes: 69 additions & 0 deletions template/faas-flow/default_state_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package main

import (
"fmt"
"sync"
)

type DefaultStateStore struct {
flowName string
requestId string
keyDecorator func(string) string
cache map[string]string
mux sync.Mutex
}

// Configure the StateStore with flow name and request ID
func (ss *DefaultStateStore) Configure(flowName string, requestId string) {
ss.keyDecorator = func(key string) string {
dKey := flowName + "-" + requestId + "-" + key
return dKey
}
}

// Initialize the StateStore (called only once in a request span)
func (ss *DefaultStateStore) Init() error {
ss.cache = make(map[string]string)
return nil
}

// Set a value (override existing, or create one)
func (ss *DefaultStateStore) Set(key string, value string) error {
ss.mux.Lock()
ss.cache[ss.keyDecorator(key)] = value
ss.mux.Unlock()
return nil
}

// Get a value
func (ss *DefaultStateStore) Get(key string) (string, error) {
ss.mux.Lock()
value, ok := ss.cache[ss.keyDecorator(key)]
ss.mux.Unlock()
if !ok {
return "", fmt.Errorf("key not found")
}
return value, nil
}

// Compare and Update a value
func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string) error {
ss.mux.Lock()
value, ok := ss.cache[ss.keyDecorator(key)]
if !ok {
ss.mux.Unlock()
return fmt.Errorf("key not found")
}
if value != oldValue {
ss.mux.Unlock()
return fmt.Errorf("value doesn't match")
}
ss.cache[ss.keyDecorator(key)] = newValue
ss.mux.Unlock()
return nil
}

// Cleanup all the resources in StateStore (called only once in a request span)
func (ss *DefaultStateStore) Cleanup() error {
return nil
}
5 changes: 4 additions & 1 deletion template/faas-flow/function/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
faasflow "github.com/s8sg/faas-flow"
)

// Define provide definiton of the workflow
// Define provide definition of the workflow
func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {
flow.SyncNode().Modify(func(data []byte) ([]byte, error) {
return []byte(fmt.Sprintf("you said \"%s\"", string(data))), nil
Expand All @@ -15,6 +15,9 @@ func Define(flow *faasflow.Workflow, context *faasflow.Context) (err error) {

// DefineStateStore provides the override of the default StateStore
func DefineStateStore() (faasflow.StateStore, error) {
// NOTE: By default FaaS-Flow use a DefaultStateStore
// It stores request state in memory,
// for a distributed flow use external synchronous KV store (e.g. ETCD)
return nil, nil
}

Expand Down
21 changes: 15 additions & 6 deletions template/faas-flow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,20 @@ func (of *openFaasExecutor) GetLogger() (sdk.Logger, error) {
}

func (of *openFaasExecutor) GetStateStore() (sdk.StateStore, error) {
return function.DefineStateStore()
stateStore, err := function.DefineStateStore()
if err != nil {
return stateStore, err
}
if stateStore == nil {
log.Print("using DefaultStateStore, distributed request may fail")
stateStore = &DefaultStateStore{}
}
return stateStore, nil
}

func (of *openFaasExecutor) GetDataStore() (sdk.DataStore, error) {
return function.DefineDataStore()
stateStore, err := function.DefineDataStore()
return stateStore, err
}

// internal
Expand Down Expand Up @@ -272,7 +281,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err

err := of.init(req)
if err != nil {
panic(err.Error())
return err
}

switch {
Expand All @@ -290,7 +299,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Stop(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to stop request " + requestId + ", check if request is active")
return fmt.Errorf("failed to stop request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully stopped request " + requestId)

Expand All @@ -300,7 +309,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Pause(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to pause request " + requestId + ", check if request is active")
return fmt.Errorf("failed to pause request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully paused request " + requestId)

Expand All @@ -310,7 +319,7 @@ func (of *openFaasExecutor) Handle(req *HttpRequest, response *HttpResponse) err
err := flowExecutor.Resume(requestId)
if err != nil {
log.Printf(err.Error())
return fmt.Errorf("Failed to resume request " + requestId + ", check if request is active")
return fmt.Errorf("failed to resume request " + requestId + ", check if request is active")
}
response.Body = []byte("Successfully resumed request " + requestId)

Expand Down
4 changes: 3 additions & 1 deletion template/faas-flow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ func makeRequestHandler() func(http.ResponseWriter, *http.Request) {
}

if responseErr != nil {
fmt.Printf("[ Failed ] %v\n", responseErr)
errorStr := fmt.Sprintf("[ Failed ] %v\n", responseErr)
fmt.Printf(errorStr)
w.Write([]byte(errorStr))
w.WriteHeader(http.StatusInternalServerError)
} else {
if response.StatusCode == 0 {
Expand Down

0 comments on commit 18f2bae

Please sign in to comment.