Skip to content

Commit

Permalink
ks-291: minor peer discovery optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
krehermann committed Jun 5, 2024
1 parent 9e0dfba commit 02a7c19
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 10 deletions.
23 changes: 23 additions & 0 deletions core/services/p2p/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package p2p

import (
"encoding/binary"
)

// counter is a simple abstraction that can be used to generate unique peer group IDs.
type counter struct {
x uint64
}

// Bytes returns the counter as a 32-byte array.
func (g *counter) Bytes() [32]byte {
var b [32]byte
binary.BigEndian.PutUint64(b[24:], g.x)
return b
}

// Inc increments the counter.
func (g *counter) Inc() *counter {
g.x++
return g
}
16 changes: 16 additions & 0 deletions core/services/p2p/counter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package p2p

import (
"testing"

"github.com/stretchr/testify/assert"
)

func Test_groupID(t *testing.T) {
g := &counter{}
assert.Equal(t, [32]byte{}, g.Bytes())
g.Inc()
assert.Equal(t, [32]byte{31: 1}, g.Bytes())
g.Inc()
assert.Equal(t, [32]byte{31: 2}, g.Bytes())
}
24 changes: 14 additions & 10 deletions core/services/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
)

var (
defaultGroupID = [32]byte{0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01}
defaultStreamName = "stream"
defaultRecvChSize = 10000
)
Expand Down Expand Up @@ -49,9 +48,10 @@ type peer struct {
myID ragetypes.PeerID
recvCh chan p2ptypes.Message

stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
stopCh services.StopChan
wg sync.WaitGroup
lggr logger.Logger
groupID *counter
}

var _ p2ptypes.Peer = &peer{}
Expand Down Expand Up @@ -99,6 +99,7 @@ func NewPeer(cfg PeerConfig, lggr logger.Logger) (*peer, error) {
recvCh: make(chan p2ptypes.Message, defaultRecvChSize),
stopCh: make(services.StopChan),
lggr: lggr.Named("P2PPeer"),
groupID: &counter{},
}, nil
}

Expand All @@ -113,18 +114,21 @@ func (p *peer) UpdateConnections(peers map[ragetypes.PeerID]p2ptypes.StreamConfi
return err
}
}

if err := p.discoverer.RemoveGroup(defaultGroupID); err != nil {
p.lggr.Warnw("failed to remove old group", "groupID", defaultGroupID)
}
// updating the group is a small optimization that avoids reconnecting to existing peers
currentGroupID := p.groupID.Bytes()
newGroupID := p.groupID.Inc().Bytes()
peerIDs := []ragetypes.PeerID{}
for pid := range peers {
peerIDs = append(peerIDs, pid)
}
if err := p.discoverer.AddGroup(defaultGroupID, peerIDs, p.cfg.Bootstrappers); err != nil {
p.lggr.Warnw("failed to add group", "groupID", defaultGroupID)
if err := p.discoverer.AddGroup(newGroupID, peerIDs, p.cfg.Bootstrappers); err != nil {
p.lggr.Warnw("failed to add group", "groupID", newGroupID)
return err
}
if err := p.discoverer.RemoveGroup(currentGroupID); err != nil {
p.lggr.Warnw("failed to remove old group", "groupID", currentGroupID)
}

return nil
}

Expand Down

0 comments on commit 02a7c19

Please sign in to comment.