diff --git a/go.mod b/go.mod index 0cb058e..ba90286 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( go.etcd.io/bbolt v1.3.8 go.sia.tech/core v0.2.0 + go.uber.org/zap v1.26.0 golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 lukechampine.com/frand v1.4.2 ) @@ -12,5 +13,6 @@ require ( require ( github.com/aead/chacha20 v0.0.0-20180709150244-8b13a72661da // indirect go.sia.tech/mux v1.2.0 // indirect + go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.5.0 // indirect ) diff --git a/go.sum b/go.sum index 5df2f6a..e14723d 100644 --- a/go.sum +++ b/go.sum @@ -8,12 +8,16 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= -go.sia.tech/core v0.1.12 h1:nrq/BvYbTGVLtZu0MHBTExUAP5nfNbcGhaJbuK839gc= -go.sia.tech/core v0.1.12/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= go.sia.tech/core v0.2.0 h1:+J/QylNueFmg5kCJCIfwqnCtKKoC/JN5wasPLy85QZI= go.sia.tech/core v0.2.0/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q= go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU= go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122 h1:NvGWuYG8dkDHFSKksI1P9faiVJ9rayE6l0+ouWVIDs8= golang.org/x/crypto v0.0.0-20220507011949-2cf3adece122/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/syncer/syncer.go b/syncer/syncer.go index 17a425d..2704c2e 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "io" - "log" "net" "reflect" "sync" @@ -14,6 +12,7 @@ import ( "go.sia.tech/core/consensus" "go.sia.tech/core/gateway" "go.sia.tech/core/types" + "go.uber.org/zap" "lukechampine.com/frand" ) @@ -79,7 +78,7 @@ type config struct { MaxSendBlocks uint64 PeerDiscoveryInterval time.Duration SyncInterval time.Duration - Logger *log.Logger + Logger *zap.Logger } // An Option modifies a Syncer's configuration. @@ -171,7 +170,7 @@ func WithSyncInterval(d time.Duration) Option { // WithLogger sets the logger used by a Syncer. The default is a logger that // outputs to io.Discard. -func WithLogger(l *log.Logger) Option { +func WithLogger(l *zap.Logger) Option { return func(c *config) { c.Logger = l } } @@ -182,7 +181,7 @@ type Syncer struct { pm PeerStore header gateway.Header config config - log *log.Logger // redundant, but convenient + log *zap.Logger // redundant, but convenient mu sync.Mutex peers map[string]*gateway.Peer @@ -200,7 +199,7 @@ func (h *rpcHandler) resync(p *gateway.Peer, reason string) { h.s.synced[p.Addr] = false h.s.mu.Unlock() if !alreadyResyncing { - h.s.log.Printf("triggering resync with %v: %v", p, reason) + h.s.log.Debug("resync triggered", zap.String("peer", p.Addr), zap.String("reason", reason)) } } @@ -278,7 +277,7 @@ func (h *rpcHandler) RelayHeader(bh gateway.BlockHeader, origin *gateway.Peer) { // request + validate full block if b, err := origin.SendBlock(bh.ID(), h.s.config.SendBlockTimeout); err != nil { // log-worthy, but not ban-worthy - h.s.log.Printf("couldn't retrieve new block %v after header relay from %v: %v", bh.ID(), origin, err) + h.s.log.Warn("couldn't retrieve new block after header relay", zap.Stringer("header", bh.ID()), zap.Stringer("origin", origin), zap.Error(err)) return } else if err := h.s.cm.AddBlocks([]types.Block{b}); err != nil { h.s.ban(origin, err) @@ -296,7 +295,7 @@ func (h *rpcHandler) RelayTransactionSet(txns []types.Transaction, origin *gatew // too risky to ban here (txns are probably just outdated), but at least // log it if we think we're synced if b, ok := h.s.cm.Block(h.s.cm.Tip().ID); ok && time.Since(b.Timestamp) < 2*h.s.cm.TipState().BlockInterval() { - h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err) + h.s.log.Debug("invalid transaction set received", zap.Stringer("origin", origin), zap.Error(err)) } } else { h.s.relayTransactionSet(txns, origin) // non-blocking @@ -361,7 +360,7 @@ func (h *rpcHandler) RelayV2BlockOutline(bo gateway.V2BlockOutline, origin *gate txns, v2txns, err := origin.SendTransactions(index, missing, h.s.config.SendTransactionsTimeout) if err != nil { // log-worthy, but not ban-worthy - h.s.log.Printf("couldn't retrieve missing transactions of %v after relay from %v: %v", bid, origin, err) + h.s.log.Debug("couldn't retrieve missing transactions from peer", zap.Stringer("blockID", bid), zap.Stringer("origin", origin), zap.Error(err)) return } b, missing = bo.Complete(cs, txns, v2txns) @@ -393,7 +392,7 @@ func (h *rpcHandler) RelayV2TransactionSet(basis types.ChainIndex, txns []types. h.s.ban(origin, errors.New("peer sent an empty transaction set")) } else if known, err := h.s.cm.AddV2PoolTransactions(basis, txns); !known { if err != nil { - h.s.log.Printf("received an invalid transaction set from %v: %v", origin, err) + h.s.log.Debug("received invalid transaction set", zap.Stringer("origin", origin), zap.Error(err)) } else { h.s.relayV2TransactionSet(basis, txns, origin) // non-blocking } @@ -401,7 +400,7 @@ func (h *rpcHandler) RelayV2TransactionSet(basis types.ChainIndex, txns []types. } func (s *Syncer) ban(p *gateway.Peer, err error) { - s.log.Printf("banning %v: %v", p, err) + s.log.Debug("banning peer", zap.Stringer("peer", p), zap.Error(err)) p.SetErr(errors.New("banned")) s.pm.Ban(p.ConnAddr, 24*time.Hour, err.Error()) @@ -462,7 +461,7 @@ func (s *Syncer) runPeer(p *gateway.Peer) { // slow, fine; we don't need to worry about resource exhaustion // unless we have tons of peers. if err := p.HandleRPC(id, stream, h); err != nil { - s.log.Printf("incoming RPC %v from peer %v failed: %v", id, p, err) + s.log.Debug("rpc failing", zap.Stringer("peer", p), zap.Stringer("rpc", id), zap.Error(err)) } <-inflight }() @@ -570,11 +569,11 @@ func (s *Syncer) acceptLoop() error { go func() { defer conn.Close() if err := s.allowConnect(conn.RemoteAddr().String(), true); err != nil { - s.log.Printf("rejected inbound connection from %v: %v", conn.RemoteAddr(), err) + s.log.Debug("rejected inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err)) } else if p, err := gateway.Accept(conn, s.header); err != nil { - s.log.Printf("failed to accept inbound connection from %v: %v", conn.RemoteAddr(), err) + s.log.Debug("failed to accept inbound connection", zap.Stringer("remoteAddress", conn.RemoteAddr()), zap.Error(err)) } else if s.alreadyConnected(p) { - s.log.Printf("rejected inbound connection from %v: already connected", conn.RemoteAddr()) + s.log.Debug("already connected to peer", zap.Stringer("remoteAddress", conn.RemoteAddr())) } else { s.runPeer(p) } @@ -660,7 +659,7 @@ func (s *Syncer) peerLoop(closeChan <-chan struct{}) error { // NOTE: we don't bother logging failure here, since it's common and // not particularly interesting or actionable if _, err := s.Connect(p); err == nil { - s.log.Printf("formed outbound connection to %v", p) + s.log.Debug("connected to peer", zap.String("peer", p)) } lastTried[p] = time.Now() } @@ -702,7 +701,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { s.mu.Lock() s.synced[p.Addr] = true s.mu.Unlock() - s.log.Printf("starting sync with %v", p) + s.log.Debug("syncing with peer", zap.Stringer("peer", p)) oldTip := s.cm.Tip() oldTime := time.Now() lastPrint := time.Now() @@ -720,7 +719,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { }) startTime, startHeight = endTime, endHeight if time.Since(lastPrint) > 30*time.Second { - s.log.Printf("syncing with %v, tip now %v (avg %.2f blocks/s)", p, s.cm.Tip(), float64(s.cm.Tip().Height-oldTip.Height)/endTime.Sub(oldTime).Seconds()) + s.log.Debug("syncing with peer", zap.Stringer("peer", p), zap.Uint64("blocks", sentBlocks), zap.Duration("elapsed", endTime.Sub(oldTime))) lastPrint = time.Now() } return nil @@ -745,11 +744,11 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { } totalBlocks := s.cm.Tip().Height - oldTip.Height if err != nil { - s.log.Printf("syncing with %v failed after %v blocks: %v", p, totalBlocks, err) + s.log.Debug("syncing with peer failed", zap.Stringer("peer", p), zap.Error(err), zap.Uint64("blocks", totalBlocks)) } else if newTip := s.cm.Tip(); newTip != oldTip { - s.log.Printf("finished syncing %v blocks with %v, tip now %v", totalBlocks, p, newTip) + s.log.Debug("finished syncing with peer", zap.Stringer("peer", p), zap.Stringer("newTip", newTip), zap.Uint64("blocks", totalBlocks)) } else { - s.log.Printf("finished syncing %v blocks with %v, tip unchanged", sentBlocks, p) + s.log.Debug("finished syncing with peer, tip unchanged", zap.Stringer("peer", p), zap.Uint64("blocks", sentBlocks)) } } } @@ -899,7 +898,7 @@ func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, o MaxSendBlocks: 10, PeerDiscoveryInterval: 5 * time.Second, SyncInterval: 5 * time.Second, - Logger: log.New(io.Discard, "", 0), + Logger: zap.NewNop(), } for _, opt := range opts { opt(&config)