Skip to content

Commit

Permalink
Fix issue with executor
Browse files Browse the repository at this point in the history
Signed-off-by: s8sg <[email protected]>
  • Loading branch information
s8sg committed Aug 27, 2019
1 parent 18f2bae commit ce502ed
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 27 deletions.
13 changes: 6 additions & 7 deletions sdk/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) {
func (fexec *FlowExecutor) isActive() bool {
state, err := fexec.getRequestState()
if err != nil {
fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id)
fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err)
return false
}

Expand Down Expand Up @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) {
if !fexec.partial {

// For a new dag pipeline that has edges Create the vertex in stateStore
if fexec.hasEdge {
serr := fexec.setRequestState(STATE_RUNNING)
if serr != nil {
return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr)
}
fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id)
serr := fexec.setRequestState(STATE_RUNNING)
if serr != nil {
return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr)
}
fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id)


// set the execution position to initial node
// On the 0th depth set the initial node as the current execution position
Expand Down
21 changes: 8 additions & 13 deletions template/faas-flow/default_state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@ import (
)

type DefaultStateStore struct {
flowName string
requestId string
keyDecorator func(string) string
cache map[string]string
mux sync.Mutex
flowName string
requestId string
cache map[string]string
mux sync.Mutex
}

// Configure the StateStore with flow name and request ID
func (ss *DefaultStateStore) Configure(flowName string, requestId string) {
ss.keyDecorator = func(key string) string {
dKey := flowName + "-" + requestId + "-" + key
return dKey
}
}

// Initialize the StateStore (called only once in a request span)
Expand All @@ -30,15 +25,15 @@ func (ss *DefaultStateStore) Init() error {
// Set a value (override existing, or create one)
func (ss *DefaultStateStore) Set(key string, value string) error {
ss.mux.Lock()
ss.cache[ss.keyDecorator(key)] = value
ss.cache[key] = value
ss.mux.Unlock()
return nil
}

// Get a value
func (ss *DefaultStateStore) Get(key string) (string, error) {
ss.mux.Lock()
value, ok := ss.cache[ss.keyDecorator(key)]
value, ok := ss.cache[key]
ss.mux.Unlock()
if !ok {
return "", fmt.Errorf("key not found")
Expand All @@ -49,7 +44,7 @@ func (ss *DefaultStateStore) Get(key string) (string, error) {
// Compare and Update a value
func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string) error {
ss.mux.Lock()
value, ok := ss.cache[ss.keyDecorator(key)]
value, ok := ss.cache[key]
if !ok {
ss.mux.Unlock()
return fmt.Errorf("key not found")
Expand All @@ -58,7 +53,7 @@ func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string
ss.mux.Unlock()
return fmt.Errorf("value doesn't match")
}
ss.cache[ss.keyDecorator(key)] = newValue
ss.cache[key] = newValue
ss.mux.Unlock()
return nil
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ce502ed

Please sign in to comment.