diff --git a/sdk/executor/executor.go b/sdk/executor/executor.go index 58c61c33..45aa6743 100644 --- a/sdk/executor/executor.go +++ b/sdk/executor/executor.go @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) { func (fexec *FlowExecutor) isActive() bool { state, err := fexec.getRequestState() if err != nil { - fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id) + fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err) return false } @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) { if !fexec.partial { // For a new dag pipeline that has edges Create the vertex in stateStore - if fexec.hasEdge { - serr := fexec.setRequestState(STATE_RUNNING) - if serr != nil { - return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) - } - fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + serr := fexec.setRequestState(STATE_RUNNING) + if serr != nil { + return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) } + fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + // set the execution position to initial node // On the 0th depth set the initial node as the current execution position diff --git a/template/faas-flow/default_state_store.go b/template/faas-flow/default_state_store.go index ca80742f..49b0fffb 100644 --- a/template/faas-flow/default_state_store.go +++ b/template/faas-flow/default_state_store.go @@ -6,19 +6,14 @@ import ( ) type DefaultStateStore struct { - flowName string - requestId string - keyDecorator func(string) string - cache map[string]string - mux sync.Mutex + flowName string + requestId string + cache map[string]string + mux sync.Mutex } // Configure the StateStore with flow name and request ID func (ss *DefaultStateStore) Configure(flowName string, requestId string) { - ss.keyDecorator = func(key string) string { - dKey := flowName + "-" + requestId + "-" + key - return dKey - } } // Initialize the StateStore (called only once in a request span) @@ -30,7 +25,7 @@ func (ss *DefaultStateStore) Init() error { // Set a value (override existing, or create one) func (ss *DefaultStateStore) Set(key string, value string) error { ss.mux.Lock() - ss.cache[ss.keyDecorator(key)] = value + ss.cache[key] = value ss.mux.Unlock() return nil } @@ -38,7 +33,7 @@ func (ss *DefaultStateStore) Set(key string, value string) error { // Get a value func (ss *DefaultStateStore) Get(key string) (string, error) { ss.mux.Lock() - value, ok := ss.cache[ss.keyDecorator(key)] + value, ok := ss.cache[key] ss.mux.Unlock() if !ok { return "", fmt.Errorf("key not found") @@ -49,7 +44,7 @@ func (ss *DefaultStateStore) Get(key string) (string, error) { // Compare and Update a value func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string) error { ss.mux.Lock() - value, ok := ss.cache[ss.keyDecorator(key)] + value, ok := ss.cache[key] if !ok { ss.mux.Unlock() return fmt.Errorf("key not found") @@ -58,7 +53,7 @@ func (ss *DefaultStateStore) Update(key string, oldValue string, newValue string ss.mux.Unlock() return fmt.Errorf("value doesn't match") } - ss.cache[ss.keyDecorator(key)] = newValue + ss.cache[key] = newValue ss.mux.Unlock() return nil } diff --git a/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go b/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go index 58c61c33..45aa6743 100644 --- a/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go +++ b/template/faas-flow/vendor/github.com/s8sg/faas-flow/sdk/executor/executor.go @@ -292,7 +292,7 @@ func (fexec *FlowExecutor) retrievePartialStates() ([]*PartialState, error) { func (fexec *FlowExecutor) isActive() bool { state, err := fexec.getRequestState() if err != nil { - fexec.log("[Request `%s`] Failed to obtain pipeline state\n", fexec.id) + fexec.log("[Request `%s`] Failed to obtain pipeline state, error %v\n", fexec.id, err) return false } @@ -1209,13 +1209,12 @@ func (fexec *FlowExecutor) Execute(state ExecutionStateOption) ([]byte, error) { if !fexec.partial { // For a new dag pipeline that has edges Create the vertex in stateStore - if fexec.hasEdge { - serr := fexec.setRequestState(STATE_RUNNING) - if serr != nil { - return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) - } - fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + serr := fexec.setRequestState(STATE_RUNNING) + if serr != nil { + return nil, fmt.Errorf("[Request `%s`] Failed to mark dag state, error %v", fexec.id, serr) } + fexec.log("[Request `%s`] DAG state initiated at StateStore\n", fexec.id) + // set the execution position to initial node // On the 0th depth set the initial node as the current execution position