Skip to content

Commit

Permalink
Add metrics to Libp2p networking processes
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih authored and gameofpointers committed Apr 6, 2024
1 parent 9e4507f commit edbc370
Show file tree
Hide file tree
Showing 12 changed files with 549 additions and 52 deletions.
24 changes: 24 additions & 0 deletions common/common_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package common

import (
"github.com/dominant-strategies/go-quai/metrics_config"
"github.com/prometheus/client_golang/prometheus"
)

var (
messageMetrics *prometheus.CounterVec
peerMetrics *prometheus.GaugeVec
)

func init() {
registerMetrics()
}

func registerMetrics() {
messageMetrics = metrics_config.NewCounterVec("MessageCounters", "Counters to track messages sent over the P2P layer")
messageMetrics.WithLabelValues("sent")
messageMetrics.WithLabelValues("received")

peerMetrics = metrics_config.NewGaugeVec("PeerGauges", "Track the number of peers connected to this node")
peerMetrics.WithLabelValues("numPeers")
}
7 changes: 7 additions & 0 deletions common/stream_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func ReadMessageFromStream(stream network.Stream) ([]byte, error) {
return nil, errors.Wrap(err, "failed to read message")
}

if messageMetrics != nil {
messageMetrics.WithLabelValues("received").Inc()
}
return data, nil
}

Expand All @@ -53,5 +56,9 @@ func WriteMessageToStream(stream network.Stream, msg []byte) error {
if err != nil {
return errors.Wrap(err, "failed to write message to stream")
}

if messageMetrics != nil {
messageMetrics.WithLabelValues("sent").Inc()
}
return nil
}
36 changes: 18 additions & 18 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ var (
underpricedTxMeter = txpoolMetrics.WithLabelValues("underpriced") // Underpriced transaction
overflowedTxMeter = txpoolMetrics.WithLabelValues("overflowed") // Overflowed transaction

pendingGauge = txpoolMetrics.WithLabelValues("pending")
queuedGauge = txpoolMetrics.WithLabelValues("queued")
localGauge = txpoolMetrics.WithLabelValues("local")
slotsGauge = txpoolMetrics.WithLabelValues("slots")
pendingTxGauge = txpoolMetrics.WithLabelValues("pending")
queuedGauge = txpoolMetrics.WithLabelValues("queued")
localTxGauge = txpoolMetrics.WithLabelValues("local")
slotsGauge = txpoolMetrics.WithLabelValues("slots")

reheapTimer = metrics_config.NewTimer("Reheap", "Reheap timer")
)
Expand Down Expand Up @@ -340,9 +340,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
underpricedTxMeter.Set(0)
overflowedTxMeter.Set(0)

pendingGauge.Set(0)
pendingTxGauge.Set(0)
queuedGauge.Set(0)
localGauge.Set(0)
localTxGauge.Set(0)

// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize(logger)
Expand Down Expand Up @@ -874,7 +874,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e
pool.priced.Removed(pool.all.RemoteToLocals(pool.locals)) // Migrate the remotes if it's marked as local first time.
}
if isLocal {
localGauge.Add(1)
localTxGauge.Add(1)
}
pool.journalTx(internal, tx)
pool.queueTxEvent(tx)
Expand Down Expand Up @@ -971,7 +971,7 @@ func (pool *TxPool) promoteTx(addr common.InternalAddress, hash common.Hash, tx
pendingReplaceMeter.Add(1)
} else {
// Nothing was replaced, bump the pending counter
pendingGauge.Add(1)
pendingTxGauge.Add(1)
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.pendingNonces.set(addr, tx.Nonce()+1)
Expand Down Expand Up @@ -1256,7 +1256,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
pool.priced.Removed(1)
}
if pool.locals.contains(internal) {
localGauge.Dec()
localTxGauge.Dec()
}
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[internal]; pending != nil {
Expand All @@ -1273,7 +1273,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
// Update the account nonce if needed
pool.pendingNonces.setIfLower(internal, tx.Nonce())
// Reduce the pending counter
pendingGauge.Sub(float64(len(invalids) + 1))
pendingTxGauge.Sub(float64(len(invalids) + 1))
return
}
}
Expand Down Expand Up @@ -1690,7 +1690,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.InternalAddress) []*typ
pool.priced.Removed(len(forwards) + len(drops) + len(caps))
queuedRateLimitMeter.Sub(float64(len(forwards) + len(drops) + len(caps)))
if pool.locals.contains(addr) {
localGauge.Sub(float64(len(forwards) + len(drops) + len(caps)))
localTxGauge.Sub(float64(len(forwards) + len(drops) + len(caps)))
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
Expand Down Expand Up @@ -1757,9 +1757,9 @@ func (pool *TxPool) truncatePending() {
pool.logger.WithField("hash", hash).Trace("Removed fairness-exceeding pending transaction")
}
pool.priced.Removed(len(caps))
pendingGauge.Sub(float64(len(caps)))
pendingTxGauge.Sub(float64(len(caps)))
if pool.locals.contains(offenders[i]) {
localGauge.Sub(float64(len(caps)))
localTxGauge.Sub(float64(len(caps)))
}
pending--
}
Expand All @@ -1784,9 +1784,9 @@ func (pool *TxPool) truncatePending() {
pool.logger.WithField("hash", hash).Trace("Removed fairness-exceeding pending transaction")
}
pool.priced.Removed(len(caps))
pendingGauge.Sub(float64(len(caps)))
pendingTxGauge.Sub(float64(len(caps)))
if pool.locals.contains(addr) {
localGauge.Sub(float64(len(caps)))
localTxGauge.Sub(float64(len(caps)))
}
pending--
}
Expand Down Expand Up @@ -1890,9 +1890,9 @@ func (pool *TxPool) demoteUnexecutables() {
pool.enqueueTx(hash, tx, false, false)
}
removedTxs := float64(len(olds) + len(drops) + len(invalids))
pendingGauge.Sub(removedTxs)
pendingTxGauge.Sub(removedTxs)
if pool.locals.contains(addr) {
localGauge.Sub(removedTxs)
localTxGauge.Sub(removedTxs)
}
// If there's a gap in front, alert (should never happen) and postpone all transactions
if list.Len() > 0 && list.txs.Get(nonce) == nil {
Expand All @@ -1904,7 +1904,7 @@ func (pool *TxPool) demoteUnexecutables() {
// Internal shuffle shouldn't touch the lookup set.
pool.enqueueTx(hash, tx, false, false)
}
pendingGauge.Sub(float64(len(gapped)))
pendingTxGauge.Sub(float64(len(gapped)))
}
// Delete the entire pending entry if it became empty.
if list.Empty() {
Expand Down
Loading

0 comments on commit edbc370

Please sign in to comment.