Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preperation for register unregister execution capability #15760

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/streams"
Expand Down Expand Up @@ -280,7 +281,7 @@ func (w *launcher) addRemoteCapabilities(ctx context.Context, myDON registrysync
w.lggr,
)
} else {
aggregator = remote.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
aggregator = aggregation.NewDefaultModeAggregator(uint32(remoteDON.F) + 1)
}

// TODO: We need to implement a custom, Mercury-specific
Expand Down
58 changes: 58 additions & 0 deletions core/capabilities/remote/aggregation/default_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package aggregation

import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
)

// Default MODE Aggregator needs a configurable number of identical responses for aggregation to succeed
type defaultModeAggregator struct {
minIdenticalResponses uint32
}

var _ remotetypes.Aggregator = &defaultModeAggregator{}

func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregator {
return &defaultModeAggregator{
minIdenticalResponses: minIdenticalResponses,
}
}

func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.TriggerResponse, error) {
found, err := AggregateModeRaw(responses, a.minIdenticalResponses)
if err != nil {
return commoncap.TriggerResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err)
}

unmarshaled, err := pb.UnmarshalTriggerResponse(found)
if err != nil {
return commoncap.TriggerResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err)
}
return unmarshaled, nil
}

func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) {
hashToCount := make(map[string]uint32)
var found []byte
for _, elem := range elemList {
hasher := sha256.New()
hasher.Write(elem)
sha := hex.EncodeToString(hasher.Sum(nil))
hashToCount[sha]++
if hashToCount[sha] >= minIdenticalResponses {
found = elem
// update in case we find another elem with an even higher count
minIdenticalResponses = hashToCount[sha]
}
}
if found == nil {
return nil, errors.New("not enough identical responses found")
}
return found, nil
}
51 changes: 51 additions & 0 deletions core/capabilities/remote/aggregation/default_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package aggregation

import (
"testing"

"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/values"
)

var (
triggerEvent1 = map[string]any{"event": "triggerEvent1"}
triggerEvent2 = map[string]any{"event": "triggerEvent2"}
)

func TestDefaultModeAggregator_Aggregate(t *testing.T) {
val, err := values.NewMap(triggerEvent1)
require.NoError(t, err)
capResponse1 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val,
},
Err: nil,
}
marshaled1, err := pb.MarshalTriggerResponse(capResponse1)
require.NoError(t, err)

val2, err := values.NewMap(triggerEvent2)
require.NoError(t, err)
capResponse2 := commoncap.TriggerResponse{
Event: commoncap.TriggerEvent{
Outputs: val2,
},
Err: nil,
}
marshaled2, err := pb.MarshalTriggerResponse(capResponse2)
require.NoError(t, err)

agg := NewDefaultModeAggregator(2)
_, err = agg.Aggregate("", [][]byte{marshaled1})
require.Error(t, err)

_, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2})
require.Error(t, err)

res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1})
require.NoError(t, err)
require.Equal(t, res, capResponse1)
}
28 changes: 15 additions & 13 deletions core/capabilities/remote/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package remote
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -42,8 +43,8 @@ type dispatcher struct {
}

type key struct {
capId string
donId uint32
capID string
donID uint32
}

