Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go/runtime/txpool: transactions in incoming messages #4681

Merged
merged 17 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/4681.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
go/runtime/txpool: Add roothash incoming messages' data as transactions

Roothash incoming messages can provide a piece of data for the runtime.
With this change, the data is now treated as a transaction.

Along with this change, we're splitting the txpool into multiple queues.
The transactions collected from roothash incoming messages go in a special
queue that does not undergo checking or broadcasting.

We also make another queue for a node's own transactions, so that a proposer
can prioritize its own transactions.
4 changes: 3 additions & 1 deletion docs/oasis-node/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ oasis_storage_latency | Summary | Storage call latency (seconds). | call | [stor
oasis_storage_successes | Counter | Number of storage successes. | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_storage_value_size | Summary | Storage call value size (bytes). | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_txpool_accepted_transactions | Counter | Number of accepted transactions (passing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_local_queue_size | Gauge | Size of the local transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_check_size | Gauge | Size of the pending to be checked queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the pending to be scheduled queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the main schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rejected_transactions | Counter | Number of rejected transactions (failing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rim_queue_size | Gauge | Size of the roothash incoming message transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_up | Gauge | Is oasis-test-runner active for specific scenario. | | [oasis-node/cmd/common/metrics](https://github.com/oasisprotocol/oasis-core/tree/master/go/oasis-node/cmd/common/metrics/metrics.go)
oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
Expand Down
2 changes: 1 addition & 1 deletion go/roothash/api/message/incoming_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type IncomingMessage struct {
// transferred before the message is processed by the runtime.
Tokens quantity.Quantity `json:"tokens,omitempty"`

// Data is arbitrary runtime-dependent data.
// Data is a runtime transaction.
Data []byte `json:"data,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*
return nil, err
}

batch := txPool.GetPrioritizedBatch(rq.Offset, rq.Limit)
batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit)
raw := make([][]byte, 0, len(batch))
for _, tx := range batch {
raw = append(raw, tx.Raw())
Expand Down
11 changes: 8 additions & 3 deletions go/runtime/txpool/check_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

func newPendingTx(tx []byte) *PendingCheckTransaction {
return &PendingCheckTransaction{
Transaction: newTransaction(tx, txStatusPendingCheck),
TxQueueMeta: &TxQueueMeta{
raw: tx,
hash: hash.NewFromBytes(tx),
},
}
}

Expand All @@ -34,9 +39,9 @@ func TestCheckTxQueueBasic(t *testing.T) {
require.EqualValues(t, 10, len(batch), "Batch size")
require.EqualValues(t, 41, queue.size(), "Size")

require.EqualValues(t, batch[0].tx, []byte("hello world"))
require.EqualValues(t, batch[0].Raw(), []byte("hello world"))
for i := 0; i < 9; i++ {
require.EqualValues(t, batch[i+1].tx, []byte(fmt.Sprintf("call %d", i)))
require.EqualValues(t, batch[i+1].Raw(), []byte(fmt.Sprintf("call %d", i)))
}

queue.clear()
Expand Down
99 changes: 99 additions & 0 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package txpool

import (
"sync"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

var (
_ UsableTransactionSource = (*localQueue)(nil)
_ RecheckableTransactionStore = (*localQueue)(nil)
_ RepublishableTransactionSource = (*localQueue)(nil)
)

// localQueue is a "front of the line" area for txs from our own node. We also keep these txs in order.
type localQueue struct {
pro-wh marked this conversation as resolved.
Show resolved Hide resolved
l sync.Mutex
txs []*TxQueueMeta
indexesByHash map[hash.Hash]int
}

func newLocalQueue() *localQueue {
return &localQueue{
indexesByHash: map[hash.Hash]int{},
}
}

func (lq *localQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
}

func (lq *localQueue) GetTxByHash(h hash.Hash) *TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
i, ok := lq.indexesByHash[h]
if !ok {
return nil
}
return lq.txs[i]
}

func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
lq.l.Lock()
defer lq.l.Unlock()
origCount := len(lq.txs)
keptCount := origCount
for _, h := range hashes {
if i, ok := lq.indexesByHash[h]; ok {
delete(lq.indexesByHash, h)
lq.txs[i] = nil
keptCount--
}
}
if keptCount == origCount {
return
}
keptTxs := make([]*TxQueueMeta, 0, keptCount)
for _, tx := range lq.txs {
if tx == nil {
continue
}
i := len(keptTxs)
keptTxs = append(keptTxs, tx)
lq.indexesByHash[tx.Hash()] = i
}
lq.txs = keptTxs
}

func (lq *localQueue) TakeAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
txs := lq.txs
lq.txs = nil
lq.indexesByHash = make(map[hash.Hash]int)
return txs
}

func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error {
lq.l.Lock()
defer lq.l.Unlock()
lq.indexesByHash[tx.Hash()] = len(lq.txs)
lq.txs = append(lq.txs, tx)
return nil
}

func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
}

func (lq *localQueue) size() int {
lq.l.Lock()
defer lq.l.Unlock()
return len(lq.txs)
}
44 changes: 44 additions & 0 deletions go/runtime/txpool/local_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package txpool

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

func TestLocalQueueBasic(t *testing.T) {
lq := newLocalQueue()

require.Len(t, lq.GetSchedulingSuggestion(50), 0, "get scheduling suggestion")

// Add two transactions, with a higher priority one coming later.
rawA := []byte("a")
txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)}
require.NoError(t, lq.OfferChecked(txA, &protocol.CheckTxMetadata{Priority: 1}), "offer checked a")
rawB := []byte("b")
txB := &TxQueueMeta{raw: rawB, hash: hash.NewFromBytes(rawB)}
require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a")
require.Equal(t, 2, lq.size())

// We should preserve the order. Publish in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetTxsToPublish(), "get txs to publish")
// Schedule in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetSchedulingSuggestion(50), "get scheduling suggestion")

tx := lq.GetTxByHash(txA.Hash())
require.EqualValues(t, txA, tx, "get tx by hash a")
hashC := hash.NewFromBytes([]byte("c"))
tx = lq.GetTxByHash(hashC)
require.Nil(t, tx, "get tx by hash c")

lq.HandleTxsUsed([]hash.Hash{hashC})
require.EqualValues(t, map[hash.Hash]int{txA.Hash(): 0, txB.Hash(): 1}, lq.indexesByHash, "after handle txs used absent")
lq.HandleTxsUsed([]hash.Hash{txA.Hash()})
require.EqualValues(t, map[hash.Hash]int{txB.Hash(): 0}, lq.indexesByHash, "after handle txs used")

require.EqualValues(t, []*TxQueueMeta{txB}, lq.TakeAll(), "take all")
require.Len(t, lq.GetSchedulingSuggestion(50), 0, "after take all")
}
137 changes: 137 additions & 0 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package txpool

import (
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

var (
_ UsableTransactionSource = (*mainQueue)(nil)
_ RecheckableTransactionStore = (*mainQueue)(nil)
_ RepublishableTransactionSource = (*mainQueue)(nil)
)

// MainQueueTransaction is a transaction and its metadata in the main queue.
type MainQueueTransaction struct {
TxQueueMeta

// priority defines the transaction's priority as specified by the runtime.
priority uint64

// sender is a unique transaction sender identifier as specified by the runtime.
sender string
// senderSeq is a per-sender sequence number as specified by the runtime.
senderSeq uint64
}

func newTransaction(tx TxQueueMeta) *MainQueueTransaction {
return &MainQueueTransaction{
TxQueueMeta: tx,
}
}

// String returns a string representation of a transaction.
func (tx *MainQueueTransaction) String() string {
return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.Hash(), tx.FirstSeen(), tx.priority)
}

// Priority returns the transaction priority.
func (tx *MainQueueTransaction) Priority() uint64 {
return tx.priority
}

// Sender returns the transaction sender.
func (tx *MainQueueTransaction) Sender() string {
return tx.sender
}

// SenderSeq returns the per-sender sequence number.
func (tx *MainQueueTransaction) SenderSeq() uint64 {
return tx.senderSeq
}

// setChecked populates transaction data retrieved from checks.
func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) {
if meta != nil {
tx.priority = meta.Priority
tx.sender = string(meta.Sender)
tx.senderSeq = meta.SenderSeq
}

// If the sender is empty (e.g. because the runtime does not support specifying a sender), we
// treat each transaction as having a unique sender. This is to allow backwards compatibility.
if len(tx.sender) == 0 {
h := tx.Hash()
tx.sender = string(h[:])
}
}

// mainQueue is a priority queue for transactions that we give no special treatment.
type mainQueue struct {
// This implementation adapts the existing scheduleQueue code.
inner *scheduleQueue
}

func newMainQueue(capacity int) *mainQueue {
return &mainQueue{
inner: newScheduleQueue(capacity),
}
}

func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(nil, countHint)
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) GetTxByHash(h hash.Hash) *TxQueueMeta {
txMetas, _ := mq.inner.getKnownBatch([]hash.Hash{h})
if txMetas[0] == nil {
return nil
}
return &txMetas[0].TxQueueMeta
}

func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) {
mq.inner.remove(hashes)
}

func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(offset, limit)
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) TakeAll() []*TxQueueMeta {
txMetas := mq.inner.getAll()
mq.inner.clear()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadata) error {
txMeta := newTransaction(*tx)
txMeta.setChecked(meta)

return mq.inner.add(txMeta)
}

func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta {
txMetas := mq.inner.getAll()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}
22 changes: 19 additions & 3 deletions go/runtime/txpool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@ var (
},
[]string{"runtime"},
)
pendingScheduleSize = prometheus.NewGaugeVec(
mainQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_pending_schedule_size",
Help: "Size of the pending to be scheduled queue (number of entries).",
Help: "Size of the main schedulable queue (number of entries).",
},
[]string{"runtime"},
)
localQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_local_queue_size",
Help: "Size of the local transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
rimQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_rim_queue_size",
Help: "Size of the roothash incoming message transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
Expand All @@ -37,7 +51,9 @@ var (
)
txpoolCollectors = []prometheus.Collector{
pendingCheckSize,
pendingScheduleSize,
mainQueueSize,
localQueueSize,
rimQueueSize,
rejectedTransactions,
acceptedTransactions,
}
Expand Down
Loading