From a01ed0ff111d29857abcffc6ef4769552ce6c3e2 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 18 Dec 2024 18:52:38 +0000 Subject: [PATCH] Preperation for register unregister execution capability (#15760) * common bump * move aggregator * message cache to own package * lint * mod tidy * lint * generate * revert dispatcher revert * dispatcher mock --- core/capabilities/launcher.go | 3 +- .../remote/aggregation/default_mode.go | 58 ++++++++++++++ .../remote/aggregation/default_mode_test.go | 51 ++++++++++++ core/capabilities/remote/dispatcher.go | 28 +++---- core/capabilities/remote/dispatcher_test.go | 30 ++++---- .../remote/executable/endtoend_test.go | 34 +------- .../executable/request/client_request.go | 2 +- .../executable/request/client_request_test.go | 14 ++-- .../executable/request/server_request.go | 8 +- .../executable/request/server_request_test.go | 4 +- core/capabilities/remote/executable/server.go | 21 ++--- .../{ => messagecache}/message_cache.go | 17 ++-- .../{ => messagecache}/message_cache_test.go | 32 ++++---- core/capabilities/remote/trigger_publisher.go | 34 ++++---- .../capabilities/remote/trigger_subscriber.go | 30 ++++---- .../remote/trigger_subscriber_test.go | 1 - .../remote/types/mocks/dispatcher.go | 34 ++++---- core/capabilities/remote/types/types.go | 4 +- core/capabilities/remote/utils.go | 59 ++------------ core/capabilities/remote/utils_test.go | 77 +++++-------------- core/scripts/go.mod | 2 +- core/scripts/go.sum | 4 +- deployment/go.mod | 2 +- deployment/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- integration-tests/go.mod | 2 +- integration-tests/go.sum | 4 +- integration-tests/load/go.mod | 2 +- integration-tests/load/go.sum | 4 +- 30 files changed, 287 insertions(+), 284 deletions(-) create mode 100644 core/capabilities/remote/aggregation/default_mode.go create mode 100644 core/capabilities/remote/aggregation/default_mode_test.go rename core/capabilities/remote/{ => messagecache}/message_cache.go (81%) rename core/capabilities/remote/{ => messagecache}/message_cache_test.go (59%) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 98318853e2a..51df7eeebfc 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -19,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/streams" @@ -280,7 +281,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync w.lggr, ) } else { - aggregator = remote.NewDefaultModeAggregator(uint32(remoteDON.F) + 1) + aggregator = aggregation.NewDefaultModeAggregator(uint32(remoteDON.F) + 1) } // TODO: We need to implement a custom, Mercury-specific diff --git a/core/capabilities/remote/aggregation/default_mode.go b/core/capabilities/remote/aggregation/default_mode.go new file mode 100644 index 00000000000..3d5e262920f --- /dev/null +++ b/core/capabilities/remote/aggregation/default_mode.go @@ -0,0 +1,58 @@ +package aggregation + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" +) + +// Default MODE Aggregator needs a configurable number of identical responses for aggregation to succeed +type defaultModeAggregator struct { + minIdenticalResponses uint32 +} + +var _ remotetypes.Aggregator = &defaultModeAggregator{} + +func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregator { + return &defaultModeAggregator{ + minIdenticalResponses: minIdenticalResponses, + } +} + +func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.TriggerResponse, error) { + found, err := AggregateModeRaw(responses, a.minIdenticalResponses) + if err != nil { + return commoncap.TriggerResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err) + } + + unmarshaled, err := pb.UnmarshalTriggerResponse(found) + if err != nil { + return commoncap.TriggerResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err) + } + return unmarshaled, nil +} + +func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) { + hashToCount := make(map[string]uint32) + var found []byte + for _, elem := range elemList { + hasher := sha256.New() + hasher.Write(elem) + sha := hex.EncodeToString(hasher.Sum(nil)) + hashToCount[sha]++ + if hashToCount[sha] >= minIdenticalResponses { + found = elem + // update in case we find another elem with an even higher count + minIdenticalResponses = hashToCount[sha] + } + } + if found == nil { + return nil, errors.New("not enough identical responses found") + } + return found, nil +} diff --git a/core/capabilities/remote/aggregation/default_mode_test.go b/core/capabilities/remote/aggregation/default_mode_test.go new file mode 100644 index 00000000000..7c7d615e17a --- /dev/null +++ b/core/capabilities/remote/aggregation/default_mode_test.go @@ -0,0 +1,51 @@ +package aggregation + +import ( + "testing" + + "github.com/stretchr/testify/require" + + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" +) + +var ( + triggerEvent1 = map[string]any{"event": "triggerEvent1"} + triggerEvent2 = map[string]any{"event": "triggerEvent2"} +) + +func TestDefaultModeAggregator_Aggregate(t *testing.T) { + val, err := values.NewMap(triggerEvent1) + require.NoError(t, err) + capResponse1 := commoncap.TriggerResponse{ + Event: commoncap.TriggerEvent{ + Outputs: val, + }, + Err: nil, + } + marshaled1, err := pb.MarshalTriggerResponse(capResponse1) + require.NoError(t, err) + + val2, err := values.NewMap(triggerEvent2) + require.NoError(t, err) + capResponse2 := commoncap.TriggerResponse{ + Event: commoncap.TriggerEvent{ + Outputs: val2, + }, + Err: nil, + } + marshaled2, err := pb.MarshalTriggerResponse(capResponse2) + require.NoError(t, err) + + agg := NewDefaultModeAggregator(2) + _, err = agg.Aggregate("", [][]byte{marshaled1}) + require.Error(t, err) + + _, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2}) + require.Error(t, err) + + res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1}) + require.NoError(t, err) + require.Equal(t, res, capResponse1) +} diff --git a/core/capabilities/remote/dispatcher.go b/core/capabilities/remote/dispatcher.go index e3229d35c1e..905eda9d72d 100644 --- a/core/capabilities/remote/dispatcher.go +++ b/core/capabilities/remote/dispatcher.go @@ -3,6 +3,7 @@ package remote import ( "context" "fmt" + "strconv" "sync" "time" @@ -42,8 +43,8 @@ type dispatcher struct { } type key struct { - capId string - donId uint32 + capID string + donID uint32 } var _ services.Service = &dispatcher{} @@ -74,7 +75,7 @@ func (d *dispatcher) Start(ctx context.Context) error { d.peer = d.peerWrapper.GetPeer() d.peerID = d.peer.ID() if d.peer == nil { - return fmt.Errorf("peer is not initialized") + return errors.New("peer is not initialized") } d.wg.Add(1) go func() { @@ -103,13 +104,13 @@ type receiver struct { ch chan *types.MessageBody } -func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error { +func (d *dispatcher) SetReceiver(capabilityID string, donID uint32, rec types.Receiver) error { d.mu.Lock() defer d.mu.Unlock() - k := key{capabilityId, donId} + k := key{capabilityID, donID} _, ok := d.receivers[k] if ok { - return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId) + return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityID, donID) } receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize()) @@ -134,23 +135,24 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Re ch: receiverCh, } - d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId) + d.lggr.Debugw("receiver set", "capabilityId", capabilityID, "donId", donID) return nil } -func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) { +func (d *dispatcher) RemoveReceiver(capabilityID string, donID uint32) { d.mu.Lock() defer d.mu.Unlock() - receiverKey := key{capabilityId, donId} + receiverKey := key{capabilityID, donID} if receiver, ok := d.receivers[receiverKey]; ok { receiver.cancel() delete(d.receivers, receiverKey) - d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId) + d.lggr.Debugw("receiver removed", "capabilityId", capabilityID, "donId", donID) } } func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { + //nolint:gosec // disable G115 msgBody.Version = uint32(d.cfg.SupportedVersion()) msgBody.Sender = d.peerID[:] msgBody.Receiver = peerID[:] @@ -194,17 +196,17 @@ func (d *dispatcher) receive() { receiver, ok := d.receivers[k] d.mu.RUnlock() if !ok { - d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId) + d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID) d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND) continue } receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize()) - capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage) + capReceiveChannelUsage.WithLabelValues(k.capID, strconv.FormatUint(uint64(k.donID), 10)).Set(receiverQueueUsage) select { case receiver.ch <- body: default: - d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId) + d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capID, "donId", k.donID) } } } diff --git a/core/capabilities/remote/dispatcher_test.go b/core/capabilities/remote/dispatcher_test.go index 50edc5f3530..fbc9dbb4b49 100644 --- a/core/capabilities/remote/dispatcher_test.go +++ b/core/capabilities/remote/dispatcher_test.go @@ -104,13 +104,13 @@ func TestDispatcher_CleanStartClose(t *testing.T) { func TestDispatcher_Receive(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) - privKey1, peerId1 := newKeyPair(t) - _, peerId2 := newKeyPair(t) + privKey1, peerID1 := newKeyPair(t) + _, peerID2 := newKeyPair(t) peer := mocks.NewPeer(t) recvCh := make(chan p2ptypes.Message) peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) - peer.On("ID", mock.Anything).Return(peerId2) + peer.On("ID", mock.Anything).Return(peerID2) wrapper := mocks.NewPeerWrapper(t) wrapper.On("GetPeer").Return(peer) signer := mocks.NewSigner(t) @@ -131,39 +131,39 @@ func TestDispatcher_Receive(t *testing.T) { require.NoError(t, dispatcher.Start(ctx)) rcv := newReceiver() - err = dispatcher.SetReceiver(capId1, donId1, rcv) + err = dispatcher.SetReceiver(capID1, donID1, rcv) require.NoError(t, err) // supported capability - recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1)) // unknown capability - recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1)) + recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1)) // sender doesn't match - invalid := encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1)) - invalid.Sender = peerId2 + invalid := encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1)) + invalid.Sender = peerID2 recvCh <- invalid // supported capability again - recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload2)) + recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload2)) m := <-rcv.ch require.Equal(t, payload1, string(m.Payload)) m = <-rcv.ch require.Equal(t, payload2, string(m.Payload)) - dispatcher.RemoveReceiver(capId1, donId1) + dispatcher.RemoveReceiver(capID1, donID1) require.NoError(t, dispatcher.Close()) } func TestDispatcher_RespondWithError(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) - privKey1, peerId1 := newKeyPair(t) - _, peerId2 := newKeyPair(t) + privKey1, peerID1 := newKeyPair(t) + _, peerID2 := newKeyPair(t) peer := mocks.NewPeer(t) recvCh := make(chan p2ptypes.Message) peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh)) - peer.On("ID", mock.Anything).Return(peerId2) + peer.On("ID", mock.Anything).Return(peerID2) sendCh := make(chan p2ptypes.PeerID) peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { peerID := args.Get(0).(p2ptypes.PeerID) @@ -189,9 +189,9 @@ func TestDispatcher_RespondWithError(t *testing.T) { require.NoError(t, dispatcher.Start(ctx)) // unknown capability - recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) + recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1)) responseDestPeerID := <-sendCh - require.Equal(t, peerId1, responseDestPeerID) + require.Equal(t, peerID1, responseDestPeerID) require.NoError(t, dispatcher.Close()) } diff --git a/core/capabilities/remote/executable/endtoend_test.go b/core/capabilities/remote/executable/endtoend_test.go index 5f445db4235..886bde9d33d 100644 --- a/core/capabilities/remote/executable/endtoend_test.go +++ b/core/capabilities/remote/executable/endtoend_test.go @@ -224,9 +224,9 @@ func (a *testAsyncMessageBroker) start(ctx context.Context) error { case <-ctx.Done(): return case msg := <-a.sendCh: - receiverId := toPeerID(msg.Receiver) + receiverID := toPeerID(msg.Receiver) - receiver, ok := a.nodes[receiverId] + receiver, ok := a.nodes[receiverID] if !ok { panic("server not found for peer id") } @@ -299,10 +299,10 @@ func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.Messa return nil } -func (t *nodeDispatcher) SetReceiver(capabilityId string, donId uint32, receiver remotetypes.Receiver) error { +func (t *nodeDispatcher) SetReceiver(capabilityID string, donID uint32, receiver remotetypes.Receiver) error { return nil } -func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId uint32) {} +func (t *nodeDispatcher) RemoveReceiver(capabilityID string, donID uint32) {} type abstractTestCapability struct { } @@ -407,29 +407,3 @@ func executeCapability(ctx context.Context, t *testing.T, caller commoncap.Execu responseTest(t, response, err) } - -func registerWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) { - err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - Config: transmissionSchedule, - }) - - responseTest(t, err) -} - -func unregisterWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) { - err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{ - Metadata: commoncap.RegistrationMetadata{ - WorkflowID: workflowID1, - ReferenceID: stepReferenceID1, - WorkflowOwner: workflowOwnerID, - }, - Config: transmissionSchedule, - }) - - responseTest(t, err) -} diff --git a/core/capabilities/remote/executable/request/client_request.go b/core/capabilities/remote/executable/request/client_request.go index 6b4b9e3a0cd..ef4d0023773 100644 --- a/core/capabilities/remote/executable/request/client_request.go +++ b/core/capabilities/remote/executable/request/client_request.go @@ -212,7 +212,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err } if msg.Sender == nil { - return fmt.Errorf("sender missing from message") + return errors.New("sender missing from message") } c.lggr.Debugw("OnMessage called for client request", "messageID", msg.MessageId) diff --git a/core/capabilities/remote/executable/request/client_request_test.go b/core/capabilities/remote/executable/request/client_request_test.go index c46fd1363a0..45e81fc70d8 100644 --- a/core/capabilities/remote/executable/request/client_request_test.go +++ b/core/capabilities/remote/executable/request/client_request_test.go @@ -167,7 +167,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { nonDonPeer := NewP2PPeerID(t) msg.Sender = nonDonPeer[:] err = request.OnMessage(ctx, msg) - require.NotNil(t, err) + require.Error(t, err) select { case <-request.ResponseChan(): @@ -190,7 +190,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { err = request.OnMessage(ctx, msg) require.NoError(t, err) err = request.OnMessage(ctx, msg) - require.NotNil(t, err) + require.Error(t, err) select { case <-request.ResponseChan(): @@ -211,7 +211,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { <-dispatcher.msgs <-dispatcher.msgs - assert.Equal(t, 0, len(dispatcher.msgs)) + assert.Empty(t, dispatcher.msgs) msgWithError := &types.MessageBody{ CapabilityId: capInfo.ID, @@ -249,7 +249,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { <-dispatcher.msgs <-dispatcher.msgs - assert.Equal(t, 0, len(dispatcher.msgs)) + assert.Empty(t, dispatcher.msgs) msgWithError := &types.MessageBody{ CapabilityId: capInfo.ID, @@ -299,7 +299,7 @@ func Test_ClientRequest_MessageValidation(t *testing.T) { <-dispatcher.msgs <-dispatcher.msgs - assert.Equal(t, 0, len(dispatcher.msgs)) + assert.Empty(t, dispatcher.msgs) msg.Sender = capabilityPeers[0][:] err = request.OnMessage(ctx, msg) @@ -497,11 +497,11 @@ func (t *clientRequestTestDispatcher) HealthReport() map[string]error { return nil } -func (t *clientRequestTestDispatcher) SetReceiver(capabilityId string, donId uint32, receiver types.Receiver) error { +func (t *clientRequestTestDispatcher) SetReceiver(capabilityID string, donID uint32, receiver types.Receiver) error { return nil } -func (t *clientRequestTestDispatcher) RemoveReceiver(capabilityId string, donId uint32) {} +func (t *clientRequestTestDispatcher) RemoveReceiver(capabilityID string, donID uint32) {} func (t *clientRequestTestDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { t.msgs <- msgBody diff --git a/core/capabilities/remote/executable/request/server_request.go b/core/capabilities/remote/executable/request/server_request.go index 629622494a4..ee4e48fe1b9 100644 --- a/core/capabilities/remote/executable/request/server_request.go +++ b/core/capabilities/remote/executable/request/server_request.go @@ -26,7 +26,7 @@ type response struct { type ServerRequest struct { capability capabilities.ExecutableCapability - capabilityPeerId p2ptypes.PeerID + capabilityPeerID p2ptypes.PeerID capabilityID string capabilityDonID uint32 @@ -60,7 +60,7 @@ func NewServerRequest(capability capabilities.ExecutableCapability, method strin createdTime: time.Now(), capabilityID: capabilityID, capabilityDonID: capabilityDonID, - capabilityPeerId: capabilityPeerID, + capabilityPeerID: capabilityPeerID, dispatcher: dispatcher, requesters: map[p2ptypes.PeerID]bool{}, responseSentToRequester: map[p2ptypes.PeerID]bool{}, @@ -77,7 +77,7 @@ func (e *ServerRequest) OnMessage(ctx context.Context, msg *types.MessageBody) e defer e.mux.Unlock() if msg.Sender == nil { - return fmt.Errorf("sender missing from message") + return errors.New("sender missing from message") } requester, err := remote.ToPeerID(msg.Sender) @@ -206,7 +206,7 @@ func (e *ServerRequest) sendResponse(requester p2ptypes.PeerID) error { CallerDonId: e.callingDon.ID, Method: types.MethodExecute, MessageId: []byte(e.requestMessageID), - Sender: e.capabilityPeerId[:], + Sender: e.capabilityPeerID[:], Receiver: requester[:], } diff --git a/core/capabilities/remote/executable/request/server_request_test.go b/core/capabilities/remote/executable/request/server_request_test.go index ce539154d93..faf91be0690 100644 --- a/core/capabilities/remote/executable/request/server_request_test.go +++ b/core/capabilities/remote/executable/request/server_request_test.go @@ -311,11 +311,11 @@ func (t *testDispatcher) HealthReport() map[string]error { return nil } -func (t *testDispatcher) SetReceiver(capabilityId string, donId uint32, receiver types.Receiver) error { +func (t *testDispatcher) SetReceiver(capabilityID string, donID uint32, receiver types.Receiver) error { return nil } -func (t *testDispatcher) RemoveReceiver(capabilityId string, donId uint32) {} +func (t *testDispatcher) RemoveReceiver(capabilityID string, donID uint32) {} func (t *testDispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error { t.msgs = append(t.msgs, msgBody) diff --git a/core/capabilities/remote/executable/server.go b/core/capabilities/remote/executable/server.go index d43c7ab5c41..0208572b0bd 100644 --- a/core/capabilities/remote/executable/server.go +++ b/core/capabilities/remote/executable/server.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "errors" "fmt" "sync" "time" @@ -142,7 +143,7 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { r.lggr.Errorw("received request for unsupported method type", "method", remote.SanitizeLogString(msg.Method)) } - messageId, err := GetMessageID(msg) + messageID, err := GetMessageID(msg) if err != nil { r.lggr.Errorw("invalid message id", "err", err, "id", remote.SanitizeLogString(string(msg.MessageId))) return @@ -156,21 +157,21 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { // A request is uniquely identified by the message id and the hash of the payload to prevent a malicious // actor from sending a different payload with the same message id - requestID := messageId + hex.EncodeToString(msgHash[:]) + requestID := messageID + hex.EncodeToString(msgHash[:]) r.lggr.Debugw("received request", "msgId", msg.MessageId, "requestID", requestID) - if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok { - requestIDs[requestID] = requestIDs[requestID] + 1 + if requestIDs, ok := r.messageIDToRequestIDsCount[messageID]; ok { + requestIDs[requestID]++ } else { - r.messageIDToRequestIDsCount[messageId] = map[string]int{requestID: 1} + r.messageIDToRequestIDsCount[messageID] = map[string]int{requestID: 1} } - requestIDs := r.messageIDToRequestIDsCount[messageId] + requestIDs := r.messageIDToRequestIDsCount[messageID] if len(requestIDs) > 1 { // This is a potential attack vector as well as a situation that will occur if the client is sending non-deterministic payloads // so a warning is logged - r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "lenRequestIDs", len(requestIDs)) + r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageID, "lenRequestIDs", len(requestIDs)) } if _, ok := r.requestIDToRequest[requestID]; !ok { @@ -182,8 +183,8 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { r.requestIDToRequest[requestID] = requestAndMsgID{ request: request.NewServerRequest(r.underlying, msg.Method, r.capInfo.ID, r.localDonInfo.ID, r.peerID, - callingDon, messageId, r.dispatcher, r.requestTimeout, r.lggr), - messageID: messageId, + callingDon, messageID, r.dispatcher, r.requestTimeout, r.lggr), + messageID: messageID, } } @@ -218,7 +219,7 @@ func (r *server) getMessageHash(msg *types.MessageBody) ([32]byte, error) { func GetMessageID(msg *types.MessageBody) (string, error) { idStr := string(msg.MessageId) if !validation.IsValidID(idStr) { - return "", fmt.Errorf("invalid message id") + return "", errors.New("invalid message id") } return idStr, nil } diff --git a/core/capabilities/remote/message_cache.go b/core/capabilities/remote/messagecache/message_cache.go similarity index 81% rename from core/capabilities/remote/message_cache.go rename to core/capabilities/remote/messagecache/message_cache.go index f3a3a79b2c6..28ef57ab875 100644 --- a/core/capabilities/remote/message_cache.go +++ b/core/capabilities/remote/messagecache/message_cache.go @@ -1,9 +1,9 @@ -package remote +package messagecache // MessageCache is a simple store for messages, grouped by event ID and peer ID. // It is used to collect messages from multiple peers until they are ready for aggregation // based on quantity and freshness. -type messageCache[EventID comparable, PeerID comparable] struct { +type MessageCache[EventID comparable, PeerID comparable] struct { events map[EventID]*eventState[PeerID] } @@ -18,14 +18,14 @@ type msgState struct { payload []byte } -func NewMessageCache[EventID comparable, PeerID comparable]() *messageCache[EventID, PeerID] { - return &messageCache[EventID, PeerID]{ +func NewMessageCache[EventID comparable, PeerID comparable]() *MessageCache[EventID, PeerID] { + return &MessageCache[EventID, PeerID]{ events: make(map[EventID]*eventState[PeerID]), } } // Insert or overwrite a message for . Return creation timestamp of the event. -func (c *messageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, timestamp int64, payload []byte) int64 { +func (c *MessageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, timestamp int64, payload []byte) int64 { if _, ok := c.events[eventID]; !ok { c.events[eventID] = &eventState[PeerID]{ peerMsgs: make(map[PeerID]*msgState), @@ -43,7 +43,7 @@ func (c *messageCache[EventID, PeerID]) Insert(eventID EventID, peerID PeerID, t // received more recently than . // Return all messages that satisfy the above condition. // Ready() will return true at most once per event if is true. -func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, [][]byte) { +func (c *MessageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, minTimestamp int64, once bool) (bool, [][]byte) { ev, ok := c.events[eventID] if !ok { return false, nil @@ -51,6 +51,7 @@ func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, if ev.wasReady && once { return false, nil } + //nolint:gosec // G115 if uint32(len(ev.peerMsgs)) < minCount { return false, nil } @@ -69,13 +70,13 @@ func (c *messageCache[EventID, PeerID]) Ready(eventID EventID, minCount uint32, return false, nil } -func (c *messageCache[EventID, PeerID]) Delete(eventID EventID) { +func (c *MessageCache[EventID, PeerID]) Delete(eventID EventID) { delete(c.events, eventID) } // Return the number of events deleted. // Scans all keys, which might be slow for large caches. -func (c *messageCache[EventID, PeerID]) DeleteOlderThan(cutoffTimestamp int64) int { +func (c *MessageCache[EventID, PeerID]) DeleteOlderThan(cutoffTimestamp int64) int { nDeleted := 0 for id, event := range c.events { if event.creationTimestamp < cutoffTimestamp { diff --git a/core/capabilities/remote/message_cache_test.go b/core/capabilities/remote/messagecache/message_cache_test.go similarity index 59% rename from core/capabilities/remote/message_cache_test.go rename to core/capabilities/remote/messagecache/message_cache_test.go index 5ca909ca4ec..2d059adef32 100644 --- a/core/capabilities/remote/message_cache_test.go +++ b/core/capabilities/remote/messagecache/message_cache_test.go @@ -1,53 +1,53 @@ -package remote_test +package messagecache_test import ( "testing" "github.com/stretchr/testify/require" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" ) const ( - eventId1 = "event1" - eventId2 = "event2" - peerId1 = "peer1" - peerId2 = "peer2" + eventID1 = "event1" + eventID2 = "event2" + peerID1 = "peer1" + peerID2 = "peer2" payloadA = "payloadA" ) func TestMessageCache_InsertReady(t *testing.T) { - cache := remote.NewMessageCache[string, string]() + cache := messagecache.NewMessageCache[string, string]() // not ready with one message - ts := cache.Insert(eventId1, peerId1, 100, []byte(payloadA)) + ts := cache.Insert(eventID1, peerID1, 100, []byte(payloadA)) require.Equal(t, int64(100), ts) - ready, _ := cache.Ready(eventId1, 2, 100, true) + ready, _ := cache.Ready(eventID1, 2, 100, true) require.False(t, ready) // not ready with two messages but only one fresh enough - ts = cache.Insert(eventId1, peerId2, 200, []byte(payloadA)) + ts = cache.Insert(eventID1, peerID2, 200, []byte(payloadA)) require.Equal(t, int64(100), ts) - ready, _ = cache.Ready(eventId1, 2, 150, true) + ready, _ = cache.Ready(eventID1, 2, 150, true) require.False(t, ready) // ready with two messages (once only) - ready, messages := cache.Ready(eventId1, 2, 100, true) + ready, messages := cache.Ready(eventID1, 2, 100, true) require.True(t, ready) require.Equal(t, []byte(payloadA), messages[0]) require.Equal(t, []byte(payloadA), messages[1]) // not ready again for the same event ID - ready, _ = cache.Ready(eventId1, 2, 100, true) + ready, _ = cache.Ready(eventID1, 2, 100, true) require.False(t, ready) } func TestMessageCache_DeleteOlderThan(t *testing.T) { - cache := remote.NewMessageCache[string, string]() + cache := messagecache.NewMessageCache[string, string]() - ts := cache.Insert(eventId1, peerId1, 100, []byte(payloadA)) + ts := cache.Insert(eventID1, peerID1, 100, []byte(payloadA)) require.Equal(t, int64(100), ts) - ts = cache.Insert(eventId2, peerId2, 200, []byte(payloadA)) + ts = cache.Insert(eventID2, peerID2, 200, []byte(payloadA)) require.Equal(t, int64(200), ts) deleted := cache.DeleteOlderThan(150) diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 315959605e8..24bd26757ac 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -10,6 +10,8 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/validation" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -30,7 +32,7 @@ type triggerPublisher struct { workflowDONs map[uint32]commoncap.DON membersCache map[uint32]map[p2ptypes.PeerID]bool dispatcher types.Dispatcher - messageCache *messageCache[registrationKey, p2ptypes.PeerID] + messageCache *messagecache.MessageCache[registrationKey, p2ptypes.PeerID] registrations map[registrationKey]*pubRegState mu sync.RWMutex // protects messageCache and registrations batchingQueue map[[32]byte]*batchedResponse @@ -42,8 +44,8 @@ type triggerPublisher struct { } type registrationKey struct { - callerDonId uint32 - workflowId string + callerDonID uint32 + workflowID string } type pubRegState struct { @@ -84,7 +86,7 @@ func NewTriggerPublisher(config *commoncap.RemoteTriggerConfig, underlying commo workflowDONs: workflowDONs, membersCache: membersCache, dispatcher: dispatcher, - messageCache: NewMessageCache[registrationKey, p2ptypes.PeerID](), + messageCache: messagecache.NewMessageCache[registrationKey, p2ptypes.PeerID](), registrations: make(map[registrationKey]*pubRegState), batchingQueue: make(map[[32]byte]*batchedResponse), batchingEnabled: config.MaxBatchSize > 1 && config.BatchCollectionPeriod >= minAllowedBatchCollectionPeriod, @@ -148,7 +150,7 @@ func (p *triggerPublisher) Receive(_ context.Context, msg *types.MessageBody) { p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired) return } - aggregated, err := AggregateModeRaw(payloads, uint32(callerDon.F+1)) + aggregated, err := aggregation.AggregateModeRaw(payloads, uint32(callerDon.F+1)) if err != nil { p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) return @@ -189,14 +191,14 @@ func (p *triggerPublisher) registrationCleanupLoop() { now := time.Now().UnixMilli() p.mu.Lock() for key, req := range p.registrations { - callerDon := p.workflowDONs[key.callerDonId] + callerDon := p.workflowDONs[key.callerDonID] ready, _ := p.messageCache.Ready(key, uint32(2*callerDon.F+1), now-p.config.RegistrationExpiry.Milliseconds(), false) if !ready { - p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId) + p.lggr.Infow("trigger registration expired", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonID, "workflowId", key.workflowID) ctx, cancel := p.stopCh.NewCtx() err := p.underlying.UnregisterTrigger(ctx, req.request) cancel() - p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err) + p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonID, "workflowId", key.workflowID, "err", err) // after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel delete(p.registrations, key) p.messageCache.Delete(key) @@ -215,11 +217,11 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR return case response, ok := <-callbackCh: if !ok { - p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId) + p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowID) return } triggerEvent := response.Event - p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID) + p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowID, "triggerEventID", triggerEvent.ID) marshaledResponse, err := pb.MarshalTriggerResponse(response) if err != nil { p.lggr.Debugw("can't marshal trigger event", "err", err) @@ -232,9 +234,9 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR // a single-element "batch" p.sendBatch(&batchedResponse{ rawResponse: marshaledResponse, - callerDonID: key.callerDonId, + callerDonID: key.callerDonID, triggerEventID: triggerEvent.ID, - workflowIDs: []string{key.workflowId}, + workflowIDs: []string{key.workflowID}, }) } } @@ -244,7 +246,7 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.TriggerR func (p *triggerPublisher) enqueueForBatching(rawResponse []byte, key registrationKey, triggerEventID string) { // put in batching queue, group by hash(callerDonId, triggerEventID, response) combined := make([]byte, 4) - binary.LittleEndian.PutUint32(combined, key.callerDonId) + binary.LittleEndian.PutUint32(combined, key.callerDonID) combined = append(combined, []byte(triggerEventID)...) combined = append(combined, rawResponse...) sha := sha256.Sum256(combined) @@ -253,13 +255,13 @@ func (p *triggerPublisher) enqueueForBatching(rawResponse []byte, key registrati if !exists { elem = &batchedResponse{ rawResponse: rawResponse, - callerDonID: key.callerDonId, + callerDonID: key.callerDonID, triggerEventID: triggerEventID, - workflowIDs: []string{key.workflowId}, + workflowIDs: []string{key.workflowID}, } p.batchingQueue[sha] = elem } else { - elem.workflowIDs = append(elem.workflowIDs, key.workflowId) + elem.workflowIDs = append(elem.workflowIDs, key.workflowID) } p.bqMu.Unlock() } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 2638d9ca5f3..7edcbf5eba7 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -9,6 +9,8 @@ import ( commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/messagecache" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -29,7 +31,7 @@ type triggerSubscriber struct { localDonInfo commoncap.DON dispatcher types.Dispatcher aggregator types.Aggregator - messageCache *messageCache[triggerEventKey, p2ptypes.PeerID] + messageCache *messagecache.MessageCache[triggerEventKey, p2ptypes.PeerID] registeredWorkflows map[string]*subRegState mu sync.RWMutex // protects registeredWorkflows and messageCache stopCh services.StopChan @@ -38,8 +40,8 @@ type triggerSubscriber struct { } type triggerEventKey struct { - triggerEventId string - workflowId string + triggerEventID string + workflowID string } type subRegState struct { @@ -65,7 +67,7 @@ const ( func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, localDonInfo commoncap.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { if aggregator == nil { lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) - aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) + aggregator = aggregation.NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) } if config == nil { lggr.Info("no config provided, using default values") @@ -84,7 +86,7 @@ func NewTriggerSubscriber(config *commoncap.RemoteTriggerConfig, capInfo commonc localDonInfo: localDonInfo, dispatcher: dispatcher, aggregator: aggregator, - messageCache: NewMessageCache[triggerEventKey, p2ptypes.PeerID](), + messageCache: messagecache.NewMessageCache[triggerEventKey, p2ptypes.PeerID](), registeredWorkflows: make(map[string]*subRegState), stopCh: make(services.StopChan), lggr: lggr.Named("TriggerSubscriber"), @@ -200,17 +202,17 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { s.lggr.Errorw("received message with too many workflow IDs - truncating", "capabilityId", s.capInfo.ID, "nWorkflows", len(meta.WorkflowIds), "sender", sender) meta.WorkflowIds = meta.WorkflowIds[:maxBatchedWorkflowIDs] } - for _, workflowId := range meta.WorkflowIds { + for _, workflowID := range meta.WorkflowIds { s.mu.RLock() - registration, found := s.registeredWorkflows[workflowId] + registration, found := s.registeredWorkflows[workflowID] s.mu.RUnlock() if !found { - s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowId), "sender", sender) + s.lggr.Errorw("received message for unregistered workflow", "capabilityId", s.capInfo.ID, "workflowID", SanitizeLogString(workflowID), "sender", sender) continue } key := triggerEventKey{ - triggerEventId: meta.TriggerEventId, - workflowId: workflowId, + triggerEventID: meta.TriggerEventId, + workflowID: workflowID, } nowMs := time.Now().UnixMilli() s.mu.Lock() @@ -218,17 +220,17 @@ func (s *triggerSubscriber) Receive(_ context.Context, msg *types.MessageBody) { ready, payloads := s.messageCache.Ready(key, s.config.MinResponsesToAggregate, nowMs-s.config.MessageExpiry.Milliseconds(), true) s.mu.Unlock() if nowMs-creationTs > s.config.RegistrationExpiry.Milliseconds() { - s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "sender", sender) + s.lggr.Warnw("received trigger event for an expired ID", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "sender", sender) continue } if ready { - s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId) + s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID) aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads) if err != nil { - s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err) + s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID, "err", err) continue } - s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId) + s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowID) registration.callback <- aggregatedResponse } } diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index b8cc3ddc7bd..d5b48bc1dc8 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -26,7 +26,6 @@ const ( var ( triggerEvent1 = map[string]any{"event": "triggerEvent1"} - triggerEvent2 = map[string]any{"event": "triggerEvent2"} ) func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { diff --git a/core/capabilities/remote/types/mocks/dispatcher.go b/core/capabilities/remote/types/mocks/dispatcher.go index 0948698b935..d7f2ab45bac 100644 --- a/core/capabilities/remote/types/mocks/dispatcher.go +++ b/core/capabilities/remote/types/mocks/dispatcher.go @@ -206,9 +206,9 @@ func (_c *Dispatcher_Ready_Call) RunAndReturn(run func() error) *Dispatcher_Read return _c } -// RemoveReceiver provides a mock function with given fields: capabilityId, donId -func (_m *Dispatcher) RemoveReceiver(capabilityId string, donId uint32) { - _m.Called(capabilityId, donId) +// RemoveReceiver provides a mock function with given fields: capabilityID, donID +func (_m *Dispatcher) RemoveReceiver(capabilityID string, donID uint32) { + _m.Called(capabilityID, donID) } // Dispatcher_RemoveReceiver_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveReceiver' @@ -217,13 +217,13 @@ type Dispatcher_RemoveReceiver_Call struct { } // RemoveReceiver is a helper method to define mock.On call -// - capabilityId string -// - donId uint32 -func (_e *Dispatcher_Expecter) RemoveReceiver(capabilityId interface{}, donId interface{}) *Dispatcher_RemoveReceiver_Call { - return &Dispatcher_RemoveReceiver_Call{Call: _e.mock.On("RemoveReceiver", capabilityId, donId)} +// - capabilityID string +// - donID uint32 +func (_e *Dispatcher_Expecter) RemoveReceiver(capabilityID interface{}, donID interface{}) *Dispatcher_RemoveReceiver_Call { + return &Dispatcher_RemoveReceiver_Call{Call: _e.mock.On("RemoveReceiver", capabilityID, donID)} } -func (_c *Dispatcher_RemoveReceiver_Call) Run(run func(capabilityId string, donId uint32)) *Dispatcher_RemoveReceiver_Call { +func (_c *Dispatcher_RemoveReceiver_Call) Run(run func(capabilityID string, donID uint32)) *Dispatcher_RemoveReceiver_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string), args[1].(uint32)) }) @@ -287,9 +287,9 @@ func (_c *Dispatcher_Send_Call) RunAndReturn(run func(p2ptypes.PeerID, *types.Me return _c } -// SetReceiver provides a mock function with given fields: capabilityId, donId, receiver -func (_m *Dispatcher) SetReceiver(capabilityId string, donId uint32, receiver types.Receiver) error { - ret := _m.Called(capabilityId, donId, receiver) +// SetReceiver provides a mock function with given fields: capabilityID, donID, receiver +func (_m *Dispatcher) SetReceiver(capabilityID string, donID uint32, receiver types.Receiver) error { + ret := _m.Called(capabilityID, donID, receiver) if len(ret) == 0 { panic("no return value specified for SetReceiver") @@ -297,7 +297,7 @@ func (_m *Dispatcher) SetReceiver(capabilityId string, donId uint32, receiver ty var r0 error if rf, ok := ret.Get(0).(func(string, uint32, types.Receiver) error); ok { - r0 = rf(capabilityId, donId, receiver) + r0 = rf(capabilityID, donID, receiver) } else { r0 = ret.Error(0) } @@ -311,14 +311,14 @@ type Dispatcher_SetReceiver_Call struct { } // SetReceiver is a helper method to define mock.On call -// - capabilityId string -// - donId uint32 +// - capabilityID string +// - donID uint32 // - receiver types.Receiver -func (_e *Dispatcher_Expecter) SetReceiver(capabilityId interface{}, donId interface{}, receiver interface{}) *Dispatcher_SetReceiver_Call { - return &Dispatcher_SetReceiver_Call{Call: _e.mock.On("SetReceiver", capabilityId, donId, receiver)} +func (_e *Dispatcher_Expecter) SetReceiver(capabilityID interface{}, donID interface{}, receiver interface{}) *Dispatcher_SetReceiver_Call { + return &Dispatcher_SetReceiver_Call{Call: _e.mock.On("SetReceiver", capabilityID, donID, receiver)} } -func (_c *Dispatcher_SetReceiver_Call) Run(run func(capabilityId string, donId uint32, receiver types.Receiver)) *Dispatcher_SetReceiver_Call { +func (_c *Dispatcher_SetReceiver_Call) Run(run func(capabilityID string, donID uint32, receiver types.Receiver)) *Dispatcher_SetReceiver_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(string), args[1].(uint32), args[2].(types.Receiver)) }) diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index fefc9a9b5fe..188587bc7ac 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -23,8 +23,8 @@ const ( type Dispatcher interface { services.Service - SetReceiver(capabilityId string, donId uint32, receiver Receiver) error - RemoveReceiver(capabilityId string, donId uint32) + SetReceiver(capabilityID string, donID uint32, receiver Receiver) error + RemoveReceiver(capabilityID string, donID uint32) Send(peerID p2ptypes.PeerID, msgBody *MessageBody) error } diff --git a/core/capabilities/remote/utils.go b/core/capabilities/remote/utils.go index ea6a3efb186..7af34c5c946 100644 --- a/core/capabilities/remote/utils.go +++ b/core/capabilities/remote/utils.go @@ -3,7 +3,6 @@ package remote import ( "bytes" "crypto/ed25519" - "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -11,8 +10,6 @@ import ( "google.golang.org/protobuf/proto" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -25,25 +22,25 @@ func ValidateMessage(msg p2ptypes.Message, expectedReceiver p2ptypes.PeerID) (*r var topLevelMessage remotetypes.Message err := proto.Unmarshal(msg.Payload, &topLevelMessage) if err != nil { - return nil, fmt.Errorf("failed to unmarshal message, err: %v", err) + return nil, fmt.Errorf("failed to unmarshal message, err: %w", err) } var body remotetypes.MessageBody err = proto.Unmarshal(topLevelMessage.Body, &body) if err != nil { - return nil, fmt.Errorf("failed to unmarshal message body, err: %v", err) + return nil, fmt.Errorf("failed to unmarshal message body, err: %w", err) } if len(body.Sender) != p2ptypes.PeerIDLength || len(body.Receiver) != p2ptypes.PeerIDLength { return &body, fmt.Errorf("invalid sender length (%d) or receiver length (%d)", len(body.Sender), len(body.Receiver)) } if !ed25519.Verify(body.Sender, topLevelMessage.Body, topLevelMessage.Signature) { - return &body, fmt.Errorf("failed to verify message signature") + return &body, errors.New("failed to verify message signature") } // NOTE we currently don't support relaying messages so the p2p message sender needs to be the message author if !bytes.Equal(body.Sender, msg.Sender[:]) { - return &body, fmt.Errorf("sender in message body does not match sender of p2p message") + return &body, errors.New("sender in message body does not match sender of p2p message") } if !bytes.Equal(body.Receiver, expectedReceiver[:]) { - return &body, fmt.Errorf("receiver in message body does not match expected receiver") + return &body, errors.New("receiver in message body does not match expected receiver") } return &body, nil } @@ -58,52 +55,6 @@ func ToPeerID(peerID []byte) (p2ptypes.PeerID, error) { return id, nil } -// Default MODE Aggregator needs a configurable number of identical responses for aggregation to succeed -type defaultModeAggregator struct { - minIdenticalResponses uint32 -} - -var _ remotetypes.Aggregator = &defaultModeAggregator{} - -func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregator { - return &defaultModeAggregator{ - minIdenticalResponses: minIdenticalResponses, - } -} - -func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.TriggerResponse, error) { - found, err := AggregateModeRaw(responses, a.minIdenticalResponses) - if err != nil { - return commoncap.TriggerResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err) - } - - unmarshaled, err := pb.UnmarshalTriggerResponse(found) - if err != nil { - return commoncap.TriggerResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err) - } - return unmarshaled, nil -} - -func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) { - hashToCount := make(map[string]uint32) - var found []byte - for _, elem := range elemList { - hasher := sha256.New() - hasher.Write(elem) - sha := hex.EncodeToString(hasher.Sum(nil)) - hashToCount[sha]++ - if hashToCount[sha] >= minIdenticalResponses { - found = elem - // update in case we find another elem with an even higher count - minIdenticalResponses = hashToCount[sha] - } - } - if found == nil { - return nil, errors.New("not enough identical responses found") - } - return found, nil -} - func SanitizeLogString(s string) string { tooLongSuffix := "" if len(s) > maxLoggedStringLen { diff --git a/core/capabilities/remote/utils_test.go b/core/capabilities/remote/utils_test.go index 6707e6ffb25..360ef9000ba 100644 --- a/core/capabilities/remote/utils_test.go +++ b/core/capabilities/remote/utils_test.go @@ -10,43 +10,39 @@ import ( ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" - commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) const ( - capId1 = "cap1" - capId2 = "cap2" - donId1 = uint32(1) + capID1 = "cap1" + capID2 = "cap2" + donID1 = uint32(1) payload1 = "hello world" payload2 = "goodbye world" ) func TestValidateMessage(t *testing.T) { - privKey1, peerId1 := newKeyPair(t) - _, peerId2 := newKeyPair(t) + privKey1, peerID1 := newKeyPair(t) + _, peerID2 := newKeyPair(t) // valid - p2pMsg := encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) - body, err := remote.ValidateMessage(p2pMsg, peerId2) + p2pMsg := encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1)) + body, err := remote.ValidateMessage(p2pMsg, peerID2) require.NoError(t, err) - require.Equal(t, peerId1[:], body.Sender) + require.Equal(t, peerID1[:], body.Sender) require.Equal(t, payload1, string(body.Payload)) // invalid sender - p2pMsg = encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) - p2pMsg.Sender = peerId2 - _, err = remote.ValidateMessage(p2pMsg, peerId2) + p2pMsg = encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1)) + p2pMsg.Sender = peerID2 + _, err = remote.ValidateMessage(p2pMsg, peerID2) require.Error(t, err) // invalid receiver - p2pMsg = encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1)) - _, err = remote.ValidateMessage(p2pMsg, peerId1) + p2pMsg = encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1)) + _, err = remote.ValidateMessage(p2pMsg, peerID1) require.Error(t, err) } @@ -58,12 +54,12 @@ func newKeyPair(t *testing.T) (ed25519.PrivateKey, ragetypes.PeerID) { return privKey, peerID } -func encodeAndSign(t *testing.T, senderPrivKey ed25519.PrivateKey, senderId p2ptypes.PeerID, receiverId p2ptypes.PeerID, capabilityId string, donId uint32, payload []byte) p2ptypes.Message { +func encodeAndSign(t *testing.T, senderPrivKey ed25519.PrivateKey, senderID p2ptypes.PeerID, receiverID p2ptypes.PeerID, capabilityID string, donID uint32, payload []byte) p2ptypes.Message { body := remotetypes.MessageBody{ - Sender: senderId[:], - Receiver: receiverId[:], - CapabilityId: capabilityId, - CapabilityDonId: donId, + Sender: senderID[:], + Receiver: receiverID[:], + CapabilityId: capabilityID, + CapabilityDonId: donID, Payload: payload, } rawBody, err := proto.Marshal(&body) @@ -78,7 +74,7 @@ func encodeAndSign(t *testing.T, senderPrivKey ed25519.PrivateKey, senderId p2pt require.NoError(t, err) return p2ptypes.Message{ - Sender: senderId, + Sender: senderID, Payload: rawMsg, } } @@ -89,41 +85,6 @@ func TestToPeerID(t *testing.T) { require.Equal(t, "12D3KooWD8QYTQVYjB6oog4Ej8PcPpqTrPRnxLQap8yY8KUQRVvq", id.String()) } -func TestDefaultModeAggregator_Aggregate(t *testing.T) { - val, err := values.NewMap(triggerEvent1) - require.NoError(t, err) - capResponse1 := commoncap.TriggerResponse{ - Event: commoncap.TriggerEvent{ - Outputs: val, - }, - Err: nil, - } - marshaled1, err := pb.MarshalTriggerResponse(capResponse1) - require.NoError(t, err) - - val2, err := values.NewMap(triggerEvent2) - require.NoError(t, err) - capResponse2 := commoncap.TriggerResponse{ - Event: commoncap.TriggerEvent{ - Outputs: val2, - }, - Err: nil, - } - marshaled2, err := pb.MarshalTriggerResponse(capResponse2) - require.NoError(t, err) - - agg := remote.NewDefaultModeAggregator(2) - _, err = agg.Aggregate("", [][]byte{marshaled1}) - require.Error(t, err) - - _, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2}) - require.Error(t, err) - - res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1}) - require.NoError(t, err) - require.Equal(t, res, capResponse1) -} - func TestSanitizeLogString(t *testing.T) { require.Equal(t, "hello", remote.SanitizeLogString("hello")) require.Equal(t, "[UNPRINTABLE] 0a", remote.SanitizeLogString("\n")) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index fe37eb853a8..972dd4ff4a2 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -33,7 +33,7 @@ require ( github.com/prometheus/client_golang v1.20.5 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chainlink-automation v0.8.1 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 + github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 7dfd0fe5097..3338519266d 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1150,8 +1150,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 h1:6Zzr/R1j6P7bbvcUlt5WUIbItvrrGdGzIsiAzQezcwo= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 h1:lB5A3TP0zOVuu1n0kEm6d8/o/4Knh6HLvsU/GChk+sI= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc= diff --git a/deployment/go.mod b/deployment/go.mod index 0f9e66bbb75..d618c38e838 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -29,7 +29,7 @@ require ( github.com/smartcontractkit/ccip-owner-contracts v0.0.0-salt-fix github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 + github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.13 github.com/smartcontractkit/libocr v0.0.0-20241007185508-adbe57025f12 diff --git a/deployment/go.sum b/deployment/go.sum index 8e4feda0e82..76546853c38 100644 --- a/deployment/go.sum +++ b/deployment/go.sum @@ -1417,8 +1417,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 h1:6Zzr/R1j6P7bbvcUlt5WUIbItvrrGdGzIsiAzQezcwo= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 h1:lB5A3TP0zOVuu1n0kEm6d8/o/4Knh6HLvsU/GChk+sI= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc= diff --git a/go.mod b/go.mod index da089c15665..e97f949b065 100644 --- a/go.mod +++ b/go.mod @@ -79,7 +79,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 + github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 github.com/smartcontractkit/chainlink-feeds v0.1.1 diff --git a/go.sum b/go.sum index 28392668c87..5d3b9db6b21 100644 --- a/go.sum +++ b/go.sum @@ -1139,8 +1139,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 h1:6Zzr/R1j6P7bbvcUlt5WUIbItvrrGdGzIsiAzQezcwo= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 h1:lB5A3TP0zOVuu1n0kEm6d8/o/4Knh6HLvsU/GChk+sI= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 678598a26c5..1f35241e461 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -47,7 +47,7 @@ require ( github.com/smartcontractkit/chain-selectors v1.0.34 github.com/smartcontractkit/chainlink-automation v0.8.1 github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 + github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 github.com/smartcontractkit/chainlink-protos/job-distributor v0.6.0 github.com/smartcontractkit/chainlink-testing-framework/havoc v1.50.2 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.19 diff --git a/integration-tests/go.sum b/integration-tests/go.sum index d846ff5f136..c7cca9094e0 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1438,8 +1438,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 h1:6Zzr/R1j6P7bbvcUlt5WUIbItvrrGdGzIsiAzQezcwo= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 h1:lB5A3TP0zOVuu1n0kEm6d8/o/4Knh6HLvsU/GChk+sI= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index e77f701bf12..3f0cf7e952e 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -27,7 +27,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/rs/zerolog v1.33.0 github.com/slack-go/slack v0.15.0 - github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 + github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 github.com/smartcontractkit/chainlink-testing-framework/lib v1.50.19 github.com/smartcontractkit/chainlink-testing-framework/seth v1.50.9 github.com/smartcontractkit/chainlink-testing-framework/wasp v1.50.2 diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 6c2117a6e7c..64591b58222 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1429,8 +1429,8 @@ github.com/smartcontractkit/chainlink-automation v0.8.1 h1:sTc9LKpBvcKPc1JDYAmgB github.com/smartcontractkit/chainlink-automation v0.8.1/go.mod h1:Iij36PvWZ6blrdC5A/nrQUBuf3MH3JvsBB9sSyc9W08= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000 h1:6Zzr/R1j6P7bbvcUlt5WUIbItvrrGdGzIsiAzQezcwo= github.com/smartcontractkit/chainlink-ccip v0.0.0-20241218114855-f74219171000/go.mod h1:ncjd6mPZSRlelEqH/2KeLE1pU3UlqzBSn8RYkEoECzY= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805 h1:Pz8jB/6qe10xT10h2S3LFYJrnebNpG5rJ/w16HZGwPQ= -github.com/smartcontractkit/chainlink-common v0.3.1-0.20241214155818-b403079b2805/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760 h1:lB5A3TP0zOVuu1n0kEm6d8/o/4Knh6HLvsU/GChk+sI= +github.com/smartcontractkit/chainlink-common v0.4.1-0.20241217120918-bbe318cd0760/go.mod h1:yti7e1+G9hhkYhj+L5sVUULn9Bn3bBL5/AxaNqdJ5YQ= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e h1:PRoeby6ZlTuTkv2f+7tVU4+zboTfRzI+beECynF4JQ0= github.com/smartcontractkit/chainlink-cosmos v0.5.2-0.20241202195413-82468150ac1e/go.mod h1:mUh5/woemsVaHgTorA080hrYmO3syBCmPdnWc/5dOqk= github.com/smartcontractkit/chainlink-data-streams v0.1.1-0.20241216163550-fa030d178ba3 h1:aeiBdBHGY8QNftps+VqrIk6OnfeeOD5z4jrAabW4ZSc=