diff --git a/.changeset/lemon-balloons-pretend.md b/.changeset/lemon-balloons-pretend.md new file mode 100644 index 00000000000..0cb7b41d3a2 --- /dev/null +++ b/.changeset/lemon-balloons-pretend.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Added a RageP2P wrapper diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 15a721dc380..38444af9e82 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -18,6 +18,7 @@ require ( github.com/montanaflynn/stats v0.7.1 github.com/olekukonko/tablewriter v0.0.5 github.com/pelletier/go-toml/v2 v2.1.1 + github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429 github.com/smartcontractkit/chainlink-common v0.1.7-0.20240306173252-5cbf83ca3a69 @@ -231,7 +232,6 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/pressly/goose/v3 v3.16.0 // indirect - github.com/prometheus/client_golang v1.17.0 // indirect github.com/prometheus/client_model v0.5.0 // indirect github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect diff --git a/core/scripts/p2ptoys/common/config.go b/core/scripts/p2ptoys/common/config.go new file mode 100644 index 00000000000..cc5934383eb --- /dev/null +++ b/core/scripts/p2ptoys/common/config.go @@ -0,0 +1,91 @@ +package common + +import ( + "crypto/ed25519" + "encoding/hex" + "encoding/json" + "fmt" + "os" + + "github.com/smartcontractkit/libocr/commontypes" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" +) + +type Config struct { + Nodes []string `json:"nodes"` + Bootstrappers []string `json:"bootstrappers"` + + // parsed values below + NodeKeys []ed25519.PrivateKey + NodePeerIDs []ragep2ptypes.PeerID + NodePeerIDsStr []string + + BootstrapperKeys []ed25519.PrivateKey + BootstrapperPeerInfos []ragep2ptypes.PeerInfo + BootstrapperLocators []commontypes.BootstrapperLocator +} + +const ( + // bootstrappers will listen on 127.0.0.1 ports 9000, 9001, 9002, etc. + BootstrapStartPort = 9000 + + // nodes will listen on 127.0.0.1 ports 8000, 8001, 8002, etc. + NodeStartPort = 8000 +) + +func ParseConfigFromFile(fileName string) (*Config, error) { + rawConfig, err := os.ReadFile(fileName) + if err != nil { + return nil, err + } + var config Config + err = json.Unmarshal(rawConfig, &config) + if err != nil { + return nil, err + } + + for _, hexKey := range config.Nodes { + key, peerID, err := parseKey(hexKey) + if err != nil { + return nil, err + } + config.NodeKeys = append(config.NodeKeys, key) + config.NodePeerIDs = append(config.NodePeerIDs, peerID) + config.NodePeerIDsStr = append(config.NodePeerIDsStr, peerID.String()) + } + + for _, hexKey := range config.Bootstrappers { + key, peerID, err := parseKey(hexKey) + if err != nil { + return nil, err + } + config.BootstrapperKeys = append(config.BootstrapperKeys, key) + config.BootstrapperPeerInfos = append(config.BootstrapperPeerInfos, ragep2ptypes.PeerInfo{ID: peerID}) + } + + locators := []commontypes.BootstrapperLocator{} + for id, peerID := range config.BootstrapperPeerInfos { + addr := fmt.Sprintf("127.0.0.1:%d", BootstrapStartPort+id) + locators = append(locators, commontypes.BootstrapperLocator{ + PeerID: peerID.ID.String(), + Addrs: []string{addr}, + }) + config.BootstrapperPeerInfos[id].Addrs = []ragep2ptypes.Address{ragep2ptypes.Address(addr)} + } + config.BootstrapperLocators = locators + + return &config, nil +} + +func parseKey(hexKey string) (ed25519.PrivateKey, ragep2ptypes.PeerID, error) { + b, err := hex.DecodeString(hexKey) + if err != nil { + return nil, ragep2ptypes.PeerID{}, err + } + key := ed25519.PrivateKey(b) + peerID, err := ragep2ptypes.PeerIDFromPrivateKey(key) + if err != nil { + return nil, ragep2ptypes.PeerID{}, err + } + return key, peerID, nil +} diff --git a/core/scripts/p2ptoys/keygen/gen_keys.go b/core/scripts/p2ptoys/keygen/gen_keys.go new file mode 100644 index 00000000000..bcee4a1bad0 --- /dev/null +++ b/core/scripts/p2ptoys/keygen/gen_keys.go @@ -0,0 +1,37 @@ +package main + +import ( + "crypto/ed25519" + "crypto/rand" + "encoding/hex" + "flag" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" +) + +func main() { + lggr, _ := logger.NewLogger() + + n := flag.Int("n", 1, "how many key pairs to generate") + flag.Parse() + + for i := 0; i < *n; i++ { + pubKey, privKey, err := ed25519.GenerateKey(rand.Reader) + if err != nil { + lggr.Error("error generating key pair ", err) + return + } + lggr.Info("key pair ", i, ":") + lggr.Info("public key ", hex.EncodeToString(pubKey)) + lggr.Info("private key ", hex.EncodeToString(privKey)) + + peerID, err := ragep2ptypes.PeerIDFromPrivateKey(privKey) + if err != nil { + lggr.Error("error generating peer ID ", err) + return + } + lggr.Info("peer ID ", peerID.String()) + } +} diff --git a/core/scripts/p2ptoys/run.go b/core/scripts/p2ptoys/run.go new file mode 100644 index 00000000000..3d0a54f60bf --- /dev/null +++ b/core/scripts/p2ptoys/run.go @@ -0,0 +1,159 @@ +package main + +import ( + "context" + "crypto/ed25519" + "flag" + "fmt" + "os" + "os/signal" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/smartcontractkit/chainlink/core/scripts/p2ptoys/common" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p" + + "github.com/smartcontractkit/libocr/ragep2p" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +/* +Usage: + + go run run.go --bootstrap + go run run.go --index 0 + go run run.go --index 1 + +Observe nodes 0 and 1 discovering each other via the bootstrapper and exchanging messages. +*/ +func main() { + lggr, _ := logger.NewLogger() + ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt) + var shutdownWaitGroup sync.WaitGroup + + configFile := flag.String("config", "test_keys.json", "Path to JSON config file") + nodeIndex := flag.Int("index", 0, "Index of the key in the config file to use") + isBootstrap := flag.Bool("bootstrap", false, "Whether to run as a bootstrapper or not") + flag.Parse() + config, err := common.ParseConfigFromFile(*configFile) + if err != nil { + lggr.Error("error parsing config ", err) + return + } + + // Select this node's private key and listen address from config. + var privateKey ed25519.PrivateKey + var peerIDs []ragetypes.PeerID + var listenAddr string + if *isBootstrap { + privateKey = config.BootstrapperKeys[*nodeIndex] + listenAddr = fmt.Sprintf("127.0.0.1:%d", common.BootstrapStartPort+*nodeIndex) + } else { + privateKey = config.NodeKeys[*nodeIndex] + listenAddr = fmt.Sprintf("127.0.0.1:%d", common.NodeStartPort+*nodeIndex) + } + for _, key := range config.NodeKeys { + peerID, _ := ragetypes.PeerIDFromPrivateKey(key) + peerIDs = append(peerIDs, peerID) + } + + reg := prometheus.NewRegistry() + peerConfig := p2p.PeerConfig{ + PrivateKey: privateKey, + ListenAddresses: []string{listenAddr}, + Bootstrappers: config.BootstrapperPeerInfos, + + DeltaReconcile: time.Second * 5, + DeltaDial: time.Second * 5, + DiscovererDatabase: p2p.NewInMemoryDiscovererDatabase(), + MetricsRegisterer: reg, + } + + peer, err := p2p.NewPeer(peerConfig, lggr) + if err != nil { + lggr.Error("error creating peer:", err) + return + } + err = peer.Start(ctx) + if err != nil { + lggr.Error("error starting peer:", err) + return + } + + peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + for _, peerID := range peerIDs { + peers[peerID] = p2ptypes.StreamConfig{ + IncomingMessageBufferSize: 1000000, + OutgoingMessageBufferSize: 1000000, + MaxMessageLenBytes: 100000, + MessageRateLimiter: ragep2p.TokenBucketParams{ + Rate: 2.0, + Capacity: 10, + }, + BytesRateLimiter: ragep2p.TokenBucketParams{ + Rate: 100000.0, + Capacity: 100000, + }, + } + } + + err = peer.UpdateConnections(peers) + if err != nil { + lggr.Errorw("error updating peer addresses", "error", err) + } + + if !*isBootstrap { + shutdownWaitGroup.Add(2) + go sendLoop(ctx, &shutdownWaitGroup, peer, peerIDs, *nodeIndex, lggr) + go recvLoop(ctx, &shutdownWaitGroup, peer.Receive(), lggr) + } + + <-ctx.Done() + err = peer.Close() + if err != nil { + lggr.Error("error closing peer:", err) + } + shutdownWaitGroup.Wait() +} + +func sendLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, peer p2ptypes.Peer, peerIDs []ragetypes.PeerID, myId int, lggr logger.Logger) { + defer shutdownWaitGroup.Done() + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + lastId := 0 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if lastId != myId { + lggr.Infow("sending message", "receiver", peerIDs[lastId]) + err := peer.Send(peerIDs[lastId], []byte("hello!")) + if err != nil { + lggr.Errorw("error sending message", "receiver", peerIDs[lastId], "error", err) + } + } + lastId++ + if lastId >= len(peerIDs) { + lastId = 0 + } + } + } +} + +func recvLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, chRecv <-chan p2ptypes.Message, lggr logger.Logger) { + defer shutdownWaitGroup.Done() + for { + select { + case <-ctx.Done(): + return + case msg := <-chRecv: + lggr.Info("received message from ", msg.Sender, " : ", string(msg.Payload)) + } + } +} diff --git a/core/scripts/p2ptoys/test_keys.json b/core/scripts/p2ptoys/test_keys.json new file mode 100644 index 00000000000..1cb34734e7a --- /dev/null +++ b/core/scripts/p2ptoys/test_keys.json @@ -0,0 +1,11 @@ +{ + "bootstrappers": [ + "105fe90c7b474b9be25e87cce8cd25956bb888215cd8028d6da28a933c8b4124d838facdbdb2e3a2e3e79848b04c3c15815593c2c13ad229eb26a91cd565ab55" + ], + "nodes": [ + "5f45bf580bb7697f1dd23062444d5692fa0f5e60306e6782b172206aed292abf29c9f9b13bf79e30677cb701fab3140ab5a87d0d0d2dc75de500551de60e5e57", + "bc3d7c6479b44e7ce0377e07d5d456ec0089f77e89d2bdd8eaf98fdc602144dd28ff68e88c565ea4034eb09748ac042ec631c78b4d5db7509c8c54d63bb5550c", + "010fbc8ab7c425f80b7dcfab1b41f6e79ddf27168edb88f423eec624119bca7c99605c48f83f799ecb9abaab8630866867aca20cb8fb9e5518d12a7980ec8e0b", + "12b059c77f7d9367b380d778d45ac8a5dfc8a1737769a868e9e2bc1962eb690e3be9fe624419eb71183707abae4dd30ae56f269b010d98a360850673b51e9488" + ] +} \ No newline at end of file diff --git a/core/services/p2p/peer.go b/core/services/p2p/peer.go new file mode 100644 index 00000000000..2ed84f6a3f1 --- /dev/null +++ b/core/services/p2p/peer.go @@ -0,0 +1,228 @@ +package p2p + +import ( + "context" + "crypto/ed25519" + "fmt" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/smartcontractkit/libocr/networking/ragedisco" + nettypes "github.com/smartcontractkit/libocr/networking/types" + "github.com/smartcontractkit/libocr/ragep2p" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + commonlogger "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +var ( + defaultGroupID = [32]byte{0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01} + defaultStreamName = "stream" + defaultRecvChSize = 10000 +) + +type PeerConfig struct { + PrivateKey ed25519.PrivateKey + // List of : addresses. + ListenAddresses []string + // List of : addresses. If empty, defaults to ListenAddresses. + AnnounceAddresses []string + Bootstrappers []ragetypes.PeerInfo + // Every DeltaReconcile a Reconcile message is sent to every peer. + DeltaReconcile time.Duration + // Dial attempts will be at least DeltaDial apart. + DeltaDial time.Duration + DiscovererDatabase nettypes.DiscovererDatabase + MetricsRegisterer prometheus.Registerer +} + +type peer struct { + streams map[ragetypes.PeerID]*ragep2p.Stream + cfg PeerConfig + isBootstrap bool + host *ragep2p.Host + discoverer *ragedisco.Ragep2pDiscoverer + myID ragetypes.PeerID + recvCh chan p2ptypes.Message + + stopCh services.StopChan + wg sync.WaitGroup + lggr logger.Logger +} + +var _ p2ptypes.Peer = &peer{} + +func NewPeer(cfg PeerConfig, lggr logger.Logger) (*peer, error) { + peerID, err := ragetypes.PeerIDFromPrivateKey(cfg.PrivateKey) + if err != nil { + return nil, fmt.Errorf("error extracting v2 peer ID from private key: %w", err) + } + isBootstrap := false + for _, info := range cfg.Bootstrappers { + if info.ID == peerID { + isBootstrap = true + break + } + } + + announceAddresses := cfg.AnnounceAddresses + if len(cfg.AnnounceAddresses) == 0 { + announceAddresses = cfg.ListenAddresses + } + + discoverer := ragedisco.NewRagep2pDiscoverer(cfg.DeltaReconcile, announceAddresses, cfg.DiscovererDatabase, cfg.MetricsRegisterer) + commonLggr := commonlogger.NewOCRWrapper(lggr, true, func(string) {}) + + host, err := ragep2p.NewHost( + ragep2p.HostConfig{DurationBetweenDials: cfg.DeltaDial}, + cfg.PrivateKey, + cfg.ListenAddresses, + discoverer, + commonLggr, + cfg.MetricsRegisterer, + ) + if err != nil { + return nil, fmt.Errorf("failed to construct ragep2p host: %w", err) + } + + return &peer{ + streams: make(map[ragetypes.PeerID]*ragep2p.Stream), + cfg: cfg, + isBootstrap: isBootstrap, + host: host, + discoverer: discoverer, + myID: peerID, + recvCh: make(chan p2ptypes.Message, defaultRecvChSize), + stopCh: make(services.StopChan), + lggr: lggr.Named("P2PPeer"), + }, nil +} + +func (p *peer) UpdateConnections(peers map[ragetypes.PeerID]p2ptypes.StreamConfig) error { + p.lggr.Infow("updating peer addresses", "peers", peers) + if !p.isBootstrap { + if err := p.recreateStreams(peers); err != nil { + return err + } + } + + if err := p.discoverer.RemoveGroup(defaultGroupID); err != nil { + p.lggr.Warnw("failed to remove old group", "groupID", defaultGroupID) + } + peerIDs := []ragetypes.PeerID{} + for pid := range peers { + peerIDs = append(peerIDs, pid) + } + if err := p.discoverer.AddGroup(defaultGroupID, peerIDs, p.cfg.Bootstrappers); err != nil { + p.lggr.Warnw("failed to add group", "groupID", defaultGroupID) + return err + } + return nil +} + +func (p *peer) recreateStreams(peers map[ragetypes.PeerID]p2ptypes.StreamConfig) error { + for pid, cfg := range peers { + pid := pid + if pid == p.myID { // don't create a self-stream + continue + } + _, ok := p.streams[pid] + if ok { // already have a stream with this peer + continue + } + + stream, err := p.host.NewStream( + pid, + defaultStreamName, + cfg.OutgoingMessageBufferSize, + cfg.IncomingMessageBufferSize, + cfg.MaxMessageLenBytes, + cfg.MessageRateLimiter, + cfg.BytesRateLimiter, + ) + if err != nil { + return fmt.Errorf("failed to create stream to peer id: %q: %w", pid, err) + } + p.lggr.Infow("adding peer", "peerID", pid) + p.streams[pid] = stream + p.wg.Add(1) + go p.recvLoopSingle(pid, stream.ReceiveMessages()) + } + // remove obsolete streams + for pid, stream := range p.streams { + _, ok := peers[pid] + if !ok { + p.lggr.Infow("removing peer", "peerID", pid) + delete(p.streams, pid) + err := stream.Close() + if err != nil { + p.lggr.Errorw("failed to close stream", "peerID", pid, "error", err) + } + } + } + return nil +} + +func (p *peer) Start(ctx context.Context) error { + err := p.host.Start() + if err != nil { + return fmt.Errorf("failed to start ragep2p host: %w", err) + } + p.lggr.Info("peer started") + return nil +} + +func (p *peer) recvLoopSingle(pid ragetypes.PeerID, ch <-chan []byte) { + p.lggr.Infow("starting recvLoopSingle", "peerID", pid) + defer p.wg.Done() + for { + select { + case <-p.stopCh: + p.lggr.Infow("stopped - exiting recvLoopSingle", "peerID", pid) + return + case msg, ok := <-ch: + if !ok { + p.lggr.Infow("channel closed - exiting recvLoopSingle", "peerID", pid) + return + } + p.recvCh <- p2ptypes.Message{Sender: pid, Payload: msg} + } + } +} + +func (p *peer) Send(peerID ragetypes.PeerID, msg []byte) error { + stream, ok := p.streams[peerID] + if !ok { + return fmt.Errorf("no stream to peer id: %q", peerID) + } + stream.SendMessage(msg) + return nil +} + +func (p *peer) Receive() <-chan p2ptypes.Message { + return p.recvCh +} + +func (p *peer) Close() error { + err := p.host.Close() + close(p.stopCh) + p.wg.Wait() + p.lggr.Info("peer closed") + return err +} + +func (p *peer) Ready() error { + return nil +} + +func (p *peer) HealthReport() map[string]error { + return nil +} + +func (p *peer) Name() string { + return "P2PPeer" +} diff --git a/core/services/p2p/peer_test.go b/core/services/p2p/peer_test.go new file mode 100644 index 00000000000..004f299d781 --- /dev/null +++ b/core/services/p2p/peer_test.go @@ -0,0 +1,50 @@ +package p2p_test + +import ( + "crypto/ed25519" + "crypto/rand" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/sdk/freeport" + "github.com/prometheus/client_golang/prometheus" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p" +) + +func TestPeer_CleanStartClose(t *testing.T) { + lggr := logger.TestLogger(t) + port := freeport.GetOne(t) + privKey, _ := newKeyPair(t) + + reg := prometheus.NewRegistry() + peerConfig := p2p.PeerConfig{ + PrivateKey: privKey, + ListenAddresses: []string{fmt.Sprintf("127.0.0.1:%d", port)}, + + DeltaReconcile: time.Second * 5, + DeltaDial: time.Second * 5, + DiscovererDatabase: p2p.NewInMemoryDiscovererDatabase(), + MetricsRegisterer: reg, + } + + peer, err := p2p.NewPeer(peerConfig, lggr) + require.NoError(t, err) + err = peer.Start(testutils.Context(t)) + require.NoError(t, err) + err = peer.Close() + require.NoError(t, err) +} + +func newKeyPair(t *testing.T) (ed25519.PrivateKey, ragetypes.PeerID) { + _, privKey, err := ed25519.GenerateKey(rand.Reader) + require.NoError(t, err) + peerID, err := ragetypes.PeerIDFromPrivateKey(privKey) + require.NoError(t, err) + return privKey, peerID +} diff --git a/core/services/p2p/types/mocks/peer.go b/core/services/p2p/types/mocks/peer.go new file mode 100644 index 00000000000..ac4e4eee73d --- /dev/null +++ b/core/services/p2p/types/mocks/peer.go @@ -0,0 +1,179 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" + mock "github.com/stretchr/testify/mock" + + types "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +// Peer is an autogenerated mock type for the Peer type +type Peer struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *Peer) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// HealthReport provides a mock function with given fields: +func (_m *Peer) HealthReport() map[string]error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for HealthReport") + } + + var r0 map[string]error + if rf, ok := ret.Get(0).(func() map[string]error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]error) + } + } + + return r0 +} + +// Name provides a mock function with given fields: +func (_m *Peer) Name() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Name") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Ready provides a mock function with given fields: +func (_m *Peer) Ready() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Ready") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Receive provides a mock function with given fields: +func (_m *Peer) Receive() <-chan types.Message { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Receive") + } + + var r0 <-chan types.Message + if rf, ok := ret.Get(0).(func() <-chan types.Message); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan types.Message) + } + } + + return r0 +} + +// Send provides a mock function with given fields: peerID, msg +func (_m *Peer) Send(peerID ragep2ptypes.PeerID, msg []byte) error { + ret := _m.Called(peerID, msg) + + if len(ret) == 0 { + panic("no return value specified for Send") + } + + var r0 error + if rf, ok := ret.Get(0).(func(ragep2ptypes.PeerID, []byte) error); ok { + r0 = rf(peerID, msg) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: _a0 +func (_m *Peer) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// UpdateConnections provides a mock function with given fields: peers +func (_m *Peer) UpdateConnections(peers map[ragep2ptypes.PeerID]types.StreamConfig) error { + ret := _m.Called(peers) + + if len(ret) == 0 { + panic("no return value specified for UpdateConnections") + } + + var r0 error + if rf, ok := ret.Get(0).(func(map[ragep2ptypes.PeerID]types.StreamConfig) error); ok { + r0 = rf(peers) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPeer creates a new instance of Peer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPeer(t interface { + mock.TestingT + Cleanup(func()) +}) *Peer { + mock := &Peer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/p2p/types/types.go b/core/services/p2p/types/types.go new file mode 100644 index 00000000000..5c2e5fa39bb --- /dev/null +++ b/core/services/p2p/types/types.go @@ -0,0 +1,29 @@ +package types + +import ( + "github.com/smartcontractkit/libocr/ragep2p" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +//go:generate mockery --quiet --name Peer --output ./mocks/ --case=underscore +type Peer interface { + services.Service + UpdateConnections(peers map[ragetypes.PeerID]StreamConfig) error + Send(peerID ragetypes.PeerID, msg []byte) error + Receive() <-chan Message +} + +type Message struct { + Sender ragetypes.PeerID + Payload []byte +} + +type StreamConfig struct { + IncomingMessageBufferSize int + OutgoingMessageBufferSize int + MaxMessageLenBytes int + MessageRateLimiter ragep2p.TokenBucketParams + BytesRateLimiter ragep2p.TokenBucketParams +} diff --git a/core/services/p2p/utils.go b/core/services/p2p/utils.go new file mode 100644 index 00000000000..44898f443eb --- /dev/null +++ b/core/services/p2p/utils.go @@ -0,0 +1,32 @@ +package p2p + +import ( + "context" + + ocrnetworking "github.com/smartcontractkit/libocr/networking/types" +) + +var _ ocrnetworking.DiscovererDatabase = &InMemoryDiscovererDatabase{} + +type InMemoryDiscovererDatabase struct { + store map[string][]byte +} + +func NewInMemoryDiscovererDatabase() *InMemoryDiscovererDatabase { + return &InMemoryDiscovererDatabase{make(map[string][]byte)} +} + +func (d *InMemoryDiscovererDatabase) StoreAnnouncement(ctx context.Context, peerID string, ann []byte) error { + d.store[peerID] = ann + return nil +} + +func (d *InMemoryDiscovererDatabase) ReadAnnouncements(ctx context.Context, peerIDs []string) (map[string][]byte, error) { + ret := make(map[string][]byte) + for _, peerID := range peerIDs { + if ann, ok := d.store[peerID]; ok { + ret[peerID] = ann + } + } + return ret, nil +} diff --git a/core/services/p2p/utils_test.go b/core/services/p2p/utils_test.go new file mode 100644 index 00000000000..968633cca1a --- /dev/null +++ b/core/services/p2p/utils_test.go @@ -0,0 +1,29 @@ +package p2p_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p" +) + +const ( + peerID1 = "peer1" + peerID2 = "peer2" + ann1 = "announcement1" + ann2 = "announcement2" +) + +func TestInMemoryDiscovererDatabase(t *testing.T) { + db := p2p.NewInMemoryDiscovererDatabase() + require.NoError(t, db.StoreAnnouncement(testutils.Context(t), peerID1, []byte(ann1))) + require.NoError(t, db.StoreAnnouncement(testutils.Context(t), peerID2, []byte(ann2))) + state, err := db.ReadAnnouncements(testutils.Context(t), []string{peerID1, peerID2}) + require.NoError(t, err) + require.Equal(t, map[string][]byte{peerID1: []byte(ann1), peerID2: []byte(ann2)}, state) + state, err = db.ReadAnnouncements(testutils.Context(t), []string{peerID2}) + require.NoError(t, err) + require.Equal(t, map[string][]byte{peerID2: []byte(ann2)}, state) +}