Skip to content

Commit

Permalink
refactor(syncer): address test changes
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Dec 10, 2024
1 parent a0cb505 commit 60be59d
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
Expand Down Expand Up @@ -317,15 +316,11 @@ func (m *mockService) Start(context.Context) error { return nil }

func (m *mockService) Close() error { return nil }

func (m *mockService) HealthReport() map[string]error { return map[string]error{"svc": nil} }

func (m *mockService) Ready() error { return nil }

func (m *mockService) Name() string { return "svc" }

type mockEngineFactory struct{}

func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (syncer.StartReadyCloser, error) {
return &mockService{}, nil
}

Expand Down
17 changes: 10 additions & 7 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package syncer

import (
"context"
"errors"
"sync"
)

// ReadyCloser is an abstraction for engines that can be checked for readiness and closed.
type ReadyCloser interface {
// StartReadyCloser is an abstraction for engines that can be checked for readiness and closed.
type StartReadyCloser interface {
Start(context.Context) error

// Ready returns nil if the engine is ready to be used.
Ready() error

Expand All @@ -15,25 +18,25 @@ type ReadyCloser interface {
}

type EngineRegistry struct {
engines map[string]ReadyCloser
engines map[string]StartReadyCloser
mu sync.RWMutex
}

func NewEngineRegistry() *EngineRegistry {
return &EngineRegistry{
engines: make(map[string]ReadyCloser),
engines: make(map[string]StartReadyCloser),
}
}

// Add adds an engine to the registry.
func (r *EngineRegistry) Add(id string, engine ReadyCloser) {
func (r *EngineRegistry) Add(id string, engine StartReadyCloser) {
r.mu.Lock()
defer r.mu.Unlock()
r.engines[id] = engine
}

// Get retrieves an engine from the registry.
func (r *EngineRegistry) Get(id string) (ReadyCloser, error) {
func (r *EngineRegistry) Get(id string) (StartReadyCloser, error) {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
Expand All @@ -56,7 +59,7 @@ func (r *EngineRegistry) IsRunning(id string) bool {
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *EngineRegistry) Pop(id string) (ReadyCloser, error) {
func (r *EngineRegistry) Pop(id string) (StartReadyCloser, error) {
r.mu.Lock()
defer r.mu.Unlock()
engine, ok := r.engines[id]
Expand Down
5 changes: 2 additions & 3 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
Expand Down Expand Up @@ -127,7 +126,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap {
}
}

type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
Expand Down Expand Up @@ -498,7 +497,7 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
Expand Down
112 changes: 71 additions & 41 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,7 @@ func newMockFetcher(m map[string]mockFetchResp) FetcherFunc {
type mockEngine struct {
CloseErr error
ReadyErr error
}

func newMockEngine(errs ...error) *mockEngine {
e := &mockEngine{}
if len(errs) > 0 {
e.ReadyErr = errs[0]
}
if len(errs) > 1 {
e.CloseErr = errs[1]
}
return e
StartErr error
}

func (m *mockEngine) Ready() error {
Expand All @@ -71,6 +61,10 @@ func (m *mockEngine) Close() error {
return m.CloseErr
}

func (m *mockEngine) Start(_ context.Context) error {
return m.StartErr
}

func Test_Handler(t *testing.T) {
lggr := logger.TestLogger(t)
emitter := custmsg.NewLabeler()
Expand Down Expand Up @@ -232,6 +226,9 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
Expand All @@ -251,10 +248,43 @@ func Test_workflowRegisteredHandler(t *testing.T) {
},
validationFn: defaultValidationFn,
},
{
Name: "fails to start engine",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
ConfigURL: configURL,
SecretsURL: secretsURL,
BinaryURL: binaryURL,
GiveBinary: binary,
WFOwner: wfOwner,
Event: func(wfID []byte) WorkflowRegistryWorkflowRegisteredV1 {
return WorkflowRegistryWorkflowRegisteredV1{
Status: uint8(0),
WorkflowID: [32]byte(wfID),
WorkflowOwner: wfOwner,
WorkflowName: "workflow-name",
BinaryURL: binaryURL,
ConfigURL: configURL,
SecretsURL: secretsURL,
}
},
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
err := h.workflowRegisteredEvent(ctx, event)
require.Error(t, err)
require.ErrorIs(t, err, assert.AnError)
},
},
{
Name: "fails if running engine exists",
fetcher: newMockFetcher(map[string]mockFetchResp{
binaryURL: {Body: binary, Err: nil},
binaryURL: {Body: encodedBinary, Err: nil},
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
Expand All @@ -276,7 +306,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
}
},
validationFn: func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) {
me := newMockEngine()
me := &mockEngine{}
h.engineRegistry.Add(wfID, me)
err := h.workflowRegisteredEvent(ctx, event)
require.Error(t, err)
Expand Down Expand Up @@ -378,58 +408,58 @@ func Test_workflowRegisteredHandler(t *testing.T) {
}

type testCase struct {
Name string
SecretsURL string
BinaryURL string
GiveBinary []byte
GiveConfig []byte
ConfigURL string
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
Name string
SecretsURL string
BinaryURL string
GiveBinary []byte
GiveConfig []byte
ConfigURL string
WFOwner []byte
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error)
}

func testRunningWorkflow(t *testing.T, cmd testCase) {
func testRunningWorkflow(t *testing.T, tc testCase) {
t.Helper()
t.Run(cmd.Name, func(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
var (
ctx = testutils.Context(t)
lggr = logger.TestLogger(t)
db = pgtest.NewSqlxDB(t)
orm = NewWorkflowRegistryDS(db, lggr)
emitter = custmsg.NewLabeler()

binary = cmd.GiveBinary
config = cmd.GiveConfig
secretsURL = cmd.SecretsURL
wfOwner = cmd.WFOwner
binary = tc.GiveBinary
config = tc.GiveConfig
secretsURL = tc.SecretsURL
wfOwner = tc.WFOwner

fetcher = cmd.fetcher
fetcher = tc.fetcher
)

giveWFID, err := pkgworkflows.GenerateWorkflowID(wfOwner, binary, config, secretsURL)
require.NoError(t, err)

wfID := hex.EncodeToString(giveWFID[:])

event := cmd.Event(giveWFID[:])
event := tc.Event(giveWFID[:])

er := NewEngineRegistry()
opts := []func(*eventHandler){
WithEngineRegistry(er),
}
if tc.engineFactoryFn != nil {
opts = append(opts, WithEngineFactoryFn(tc.engineFactoryFn))
}
store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock())
registry := capabilities.NewRegistry(lggr)
registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{})
h := &eventHandler{
lggr: lggr,
orm: orm,
fetcher: fetcher,
emitter: emitter,
engineRegistry: er,
capRegistry: registry,
workflowStore: store,
}
h := NewEventHandler(lggr, orm, fetcher, store, registry, emitter, clockwork.NewFakeClock(),
workflowkey.Key{}, opts...)

cmd.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID)
tc.validationFn(t, ctx, event, h, wfOwner, "workflow-name", wfID)
})
}

Expand Down

0 comments on commit 60be59d

Please sign in to comment.