Skip to content

Commit

Permalink
go/runtime/txpool: new metrics for multiple queue counts
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed Aug 19, 2022
1 parent f3736bb commit a05f5dd
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 8 deletions.
4 changes: 3 additions & 1 deletion docs/oasis-node/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ oasis_storage_latency | Summary | Storage call latency (seconds). | call | [stor
oasis_storage_successes | Counter | Number of storage successes. | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_storage_value_size | Summary | Storage call value size (bytes). | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_txpool_accepted_transactions | Counter | Number of accepted transactions (passing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_local_queue_size | Gauge | Size of the local transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_check_size | Gauge | Size of the pending to be checked queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the pending to be scheduled queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the main schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rejected_transactions | Counter | Number of rejected transactions (failing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rim_queue_size | Gauge | Size of the roothash incoming message transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_up | Gauge | Is oasis-test-runner active for specific scenario. | | [oasis-node/cmd/common/metrics](https://github.com/oasisprotocol/oasis-core/tree/master/go/oasis-node/cmd/common/metrics/metrics.go)
oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
Expand Down
6 changes: 6 additions & 0 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta {
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
}

func (lq *localQueue) size() int {
lq.l.Lock()
defer lq.l.Unlock()
return len(lq.txs)
}
1 change: 1 addition & 0 deletions go/runtime/txpool/local_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestLocalQueueBasic(t *testing.T) {
rawB := []byte("b")
txB := &TxQueueMeta{Raw: rawB, Hash: hash.NewFromBytes(rawB)}
require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a")
require.Equal(t, 2, lq.size())

// We should preserve the order. Publish in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetTxsToPublish(), "get txs to publish")
Expand Down
22 changes: 19 additions & 3 deletions go/runtime/txpool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@ var (
},
[]string{"runtime"},
)
pendingScheduleSize = prometheus.NewGaugeVec(
mainQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_pending_schedule_size",
Help: "Size of the pending to be scheduled queue (number of entries).",
Help: "Size of the main schedulable queue (number of entries).",
},
[]string{"runtime"},
)
localQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_local_queue_size",
Help: "Size of the local transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
rimQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_rim_queue_size",
Help: "Size of the roothash incoming message transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
Expand All @@ -37,7 +51,9 @@ var (
)
txpoolCollectors = []prometheus.Collector{
pendingCheckSize,
pendingScheduleSize,
mainQueueSize,
localQueueSize,
rimQueueSize,
rejectedTransactions,
acceptedTransactions,
}
Expand Down
6 changes: 6 additions & 0 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,9 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) {
defer rq.l.Unlock()
rq.txs = newTxs
}

func (rq *rimQueue) size() int {
rq.l.Lock()
defer rq.l.Unlock()
return len(rq.txs)
}
1 change: 1 addition & 0 deletions go/runtime/txpool/rim_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestRimQueue(t *testing.T) {
},
})
require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash: txA}, rq.txs, "after load")
require.Equal(t, 1, rq.size())

require.Nil(t, rq.GetSchedulingSuggestion(50), "get scheduling suggestion")
rq.HandleTxsUsed([]hash.Hash{txA.Hash})
Expand Down
11 changes: 7 additions & 4 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func (t *txPool) HandleTxsUsed(hashes []hash.Hash) {
q.HandleTxsUsed(hashes)
}

// todo: metrics
// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size()))
localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size()))
}

func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int) {
Expand Down Expand Up @@ -432,6 +432,7 @@ func (t *txPool) ProcessBlock(bi *BlockInfo) error {

func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error {
t.rimQueue.Load(inMsgs)
rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size()))
return nil
}

Expand Down Expand Up @@ -644,8 +645,8 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
t.schedulerNotifier.Broadcast(false)
}

// todo: metrics
// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize()))
mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size()))
localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size()))
}

func (t *txPool) ensureInitialized() error {
Expand Down Expand Up @@ -847,6 +848,8 @@ func (t *txPool) recheck() {
results = append(results, notifyCh)
}
}
mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size()))
localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size()))

if len(pcts) == 0 {
return
Expand Down

0 comments on commit a05f5dd

Please sign in to comment.