From e5dc01e844064e2fdab976369cf83195f9659216 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 25 Jun 2024 11:59:52 +0100
Subject: [PATCH] Add standalone registrysyncer package (#13668)

* Add standalone registrysyncer pkg
  - And workflow handler implementation for keystone-specific logic
* Linting
* Add test for LocalNode
* Add debug logging
* Rename handler -> launcher
* Remove read lock in Start
* More comments
---
 .changeset/young-parrots-peel.md       |   5 +
 core/capabilities/launcher.go          | 525 ++++++++++
 core/capabilities/launcher_test.go     | 693 +++++++++++++
 core/capabilities/reader.go            | 157 ---
 core/capabilities/syncer.go            | 913 ------------------
 core/capabilities/syncer_test.go       | 613 ------------
 core/services/chainlink/application.go |  24 +-
 core/services/registrysyncer/syncer.go | 250 +++++
 .../registrysyncer/syncer_test.go}     |  37 +-
 9 files changed, 1489 insertions(+), 1728 deletions(-)
 create mode 100644 .changeset/young-parrots-peel.md
 create mode 100644 core/capabilities/launcher.go
 create mode 100644 core/capabilities/launcher_test.go
 delete mode 100644 core/capabilities/reader.go
 delete mode 100644 core/capabilities/syncer.go
 delete mode 100644 core/capabilities/syncer_test.go
 create mode 100644 core/services/registrysyncer/syncer.go
 rename core/{capabilities/reader_test.go => services/registrysyncer/syncer_test.go} (90%)

diff --git a/.changeset/young-parrots-peel.md b/.changeset/young-parrots-peel.md
new file mode 100644
index 00000000000..a7b696d14ff
--- /dev/null
+++ b/.changeset/young-parrots-peel.md
@@ -0,0 +1,5 @@
+---
+"chainlink": minor
+---
+
+Add registry syncer package #internal
diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go
new file mode 100644
index 00000000000..e4f7f480f3a
--- /dev/null
+++ b/core/capabilities/launcher.go
@@ -0,0 +1,525 @@
+package capabilities
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"google.golang.org/protobuf/proto"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
+	"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
+	"github.com/smartcontractkit/chainlink-common/pkg/services"
+	"github.com/smartcontractkit/chainlink-common/pkg/types/core"
+
+	"github.com/smartcontractkit/libocr/ragep2p"
+	ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
+
+	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
+	"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
+	remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
+	"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
+	kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry"
+	"github.com/smartcontractkit/chainlink/v2/core/logger"
+	p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
+	"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
+)
+
+var defaultStreamConfig = p2ptypes.StreamConfig{
+	IncomingMessageBufferSize: 1000000,
+	OutgoingMessageBufferSize: 1000000,
+	MaxMessageLenBytes:        100000,
+	MessageRateLimiter: ragep2p.TokenBucketParams{
+		Rate:     100.0,
+		Capacity: 1000,
+	},
+	BytesRateLimiter: ragep2p.TokenBucketParams{
+		Rate:     100000.0,
+		Capacity: 1000000,
+	},
+}
+
+type launcher struct {
+	services.StateMachine
+	lggr        logger.Logger
+	peerWrapper p2ptypes.PeerWrapper
+	dispatcher  remotetypes.Dispatcher
+	registry    core.CapabilitiesRegistry
+	localNode   capabilities.Node
+	subServices []services.Service
+}
+
+func 
NewLauncher( + lggr logger.Logger, + peerWrapper p2ptypes.PeerWrapper, + dispatcher remotetypes.Dispatcher, + registry core.CapabilitiesRegistry, +) *launcher { + return &launcher{ + lggr: lggr, + peerWrapper: peerWrapper, + dispatcher: dispatcher, + registry: registry, + subServices: []services.Service{}, + } +} + +func (w *launcher) Start(ctx context.Context) error { + return nil +} + +func (w *launcher) Close() error { + for _, s := range w.subServices { + if err := s.Close(); err != nil { + w.lggr.Errorw("failed to close a sub-service", "name", s.Name(), "error", err) + } + } + + return w.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{}) +} + +func (w *launcher) Ready() error { + return nil +} + +func (w *launcher) HealthReport() map[string]error { + return nil +} + +func (w *launcher) Name() string { + return "CapabilitiesLauncher" +} + +func (w *launcher) LocalNode(ctx context.Context) (capabilities.Node, error) { + if w.peerWrapper.GetPeer() == nil { + return w.localNode, errors.New("unable to get local node: peerWrapper hasn't started yet") + } + + if w.localNode.WorkflowDON.ID == "" { + return w.localNode, errors.New("unable to get local node: waiting for initial call from syncer") + } + + return w.localNode, nil +} + +func (w *launcher) updateLocalNode(state registrysyncer.State) { + pid := w.peerWrapper.GetPeer().ID() + + var workflowDON capabilities.DON + capabilityDONs := []capabilities.DON{} + for _, d := range state.IDsToDONs { + for _, p := range d.NodeP2PIds { + if p == pid { + if d.AcceptsWorkflows { + if workflowDON.ID == "" { + workflowDON = *toDONInfo(d) + w.lggr.Debug("Workflow DON identified: %+v", workflowDON) + } else { + w.lggr.Errorf("Configuration error: node %s belongs to more than one workflowDON", pid) + } + } + + capabilityDONs = append(capabilityDONs, *toDONInfo(d)) + } + } + } + + w.localNode = capabilities.Node{ + PeerID: &pid, + WorkflowDON: workflowDON, + CapabilityDONs: capabilityDONs, + } +} + +func (w *launcher) Launch(ctx context.Context, state registrysyncer.State) error { + w.updateLocalNode(state) + + // Let's start by updating the list of Peers + // We do this by creating a new entry for each node belonging + // to a public DON. + // We also add the hardcoded peers determined by the NetworkSetup. + allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + + publicDONs := []kcr.CapabilitiesRegistryDONInfo{} + for _, d := range state.IDsToDONs { + if !d.IsPublic { + continue + } + + publicDONs = append(publicDONs, d) + + for _, nid := range d.NodeP2PIds { + allPeers[nid] = defaultStreamConfig + } + } + + // TODO: be a bit smarter about who we connect to; we should ideally only + // be connecting to peers when we need to. + // https://smartcontract-it.atlassian.net/browse/KS-330 + err := w.peerWrapper.GetPeer().UpdateConnections(allPeers) + if err != nil { + return fmt.Errorf("failed to update peer connections: %w", err) + } + + // Next, we need to split the DONs into the following: + // - workflow DONs the current node is a part of. + // These will need remote shims to all remote capabilities on other DONs. + // + // We'll also construct a set to record what DONs the current node is a part of, + // regardless of any modifiers (public/acceptsWorkflows etc). 
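+	// Workflow DONs that this node is NOT a member of are collected as
+	// remoteWorkflowDONs below; they are later handed to exposeCapabilities
+	// as the set of callers allowed to reach this node's own capabilities.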
+ myID := w.peerWrapper.GetPeer().ID() + myWorkflowDONs := []kcr.CapabilitiesRegistryDONInfo{} + remoteWorkflowDONs := []kcr.CapabilitiesRegistryDONInfo{} + myDONs := map[uint32]bool{} + for _, d := range state.IDsToDONs { + for _, peerID := range d.NodeP2PIds { + if peerID == myID { + myDONs[d.Id] = true + } + } + + if d.AcceptsWorkflows { + if myDONs[d.Id] { + myWorkflowDONs = append(myWorkflowDONs, d) + } else { + remoteWorkflowDONs = append(remoteWorkflowDONs, d) + } + } + } + + // - remote capability DONs (with IsPublic = true) the current node is a part of. + // These need server-side shims. + myCapabilityDONs := []kcr.CapabilitiesRegistryDONInfo{} + remoteCapabilityDONs := []kcr.CapabilitiesRegistryDONInfo{} + for _, d := range publicDONs { + if len(d.CapabilityConfigurations) > 0 { + if myDONs[d.Id] { + myCapabilityDONs = append(myCapabilityDONs, d) + } else { + remoteCapabilityDONs = append(remoteCapabilityDONs, d) + } + } + } + + // Now, if my node is a workflow DON, let's setup any shims + // to external capabilities. + if len(myWorkflowDONs) > 0 { + myDON := myWorkflowDONs[0] + + // NOTE: this is enforced on-chain and so should never happen. + if len(myWorkflowDONs) > 1 { + w.lggr.Error("invariant violation: node is part of more than one workflowDON: this shouldn't happen.") + } + + for _, rcd := range remoteCapabilityDONs { + err := w.addRemoteCapabilities(ctx, myDON, rcd, state) + if err != nil { + return err + } + } + } + + // Finally, if I'm a capability DON, let's enable external access + // to the capability. + if len(myCapabilityDONs) > 0 { + for _, mcd := range myCapabilityDONs { + err := w.exposeCapabilities(ctx, myID, mcd, state, remoteWorkflowDONs) + if err != nil { + return err + } + } + } + + return nil +} + +func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON kcr.CapabilitiesRegistryDONInfo, remoteDON kcr.CapabilitiesRegistryDONInfo, state registrysyncer.State) error { + for _, c := range remoteDON.CapabilityConfigurations { + capability, ok := state.IDsToCapabilities[c.CapabilityId] + if !ok { + return fmt.Errorf("could not find capability matching id %s", c.CapabilityId) + } + + switch toCapabilityType(capability.CapabilityType) { + case capabilities.CapabilityTypeTrigger: + newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { + if !strings.HasPrefix(info.ID, "streams-trigger") { + return nil, errors.New("not supported: trigger capability does not have id = streams-trigger") + } + + codec := streams.NewCodec(w.lggr) + + signers, err := signersFor(remoteDON, state) + if err != nil { + return nil, err + } + + aggregator := triggers.NewMercuryRemoteAggregator( + codec, + signers, + int(remoteDON.F+1), + w.lggr, + ) + cfg := &remotetypes.RemoteTriggerConfig{} + cfg.ApplyDefaults() + err = proto.Unmarshal(c.Config, cfg) + if err != nil { + return nil, err + } + // TODO: We need to implement a custom, Mercury-specific + // aggregator here, because there is no guarantee that + // all trigger events in the workflow will have the same + // payloads. As a workaround, we validate the signatures. + // When this is solved, we can move to a generic aggregator + // and remove this. 
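+				// The aggregator above is constructed with a minimum of F+1 signed
+				// responses, so that, assuming at most F faulty nodes in the remote
+				// DON, every accepted trigger event is vouched for by at least one
+				// honest node.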
+ triggerCap := remote.NewTriggerSubscriber( + cfg, + info, + *toDONInfo(remoteDON), + *toDONInfo(myDON), + w.dispatcher, + aggregator, + w.lggr, + ) + return triggerCap, nil + } + err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTriggerFn) + if err != nil { + return fmt.Errorf("failed to add trigger shim: %w", err) + } + case capabilities.CapabilityTypeAction: + w.lggr.Warn("no remote client configured for capability type action, skipping configuration") + case capabilities.CapabilityTypeConsensus: + w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") + case capabilities.CapabilityTypeTarget: + newTargetFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { + client := target.NewClient( + info, + *toDONInfo(myDON), + w.dispatcher, + defaultTargetRequestTimeout, + w.lggr, + ) + return client, nil + } + + err := w.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTargetFn) + if err != nil { + return fmt.Errorf("failed to add target shim: %w", err) + } + default: + w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability) + } + } + return nil +} + +type capabilityService interface { + capabilities.BaseCapability + remotetypes.Receiver + services.Service +} + +func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capabilityInfo kcr.CapabilitiesRegistryCapabilityInfo, don kcr.CapabilitiesRegistryDONInfo, newCapFn func(info capabilities.CapabilityInfo) (capabilityService, error)) error { + fullCapID := fmt.Sprintf("%s@%s", capabilityInfo.LabelledName, capabilityInfo.Version) + info, err := capabilities.NewRemoteCapabilityInfo( + fullCapID, + toCapabilityType(capabilityInfo.CapabilityType), + fmt.Sprintf("Remote Capability for %s", fullCapID), + toDONInfo(don), + ) + if err != nil { + return fmt.Errorf("failed to create remote capability info: %w", err) + } + w.lggr.Debugw("Adding remote capability to registry", "id", info.ID, "don", info.DON) + capability, err := newCapFn(info) + if err != nil { + return fmt.Errorf("failed to instantiate capability: %w", err) + } + + err = w.registry.Add(ctx, capability) + if err != nil { + // If the capability already exists, then it's either local + // or we've handled this in a previous syncer iteration, + // let's skip and move on to other capabilities. 
+ if errors.Is(err, ErrCapabilityAlreadyExists) { + return nil + } + + return fmt.Errorf("failed to add capability to registry: %w", err) + } + + err = w.dispatcher.SetReceiver( + fullCapID, + fmt.Sprint(don.Id), + capability, + ) + if err != nil { + return err + } + w.lggr.Debugw("Setting receiver for capability", "id", fullCapID, "donID", don.Id) + err = capability.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start capability: %w", err) + } + w.subServices = append(w.subServices, capability) + return nil +} + +var ( + defaultTargetRequestTimeout = time.Minute +) + +func (w *launcher) exposeCapabilities(ctx context.Context, myPeerID p2ptypes.PeerID, don kcr.CapabilitiesRegistryDONInfo, state registrysyncer.State, remoteWorkflowDONs []kcr.CapabilitiesRegistryDONInfo) error { + idsToDONs := map[string]capabilities.DON{} + for _, d := range remoteWorkflowDONs { + idsToDONs[fmt.Sprint(d.Id)] = *toDONInfo(d) + } + + for _, c := range don.CapabilityConfigurations { + capability, ok := state.IDsToCapabilities[c.CapabilityId] + if !ok { + return fmt.Errorf("could not find capability matching id %s", c.CapabilityId) + } + + switch toCapabilityType(capability.CapabilityType) { + case capabilities.CapabilityTypeTrigger: + newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { + cfg := &remotetypes.RemoteTriggerConfig{} + cfg.ApplyDefaults() + err := proto.Unmarshal(c.Config, cfg) + if err != nil { + return nil, err + } + publisher := remote.NewTriggerPublisher( + cfg, + capability.(capabilities.TriggerCapability), + info, + *toDONInfo(don), + idsToDONs, + w.dispatcher, + w.lggr, + ) + return publisher, nil + } + + err := w.addReceiver(ctx, capability, don, newTriggerPublisher) + if err != nil { + return fmt.Errorf("failed to add server-side receiver: %w", err) + } + case capabilities.CapabilityTypeAction: + w.lggr.Warn("no remote client configured for capability type action, skipping configuration") + case capabilities.CapabilityTypeConsensus: + w.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") + case capabilities.CapabilityTypeTarget: + newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { + return target.NewServer( + myPeerID, + capability.(capabilities.TargetCapability), + info, + *toDONInfo(don), + idsToDONs, + w.dispatcher, + defaultTargetRequestTimeout, + w.lggr, + ), nil + } + + err := w.addReceiver(ctx, capability, don, newTargetServer) + if err != nil { + return fmt.Errorf("failed to add server-side receiver: %w", err) + } + default: + w.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability) + } + } + return nil +} + +type receiverService interface { + services.Service + remotetypes.Receiver +} + +func (w *launcher) addReceiver(ctx context.Context, capability kcr.CapabilitiesRegistryCapabilityInfo, don kcr.CapabilitiesRegistryDONInfo, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error)) error { + fullCapID := fmt.Sprintf("%s@%s", capability.LabelledName, capability.Version) + info, err := capabilities.NewRemoteCapabilityInfo( + fullCapID, + toCapabilityType(capability.CapabilityType), + fmt.Sprintf("Remote Capability for %s", fullCapID), + toDONInfo(don), + ) + if err != nil { + return fmt.Errorf("failed to instantiate remote capability for receiver: %w", err) + } + underlying, err := 
w.registry.Get(ctx, fullCapID) + if err != nil { + return fmt.Errorf("failed to get capability from registry: %w", err) + } + + receiver, err := newReceiverFn(underlying, info) + if err != nil { + return fmt.Errorf("failed to instantiate receiver: %w", err) + } + + w.lggr.Debugw("Enabling external access for capability", "id", fullCapID, "donID", don.Id) + err = w.dispatcher.SetReceiver(fullCapID, fmt.Sprint(don.Id), receiver) + if err != nil { + return fmt.Errorf("failed to set receiver: %w", err) + } + + err = receiver.Start(ctx) + if err != nil { + return fmt.Errorf("failed to start receiver: %w", err) + } + + w.subServices = append(w.subServices, receiver) + return nil +} + +func signersFor(don kcr.CapabilitiesRegistryDONInfo, state registrysyncer.State) ([][]byte, error) { + s := [][]byte{} + for _, nodeID := range don.NodeP2PIds { + node, ok := state.IDsToNodes[nodeID] + if !ok { + return nil, fmt.Errorf("could not find node for id %s", nodeID) + } + + // NOTE: the capability registry stores signers as [32]byte, + // but we only need the first [20], as the rest is padded. + s = append(s, node.Signer[0:20]) + } + + return s, nil +} + +func toDONInfo(don kcr.CapabilitiesRegistryDONInfo) *capabilities.DON { + peerIDs := []p2ptypes.PeerID{} + for _, p := range don.NodeP2PIds { + peerIDs = append(peerIDs, p) + } + + return &capabilities.DON{ + ID: fmt.Sprint(don.Id), + Members: peerIDs, + F: don.F, + } +} + +func toCapabilityType(capabilityType uint8) capabilities.CapabilityType { + switch capabilityType { + case 0: + return capabilities.CapabilityTypeTrigger + case 1: + return capabilities.CapabilityTypeAction + case 2: + return capabilities.CapabilityTypeConsensus + case 3: + return capabilities.CapabilityTypeTarget + default: + // Not found + return capabilities.CapabilityType(-1) + } +} diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go new file mode 100644 index 00000000000..c29e5ebf38c --- /dev/null +++ b/core/capabilities/launcher_test.go @@ -0,0 +1,693 @@ +package capabilities + +import ( + "context" + "crypto/rand" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" + kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" +) + +type mockTrigger struct { + capabilities.CapabilityInfo +} + +func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + return nil, nil +} + +func (m *mockTrigger) UnregisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) error { + return nil +} + +func newMockTrigger(info capabilities.CapabilityInfo) *mockTrigger { + return &mockTrigger{CapabilityInfo: info} +} + +type mockCapability struct { + capabilities.CapabilityInfo +} + +func (m *mockCapability) Execute(ctx context.Context, req 
capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + return nil, nil +} + +func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { + return nil +} + +func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { + return nil +} + +func randomWord() [32]byte { + word := make([]byte, 32) + _, err := rand.Read(word) + if err != nil { + panic(err) + } + return [32]byte(word) +} + +func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + nodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + fullTriggerCapID := "streams-trigger@1.0.0" + mt := newMockTrigger(capabilities.MustNewCapabilityInfo( + fullTriggerCapID, + capabilities.CapabilityTypeTrigger, + "streams trigger", + )) + require.NoError(t, registry.Add(ctx, mt)) + + fullTargetID := "write-chain_evm_1@1.0.0" + mtarg := &mockCapability{ + CapabilityInfo: capabilities.MustNewCapabilityInfo( + fullTargetID, + capabilities.CapabilityTypeTarget, + "write chain", + ), + } + require.NoError(t, registry.Add(ctx, mtarg)) + + triggerCapID := randomWord() + targetCapID := randomWord() + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect a publisher to be wired up with this configuration, and + // no entries should be added to the registry. 
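+	// No registry entries are expected because both capabilities are hosted
+	// locally (they were added to the registry above); the launcher only needs
+	// to create server-side shims (a trigger publisher and a target server)
+	// and register them with the dispatcher.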
+ state := registrysyncer.State{ + IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{ + registrysyncer.DonID(dID): { + Id: dID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + NodeP2PIds: nodes, + CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: triggerCapID, + Config: []byte(""), + }, + { + CapabilityId: targetCapID, + Config: []byte(""), + }, + }, + }, + }, + IDsToCapabilities: map[registrysyncer.HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ + triggerCapID: { + LabelledName: "streams-trigger", + Version: "1.0.0", + CapabilityType: 0, + }, + targetCapID: { + LabelledName: "write-chain_evm_1", + Version: "1.0.0", + CapabilityType: 3, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(dID), mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) + dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(dID), mock.AnythingOfType("*target.server")).Return(nil) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() +} + +func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + nodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + triggerCapID := randomWord() + targetCapID := randomWord() + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which isn't public (IsPublic = false), but hosts the + // the streams-trigger and write_chain capabilities. + // We expect no action to be taken by the syncer. 
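+	// Because IsPublic is false, the launcher never includes this DON when
+	// building the set of public DONs, so no peer connections are added for it
+	// and no shims are created, even though this node is one of its members.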
+ state := registrysyncer.State{ + IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{ + registrysyncer.DonID(dID): { + Id: dID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: false, + AcceptsWorkflows: true, + NodeP2PIds: nodes, + CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: triggerCapID, + Config: []byte(""), + }, + { + CapabilityId: targetCapID, + Config: []byte(""), + }, + }, + }, + }, + IDsToCapabilities: map[registrysyncer.HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ + triggerCapID: { + LabelledName: "streams-trigger", + Version: "1.0.0", + CapabilityType: 0, + }, + targetCapID: { + LabelledName: "write-chain_evm_1", + Version: "1.0.0", + CapabilityType: 3, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + // If the DON were public, this would fail with two errors: + // - error fetching the capabilities from the registry since they haven't been added + // - erroneous calls to dispatcher.SetReceiver, since the call hasn't been registered. + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + + // Finally, assert that no services were added. + assert.Len(t, launcher.subServices, 0) +} + +func TestLauncher_WiresUpClientsForPublicWorkflowDON(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + workflowDonNodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + capabilityDonNodes := [][32]byte{ + randomWord(), + randomWord(), + randomWord(), + randomWord(), + } + + fullTriggerCapID := "streams-trigger@1.0.0" + fullTargetID := "write-chain_evm_1@1.0.0" + triggerCapID := randomWord() + targetCapID := randomWord() + dID := uint32(1) + capDonID := uint32(2) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect receivers to be wired up and both capabilities to be added to the registry. 
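+	// Here the capabilities are exposed by a separate capability DON (capDonID)
+	// rather than by the workflow DON itself; since this node only belongs to
+	// the workflow DON, the launcher creates client-side shims (a trigger
+	// subscriber and a target client) and adds them to the local registry.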
+ state := registrysyncer.State{ + IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{ + registrysyncer.DonID(dID): { + Id: dID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + NodeP2PIds: workflowDonNodes, + }, + registrysyncer.DonID(capDonID): { + Id: capDonID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: false, + NodeP2PIds: capabilityDonNodes, + CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: triggerCapID, + Config: []byte(""), + }, + { + CapabilityId: targetCapID, + Config: []byte(""), + }, + }, + }, + }, + IDsToCapabilities: map[registrysyncer.HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ + triggerCapID: { + LabelledName: "streams-trigger", + Version: "1.0.0", + CapabilityType: 0, + }, + targetCapID: { + LabelledName: "write-chain_evm_1", + Version: "1.0.0", + CapabilityType: 3, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + capabilityDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[0], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[1], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[2], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[3], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + workflowDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[0], + }, + workflowDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[1], + }, + workflowDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[2], + }, + workflowDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[3], + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(capDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) + dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(capDonID), mock.AnythingOfType("*target.client")).Return(nil) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + + _, err = registry.Get(ctx, fullTriggerCapID) + require.NoError(t, err) + + _, err = registry.Get(ctx, fullTargetID) + require.NoError(t, err) +} + +func TestLauncher_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilities(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + workflowDonNodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + capabilityDonNodes := [][32]byte{ + randomWord(), + randomWord(), + randomWord(), + randomWord(), + } + + fullTriggerCapID := "streams-trigger@1.0.0" + triggerCapID := 
randomWord() + targetCapID := randomWord() + dID := uint32(1) + triggerCapDonID := uint32(2) + targetCapDonID := uint32(3) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect receivers to be wired up and both capabilities to be added to the registry. + state := registrysyncer.State{ + IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{ + registrysyncer.DonID(dID): { + Id: dID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + NodeP2PIds: workflowDonNodes, + }, + registrysyncer.DonID(triggerCapDonID): { + Id: triggerCapDonID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: false, + NodeP2PIds: capabilityDonNodes, + CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: triggerCapID, + Config: []byte(""), + }, + }, + }, + registrysyncer.DonID(targetCapDonID): { + Id: targetCapDonID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: false, + AcceptsWorkflows: false, + NodeP2PIds: capabilityDonNodes, + CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: targetCapID, + Config: []byte(""), + }, + }, + }, + }, + IDsToCapabilities: map[registrysyncer.HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ + triggerCapID: { + LabelledName: "streams-trigger", + Version: "1.0.0", + CapabilityType: 0, + }, + targetCapID: { + LabelledName: "write-chain_evm_1", + Version: "1.0.0", + CapabilityType: 3, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + capabilityDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[0], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[1], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[2], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[3], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + workflowDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[0], + }, + workflowDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[1], + }, + workflowDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[2], + }, + workflowDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[3], + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(triggerCapDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + + _, err = registry.Get(ctx, fullTriggerCapID) + require.NoError(t, err) +} + +func toPeerIDs(is [][32]byte) (out []p2ptypes.PeerID) { + for _, i := range is { + out = append(out, i) + } + + return out +} + +func TestLauncher_LocalNode(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := 
pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + workflowDonNodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + dID := uint32(1) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the streams-trigger and write_chain capabilities. + // We expect receivers to be wired up and both capabilities to be added to the registry. + state := registrysyncer.State{ + IDsToDONs: map[registrysyncer.DonID]kcr.CapabilitiesRegistryDONInfo{ + registrysyncer.DonID(dID): { + Id: dID, + ConfigCount: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + NodeP2PIds: workflowDonNodes, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + workflowDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[0], + }, + workflowDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[1], + }, + workflowDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[2], + }, + workflowDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[3], + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + + node, err := launcher.LocalNode(ctx) + require.NoError(t, err) + + don := capabilities.DON{ + ID: fmt.Sprintf("%d", dID), + Members: toPeerIDs(workflowDonNodes), + F: 1, + } + expectedNode := capabilities.Node{ + PeerID: &pid, + WorkflowDON: don, + CapabilityDONs: []capabilities.DON{don}, + } + assert.Equal(t, expectedNode, node) +} diff --git a/core/capabilities/reader.go b/core/capabilities/reader.go deleted file mode 100644 index a0b7d3d96d8..00000000000 --- a/core/capabilities/reader.go +++ /dev/null @@ -1,157 +0,0 @@ -package capabilities - -import ( - "context" - "encoding/json" - "errors" - "fmt" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/logger" - "github.com/smartcontractkit/chainlink-common/pkg/types" - kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" - evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" -) - -type remoteRegistryReader struct { - r types.ContractReader - peerWrapper p2ptypes.PeerWrapper - lggr logger.Logger -} - -var _ reader = (*remoteRegistryReader)(nil) - -type hashedCapabilityID [32]byte -type donID uint32 - -type state struct { - IDsToDONs map[donID]kcr.CapabilitiesRegistryDONInfo - IDsToNodes map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo - IDsToCapabilities map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo -} - -func (r *remoteRegistryReader) LocalNode(ctx context.Context) (capabilities.Node, error) { - if r.peerWrapper.GetPeer() == nil { - return capabilities.Node{}, errors.New("unable to get peer: peerWrapper hasn't started yet") - } - - pid := r.peerWrapper.GetPeer().ID() - - readerState, err := r.state(ctx) - if err != nil { - return capabilities.Node{}, fmt.Errorf("failed to get state from registry to determine don ownership: %w", err) - 
} - - var workflowDON capabilities.DON - capabilityDONs := []capabilities.DON{} - for _, d := range readerState.IDsToDONs { - for _, p := range d.NodeP2PIds { - if p == pid { - if d.AcceptsWorkflows { - if workflowDON.ID == "" { - workflowDON = *toDONInfo(d) - } else { - r.lggr.Errorf("Configuration error: node %s belongs to more than one workflowDON", pid) - } - } - - capabilityDONs = append(capabilityDONs, *toDONInfo(d)) - } - } - } - - return capabilities.Node{ - PeerID: &pid, - WorkflowDON: workflowDON, - CapabilityDONs: capabilityDONs, - }, nil -} - -func (r *remoteRegistryReader) state(ctx context.Context) (state, error) { - dons := []kcr.CapabilitiesRegistryDONInfo{} - err := r.r.GetLatestValue(ctx, "CapabilitiesRegistry", "getDONs", nil, &dons) - if err != nil { - return state{}, err - } - - idsToDONs := map[donID]kcr.CapabilitiesRegistryDONInfo{} - for _, d := range dons { - idsToDONs[donID(d.Id)] = d - } - - caps := []kcr.CapabilitiesRegistryCapabilityInfo{} - err = r.r.GetLatestValue(ctx, "CapabilitiesRegistry", "getCapabilities", nil, &caps) - if err != nil { - return state{}, err - } - - idsToCapabilities := map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{} - for _, c := range caps { - idsToCapabilities[c.HashedId] = c - } - - nodes := []kcr.CapabilitiesRegistryNodeInfo{} - err = r.r.GetLatestValue(ctx, "CapabilitiesRegistry", "getNodes", nil, &nodes) - if err != nil { - return state{}, err - } - - idsToNodes := map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{} - for _, node := range nodes { - idsToNodes[node.P2pId] = node - } - - return state{IDsToDONs: idsToDONs, IDsToCapabilities: idsToCapabilities, IDsToNodes: idsToNodes}, nil -} - -type contractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} - -func newRemoteRegistryReader(ctx context.Context, lggr logger.Logger, peerWrapper p2ptypes.PeerWrapper, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) { - contractReaderConfig := evmrelaytypes.ChainReaderConfig{ - Contracts: map[string]evmrelaytypes.ChainContractReader{ - "CapabilitiesRegistry": { - ContractABI: kcr.CapabilitiesRegistryABI, - Configs: map[string]*evmrelaytypes.ChainReaderDefinition{ - "getDONs": { - ChainSpecificName: "getDONs", - }, - "getCapabilities": { - ChainSpecificName: "getCapabilities", - }, - "getNodes": { - ChainSpecificName: "getNodes", - }, - }, - }, - }, - } - - contractReaderConfigEncoded, err := json.Marshal(contractReaderConfig) - if err != nil { - return nil, err - } - - cr, err := relayer.NewContractReader(ctx, contractReaderConfigEncoded) - if err != nil { - return nil, err - } - - err = cr.Bind(ctx, []types.BoundContract{ - { - Address: remoteRegistryAddress, - Name: "CapabilitiesRegistry", - }, - }) - if err != nil { - return nil, err - } - - return &remoteRegistryReader{ - r: cr, - peerWrapper: peerWrapper, - lggr: lggr, - }, err -} diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go deleted file mode 100644 index 800fb325094..00000000000 --- a/core/capabilities/syncer.go +++ /dev/null @@ -1,913 +0,0 @@ -package capabilities - -import ( - "context" - "errors" - "fmt" - "math/big" - "slices" - "strings" - "sync" - "time" - - "google.golang.org/protobuf/proto" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/datastreams" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" - 
"github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" - - "github.com/smartcontractkit/libocr/ragep2p" - ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" - - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" - remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" - kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" - "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" -) - -type reader interface { - state(ctx context.Context) (state, error) - LocalNode(ctx context.Context) (capabilities.Node, error) -} - -type registrySyncer struct { - peerWrapper p2ptypes.PeerWrapper - registry core.CapabilitiesRegistry - dispatcher remotetypes.Dispatcher - stopCh services.StopChan - subServices []services.Service - networkSetup HardcodedDonNetworkSetup - reader - - wg sync.WaitGroup - lggr logger.Logger -} - -var _ services.Service = ®istrySyncer{} - -var ( - defaultTickInterval = 12 * time.Second -) - -var defaultStreamConfig = p2ptypes.StreamConfig{ - IncomingMessageBufferSize: 1000000, - OutgoingMessageBufferSize: 1000000, - MaxMessageLenBytes: 100000, - MessageRateLimiter: ragep2p.TokenBucketParams{ - Rate: 100.0, - Capacity: 1000, - }, - BytesRateLimiter: ragep2p.TokenBucketParams{ - Rate: 100000.0, - Capacity: 1000000, - }, -} - -// RegistrySyncer updates local Registry to match its onchain counterpart -func NewRegistrySyncer( - peerWrapper p2ptypes.PeerWrapper, - registry core.CapabilitiesRegistry, - dispatcher remotetypes.Dispatcher, - lggr logger.Logger, - networkSetup HardcodedDonNetworkSetup, - relayer contractReaderFactory, - registryAddress string, -) (*registrySyncer, error) { - stopCh := make(services.StopChan) - ctx, _ := stopCh.NewCtx() - reader, err := newRemoteRegistryReader(ctx, lggr, peerWrapper, relayer, registryAddress) - if err != nil { - return nil, err - } - - return newRegistrySyncer( - stopCh, - peerWrapper, - registry, - dispatcher, - lggr.Named("RegistrySyncer"), - networkSetup, - reader, - ), nil -} - -func newRegistrySyncer( - stopCh services.StopChan, - peerWrapper p2ptypes.PeerWrapper, - registry core.CapabilitiesRegistry, - dispatcher remotetypes.Dispatcher, - lggr logger.Logger, - networkSetup HardcodedDonNetworkSetup, - reader reader, -) *registrySyncer { - return ®istrySyncer{ - stopCh: stopCh, - peerWrapper: peerWrapper, - registry: registry, - dispatcher: dispatcher, - networkSetup: networkSetup, - lggr: lggr, - reader: reader, - } -} - -func (s *registrySyncer) Start(ctx context.Context) error { - // NOTE: Decrease wg.Add and uncomment the line below - // `go s.launch()` to enable the hardcoded syncer. - s.wg.Add(1) - // go s.launch() - go s.syncLoop() - return nil -} - -func (s *registrySyncer) syncLoop() { - defer s.wg.Done() - - ctx, cancel := s.stopCh.NewCtx() - defer cancel() - - ticker := time.NewTicker(defaultTickInterval) - defer ticker.Stop() - - // Sync for a first time outside the loop; this means we'll start a remote - // sync immediately once spinning up syncLoop, as by default a ticker will - // fire for the first time at T+N, where N is the interval. 
- s.lggr.Debug("starting initial sync with remote registry") - err := s.sync(ctx) - if err != nil { - s.lggr.Errorw("failed to sync with remote registry", "error", err) - } - - for { - select { - case <-s.stopCh: - return - case <-ticker.C: - s.lggr.Debug("starting regular sync with the remote registry") - err := s.sync(ctx) - if err != nil { - s.lggr.Errorw("failed to sync with remote registry", "error", err) - } - } - } -} - -func (s *registrySyncer) sync(ctx context.Context) error { - readerState, err := s.reader.state(ctx) - if err != nil { - return fmt.Errorf("failed to sync with remote registry: %w", err) - } - - // Let's start by updating the list of Peers - // We do this by creating a new entry for each node belonging - // to a public DON. - // We also add the hardcoded peers determined by the NetworkSetup. - allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) - // TODO: Remove this when we're no longer hard-coding - // a `networkSetup`. - for p, cfg := range s.networkSetup.allPeers { - allPeers[p] = cfg - } - - publicDONs := []kcr.CapabilitiesRegistryDONInfo{} - for _, d := range readerState.IDsToDONs { - if !d.IsPublic { - continue - } - - publicDONs = append(publicDONs, d) - - for _, nid := range d.NodeP2PIds { - allPeers[nid] = defaultStreamConfig - } - } - - // TODO: be a bit smarter about who we connect to; we should ideally only - // be connecting to peers when we need to. - // https://smartcontract-it.atlassian.net/browse/KS-330 - err = s.peerWrapper.GetPeer().UpdateConnections(allPeers) - if err != nil { - return fmt.Errorf("failed to update peer connections: %w", err) - } - - // Next, we need to split the DONs into the following: - // - workflow DONs the current node is a part of. - // These will need remote shims to all remote capabilities on other DONs. - // - // We'll also construct a set to record what DONs the current node is a part of, - // regardless of any modifiers (public/acceptsWorkflows etc). - myID := s.peerWrapper.GetPeer().ID() - myWorkflowDONs := []kcr.CapabilitiesRegistryDONInfo{} - remoteWorkflowDONs := []kcr.CapabilitiesRegistryDONInfo{} - myDONs := map[uint32]bool{} - for _, d := range readerState.IDsToDONs { - for _, peerID := range d.NodeP2PIds { - if peerID == myID { - myDONs[d.Id] = true - } - } - - if d.AcceptsWorkflows { - if myDONs[d.Id] { - myWorkflowDONs = append(myWorkflowDONs, d) - } else { - remoteWorkflowDONs = append(remoteWorkflowDONs, d) - } - } - } - - // - remote capability DONs (with IsPublic = true) the current node is a part of. - // These need server-side shims. - myCapabilityDONs := []kcr.CapabilitiesRegistryDONInfo{} - remoteCapabilityDONs := []kcr.CapabilitiesRegistryDONInfo{} - for _, d := range publicDONs { - if len(d.CapabilityConfigurations) > 0 { - if myDONs[d.Id] { - myCapabilityDONs = append(myCapabilityDONs, d) - } else { - remoteCapabilityDONs = append(remoteCapabilityDONs, d) - } - } - } - - // Now, if my node is a workflow DON, let's setup any shims - // to external capabilities. - if len(myWorkflowDONs) > 0 { - myDON := myWorkflowDONs[0] - - // TODO: this is a bit nasty; figure out how best to handle this. - if len(myWorkflowDONs) > 1 { - s.lggr.Warn("node is part of more than one workflow DON; assigning first DON as caller") - } - - for _, rcd := range remoteCapabilityDONs { - err := s.addRemoteCapabilities(ctx, myDON, rcd, readerState) - if err != nil { - return err - } - } - } - - // Finally, if I'm a capability DON, let's enable external access - // to the capability. 
- if len(myCapabilityDONs) > 0 { - for _, mcd := range myCapabilityDONs { - err := s.enableExternalAccess(ctx, myID, mcd, readerState, remoteWorkflowDONs) - if err != nil { - return err - } - } - } - - return nil -} - -func signersFor(don kcr.CapabilitiesRegistryDONInfo, state state) ([][]byte, error) { - s := [][]byte{} - for _, nodeID := range don.NodeP2PIds { - node, ok := state.IDsToNodes[nodeID] - if !ok { - return nil, fmt.Errorf("could not find node for id %s", nodeID) - } - - // NOTE: the capability registry stores signers as [32]byte, - // but we only need the first [20], as the rest is padded. - s = append(s, node.Signer[0:20]) - } - - return s, nil -} - -func toDONInfo(don kcr.CapabilitiesRegistryDONInfo) *capabilities.DON { - peerIDs := []p2ptypes.PeerID{} - for _, p := range don.NodeP2PIds { - peerIDs = append(peerIDs, p) - } - - return &capabilities.DON{ - ID: fmt.Sprint(don.Id), - Members: peerIDs, - F: don.F, - } -} - -func toCapabilityType(capabilityType uint8) capabilities.CapabilityType { - switch capabilityType { - case 0: - return capabilities.CapabilityTypeTrigger - case 1: - return capabilities.CapabilityTypeAction - case 2: - return capabilities.CapabilityTypeConsensus - case 3: - return capabilities.CapabilityTypeTarget - default: - // Not found - return capabilities.CapabilityType(-1) - } -} - -func (s *registrySyncer) addRemoteCapabilities(ctx context.Context, myDON kcr.CapabilitiesRegistryDONInfo, remoteDON kcr.CapabilitiesRegistryDONInfo, state state) error { - for _, c := range remoteDON.CapabilityConfigurations { - capability, ok := state.IDsToCapabilities[c.CapabilityId] - if !ok { - return fmt.Errorf("could not find capability matching id %s", c.CapabilityId) - } - - switch toCapabilityType(capability.CapabilityType) { - case capabilities.CapabilityTypeTrigger: - newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { - if !strings.HasPrefix(info.ID, "streams-trigger") { - return nil, errors.New("not supported: trigger capability does not have id = streams-trigger") - } - - codec := streams.NewCodec(s.lggr) - - signers, err := signersFor(remoteDON, state) - if err != nil { - return nil, err - } - - aggregator := triggers.NewMercuryRemoteAggregator( - codec, - signers, - int(remoteDON.F+1), - s.lggr, - ) - cfg := &remotetypes.RemoteTriggerConfig{} - cfg.ApplyDefaults() - err = proto.Unmarshal(c.Config, cfg) - if err != nil { - return nil, err - } - // TODO: We need to implement a custom, Mercury-specific - // aggregator here, because there is no guarantee that - // all trigger events in the workflow will have the same - // payloads. As a workaround, we validate the signatures. - // When this is solved, we can move to a generic aggregator - // and remove this. 
- triggerCap := remote.NewTriggerSubscriber( - cfg, - info, - *toDONInfo(remoteDON), - *toDONInfo(myDON), - s.dispatcher, - aggregator, - s.lggr, - ) - return triggerCap, nil - } - err := s.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTriggerFn) - if err != nil { - return fmt.Errorf("failed to add trigger shim: %w", err) - } - case capabilities.CapabilityTypeAction: - s.lggr.Warn("no remote client configured for capability type action, skipping configuration") - case capabilities.CapabilityTypeConsensus: - s.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") - case capabilities.CapabilityTypeTarget: - newTargetFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { - client := target.NewClient( - info, - *toDONInfo(myDON), - s.dispatcher, - defaultTargetRequestTimeout, - s.lggr, - ) - return client, nil - } - - err := s.addToRegistryAndSetDispatcher(ctx, capability, remoteDON, newTargetFn) - if err != nil { - return fmt.Errorf("failed to add target shim: %w", err) - } - default: - s.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability) - } - } - return nil -} - -type capabilityService interface { - capabilities.BaseCapability - remotetypes.Receiver - services.Service -} - -func (s *registrySyncer) addToRegistryAndSetDispatcher(ctx context.Context, capabilityInfo kcr.CapabilitiesRegistryCapabilityInfo, don kcr.CapabilitiesRegistryDONInfo, newCapFn func(info capabilities.CapabilityInfo) (capabilityService, error)) error { - fullCapID := fmt.Sprintf("%s@%s", capabilityInfo.LabelledName, capabilityInfo.Version) - info, err := capabilities.NewRemoteCapabilityInfo( - fullCapID, - toCapabilityType(capabilityInfo.CapabilityType), - fmt.Sprintf("Remote Capability for %s", fullCapID), - toDONInfo(don), - ) - if err != nil { - return err - } - s.lggr.Debugw("Adding remote capability to registry", "id", info.ID, "don", info.DON) - capability, err := newCapFn(info) - if err != nil { - return err - } - - err = s.registry.Add(ctx, capability) - if err != nil { - // If the capability already exists, then it's either local - // or we've handled this in a previous syncer iteration, - // let's skip and move on to other capabilities. 
- if errors.Is(err, ErrCapabilityAlreadyExists) { - return nil - } - - return err - } - - err = s.dispatcher.SetReceiver( - fullCapID, - fmt.Sprint(don.Id), - capability, - ) - if err != nil { - return err - } - s.lggr.Debugw("Setting receiver for capability", "id", fullCapID, "donID", don.Id) - err = capability.Start(ctx) - if err != nil { - return err - } - s.subServices = append(s.subServices, capability) - return nil -} - -var ( - defaultTargetRequestTimeout = time.Minute -) - -func (s *registrySyncer) enableExternalAccess(ctx context.Context, myPeerID p2ptypes.PeerID, don kcr.CapabilitiesRegistryDONInfo, state state, remoteWorkflowDONs []kcr.CapabilitiesRegistryDONInfo) error { - idsToDONs := map[string]capabilities.DON{} - for _, d := range remoteWorkflowDONs { - idsToDONs[fmt.Sprint(d.Id)] = *toDONInfo(d) - } - - for _, c := range don.CapabilityConfigurations { - capability, ok := state.IDsToCapabilities[c.CapabilityId] - if !ok { - return fmt.Errorf("could not find capability matching id %s", c.CapabilityId) - } - - switch toCapabilityType(capability.CapabilityType) { - case capabilities.CapabilityTypeTrigger: - newTriggerPublisher := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { - cfg := &remotetypes.RemoteTriggerConfig{} - cfg.ApplyDefaults() - err := proto.Unmarshal(c.Config, cfg) - if err != nil { - return nil, err - } - publisher := remote.NewTriggerPublisher( - cfg, - capability.(capabilities.TriggerCapability), - info, - *toDONInfo(don), - idsToDONs, - s.dispatcher, - s.lggr, - ) - return publisher, nil - } - - err := s.addReceiver(ctx, capability, don, newTriggerPublisher) - if err != nil { - return fmt.Errorf("failed to add server-side receiver: %w", err) - } - case capabilities.CapabilityTypeAction: - s.lggr.Warn("no remote client configured for capability type action, skipping configuration") - case capabilities.CapabilityTypeConsensus: - s.lggr.Warn("no remote client configured for capability type consensus, skipping configuration") - case capabilities.CapabilityTypeTarget: - newTargetServer := func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error) { - return target.NewServer( - myPeerID, - capability.(capabilities.TargetCapability), - info, - *toDONInfo(don), - idsToDONs, - s.dispatcher, - defaultTargetRequestTimeout, - s.lggr, - ), nil - } - - err := s.addReceiver(ctx, capability, don, newTargetServer) - if err != nil { - return fmt.Errorf("failed to add server-side receiver: %w", err) - } - default: - s.lggr.Warnf("unknown capability type, skipping configuration: %+v", capability) - } - } - return nil -} - -type receiverService interface { - services.Service - remotetypes.Receiver -} - -func (s *registrySyncer) addReceiver(ctx context.Context, capability kcr.CapabilitiesRegistryCapabilityInfo, don kcr.CapabilitiesRegistryDONInfo, newReceiverFn func(capability capabilities.BaseCapability, info capabilities.CapabilityInfo) (receiverService, error)) error { - fullCapID := fmt.Sprintf("%s@%s", capability.LabelledName, capability.Version) - info, err := capabilities.NewRemoteCapabilityInfo( - fullCapID, - toCapabilityType(capability.CapabilityType), - fmt.Sprintf("Remote Capability for %s", fullCapID), - toDONInfo(don), - ) - if err != nil { - return err - } - underlying, err := s.registry.Get(ctx, fullCapID) - if err != nil { - return err - } - - receiver, err := newReceiverFn(underlying, info) - if err != nil { - return err - } - - s.lggr.Debugw("Enabling 
external access for capability", "id", fullCapID, "donID", don.Id) - err = s.dispatcher.SetReceiver(fullCapID, fmt.Sprint(don.Id), receiver) - if err != nil { - return err - } - - err = receiver.Start(ctx) - if err != nil { - return err - } - - s.subServices = append(s.subServices, receiver) - return nil -} - -//func hexStringsToBytes(strs []string) (res [][]byte) { -// for _, s := range strs { -// b, _ := hex.DecodeString(s[2:]) -// res = append(res, b) -// } -// return res -//} - -//const maxRetryCount = 60 - -//// NOTE: this implementation of the Syncer is temporary and will be replaced by one -//// that reads the configuration from chain (KS-117). -//func (s *registrySyncer) launch() { -// ctx, _ := s.stopCh.NewCtx() -// defer s.wg.Done() -// capId := "streams-trigger@1.0.0" -// triggerInfo, err := capabilities.NewRemoteCapabilityInfo( -// capId, -// capabilities.CapabilityTypeTrigger, -// "Remote Trigger", -// &s.networkSetup.TriggerCapabilityDonInfo, -// ) -// if err != nil { -// s.lggr.Errorw("failed to create capability info for streams-trigger", "error", err) -// return -// } -// -// targetCapId := "write_ethereum-testnet-sepolia@1.0.0" -// targetInfo, err := capabilities.NewRemoteCapabilityInfo( -// targetCapId, -// capabilities.CapabilityTypeTarget, -// "Remote Target", -// &s.networkSetup.TargetCapabilityDonInfo, -// ) -// if err != nil { -// s.lggr.Errorw("failed to create capability info for write_ethereum-testnet-sepolia", "error", err) -// return -// } -// -// myId := s.peerWrapper.GetPeer().ID() -// config := &remotetypes.RemoteTriggerConfig{ -// RegistrationRefreshMs: 20000, -// RegistrationExpiryMs: 60000, -// MinResponsesToAggregate: uint32(s.networkSetup.TriggerCapabilityDonInfo.F) + 1, -// } -// err = s.peerWrapper.GetPeer().UpdateConnections(s.networkSetup.allPeers) -// if err != nil { -// s.lggr.Errorw("failed to update connections", "error", err) -// return -// } -// if s.networkSetup.IsWorkflowDon(myId) { -// s.lggr.Info("member of a workflow DON - starting remote subscribers") -// codec := streams.NewCodec(s.lggr) -// aggregator := triggers.NewMercuryRemoteAggregator(codec, hexStringsToBytes(s.networkSetup.triggerDonSigners), int(s.networkSetup.TriggerCapabilityDonInfo.F+1), s.lggr) -// triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, s.networkSetup.TriggerCapabilityDonInfo, s.networkSetup.WorkflowsDonInfo, s.dispatcher, aggregator, s.lggr) -// err = s.registry.Add(ctx, triggerCap) -// if err != nil { -// s.lggr.Errorw("failed to add remote trigger capability to registry", "error", err) -// return -// } -// err = s.dispatcher.SetReceiver(capId, s.networkSetup.TriggerCapabilityDonInfo.ID, triggerCap) -// if err != nil { -// s.lggr.Errorw("workflow DON failed to set receiver for trigger", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) -// return -// } -// s.subServices = append(s.subServices, triggerCap) -// -// s.lggr.Info("member of a workflow DON - starting remote targets") -// targetCap := target.NewClient(targetInfo, s.networkSetup.WorkflowsDonInfo, s.dispatcher, 60*time.Second, s.lggr) -// err = s.registry.Add(ctx, targetCap) -// if err != nil { -// s.lggr.Errorw("failed to add remote target capability to registry", "error", err) -// return -// } -// err = s.dispatcher.SetReceiver(targetCapId, s.networkSetup.TargetCapabilityDonInfo.ID, targetCap) -// if err != nil { -// s.lggr.Errorw("workflow DON failed to set receiver for target", "capabilityId", capId, "donId", 
s.networkSetup.TargetCapabilityDonInfo.ID, "error", err) -// return -// } -// s.subServices = append(s.subServices, targetCap) -// } -// if s.networkSetup.IsTriggerDon(myId) { -// s.lggr.Info("member of a capability DON - starting remote publishers") -// -// /*{ -// // ---- This is for local tests only, until a full-blown Syncer is implemented -// // ---- Normally this is set up asynchronously (by the Relayer + job specs in Mercury's case) -// localTrigger := triggers.NewMercuryTriggerService(1000, s.lggr) -// mockMercuryDataProducer := NewMockMercuryDataProducer(localTrigger, s.lggr) -// err = s.registry.Add(ctx, localTrigger) -// if err != nil { -// s.lggr.Errorw("failed to add local trigger capability to registry", "error", err) -// return err -// } -// s.subServices = append(s.subServices, localTrigger) -// s.subServices = append(s.subServices, mockMercuryDataProducer) -// // ---- -// }*/ -// -// count := 0 -// for { -// count++ -// if count > maxRetryCount { -// s.lggr.Error("failed to get Streams Trigger from the Registry") -// return -// } -// underlying, err2 := s.registry.GetTrigger(ctx, capId) -// if err2 != nil { -// // NOTE: it's possible that the jobs are not launched yet at this moment. -// // If not found yet, Syncer won't add to Registry but retry on the next tick. -// s.lggr.Infow("trigger not found yet ...", "capabilityId", capId, "error", err2) -// time.Sleep(1 * time.Second) -// continue -// } -// workflowDONs := map[string]capabilities.DON{ -// s.networkSetup.WorkflowsDonInfo.ID: s.networkSetup.WorkflowsDonInfo, -// } -// triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, s.networkSetup.TriggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) -// err = s.dispatcher.SetReceiver(capId, s.networkSetup.TriggerCapabilityDonInfo.ID, triggerCap) -// if err != nil { -// s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) -// return -// } -// s.subServices = append(s.subServices, triggerCap) -// break -// } -// } -// if s.networkSetup.IsTargetDon(myId) { -// s.lggr.Info("member of a target DON - starting remote shims") -// underlying, err2 := s.registry.GetTarget(ctx, targetCapId) -// if err2 != nil { -// s.lggr.Errorw("target not found yet", "capabilityId", targetCapId, "error", err2) -// return -// } -// workflowDONs := map[string]capabilities.DON{ -// s.networkSetup.WorkflowsDonInfo.ID: s.networkSetup.WorkflowsDonInfo, -// } -// targetCap := target.NewServer(myId, underlying, targetInfo, *targetInfo.DON, workflowDONs, s.dispatcher, 60*time.Second, s.lggr) -// err = s.dispatcher.SetReceiver(targetCapId, s.networkSetup.TargetCapabilityDonInfo.ID, targetCap) -// if err != nil { -// s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TargetCapabilityDonInfo.ID, "error", err) -// return -// } -// s.subServices = append(s.subServices, targetCap) -// } -// // NOTE: temporary service start - should be managed by capability creation -// for _, srv := range s.subServices { -// err = srv.Start(ctx) -// if err != nil { -// s.lggr.Errorw("failed to start remote trigger caller", "error", err) -// return -// } -// } -// s.lggr.Info("registry syncer started") -//} - -func (s *registrySyncer) Close() error { - close(s.stopCh) - s.wg.Wait() - for _, subService := range s.subServices { - err := subService.Close() - if err != nil { - s.lggr.Errorw("failed to close a sub-service", "name", subService.Name(), "error", err) - } - 
} - return s.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{}) -} - -func (s *registrySyncer) Ready() error { - return nil -} - -func (s *registrySyncer) HealthReport() map[string]error { - return nil -} - -func (s *registrySyncer) Name() string { - return "RegistrySyncer" -} - -// HardcodedDonNetworkSetup is a temporary setup for testing purposes -type HardcodedDonNetworkSetup struct { - workflowDonPeers []string - triggerDonPeers []string - targetDonPeers []string - triggerDonSigners []string - allPeers map[ragetypes.PeerID]p2ptypes.StreamConfig - - WorkflowsDonInfo capabilities.DON - TriggerCapabilityDonInfo capabilities.DON - TargetCapabilityDonInfo capabilities.DON -} - -func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) { - result := HardcodedDonNetworkSetup{} - - result.workflowDonPeers = []string{ - "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", - "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", - "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", - "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", - "12D3KooWG1AeBnSJH2mdcDusXQVye2jqodZ6pftTH98HH6xvrE97", - "12D3KooWBf3PrkhNoPEmp7iV291YnPuuTsgEDHTscLajxoDvwHGA", - "12D3KooWP3FrMTFXXRU2tBC8aYvEBgUX6qhcH9q2JZCUi9Wvc2GX", - } - result.triggerDonPeers = []string{ - "12D3KooWBaiTbbRwwt2fbNifiL7Ew9tn3vds9AJE3Nf3eaVBX36m", - "12D3KooWS7JSY9fzSfWgbCE1S3W2LNY6ZVpRuun74moVBkKj6utE", - "12D3KooWMMTDXcWhpVnwrdAer1jnVARTmnr3RyT3v7Djg8ZuoBh9", - "12D3KooWGzVXsKxXsF4zLgxSDM8Gzx1ywq2pZef4PrHMKuVg4K3P", - "12D3KooWSyjmmzjVtCzwN7bXzZQFmWiJRuVcKBerNjVgL7HdLJBW", - "12D3KooWLGz9gzhrNsvyM6XnXS3JRkZoQdEzuAvysovnSChNK5ZK", - "12D3KooWAvZnvknFAfSiUYjATyhzEJLTeKvAzpcLELHi4ogM3GET", - } - result.triggerDonSigners = []string{ - "0x9CcE7293a4Cc2621b61193135A95928735e4795F", - "0x3c775F20bCB2108C1A818741Ce332Bb5fe0dB925", - "0x50314239e2CF05555ceeD53E7F47eB2A8Eab0dbB", - "0xd76A4f98898c3b9A72b244476d7337b50D54BCd8", - "0x656A873f6895b8a03Fb112dE927d43FA54B2c92A", - "0x5d1e87d87bF2e0cD4Ea64F381a2dbF45e5f0a553", - "0x91d9b0062265514f012Eb8fABA59372fD9520f56", - } - result.targetDonPeers = []string{ // "cap-one" - "12D3KooWJrthXtnPHw7xyHFAxo6NxifYTvc8igKYaA6wRRRqtsMb", - "12D3KooWFQekP9sGex4XhqEJav5EScjTpDVtDqJFg1JvrePBCEGJ", - "12D3KooWFLEq4hYtdyKWwe47dXGEbSiHMZhmr5xLSJNhpfiEz8NF", - "12D3KooWN2hztiXNNS1jMQTTvvPRYcarK1C7T3Mdqk4x4gwyo5WS", - } - - result.allPeers = make(map[ragetypes.PeerID]p2ptypes.StreamConfig) - addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { - for _, peerID := range peers { - var p ragetypes.PeerID - err := p.UnmarshalText([]byte(peerID)) - if err != nil { - return err - } - result.allPeers[p] = defaultStreamConfig - donInfo.Members = append(donInfo.Members, p) - } - return nil - } - result.WorkflowsDonInfo = capabilities.DON{ID: "workflowDon1", F: 2} - if err := addPeersToDONInfo(result.workflowDonPeers, &result.WorkflowsDonInfo); err != nil { - return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to workflow DON info: %w", err) - } - result.TriggerCapabilityDonInfo = capabilities.DON{ID: "capabilityDon1", F: 1} // NOTE: misconfiguration - should be 2 - if err := addPeersToDONInfo(result.triggerDonPeers, &result.TriggerCapabilityDonInfo); err != nil { - return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to trigger DON info: %w", err) - } - - result.TargetCapabilityDonInfo = capabilities.DON{ID: "targetDon1", F: 1} - if err := addPeersToDONInfo(result.targetDonPeers, &result.TargetCapabilityDonInfo); err 
!= nil { - return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to target DON info: %w", err) - } - - return result, nil -} - -func (h HardcodedDonNetworkSetup) IsWorkflowDon(id p2ptypes.PeerID) bool { - return slices.Contains(h.workflowDonPeers, id.String()) -} - -func (h HardcodedDonNetworkSetup) IsTriggerDon(id p2ptypes.PeerID) bool { - return slices.Contains(h.triggerDonPeers, id.String()) -} - -func (h HardcodedDonNetworkSetup) IsTargetDon(id p2ptypes.PeerID) bool { - return slices.Contains(h.targetDonPeers, id.String()) -} - -type mockMercuryDataProducer struct { - trigger *triggers.MercuryTriggerService - wg sync.WaitGroup - closeCh chan struct{} - lggr logger.Logger -} - -var _ services.Service = &mockMercuryDataProducer{} - -func NewMockMercuryDataProducer(trigger *triggers.MercuryTriggerService, lggr logger.Logger) *mockMercuryDataProducer { - return &mockMercuryDataProducer{ - trigger: trigger, - closeCh: make(chan struct{}), - lggr: lggr, - } -} - -func (m *mockMercuryDataProducer) Start(ctx context.Context) error { - m.wg.Add(1) - go m.loop() - return nil -} - -func (m *mockMercuryDataProducer) loop() { - defer m.wg.Done() - - sleepSec := 60 - ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) - defer ticker.Stop() - - prices := []*big.Int{big.NewInt(300000), big.NewInt(40000), big.NewInt(5000000)} - - for range ticker.C { - for i := range prices { - prices[i].Add(prices[i], big.NewInt(1)) - } - - reports := []datastreams.FeedReport{ - { - FeedID: "0x0003fbba4fce42f65d6032b18aee53efdf526cc734ad296cb57565979d883bdd", - FullReport: []byte{0x11, 0xaa, 0xbb, 0xcc}, - BenchmarkPrice: prices[0].Bytes(), - ObservationTimestamp: time.Now().Unix(), - }, - { - FeedID: "0x0003c317fec7fad514c67aacc6366bf2f007ce37100e3cddcacd0ccaa1f3746d", - FullReport: []byte{0x22, 0xaa, 0xbb, 0xcc}, - BenchmarkPrice: prices[1].Bytes(), - ObservationTimestamp: time.Now().Unix(), - }, - { - FeedID: "0x0003da6ab44ea9296674d80fe2b041738189103d6b4ea9a4d34e2f891fa93d12", - FullReport: []byte{0x33, 0xaa, 0xbb, 0xcc}, - BenchmarkPrice: prices[2].Bytes(), - ObservationTimestamp: time.Now().Unix(), - }, - } - - m.lggr.Infow("New set of Mercury reports", "timestamp", time.Now().Unix(), "payload", reports) - err := m.trigger.ProcessReport(reports) - if err != nil { - m.lggr.Errorw("failed to process Mercury reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) - } - } -} - -func (m *mockMercuryDataProducer) Close() error { - close(m.closeCh) - m.wg.Wait() - return nil -} - -func (m *mockMercuryDataProducer) HealthReport() map[string]error { - return nil -} - -func (m *mockMercuryDataProducer) Ready() error { - return nil -} - -func (m *mockMercuryDataProducer) Name() string { - return "mockMercuryDataProducer" -} diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go deleted file mode 100644 index 2c5f871bdb8..00000000000 --- a/core/capabilities/syncer_test.go +++ /dev/null @@ -1,613 +0,0 @@ -package capabilities - -import ( - "context" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/services" - commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" - "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" - remoteMocks 
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" - kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" - "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" -) - -func TestSyncer_CleanStartClose(t *testing.T) { - lggr := logger.TestLogger(t) - ctx := testutils.Context(t) - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) - registry := commonMocks.NewCapabilitiesRegistry(t) - dispatcher := remoteMocks.NewDispatcher(t) - - networkSetup, err := NewHardcodedDonNetworkSetup() - require.NoError(t, err) - mr := &mockReader{} - syncer := newRegistrySyncer(make(services.StopChan), wrapper, registry, dispatcher, lggr, networkSetup, mr) - require.NoError(t, err) - require.NoError(t, syncer.Start(ctx)) - require.NoError(t, syncer.Close()) -} - -type mockReader struct { - s state - err error -} - -func (m mockReader) state(ctx context.Context) (state, error) { - return m.s, m.err -} - -func (m mockReader) LocalNode(ctx context.Context) (capabilities.Node, error) { - return capabilities.Node{}, nil -} - -type mockTrigger struct { - capabilities.CapabilityInfo -} - -func (m *mockTrigger) RegisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { - return nil, nil -} - -func (m *mockTrigger) UnregisterTrigger(ctx context.Context, request capabilities.CapabilityRequest) error { - return nil -} - -func newMockTrigger(info capabilities.CapabilityInfo) *mockTrigger { - return &mockTrigger{CapabilityInfo: info} -} - -type mockCapability struct { - capabilities.CapabilityInfo -} - -func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { - return nil, nil -} - -func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { - return nil -} - -func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { - return nil -} - -func TestSyncer_WiresUpExternalCapabilities(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - registry := NewRegistry(lggr) - dispatcher := remoteMocks.NewDispatcher(t) - - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) - - nodes := [][32]byte{ - pid, - randomWord(), - randomWord(), - randomWord(), - } - - fullTriggerCapID := "streams-trigger@1.0.0" - mt := newMockTrigger(capabilities.MustNewCapabilityInfo( - fullTriggerCapID, - capabilities.CapabilityTypeTrigger, - "streams trigger", - )) - require.NoError(t, registry.Add(ctx, mt)) - - fullTargetID := "write-chain_evm_1@1.0.0" - mtarg := &mockCapability{ - CapabilityInfo: 
capabilities.MustNewCapabilityInfo( - fullTargetID, - capabilities.CapabilityTypeTarget, - "write chain", - ), - } - require.NoError(t, registry.Add(ctx, mtarg)) - - triggerCapID := randomWord() - targetCapID := randomWord() - dID := uint32(1) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which exposes the streams-trigger and write_chain capabilities. - // We expect a publisher to be wired up with this configuration, and - // no entries should be added to the registry. - mr := &mockReader{ - s: state{ - IDsToDONs: map[donID]kcr.CapabilitiesRegistryDONInfo{ - donID(dID): { - Id: dID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: true, - NodeP2PIds: nodes, - CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ - { - CapabilityId: triggerCapID, - Config: []byte(""), - }, - { - CapabilityId: targetCapID, - Config: []byte(""), - }, - }, - }, - }, - IDsToCapabilities: map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ - triggerCapID: { - LabelledName: "streams-trigger", - Version: "1.0.0", - CapabilityType: 0, - }, - targetCapID: { - LabelledName: "write-chain_evm_1", - Version: "1.0.0", - CapabilityType: 3, - }, - }, - IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ - nodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[0], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[1], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[2], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[3], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - }, - }, - } - syncer := newRegistrySyncer(make(services.StopChan), wrapper, registry, dispatcher, lggr, HardcodedDonNetworkSetup{}, mr) - require.NoError(t, err) - - dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(dID), mock.AnythingOfType("*remote.triggerPublisher")).Return(nil) - dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(dID), mock.AnythingOfType("*target.server")).Return(nil) - - err = syncer.sync(ctx) - require.NoError(t, err) - defer syncer.Close() -} - -func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - registry := NewRegistry(lggr) - dispatcher := remoteMocks.NewDispatcher(t) - - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) - - nodes := [][32]byte{ - pid, - randomWord(), - randomWord(), - randomWord(), - } - - triggerCapID := randomWord() - targetCapID := randomWord() - dID := uint32(1) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which isn't public (IsPublic = false), but hosts the - // the streams-trigger and write_chain capabilities. - // We expect no action to be taken by the syncer. 
- mr := &mockReader{ - s: state{ - IDsToDONs: map[donID]kcr.CapabilitiesRegistryDONInfo{ - donID(dID): { - Id: dID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: false, - AcceptsWorkflows: true, - NodeP2PIds: nodes, - CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ - { - CapabilityId: triggerCapID, - Config: []byte(""), - }, - { - CapabilityId: targetCapID, - Config: []byte(""), - }, - }, - }, - }, - IDsToCapabilities: map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ - triggerCapID: { - LabelledName: "streams-trigger", - Version: "1.0.0", - CapabilityType: 0, - }, - targetCapID: { - LabelledName: "write-chain_evm_1", - Version: "1.0.0", - CapabilityType: 3, - }, - }, - IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ - nodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[0], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[1], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[2], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - nodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: nodes[3], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - }, - }, - } - syncer := newRegistrySyncer(make(services.StopChan), wrapper, registry, dispatcher, lggr, HardcodedDonNetworkSetup{}, mr) - require.NoError(t, err) - - // If the DON were public, this would fail with two errors: - // - error fetching the capabilities from the registry since they haven't been added - // - erroneous calls to dispatcher.SetReceiver, since the call hasn't been registered. - err = syncer.sync(ctx) - require.NoError(t, err) - defer syncer.Close() - - // Finally, assert that no services were added. - assert.Len(t, syncer.subServices, 0) -} - -func TestSyncer_WiresUpClientsForPublicWorkflowDON(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - registry := NewRegistry(lggr) - dispatcher := remoteMocks.NewDispatcher(t) - - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) - - workflowDonNodes := [][32]byte{ - pid, - randomWord(), - randomWord(), - randomWord(), - } - - capabilityDonNodes := [][32]byte{ - randomWord(), - randomWord(), - randomWord(), - randomWord(), - } - - fullTriggerCapID := "streams-trigger@1.0.0" - fullTargetID := "write-chain_evm_1@1.0.0" - triggerCapID := randomWord() - targetCapID := randomWord() - dID := uint32(1) - capDonID := uint32(2) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which exposes the streams-trigger and write_chain capabilities. - // We expect receivers to be wired up and both capabilities to be added to the registry. 
- mr := &mockReader{ - s: state{ - IDsToDONs: map[donID]kcr.CapabilitiesRegistryDONInfo{ - donID(dID): { - Id: dID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: true, - NodeP2PIds: workflowDonNodes, - }, - donID(capDonID): { - Id: capDonID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: false, - NodeP2PIds: capabilityDonNodes, - CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ - { - CapabilityId: triggerCapID, - Config: []byte(""), - }, - { - CapabilityId: targetCapID, - Config: []byte(""), - }, - }, - }, - }, - IDsToCapabilities: map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ - triggerCapID: { - LabelledName: "streams-trigger", - Version: "1.0.0", - CapabilityType: 0, - }, - targetCapID: { - LabelledName: "write-chain_evm_1", - Version: "1.0.0", - CapabilityType: 3, - }, - }, - IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ - capabilityDonNodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[0], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[1], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[2], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[3], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - workflowDonNodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[0], - }, - workflowDonNodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[1], - }, - workflowDonNodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[2], - }, - workflowDonNodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[3], - }, - }, - }, - } - syncer := newRegistrySyncer(make(services.StopChan), wrapper, registry, dispatcher, lggr, HardcodedDonNetworkSetup{}, mr) - require.NoError(t, err) - - dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(capDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) - dispatcher.On("SetReceiver", fullTargetID, fmt.Sprint(capDonID), mock.AnythingOfType("*target.client")).Return(nil) - - err = syncer.sync(ctx) - require.NoError(t, err) - defer syncer.Close() - - _, err = registry.Get(ctx, fullTriggerCapID) - require.NoError(t, err) - - _, err = registry.Get(ctx, fullTargetID) - require.NoError(t, err) -} - -func TestSyncer_WiresUpClientsForPublicWorkflowDONButIgnoresPrivateCapabilities(t *testing.T) { - ctx := tests.Context(t) - lggr := logger.TestLogger(t) - registry := NewRegistry(lggr) - dispatcher := remoteMocks.NewDispatcher(t) - - var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) - require.NoError(t, err) - peer := mocks.NewPeer(t) - peer.On("UpdateConnections", mock.Anything).Return(nil) - peer.On("ID").Return(pid) - wrapper := mocks.NewPeerWrapper(t) - wrapper.On("GetPeer").Return(peer) - - workflowDonNodes := [][32]byte{ - pid, - randomWord(), - randomWord(), - randomWord(), - } - - capabilityDonNodes := [][32]byte{ - randomWord(), - randomWord(), - randomWord(), - randomWord(), - } - - fullTriggerCapID := "streams-trigger@1.0.0" - triggerCapID := 
randomWord() - targetCapID := randomWord() - dID := uint32(1) - triggerCapDonID := uint32(2) - targetCapDonID := uint32(3) - // The below state describes a Workflow DON (AcceptsWorkflows = true), - // which exposes the streams-trigger and write_chain capabilities. - // We expect receivers to be wired up and both capabilities to be added to the registry. - mr := &mockReader{ - s: state{ - IDsToDONs: map[donID]kcr.CapabilitiesRegistryDONInfo{ - donID(dID): { - Id: dID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: true, - NodeP2PIds: workflowDonNodes, - }, - donID(triggerCapDonID): { - Id: triggerCapDonID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: true, - AcceptsWorkflows: false, - NodeP2PIds: capabilityDonNodes, - CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ - { - CapabilityId: triggerCapID, - Config: []byte(""), - }, - }, - }, - donID(targetCapDonID): { - Id: targetCapDonID, - ConfigCount: uint32(0), - F: uint8(1), - IsPublic: false, - AcceptsWorkflows: false, - NodeP2PIds: capabilityDonNodes, - CapabilityConfigurations: []kcr.CapabilitiesRegistryCapabilityConfiguration{ - { - CapabilityId: targetCapID, - Config: []byte(""), - }, - }, - }, - }, - IDsToCapabilities: map[hashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{ - triggerCapID: { - LabelledName: "streams-trigger", - Version: "1.0.0", - CapabilityType: 0, - }, - targetCapID: { - LabelledName: "write-chain_evm_1", - Version: "1.0.0", - CapabilityType: 3, - }, - }, - IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ - capabilityDonNodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[0], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[1], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[2], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - capabilityDonNodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: capabilityDonNodes[3], - HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, - }, - workflowDonNodes[0]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[0], - }, - workflowDonNodes[1]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[1], - }, - workflowDonNodes[2]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[2], - }, - workflowDonNodes[3]: { - NodeOperatorId: 1, - Signer: randomWord(), - P2pId: workflowDonNodes[3], - }, - }, - }, - } - syncer := newRegistrySyncer(make(services.StopChan), wrapper, registry, dispatcher, lggr, HardcodedDonNetworkSetup{}, mr) - require.NoError(t, err) - - dispatcher.On("SetReceiver", fullTriggerCapID, fmt.Sprint(triggerCapDonID), mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) - - err = syncer.sync(ctx) - require.NoError(t, err) - defer syncer.Close() - - _, err = registry.Get(ctx, fullTriggerCapID) - require.NoError(t, err) -} diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index c885a3b2104..1a626bc07fc 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -59,6 +59,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" 
"github.com/smartcontractkit/chainlink/v2/core/services/promreporter" + "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/streams" @@ -214,11 +215,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { srvcs = append(srvcs, externalPeerWrapper) - networkSetup, err := capabilities.NewHardcodedDonNetworkSetup() - if err != nil { - return nil, fmt.Errorf("failed to create hardcoded Don network setup: %w", err) - } - dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) rid := cfg.Capabilities().ExternalRegistry().RelayID() @@ -228,12 +224,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not fetch relayer %s configured for capabilities registry: %w", rid, err) } - registrySyncer, err := capabilities.NewRegistrySyncer( - externalPeerWrapper, - opts.CapabilitiesRegistry, - dispatcher, + registrySyncer, err := registrysyncer.New( globalLogger, - networkSetup, relayer, registryAddress, ) @@ -241,8 +233,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not configure syncer: %w", err) } - getLocalNode = registrySyncer.LocalNode - srvcs = append(srvcs, dispatcher, registrySyncer) + wfLauncher := capabilities.NewLauncher( + globalLogger, + externalPeerWrapper, + dispatcher, + opts.CapabilitiesRegistry, + ) + registrySyncer.AddLauncher(wfLauncher) + + getLocalNode = wfLauncher.LocalNode + srvcs = append(srvcs, dispatcher, wfLauncher, registrySyncer) } // LOOPs can be created as options, in the case of LOOP relayers, or diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go new file mode 100644 index 00000000000..e6a54078738 --- /dev/null +++ b/core/services/registrysyncer/syncer.go @@ -0,0 +1,250 @@ +package registrysyncer + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types" + + kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" +) + +type HashedCapabilityID [32]byte +type DonID uint32 + +type State struct { + IDsToDONs map[DonID]kcr.CapabilitiesRegistryDONInfo + IDsToNodes map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo + IDsToCapabilities map[HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo +} + +type Launcher interface { + Launch(ctx context.Context, state State) error +} + +type Syncer interface { + services.Service + AddLauncher(h ...Launcher) +} + +type registrySyncer struct { + stopCh services.StopChan + launchers []Launcher + reader types.ContractReader + + wg sync.WaitGroup + lggr logger.Logger + mu sync.RWMutex +} + +var _ services.Service = ®istrySyncer{} + +var ( + defaultTickInterval = 12 * time.Second +) + +// New instantiates a new RegistrySyncer +func New( + lggr logger.Logger, + relayer contractReaderFactory, + registryAddress string, +) (*registrySyncer, error) { + stopCh := make(services.StopChan) + ctx, _ := stopCh.NewCtx() + 
reader, err := newReader(ctx, lggr, relayer, registryAddress) + if err != nil { + return nil, err + } + + return newSyncer( + stopCh, + lggr.Named("RegistrySyncer"), + reader, + ), nil +} + +type contractReaderFactory interface { + NewContractReader(context.Context, []byte) (types.ContractReader, error) +} + +func newReader(ctx context.Context, lggr logger.Logger, relayer contractReaderFactory, remoteRegistryAddress string) (types.ContractReader, error) { + contractReaderConfig := evmrelaytypes.ChainReaderConfig{ + Contracts: map[string]evmrelaytypes.ChainContractReader{ + "CapabilitiesRegistry": { + ContractABI: kcr.CapabilitiesRegistryABI, + Configs: map[string]*evmrelaytypes.ChainReaderDefinition{ + "getDONs": { + ChainSpecificName: "getDONs", + }, + "getCapabilities": { + ChainSpecificName: "getCapabilities", + }, + "getNodes": { + ChainSpecificName: "getNodes", + }, + }, + }, + }, + } + + contractReaderConfigEncoded, err := json.Marshal(contractReaderConfig) + if err != nil { + return nil, err + } + + cr, err := relayer.NewContractReader(ctx, contractReaderConfigEncoded) + if err != nil { + return nil, err + } + + err = cr.Bind(ctx, []types.BoundContract{ + { + Address: remoteRegistryAddress, + Name: "CapabilitiesRegistry", + }, + }) + + return cr, err +} + +func newSyncer( + stopCh services.StopChan, + lggr logger.Logger, + reader types.ContractReader, +) *registrySyncer { + return ®istrySyncer{ + stopCh: stopCh, + lggr: lggr, + reader: reader, + } +} + +func (s *registrySyncer) Start(ctx context.Context) error { + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.syncLoop() + }() + return nil +} + +func (s *registrySyncer) syncLoop() { + ctx, cancel := s.stopCh.NewCtx() + defer cancel() + + ticker := time.NewTicker(defaultTickInterval) + defer ticker.Stop() + + // Sync for a first time outside the loop; this means we'll start a remote + // sync immediately once spinning up syncLoop, as by default a ticker will + // fire for the first time at T+N, where N is the interval. 
+ s.lggr.Debug("starting initial sync with remote registry") + err := s.sync(ctx) + if err != nil { + s.lggr.Errorw("failed to sync with remote registry", "error", err) + } + + for { + select { + case <-s.stopCh: + return + case <-ticker.C: + s.lggr.Debug("starting regular sync with the remote registry") + err := s.sync(ctx) + if err != nil { + s.lggr.Errorw("failed to sync with remote registry", "error", err) + } + } + } +} + +func (s *registrySyncer) state(ctx context.Context) (State, error) { + dons := []kcr.CapabilitiesRegistryDONInfo{} + err := s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getDONs", nil, &dons) + if err != nil { + return State{}, err + } + + idsToDONs := map[DonID]kcr.CapabilitiesRegistryDONInfo{} + for _, d := range dons { + idsToDONs[DonID(d.Id)] = d + } + + caps := []kcr.CapabilitiesRegistryCapabilityInfo{} + err = s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getCapabilities", nil, &caps) + if err != nil { + return State{}, err + } + + idsToCapabilities := map[HashedCapabilityID]kcr.CapabilitiesRegistryCapabilityInfo{} + for _, c := range caps { + idsToCapabilities[c.HashedId] = c + } + + nodes := []kcr.CapabilitiesRegistryNodeInfo{} + err = s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getNodes", nil, &nodes) + if err != nil { + return State{}, err + } + + idsToNodes := map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{} + for _, node := range nodes { + idsToNodes[node.P2pId] = node + } + + return State{IDsToDONs: idsToDONs, IDsToCapabilities: idsToCapabilities, IDsToNodes: idsToNodes}, nil +} + +func (s *registrySyncer) sync(ctx context.Context) error { + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.launchers) == 0 { + s.lggr.Warn("sync called, but no launchers are registered; nooping") + return nil + } + + state, err := s.state(ctx) + if err != nil { + return fmt.Errorf("failed to sync with remote registry: %w", err) + } + + for _, h := range s.launchers { + if err := h.Launch(ctx, state); err != nil { + s.lggr.Errorf("error calling launcher: %s", err) + } + } + + return nil +} + +func (s *registrySyncer) AddLauncher(launchers ...Launcher) { + s.mu.Lock() + defer s.mu.Unlock() + s.launchers = append(s.launchers, launchers...) 
+} + +func (s *registrySyncer) Close() error { + close(s.stopCh) + s.wg.Wait() + return nil +} + +func (s *registrySyncer) Ready() error { + return nil +} + +func (s *registrySyncer) HealthReport() map[string]error { + return nil +} + +func (s *registrySyncer) Name() string { + return "RegistrySyncer" +} diff --git a/core/capabilities/reader_test.go b/core/services/registrysyncer/syncer_test.go similarity index 90% rename from core/capabilities/reader_test.go rename to core/services/registrysyncer/syncer_test.go index cc61628c541..6804e4bec44 100644 --- a/core/capabilities/reader_test.go +++ b/core/services/registrysyncer/syncer_test.go @@ -1,4 +1,4 @@ -package capabilities +package registrysyncer import ( "context" @@ -17,7 +17,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -39,7 +38,8 @@ var writeChainCapability = kcr.CapabilitiesRegistryCapability{ func startNewChainWithRegistry(t *testing.T) (*kcr.CapabilitiesRegistry, common.Address, *bind.TransactOpts, *backends.SimulatedBackend) { owner := testutils.MustNewSimTransactor(t) - oneEth, _ := new(big.Int).SetString("100000000000000000000", 10) + i := &big.Int{} + oneEth, _ := i.SetString("100000000000000000000", 10) gasLimit := ethconfig.Defaults.Miner.GasCeil * 2 // 60 M blocks simulatedBackend := backends.NewSimulatedBackend(core.GenesisAlloc{owner.From: { @@ -111,24 +111,6 @@ func randomWord() [32]byte { return [32]byte(word) } -type mockWrapper struct { - services.Service - peer p2ptypes.Peer -} - -func (m mockWrapper) GetPeer() p2ptypes.Peer { - return m.peer -} - -type mockPeer struct { - p2ptypes.Peer - peerID p2ptypes.PeerID -} - -func (m mockPeer) ID() p2ptypes.PeerID { - return m.peerID -} - func TestReader_Integration(t *testing.T) { ctx := testutils.Context(t) reg, regAddress, owner, sim := startNewChainWithRegistry(t) @@ -205,12 +187,7 @@ func TestReader_Integration(t *testing.T) { require.NoError(t, err) factory := newContractReaderFactory(t, sim) - pw := mockWrapper{ - peer: mockPeer{ - peerID: nodeSet[0], - }, - } - reader, err := newRemoteRegistryReader(ctx, logger.TestLogger(t), pw, factory, regAddress.Hex()) + reader, err := New(logger.TestLogger(t), factory, regAddress.Hex()) require.NoError(t, err) s, err := reader.state(ctx) @@ -277,10 +254,4 @@ func TestReader_Integration(t *testing.T) { nodeSet[1]: nodesInfo[1], nodeSet[2]: nodesInfo[2], }, s.IDsToNodes) - - node, err := reader.LocalNode(ctx) - require.NoError(t, err) - - assert.Equal(t, p2ptypes.PeerID(nodeSet[0]), *node.PeerID) - assert.Equal(t, fmt.Sprint(1), node.WorkflowDON.ID) }
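
The snippet below is a minimal, hypothetical sketch of how the pieces introduced in this patch fit together from a caller's point of view: build a syncer from a contract-reader factory (any relayer exposing `NewContractReader` works) plus the on-chain registry address, register a `Launcher`, and start it. The `logLauncher` type, the `wireSyncer` helper, and the locally declared `contractReaderFactory` interface are illustration-only names; only `registrysyncer.New`, `AddLauncher`, `Start`, `Close`, `State`, and the `Launcher` interface come from the patch itself.

```go
package example

import (
	"context"
	"fmt"

	"github.com/smartcontractkit/chainlink-common/pkg/types"

	"github.com/smartcontractkit/chainlink/v2/core/logger"
	"github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer"
)

// contractReaderFactory mirrors the unexported interface registrysyncer.New
// expects; any relayer with a matching NewContractReader method satisfies it.
type contractReaderFactory interface {
	NewContractReader(ctx context.Context, config []byte) (types.ContractReader, error)
}

// logLauncher is a toy Launcher used purely for illustration: it logs a
// summary of each registry State the syncer hands it on every sync tick.
type logLauncher struct {
	lggr logger.Logger
}

func (l *logLauncher) Launch(ctx context.Context, state registrysyncer.State) error {
	l.lggr.Infow("registry state received",
		"dons", len(state.IDsToDONs),
		"nodes", len(state.IDsToNodes),
		"capabilities", len(state.IDsToCapabilities),
	)
	return nil
}

// wireSyncer mirrors the wiring done in application.go: create the syncer,
// attach launchers before Start, then let syncLoop poll the registry.
func wireSyncer(ctx context.Context, lggr logger.Logger, relayer contractReaderFactory, registryAddress string) error {
	syncer, err := registrysyncer.New(lggr, relayer, registryAddress)
	if err != nil {
		return fmt.Errorf("could not configure syncer: %w", err)
	}

	// Launchers registered here receive the full registry State on the
	// initial sync and on every subsequent tick.
	syncer.AddLauncher(&logLauncher{lggr: lggr})

	if err := syncer.Start(ctx); err != nil {
		return err
	}
	// ... later, on shutdown:
	return syncer.Close()
}
```

In `application.go` the only launcher registered is the one built by `capabilities.NewLauncher`, which takes over the peer-connection, remote-shim, and dispatcher wiring previously done in `core/capabilities/syncer.go`; the syncer itself is left as a thin polling loop over `getDONs`, `getCapabilities`, and `getNodes`.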