Skip to content

Commit

Permalink
Preperation for register unregister execution capability (#15760)
Browse files Browse the repository at this point in the history
* common bump

* move aggregator

* message cache to own package

* lint

* mod tidy

* lint

* generate

* revert dispatcher revert

* dispatcher mock
  • Loading branch information
ettec authored Dec 18, 2024
1 parent e1b27f3 commit a01ed0f
Show file tree
Hide file tree
Showing 30 changed files with 287 additions and 284 deletions.
3 changes: 2 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
Expand Down Expand Up @@ -280,7 +281,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
w.lggr,
)
} else {
aggregator = remote.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
aggregator = aggregation.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
}

// TODO: We need to implement a custom, Mercury-specific
Expand Down
58 changes: 58 additions & 0 deletions core/capabilities/remote/aggregation/default_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package aggregation

import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
)

// Default MODE Aggregator needs a configurable number of identical responses for aggregation to succeed
type defaultModeAggregator struct {
minIdenticalResponses uint32
}

var _ remotetypes.Aggregator = &defaultModeAggregator{}

func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregator {
return &defaultModeAggregator{
minIdenticalResponses: minIdenticalResponses,
}
}

func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.TriggerResponse, error) {
found, err := AggregateModeRaw(responses, a.minIdenticalResponses)
if err != nil {
return commoncap.TriggerResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
}

unmarshaled, err := pb.UnmarshalTriggerResponse(found)
if err != nil {
return commoncap.TriggerResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
}
return unmarshaled, nil
}

func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) {
hashToCount := make(map[string]uint32)
var found []byte
for _, elem := range elemList {
hasher := sha256.New()
hasher.Write(elem)
sha := hex.EncodeToString(hasher.Sum(nil))
hashToCount[sha]++
if hashToCount[sha] >= minIdenticalResponses {
found = elem
// update in case we find another elem with an even higher count
minIdenticalResponses = hashToCount[sha]
}
}
if found == nil {
return nil, errors.New("not enough identical responses found")
}
return found, nil
}
51 changes: 51 additions & 0 deletions core/capabilities/remote/aggregation/default_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package aggregation

import (
"testing"

"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

var (
triggerEvent1 = map[string]any{"event": "triggerEvent1"}
triggerEvent2 = map[string]any{"event": "triggerEvent2"}
)

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
val, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val,
},
Err: nil,
}
marshaled1, err := pb.MarshalTriggerResponse(capResponse1)
require.NoError(t, err)

val2, err := values.NewMap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val2,
},
Err: nil,
}
marshaled2, err := pb.MarshalTriggerResponse(capResponse2)
require.NoError(t, err)

agg := NewDefaultModeAggregator(2)
_, err = agg.Aggregate("", [][]byte{marshaled1})
require.Error(t, err)

_, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2})
require.Error(t, err)

res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1})
require.NoError(t, err)
require.Equal(t, res, capResponse1)
}
28 changes: 15 additions & 13 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package remote
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -42,8 +43,8 @@ type dispatcher struct {
}

type key struct {
capId string
donId uint32
capID string
donID uint32
}

var _ services.Service = &dispatcher{}
Expand Down Expand Up @@ -74,7 +75,7 @@ func (d *dispatcher) Start(ctx context.Context) error {
d.peer = d.peerWrapper.GetPeer()
d.peerID = d.peer.ID()
if d.peer == nil {
return fmt.Errorf("peer is not initialized")
return errors.New("peer is not initialized")
}
d.wg.Add(1)
go func() {
Expand Down Expand Up @@ -103,13 +104,13 @@ type receiver struct {
ch chan *types.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityID string, donID uint32, rec types.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
k := key{capabilityID, donID}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId)
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityID, donID)
}

receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize())
Expand All @@ -134,23 +135,24 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Re
ch: receiverCh,
}

