Skip to content

Commit

Permalink
fix(workflows/syncer): skip handling new registration if engine running
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Dec 9, 2024
1 parent 45898fc commit aff4116
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 14 deletions.
20 changes: 13 additions & 7 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,36 @@ package syncer
import (
"errors"
"sync"

"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
)

// IsReadyCloser is an abstraction for engines that can be checked for readiness and closed.
type IsReadyCloser interface {
// Ready returns nil if the engine is ready to be used.
Ready() error

// Close closes the engine.
Close() error
}
type engineRegistry struct {
engines map[string]*workflows.Engine
engines map[string]IsReadyCloser
mu sync.RWMutex
}

func newEngineRegistry() *engineRegistry {
return &engineRegistry{
engines: make(map[string]*workflows.Engine),
engines: make(map[string]IsReadyCloser),
}
}

// Add adds an engine to the registry.
func (r *engineRegistry) Add(id string, engine *workflows.Engine) {
func (r *engineRegistry) Add(id string, engine IsReadyCloser) {
r.mu.Lock()
defer r.mu.Unlock()
r.engines[id] = engine
}

// Get retrieves an engine from the registry.
func (r *engineRegistry) Get(id string) (*workflows.Engine, error) {
func (r *engineRegistry) Get(id string) (IsReadyCloser, error) {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
Expand All @@ -49,7 +55,7 @@ func (r *engineRegistry) IsRunning(id string) bool {
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) {
func (r *engineRegistry) Pop(id string) (IsReadyCloser, error) {
r.mu.Lock()
defer r.mu.Unlock()
engine, ok := r.engines[id]
Expand Down
5 changes: 5 additions & 0 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,11 @@ func (h *eventHandler) workflowRegisteredEvent(
return fmt.Errorf("workflowID mismatch: %x != %x", hash, payload.WorkflowID)
}

// Ensure that there is no running workflow engine for the given workflow ID.
if h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) {
return fmt.Errorf("workflow is already running, so not starting it : %s", hex.EncodeToString(payload.WorkflowID[:]))
}

// Save the workflow secrets
urlHash, err := h.orm.GetSecretsURLHash(payload.Owner, []byte(payload.SecretsURL))
if err != nil {
Expand Down
72 changes: 66 additions & 6 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,30 @@ func newMockFetcher(m map[string]mockFetchResp) FetcherFunc {
return (&mockFetcher{responseMap: m}).Fetch
}

type mockEngine struct {
CloseErr error
ReadyErr error
}

func newMockEngine(errs ...error) *mockEngine {
e := &mockEngine{}
if len(errs) > 0 {
e.ReadyErr = errs[0]
}
if len(errs) > 1 {
e.CloseErr = errs[1]
}
return e
}

func (m *mockEngine) Ready() error {
return m.ReadyErr
}

func (m *mockEngine) Close() error {
return m.CloseErr
}

func Test_Handler(t *testing.T) {
lggr := logger.TestLogger(t)
emitter := custmsg.NewLabeler()
Expand Down Expand Up @@ -179,7 +203,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
var config = []byte("")
var wfOwner = []byte("0xOwner")
var binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t)
defaultValidationFn := func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
defaultValidationFn := func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
Expand Down Expand Up @@ -221,6 +248,38 @@ func Test_workflowRegisteredHandler(t *testing.T) {
},
validationFn: defaultValidationFn,
},
{
Name: "fails if running engine exists",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
Owner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
me := newMockEngine()
h.engineRegistry.Add(wfID, me)
err := h.workflowRegisteredEvent(ctx, event)
require.Error(t, err)
require.ErrorContains(t, err, "workflow is already running")
},
},
{
Name: "success with paused workflow registered",
fetcher: newMockFetcher(map[string]mockFetchResp{
Expand All @@ -245,7 +304,10 @@ func Test_workflowRegisteredHandler(t *testing.T) {
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

// Verify the record is updated in the database
dbSpec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name")
require.NoError(t, err)
Expand Down Expand Up @@ -322,7 +384,7 @@ type testCase struct {
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, h *eventHandler, wfOwner []byte, wfName string, wfID string)
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
}

func testRunningWorkflow(t *testing.T, cmd testCase) {
Expand Down Expand Up @@ -363,10 +425,8 @@ func testRunningWorkflow(t *testing.T, cmd testCase) {
capRegistry: registry,
workflowStore: store,
}
err = h.workflowRegisteredEvent(ctx, event)
require.NoError(t, err)

cmd.validationFn(t, ctx, h, wfOwner, "workflow-name", wfID)
cmd.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID)
})
}

Expand Down
2 changes: 1 addition & 1 deletion core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (w *workflowRegistry) handlerLoop(ctx context.Context) {
event := resp.Event
w.lggr.Debugf("handling event: %+v", event)
if err := w.handler.Handle(ctx, *event); err != nil {
w.lggr.Errorf("failed to handle event: %+v", event)
w.lggr.Errorf("failed to handle event: %s : %+v", err, event)
continue
}
}
Expand Down

0 comments on commit aff4116

Please sign in to comment.