Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-70] Minimal RageP2P wrapper #12296

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lemon-balloons-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Added a RageP2P wrapper
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/montanaflynn/stats v0.7.1
github.com/olekukonko/tablewriter v0.0.5
github.com/pelletier/go-toml/v2 v2.1.1
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240306173252-5cbf83ca3a69
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/pressly/goose/v3 v3.16.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
91 changes: 91 additions & 0 deletions core/scripts/p2ptoys/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package common

import (
"crypto/ed25519"
"encoding/hex"
"encoding/json"
"fmt"
"os"

"github.com/smartcontractkit/libocr/commontypes"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

type Config struct {
Nodes []string `json:"nodes"`
Bootstrappers []string `json:"bootstrappers"`

// parsed values below
NodeKeys []ed25519.PrivateKey
NodePeerIDs []ragep2ptypes.PeerID
NodePeerIDsStr []string

BootstrapperKeys []ed25519.PrivateKey
BootstrapperPeerInfos []ragep2ptypes.PeerInfo
BootstrapperLocators []commontypes.BootstrapperLocator
}

const (
// bootstrappers will listen on 127.0.0.1 ports 9000, 9001, 9002, etc.
BootstrapStartPort = 9000

// nodes will listen on 127.0.0.1 ports 8000, 8001, 8002, etc.
NodeStartPort = 8000
)

func ParseConfigFromFile(fileName string) (*Config, error) {
rawConfig, err := os.ReadFile(fileName)
if err != nil {
return nil, err
}
var config Config
err = json.Unmarshal(rawConfig, &config)
if err != nil {
return nil, err
}

for _, hexKey := range config.Nodes {
key, peerID, err := parseKey(hexKey)
if err != nil {
return nil, err
}
config.NodeKeys = append(config.NodeKeys, key)
config.NodePeerIDs = append(config.NodePeerIDs, peerID)
config.NodePeerIDsStr = append(config.NodePeerIDsStr, peerID.String())
}

for _, hexKey := range config.Bootstrappers {
key, peerID, err := parseKey(hexKey)
if err != nil {
return nil, err
}
config.BootstrapperKeys = append(config.BootstrapperKeys, key)
config.BootstrapperPeerInfos = append(config.BootstrapperPeerInfos, ragep2ptypes.PeerInfo{ID: peerID})
}

locators := []commontypes.BootstrapperLocator{}
for id, peerID := range config.BootstrapperPeerInfos {
addr := fmt.Sprintf("127.0.0.1:%d", BootstrapStartPort+id)
locators = append(locators, commontypes.BootstrapperLocator{
PeerID: peerID.ID.String(),
Addrs: []string{addr},
})
config.BootstrapperPeerInfos[id].Addrs = []ragep2ptypes.Address{ragep2ptypes.Address(addr)}
}
config.BootstrapperLocators = locators

return &config, nil
}

func parseKey(hexKey string) (ed25519.PrivateKey, ragep2ptypes.PeerID, error) {
b, err := hex.DecodeString(hexKey)
if err != nil {
return nil, ragep2ptypes.PeerID{}, err
}
key := ed25519.PrivateKey(b)
peerID, err := ragep2ptypes.PeerIDFromPrivateKey(key)
if err != nil {
return nil, ragep2ptypes.PeerID{}, err
}
return key, peerID, nil
}
37 changes: 37 additions & 0 deletions core/scripts/p2ptoys/keygen/gen_keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"crypto/ed25519"
"crypto/rand"
"encoding/hex"
"flag"

"github.com/smartcontractkit/chainlink/v2/core/logger"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

func main() {
lggr, _ := logger.NewLogger()

n := flag.Int("n", 1, "how many key pairs to generate")
flag.Parse()

for i := 0; i < *n; i++ {
pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
lggr.Error("error generating key pair ", err)
return
}
lggr.Info("key pair ", i, ":")
lggr.Info("public key ", hex.EncodeToString(pubKey))
lggr.Info("private key ", hex.EncodeToString(privKey))

peerID, err := ragep2ptypes.PeerIDFromPrivateKey(privKey)
if err != nil {
lggr.Error("error generating peer ID ", err)
return
}
lggr.Info("peer ID ", peerID.String())
}
}
159 changes: 159 additions & 0 deletions core/scripts/p2ptoys/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"context"
"crypto/ed25519"
"flag"
"fmt"
"os"
"os/signal"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/smartcontractkit/chainlink/core/scripts/p2ptoys/common"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p"

"github.com/smartcontractkit/libocr/ragep2p"
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

