Skip to content

Commit

Permalink
Merge branch 'fix-mempool-spam' into test/abci-routing
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Feb 14, 2024
2 parents 160fc86 + 3a37576 commit 1562299
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 7 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ type MempoolConfig struct {
MaxTxsBytes int64 `mapstructure:"max-txs-bytes"`

// Size of the cache (used to filter transactions we saw earlier) in transactions
// Should be much bigger than mempool size.
CacheSize int `mapstructure:"cache-size"`

// Do not remove invalid transactions from the cache (default: false)
Expand Down
1 change: 1 addition & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ size = {{ .Mempool.Size }}
max-txs-bytes = {{ .Mempool.MaxTxsBytes }}
# Size of the cache (used to filter transactions we saw earlier) in transactions
# Should be much bigger than mempool size.
cache-size = {{ .Mempool.CacheSize }}
# Do not remove invalid transactions from the cache (default: false)
Expand Down
15 changes: 8 additions & 7 deletions internal/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
r.peerManager.Ready(ctx, peerID, channels)

// we use context to manage the lifecycle of the peer
ctx, cancel := context.WithCancel(ctx)
// note that original ctx will be used in cleanup
ioCtx, cancel := context.WithCancel(ctx)
defer cancel()

sendQueue := r.getOrMakeQueue(peerID, channels)
Expand Down Expand Up @@ -732,16 +733,16 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec

go func() {
select {
case errCh <- r.receivePeer(ctx, peerID, conn):
case <-ctx.Done():
case errCh <- r.receivePeer(ioCtx, peerID, conn):
case <-ioCtx.Done():
}
wg.Done()
}()

go func() {
select {
case errCh <- r.sendPeer(ctx, peerID, conn, sendQueue):
case <-ctx.Done():
case errCh <- r.sendPeer(ioCtx, peerID, conn, sendQueue):
case <-ioCtx.Done():
}
wg.Done()
}()
Expand All @@ -756,9 +757,9 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
select {
case err = <-errCh:
r.logger.Debug("routePeer: received error from subroutine 1", "peer", peerID, "err", err)
case <-ctx.Done():
case <-ioCtx.Done():
r.logger.Debug("routePeer: ctx done", "peer", peerID)
ctxErr = ctx.Err()
ctxErr = ioCtx.Err()
}

// goroutine 1 has finished, so we can cancel the context and close everything
Expand Down

0 comments on commit 1562299

Please sign in to comment.