diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index f2bfd5e4b16..1d309816c1c 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -10,15 +10,14 @@ import ( "google.golang.org/protobuf/proto" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" - "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/libocr/ragep2p" ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" @@ -253,24 +252,25 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync switch capability.CapabilityType { case capabilities.CapabilityTypeTrigger: newTriggerFn := func(info capabilities.CapabilityInfo) (capabilityService, error) { - if !strings.HasPrefix(info.ID, "streams-trigger") { - return nil, errors.New("not supported: trigger capability does not have id = streams-trigger") - } - - codec := streams.NewCodec(w.lggr) - - signers, err := signersFor(remoteDON, state) - if err != nil { - return nil, err + var aggregator remotetypes.Aggregator + if strings.HasPrefix(info.ID, "streams-trigger") { + codec := streams.NewCodec(w.lggr) + + signers, err := signersFor(remoteDON, state) + if err != nil { + return nil, err + } + + aggregator = triggers.NewMercuryRemoteAggregator( + codec, + signers, + int(remoteDON.F+1), + w.lggr, + ) + } else { + aggregator = remote.NewDefaultModeAggregator(uint32(remoteDON.F) + 1) } - aggregator := triggers.NewMercuryRemoteAggregator( - codec, - signers, - int(remoteDON.F+1), - w.lggr, - ) - // TODO: We need to implement a custom, Mercury-specific // aggregator here, because there is no guarantee that // all trigger events in the workflow will have the same diff --git a/core/capabilities/launcher_test.go b/core/capabilities/launcher_test.go index 425c6815a43..c3bbf71e7f5 100644 --- a/core/capabilities/launcher_test.go +++ b/core/capabilities/launcher_test.go @@ -15,12 +15,16 @@ import ( ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" @@ -184,6 +188,234 @@ func TestLauncher_WiresUpExternalCapabilities(t *testing.T) { defer launcher.Close() } +func newTriggerEventMsg(t *testing.T, + senderPeerID types.PeerID, + workflowID string, + triggerEvent map[string]any, + triggerEventID string) (*remotetypes.MessageBody, *values.Map) { + triggerEventValue, err := values.NewMap(triggerEvent) + require.NoError(t, err) + capResponse := capabilities.TriggerResponse{ + Event: capabilities.TriggerEvent{ + Outputs: triggerEventValue, + ID: triggerEventID, + }, + Err: nil, + } + marshaled, err := pb.MarshalTriggerResponse(capResponse) + require.NoError(t, err) + return &remotetypes.MessageBody{ + Sender: senderPeerID[:], + Method: remotetypes.MethodTriggerEvent, + Metadata: &remotetypes.MessageBody_TriggerEventMetadata{ + TriggerEventMetadata: &remotetypes.TriggerEventMetadata{ + WorkflowIds: []string{workflowID}, + }, + }, + Payload: marshaled, + }, triggerEventValue +} + +func TestLauncher_RemoteTriggerModeAggregatorShim(t *testing.T) { + ctx := tests.Context(t) + lggr := logger.TestLogger(t) + registry := NewRegistry(lggr) + dispatcher := remoteMocks.NewDispatcher(t) + + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + peer.On("ID").Return(pid) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + + workflowDonNodes := []ragetypes.PeerID{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + + capabilityDonNodes := []ragetypes.PeerID{ + randomWord(), + randomWord(), + randomWord(), + randomWord(), + } + + fullTriggerCapID := "log-event-trigger-evm-43113@1.0.0" + fullTargetID := "write-chain_evm_1@1.0.0" + triggerCapID := randomWord() + targetCapID := randomWord() + dID := uint32(1) + capDonID := uint32(2) + // The below state describes a Workflow DON (AcceptsWorkflows = true), + // which exposes the log-event-trigger and write_chain capabilities. + // We expect receivers to be wired up and both capabilities to be added to the registry. + rtc := &capabilities.RemoteTriggerConfig{} + rtc.ApplyDefaults() + + cfg, err := proto.Marshal(&capabilitiespb.CapabilityConfig{ + RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(1 * time.Second), + MinResponsesToAggregate: 3, + }, + }, + }) + require.NoError(t, err) + + state := ®istrysyncer.LocalRegistry{ + IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { + DON: capabilities.DON{ + ID: dID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + Members: workflowDonNodes, + }, + }, + registrysyncer.DonID(capDonID): { + DON: capabilities.DON{ + ID: capDonID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: false, + Members: capabilityDonNodes, + }, + CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ + fullTriggerCapID: { + Config: cfg, + }, + fullTargetID: { + Config: cfg, + }, + }, + }, + }, + IDsToCapabilities: map[string]registrysyncer.Capability{ + fullTriggerCapID: { + ID: fullTriggerCapID, + CapabilityType: capabilities.CapabilityTypeTrigger, + }, + fullTargetID: { + ID: fullTargetID, + CapabilityType: capabilities.CapabilityTypeTarget, + }, + }, + IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + capabilityDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[0], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[1], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[2], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + capabilityDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: capabilityDonNodes[3], + HashedCapabilityIds: [][32]byte{triggerCapID, targetCapID}, + }, + workflowDonNodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[0], + }, + workflowDonNodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[1], + }, + workflowDonNodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[2], + }, + workflowDonNodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: workflowDonNodes[3], + }, + }, + } + + launcher := NewLauncher( + lggr, + wrapper, + dispatcher, + registry, + ) + + dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil) + dispatcher.On("SetReceiver", fullTargetID, capDonID, mock.AnythingOfType("*target.client")).Return(nil) + awaitRegistrationMessageCh := make(chan struct{}) + dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) { + select { + case awaitRegistrationMessageCh <- struct{}{}: + default: + } + }) + + err = launcher.Launch(ctx, state) + require.NoError(t, err) + defer launcher.Close() + + baseCapability, err := registry.Get(ctx, fullTriggerCapID) + require.NoError(t, err) + + remoteTriggerSubscriber, ok := baseCapability.(remote.TriggerSubscriber) + require.True(t, ok, "remote trigger capability") + + // Register trigger + workflowID1 := "15c631d295ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0" + workflowExecutionID1 := "95ef5e32deb99a10ee6804bc4af13855687559d7ff6552ac6dbb2ce0abbadeed" + req := capabilities.TriggerRegistrationRequest{ + TriggerID: "logeventtrigger_log1", + Metadata: capabilities.RequestMetadata{ + ReferenceID: "logeventtrigger", + WorkflowID: workflowID1, + WorkflowExecutionID: workflowExecutionID1, + }, + } + triggerEventCallbackCh, err := remoteTriggerSubscriber.RegisterTrigger(ctx, req) + require.NoError(t, err) + <-awaitRegistrationMessageCh + + // Receive trigger event + triggerEvent1 := map[string]any{"event": "triggerEvent1"} + triggerEvent2 := map[string]any{"event": "triggerEvent2"} + triggerEventMsg1, triggerEventValue := newTriggerEventMsg(t, capabilityDonNodes[0], workflowID1, triggerEvent1, "TriggerEventID1") + triggerEventMsg2, _ := newTriggerEventMsg(t, capabilityDonNodes[1], workflowID1, triggerEvent1, "TriggerEventID1") + // One Faulty Node (F = 1) sending bad event data for the same TriggerEventID1 + triggerEventMsg3, _ := newTriggerEventMsg(t, capabilityDonNodes[2], workflowID1, triggerEvent2, "TriggerEventID1") + remoteTriggerSubscriber.Receive(ctx, triggerEventMsg1) + remoteTriggerSubscriber.Receive(ctx, triggerEventMsg2) + remoteTriggerSubscriber.Receive(ctx, triggerEventMsg3) + + // After MinResponsesToAggregate, we should get a response + response := <-triggerEventCallbackCh + + // Checks if response is same as minIdenticalResponses = F + 1, F = 1 + require.Equal(t, response.Event.Outputs, triggerEventValue) +} + func TestSyncer_IgnoresCapabilitiesForPrivateDON(t *testing.T) { ctx := tests.Context(t) lggr := logger.TestLogger(t) diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 967b59258ae..9f40c6c1f51 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -47,6 +47,11 @@ type subRegState struct { rawRequest []byte } +type TriggerSubscriber interface { + commoncap.TriggerCapability + Receive(ctx context.Context, msg *types.MessageBody) +} + var _ commoncap.TriggerCapability = &triggerSubscriber{} var _ types.Receiver = &triggerSubscriber{} var _ services.Service = &triggerSubscriber{}