d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
d.lggr.Debugw("receiver set", "capabilityId", capabilityID, "donId", donID)
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
func (d *dispatcher) RemoveReceiver(capabilityID string, donID uint32) {
d.mu.Lock()
defer d.mu.Unlock()

receiverKey := key{capabilityId, donId}
receiverKey := key{capabilityID, donID}
if receiver, ok := d.receivers[receiverKey]; ok {
receiver.cancel()
delete(d.receivers, receiverKey)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityID, "donId", donID)
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
//nolint:gosec // disable G115
msgBody.Version = uint32(d.cfg.SupportedVersion())
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
Expand Down Expand Up @@ -194,17 +196,17 @@ func (d *dispatcher) receive() {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId)
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}

receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize())
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
capReceiveChannelUsage.WithLabelValues(k.capID, strconv.FormatUint(uint64(k.donID), 10)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId)
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capID, "donId", k.donID)
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
func TestDispatcher_Receive(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)
privKey1, peerID1 := newKeyPair(t)
_, peerID2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
peer.On("ID", mock.Anything).Return(peerID2)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
signer := mocks.NewSigner(t)
Expand All @@ -131,39 +131,39 @@ func TestDispatcher_Receive(t *testing.T) {
require.NoError(t, dispatcher.Start(ctx))

rcv := newReceiver()
err = dispatcher.SetReceiver(capId1, donId1, rcv)
err = dispatcher.SetReceiver(capID1, donID1, rcv)
require.NoError(t, err)

// supported capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1))
// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1))
// sender doesn't match
invalid := encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
invalid.Sender = peerId2
invalid := encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1))
invalid.Sender = peerID2
recvCh <- invalid
// supported capability again
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload2))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload2))

m := <-rcv.ch
require.Equal(t, payload1, string(m.Payload))
m = <-rcv.ch
require.Equal(t, payload2, string(m.Payload))

dispatcher.RemoveReceiver(capId1, donId1)
dispatcher.RemoveReceiver(capID1, donID1)
require.NoError(t, dispatcher.Close())
}

func TestDispatcher_RespondWithError(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)
privKey1, peerID1 := newKeyPair(t)
_, peerID2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
peer.On("ID", mock.Anything).Return(peerID2)
sendCh := make(chan p2ptypes.PeerID)
peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
peerID := args.Get(0).(p2ptypes.PeerID)
Expand All @@ -189,9 +189,9 @@ func TestDispatcher_RespondWithError(t *testing.T) {
require.NoError(t, dispatcher.Start(ctx))

// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1))
responseDestPeerID := <-sendCh
require.Equal(t, peerId1, responseDestPeerID)
require.Equal(t, peerID1, responseDestPeerID)

require.NoError(t, dispatcher.Close())
}
34 changes: 4 additions & 30 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (a *testAsyncMessageBroker) start(ctx context.Context) error {
case <-ctx.Done():
return
case msg := <-a.sendCh:
receiverId := toPeerID(msg.Receiver)
receiverID := toPeerID(msg.Receiver)

receiver, ok := a.nodes[receiverId]
receiver, ok := a.nodes[receiverID]
if !ok {
panic("server not found for peer id")
}
Expand Down Expand Up @@ -299,10 +299,10 @@ func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.Messa
return nil
}

func (t *nodeDispatcher) SetReceiver(capabilityId string, donId uint32, receiver remotetypes.Receiver) error {
func (t *nodeDispatcher) SetReceiver(capabilityID string, donID uint32, receiver remotetypes.Receiver) error {
return nil
}
func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId uint32) {}
func (t *nodeDispatcher) RemoveReceiver(capabilityID string, donID uint32) {}

type abstractTestCapability struct {
}
Expand Down Expand Up @@ -407,29 +407,3 @@ func executeCapability(ctx context.Context, t *testing.T, caller commoncap.Execu

responseTest(t, response, err)
}

func registerWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) {
err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func unregisterWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) {
err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
}

if msg.Sender == nil {
return fmt.Errorf("sender missing from message")
return errors.New("sender missing from message")
}

c.lggr.Debugw("OnMessage called for client request", "messageID", msg.MessageId)
Expand Down
Loading

0 comments on commit a01ed0f

Please sign in to comment.