diff --git a/core/capabilities/don_notifier.go b/core/capabilities/don_notifier.go new file mode 100644 index 00000000000..4edb38d3661 --- /dev/null +++ b/core/capabilities/don_notifier.go @@ -0,0 +1,43 @@ +package capabilities + +import ( + "context" + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +type DonNotifier struct { + mu sync.Mutex + don capabilities.DON + notified bool + ch chan struct{} +} + +func NewDonNotifier() *DonNotifier { + return &DonNotifier{ + ch: make(chan struct{}), + } +} + +func (n *DonNotifier) NotifyDonSet(don capabilities.DON) { + n.mu.Lock() + defer n.mu.Unlock() + if !n.notified { + n.don = don + n.notified = true + close(n.ch) + } +} + +func (n *DonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + select { + case <-ctx.Done(): + return capabilities.DON{}, ctx.Err() + case <-n.ch: + } + <-n.ch + n.mu.Lock() + defer n.mu.Unlock() + return n.don, nil +} diff --git a/core/capabilities/don_notifier_test.go b/core/capabilities/don_notifier_test.go new file mode 100644 index 00000000000..f37931259ba --- /dev/null +++ b/core/capabilities/don_notifier_test.go @@ -0,0 +1,49 @@ +package capabilities_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink/v2/core/capabilities" +) + +func TestDonNotifier_WaitForDon(t *testing.T) { + notifier := capabilities.NewDonNotifier() + don := commoncap.DON{ + ID: 1, + } + + go func() { + time.Sleep(100 * time.Millisecond) + notifier.NotifyDonSet(don) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := notifier.WaitForDon(ctx) + require.NoError(t, err) + assert.Equal(t, don, result) + + result, err = notifier.WaitForDon(ctx) + require.NoError(t, err) + assert.Equal(t, don, result) +} + +func TestDonNotifier_WaitForDon_ContextTimeout(t *testing.T) { + notifier := capabilities.NewDonNotifier() + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) + defer cancel() + + _, err := notifier.WaitForDon(ctx) + require.Error(t, err) + assert.Equal(t, context.DeadlineExceeded, err) +} diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index be06dcf60c1..97aea5d3c8c 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -43,11 +43,12 @@ var defaultStreamConfig = p2ptypes.StreamConfig{ type launcher struct { services.StateMachine - lggr logger.Logger - peerWrapper p2ptypes.PeerWrapper - dispatcher remotetypes.Dispatcher - registry *Registry - subServices []services.Service + lggr logger.Logger + peerWrapper p2ptypes.PeerWrapper + dispatcher remotetypes.Dispatcher + registry *Registry + subServices []services.Service + workflowDonNotifier donNotifier } func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguration, error) { @@ -86,18 +87,24 @@ func unmarshalCapabilityConfig(data []byte) (capabilities.CapabilityConfiguratio }, nil } +type donNotifier interface { + NotifyDonSet(don capabilities.DON) +} + func NewLauncher( lggr logger.Logger, peerWrapper p2ptypes.PeerWrapper, dispatcher remotetypes.Dispatcher, registry *Registry, + workflowDonNotifier donNotifier, ) *launcher { return &launcher{ - lggr: lggr.Named("CapabilitiesLauncher"), - peerWrapper: peerWrapper, - dispatcher: dispatcher, - registry: registry, - subServices: []services.Service{}, + lggr: lggr.Named("CapabilitiesLauncher"), + peerWrapper: peerWrapper, + dispatcher: dispatcher, + registry: registry, + subServices: []services.Service{}, + workflowDonNotifier: workflowDonNotifier, } } @@ -215,6 +222,8 @@ func (w *launcher) Launch(ctx context.Context, state *registrysyncer.LocalRegist return errors.New("invariant violation: node is part of more than one workflowDON") } + w.workflowDonNotifier.NotifyDonSet(myDON.DON) + for _, rcd := range remoteCapabilityDONs { err := w.addRemoteCapabilities(ctx, myDON, rcd, state) if err != nil { diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 013463bfdbb..c130f9833d9 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -33,6 +33,12 @@ import ( var _ capabilities.TriggerCapability = (*mockTrigger)(nil) +type mockDonNotifier struct { +} + +func (m *mockDonNotifier) NotifyDonSet(don capabilities.DON) { +} + type mockTrigger struct { capabilities.CapabilityInfo } @@ -196,6 +202,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) @@ -305,6 +312,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) @@ -409,6 +417,7 @@ func TestLauncher(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) @@ -600,6 +609,7 @@ func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -752,6 +762,7 @@ func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) // If the DON were public, this would fail with two errors: @@ -917,6 +928,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -1082,6 +1094,7 @@ func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilitie wrapper, dispatcher, registry, + &mockDonNotifier{}, ) dispatcher.On("SetReceiver", fullTriggerCapID, triggerCapDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) @@ -1232,6 +1245,7 @@ func TestLauncher_SucceedsEvenIfDispatcherAlreadyHasReceiver(t *testing.T) { wrapper, dispatcher, registry, + &mockDonNotifier{}, ) err = launcher.Launch(ctx, state) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 01f5d8b530a..68b9b99a823 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -20,6 +20,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -33,6 +34,7 @@ import ( gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -48,6 +50,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/feeds" "github.com/smartcontractkit/chainlink/v2/core/services/fluxmonitorv2" "github.com/smartcontractkit/chainlink/v2/core/services/gateway" + capabilities2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" + common2 "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" "github.com/smartcontractkit/chainlink/v2/core/services/headreporter" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keeper" @@ -71,6 +75,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/webhook" "github.com/smartcontractkit/chainlink/v2/core/services/workflows" workflowstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/sessions/ldapauth" "github.com/smartcontractkit/chainlink/v2/core/sessions/localauth" @@ -212,6 +217,17 @@ func NewApplication(opts ApplicationOpts) (Application, error) { opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) } + var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper + if cfg.Capabilities().GatewayConnector().DonID() != "" { + globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) + gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( + cfg.Capabilities().GatewayConnector(), + keyStore.Eth(), + clockwork.NewRealClock(), + globalLogger) + srvcs = append(srvcs, gatewayConnectorWrapper) + } + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher @@ -256,32 +272,79 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not configure syncer: %w", err) } + workflowDonNotifier := capabilities.NewDonNotifier() + wfLauncher := capabilities.NewLauncher( globalLogger, externalPeerWrapper, dispatcher, opts.CapabilitiesRegistry, + workflowDonNotifier, ) registrySyncer.AddLauncher(wfLauncher) srvcs = append(srvcs, wfLauncher, registrySyncer) + + if cfg.Capabilities().WorkflowRegistry().Address() != "" { + if gatewayConnectorWrapper == nil { + return nil, errors.New("unable to create workflow registry syncer without gateway connector") + } + + err = keyStore.Workflow().EnsureKey(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to ensure workflow key: %w", err) + } + + keys, err := keyStore.Workflow().GetAll() + if err != nil { + return nil, fmt.Errorf("failed to get all workflow keys: %w", err) + } + if len(keys) != 1 { + return nil, fmt.Errorf("expected 1 key, got %d", len(keys)) + } + + connector := gatewayConnectorWrapper.GetGatewayConnector() + webAPILggr := globalLogger.Named("WebAPITarget") + + webAPIConfig := webapi.ServiceConfig{ + RateLimiter: common2.RateLimiterConfig{ + GlobalRPS: 100.0, + GlobalBurst: 100, + PerSenderRPS: 100.0, + PerSenderBurst: 100, + }, + } + + outgoingConnectorHandler, err := webapi.NewOutgoingConnectorHandler(connector, + webAPIConfig, + capabilities2.MethodWebAPITarget, webAPILggr) + if err != nil { + return nil, fmt.Errorf("could not create outgoing connector handler: %w", err) + } + + eventHandler := syncer.NewEventHandler(globalLogger, syncer.NewWorkflowRegistryDS(opts.DS, globalLogger), + syncer.NewFetcherFunc(globalLogger, outgoingConnectorHandler), workflowstore.NewDBStore(opts.DS, globalLogger, clockwork.NewRealClock()), opts.CapabilitiesRegistry, + custmsg.NewLabeler(), clockwork.NewRealClock(), keys[0]) + + loader := syncer.NewWorkflowRegistryContractLoader(cfg.Capabilities().WorkflowRegistry().Address(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, eventHandler) + + wfSyncer := syncer.NewWorkflowRegistry(globalLogger, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return relayer.NewContractReader(ctx, bytes) + }, cfg.Capabilities().WorkflowRegistry().Address(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 100, + }, eventHandler, loader, workflowDonNotifier) + + srvcs = append(srvcs, wfSyncer) + } } } else { globalLogger.Debug("External registry not configured, skipping registry syncer and starting with an empty registry") opts.CapabilitiesRegistry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) } - var gatewayConnectorWrapper *gatewayconnector.ServiceWrapper - if cfg.Capabilities().GatewayConnector().DonID() != "" { - globalLogger.Debugw("Creating GatewayConnector wrapper", "donID", cfg.Capabilities().GatewayConnector().DonID()) - gatewayConnectorWrapper = gatewayconnector.NewGatewayConnectorServiceWrapper( - cfg.Capabilities().GatewayConnector(), - keyStore.Eth(), - clockwork.NewRealClock(), - globalLogger) - srvcs = append(srvcs, gatewayConnectorWrapper) - } - // LOOPs can be created as options, in the case of LOOP relayers, or // as OCR2 job implementations, in the case of Median today. // We will have a non-nil registry here in LOOP relayers are being used, otherwise diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 7471c7169ea..cf2fb59a93b 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -14,6 +14,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -48,7 +49,16 @@ func newTestEvtHandler() *testEvtHandler { type testWorkflowRegistryContractLoader struct { } -func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { +type testDonNotifier struct { + don capabilities.DON + err error +} + +func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + return t.don, t.err +} + +func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { return &types.Head{ Height: "0", Hash: nil, @@ -57,7 +67,6 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context) } func Test_InitialStateSync(t *testing.T) { - ctx := coretestutils.Context(t) lggr := logger.TestLogger(t) backendTH := testutils.NewEVMBackendTH(t) donID := uint32(1) @@ -67,29 +76,6 @@ func Test_InitialStateSync(t *testing.T) { backendTH.Backend.Commit() require.NoError(t, err) - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - syncer.WorkflowRegistryContractName: { - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - syncer.GetWorkflowMetadataListByDONMethodName: { - ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - require.NoError(t, err) - - contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) - require.NoError(t, err) - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: syncer.WorkflowRegistryContractName, Address: wfRegistryAddr.Hex()}}) - require.NoError(t, err) - // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) @@ -112,24 +98,37 @@ func Test_InitialStateSync(t *testing.T) { } testEventHandler := newTestEvtHandler() - loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), donID, contractReader, testEventHandler) + loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, testEventHandler) // Create the worker worker := syncer.NewWorkflowRegistry( lggr, - contractReader, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ QueryCount: 20, }, testEventHandler, loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, syncer.WithTicker(make(chan time.Time)), ) servicetest.Run(t, worker) - assert.Len(t, testEventHandler.events, numberWorkflows) + require.Eventually(t, func() bool { + return len(testEventHandler.events) == numberWorkflows + }, 5*time.Second, time.Second) + for _, event := range testEventHandler.events { assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) } @@ -227,10 +226,17 @@ func Test_SecretsWorker(t *testing.T) { handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(), + worker := syncer.NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return contractReader, nil + }, wfRegistryAddr.Hex(), syncer.WorkflowEventPollerConfig{ QueryCount: 20, - }, handler, &testWorkflowRegistryContractLoader{}, syncer.WithTicker(giveTicker.C)) + }, handler, &testWorkflowRegistryContractLoader{}, &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, syncer.WithTicker(giveTicker.C)) // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go index ed815a240ba..bebdfb0519e 100644 --- a/core/services/workflows/syncer/fetcher.go +++ b/core/services/workflows/syncer/fetcher.go @@ -13,7 +13,6 @@ import ( ) func NewFetcherFunc( - ctx context.Context, lggr logger.Logger, och *webapi.OutgoingConnectorHandler) FetcherFunc { return func(ctx context.Context, url string) ([]byte, error) { diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go index 846a9186b5a..4ed228c6a51 100644 --- a/core/services/workflows/syncer/fetcher_test.go +++ b/core/services/workflows/syncer/fetcher_test.go @@ -46,7 +46,7 @@ func TestNewFetcherFunc(t *testing.T) { connector.EXPECT().DonID().Return("don-id") connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) - fetcher := NewFetcherFunc(ctx, lggr, och) + fetcher := NewFetcherFunc(lggr, och) payload, err := fetcher(ctx, url) require.NoError(t, err) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 4f3bb76bd14..6642679b228 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" @@ -111,12 +112,8 @@ type workflowRegistry struct { lggr logger.Logger workflowRegistryAddress string - reader ContractReader - // initReader allows the workflowRegistry to initialize a contract reader if one is not provided - // and separates the contract reader initialization from the workflowRegistry start up. - initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) - relayer ContractReaderFactory + newContractReaderFn newContractReaderFn eventPollerCfg WorkflowEventPollerConfig eventTypes []WorkflowRegistryEventType @@ -132,6 +129,10 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap + + workflowDonNotifier donNotifier + + reader ContractReader } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -142,12 +143,6 @@ func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { } } -func WithReader(reader types.ContractReader) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.reader = reader - } -} - type evtHandler interface { Handle(ctx context.Context, event Event) error } @@ -155,27 +150,33 @@ type evtHandler interface { type initialWorkflowsStateLoader interface { // LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the // point in time at which the load occurred. - LoadWorkflows(ctx context.Context) (*types.Head, error) + LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) } +type donNotifier interface { + WaitForDon(ctx context.Context) (capabilities.DON, error) +} + +type newContractReaderFn func(context.Context, []byte) (ContractReader, error) + // NewWorkflowRegistry returns a new workflowRegistry. // Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. func NewWorkflowRegistry( lggr logger.Logger, - reader ContractReader, + newContractReaderFn newContractReaderFn, addr string, eventPollerConfig WorkflowEventPollerConfig, handler evtHandler, initialWorkflowsStateLoader initialWorkflowsStateLoader, + workflowDonNotifier donNotifier, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ lggr: lggr.Named(name), + newContractReaderFn: newContractReaderFn, workflowRegistryAddress: addr, - reader: reader, eventPollerCfg: eventPollerConfig, - initReader: newReader, heap: newBlockHeightHeap(), stopCh: make(services.StopChan), eventTypes: ets, @@ -183,6 +184,7 @@ func NewWorkflowRegistry( batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), handler: handler, initialWorkflowsStateLoader: initialWorkflowsStateLoader, + workflowDonNotifier: workflowDonNotifier, } for _, opt := range opts { @@ -193,13 +195,8 @@ func NewWorkflowRegistry( // Start starts the workflowRegistry. It starts two goroutines, one for querying the contract // and one for handling the events. -func (w *workflowRegistry) Start(ctx context.Context) error { +func (w *workflowRegistry) Start(_ context.Context) error { return w.StartOnce(w.Name(), func() error { - loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx) - if err != nil { - return fmt.Errorf("failed to load workflows: %w", err) - } - ctx, cancel := w.stopCh.NewCtx() w.wg.Add(1) @@ -207,6 +204,18 @@ func (w *workflowRegistry) Start(ctx context.Context) error { defer w.wg.Done() defer cancel() + don, err := w.workflowDonNotifier.WaitForDon(ctx) + if err != nil { + w.lggr.Errorf("failed to wait for don: %v", err) + return + } + + loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx, don) + if err != nil { + w.lggr.Errorf("failed to load workflows: %v", err) + return + } + w.syncEventsLoop(ctx, loadWorkflowsHead.Height) }() @@ -394,7 +403,7 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade } if w.reader == nil { - reader, err := w.initReader(ctx, w.lggr, w.relayer, c) + reader, err := getWorkflowRegistryEventReader(ctx, w.newContractReaderFn, c) if err != nil { return nil, err } @@ -490,12 +499,11 @@ func queryEvent( } } -func newReader( +func getWorkflowRegistryEventReader( ctx context.Context, - lggr logger.Logger, - factory ContractReaderFactory, + newReaderFn newContractReaderFn, bc types.BoundContract, -) (types.ContractReader, error) { +) (ContractReader, error) { contractReaderCfg := evmtypes.ChainReaderConfig{ Contracts: map[string]evmtypes.ChainContractReader{ WorkflowRegistryContractName: { @@ -518,7 +526,7 @@ func newReader( return nil, err } - reader, err := factory.NewContractReader(ctx, marshalledCfg) + reader, err := newReaderFn(ctx, marshalledCfg) if err != nil { return nil, err } @@ -546,26 +554,52 @@ func (r workflowAsEvent) GetData() any { type workflowRegistryContractLoader struct { workflowRegistryAddress string - donID uint32 - reader ContractReader + newContractReaderFn newContractReaderFn handler evtHandler } func NewWorkflowRegistryContractLoader( workflowRegistryAddress string, - donID uint32, - reader ContractReader, + newContractReaderFn newContractReaderFn, handler evtHandler, ) *workflowRegistryContractLoader { return &workflowRegistryContractLoader{ workflowRegistryAddress: workflowRegistryAddress, - donID: donID, - reader: reader, + newContractReaderFn: newContractReaderFn, handler: handler, } } -func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { +func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don capabilities.DON) (*types.Head, error) { + // Build the ContractReader config + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + WorkflowRegistryContractName: { + ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + GetWorkflowMetadataListByDONMethodName: { + ChainSpecificName: GetWorkflowMetadataListByDONMethodName, + }, + }, + }, + }, + } + + contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) + if err != nil { + return nil, fmt.Errorf("failed to marshal contract reader config: %w", err) + } + + contractReader, err := l.newContractReaderFn(ctx, contractReaderCfgBytes) + if err != nil { + return nil, fmt.Errorf("failed to create contract reader: %w", err) + } + + err = contractReader.Bind(ctx, []types.BoundContract{{Name: WorkflowRegistryContractName, Address: l.workflowRegistryAddress}}) + if err != nil { + return nil, fmt.Errorf("failed to bind contract reader: %w", err) + } + contractBinding := types.BoundContract{ Address: l.workflowRegistryAddress, Name: WorkflowRegistryContractName, @@ -573,7 +607,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*ty readIdentifier := contractBinding.ReadIdentifier(GetWorkflowMetadataListByDONMethodName) params := GetWorkflowMetadataListByDONParams{ - DonID: l.donID, + DonID: don.ID, Start: 0, Limit: 0, // 0 tells the contract to return max pagination limit workflows on each call } @@ -582,7 +616,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*ty for { var err error var workflows GetWorkflowMetadataListByDONReturnVal - headAtLastRead, err = l.reader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) + headAtLastRead, err = contractReader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) if err != nil { return nil, fmt.Errorf("failed to get workflow metadata for don %w", err) } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 17a71d73030..0cccb405710 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -10,6 +10,7 @@ import ( "github.com/jonboulle/clockwork" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -26,6 +27,15 @@ import ( "github.com/stretchr/testify/require" ) +type testDonNotifier struct { + don capabilities.DON + err error +} + +func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { + return t.don, t.err +} + func Test_Workflow_Registry_Syncer(t *testing.T) { var ( giveContents = "contents" @@ -62,12 +72,23 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { handler = NewEventHandler(lggr, orm, gateway, nil, nil, emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - loader = NewWorkflowRegistryContractLoader(contractAddress, 1, reader, handler) + loader = NewWorkflowRegistryContractLoader(contractAddress, func(ctx context.Context, bytes []byte) (ContractReader, error) { + return reader, nil + }, handler) - worker = NewWorkflowRegistry(lggr, reader, contractAddress, + worker = NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (ContractReader, error) { + return reader, nil + }, contractAddress, WorkflowEventPollerConfig{ QueryCount: 20, - }, handler, loader, WithTicker(ticker)) + }, handler, loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: 1, + }, + err: nil, + }, + WithTicker(ticker)) ) // Cleanup the worker @@ -100,6 +121,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { reader.EXPECT().GetLatestValueWithHeadData(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&types.Head{ Height: "0", }, nil) + reader.EXPECT().Bind(mock.Anything, mock.Anything).Return(nil) // Go run the worker servicetest.Run(t, worker) diff --git a/tools/bin/go_core_tests b/tools/bin/go_core_tests index 88ee82c9261..76c15fccd07 100755 --- a/tools/bin/go_core_tests +++ b/tools/bin/go_core_tests @@ -4,7 +4,7 @@ set +e SCRIPT_PATH=`dirname "$0"`; SCRIPT_PATH=`eval "cd \"$SCRIPT_PATH\" && pwd"` OUTPUT_FILE=${OUTPUT_FILE:-"./output.txt"} -EXTRA_FLAGS="" +EXTRA_FLAGS="-timeout 20m" echo "Test execution results: ---------------------" echo ""