/*
Usage:

go run run.go --bootstrap
go run run.go --index 0
go run run.go --index 1

Observe nodes 0 and 1 discovering each other via the bootstrapper and exchanging messages.
*/
func main() {
lggr, _ := logger.NewLogger()
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
var shutdownWaitGroup sync.WaitGroup

configFile := flag.String("config", "test_keys.json", "Path to JSON config file")
nodeIndex := flag.Int("index", 0, "Index of the key in the config file to use")
isBootstrap := flag.Bool("bootstrap", false, "Whether to run as a bootstrapper or not")
flag.Parse()
config, err := common.ParseConfigFromFile(*configFile)
if err != nil {
lggr.Error("error parsing config ", err)
return
}

// Select this node's private key and listen address from config.
var privateKey ed25519.PrivateKey
var peerIDs []ragetypes.PeerID
var listenAddr string
if *isBootstrap {
privateKey = config.BootstrapperKeys[*nodeIndex]
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.BootstrapStartPort+*nodeIndex)
} else {
privateKey = config.NodeKeys[*nodeIndex]
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.NodeStartPort+*nodeIndex)
}
for _, key := range config.NodeKeys {
peerID, _ := ragetypes.PeerIDFromPrivateKey(key)
peerIDs = append(peerIDs, peerID)
}

reg := prometheus.NewRegistry()
peerConfig := p2p.PeerConfig{
PrivateKey: privateKey,
ListenAddresses: []string{listenAddr},
Bootstrappers: config.BootstrapperPeerInfos,

DeltaReconcile: time.Second * 5,
DeltaDial: time.Second * 5,
DiscovererDatabase: p2p.NewInMemoryDiscovererDatabase(),
MetricsRegisterer: reg,
}

peer, err := p2p.NewPeer(peerConfig, lggr)
if err != nil {
lggr.Error("error creating peer:", err)
return
}
err = peer.Start(ctx)
if err != nil {
lggr.Error("error starting peer:", err)
return
}

peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig)
for _, peerID := range peerIDs {
peers[peerID] = p2ptypes.StreamConfig{
IncomingMessageBufferSize: 1000000,
OutgoingMessageBufferSize: 1000000,
MaxMessageLenBytes: 100000,
MessageRateLimiter: ragep2p.TokenBucketParams{
Rate: 2.0,
Capacity: 10,
},
BytesRateLimiter: ragep2p.TokenBucketParams{
Rate: 100000.0,
Capacity: 100000,
},
}
}

err = peer.UpdateConnections(peers)
if err != nil {
lggr.Errorw("error updating peer addresses", "error", err)
}

if !*isBootstrap {
shutdownWaitGroup.Add(2)
go sendLoop(ctx, &shutdownWaitGroup, peer, peerIDs, *nodeIndex, lggr)
go recvLoop(ctx, &shutdownWaitGroup, peer.Receive(), lggr)
}

<-ctx.Done()
err = peer.Close()
if err != nil {
lggr.Error("error closing peer:", err)
}
shutdownWaitGroup.Wait()
}

func sendLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, peer p2ptypes.Peer, peerIDs []ragetypes.PeerID, myId int, lggr logger.Logger) {
defer shutdownWaitGroup.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
lastId := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if lastId != myId {
lggr.Infow("sending message", "receiver", peerIDs[lastId])
err := peer.Send(peerIDs[lastId], []byte("hello!"))
if err != nil {
lggr.Errorw("error sending message", "receiver", peerIDs[lastId], "error", err)
}
}
lastId++
if lastId >= len(peerIDs) {
lastId = 0
}
}
}
}

func recvLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, chRecv <-chan p2ptypes.Message, lggr logger.Logger) {
defer shutdownWaitGroup.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-chRecv:
lggr.Info("received message from ", msg.Sender, " : ", string(msg.Payload))
}
}
}
11 changes: 11 additions & 0 deletions core/scripts/p2ptoys/test_keys.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"bootstrappers": [
"105fe90c7b474b9be25e87cce8cd25956bb888215cd8028d6da28a933c8b4124d838facdbdb2e3a2e3e79848b04c3c15815593c2c13ad229eb26a91cd565ab55"
],
"nodes": [
"5f45bf580bb7697f1dd23062444d5692fa0f5e60306e6782b172206aed292abf29c9f9b13bf79e30677cb701fab3140ab5a87d0d0d2dc75de500551de60e5e57",
"bc3d7c6479b44e7ce0377e07d5d456ec0089f77e89d2bdd8eaf98fdc602144dd28ff68e88c565ea4034eb09748ac042ec631c78b4d5db7509c8c54d63bb5550c",
"010fbc8ab7c425f80b7dcfab1b41f6e79ddf27168edb88f423eec624119bca7c99605c48f83f799ecb9abaab8630866867aca20cb8fb9e5518d12a7980ec8e0b",
"12b059c77f7d9367b380d778d45ac8a5dfc8a1737769a868e9e2bc1962eb690e3be9fe624419eb71183707abae4dd30ae56f269b010d98a360850673b51e9488"
]
}
Loading
Loading