Skip to content

Commit

Permalink
Merge pull request #4681 from oasisprotocol/pro-wh/feature/txunion
Browse files Browse the repository at this point in the history
go/runtime/txpool: transactions in incoming messages
  • Loading branch information
pro-wh authored Aug 23, 2022
2 parents 20e73be + c072a38 commit 4c15373
Show file tree
Hide file tree
Showing 19 changed files with 774 additions and 297 deletions.
11 changes: 11 additions & 0 deletions .changelog/4681.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
go/runtime/txpool: Add roothash incoming messages' data as transactions

Roothash incoming messages can provide a piece of data for the runtime.
With this change, the data is now treated as a transaction.

Along with this change, we're splitting the txpool into multiple queues.
The transactions collected from roothash incoming messages go in a special
queue that does not undergo checking or broadcasting.

We also make another queue for a node's own transactions, so that a proposer
can prioritize its own transactions.
4 changes: 3 additions & 1 deletion docs/oasis-node/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ oasis_storage_latency | Summary | Storage call latency (seconds). | call | [stor
oasis_storage_successes | Counter | Number of storage successes. | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_storage_value_size | Summary | Storage call value size (bytes). | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go)
oasis_txpool_accepted_transactions | Counter | Number of accepted transactions (passing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_local_queue_size | Gauge | Size of the local transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_check_size | Gauge | Size of the pending to be checked queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the pending to be scheduled queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_pending_schedule_size | Gauge | Size of the main schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rejected_transactions | Counter | Number of rejected transactions (failing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_txpool_rim_queue_size | Gauge | Size of the roothash incoming message transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go)
oasis_up | Gauge | Is oasis-test-runner active for specific scenario. | | [oasis-node/cmd/common/metrics](https://github.com/oasisprotocol/oasis-core/tree/master/go/oasis-node/cmd/common/metrics/metrics.go)
oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go)
Expand Down
2 changes: 1 addition & 1 deletion go/roothash/api/message/incoming_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type IncomingMessage struct {
// transferred before the message is processed by the runtime.
Tokens quantity.Quantity `json:"tokens,omitempty"`

// Data is arbitrary runtime-dependent data.
// Data is a runtime transaction.
Data []byte `json:"data,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*
return nil, err
}

batch := txPool.GetPrioritizedBatch(rq.Offset, rq.Limit)
batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit)
raw := make([][]byte, 0, len(batch))
for _, tx := range batch {
raw = append(raw, tx.Raw())
Expand Down
11 changes: 8 additions & 3 deletions go/runtime/txpool/check_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"testing"

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
)

func newPendingTx(tx []byte) *PendingCheckTransaction {
return &PendingCheckTransaction{
Transaction: newTransaction(tx, txStatusPendingCheck),
TxQueueMeta: &TxQueueMeta{
raw: tx,
hash: hash.NewFromBytes(tx),
},
}
}

Expand All @@ -34,9 +39,9 @@ func TestCheckTxQueueBasic(t *testing.T) {
require.EqualValues(t, 10, len(batch), "Batch size")
require.EqualValues(t, 41, queue.size(), "Size")

require.EqualValues(t, batch[0].tx, []byte("hello world"))
require.EqualValues(t, batch[0].Raw(), []byte("hello world"))
for i := 0; i < 9; i++ {
require.EqualValues(t, batch[i+1].tx, []byte(fmt.Sprintf("call %d", i)))
require.EqualValues(t, batch[i+1].Raw(), []byte(fmt.Sprintf("call %d", i)))
}

queue.clear()
Expand Down
99 changes: 99 additions & 0 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package txpool

import (
"sync"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

var (
_ UsableTransactionSource = (*localQueue)(nil)
_ RecheckableTransactionStore = (*localQueue)(nil)
_ RepublishableTransactionSource = (*localQueue)(nil)
)

// localQueue is a "front of the line" area for txs from our own node. We also keep these txs in order.
type localQueue struct {
l sync.Mutex
txs []*TxQueueMeta
indexesByHash map[hash.Hash]int
}

func newLocalQueue() *localQueue {
return &localQueue{
indexesByHash: map[hash.Hash]int{},
}
}

func (lq *localQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
}

func (lq *localQueue) GetTxByHash(h hash.Hash) *TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
i, ok := lq.indexesByHash[h]
if !ok {
return nil
}
return lq.txs[i]
}

func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
lq.l.Lock()
defer lq.l.Unlock()
origCount := len(lq.txs)
keptCount := origCount
for _, h := range hashes {
if i, ok := lq.indexesByHash[h]; ok {
delete(lq.indexesByHash, h)
lq.txs[i] = nil
keptCount--
}
}
if keptCount == origCount {
return
}
keptTxs := make([]*TxQueueMeta, 0, keptCount)
for _, tx := range lq.txs {
if tx == nil {
continue
}
i := len(keptTxs)
keptTxs = append(keptTxs, tx)
lq.indexesByHash[tx.Hash()] = i
}
lq.txs = keptTxs
}

func (lq *localQueue) TakeAll() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
txs := lq.txs
lq.txs = nil
lq.indexesByHash = make(map[hash.Hash]int)
return txs
}

func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error {
lq.l.Lock()
defer lq.l.Unlock()
lq.indexesByHash[tx.Hash()] = len(lq.txs)
lq.txs = append(lq.txs, tx)
return nil
}

func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta {
lq.l.Lock()
defer lq.l.Unlock()
return append([]*TxQueueMeta(nil), lq.txs...)
}

func (lq *localQueue) size() int {
lq.l.Lock()
defer lq.l.Unlock()
return len(lq.txs)
}
44 changes: 44 additions & 0 deletions go/runtime/txpool/local_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package txpool

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

func TestLocalQueueBasic(t *testing.T) {
lq := newLocalQueue()

require.Len(t, lq.GetSchedulingSuggestion(50), 0, "get scheduling suggestion")

// Add two transactions, with a higher priority one coming later.
rawA := []byte("a")
txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)}
require.NoError(t, lq.OfferChecked(txA, &protocol.CheckTxMetadata{Priority: 1}), "offer checked a")
rawB := []byte("b")
txB := &TxQueueMeta{raw: rawB, hash: hash.NewFromBytes(rawB)}
require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a")
require.Equal(t, 2, lq.size())

// We should preserve the order. Publish in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetTxsToPublish(), "get txs to publish")
// Schedule in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetSchedulingSuggestion(50), "get scheduling suggestion")

tx := lq.GetTxByHash(txA.Hash())
require.EqualValues(t, txA, tx, "get tx by hash a")
hashC := hash.NewFromBytes([]byte("c"))
tx = lq.GetTxByHash(hashC)
require.Nil(t, tx, "get tx by hash c")

lq.HandleTxsUsed([]hash.Hash{hashC})
require.EqualValues(t, map[hash.Hash]int{txA.Hash(): 0, txB.Hash(): 1}, lq.indexesByHash, "after handle txs used absent")
lq.HandleTxsUsed([]hash.Hash{txA.Hash()})
require.EqualValues(t, map[hash.Hash]int{txB.Hash(): 0}, lq.indexesByHash, "after handle txs used")

require.EqualValues(t, []*TxQueueMeta{txB}, lq.TakeAll(), "take all")
require.Len(t, lq.GetSchedulingSuggestion(50), 0, "after take all")
}
137 changes: 137 additions & 0 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package txpool

import (
"fmt"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

var (
_ UsableTransactionSource = (*mainQueue)(nil)
_ RecheckableTransactionStore = (*mainQueue)(nil)
_ RepublishableTransactionSource = (*mainQueue)(nil)
)

// MainQueueTransaction is a transaction and its metadata in the main queue.
type MainQueueTransaction struct {
TxQueueMeta

// priority defines the transaction's priority as specified by the runtime.
priority uint64

// sender is a unique transaction sender identifier as specified by the runtime.
sender string
// senderSeq is a per-sender sequence number as specified by the runtime.
senderSeq uint64
}

func newTransaction(tx TxQueueMeta) *MainQueueTransaction {
return &MainQueueTransaction{
TxQueueMeta: tx,
}
}

// String returns a string representation of a transaction.
func (tx *MainQueueTransaction) String() string {
return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.Hash(), tx.FirstSeen(), tx.priority)
}

// Priority returns the transaction priority.
func (tx *MainQueueTransaction) Priority() uint64 {
return tx.priority
}

// Sender returns the transaction sender.
func (tx *MainQueueTransaction) Sender() string {
return tx.sender
}

// SenderSeq returns the per-sender sequence number.
func (tx *MainQueueTransaction) SenderSeq() uint64 {
return tx.senderSeq
}

// setChecked populates transaction data retrieved from checks.
func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) {
if meta != nil {
tx.priority = meta.Priority
tx.sender = string(meta.Sender)
tx.senderSeq = meta.SenderSeq
}

// If the sender is empty (e.g. because the runtime does not support specifying a sender), we
// treat each transaction as having a unique sender. This is to allow backwards compatibility.
if len(tx.sender) == 0 {
h := tx.Hash()
tx.sender = string(h[:])
}
}

// mainQueue is a priority queue for transactions that we give no special treatment.
type mainQueue struct {
// This implementation adapts the existing scheduleQueue code.
inner *scheduleQueue
}

func newMainQueue(capacity int) *mainQueue {
return &mainQueue{
inner: newScheduleQueue(capacity),
}
}

func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(nil, countHint)
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) GetTxByHash(h hash.Hash) *TxQueueMeta {
txMetas, _ := mq.inner.getKnownBatch([]hash.Hash{h})
if txMetas[0] == nil {
return nil
}
return &txMetas[0].TxQueueMeta
}

func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) {
mq.inner.remove(hashes)
}

func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta {
txMetas := mq.inner.getPrioritizedBatch(offset, limit)
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) TakeAll() []*TxQueueMeta {
txMetas := mq.inner.getAll()
mq.inner.clear()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}

func (mq *mainQueue) OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadata) error {
txMeta := newTransaction(*tx)
txMeta.setChecked(meta)

return mq.inner.add(txMeta)
}

func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta {
txMetas := mq.inner.getAll()
var txs []*TxQueueMeta
for _, txMeta := range txMetas {
txs = append(txs, &txMeta.TxQueueMeta)
}
return txs
}
22 changes: 19 additions & 3 deletions go/runtime/txpool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@ var (
},
[]string{"runtime"},
)
pendingScheduleSize = prometheus.NewGaugeVec(
mainQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_pending_schedule_size",
Help: "Size of the pending to be scheduled queue (number of entries).",
Help: "Size of the main schedulable queue (number of entries).",
},
[]string{"runtime"},
)
localQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_local_queue_size",
Help: "Size of the local transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
rimQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "oasis_txpool_rim_queue_size",
Help: "Size of the roothash incoming message transactions schedulable queue (number of entries).",
},
[]string{"runtime"},
)
Expand All @@ -37,7 +51,9 @@ var (
)
txpoolCollectors = []prometheus.Collector{
pendingCheckSize,
pendingScheduleSize,
mainQueueSize,
localQueueSize,
rimQueueSize,
rejectedTransactions,
acceptedTransactions,
}
Expand Down
Loading

0 comments on commit 4c15373

Please sign in to comment.