diff --git a/.changelog/4681.feature.md b/.changelog/4681.feature.md new file mode 100644 index 00000000000..ddf1ce309b3 --- /dev/null +++ b/.changelog/4681.feature.md @@ -0,0 +1,11 @@ +go/runtime/txpool: Add roothash incoming messages' data as transactions + +Roothash incoming messages can provide a piece of data for the runtime. +With this change, the data is now treated as a transaction. + +Along with this change, we're splitting the txpool into multiple queues. +The transactions collected from roothash incoming messages go in a special +queue that does not undergo checking or broadcasting. + +We also make another queue for a node's own transactions, so that a proposer +can prioritize its own transactions. diff --git a/docs/oasis-node/metrics.md b/docs/oasis-node/metrics.md index 74242915ca7..1edc7cd74a5 100644 --- a/docs/oasis-node/metrics.md +++ b/docs/oasis-node/metrics.md @@ -78,9 +78,11 @@ oasis_storage_latency | Summary | Storage call latency (seconds). | call | [stor oasis_storage_successes | Counter | Number of storage successes. | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go) oasis_storage_value_size | Summary | Storage call value size (bytes). | call | [storage/api](https://github.com/oasisprotocol/oasis-core/tree/master/go/storage/api/metrics.go) oasis_txpool_accepted_transactions | Counter | Number of accepted transactions (passing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_local_queue_size | Gauge | Size of the local transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_txpool_pending_check_size | Gauge | Size of the pending to be checked queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) -oasis_txpool_pending_schedule_size | Gauge | Size of the pending to be scheduled queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_pending_schedule_size | Gauge | Size of the main schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_txpool_rejected_transactions | Counter | Number of rejected transactions (failing check tx). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) +oasis_txpool_rim_queue_size | Gauge | Size of the roothash incoming message transactions schedulable queue (number of entries). | runtime | [runtime/txpool](https://github.com/oasisprotocol/oasis-core/tree/master/go/runtime/txpool/metrics.go) oasis_up | Gauge | Is oasis-test-runner active for specific scenario. | | [oasis-node/cmd/common/metrics](https://github.com/oasisprotocol/oasis-core/tree/master/go/oasis-node/cmd/common/metrics/metrics.go) oasis_worker_aborted_batch_count | Counter | Number of aborted batches. | runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go) oasis_worker_batch_processing_time | Summary | Time it takes for a batch to finalize (seconds). 
| runtime | [worker/compute/executor/committee](https://github.com/oasisprotocol/oasis-core/tree/master/go/worker/compute/executor/committee/node.go) diff --git a/go/roothash/api/message/incoming_message.go b/go/roothash/api/message/incoming_message.go index e9ac7d38992..d42556cf60f 100644 --- a/go/roothash/api/message/incoming_message.go +++ b/go/roothash/api/message/incoming_message.go @@ -26,7 +26,7 @@ type IncomingMessage struct { // transferred before the message is processed by the runtime. Tokens quantity.Quantity `json:"tokens,omitempty"` - // Data is arbitrary runtime-dependent data. + // Data is a runtime transaction. Data []byte `json:"data,omitempty"` } diff --git a/go/runtime/registry/host.go b/go/runtime/registry/host.go index f5780b0996f..e6c12e137a9 100644 --- a/go/runtime/registry/host.go +++ b/go/runtime/registry/host.go @@ -249,7 +249,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (* return nil, err } - batch := txPool.GetPrioritizedBatch(rq.Offset, rq.Limit) + batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit) raw := make([][]byte, 0, len(batch)) for _, tx := range batch { raw = append(raw, tx.Raw()) diff --git a/go/runtime/txpool/check_queue_test.go b/go/runtime/txpool/check_queue_test.go index a453791beac..2d296ab31c0 100644 --- a/go/runtime/txpool/check_queue_test.go +++ b/go/runtime/txpool/check_queue_test.go @@ -5,11 +5,16 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" ) func newPendingTx(tx []byte) *PendingCheckTransaction { return &PendingCheckTransaction{ - Transaction: newTransaction(tx, txStatusPendingCheck), + TxQueueMeta: &TxQueueMeta{ + raw: tx, + hash: hash.NewFromBytes(tx), + }, } } @@ -34,9 +39,9 @@ func TestCheckTxQueueBasic(t *testing.T) { require.EqualValues(t, 10, len(batch), "Batch size") require.EqualValues(t, 41, queue.size(), "Size") - require.EqualValues(t, batch[0].tx, []byte("hello world")) + require.EqualValues(t, batch[0].Raw(), []byte("hello world")) for i := 0; i < 9; i++ { - require.EqualValues(t, batch[i+1].tx, []byte(fmt.Sprintf("call %d", i))) + require.EqualValues(t, batch[i+1].Raw(), []byte(fmt.Sprintf("call %d", i))) } queue.clear() diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go new file mode 100644 index 00000000000..47c71f6feda --- /dev/null +++ b/go/runtime/txpool/local_queue.go @@ -0,0 +1,99 @@ +package txpool + +import ( + "sync" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" +) + +var ( + _ UsableTransactionSource = (*localQueue)(nil) + _ RecheckableTransactionStore = (*localQueue)(nil) + _ RepublishableTransactionSource = (*localQueue)(nil) +) + +// localQueue is a "front of the line" area for txs from our own node. We also keep these txs in order. +type localQueue struct { + l sync.Mutex + txs []*TxQueueMeta + indexesByHash map[hash.Hash]int +} + +func newLocalQueue() *localQueue { + return &localQueue{ + indexesByHash: map[hash.Hash]int{}, + } +} + +func (lq *localQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta { + lq.l.Lock() + defer lq.l.Unlock() + return append([]*TxQueueMeta(nil), lq.txs...) 
+} + +func (lq *localQueue) GetTxByHash(h hash.Hash) *TxQueueMeta { + lq.l.Lock() + defer lq.l.Unlock() + i, ok := lq.indexesByHash[h] + if !ok { + return nil + } + return lq.txs[i] +} + +func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) { + lq.l.Lock() + defer lq.l.Unlock() + origCount := len(lq.txs) + keptCount := origCount + for _, h := range hashes { + if i, ok := lq.indexesByHash[h]; ok { + delete(lq.indexesByHash, h) + lq.txs[i] = nil + keptCount-- + } + } + if keptCount == origCount { + return + } + keptTxs := make([]*TxQueueMeta, 0, keptCount) + for _, tx := range lq.txs { + if tx == nil { + continue + } + i := len(keptTxs) + keptTxs = append(keptTxs, tx) + lq.indexesByHash[tx.Hash()] = i + } + lq.txs = keptTxs +} + +func (lq *localQueue) TakeAll() []*TxQueueMeta { + lq.l.Lock() + defer lq.l.Unlock() + txs := lq.txs + lq.txs = nil + lq.indexesByHash = make(map[hash.Hash]int) + return txs +} + +func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error { + lq.l.Lock() + defer lq.l.Unlock() + lq.indexesByHash[tx.Hash()] = len(lq.txs) + lq.txs = append(lq.txs, tx) + return nil +} + +func (lq *localQueue) GetTxsToPublish() []*TxQueueMeta { + lq.l.Lock() + defer lq.l.Unlock() + return append([]*TxQueueMeta(nil), lq.txs...) +} + +func (lq *localQueue) size() int { + lq.l.Lock() + defer lq.l.Unlock() + return len(lq.txs) +} diff --git a/go/runtime/txpool/local_queue_test.go b/go/runtime/txpool/local_queue_test.go new file mode 100644 index 00000000000..1810b05d9bb --- /dev/null +++ b/go/runtime/txpool/local_queue_test.go @@ -0,0 +1,44 @@ +package txpool + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" +) + +func TestLocalQueueBasic(t *testing.T) { + lq := newLocalQueue() + + require.Len(t, lq.GetSchedulingSuggestion(50), 0, "get scheduling suggestion") + + // Add two transactions, with a higher priority one coming later. + rawA := []byte("a") + txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)} + require.NoError(t, lq.OfferChecked(txA, &protocol.CheckTxMetadata{Priority: 1}), "offer checked a") + rawB := []byte("b") + txB := &TxQueueMeta{raw: rawB, hash: hash.NewFromBytes(rawB)} + require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked b") + require.Equal(t, 2, lq.size()) + + // We should preserve the order. Publish in original order. + require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetTxsToPublish(), "get txs to publish") + // Schedule in original order. 
+ require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetSchedulingSuggestion(50), "get scheduling suggestion") + + tx := lq.GetTxByHash(txA.Hash()) + require.EqualValues(t, txA, tx, "get tx by hash a") + hashC := hash.NewFromBytes([]byte("c")) + tx = lq.GetTxByHash(hashC) + require.Nil(t, tx, "get tx by hash c") + + lq.HandleTxsUsed([]hash.Hash{hashC}) + require.EqualValues(t, map[hash.Hash]int{txA.Hash(): 0, txB.Hash(): 1}, lq.indexesByHash, "after handle txs used absent") + lq.HandleTxsUsed([]hash.Hash{txA.Hash()}) + require.EqualValues(t, map[hash.Hash]int{txB.Hash(): 0}, lq.indexesByHash, "after handle txs used") + + require.EqualValues(t, []*TxQueueMeta{txB}, lq.TakeAll(), "take all") + require.Len(t, lq.GetSchedulingSuggestion(50), 0, "after take all") +} diff --git a/go/runtime/txpool/main_queue.go b/go/runtime/txpool/main_queue.go new file mode 100644 index 00000000000..2d14550a3ca --- /dev/null +++ b/go/runtime/txpool/main_queue.go @@ -0,0 +1,137 @@ +package txpool + +import ( + "fmt" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" +) + +var ( + _ UsableTransactionSource = (*mainQueue)(nil) + _ RecheckableTransactionStore = (*mainQueue)(nil) + _ RepublishableTransactionSource = (*mainQueue)(nil) +) + +// MainQueueTransaction is a transaction and its metadata in the main queue. +type MainQueueTransaction struct { + TxQueueMeta + + // priority defines the transaction's priority as specified by the runtime. + priority uint64 + + // sender is a unique transaction sender identifier as specified by the runtime. + sender string + // senderSeq is a per-sender sequence number as specified by the runtime. + senderSeq uint64 +} + +func newTransaction(tx TxQueueMeta) *MainQueueTransaction { + return &MainQueueTransaction{ + TxQueueMeta: tx, + } +} + +// String returns a string representation of a transaction. +func (tx *MainQueueTransaction) String() string { + return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.Hash(), tx.FirstSeen(), tx.priority) +} + +// Priority returns the transaction priority. +func (tx *MainQueueTransaction) Priority() uint64 { + return tx.priority +} + +// Sender returns the transaction sender. +func (tx *MainQueueTransaction) Sender() string { + return tx.sender +} + +// SenderSeq returns the per-sender sequence number. +func (tx *MainQueueTransaction) SenderSeq() uint64 { + return tx.senderSeq +} + +// setChecked populates transaction data retrieved from checks. +func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) { + if meta != nil { + tx.priority = meta.Priority + tx.sender = string(meta.Sender) + tx.senderSeq = meta.SenderSeq + } + + // If the sender is empty (e.g. because the runtime does not support specifying a sender), we + // treat each transaction as having a unique sender. This is to allow backwards compatibility. + if len(tx.sender) == 0 { + h := tx.Hash() + tx.sender = string(h[:]) + } +} + +// mainQueue is a priority queue for transactions that we give no special treatment. +type mainQueue struct { + // This implementation adapts the existing scheduleQueue code. 
+ inner *scheduleQueue +} + +func newMainQueue(capacity int) *mainQueue { + return &mainQueue{ + inner: newScheduleQueue(capacity), + } +} + +func (mq *mainQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta { + txMetas := mq.inner.getPrioritizedBatch(nil, countHint) + var txs []*TxQueueMeta + for _, txMeta := range txMetas { + txs = append(txs, &txMeta.TxQueueMeta) + } + return txs +} + +func (mq *mainQueue) GetTxByHash(h hash.Hash) *TxQueueMeta { + txMetas, _ := mq.inner.getKnownBatch([]hash.Hash{h}) + if txMetas[0] == nil { + return nil + } + return &txMetas[0].TxQueueMeta +} + +func (mq *mainQueue) HandleTxsUsed(hashes []hash.Hash) { + mq.inner.remove(hashes) +} + +func (mq *mainQueue) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta { + txMetas := mq.inner.getPrioritizedBatch(offset, limit) + var txs []*TxQueueMeta + for _, txMeta := range txMetas { + txs = append(txs, &txMeta.TxQueueMeta) + } + return txs +} + +func (mq *mainQueue) TakeAll() []*TxQueueMeta { + txMetas := mq.inner.getAll() + mq.inner.clear() + var txs []*TxQueueMeta + for _, txMeta := range txMetas { + txs = append(txs, &txMeta.TxQueueMeta) + } + return txs +} + +func (mq *mainQueue) OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadata) error { + txMeta := newTransaction(*tx) + txMeta.setChecked(meta) + + return mq.inner.add(txMeta) +} + +func (mq *mainQueue) GetTxsToPublish() []*TxQueueMeta { + txMetas := mq.inner.getAll() + var txs []*TxQueueMeta + for _, txMeta := range txMetas { + txs = append(txs, &txMeta.TxQueueMeta) + } + return txs +} diff --git a/go/runtime/txpool/metrics.go b/go/runtime/txpool/metrics.go index e9e526f81a7..b267909b457 100644 --- a/go/runtime/txpool/metrics.go +++ b/go/runtime/txpool/metrics.go @@ -14,10 +14,24 @@ var ( }, []string{"runtime"}, ) - pendingScheduleSize = prometheus.NewGaugeVec( + mainQueueSize = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "oasis_txpool_pending_schedule_size", - Help: "Size of the pending to be scheduled queue (number of entries).", + Help: "Size of the main schedulable queue (number of entries).", + }, + []string{"runtime"}, + ) + localQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_txpool_local_queue_size", + Help: "Size of the local transactions schedulable queue (number of entries).", + }, + []string{"runtime"}, + ) + rimQueueSize = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "oasis_txpool_rim_queue_size", + Help: "Size of the roothash incoming message transactions schedulable queue (number of entries).", }, []string{"runtime"}, ) @@ -37,7 +51,9 @@ var ( ) txpoolCollectors = []prometheus.Collector{ pendingCheckSize, - pendingScheduleSize, + mainQueueSize, + localQueueSize, + rimQueueSize, rejectedTransactions, acceptedTransactions, } diff --git a/go/runtime/txpool/queues.go b/go/runtime/txpool/queues.go new file mode 100644 index 00000000000..2b9fb44208b --- /dev/null +++ b/go/runtime/txpool/queues.go @@ -0,0 +1,67 @@ +package txpool + +import ( + "time" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" +) + +// TxQueueMeta stores some queuing-related metadata alongside a raw transaction. +type TxQueueMeta struct { + raw []byte + hash hash.Hash + // firstSeen is the timestamp when the transaction was first seen. + // We populate this in `submitTx`. Other forms of ingress (namely loading from roothash incoming messages and + // receiving from txSync) leave this in its default value. 
Transactions from those sources, however, only move + // through a limited area in the tx pool. + firstSeen time.Time +} + +// Raw returns the raw transaction data. +func (t *TxQueueMeta) Raw() []byte { + return t.raw +} + +// Size returns the size (in bytes) of the raw transaction data. +func (t *TxQueueMeta) Size() int { + return len(t.Raw()) +} + +// Hash returns the hash of the transaction binary data. +func (t *TxQueueMeta) Hash() hash.Hash { + return t.hash +} + +// FirstSeen returns the time the transaction was first seen. +func (t *TxQueueMeta) FirstSeen() time.Time { + return t.firstSeen +} + +// UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd, +// came from a roothash incoming message, or came from our own node. +type UsableTransactionSource interface { + // GetSchedulingSuggestion returns some number of txs to give to the scheduler as part of the initial + // batch. + GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta + // GetTxByHash returns the specific tx if it is in this queue, or nil if it is not. Used + // for resolving a batch from hashes and serving txSync. + GetTxByHash(h hash.Hash) *TxQueueMeta + // HandleTxsUsed is a callback to indicate that the scheduler is done with a set of txs, by hash. For most + // implementations, this removes them from internal storage. + HandleTxsUsed(hashes []hash.Hash) +} + +// RecheckableTransactionStore provides methods for rechecking. +type RecheckableTransactionStore interface { + // TakeAll removes all txs and returns them. + TakeAll() []*TxQueueMeta + // OfferChecked adds a tx that is checked. + OfferChecked(tx *TxQueueMeta, meta *protocol.CheckTxMetadata) error +} + +// RepublishableTransactionSource is a place to get txs that we want to push. +type RepublishableTransactionSource interface { + // GetTxsToPublish gets txs that this queue wants to publish. + GetTxsToPublish() []*TxQueueMeta +} diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go new file mode 100644 index 00000000000..1347b1a5382 --- /dev/null +++ b/go/runtime/txpool/rim_queue.go @@ -0,0 +1,58 @@ +package txpool + +import ( + "sync" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/roothash/api/message" +) + +var _ UsableTransactionSource = (*rimQueue)(nil) + +// rimQueue exposes transactions from roothash incoming messages. +type rimQueue struct { + l sync.RWMutex + txs map[hash.Hash]*TxQueueMeta +} + +func newRimQueue() *rimQueue { + return &rimQueue{ + txs: map[hash.Hash]*TxQueueMeta{}, + } +} + +func (rq *rimQueue) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta { + // Runtimes instead get transactions from the incoming messages. + return nil +} + +func (rq *rimQueue) GetTxByHash(h hash.Hash) *TxQueueMeta { + rq.l.RLock() + defer rq.l.RUnlock() + return rq.txs[h] +} + +func (rq *rimQueue) HandleTxsUsed(hashes []hash.Hash) { + // The roothash module manages the incoming message queue on its own, so we don't do anything here. +} + +// Load loads transactions from roothash incoming messages. 
+func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) { + newTxs := map[hash.Hash]*TxQueueMeta{} + for _, msg := range inMsgs { + h := hash.NewFromBytes(msg.Data) + newTxs[h] = &TxQueueMeta{ + raw: msg.Data, + hash: h, + } + } + rq.l.Lock() + defer rq.l.Unlock() + rq.txs = newTxs +} + +func (rq *rimQueue) size() int { + rq.l.Lock() + defer rq.l.Unlock() + return len(rq.txs) +} diff --git a/go/runtime/txpool/rim_queue_test.go b/go/runtime/txpool/rim_queue_test.go new file mode 100644 index 00000000000..774677a2bae --- /dev/null +++ b/go/runtime/txpool/rim_queue_test.go @@ -0,0 +1,34 @@ +package txpool + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" + "github.com/oasisprotocol/oasis-core/go/roothash/api/message" +) + +func TestRimQueue(t *testing.T) { + rq := newRimQueue() + + rawA := []byte("a") + txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)} + rq.Load([]*message.IncomingMessage{ + { + ID: 1, + Data: rawA, + }, + }) + require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash(): txA}, rq.txs, "after load") + require.Equal(t, 1, rq.size()) + + require.Nil(t, rq.GetSchedulingSuggestion(50), "get scheduling suggestion") + rq.HandleTxsUsed([]hash.Hash{txA.Hash()}) + + tx := rq.GetTxByHash(txA.Hash()) + require.EqualValues(t, txA, tx, "get tx by hash a") + hashC := hash.NewFromBytes([]byte("c")) + tx = rq.GetTxByHash(hashC) + require.Nil(t, tx, "get tx by hash c") +} diff --git a/go/runtime/txpool/schedule_queue.go b/go/runtime/txpool/schedule_queue.go index b8c04184e47..aeecaf2c19f 100644 --- a/go/runtime/txpool/schedule_queue.go +++ b/go/runtime/txpool/schedule_queue.go @@ -16,7 +16,7 @@ var ( // priorityWrappedTx is a wrapped transaction for insertion into the priority B-Tree. type priorityWrappedTx struct { - *Transaction + *MainQueueTransaction } func (tx priorityWrappedTx) Less(other btree.Item) bool { @@ -28,20 +28,20 @@ func (tx priorityWrappedTx) Less(other btree.Item) bool { } // If transactions have same priority, sort by first seen time (earlier transactions are later // in the queue as we are iterating over the queue in descending order). 
- return tx.time.After(tx2.time) + return tx.FirstSeen().After(tx2.FirstSeen()) } type scheduleQueue struct { l sync.Mutex - all map[hash.Hash]*Transaction - bySender map[string]*Transaction + all map[hash.Hash]*MainQueueTransaction + bySender map[string]*MainQueueTransaction byPriority *btree.BTree capacity int } -func (sq *scheduleQueue) add(tx *Transaction) error { +func (sq *scheduleQueue) add(tx *MainQueueTransaction) error { sq.l.Lock() defer sq.l.Unlock() @@ -63,18 +63,18 @@ func (sq *scheduleQueue) add(tx *Transaction) error { if tx.priority <= etx.priority { return ErrQueueFull } - sq.removeLocked(etx.Transaction) + sq.removeLocked(etx.MainQueueTransaction) } - sq.all[tx.hash] = tx + sq.all[tx.Hash()] = tx sq.bySender[tx.sender] = tx sq.byPriority.ReplaceOrInsert(priorityWrappedTx{tx}) return nil } -func (sq *scheduleQueue) removeLocked(tx *Transaction) { - delete(sq.all, tx.hash) +func (sq *scheduleQueue) removeLocked(tx *MainQueueTransaction) { + delete(sq.all, tx.Hash()) delete(sq.bySender, tx.sender) sq.byPriority.Delete(priorityWrappedTx{tx}) } @@ -93,12 +93,12 @@ func (sq *scheduleQueue) remove(txHashes []hash.Hash) { } } -func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction { +func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) []*MainQueueTransaction { sq.l.Lock() defer sq.l.Unlock() var ( - batch []*Transaction + batch []*MainQueueTransaction offsetItem btree.Item ) if offset != nil { @@ -114,13 +114,14 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) [] tx := i.(priorityWrappedTx) // Skip the offset item itself (if specified). - if tx.hash.Equal(offset) { + h := tx.Hash() + if h.Equal(offset) { return true } // Add the transaction to the batch. 
- batch = append(batch, tx.Transaction) - if uint32(len(batch)) >= limit { //nolint: gosimple + batch = append(batch, tx.MainQueueTransaction) + if uint32(len(batch)) >= limit { // nolint: gosimple return false } return true @@ -129,11 +130,11 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) [] return batch } -func (sq *scheduleQueue) getKnownBatch(batch []hash.Hash) ([]*Transaction, map[hash.Hash]int) { +func (sq *scheduleQueue) getKnownBatch(batch []hash.Hash) ([]*MainQueueTransaction, map[hash.Hash]int) { sq.l.Lock() defer sq.l.Unlock() - result := make([]*Transaction, 0, len(batch)) + result := make([]*MainQueueTransaction, 0, len(batch)) missing := make(map[hash.Hash]int) for index, txHash := range batch { if tx, ok := sq.all[txHash]; ok { @@ -146,11 +147,11 @@ func (sq *scheduleQueue) getKnownBatch(batch []hash.Hash) ([]*Transaction, map[h return result, missing } -func (sq *scheduleQueue) getAll() []*Transaction { +func (sq *scheduleQueue) getAll() []*MainQueueTransaction { sq.l.Lock() defer sq.l.Unlock() - result := make([]*Transaction, 0, len(sq.all)) + result := make([]*MainQueueTransaction, 0, len(sq.all)) for _, tx := range sq.all { result = append(result, tx) } @@ -168,15 +169,15 @@ func (sq *scheduleQueue) clear() { sq.l.Lock() defer sq.l.Unlock() - sq.all = make(map[hash.Hash]*Transaction) - sq.bySender = make(map[string]*Transaction) + sq.all = make(map[hash.Hash]*MainQueueTransaction) + sq.bySender = make(map[string]*MainQueueTransaction) sq.byPriority.Clear(true) } func newScheduleQueue(capacity int) *scheduleQueue { return &scheduleQueue{ - all: make(map[hash.Hash]*Transaction), - bySender: make(map[string]*Transaction), + all: make(map[hash.Hash]*MainQueueTransaction), + bySender: make(map[string]*MainQueueTransaction), byPriority: btree.New(2), capacity: capacity, } diff --git a/go/runtime/txpool/schedule_queue_test.go b/go/runtime/txpool/schedule_queue_test.go index 3a735ad112a..584a1fa97f1 100644 --- a/go/runtime/txpool/schedule_queue_test.go +++ b/go/runtime/txpool/schedule_queue_test.go @@ -3,6 +3,7 @@ package txpool import ( "fmt" "testing" + "time" "github.com/stretchr/testify/require" @@ -10,8 +11,12 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" ) -func newTestTransaction(data []byte, priority uint64) *Transaction { - tx := newTransaction(data, txStatusPendingCheck) +func newTestTransaction(data []byte, priority uint64) *MainQueueTransaction { + tx := newTransaction(TxQueueMeta{ + raw: data, + hash: hash.NewFromBytes(data), + firstSeen: time.Now(), + }) tx.setChecked(&protocol.CheckTxMetadata{ Priority: priority, }) @@ -66,7 +71,7 @@ func TestScheduleQueueRemoveTxBatch(t *testing.T) { queue := newScheduleQueue(51) queue.remove([]hash.Hash{}) - for _, tx := range []*Transaction{ + for _, tx := range []*MainQueueTransaction{ newTestTransaction([]byte("hello world"), 0), newTestTransaction([]byte("one"), 0), newTestTransaction([]byte("two"), 0), @@ -96,7 +101,7 @@ func TestScheduleQueuePriority(t *testing.T) { queue := newScheduleQueue(3) - txs := []*Transaction{ + txs := []*MainQueueTransaction{ newTestTransaction( []byte("hello world 10"), 10, @@ -117,7 +122,7 @@ func TestScheduleQueuePriority(t *testing.T) { batch := queue.getPrioritizedBatch(nil, 2) require.Len(batch, 2, "two transactions should be returned") require.EqualValues( - []*Transaction{ + []*MainQueueTransaction{ txs[2], // 20 txs[0], // 10 }, @@ -129,7 +134,7 @@ func TestScheduleQueuePriority(t *testing.T) { batch = 
queue.getPrioritizedBatch(&offsetTx, 2) require.Len(batch, 2, "two transactions should be returned") require.EqualValues( - []*Transaction{ + []*MainQueueTransaction{ txs[0], // 10 txs[1], // 5 }, @@ -152,7 +157,7 @@ func TestScheduleQueuePriority(t *testing.T) { batch = queue.getPrioritizedBatch(nil, 3) require.Len(batch, 3, "three transactions should be returned") require.EqualValues( - []*Transaction{ + []*MainQueueTransaction{ txs[2], // 20 txs[0], // 10 highTx, // 6 @@ -205,6 +210,6 @@ func TestScheduleQueueSender(t *testing.T) { require.NoError(err, "Add") require.Equal(1, queue.size()) - queue.remove([]hash.Hash{tx.hash}) + queue.remove([]hash.Hash{tx.Hash()}) require.Equal(0, queue.size()) } diff --git a/go/runtime/txpool/transaction.go b/go/runtime/txpool/transaction.go index 46fabcc3119..741fc43c964 100644 --- a/go/runtime/txpool/transaction.go +++ b/go/runtime/txpool/transaction.go @@ -1,137 +1,29 @@ package txpool import ( - "fmt" - "time" - - "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" ) -type txStatus uint8 - -const ( - txStatusPendingCheck = iota - txStatusChecked -) - -// Transaction is a transaction in the transaction pool. -type Transaction struct { - // tx represents the raw binary transaction data. - tx []byte - - // status is the transaction status. - status txStatus - // time is the timestamp when the transaction was first seen. - time time.Time - // hash is the cached transaction hash. - hash hash.Hash - - // priority defines the transaction's priority as specified by the runtime. - priority uint64 - - // sender is a unique transaction sender identifier as specified by the runtime. - sender string - // senderSeq is a per-sender sequence number as specified by the runtime. - senderSeq uint64 -} - -func newTransaction(tx []byte, status txStatus) *Transaction { - return &Transaction{ - tx: tx, - status: status, - time: time.Now(), - hash: hash.NewFromBytes(tx), - } -} - -// String returns a string representation of a transaction. -func (tx *Transaction) String() string { - return fmt.Sprintf("Transaction{hash: %s, time: %s, priority: %d}", tx.hash, tx.time, tx.priority) -} - -// Raw returns the raw transaction data. -func (tx *Transaction) Raw() []byte { - return tx.tx -} - -// Size returns the size (in bytes) of the raw transaction data. -func (tx *Transaction) Size() int { - return len(tx.tx) -} - -// Hash returns the hash of the transaction binary data. -func (tx *Transaction) Hash() hash.Hash { - return tx.hash -} - -// Time returns the time the transaction was first seen. -func (tx *Transaction) Time() time.Time { - return tx.time -} - -// Priority returns the transaction priority. -func (tx *Transaction) Priority() uint64 { - return tx.priority -} - -// Sender returns the transaction sender. -func (tx *Transaction) Sender() string { - return tx.sender -} - -// SenderSeq returns the per-sender sequence number. -func (tx *Transaction) SenderSeq() uint64 { - return tx.senderSeq -} - -// setChecked populates transaction data retrieved from checks. -func (tx *Transaction) setChecked(meta *protocol.CheckTxMetadata) { - tx.status = txStatusChecked - - if meta != nil { - tx.priority = meta.Priority - tx.sender = string(meta.Sender) - tx.senderSeq = meta.SenderSeq - } - - // If the sender is empty (e.g. because the runtime does not support specifying a sender), we - // treat each transaction as having a unique sender. This is to allow backwards compatibility. 
- if len(tx.sender) == 0 { - tx.sender = string(tx.hash[:]) - } -} - -// txCheckFlags are the flags describing how transaction should be checked. +// txCheckFlags are the flags describing how a transaction should be checked. type txCheckFlags uint8 const ( - // txCheckLocal is a flag indicating that the transaction was obtained from a local client. - txCheckLocal = (1 << 0) - // txCheckDiscard is a flag indicating that the transaction should be discarded after checks. - txCheckDiscard = (1 << 1) + // txCheckRecheck is a flag indicating that the transaction already passed checking earlier. + txCheckRecheck = 1 << 0 ) -func (f txCheckFlags) isLocal() bool { - return (f & txCheckLocal) != 0 -} - -func (f txCheckFlags) isDiscard() bool { - return (f & txCheckDiscard) != 0 +func (f txCheckFlags) isRecheck() bool { + return (f & txCheckRecheck) != 0 } // PendingCheckTransaction is a transaction pending checks. type PendingCheckTransaction struct { - *Transaction + *TxQueueMeta // flags are the transaction check flags. flags txCheckFlags + // dstQueue is where to offer the tx after checking, or nil to discard. + dstQueue RecheckableTransactionStore // notifyCh is a channel for sending back the transaction check result. notifyCh chan *protocol.CheckTxResult } - -func (pct *PendingCheckTransaction) isRecheck() bool { - // If transaction has already been checked then the fact that it is wrapped in a pending check - // transaction again means that this is a re-check. - return pct.status == txStatusChecked -} diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 482dbd2b70e..7ae2bffaadd 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -18,6 +18,7 @@ import ( consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" registry "github.com/oasisprotocol/oasis-core/go/registry/api" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + "github.com/oasisprotocol/oasis-core/go/roothash/api/message" "github.com/oasisprotocol/oasis-core/go/runtime/history" "github.com/oasisprotocol/oasis-core/go/runtime/host" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" @@ -67,7 +68,7 @@ type TransactionPool interface { // Quit returns a channel that will be closed when the service terminates. Quit() <-chan struct{} - // Submit adds the transaction into the transaction pool, first performing checks on it by + // SubmitTx adds the transaction into the transaction pool, first performing checks on it by // invoking the runtime. This method waits for the checks to complete. SubmitTx(ctx context.Context, tx []byte, meta *TransactionMeta) (*protocol.CheckTxResult, error) @@ -85,31 +86,41 @@ type TransactionPool interface { // ClearProposedBatch clears the proposal queue. ClearProposedBatch() - // RemoveTxBatch removes a transaction batch from the transaction pool. - RemoveTxBatch(txs []hash.Hash) + // HandleTxsUsed indicates that given transaction hashes are processed in a block. Queues that + // can remove those transactions will do so. + HandleTxsUsed(txs []hash.Hash) - // GetPrioritizedBatch returns a batch of transactions ordered by priority. + // GetSchedulingSuggestion returns a list of transactions to schedule. This begins a + // scheduling session, which suppresses transaction rechecking and republishing. Subsequently + // call GetSchedulingExtra for more transactions, followed by FinishScheduling. + GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta + + // GetSchedulingExtra returns transactions to schedule. 
// // Offset specifies the transaction hash that should serve as an offset when returning // transactions from the pool. Transactions will be skipped until the given hash is encountered - // and only following transactions will be returned. - GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction + // and only the following transactions will be returned. + GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta + + // FinishScheduling finishes a scheduling session, which resumes transaction rechecking and + // republishing. + FinishScheduling() // GetKnownBatch gets a set of known transactions from the transaction pool. // // For any missing transactions nil will be returned in their place and the map of missing // transactions will be populated accordingly. - GetKnownBatch(batch []hash.Hash) ([]*Transaction, map[hash.Hash]int) + GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int) // ProcessBlock updates the last known runtime block information. ProcessBlock(bi *BlockInfo) error + // ProcessIncomingMessages loads transactions from incoming messages into the pool. + ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error + // WakeupScheduler explicitly notifies subscribers that they should attempt scheduling. WakeupScheduler() - // Clear clears the transaction pool. - Clear() - // WatchScheduler subscribes to notifications about when to attempt scheduling. The emitted // boolean flag indicates whether the batch flush timeout expired. WatchScheduler() (pubsub.ClosableSubscription, <-chan bool) @@ -120,9 +131,6 @@ type TransactionPool interface { // PendingCheckSize returns the number of transactions currently pending to be checked. PendingCheckSize() int - - // PendingScheduleSize returns the number of transactions currently pending to be scheduled. - PendingScheduleSize() int } // RuntimeHostProvisioner is a runtime host provisioner. @@ -179,12 +187,20 @@ type txPool struct { checkTxNotifier *pubsub.Broker recheckTxCh *channels.RingChannel - schedulerQueue *scheduleQueue + drainLock sync.Mutex + + usableSources []UsableTransactionSource + recheckableStores []RecheckableTransactionStore + republishableSources []RepublishableTransactionSource + rimQueue *rimQueue + localQueue *localQueue + mainQueue *mainQueue + schedulerTicker *time.Ticker schedulerNotifier *pubsub.Broker proposedTxsLock sync.Mutex - proposedTxs map[hash.Hash]*Transaction + proposedTxs map[hash.Hash]*TxQueueMeta blockInfoLock sync.Mutex blockInfo *BlockInfo @@ -233,24 +249,28 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio } func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error { - tx := newTransaction(rawTx, txStatusPendingCheck) - + tx := &TxQueueMeta{ + raw: rawTx, + hash: hash.NewFromBytes(rawTx), + firstSeen: time.Now(), + } // Skip recently seen transactions. - if _, seen := t.seenCache.Peek(tx.hash); seen { - t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.hash) + if _, seen := t.seenCache.Peek(tx.Hash()); seen { + t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.Hash()) return fmt.Errorf("duplicate transaction") } // Queue transaction for checks. 
pct := &PendingCheckTransaction{ - Transaction: tx, + TxQueueMeta: tx, notifyCh: notifyCh, } - if meta.Local { - pct.flags |= txCheckLocal - } if meta.Discard { - pct.flags |= txCheckDiscard + pct.dstQueue = nil + } else if meta.Local { + pct.dstQueue = t.localQueue + } else { + pct.dstQueue = t.mainQueue } return t.addToCheckQueue(pct) @@ -258,13 +278,13 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error { t.logger.Debug("queuing transaction for check", - "tx", pct.tx, - "tx_hash", pct.hash, - "recheck", pct.isRecheck(), + "tx", pct.Raw(), + "tx_hash", pct.Hash(), + "recheck", pct.flags.isRecheck(), ) if err := t.checkTxQueue.add(pct); err != nil { t.logger.Warn("unable to queue transaction", - "tx_hash", pct.hash, + "tx_hash", pct.Hash(), "err", err, ) return err @@ -288,8 +308,11 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) { defer t.proposedTxsLock.Unlock() for _, rawTx := range batch { - tx := newTransaction(rawTx, txStatusChecked) - t.proposedTxs[tx.hash] = tx + tx := &TxQueueMeta{ + raw: rawTx, + hash: hash.NewFromBytes(rawTx), + } + t.proposedTxs[tx.Hash()] = tx } } @@ -308,7 +331,7 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) { if tx == nil { continue } - t.proposedTxs[tx.hash] = tx + t.proposedTxs[tx.Hash()] = tx } } @@ -316,21 +339,49 @@ func (t *txPool) ClearProposedBatch() { t.proposedTxsLock.Lock() defer t.proposedTxsLock.Unlock() - t.proposedTxs = make(map[hash.Hash]*Transaction) + t.proposedTxs = make(map[hash.Hash]*TxQueueMeta) } -func (t *txPool) RemoveTxBatch(txs []hash.Hash) { - t.schedulerQueue.remove(txs) +func (t *txPool) GetSchedulingSuggestion(countHint uint32) []*TxQueueMeta { + t.drainLock.Lock() + var txs []*TxQueueMeta + for _, q := range t.usableSources { + txs = append(txs, q.GetSchedulingSuggestion(countHint)...) + } + return txs +} - pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size())) +func (t *txPool) GetSchedulingExtra(offset *hash.Hash, limit uint32) []*TxQueueMeta { + return t.mainQueue.GetSchedulingExtra(offset, limit) } -func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction { - return t.schedulerQueue.getPrioritizedBatch(offset, limit) +func (t *txPool) FinishScheduling() { + t.drainLock.Unlock() } -func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*Transaction, map[hash.Hash]int) { - txs, missingTxs := t.schedulerQueue.getKnownBatch(batch) +func (t *txPool) HandleTxsUsed(hashes []hash.Hash) { + for _, q := range t.usableSources { + q.HandleTxsUsed(hashes) + } + + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) +} + +func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int) { + var txs []*TxQueueMeta + missingTxs := make(map[hash.Hash]int) +HASH_LOOP: + for i, h := range batch { + for _, q := range t.usableSources { + if tx := q.GetTxByHash(h); tx != nil { + txs = append(txs, tx) + continue HASH_LOOP + } + } + txs = append(txs, nil) + missingTxs[h] = i + } // Also check the proposed transactions set. 
t.proposedTxsLock.Lock() @@ -379,6 +430,12 @@ func (t *txPool) ProcessBlock(bi *BlockInfo) error { return nil } +func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error { + t.rimQueue.Load(inMsgs) + rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size())) + return nil +} + func (t *txPool) updateScheduler(bi *BlockInfo) error { // Reset ticker to the new interval. t.schedulerTicker.Reset(bi.ActiveDescriptor.TxnScheduler.BatchFlushTimeout) @@ -390,16 +447,6 @@ func (t *txPool) WakeupScheduler() { t.schedulerNotifier.Broadcast(false) } -func (t *txPool) Clear() { - t.schedulerQueue.clear() - t.checkTxQueue.clear() - - t.seenCache.Clear() - t.ClearProposedBatch() - - pendingScheduleSize.With(t.getMetricLabels()).Set(0) -} - func (t *txPool) WatchScheduler() (pubsub.ClosableSubscription, <-chan bool) { sub := t.schedulerNotifier.Subscribe() ch := make(chan bool) @@ -418,10 +465,6 @@ func (t *txPool) PendingCheckSize() int { return t.checkTxQueue.size() } -func (t *txPool) PendingScheduleSize() int { - return t.schedulerQueue.size() -} - func (t *txPool) getCurrentBlockInfo() (*BlockInfo, error) { t.blockInfoLock.Lock() defer t.blockInfoLock.Unlock() @@ -461,7 +504,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { // Check batch. rawTxBatch := make([][]byte, 0, len(batch)) for _, pct := range batch { - rawTxBatch = append(rawTxBatch, pct.tx) + rawTxBatch = append(rawTxBatch, pct.Raw()) } return rr.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch) }() @@ -510,62 +553,58 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { } newTxs := make([]*PendingCheckTransaction, 0, len(results)) + goodPcts := make([]*PendingCheckTransaction, 0, len(results)) batchIndices := make([]int, 0, len(results)) - var unschedule []hash.Hash for i, res := range results { if !res.IsSuccess() { rejectedTransactions.With(t.getMetricLabels()).Inc() t.logger.Debug("check tx failed", - "tx", batch[i].tx, - "tx_hash", batch[i].hash, + "tx", batch[i].Raw(), + "tx_hash", batch[i].Hash(), "result", res, - "recheck", batch[i].isRecheck(), + "recheck", batch[i].flags.isRecheck(), ) - // If this was a recheck, make sure to remove the transaction from the scheduling queue. - if batch[i].isRecheck() { - unschedule = append(unschedule, batch[i].hash) - } + // We won't be sending this tx on to its destination queue. notifySubmitter(i) continue } - if batch[i].flags.isDiscard() || batch[i].isRecheck() { + if batch[i].dstQueue == nil { notifySubmitter(i) continue } // For any transactions that are to be queued, we defer notification until queued. - acceptedTransactions.With(t.getMetricLabels()).Inc() - batch[i].setChecked(res.Meta) - newTxs = append(newTxs, batch[i]) + if !batch[i].flags.isRecheck() { + acceptedTransactions.With(t.getMetricLabels()).Inc() + newTxs = append(newTxs, batch[i]) + } + goodPcts = append(goodPcts, batch[i]) batchIndices = append(batchIndices, i) } - // Unschedule any transactions that are being rechecked and have failed checks. - t.RemoveTxBatch(unschedule) - // If there are more transactions to check, make sure we check them next. if t.checkTxQueue.size() > 0 { t.checkTxCh.In() <- struct{}{} } - if len(newTxs) == 0 { + if len(goodPcts) == 0 { return } t.logger.Debug("checked new transactions", "num_txs", len(newTxs), + "accepted_txs", len(goodPcts), ) // Queue checked transactions for scheduling. 
- for i, tx := range newTxs { - // NOTE: Scheduler exists as otherwise there would be no current block info above. - if err := t.schedulerQueue.add(tx.Transaction); err != nil { + for i, pct := range goodPcts { + if err = pct.dstQueue.OfferChecked(pct.TxQueueMeta, results[batchIndices[i]].Meta); err != nil { t.logger.Error("unable to queue transaction for scheduling", "err", err, - "tx_hash", tx.hash, + "tx_hash", pct.Hash(), ) // Change the result into an error and notify submitter. @@ -581,31 +620,33 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { // Notify submitter of success. notifySubmitter(batchIndices[i]) - // Publish local transactions immediately. - publishTime := time.Now() - if tx.flags.isLocal() { - if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil { - t.logger.Warn("failed to publish local transaction", - "err", err, - "tx_hash", tx.hash, - ) - - // Since publication failed, make sure we retry early. - t.republishCh.In() <- struct{}{} - publishTime = time.Time{} + if !pct.flags.isRecheck() { + // Mark new transactions as never having been published. The republish worker will + // publish these immediately. + publishTime := time.Time{} + if pct.dstQueue == t.mainQueue { + // This being a tx we got from outside, it's usually something that another node + // has just broadcast. Treat it as if it were published just now so that we don't + // immediately publish again from our node. + publishTime = time.Now() } + // Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it + // can error is if the capacity is in bytes and the value size is over capacity. + _ = t.seenCache.Put(pct.Hash(), publishTime) } - - // Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it - // can error is if the capacity is in bytes and the value size is over capacity. - _ = t.seenCache.Put(tx.hash, publishTime) } - // Notify subscribers that we have received new transactions. - t.checkTxNotifier.Broadcast(newTxs) - t.schedulerNotifier.Broadcast(false) + if len(newTxs) != 0 { + // Kick off publishing for any new txs. + t.republishCh.In() <- struct{}{} + + // Notify subscribers that we have received new transactions. + t.checkTxNotifier.Broadcast(newTxs) + t.schedulerNotifier.Broadcast(false) + } - pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize())) + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) } func (t *txPool) ensureInitialized() error { @@ -718,16 +759,21 @@ func (t *txPool) republishWorker() { debounceCh = nil } - lastRepublish = time.Now() - - // Get scheduled transactions. - txs := t.schedulerQueue.getAll() + // Get transactions to republish. + var txs []*TxQueueMeta + (func() { + t.drainLock.Lock() + defer t.drainLock.Unlock() + for _, q := range t.republishableSources { + txs = append(txs, q.GetTxsToPublish()...) + } + })() // Filter transactions based on whether they can already be republished. 
var republishedCount int nextPendingRepublish := republishInterval for _, tx := range txs { - ts, seen := t.seenCache.Peek(tx.hash) + ts, seen := t.seenCache.Peek(tx.Hash()) if seen { sinceLast := time.Since(ts.(time.Time)) if sinceLast < republishInterval { @@ -738,7 +784,7 @@ func (t *txPool) republishWorker() { } } - if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil { + if err := t.txPublisher.PublishTx(ctx, tx.Raw()); err != nil { t.logger.Warn("failed to publish transaction", "err", err, "tx", tx, @@ -748,7 +794,7 @@ func (t *txPool) republishWorker() { } // Update publish timestamp. - _ = t.seenCache.Put(tx.hash, time.Now()) + _ = t.seenCache.Put(tx.Hash(), time.Now()) republishedCount++ if republishedCount > maxRepublishTxs { @@ -779,24 +825,54 @@ func (t *txPool) recheckWorker() { case <-t.recheckTxCh.Out(): } - // Get a batch of scheduled transactions. - txs := t.schedulerQueue.getAll() + t.recheck() + } +} - if len(txs) == 0 { - continue - } +func (t *txPool) recheck() { + t.drainLock.Lock() + defer t.drainLock.Unlock() - // Recheck all transactions in batch. - for _, tx := range txs { - err := t.addToCheckQueue(&PendingCheckTransaction{ - Transaction: tx, + // Get a batch of scheduled transactions. + var pcts []*PendingCheckTransaction + var results []chan *protocol.CheckTxResult + for _, q := range t.recheckableStores { + for _, tx := range q.TakeAll() { + notifyCh := make(chan *protocol.CheckTxResult, 1) + pcts = append(pcts, &PendingCheckTransaction{ + TxQueueMeta: tx, + flags: txCheckRecheck, + dstQueue: q, + notifyCh: notifyCh, }) - if err != nil { - t.logger.Warn("failed to submit transaction for recheck", - "err", err, - "tx_hash", tx.hash, - ) - } + results = append(results, notifyCh) + } + } + mainQueueSize.With(t.getMetricLabels()).Set(float64(t.mainQueue.inner.size())) + localQueueSize.With(t.getMetricLabels()).Set(float64(t.localQueue.size())) + + if len(pcts) == 0 { + return + } + + // Recheck all transactions in batch. + for _, pct := range pcts { + err := t.addToCheckQueue(pct) + if err != nil { + t.logger.Warn("failed to submit transaction for recheck", + "err", err, + "tx_hash", pct.Hash(), + ) + } + } + + // Block until checking is done. + for _, notifyCh := range results { + select { + case <-t.stopCh: + return + case <-notifyCh: + // Don't care about result. } } } @@ -836,25 +912,34 @@ func New( // buffer in case the schedule queue is full and is being rechecked. 
maxCheckTxQueueSize := int((110 * cfg.MaxPoolSize) / 100) + rq := newRimQueue() + lq := newLocalQueue() + mq := newMainQueue(int(cfg.MaxPoolSize)) + return &txPool{ - logger: logging.GetLogger("runtime/txpool"), - stopCh: make(chan struct{}), - quitCh: make(chan struct{}), - initCh: make(chan struct{}), - runtimeID: runtimeID, - cfg: cfg, - host: host, - history: history, - txPublisher: txPublisher, - seenCache: seenCache, - checkTxQueue: newCheckTxQueue(maxCheckTxQueueSize, int(cfg.MaxCheckTxBatchSize)), - checkTxCh: channels.NewRingChannel(1), - checkTxNotifier: pubsub.NewBroker(false), - recheckTxCh: channels.NewRingChannel(1), - schedulerQueue: newScheduleQueue(int(cfg.MaxPoolSize)), - schedulerTicker: time.NewTicker(1 * time.Hour), - schedulerNotifier: pubsub.NewBroker(false), - proposedTxs: make(map[hash.Hash]*Transaction), - republishCh: channels.NewRingChannel(1), + logger: logging.GetLogger("runtime/txpool"), + stopCh: make(chan struct{}), + quitCh: make(chan struct{}), + initCh: make(chan struct{}), + runtimeID: runtimeID, + cfg: cfg, + host: host, + history: history, + txPublisher: txPublisher, + seenCache: seenCache, + checkTxQueue: newCheckTxQueue(maxCheckTxQueueSize, int(cfg.MaxCheckTxBatchSize)), + checkTxCh: channels.NewRingChannel(1), + checkTxNotifier: pubsub.NewBroker(false), + recheckTxCh: channels.NewRingChannel(1), + usableSources: []UsableTransactionSource{rq, lq, mq}, + recheckableStores: []RecheckableTransactionStore{lq, mq}, + republishableSources: []RepublishableTransactionSource{lq, mq}, + rimQueue: rq, + localQueue: lq, + mainQueue: mq, + schedulerTicker: time.NewTicker(1 * time.Hour), + schedulerNotifier: pubsub.NewBroker(false), + proposedTxs: make(map[hash.Hash]*TxQueueMeta), + republishCh: channels.NewRingChannel(1), }, nil } diff --git a/go/worker/client/committee/node.go b/go/worker/client/committee/node.go index f8381382947..78e19b95157 100644 --- a/go/worker/client/committee/node.go +++ b/go/worker/client/committee/node.go @@ -214,7 +214,7 @@ func (n *Node) checkBlock(ctx context.Context, blk *block.Block, pending map[has } // Remove processed transactions from pool. - n.commonNode.TxPool.RemoveTxBatch(processed) + n.commonNode.TxPool.HandleTxsUsed(processed) return nil } diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index 5d23e14381d..240f8e22d6d 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -543,6 +543,26 @@ func (n *Node) handleNewBlockLocked(blk *block.Block, height int64) { ) } + // Fetch incoming messages. + inMsgs, err := n.Consensus.RootHash().GetIncomingMessageQueue(n.ctx, &roothash.InMessageQueueRequest{ + RuntimeID: n.Runtime.ID(), + Height: consensusBlk.Height, + }) + if err != nil { + n.logger.Error("failed to query incoming messages", + "err", err, + "height", height, + "round", blk.Header.Round, + ) + return + } + err = n.TxPool.ProcessIncomingMessages(inMsgs) + if err != nil { + n.logger.Error("failed to process incoming messages in transaction pool", + "err", err, + ) + } + for _, hooks := range n.hooks { hooks.HandleNewBlockLocked(blk) } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index d3f3c463827..7087b8b2190 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -373,7 +373,7 @@ func (n *Node) HandleNewBlockLocked(blk *block.Block) { "io_root", header.IORoot, ) // Remove processed transactions from queue. 
- n.commonNode.TxPool.RemoveTxBatch(state.txHashes) + n.commonNode.TxPool.HandleTxsUsed(state.txHashes) }() } @@ -516,7 +516,7 @@ func (n *Node) getRtStateAndRoundResults(ctx context.Context, height int64) (*ro return state, roundResults, nil } -func (n *Node) handleScheduleBatch(force bool) { //nolint: gocyclo +func (n *Node) handleScheduleBatch(force bool) { // nolint: gocyclo n.commonNode.CrossNode.Lock() defer n.commonNode.CrossNode.Unlock() @@ -586,9 +586,10 @@ func (n *Node) handleScheduleBatch(force bool) { //nolint: gocyclo return } - // Ask the transaction pool to get a batch of transactions for us and see if we should be + // Ask the transaction pool to get a batch of transactions for us and see if we should be // proposing a new batch to other nodes. - batch := n.commonNode.TxPool.GetPrioritizedBatch(nil, rtInfo.Features.ScheduleControl.InitialBatchSize) + batch := n.commonNode.TxPool.GetSchedulingSuggestion(rtInfo.Features.ScheduleControl.InitialBatchSize) + defer n.commonNode.TxPool.FinishScheduling() switch { case len(batch) > 0: // We have some transactions, schedule batch. @@ -676,7 +677,7 @@ func (n *Node) startRuntimeBatchSchedulingLocked( rtState *roothash.RuntimeState, roundResults *roothash.RoundResults, rt host.RichRuntime, - batch []*txpool.Transaction, + batch []*txpool.TxQueueMeta, ) { n.logger.Debug("asking runtime to schedule batch", "initial_batch_size", len(batch), @@ -724,7 +725,7 @@ func (n *Node) startRuntimeBatchSchedulingLocked( } // Remove any rejected transactions. - n.commonNode.TxPool.RemoveTxBatch(rsp.TxRejectHashes) + n.commonNode.TxPool.HandleTxsUsed(rsp.TxRejectHashes) // Mark any proposed transactions. n.commonNode.TxPool.PromoteProposedBatch(rsp.TxHashes)
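The changelog entry at the top of this diff describes the headline change: data carried by roothash incoming messages is now exposed as runtime transactions, kept in the dedicated rim queue that is neither checked nor broadcast. The following is a minimal, illustrative sketch (not part of the PR) of how a caller could feed incoming messages into the pool and then resolve the resulting transactions by hash. The function name and the example package are hypothetical; pool is assumed to be an already-constructed txpool.TransactionPool, and only methods introduced in this diff are used.

package example

import (
	"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
	"github.com/oasisprotocol/oasis-core/go/roothash/api/message"
	"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
)

// loadIncomingMessages mirrors what the common committee node now does on each
// new block: the queued roothash incoming messages are handed to the pool,
// which exposes each message's Data payload as a transaction in the rim queue.
func loadIncomingMessages(pool txpool.TransactionPool, inMsgs []*message.IncomingMessage) ([]*txpool.TxQueueMeta, error) {
	if err := pool.ProcessIncomingMessages(inMsgs); err != nil {
		return nil, err
	}

	// The message data is now addressable by hash like any other pooled
	// transaction, e.g. when resolving a proposed batch or serving txSync.
	hashes := make([]hash.Hash, 0, len(inMsgs))
	for _, msg := range inMsgs {
		hashes = append(hashes, hash.NewFromBytes(msg.Data))
	}
	txs, missing := pool.GetKnownBatch(hashes)
	_ = missing // No entries expected here: ProcessIncomingMessages just loaded these hashes.
	return txs, nil
}

Because the rim queue is rebuilt wholesale on each ProcessIncomingMessages call, returns nothing from GetSchedulingSuggestion, and is registered neither as a recheckable store nor as a republishable source, these transactions bypass checking and broadcasting exactly as the changelog describes.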
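The other structural change is the scheduling-session contract that replaces GetPrioritizedBatch/RemoveTxBatch: GetSchedulingSuggestion opens a session (concatenating the rim, local, and main queues in that order, which is how a proposer's own transactions get ahead of gossiped ones) and suppresses rechecking and republishing until FinishScheduling; GetSchedulingExtra serves additional transactions from the main queue; HandleTxsUsed is reported once transactions land in a block. Below is a rough proposer-side sketch under those assumptions; the function name is hypothetical, and the authoritative usage is the executor committee code in this diff.

package example

import (
	"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
	"github.com/oasisprotocol/oasis-core/go/runtime/txpool"
)

// proposeBatch sketches one scheduling session. While the session is open the
// pool holds its drain lock, so rechecking and republishing wait until
// FinishScheduling is called.
func proposeBatch(pool txpool.TransactionPool, initialBatchSize, extraLimit uint32) []hash.Hash {
	batch := pool.GetSchedulingSuggestion(initialBatchSize)
	defer pool.FinishScheduling()

	proposed := make([]hash.Hash, 0, len(batch))
	for _, tx := range batch {
		// A real proposer hands tx.Raw() to the runtime, which decides what
		// actually fits into the batch; here every transaction is accepted.
		proposed = append(proposed, tx.Hash())
	}

	// The runtime can request more transactions mid-schedule (served from the
	// main queue via the host handler). It normally supplies the hash of the
	// last transaction it received as the offset; nil, used here for brevity,
	// starts from the top and may repeat transactions from the suggestion.
	for _, tx := range pool.GetSchedulingExtra(nil, extraLimit) {
		proposed = append(proposed, tx.Hash())
	}
	return proposed
}

// Once the proposed transactions are later observed in a block, the node
// reports them back so that queues which can drop them do so, e.g.:
//
//	pool.HandleTxsUsed(proposed)

Note that FinishScheduling only releases the drain lock, so every GetSchedulingSuggestion call must be paired with it (the executor does this with defer); otherwise rechecking and republishing stall.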