Skip to content

Commit

Permalink
[KS-70] Minimal RageP2P wrapper (#12296)
Browse files Browse the repository at this point in the history
A minimal counterpart of libOCR's peer_v2+endpoint_v2+bootstrapper_v2 with following differences:
1. Ability to change a set of peers dynamically.
2. Identify peers by their PeerIDs instead of Oracle IDs.
  • Loading branch information
bolekk authored and ogtownsend committed Mar 13, 2024
1 parent 5d20ecd commit 3a92354
Show file tree
Hide file tree
Showing 12 changed files with 851 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/lemon-balloons-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Added a RageP2P wrapper
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/montanaflynn/stats v0.7.1
github.com/olekukonko/tablewriter v0.0.5
github.com/pelletier/go-toml/v2 v2.1.1
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.2-0.20240118014648-1ab6a88c9429
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240306173252-5cbf83ca3a69
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/pressly/goose/v3 v3.16.0 // indirect
github.com/prometheus/client_golang v1.17.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
91 changes: 91 additions & 0 deletions core/scripts/p2ptoys/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package common

import (
"crypto/ed25519"
"encoding/hex"
"encoding/json"
"fmt"
"os"

"github.com/smartcontractkit/libocr/commontypes"
ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

type Config struct {
Nodes []string `json:"nodes"`
Bootstrappers []string `json:"bootstrappers"`

// parsed values below
NodeKeys []ed25519.PrivateKey
NodePeerIDs []ragep2ptypes.PeerID
NodePeerIDsStr []string

BootstrapperKeys []ed25519.PrivateKey
BootstrapperPeerInfos []ragep2ptypes.PeerInfo
BootstrapperLocators []commontypes.BootstrapperLocator
}

const (
// bootstrappers will listen on 127.0.0.1 ports 9000, 9001, 9002, etc.
BootstrapStartPort = 9000

// nodes will listen on 127.0.0.1 ports 8000, 8001, 8002, etc.
NodeStartPort = 8000
)

func ParseConfigFromFile(fileName string) (*Config, error) {
rawConfig, err := os.ReadFile(fileName)
if err != nil {
return nil, err
}
var config Config
err = json.Unmarshal(rawConfig, &config)
if err != nil {
return nil, err
}

for _, hexKey := range config.Nodes {
key, peerID, err := parseKey(hexKey)
if err != nil {
return nil, err
}
config.NodeKeys = append(config.NodeKeys, key)
config.NodePeerIDs = append(config.NodePeerIDs, peerID)
config.NodePeerIDsStr = append(config.NodePeerIDsStr, peerID.String())
}

for _, hexKey := range config.Bootstrappers {
key, peerID, err := parseKey(hexKey)
if err != nil {
return nil, err
}
config.BootstrapperKeys = append(config.BootstrapperKeys, key)
config.BootstrapperPeerInfos = append(config.BootstrapperPeerInfos, ragep2ptypes.PeerInfo{ID: peerID})
}

locators := []commontypes.BootstrapperLocator{}
for id, peerID := range config.BootstrapperPeerInfos {
addr := fmt.Sprintf("127.0.0.1:%d", BootstrapStartPort+id)
locators = append(locators, commontypes.BootstrapperLocator{
PeerID: peerID.ID.String(),
Addrs: []string{addr},
})
config.BootstrapperPeerInfos[id].Addrs = []ragep2ptypes.Address{ragep2ptypes.Address(addr)}
}
config.BootstrapperLocators = locators

return &config, nil
}

func parseKey(hexKey string) (ed25519.PrivateKey, ragep2ptypes.PeerID, error) {
b, err := hex.DecodeString(hexKey)
if err != nil {
return nil, ragep2ptypes.PeerID{}, err
}
key := ed25519.PrivateKey(b)
peerID, err := ragep2ptypes.PeerIDFromPrivateKey(key)
if err != nil {
return nil, ragep2ptypes.PeerID{}, err
}
return key, peerID, nil
}
37 changes: 37 additions & 0 deletions core/scripts/p2ptoys/keygen/gen_keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"crypto/ed25519"
"crypto/rand"
"encoding/hex"
"flag"

"github.com/smartcontractkit/chainlink/v2/core/logger"

ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
)

func main() {
lggr, _ := logger.NewLogger()

n := flag.Int("n", 1, "how many key pairs to generate")
flag.Parse()

for i := 0; i < *n; i++ {
pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
lggr.Error("error generating key pair ", err)
return
}
lggr.Info("key pair ", i, ":")
lggr.Info("public key ", hex.EncodeToString(pubKey))
lggr.Info("private key ", hex.EncodeToString(privKey))

peerID, err := ragep2ptypes.PeerIDFromPrivateKey(privKey)
if err != nil {
lggr.Error("error generating peer ID ", err)
return
}
lggr.Info("peer ID ", peerID.String())
}
}
159 changes: 159 additions & 0 deletions core/scripts/p2ptoys/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"context"
"crypto/ed25519"
"flag"
"fmt"
"os"
"os/signal"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/smartcontractkit/chainlink/core/scripts/p2ptoys/common"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/p2p"

"github.com/smartcontractkit/libocr/ragep2p"
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"

p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