var _ services.Service = &dispatcher{}
Expand Down Expand Up @@ -74,7 +75,7 @@ func (d *dispatcher) Start(ctx context.Context) error {
d.peer = d.peerWrapper.GetPeer()
d.peerID = d.peer.ID()
if d.peer == nil {
return fmt.Errorf("peer is not initialized")
return errors.New("peer is not initialized")
}
d.wg.Add(1)
go func() {
Expand Down Expand Up @@ -103,13 +104,13 @@ type receiver struct {
ch chan *types.MessageBody
}

func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Receiver) error {
func (d *dispatcher) SetReceiver(capabilityID string, donID uint32, rec types.Receiver) error {
d.mu.Lock()
defer d.mu.Unlock()
k := key{capabilityId, donId}
k := key{capabilityID, donID}
_, ok := d.receivers[k]
if ok {
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityId, donId)
return fmt.Errorf("%w: receiver already exists for capability %s and don %d", ErrReceiverExists, capabilityID, donID)
}

receiverCh := make(chan *types.MessageBody, d.cfg.ReceiverBufferSize())
Expand All @@ -134,23 +135,24 @@ func (d *dispatcher) SetReceiver(capabilityId string, donId uint32, rec types.Re
ch: receiverCh,
}

d.lggr.Debugw("receiver set", "capabilityId", capabilityId, "donId", donId)
d.lggr.Debugw("receiver set", "capabilityId", capabilityID, "donId", donID)
return nil
}

func (d *dispatcher) RemoveReceiver(capabilityId string, donId uint32) {
func (d *dispatcher) RemoveReceiver(capabilityID string, donID uint32) {
d.mu.Lock()
defer d.mu.Unlock()

receiverKey := key{capabilityId, donId}
receiverKey := key{capabilityID, donID}
if receiver, ok := d.receivers[receiverKey]; ok {
receiver.cancel()
delete(d.receivers, receiverKey)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityId, "donId", donId)
d.lggr.Debugw("receiver removed", "capabilityId", capabilityID, "donId", donID)
}
}

func (d *dispatcher) Send(peerID p2ptypes.PeerID, msgBody *types.MessageBody) error {
//nolint:gosec // disable G115
msgBody.Version = uint32(d.cfg.SupportedVersion())
msgBody.Sender = d.peerID[:]
msgBody.Receiver = peerID[:]
Expand Down Expand Up @@ -194,17 +196,17 @@ func (d *dispatcher) receive() {
receiver, ok := d.receivers[k]
d.mu.RUnlock()
if !ok {
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capId), "donId", k.donId)
d.lggr.Debugw("received message for unregistered capability", "capabilityId", SanitizeLogString(k.capID), "donId", k.donID)
d.tryRespondWithError(msg.Sender, body, types.Error_CAPABILITY_NOT_FOUND)
continue
}

receiverQueueUsage := float64(len(receiver.ch)) / float64(d.cfg.ReceiverBufferSize())
capReceiveChannelUsage.WithLabelValues(k.capId, fmt.Sprint(k.donId)).Set(receiverQueueUsage)
capReceiveChannelUsage.WithLabelValues(k.capID, strconv.FormatUint(uint64(k.donID), 10)).Set(receiverQueueUsage)
select {
case receiver.ch <- body:
default:
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capId, "donId", k.donId)
d.lggr.Warnw("receiver channel full, dropping message", "capabilityId", k.capID, "donId", k.donID)
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions core/capabilities/remote/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func TestDispatcher_CleanStartClose(t *testing.T) {
func TestDispatcher_Receive(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)
privKey1, peerID1 := newKeyPair(t)
_, peerID2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
peer.On("ID", mock.Anything).Return(peerID2)
wrapper := mocks.NewPeerWrapper(t)
wrapper.On("GetPeer").Return(peer)
signer := mocks.NewSigner(t)
Expand All @@ -131,39 +131,39 @@ func TestDispatcher_Receive(t *testing.T) {
require.NoError(t, dispatcher.Start(ctx))

rcv := newReceiver()
err = dispatcher.SetReceiver(capId1, donId1, rcv)
err = dispatcher.SetReceiver(capID1, donID1, rcv)
require.NoError(t, err)

// supported capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1))
// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1))
// sender doesn't match
invalid := encodeAndSign(t, privKey1, peerId1, peerId2, capId2, donId1, []byte(payload1))
invalid.Sender = peerId2
invalid := encodeAndSign(t, privKey1, peerID1, peerID2, capID2, donID1, []byte(payload1))
invalid.Sender = peerID2
recvCh <- invalid
// supported capability again
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload2))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload2))

