diff --git a/internal/evidence/reactor.go b/internal/evidence/reactor.go index 79478761f8..ff8f6b6309 100644 --- a/internal/evidence/reactor.go +++ b/internal/evidence/reactor.go @@ -4,11 +4,9 @@ import ( "context" "fmt" "runtime/debug" - "time" sync "github.com/sasha-s/go-deadlock" - clist "github.com/dashpay/tenderdash/internal/libs/clist" "github.com/dashpay/tenderdash/internal/p2p" "github.com/dashpay/tenderdash/libs/log" "github.com/dashpay/tenderdash/libs/service" @@ -22,12 +20,6 @@ const ( EvidenceChannel = p2p.ChannelID(0x38) maxMsgSize = 1048576 // 1MB TODO make it configurable - - // broadcast all uncommitted evidence this often. This sets when the reactor - // goes back to the start of the list and begins sending the evidence again. - // Most evidence should be committed in the very next block that is why we wait - // just over the block production rate before sending evidence again. - broadcastEvidenceIntervalS = 10 ) // GetChannelDescriptor produces an instance of a descriptor for this @@ -49,6 +41,8 @@ type Reactor struct { evpool *Pool chCreator p2p.ChannelCreator + evidenceCh p2p.Channel + peerEvents p2p.PeerEventSubscriber mtx sync.Mutex @@ -82,14 +76,14 @@ func NewReactor( // envelopes on each. In addition, it also listens for peer updates and handles // messages on that p2p channel accordingly. The caller must be sure to execute // OnStop to ensure the outbound p2p Channels are closed. No error is returned. -func (r *Reactor) OnStart(ctx context.Context) error { - ch, err := r.chCreator(ctx, GetChannelDescriptor()) +func (r *Reactor) OnStart(ctx context.Context) (err error) { + r.evidenceCh, err = r.chCreator(ctx, GetChannelDescriptor()) if err != nil { return err } - go r.processEvidenceCh(ctx, ch) - go r.processPeerUpdates(ctx, r.peerEvents(ctx, "evidence"), ch) + go r.processEvidenceCh(ctx) + go r.processPeerUpdates(ctx, r.peerEvents(ctx, "evidence")) return nil } @@ -111,23 +105,30 @@ func (r *Reactor) handleEvidenceMessage(ctx context.Context, envelope *p2p.Envel // Evidence is sent and received one by one ev, err := types.EvidenceFromProto(msg) if err != nil { - logger.Error("failed to convert evidence", "err", err) + logger.Error("failed to convert evidence", "error", err) return err } - if err := r.evpool.AddEvidence(ctx, ev); err != nil { - // If we're given invalid evidence by the peer, notify the router that - // we should remove this peer by returning an error. - if _, ok := err.(*types.ErrInvalidEvidence); ok { - return err + + // If the evidence is already pending or committed, we don't need to + // broadcast it again. + if !r.evpool.isPending(ev) && !r.evpool.isCommitted(ev) { + if err := r.evpool.AddEvidence(ctx, ev); err != nil { + // If we're given invalid evidence by the peer, notify the router that + // we should remove this peer by returning an error. + if _, ok := err.(*types.ErrInvalidEvidence); ok { + return err + } + logger.Error("failed to add evidence", "error", err) } + return r.broadcastEvidence(ctx, *msg, r.evidenceCh) } + logger.Debug("evidence already pending", "evidence", ev) + return nil default: return fmt.Errorf("received unknown message: %T", msg) } - - return nil } // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. @@ -159,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er // processEvidenceCh implements a blocking event loop where we listen for p2p // Envelope messages from the evidenceCh. -func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) { - iter := evidenceCh.Receive(ctx) +func (r *Reactor) processEvidenceCh(ctx context.Context) { + iter := r.evidenceCh.Receive(ctx) for iter.Next(ctx) { envelope := iter.Envelope() if err := r.handleMessage(ctx, envelope); err != nil { r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err) - if serr := evidenceCh.SendError(ctx, p2p.PeerError{ + if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{ NodeID: envelope.From, Err: err, }); serr != nil { @@ -186,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) // connects/disconnects frequently from the broadcasting peer(s). // // REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) { +func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) { r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status) r.mtx.Lock() @@ -209,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda if !ok { pctx, pcancel := context.WithCancel(ctx) r.peerRoutines[peerUpdate.NodeID] = pcancel - go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh) + go r.syncEvidence(pctx, peerUpdate.NodeID) } case p2p.PeerStatusDown: @@ -227,31 +228,23 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda // processPeerUpdates initiates a blocking process where we listen for and handle // PeerUpdate messages. When the reactor is stopped, we will catch the signal and // close the p2p PeerUpdatesCh gracefully. -func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) { +func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) { for { select { case peerUpdate := <-peerUpdates.Updates(): - r.processPeerUpdate(ctx, peerUpdate, evidenceCh) + r.processPeerUpdate(ctx, peerUpdate) case <-ctx.Done(): return } } } -// broadcastEvidenceLoop starts a blocking process that continuously reads pieces -// of evidence off of a linked-list and sends the evidence in a p2p Envelope to -// the given peer by ID. This should be invoked in a goroutine per unique peer +// syncEvidence starts a blocking process that sends all evidence to a newly +// connected peer. This should be invoked in a goroutine per unique peer // ID via an appropriate PeerUpdate. The goroutine can be signaled to gracefully // exit by either explicitly closing the provided doneCh or by the reactor // signaling to stop. -// -// TODO: This should be refactored so that we do not blindly gossip evidence -// that the peer has already received or may not be ready for. -// -// REF: https://github.com/tendermint/tendermint/issues/4727 -func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) { - var next *clist.CElement - +func (r *Reactor) syncEvidence(ctx context.Context, peerID types.NodeID) { defer func() { r.mtx.Lock() delete(r.peerRoutines, peerID) @@ -266,25 +259,9 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID } }() - timer := time.NewTimer(0) - defer timer.Stop() - - for { - // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWaitChan() returned nil. So we can go - // ahead and start from the beginning. - if next == nil { - select { - case <-r.evpool.EvidenceWaitChan(): // wait until next evidence is available - if next = r.evpool.EvidenceFront(); next == nil { - continue - } - - case <-ctx.Done(): - return - } - } + next := r.evpool.EvidenceFront() + for next != nil { ev := next.Value.(types.Evidence) evProto, err := types.EvidenceToProto(ev) if err != nil { @@ -296,25 +273,35 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID // peer may receive this piece of evidence multiple times if it added and // removed frequently from the broadcasting peer. - if err := evidenceCh.Send(ctx, p2p.Envelope{ + if err := r.evidenceCh.Send(ctx, p2p.Envelope{ To: peerID, Message: evProto, }); err != nil { return } - r.logger.Debug("gossiped evidence to peer", "evidence", ev, "peer", peerID) + r.logger.Debug("evidence sync: sent evidence to peer", "evidence", ev, "peer", peerID) select { - case <-timer.C: - // start from the beginning after broadcastEvidenceIntervalS seconds - timer.Reset(time.Second * broadcastEvidenceIntervalS) - next = nil - - case <-next.NextWaitChan(): - next = next.Next() - case <-ctx.Done(): return + default: } + + next = next.Next() } + r.logger.Debug("evidence sync finished", "peer", peerID) +} + +// broadcastEvidence sends new evidence to all connected peers. +func (r *Reactor) broadcastEvidence(ctx context.Context, evidence tmproto.Evidence, evidenceCh p2p.Channel) error { + + if err := evidenceCh.Send(ctx, p2p.Envelope{ + Broadcast: true, + Message: &evidence, + }); err != nil { + return fmt.Errorf("failed to broadcast evidence: %w", err) + } + r.logger.Debug("evidence broadcasted", "evidence", evidence) + + return nil } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 7adc3d0dfa..ed6e2c0d52 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -359,7 +359,7 @@ func (r *Router) routeChannel( r.metrics.RouterPeerQueueSend.Observe(time.Since(start).Seconds()) case <-q.closed(): - r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID) + r.logger.Debug("dropping message on closed channel", "peer", envelope.To, "channel", chID) case <-ctx.Done(): return