/*
Usage:
go run run.go --bootstrap
go run run.go --index 0
go run run.go --index 1
Observe nodes 0 and 1 discovering each other via the bootstrapper and exchanging messages.
*/
func main() {
lggr, _ := logger.NewLogger()
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
var shutdownWaitGroup sync.WaitGroup

configFile := flag.String("config", "test_keys.json", "Path to JSON config file")
nodeIndex := flag.Int("index", 0, "Index of the key in the config file to use")
isBootstrap := flag.Bool("bootstrap", false, "Whether to run as a bootstrapper or not")
flag.Parse()
config, err := common.ParseConfigFromFile(*configFile)
if err != nil {
lggr.Error("error parsing config ", err)
return
}

// Select this node's private key and listen address from config.
var privateKey ed25519.PrivateKey
var peerIDs []ragetypes.PeerID
var listenAddr string
if *isBootstrap {
privateKey = config.BootstrapperKeys[*nodeIndex]
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.BootstrapStartPort+*nodeIndex)
} else {
privateKey = config.NodeKeys[*nodeIndex]
listenAddr = fmt.Sprintf("127.0.0.1:%d", common.NodeStartPort+*nodeIndex)
}
for _, key := range config.NodeKeys {
peerID, _ := ragetypes.PeerIDFromPrivateKey(key)
peerIDs = append(peerIDs, peerID)
}

reg := prometheus.NewRegistry()
peerConfig := p2p.PeerConfig{
PrivateKey: privateKey,
ListenAddresses: []string{listenAddr},
Bootstrappers: config.BootstrapperPeerInfos,

DeltaReconcile: time.Second * 5,
DeltaDial: time.Second * 5,
DiscovererDatabase: p2p.NewInMemoryDiscovererDatabase(),
MetricsRegisterer: reg,
}

peer, err := p2p.NewPeer(peerConfig, lggr)
if err != nil {
lggr.Error("error creating peer:", err)
return
}
err = peer.Start(ctx)
if err != nil {
lggr.Error("error starting peer:", err)
return
}

peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig)
for _, peerID := range peerIDs {
peers[peerID] = p2ptypes.StreamConfig{
IncomingMessageBufferSize: 1000000,
OutgoingMessageBufferSize: 1000000,
MaxMessageLenBytes: 100000,
MessageRateLimiter: ragep2p.TokenBucketParams{
Rate: 2.0,
Capacity: 10,
},
BytesRateLimiter: ragep2p.TokenBucketParams{
Rate: 100000.0,
Capacity: 100000,
},
}
}

err = peer.UpdateConnections(peers)
if err != nil {
lggr.Errorw("error updating peer addresses", "error", err)
}

if !*isBootstrap {
shutdownWaitGroup.Add(2)
go sendLoop(ctx, &shutdownWaitGroup, peer, peerIDs, *nodeIndex, lggr)
go recvLoop(ctx, &shutdownWaitGroup, peer.Receive(), lggr)
}

<-ctx.Done()
err = peer.Close()
if err != nil {
lggr.Error("error closing peer:", err)
}
shutdownWaitGroup.Wait()
}

func sendLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, peer p2ptypes.Peer, peerIDs []ragetypes.PeerID, myId int, lggr logger.Logger) {
defer shutdownWaitGroup.Done()
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
lastId := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if lastId != myId {
lggr.Infow("sending message", "receiver", peerIDs[lastId])
err := peer.Send(peerIDs[lastId], []byte("hello!"))
if err != nil {
lggr.Errorw("error sending message", "receiver", peerIDs[lastId], "error", err)
}
}
lastId++
if lastId >= len(peerIDs) {
lastId = 0
}
}
}
}

func recvLoop(ctx context.Context, shutdownWaitGroup *sync.WaitGroup, chRecv <-chan p2ptypes.Message, lggr logger.Logger) {
defer shutdownWaitGroup.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-chRecv:
lggr.Info("received message from ", msg.Sender, " : ", string(msg.Payload))
}
}
}
11 changes: 11 additions & 0 deletions core/scripts/p2ptoys/test_keys.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"bootstrappers": [
"105fe90c7b474b9be25e87cce8cd25956bb888215cd8028d6da28a933c8b4124d838facdbdb2e3a2e3e79848b04c3c15815593c2c13ad229eb26a91cd565ab55"
],
"nodes": [
"5f45bf580bb7697f1dd23062444d5692fa0f5e60306e6782b172206aed292abf29c9f9b13bf79e30677cb701fab3140ab5a87d0d0d2dc75de500551de60e5e57",
"bc3d7c6479b44e7ce0377e07d5d456ec0089f77e89d2bdd8eaf98fdc602144dd28ff68e88c565ea4034eb09748ac042ec631c78b4d5db7509c8c54d63bb5550c",
"010fbc8ab7c425f80b7dcfab1b41f6e79ddf27168edb88f423eec624119bca7c99605c48f83f799ecb9abaab8630866867aca20cb8fb9e5518d12a7980ec8e0b",
"12b059c77f7d9367b380d778d45ac8a5dfc8a1737769a868e9e2bc1962eb690e3be9fe624419eb71183707abae4dd30ae56f269b010d98a360850673b51e9488"
]
}
Loading

0 comments on commit 3a92354

Please sign in to comment.