m := <-rcv.ch
require.Equal(t, payload1, string(m.Payload))
m = <-rcv.ch
require.Equal(t, payload2, string(m.Payload))

dispatcher.RemoveReceiver(capId1, donId1)
dispatcher.RemoveReceiver(capID1, donID1)
require.NoError(t, dispatcher.Close())
}

func TestDispatcher_RespondWithError(t *testing.T) {
lggr := logger.TestLogger(t)
ctx := testutils.Context(t)
privKey1, peerId1 := newKeyPair(t)
_, peerId2 := newKeyPair(t)
privKey1, peerID1 := newKeyPair(t)
_, peerID2 := newKeyPair(t)

peer := mocks.NewPeer(t)
recvCh := make(chan p2ptypes.Message)
peer.On("Receive", mock.Anything).Return((<-chan p2ptypes.Message)(recvCh))
peer.On("ID", mock.Anything).Return(peerId2)
peer.On("ID", mock.Anything).Return(peerID2)
sendCh := make(chan p2ptypes.PeerID)
peer.On("Send", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
peerID := args.Get(0).(p2ptypes.PeerID)
Expand All @@ -189,9 +189,9 @@ func TestDispatcher_RespondWithError(t *testing.T) {
require.NoError(t, dispatcher.Start(ctx))

// unknown capability
recvCh <- encodeAndSign(t, privKey1, peerId1, peerId2, capId1, donId1, []byte(payload1))
recvCh <- encodeAndSign(t, privKey1, peerID1, peerID2, capID1, donID1, []byte(payload1))
responseDestPeerID := <-sendCh
require.Equal(t, peerId1, responseDestPeerID)
require.Equal(t, peerID1, responseDestPeerID)

require.NoError(t, dispatcher.Close())
}
34 changes: 4 additions & 30 deletions core/capabilities/remote/executable/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ func (a *testAsyncMessageBroker) start(ctx context.Context) error {
case <-ctx.Done():
return
case msg := <-a.sendCh:
receiverId := toPeerID(msg.Receiver)
receiverID := toPeerID(msg.Receiver)

receiver, ok := a.nodes[receiverId]
receiver, ok := a.nodes[receiverID]
if !ok {
panic("server not found for peer id")
}
Expand Down Expand Up @@ -299,10 +299,10 @@ func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.Messa
return nil
}

func (t *nodeDispatcher) SetReceiver(capabilityId string, donId uint32, receiver remotetypes.Receiver) error {
func (t *nodeDispatcher) SetReceiver(capabilityID string, donID uint32, receiver remotetypes.Receiver) error {
return nil
}
func (t *nodeDispatcher) RemoveReceiver(capabilityId string, donId uint32) {}
func (t *nodeDispatcher) RemoveReceiver(capabilityID string, donID uint32) {}

type abstractTestCapability struct {
}
Expand Down Expand Up @@ -407,29 +407,3 @@ func executeCapability(ctx context.Context, t *testing.T, caller commoncap.Execu

responseTest(t, response, err)
}

func registerWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) {
err := caller.RegisterToWorkflow(ctx, commoncap.RegisterToWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}

func unregisterWorkflow(ctx context.Context, t *testing.T, caller commoncap.ExecutableCapability, transmissionSchedule *values.Map, responseTest func(t *testing.T, responseError error)) {
err := caller.UnregisterFromWorkflow(ctx, commoncap.UnregisterFromWorkflowRequest{
Metadata: commoncap.RegistrationMetadata{
WorkflowID: workflowID1,
ReferenceID: stepReferenceID1,
WorkflowOwner: workflowOwnerID,
},
Config: transmissionSchedule,
})

responseTest(t, err)
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *ClientRequest) OnMessage(_ context.Context, msg *types.MessageBody) err
}

if msg.Sender == nil {
return fmt.Errorf("sender missing from message")
return errors.New("sender missing from message")
}

c.lggr.Debugw("OnMessage called for client request", "messageID", msg.MessageId)
Expand Down
Loading
Loading