Skip to content

Commit

Permalink
Merge pull request #77 from SiaFoundation/syncer-shutdown
Browse files Browse the repository at this point in the history
syncer: Add Close and ensure goroutines exit
  • Loading branch information
n8maninger authored Aug 1, 2024
2 parents 8bf8e03 + bc27564 commit 6804566
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 16 deletions.
32 changes: 16 additions & 16 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ func (s *Syncer) runPeer(p *Peer) error {
}()

inflight := make(chan struct{}, s.config.MaxInflightRPCs)
var wg sync.WaitGroup
defer wg.Wait()
for {
if p.Err() != nil {
return fmt.Errorf("peer error: %w", p.Err())
Expand All @@ -286,7 +288,9 @@ func (s *Syncer) runPeer(p *Peer) error {
return fmt.Errorf("failed to accept rpc: %w", err)
}
inflight <- struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
defer stream.Close()
// NOTE: we do not set any deadlines on the stream. If a peer is
// slow, fine; we don't need to worry about resource exhaustion
Expand Down Expand Up @@ -405,12 +409,16 @@ func (s *Syncer) alreadyConnected(id gateway.UniqueID) bool {
}

func (s *Syncer) acceptLoop() error {
var wg sync.WaitGroup
defer wg.Wait()
for {
conn, err := s.l.Accept()
if err != nil {
return err
}
wg.Add(1)
go func() {
defer wg.Done()
defer conn.Close()
if err := s.allowConnect(conn.RemoteAddr().String(), true); err != nil {
s.log.Debug("rejected inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err))
Expand All @@ -429,15 +437,6 @@ func (s *Syncer) acceptLoop() error {
}
}

func (s *Syncer) isStopped() bool {
select {
case <-s.shutdownCtx.Done():
return true
default:
return false
}
}

func (s *Syncer) peerLoop() error {
log := s.log.Named("peerLoop")
numOutbound := func() (n int) {
Expand Down Expand Up @@ -517,12 +516,9 @@ func (s *Syncer) peerLoop() error {
continue
}
for _, p := range candidates {
if numOutbound() >= s.config.MaxOutboundPeers || s.isStopped() {
if numOutbound() >= s.config.MaxOutboundPeers {
break
}

// NOTE: we don't bother logging failure here, since it's common and
// not particularly interesting or actionable
ctx, cancel := context.WithTimeout(s.shutdownCtx, s.config.ConnectTimeout)
if _, err := s.Connect(ctx, p); err != nil {
log.Debug("connected to peer", zap.String("peer", p))
Expand Down Expand Up @@ -632,7 +628,7 @@ func (s *Syncer) syncLoop() error {
// Run spawns goroutines for accepting inbound connections, forming outbound
// connections, and syncing the blockchain from active peers. It blocks until an
// error occurs, upon which all connections are closed and goroutines are
// terminated. To gracefully shutdown a Syncer, close its net.Listener.
// terminated.
func (s *Syncer) Run() error {
errChan := make(chan error)
go func() { errChan <- s.acceptLoop() }()
Expand All @@ -644,7 +640,6 @@ func (s *Syncer) Run() error {
s.shutdownCtxCancel()
s.l.Close()
s.mu.Lock()
s.l = nil
for _, p := range s.peers {
p.Close()
}
Expand All @@ -668,6 +663,11 @@ func (s *Syncer) Run() error {
return err
}

// Close closes the Syncer's net.Listener.
func (s *Syncer) Close() error {
return s.l.Close()
}

// Connect forms an outbound connection to a peer.
func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
if err := s.allowConnect(addr, false); err != nil {
Expand All @@ -683,11 +683,11 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) {
cancel()
}
}()

conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
cancel()
conn.SetDeadline(time.Now().Add(s.config.ConnectTimeout))
defer conn.SetDeadline(time.Time{})
t, err := gateway.Dial(conn, s.header)
Expand Down
100 changes: 100 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package syncer_test

import (
"context"
"net"
"testing"
"time"

"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/testutil"
"go.uber.org/zap/zaptest"
)

func TestSyncer(t *testing.T) {
log := zaptest.NewLogger(t)

n, genesis := testutil.Network()
store1, tipState1, err := chain.NewDBStore(chain.NewMemDB(), n, genesis)
if err != nil {
t.Fatal(err)
}
cm1 := chain.NewManager(store1, tipState1)

store2, tipState2, err := chain.NewDBStore(chain.NewMemDB(), n, genesis)
if err != nil {
t.Fatal(err)
}
cm2 := chain.NewManager(store2, tipState2)

l1, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
defer l1.Close()

l2, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
}
defer l2.Close()

s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l1.Addr().String(),
}, syncer.WithLogger(log.Named("syncer1")))
defer s1.Close()
go s1.Run()

s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{
GenesisID: genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: l2.Addr().String(),
}, syncer.WithLogger(log.Named("syncer2")), syncer.WithSyncInterval(10*time.Millisecond))
defer s2.Close()
go s2.Run()

// mine a few blocks on cm1
testutil.MineBlocks(t, cm1, types.VoidAddress, 10)
// mine less blocks on cm2
testutil.MineBlocks(t, cm2, types.VoidAddress, 5)

if cm1.Tip().Height != 10 {
t.Fatalf("expected cm1 tip height to be 10, got %v", cm1.Tip().Height)
} else if cm2.Tip().Height != 5 {
t.Fatalf("expected cm2 tip height to be 5, got %v", cm2.Tip().Height)
}

// connect the syncers
if _, err := s1.Connect(context.Background(), l2.Addr().String()); err != nil {
t.Fatal(err)
}
// broadcast blocks from s1
b, ok := cm1.Block(cm1.Tip().ID)
if !ok {
t.Fatal("failed to get block")
}

// broadcast the tip from s1 to s2
s1.BroadcastHeader(gateway.BlockHeader{
ParentID: b.ParentID,
Nonce: b.Nonce,
Timestamp: b.Timestamp,
MerkleRoot: b.MerkleRoot(),
})

for i := 0; i < 100; i++ {
if cm1.Tip() == cm2.Tip() {
break
}
time.Sleep(100 * time.Millisecond)
}

if cm1.Tip() != cm2.Tip() {
t.Fatalf("tips are not equal: %v != %v", cm1.Tip(), cm2.Tip())
}
}
78 changes: 78 additions & 0 deletions testutil/syncer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package testutil

import (
"sync"
"time"

"go.sia.tech/coreutils/syncer"
)

// A MemPeerStore is an in-memory implementation of a PeerStore.
type MemPeerStore struct {
mu sync.Mutex
peers map[string]syncer.PeerInfo
}

// AddPeer adds a peer to the store. If the peer already exists, nil should
// be returned.
func (ps *MemPeerStore) AddPeer(addr string) error {
ps.mu.Lock()
defer ps.mu.Unlock()
if _, ok := ps.peers[addr]; ok {
return nil
}
ps.peers[addr] = syncer.PeerInfo{Address: addr}
return nil
}

// Peers returns the set of known peers.
func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
var peers []syncer.PeerInfo
for _, p := range ps.peers {
peers = append(peers, p)
}
return peers, nil
}

// PeerInfo returns the metadata for the specified peer or ErrPeerNotFound
// if the peer wasn't found in the store.
func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) {
ps.mu.Lock()
defer ps.mu.Unlock()
p, ok := ps.peers[addr]
if !ok {
return syncer.PeerInfo{}, syncer.ErrPeerNotFound
}
return p, nil
}

// UpdatePeerInfo updates the metadata for the specified peer. If the peer
// is not found, the error should be ErrPeerNotFound.
func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error {
ps.mu.Lock()
defer ps.mu.Unlock()
p := syncer.PeerInfo{}
fn(&p)
ps.peers[addr] = p
return nil
}

// Ban temporarily bans one or more IPs. The addr should either be a single
// IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16).
func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error {
return nil
}

// Banned returns false
func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil }

var _ syncer.PeerStore = (*MemPeerStore)(nil)

// NewMemPeerStore returns a new MemPeerStore.
func NewMemPeerStore() *MemPeerStore {
return &MemPeerStore{
peers: make(map[string]syncer.PeerInfo),
}
}
16 changes: 16 additions & 0 deletions testutil/network.go → testutil/testutil.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package testutil

import (
"testing"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/types"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
)

Expand All @@ -21,3 +25,15 @@ func Network() (*consensus.Network, types.Block) {
n.HardforkV2.RequireHeight = 250
return n, genesisBlock
}

// MineBlocks mines n blocks with the reward going to the given address.
func MineBlocks(t *testing.T, cm *chain.Manager, addr types.Address, n int) {
for ; n > 0; n-- {
b, ok := coreutils.MineBlock(cm, addr, time.Second)
if !ok {
t.Fatal("failed to mine block")
} else if err := cm.AddBlocks([]types.Block{b}); err != nil {
t.Fatal(err)
}
}
}

0 comments on commit 6804566

Please sign in to comment.