Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAPPL-372 update workflow registry to use the same chainreader for initialisation and event reading #15753

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
fetcher.Fetch, workflowstore.NewDBStore(opts.DS, lggr, clockwork.NewRealClock()), opts.CapabilitiesRegistry,
custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0])

loader := syncer.NewWorkflowRegistryContractLoader(lggr, cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return relayer.NewContractReader(ctx, bytes)
}, eventHandler)

globalLogger.Debugw("Creating WorkflowRegistrySyncer")
wfSyncer := syncer.NewWorkflowRegistry(
lggr,
Expand All @@ -322,7 +318,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
QueryCount: 100,
},
eventHandler,
loader,
workflowDonNotifier,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ func Test_EventHandlerStateSync(t *testing.T) {
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the registry
registry := syncer.NewWorkflowRegistry(
Expand All @@ -140,7 +137,6 @@ func Test_EventHandlerStateSync(t *testing.T) {
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -255,9 +251,6 @@ func Test_InitialStateSync(t *testing.T) {
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the worker
worker := syncer.NewWorkflowRegistry(
Expand All @@ -270,7 +263,6 @@ func Test_InitialStateSync(t *testing.T) {
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -346,8 +338,11 @@ func Test_SecretsWorker(t *testing.T) {
require.NoError(t, err)
require.Equal(t, contents, giveContents)

handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{})
handler := &testSecretsWorkEventHandler{
wrappedHandler: syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, clockwork.NewFakeClock(), workflowkey.Key{}),
registeredCh: make(chan syncer.Event, 1),
}

worker := syncer.NewWorkflowRegistry(
lggr,
Expand All @@ -357,7 +352,6 @@ func Test_SecretsWorker(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand All @@ -374,6 +368,9 @@ func Test_SecretsWorker(t *testing.T) {

servicetest.Run(t, worker)

// wait for the workflow to be registered
<-handler.registeredCh

// generate a log event
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)

Expand Down Expand Up @@ -434,7 +431,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyPaused(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -543,7 +539,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) {
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{QueryCount: 20},
handler,
&testWorkflowRegistryContractLoader{},
&testDonNotifier{
don: capabilities.DON{
ID: donID,
Expand Down Expand Up @@ -708,3 +703,24 @@ func updateWorkflow(
th.Backend.Commit()
th.Backend.Commit()
}

type evtHandler interface {
Handle(ctx context.Context, event syncer.Event) error
}

type testSecretsWorkEventHandler struct {
wrappedHandler evtHandler
registeredCh chan syncer.Event
}

func (m *testSecretsWorkEventHandler) Handle(ctx context.Context, event syncer.Event) error {
switch {
case event.GetEventType() == syncer.ForceUpdateSecretsEvent:
return m.wrappedHandler.Handle(ctx, event)
case event.GetEventType() == syncer.WorkflowRegisteredEvent:
m.registeredCh <- event
return nil
default:
panic(fmt.Sprintf("unexpected event type: %v", event.GetEventType()))
}
}
142 changes: 34 additions & 108 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,11 @@ type workflowRegistry struct {

newContractReaderFn newContractReaderFn

eventPollerCfg WorkflowEventPollerConfig
eventTypes []WorkflowRegistryEventType
handler evtHandler
initialWorkflowsStateLoader initialWorkflowsStateLoader
eventPollerCfg WorkflowEventPollerConfig
eventTypes []WorkflowRegistryEventType
handler evtHandler

workflowDonNotifier donNotifier

reader ContractReader
}

// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful
Expand All @@ -152,12 +149,6 @@ type evtHandler interface {
Handle(ctx context.Context, event Event) error
}

type initialWorkflowsStateLoader interface {
// LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the
// point in time at which the load occurred.
LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error)
}

type donNotifier interface {
WaitForDon(ctx context.Context) (capabilities.DON, error)
}
Expand All @@ -172,7 +163,6 @@ func NewWorkflowRegistry(
addr string,
eventPollerConfig WorkflowEventPollerConfig,
handler evtHandler,
initialWorkflowsStateLoader initialWorkflowsStateLoader,
workflowDonNotifier donNotifier,
opts ...func(*workflowRegistry),
) *workflowRegistry {
Expand All @@ -184,16 +174,16 @@ func NewWorkflowRegistry(
WorkflowRegisteredEvent,
WorkflowUpdatedEvent,
}

wr := &workflowRegistry{
lggr: lggr,
newContractReaderFn: newContractReaderFn,
workflowRegistryAddress: addr,
eventPollerCfg: eventPollerConfig,
stopCh: make(services.StopChan),
eventTypes: ets,
handler: handler,
initialWorkflowsStateLoader: initialWorkflowsStateLoader,
workflowDonNotifier: workflowDonNotifier,
lggr: lggr,
newContractReaderFn: newContractReaderFn,
workflowRegistryAddress: addr,
eventPollerCfg: eventPollerConfig,
stopCh: make(services.StopChan),
eventTypes: ets,
handler: handler,
workflowDonNotifier: workflowDonNotifier,
}

for _, opt := range opts {
Expand All @@ -220,8 +210,14 @@ func (w *workflowRegistry) Start(_ context.Context) error {
return
}

reader, err := w.newWorkflowRegistryContractReader(ctx)
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return
}

w.lggr.Debugw("Loading initial workflows for DON", "DON", don.ID)
loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx, don)
loadWorkflowsHead, err := w.loadWorkflows(ctx, don, reader)
if err != nil {
// TODO - this is a temporary fix to handle the case where the chainreader errors because the contract
// contains no workflows. To track: https://smartcontract-it.atlassian.net/browse/CAPPL-393
Expand All @@ -235,12 +231,6 @@ func (w *workflowRegistry) Start(_ context.Context) error {
}
}

reader, err := w.getContractReader(ctx)
if err != nil {
w.lggr.Criticalf("contract reader unavailable : %s", err)
return
}

w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height)
}()

Expand Down Expand Up @@ -359,36 +349,19 @@ func (w *workflowRegistry) getTicker() <-chan time.Time {
return w.ticker
}

// getContractReader initializes a contract reader if needed, otherwise returns the existing
// reader.
func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) {
c := types.BoundContract{
Name: WorkflowRegistryContractName,
Address: w.workflowRegistryAddress,
}

if w.reader == nil {
reader, err := getWorkflowRegistryEventReader(ctx, w.newContractReaderFn, c)
if err != nil {
return nil, err
}

w.reader = reader
}

return w.reader, nil
}

type sequenceWithEventType struct {
Sequence types.Sequence
EventType WorkflowRegistryEventType
}

func getWorkflowRegistryEventReader(
func (w *workflowRegistry) newWorkflowRegistryContractReader(
ctx context.Context,
newReaderFn newContractReaderFn,
bc types.BoundContract,
) (ContractReader, error) {
bc := types.BoundContract{
Name: WorkflowRegistryContractName,
Address: w.workflowRegistryAddress,
}

contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
WorkflowRegistryContractName: {
Expand All @@ -404,6 +377,9 @@ func getWorkflowRegistryEventReader(
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: GetWorkflowMetadataListByDONMethodName,
},
string(ForceUpdateSecretsEvent): {
ChainSpecificName: string(ForceUpdateSecretsEvent),
ReadType: evmtypes.Event,
Expand Down Expand Up @@ -438,7 +414,7 @@ func getWorkflowRegistryEventReader(
return nil, err
}

reader, err := newReaderFn(ctx, marshalledCfg)
reader, err := w.newContractReaderFn(ctx, marshalledCfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -468,59 +444,9 @@ func (r workflowAsEvent) GetData() any {
return r.Data
}

type workflowRegistryContractLoader struct {
lggr logger.Logger
workflowRegistryAddress string
newContractReaderFn newContractReaderFn
handler evtHandler
}

func NewWorkflowRegistryContractLoader(
lggr logger.Logger,
workflowRegistryAddress string,
newContractReaderFn newContractReaderFn,
handler evtHandler,
) *workflowRegistryContractLoader {
return &workflowRegistryContractLoader{
lggr: lggr.Named("WorkflowRegistryContractLoader"),
workflowRegistryAddress: workflowRegistryAddress,
newContractReaderFn: newContractReaderFn,
handler: handler,
}
}

func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) {
// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
WorkflowRegistryContractName: {
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: GetWorkflowMetadataListByDONMethodName,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
if err != nil {
return nil, fmt.Errorf("failed to marshal contract reader config: %w", err)
}

contractReader, err := l.newContractReaderFn(ctx, contractReaderCfgBytes)
if err != nil {
return nil, fmt.Errorf("failed to create contract reader: %w", err)
}

err = contractReader.Bind(ctx, []types.BoundContract{{Name: WorkflowRegistryContractName, Address: l.workflowRegistryAddress}})
if err != nil {
return nil, fmt.Errorf("failed to bind contract reader: %w", err)
}

func (w *workflowRegistry) loadWorkflows(ctx context.Context, don capabilities.DON, contractReader ContractReader) (*types.Head, error) {
contractBinding := types.BoundContract{
Address: l.workflowRegistryAddress,
Address: w.workflowRegistryAddress,
Name: WorkflowRegistryContractName,
}

Expand All @@ -540,7 +466,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don
return nil, fmt.Errorf("failed to get lastest value with head data %w", err)
}

l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))
w.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))
for _, workflow := range workflows.WorkflowMetadataList {
toRegisteredEvent := WorkflowRegistryWorkflowRegisteredV1{
WorkflowID: workflow.WorkflowID,
Expand All @@ -552,11 +478,11 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don
ConfigURL: workflow.ConfigURL,
SecretsURL: workflow.SecretsURL,
}
if err = l.handler.Handle(ctx, workflowAsEvent{
if err = w.handler.Handle(ctx, workflowAsEvent{
Data: toRegisteredEvent,
EventType: WorkflowRegisteredEvent,
}); err != nil {
l.lggr.Errorf("failed to handle workflow registration: %s", err)
w.lggr.Errorf("failed to handle workflow registration: %s", err)
}
}

Expand Down
Loading