From 84396b0ccb74e71f99be0da2f4c09ba700ddaeb8 Mon Sep 17 00:00:00 2001 From: PJ Date: Thu, 29 Aug 2024 13:15:40 +0200 Subject: [PATCH] syncer: rename Close to Shutdown --- syncer/syncer.go | 29 +++++++++++++++++++++++------ syncer/syncer_test.go | 4 ++-- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 66dac8a..ff00799 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -285,14 +285,19 @@ func (s *Syncer) runPeer(p *Peer) error { p.setErr(err) return fmt.Errorf("failed to accept rpc: %w", err) } + + // set a generous deadline + err = stream.SetDeadline(time.Now().Add(10 * time.Minute)) + if err != nil { + p.setErr(err) + return fmt.Errorf("failed to set deadline: %w", err) + } + inflight <- struct{}{} s.wg.Add(1) go func() { defer s.wg.Done() defer stream.Close() - // NOTE: we do not set any deadlines on the stream. If a peer is - // slow, fine; we don't need to worry about resource exhaustion - // unless we have tons of peers. if err := s.handleRPC(id, stream, p); err != nil { s.log.Debug("rpc failed", zap.Stringer("peer", p), zap.Stringer("rpc", id), zap.Error(err)) } @@ -675,10 +680,22 @@ func (s *Syncer) Run(ctx context.Context) error { return err } -// Close closes the Syncer's net.Listener. -func (s *Syncer) Close() error { +// Shutdown closes the Syncer's net.Listener. +func (s *Syncer) Shutdown(ctx context.Context) error { err := s.l.Close() - s.wg.Wait() + + waitChan := make(chan struct{}) + go func() { + s.wg.Wait() + close(waitChan) + }() + + select { + case <-ctx.Done(): + return errors.Join(err, ctx.Err()) + case <-waitChan: + } + return err } diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 89d8502..74083b7 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -47,7 +47,7 @@ func TestSyncer(t *testing.T) { UniqueID: gateway.GenerateUniqueID(), NetAddress: l1.Addr().String(), }, syncer.WithLogger(log.Named("syncer1"))) - defer s1.Close() + defer s1.Shutdown(context.Background()) go s1.Run(context.Background()) s2 := syncer.New(l2, cm2, testutil.NewMemPeerStore(), gateway.Header{ @@ -55,7 +55,7 @@ func TestSyncer(t *testing.T) { UniqueID: gateway.GenerateUniqueID(), NetAddress: l2.Addr().String(), }, syncer.WithLogger(log.Named("syncer2")), syncer.WithSyncInterval(10*time.Millisecond)) - defer s2.Close() + defer s2.Shutdown(context.Background()) go s2.Run(context.Background()) // mine a few blocks on cm1