Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(consensus): improve logs #679

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/consensus/block_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *blockExecutor) ensureProcess(ctx context.Context, rs *cstypes.RoundStat
block := rs.ProposalBlock
crs := rs.CurrentRoundState
if crs.Params.Source != sm.ProcessProposalSource || !crs.MatchesBlock(block.Header, round) {
c.logger.Debug("CurrentRoundState is outdated", "crs", crs)
c.logger.Debug("CurrentRoundState is outdated, executing ProcessProposal", "crs", crs)
uncommittedState, err := c.blockExec.ProcessProposal(ctx, block, round, c.committedState, true)
if err != nil {
return fmt.Errorf("ProcessProposal abci method: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (p *EventPublisher) PublishValidBlockEvent(rs cstypes.RoundState) {

// PublishCommitEvent ...
func (p *EventPublisher) PublishCommitEvent(commit *types.Commit) error {
p.logger.Debug("publish commit event", "commit", commit)
p.logger.Trace("publish commit event", "commit", commit)
if err := p.eventBus.PublishEventCommit(types.EventDataCommit{Commit: commit}); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/consensus/gossip_peer_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ func (g *peerGossipWorker) Stop() {
if !g.running.Swap(false) {
return
}
g.logger.Debug("peer gossip worker stopping")
g.logger.Trace("peer gossip worker stopping")
close(g.stopCh)
g.Wait()
}

func (g *peerGossipWorker) Wait() {
for _, hd := range g.handlers {
<-hd.stoppedCh
g.logger.Debug("peer gossip worker stopped")
g.logger.Trace("peer gossip worker stopped")
}
}

Expand All @@ -107,11 +107,11 @@ func (g *peerGossipWorker) runHandler(ctx context.Context, hd gossipHandler) {
select {
case <-timer.Chan():
case <-g.stopCh:
g.logger.Debug("peer gossip worker got stop signal")
g.logger.Trace("peer gossip worker got stop signal")
close(hd.stoppedCh)
return
case <-ctx.Done():
g.logger.Debug("peer gossip worker got stop signal via context.Done")
g.logger.Trace("peer gossip worker got stop signal via context.Done")
close(hd.stoppedCh)
return
}
Expand Down
10 changes: 5 additions & 5 deletions internal/consensus/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (g *msgGossiper) GossipVoteSetMaj23(
"round", prs.Round,
})
for _, msg := range msgs {
logger.Debug("syncing vote set +2/3 message")
logger.Trace("syncing vote set +2/3 message")
err := g.msgSender.send(ctx, msg)
if err != nil {
logger.Error("failed to syncing vote set +2/3 message to the peer", "error", err)
Expand All @@ -121,7 +121,7 @@ func (g *msgGossiper) GossipProposalBlockParts(
"round", prs.Round,
"part_index", index,
})
logger.Debug("syncing proposal block part to the peer")
logger.Trace("syncing proposal block part to the peer")
part := rs.ProposalBlockParts.GetPart(index)
// NOTE: A peer might have received a different proposal message, so this Proposal msg will be rejected!
err := g.syncProposalBlockPart(ctx, part, rs.Height, rs.Round)
Expand All @@ -137,7 +137,7 @@ func (g *msgGossiper) GossipProposal(ctx context.Context, rs cstypes.RoundState,
"round", prs.Round,
})
// Proposal: share the proposal metadata with peer.
logger.Debug("syncing proposal")
logger.Trace("syncing proposal")
err := g.sync(ctx, rs.Proposal.ToProto(), updatePeerProposal(g.ps, rs.Proposal))
if err != nil {
logger.Error("failed to sync proposal to the peer", "error", err)
Expand All @@ -156,7 +156,7 @@ func (g *msgGossiper) GossipProposal(ctx context.Context, rs cstypes.RoundState,
ProposalPolRound: rs.Proposal.POLRound,
ProposalPol: *pPolProto,
}
logger.Debug("syncing proposal POL")
logger.Trace("syncing proposal POL")
err = g.sync(ctx, propPOLMsg, nil)
if err != nil {
logger.Error("failed to sync proposal POL to the peer", "error", err)
Expand Down Expand Up @@ -245,7 +245,7 @@ func (g *msgGossiper) GossipVote(ctx context.Context, rs cstypes.RoundState, prs
"vote_round", vote.Round,
"proto_vote_size", protoVote.Size(),
})
logger.Debug("syncing vote message")
logger.Trace("syncing vote message")
err := g.sync(ctx, protoVote, updatePeerVote(g.ps, vote))
if err != nil {
logger.Error("failed to sync vote message to the peer", "error", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/consensus/msg_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func blockPartMessageHandler(ctrl *Controller) msgHandlerFunc {
FromReplay: envelope.fromReplay,
}, stateData)
if err != nil && msg.Round != stateData.Round {
logger.Debug("received block part from wrong round")
logger.Trace("received block part from wrong round")
return nil
}
return err
Expand Down Expand Up @@ -170,7 +170,7 @@ func loggingMiddleware(logger log.Logger) msgMiddlewareFunc {
loggerWithArgs.Error("failed to process message", "error", err)
return nil
}
loggerWithArgs.Debug("message processed successfully")
loggerWithArgs.Trace("message processed successfully")
return nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/consensus/peer_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) error {

// setHasVote will return an error when the index exceeds the bitArray length
func (ps *PeerState) setHasVote(height int64, round int32, voteType tmproto.SignedMsgType, index int32) error {
ps.logger.Debug(
ps.logger.Trace(
"peerState setHasVote",
"peer", ps.peerID,
"height", height,
Expand Down Expand Up @@ -466,7 +466,7 @@ func (ps *PeerState) SetHasCommit(commit *types.Commit) {
}

func (ps *PeerState) setHasCommit(height int64, round int32) {
ps.logger.Debug(
ps.logger.Trace(
"setHasCommit",
"height", height,
"round", round,
Expand Down
2 changes: 1 addition & 1 deletion internal/consensus/proposal_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (u *proposalUpdater) updateStateData(stateData *StateData, blockID types.Bl
return nil
}
// If we don't have the block being committed, set up to get it.
u.logger.Info(
u.logger.Debug(
"commit is for a block we do not know about; set ProposalBlock=nil",
"proposal", stateData.ProposalBlock.Hash(),
"commit", blockID.Hash,
Expand Down
27 changes: 13 additions & 14 deletions internal/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ type channelBundle struct {
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Debug("consensus wait sync", "wait_sync", r.WaitSync())
r.logger.Trace("consensus wait sync", "wait_sync", r.WaitSync())

peerUpdates := r.peerEvents(ctx, "consensus")

Expand Down Expand Up @@ -393,10 +393,10 @@ func (r *Reactor) broadcast(ctx context.Context, channel p2p.Channel, msg proto.
// logResult creates a log that depends on value of err
func (r *Reactor) logResult(err error, logger log.Logger, message string, keyvals ...interface{}) bool {
if err != nil {
logger.Debug(message+" error", append(keyvals, "error", err))
logger.Error(message+" error", append(keyvals, "error", err))
return false
}
logger.Debug(message+" success", keyvals...)
logger.Trace(message+" success", keyvals...)
return true
}

Expand All @@ -406,7 +406,7 @@ func (r *Reactor) logResult(err error, logger log.Logger, message string, keyval
// the peer. During peer removal, we remove the peer for our set of peers and
// signal to all spawned goroutines to gracefully exit in a non-blocking manner.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status,
r.logger.Trace("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status,
"peer_proTxHash", peerUpdate.ProTxHash.ShortString())

switch peerUpdate.Status {
Expand Down Expand Up @@ -547,7 +547,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope

case *tmcons.HasVote:
if err := ps.ApplyHasVoteMessage(msgI.(*HasVoteMessage)); err != nil {
r.logger.Error("applying HasVote message", "msg", msg, "err", err)
r.logger.Error("applying HasVote message failed", "msg", msg, "err", err)
return err
}
case *tmcons.VoteSetMaj23:
Expand Down Expand Up @@ -619,11 +619,11 @@ func (r *Reactor) handleDataMessage(ctx context.Context, envelope *p2p.Envelope,
}

if r.WaitSync() {
logger.Info("ignoring message received during sync", "msg", tmstrings.LazySprintf("%T", msgI))
logger.Debug("ignoring message received during sync", "msg", tmstrings.LazySprintf("%T", msgI))
return nil
}

logger.Debug("data channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message))
logger.Trace("data channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message))

switch msg := envelope.Message.(type) {
case *tmcons.Proposal:
Expand Down Expand Up @@ -660,11 +660,11 @@ func (r *Reactor) handleVoteMessage(ctx context.Context, envelope *p2p.Envelope,
}

if r.WaitSync() {
logger.Info("ignoring message received during sync", "msg", msgI)
logger.Debug("ignoring message received during sync", "msg", msgI)
return nil
}

logger.Debug("vote channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message))
logger.Trace("vote channel processing", "msg", envelope.Message, "type", fmt.Sprintf("%T", envelope.Message))

switch msg := envelope.Message.(type) {
case *tmcons.Commit:
Expand Down Expand Up @@ -715,7 +715,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
}

if r.WaitSync() {
logger.Info("ignoring message received during sync", "msg", msgI)
logger.Debug("ignoring message received during sync", "msg", msgI)
return nil
}

Expand Down Expand Up @@ -779,8 +779,6 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
return err
}

//r.logger.Debug("received message", "ch_id", envelope.ChannelID, "message", msgI, "peer", envelope.From)

switch envelope.ChannelID {
case StateChannel:
err = r.handleStateMessage(ctx, envelope, msg, chans.voteSet)
Expand Down Expand Up @@ -835,15 +833,16 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
func (r *Reactor) peerStatsRoutine(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
if !r.IsRunning() {
r.logger.Info("stopping peerStatsRoutine")
r.logger.Trace("stopping peerStatsRoutine")
return
}

select {
case msg := <-r.state.statsMsgQueue.ch:
ps, ok := r.GetPeerState(msg.PeerID)
if !ok || ps == nil {
r.logger.Debug("attempt to update stats for non-existent peer", "peer", msg.PeerID)
// it's quite common to happen when a peer is removed
r.logger.Trace("attempt to update stats for non-existent peer", "peer", msg.PeerID)
continue
}

Expand Down
10 changes: 5 additions & 5 deletions internal/consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, ne
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
cs.logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
cs.logger.Trace("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
if newStepSub != nil {
ctxto, cancel := context.WithTimeout(ctx, 2*time.Second)
Expand Down Expand Up @@ -78,18 +78,18 @@ func (cs *State) readReplayMessage(ctx context.Context, msg *TimedWALMessage, ne
stateData.Votes.SetRound(p.Round)
stateData.Round = p.Round
}
cs.logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "cs.Round", stateData.Round,
cs.logger.Trace("Replay: Proposal", "height", p.Height, "round", p.Round, "cs.Round", stateData.Round,
"header", p.BlockID.PartSetHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
cs.logger.Trace("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
cs.logger.Trace("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
}
_ = cs.msgDispatcher.dispatch(ctx, &stateData, m, msgFromReplay())
case timeoutInfo:
cs.logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.logger.Trace("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(ctx, m, &stateData)
default:
return fmt.Errorf("replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
Expand Down
6 changes: 3 additions & 3 deletions internal/consensus/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (r *BlockReplayer) replayBlock(
state sm.State,
height int64,
) (sm.CurrentRoundState, *abci.ResponseFinalizeBlock, error) {
r.logger.Info("Applying block", "height", height)
r.logger.Info("Replay: applying block", "height", height)
// Extra check to ensure the app was not changed in a way it shouldn't have.
ucState, err := r.blockExec.ProcessProposal(ctx, block, commit.Round, state, false)
if err != nil {
Expand Down Expand Up @@ -337,8 +337,8 @@ func (r *BlockReplayer) execInitChain(ctx context.Context, rs *replayState, stat
}

quorumType := state.Validators.QuorumType
if quorumType.Validate() != nil {
r.logger.Debug("state quorum type: %w", err)
if err := quorumType.Validate(); err != nil {
r.logger.Error("state quorum type validation failed: %w", err)
quorumType = r.genDoc.QuorumType
}

Expand Down
1 change: 0 additions & 1 deletion internal/consensus/round_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ type roundScheduler struct {

// ScheduleRound0 enterNewRoundCommand(height, 0) at StartTime
func (b *roundScheduler) ScheduleRound0(rs cstypes.RoundState) {
// b.logger.Info("scheduleRound0", "now", tmtime.Now(), "startTime", b.StartTime)
sleepDuration := rs.StartTime.Sub(tmtime.Now())
b.ScheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ func (cs *State) OnStart(ctx context.Context) error {
return err
}

cs.logger.Debug("backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)
cs.logger.Info("backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)

// 3) try to repair (WAL file will be overwritten!)
if err := repairWalFile(corruptedFile, cs.config.WalFile()); err != nil {
Expand Down Expand Up @@ -722,7 +722,7 @@ func (cs *State) handleTimeout(
ti timeoutInfo,
stateData *StateData,
) {
cs.logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.logger.Trace("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)

// timeouts must be for current height, round, step
if ti.Height != stateData.Height || ti.Round < stateData.Round || (ti.Round == stateData.Round && ti.Step < stateData.Step) {
Expand Down Expand Up @@ -801,7 +801,7 @@ func (cs *State) CreateProposalBlock(ctx context.Context) (*types.Block, error)

// PublishCommitEvent ...
func (cs *State) PublishCommitEvent(commit *types.Commit) error {
cs.logger.Debug("publish commit event", "commit", commit)
cs.logger.Trace("publish commit event", "commit", commit)
if err := cs.eventBus.PublishEventCommit(types.EventDataCommit{Commit: commit}); err != nil {
return err
}
Expand Down
34 changes: 19 additions & 15 deletions internal/consensus/state_add_prop_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (c *AddProposalBlockPartAction) addProposalBlockPart(
peerID types.NodeID,
) (bool, error) {
height, round, part := msg.Height, msg.Round, msg.Part
c.logger.Info(
c.logger.Trace(
"addProposalBlockPart",
"height", stateData.Height,
"round", stateData.Round,
Expand Down Expand Up @@ -158,28 +158,30 @@ func (c *AddProposalBlockPartAction) addProposalBlockPart(
}

// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
c.logger.Info(
"received complete proposal block",
"height", stateData.ProposalBlock.Height,
"hash", stateData.ProposalBlock.Hash(),
"round_height", stateData.RoundState.GetHeight(),
)

c.eventPublisher.PublishCompleteProposalEvent(stateData.CompleteProposalEvent())

if stateData.Commit != nil {
c.logger.Info("Proposal block fully received", "proposal", stateData.ProposalBlock)
c.logger.Info("Commit already present", "commit", stateData.Commit)
c.logger.Debug("adding commit after complete proposal",
"height", stateData.ProposalBlock.Height,
c.logger.Info("received complete proposal block and commit",
"proposal", stateData.ProposalBlock,
"commit", stateData.Commit,
"height", stateData.RoundState.Height,
"round", stateData.RoundState.Round,
"hash", stateData.ProposalBlock.Hash(),
)
// We received a commit before the block
// Transit to AddCommit
return added, ctrl.Dispatch(ctx, &AddCommitEvent{Commit: stateData.Commit}, stateData)
}

return added, nil
c.logger.Info(
"received complete proposal block",
"height", stateData.RoundState.Height,
"round", stateData.RoundState.Round,
"proposal_height", stateData.ProposalBlock.Height,
"hash", stateData.ProposalBlock.Hash(),
"round_height", stateData.RoundState.GetHeight(),
)

c.eventPublisher.PublishCompleteProposalEvent(stateData.CompleteProposalEvent())
}

return added, nil
Expand Down Expand Up @@ -231,7 +233,9 @@ func (c *ProposalCompletedAction) Execute(ctx context.Context, stateEvent StateE
// Move onto the next step
// We should allow old blocks if we are recovering from replay
c.logger.Debug("entering prevote after complete proposal",
"height", stateData.ProposalBlock.Height,
"height", stateData.RoundState.Height,
"round", stateData.RoundState.Round,
"proposal_height", stateData.ProposalBlock.Height,
"hash", stateData.ProposalBlock.Hash(),
)
err := stateEvent.Ctrl.Dispatch(ctx, &EnterPrevoteEvent{
Expand Down
Loading