From 3d3d7c9e9b1d4a0f59881e25f150f84d5dfb8619 Mon Sep 17 00:00:00 2001 From: lukechampine Date: Wed, 31 Jul 2024 15:22:09 -0400 Subject: [PATCH 1/4] syncer: Add Close and ensure goroutines exit --- syncer/syncer.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index c7adf87..e60c01b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -276,6 +276,8 @@ func (s *Syncer) runPeer(p *Peer) error { }() inflight := make(chan struct{}, s.config.MaxInflightRPCs) + var wg sync.WaitGroup + defer wg.Wait() for { if p.Err() != nil { return fmt.Errorf("peer error: %w", p.Err()) @@ -286,7 +288,9 @@ func (s *Syncer) runPeer(p *Peer) error { return fmt.Errorf("failed to accept rpc: %w", err) } inflight <- struct{}{} + wg.Add(1) go func() { + defer wg.Done() defer stream.Close() // NOTE: we do not set any deadlines on the stream. If a peer is // slow, fine; we don't need to worry about resource exhaustion @@ -405,12 +409,16 @@ func (s *Syncer) alreadyConnected(id gateway.UniqueID) bool { } func (s *Syncer) acceptLoop() error { + var wg sync.WaitGroup + defer wg.Wait() for { conn, err := s.l.Accept() if err != nil { return err } + wg.Add(1) go func() { + defer wg.Done() defer conn.Close() if err := s.allowConnect(conn.RemoteAddr().String(), true); err != nil { s.log.Debug("rejected inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err)) @@ -429,15 +437,6 @@ func (s *Syncer) acceptLoop() error { } } -func (s *Syncer) isStopped() bool { - select { - case <-s.shutdownCtx.Done(): - return true - default: - return false - } -} - func (s *Syncer) peerLoop() error { log := s.log.Named("peerLoop") numOutbound := func() (n int) { @@ -517,12 +516,9 @@ func (s *Syncer) peerLoop() error { continue } for _, p := range candidates { - if numOutbound() >= s.config.MaxOutboundPeers || s.isStopped() { + if numOutbound() >= s.config.MaxOutboundPeers { break } - - // NOTE: we don't bother logging failure here, since it's common and - // not particularly interesting or actionable ctx, cancel := context.WithTimeout(s.shutdownCtx, s.config.ConnectTimeout) if _, err := s.Connect(ctx, p); err != nil { log.Debug("connected to peer", zap.String("peer", p)) @@ -632,7 +628,7 @@ func (s *Syncer) syncLoop() error { // Run spawns goroutines for accepting inbound connections, forming outbound // connections, and syncing the blockchain from active peers. It blocks until an // error occurs, upon which all connections are closed and goroutines are -// terminated. To gracefully shutdown a Syncer, close its net.Listener. +// terminated. func (s *Syncer) Run() error { errChan := make(chan error) go func() { errChan <- s.acceptLoop() }() @@ -644,7 +640,6 @@ func (s *Syncer) Run() error { s.shutdownCtxCancel() s.l.Close() s.mu.Lock() - s.l = nil for _, p := range s.peers { p.Close() } @@ -668,6 +663,11 @@ func (s *Syncer) Run() error { return err } +// Close closes the Syncer's net.Listener. +func (s *Syncer) Close() error { + return s.l.Close() +} + // Connect forms an outbound connection to a peer. func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) { if err := s.allowConnect(addr, false); err != nil { @@ -683,11 +683,11 @@ func (s *Syncer) Connect(ctx context.Context, addr string) (*Peer, error) { cancel() } }() - conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", addr) if err != nil { return nil, err } + cancel() conn.SetDeadline(time.Now().Add(s.config.ConnectTimeout)) defer conn.SetDeadline(time.Time{}) t, err := gateway.Dial(conn, s.header) From a0693a56b0ae8dd1927fcad0c407292596ad94a9 Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 1 Aug 2024 13:56:12 -0700 Subject: [PATCH 2/4] syncer: add unit test --- syncer/syncer_test.go | 100 +++++++++++++++++++++++++++ testutil/syncer.go | 77 +++++++++++++++++++++ testutil/{network.go => testutil.go} | 16 +++++ 3 files changed, 193 insertions(+) create mode 100644 syncer/syncer_test.go create mode 100644 testutil/syncer.go rename testutil/{network.go => testutil.go} (61%) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go new file mode 100644 index 0000000..ac5ba9d --- /dev/null +++ b/syncer/syncer_test.go @@ -0,0 +1,100 @@ +package syncer_test + +import ( + "context" + "net" + "testing" + "time" + + "go.sia.tech/core/gateway" + "go.sia.tech/core/types" + "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/syncer" + "go.sia.tech/coreutils/testutil" + "go.uber.org/zap/zaptest" +) + +func TestSyncer(t *testing.T) { + log := zaptest.NewLogger(t) + + n, genesis := testutil.Network() + store1, tipState1, err := chain.NewDBStore(chain.NewMemDB(), n, genesis) + if err != nil { + t.Fatal(err) + } + cm1 := chain.NewManager(store1, tipState1) + + store2, tipState2, err := chain.NewDBStore(chain.NewMemDB(), n, genesis) + if err != nil { + t.Fatal(err) + } + cm2 := chain.NewManager(store2, tipState2) + + l1, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + defer l1.Close() + + l2, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatal(err) + } + defer l2.Close() + + s1 := syncer.New(l1, cm1, testutil.NewMemPeerStore(), gateway.Header{ + GenesisID: genesis.ID(), + UniqueID: gateway.GenerateUniqueID(), + NetAddress: l1.Addr().String(), + }, syncer.WithLogger(log.Named("syncer1"))) + defer s1.Close() + go s1.Run() + + s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{ + GenesisID: genesis.ID(), + UniqueID: gateway.GenerateUniqueID(), + NetAddress: l2.Addr().String(), + }, syncer.WithLogger(log.Named("syncer2")), syncer.WithSyncInterval(10*time.Millisecond)) + defer s2.Close() + go s2.Run() + + // mine a few blocks on cm1 + testutil.MineBlocks(t, cm1, types.VoidAddress, 10) + // mine less blocks on cm2 + testutil.MineBlocks(t, cm2, types.VoidAddress, 5) + + if cm1.Tip().Height != 10 { + t.Fatalf("expected cm1 tip height to be 10, got %v", cm1.Tip().Height) + } else if cm2.Tip().Height != 5 { + t.Fatalf("expected cm2 tip height to be 5, got %v", cm2.Tip().Height) + } + + // connect the syncers + if _, err := s1.Connect(context.Background(), l2.Addr().String()); err != nil { + t.Fatal(err) + } + // broadcast blocks from s1 + b, ok := cm1.Block(cm1.Tip().ID) + if !ok { + t.Fatal("failed to get block") + } + + // broadcast the tip from s1 to s2 + s1.BroadcastHeader(gateway.BlockHeader{ + ParentID: b.ParentID, + Nonce: b.Nonce, + Timestamp: b.Timestamp, + MerkleRoot: b.MerkleRoot(), + }) + + for i := 0; i < 100; i++ { + if cm1.Tip() == cm2.Tip() { + break + } + time.Sleep(10 * time.Millisecond) + } + + if cm1.Tip() != cm2.Tip() { + t.Fatalf("tips are not equal: %v != %v", cm1.Tip(), cm2.Tip()) + } +} diff --git a/testutil/syncer.go b/testutil/syncer.go new file mode 100644 index 0000000..ccbe1b8 --- /dev/null +++ b/testutil/syncer.go @@ -0,0 +1,77 @@ +package testutil + +import ( + "sync" + "time" + + "go.sia.tech/coreutils/syncer" +) + +// A MemPeerStore is an in-memory implementation of a PeerStore. +type MemPeerStore struct { + mu sync.Mutex + peers map[string]syncer.PeerInfo +} + +// AddPeer adds a peer to the store. If the peer already exists, nil should +// be returned. +func (ps *MemPeerStore) AddPeer(addr string) error { + ps.mu.Lock() + defer ps.mu.Unlock() + if _, ok := ps.peers[addr]; ok { + return nil + } + ps.peers[addr] = syncer.PeerInfo{Address: addr} + return nil +} + +// Peers returns the set of known peers. +func (ps *MemPeerStore) Peers() ([]syncer.PeerInfo, error) { + ps.mu.Lock() + defer ps.mu.Unlock() + var peers []syncer.PeerInfo + for _, p := range ps.peers { + peers = append(peers, p) + } + return peers, nil +} + +// PeerInfo returns the metadata for the specified peer or ErrPeerNotFound +// if the peer wasn't found in the store. +func (ps *MemPeerStore) PeerInfo(addr string) (syncer.PeerInfo, error) { + ps.mu.Lock() + defer ps.mu.Unlock() + p, ok := ps.peers[addr] + if !ok { + return syncer.PeerInfo{}, syncer.ErrPeerNotFound + } + return p, nil +} + +// UpdatePeerInfo updates the metadata for the specified peer. If the peer +// is not found, the error should be ErrPeerNotFound. +func (ps *MemPeerStore) UpdatePeerInfo(addr string, fn func(*syncer.PeerInfo)) error { + ps.mu.Lock() + defer ps.mu.Unlock() + p := syncer.PeerInfo{} + fn(&p) + ps.peers[addr] = p + return nil +} + +// Ban temporarily bans one or more IPs. The addr should either be a single +// IP with port (e.g. 1.2.3.4:5678) or a CIDR subnet (e.g. 1.2.3.4/16). +func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) error { + return nil +} + +func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil } + +var _ syncer.PeerStore = (*MemPeerStore)(nil) + +// NewMemPeerStore returns a new MemPeerStore. +func NewMemPeerStore() *MemPeerStore { + return &MemPeerStore{ + peers: make(map[string]syncer.PeerInfo), + } +} diff --git a/testutil/network.go b/testutil/testutil.go similarity index 61% rename from testutil/network.go rename to testutil/testutil.go index e57e453..9c17a9d 100644 --- a/testutil/network.go +++ b/testutil/testutil.go @@ -1,8 +1,12 @@ package testutil import ( + "testing" + "time" + "go.sia.tech/core/consensus" "go.sia.tech/core/types" + "go.sia.tech/coreutils" "go.sia.tech/coreutils/chain" ) @@ -21,3 +25,15 @@ func Network() (*consensus.Network, types.Block) { n.HardforkV2.RequireHeight = 250 return n, genesisBlock } + +// MineBlocks mines n blocks with the reward going to the given address. +func MineBlocks(t *testing.T, cm *chain.Manager, addr types.Address, n int) { + for ; n > 0; n-- { + b, ok := coreutils.MineBlock(cm, addr, time.Second) + if !ok { + t.Fatal("failed to mine block") + } else if err := cm.AddBlocks([]types.Block{b}); err != nil { + t.Fatal(err) + } + } +} From 981187833dc7066c7cb8bef1c7086727ceb00bce Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 1 Aug 2024 13:59:12 -0700 Subject: [PATCH 3/4] syncer: fix lint --- testutil/syncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/testutil/syncer.go b/testutil/syncer.go index ccbe1b8..2679329 100644 --- a/testutil/syncer.go +++ b/testutil/syncer.go @@ -65,6 +65,7 @@ func (ps *MemPeerStore) Ban(addr string, duration time.Duration, reason string) return nil } +// Banned returns false func (ps *MemPeerStore) Banned(addr string) (bool, error) { return false, nil } var _ syncer.PeerStore = (*MemPeerStore)(nil) From bc27564586dc03281ae1cfa6128416c19a9c552c Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Thu, 1 Aug 2024 14:01:13 -0700 Subject: [PATCH 4/4] syncer: increase test timeout --- syncer/syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ac5ba9d..614eae3 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -91,7 +91,7 @@ func TestSyncer(t *testing.T) { if cm1.Tip() == cm2.Tip() { break } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } if cm1.Tip() != cm2.Tip() {