From c072a380bdf446f96866af16a033d68c03e1b3fa Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 19 Aug 2022 13:34:39 -0700 Subject: [PATCH] go/runtime/txpool: make TxQueueMeta fields private, add getters --- go/runtime/registry/host.go | 2 +- go/runtime/txpool/check_queue_test.go | 8 ++-- go/runtime/txpool/local_queue.go | 4 +- go/runtime/txpool/local_queue_test.go | 12 +++--- go/runtime/txpool/main_queue.go | 26 ++---------- go/runtime/txpool/queues.go | 28 +++++++++++-- go/runtime/txpool/rim_queue.go | 4 +- go/runtime/txpool/rim_queue_test.go | 8 ++-- go/runtime/txpool/schedule_queue.go | 9 ++-- go/runtime/txpool/schedule_queue_test.go | 8 ++-- go/runtime/txpool/txpool.go | 42 +++++++++---------- go/worker/common/p2p/txsync/server.go | 2 +- go/worker/compute/executor/committee/batch.go | 4 +- go/worker/compute/executor/committee/node.go | 2 +- .../executor/committee/transactions.go | 2 +- 15 files changed, 81 insertions(+), 80 deletions(-) diff --git a/go/runtime/registry/host.go b/go/runtime/registry/host.go index 0bc0ec23bde..e6c12e137a9 100644 --- a/go/runtime/registry/host.go +++ b/go/runtime/registry/host.go @@ -252,7 +252,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (* batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit) raw := make([][]byte, 0, len(batch)) for _, tx := range batch { - raw = append(raw, tx.Raw) + raw = append(raw, tx.Raw()) } return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{ diff --git a/go/runtime/txpool/check_queue_test.go b/go/runtime/txpool/check_queue_test.go index 29c7d72b8f2..2d296ab31c0 100644 --- a/go/runtime/txpool/check_queue_test.go +++ b/go/runtime/txpool/check_queue_test.go @@ -12,8 +12,8 @@ import ( func newPendingTx(tx []byte) *PendingCheckTransaction { return &PendingCheckTransaction{ TxQueueMeta: &TxQueueMeta{ - Raw: tx, - Hash: hash.NewFromBytes(tx), + raw: tx, + hash: hash.NewFromBytes(tx), }, } } @@ -39,9 +39,9 @@ func TestCheckTxQueueBasic(t *testing.T) { require.EqualValues(t, 10, len(batch), "Batch size") require.EqualValues(t, 41, queue.size(), "Size") - require.EqualValues(t, batch[0].Raw, []byte("hello world")) + require.EqualValues(t, batch[0].Raw(), []byte("hello world")) for i := 0; i < 9; i++ { - require.EqualValues(t, batch[i+1].Raw, []byte(fmt.Sprintf("call %d", i))) + require.EqualValues(t, batch[i+1].Raw(), []byte(fmt.Sprintf("call %d", i))) } queue.clear() diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go index 8f8ef2add46..47c71f6feda 100644 --- a/go/runtime/txpool/local_queue.go +++ b/go/runtime/txpool/local_queue.go @@ -64,7 +64,7 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) { } i := len(keptTxs) keptTxs = append(keptTxs, tx) - lq.indexesByHash[tx.Hash] = i + lq.indexesByHash[tx.Hash()] = i } lq.txs = keptTxs } @@ -81,7 +81,7 @@ func (lq *localQueue) TakeAll() []*TxQueueMeta { func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error { lq.l.Lock() defer lq.l.Unlock() - lq.indexesByHash[tx.Hash] = len(lq.txs) + lq.indexesByHash[tx.Hash()] = len(lq.txs) lq.txs = append(lq.txs, tx) return nil } diff --git a/go/runtime/txpool/local_queue_test.go b/go/runtime/txpool/local_queue_test.go index 9ad13ef0dab..1810b05d9bb 100644 --- a/go/runtime/txpool/local_queue_test.go +++ b/go/runtime/txpool/local_queue_test.go @@ -16,10 +16,10 @@ func TestLocalQueueBasic(t *testing.T) { // Add two transactions, with a higher priority one coming later. rawA := []byte("a") - txA := &TxQueueMeta{Raw: rawA, Hash: hash.NewFromBytes(rawA)} + txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)} require.NoError(t, lq.OfferChecked(txA, &protocol.CheckTxMetadata{Priority: 1}), "offer checked a") rawB := []byte("b") - txB := &TxQueueMeta{Raw: rawB, Hash: hash.NewFromBytes(rawB)} + txB := &TxQueueMeta{raw: rawB, hash: hash.NewFromBytes(rawB)} require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a") require.Equal(t, 2, lq.size()) @@ -28,16 +28,16 @@ func TestLocalQueueBasic(t *testing.T) { // Schedule in original order. require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetSchedulingSuggestion(50), "get scheduling suggestion") - tx := lq.GetTxByHash(txA.Hash) + tx := lq.GetTxByHash(txA.Hash()) require.EqualValues(t, txA, tx, "get tx by hash a") hashC := hash.NewFromBytes([]byte("c")) tx = lq.GetTxByHash(hashC) require.Nil(t, tx, "get tx by hash c") lq.HandleTxsUsed([]hash.Hash{hashC}) - require.EqualValues(t, map[hash.Hash]int{txA.Hash: 0, txB.Hash: 1}, lq.indexesByHash, "after handle txs used absent") - lq.HandleTxsUsed([]hash.Hash{txA.Hash}) - require.EqualValues(t, map[hash.Hash]int{txB.Hash: 0}, lq.indexesByHash, "after handle txs used") + require.EqualValues(t, map[hash.Hash]int{txA.Hash(): 0, txB.Hash(): 1}, lq.indexesByHash, "after handle txs used absent") + lq.HandleTxsUsed([]hash.Hash{txA.Hash()}) + require.EqualValues(t, map[hash.Hash]int{txB.Hash(): 0}, lq.indexesByHash, "after handle txs used") require.EqualValues(t, []*TxQueueMeta{txB}, lq.TakeAll(), "take all") require.Len(t, lq.GetSchedulingSuggestion(50), 0, "after take all") diff --git a/go/runtime/txpool/main_queue.go b/go/runtime/txpool/main_queue.go index 625fbccf3dc..2d14550a3ca 100644 --- a/go/runtime/txpool/main_queue.go +++ b/go/runtime/txpool/main_queue.go @@ -2,7 +2,6 @@ package txpool import ( "fmt" - "time" "github.com/oasisprotocol/oasis-core/go/common/crypto/hash" "github.com/oasisprotocol/oasis-core/go/runtime/host/protocol" @@ -35,27 +34,7 @@ func newTransaction(tx TxQueueMeta) *MainQueueTransaction { // String returns a string representation of a transaction. func (tx *MainQueueTransaction) String() string { - return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.TxQueueMeta.Hash, tx.TxQueueMeta.FirstSeen, tx.priority) -} - -// Raw returns the raw transaction data. -func (tx *MainQueueTransaction) Raw() []byte { - return tx.TxQueueMeta.Raw -} - -// Size returns the size (in bytes) of the raw transaction data. -func (tx *MainQueueTransaction) Size() int { - return len(tx.TxQueueMeta.Raw) -} - -// Hash returns the hash of the transaction binary data. -func (tx *MainQueueTransaction) Hash() hash.Hash { - return tx.TxQueueMeta.Hash -} - -// FirstSeen returns the time the transaction was first seen. -func (tx *MainQueueTransaction) FirstSeen() time.Time { - return tx.TxQueueMeta.FirstSeen + return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.Hash(), tx.FirstSeen(), tx.priority) } // Priority returns the transaction priority. @@ -84,7 +63,8 @@ func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) { // If the sender is empty (e.g. because the runtime does not support specifying a sender), we // treat each transaction as having a unique sender. This is to allow backwards compatibility. if len(tx.sender) == 0 { - tx.sender = string(tx.TxQueueMeta.Hash[:]) + h := tx.Hash() + tx.sender = string(h[:]) } } diff --git a/go/runtime/txpool/queues.go b/go/runtime/txpool/queues.go index b59e4bea9e3..2b9fb44208b 100644 --- a/go/runtime/txpool/queues.go +++ b/go/runtime/txpool/queues.go @@ -9,13 +9,33 @@ import ( // TxQueueMeta stores some queuing-related metadata alongside a raw transaction. type TxQueueMeta struct { - Raw []byte - Hash hash.Hash - // FirstSeen is the timestamp when the transaction was first seen. + raw []byte + hash hash.Hash + // firstSeen is the timestamp when the transaction was first seen. // We populate this in `submitTx`. Other forms of ingress (namely loading from roothash incoming messages and // receiving from txSync) leave this in its default value. Transactions from those sources, however, only move // through a limited area in the tx pool. - FirstSeen time.Time + firstSeen time.Time +} + +// Raw returns the raw transaction data. +func (t *TxQueueMeta) Raw() []byte { + return t.raw +} + +// Size returns the size (in bytes) of the raw transaction data. +func (t *TxQueueMeta) Size() int { + return len(t.Raw()) +} + +// Hash returns the hash of the transaction binary data. +func (t *TxQueueMeta) Hash() hash.Hash { + return t.hash +} + +// FirstSeen returns the time the transaction was first seen. +func (t *TxQueueMeta) FirstSeen() time.Time { + return t.firstSeen } // UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd, diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go index 57b1fee7768..1347b1a5382 100644 --- a/go/runtime/txpool/rim_queue.go +++ b/go/runtime/txpool/rim_queue.go @@ -42,8 +42,8 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) { for _, msg := range inMsgs { h := hash.NewFromBytes(msg.Data) newTxs[h] = &TxQueueMeta{ - Raw: msg.Data, - Hash: h, + raw: msg.Data, + hash: h, } } rq.l.Lock() diff --git a/go/runtime/txpool/rim_queue_test.go b/go/runtime/txpool/rim_queue_test.go index 61eef6ebfd4..774677a2bae 100644 --- a/go/runtime/txpool/rim_queue_test.go +++ b/go/runtime/txpool/rim_queue_test.go @@ -13,20 +13,20 @@ func TestRimQueue(t *testing.T) { rq := newRimQueue() rawA := []byte("a") - txA := &TxQueueMeta{Raw: rawA, Hash: hash.NewFromBytes(rawA)} + txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)} rq.Load([]*message.IncomingMessage{ { ID: 1, Data: rawA, }, }) - require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash: txA}, rq.txs, "after load") + require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash(): txA}, rq.txs, "after load") require.Equal(t, 1, rq.size()) require.Nil(t, rq.GetSchedulingSuggestion(50), "get scheduling suggestion") - rq.HandleTxsUsed([]hash.Hash{txA.Hash}) + rq.HandleTxsUsed([]hash.Hash{txA.Hash()}) - tx := rq.GetTxByHash(txA.Hash) + tx := rq.GetTxByHash(txA.Hash()) require.EqualValues(t, txA, tx, "get tx by hash a") hashC := hash.NewFromBytes([]byte("c")) tx = rq.GetTxByHash(hashC) diff --git a/go/runtime/txpool/schedule_queue.go b/go/runtime/txpool/schedule_queue.go index bd1346225fd..aeecaf2c19f 100644 --- a/go/runtime/txpool/schedule_queue.go +++ b/go/runtime/txpool/schedule_queue.go @@ -28,7 +28,7 @@ func (tx priorityWrappedTx) Less(other btree.Item) bool { } // If transactions have same priority, sort by first seen time (earlier transactions are later // in the queue as we are iterating over the queue in descending order). - return tx.TxQueueMeta.FirstSeen.After(tx2.TxQueueMeta.FirstSeen) + return tx.FirstSeen().After(tx2.FirstSeen()) } type scheduleQueue struct { @@ -66,7 +66,7 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error { sq.removeLocked(etx.MainQueueTransaction) } - sq.all[tx.TxQueueMeta.Hash] = tx + sq.all[tx.Hash()] = tx sq.bySender[tx.sender] = tx sq.byPriority.ReplaceOrInsert(priorityWrappedTx{tx}) @@ -74,7 +74,7 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error { } func (sq *scheduleQueue) removeLocked(tx *MainQueueTransaction) { - delete(sq.all, tx.TxQueueMeta.Hash) + delete(sq.all, tx.Hash()) delete(sq.bySender, tx.sender) sq.byPriority.Delete(priorityWrappedTx{tx}) } @@ -114,7 +114,8 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) [] tx := i.(priorityWrappedTx) // Skip the offset item itself (if specified). - if tx.TxQueueMeta.Hash.Equal(offset) { + h := tx.Hash() + if h.Equal(offset) { return true } diff --git a/go/runtime/txpool/schedule_queue_test.go b/go/runtime/txpool/schedule_queue_test.go index 17624ef2826..584a1fa97f1 100644 --- a/go/runtime/txpool/schedule_queue_test.go +++ b/go/runtime/txpool/schedule_queue_test.go @@ -13,9 +13,9 @@ import ( func newTestTransaction(data []byte, priority uint64) *MainQueueTransaction { tx := newTransaction(TxQueueMeta{ - Raw: data, - Hash: hash.NewFromBytes(data), - FirstSeen: time.Now(), + raw: data, + hash: hash.NewFromBytes(data), + firstSeen: time.Now(), }) tx.setChecked(&protocol.CheckTxMetadata{ Priority: priority, @@ -210,6 +210,6 @@ func TestScheduleQueueSender(t *testing.T) { require.NoError(err, "Add") require.Equal(1, queue.size()) - queue.remove([]hash.Hash{tx.TxQueueMeta.Hash}) + queue.remove([]hash.Hash{tx.Hash()}) require.Equal(0, queue.size()) } diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 89fc895debc..7ae2bffaadd 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -250,13 +250,13 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error { tx := &TxQueueMeta{ - Raw: rawTx, - Hash: hash.NewFromBytes(rawTx), - FirstSeen: time.Now(), + raw: rawTx, + hash: hash.NewFromBytes(rawTx), + firstSeen: time.Now(), } // Skip recently seen transactions. - if _, seen := t.seenCache.Peek(tx.Hash); seen { - t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.Hash) + if _, seen := t.seenCache.Peek(tx.Hash()); seen { + t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.Hash()) return fmt.Errorf("duplicate transaction") } @@ -278,13 +278,13 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error { t.logger.Debug("queuing transaction for check", - "tx", pct.Raw, - "tx_hash", pct.Hash, + "tx", pct.Raw(), + "tx_hash", pct.Hash(), "recheck", pct.flags.isRecheck(), ) if err := t.checkTxQueue.add(pct); err != nil { t.logger.Warn("unable to queue transaction", - "tx_hash", pct.Hash, + "tx_hash", pct.Hash(), "err", err, ) return err @@ -309,10 +309,10 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) { for _, rawTx := range batch { tx := &TxQueueMeta{ - Raw: rawTx, - Hash: hash.NewFromBytes(rawTx), + raw: rawTx, + hash: hash.NewFromBytes(rawTx), } - t.proposedTxs[tx.Hash] = tx + t.proposedTxs[tx.Hash()] = tx } } @@ -331,7 +331,7 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) { if tx == nil { continue } - t.proposedTxs[tx.Hash] = tx + t.proposedTxs[tx.Hash()] = tx } } @@ -504,7 +504,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { // Check batch. rawTxBatch := make([][]byte, 0, len(batch)) for _, pct := range batch { - rawTxBatch = append(rawTxBatch, pct.Raw) + rawTxBatch = append(rawTxBatch, pct.Raw()) } return rr.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch) }() @@ -559,8 +559,8 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { if !res.IsSuccess() { rejectedTransactions.With(t.getMetricLabels()).Inc() t.logger.Debug("check tx failed", - "tx", batch[i].Raw, - "tx_hash", batch[i].Hash, + "tx", batch[i].Raw(), + "tx_hash", batch[i].Hash(), "result", res, "recheck", batch[i].flags.isRecheck(), ) @@ -604,7 +604,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { if err = pct.dstQueue.OfferChecked(pct.TxQueueMeta, results[batchIndices[i]].Meta); err != nil { t.logger.Error("unable to queue transaction for scheduling", "err", err, - "tx_hash", pct.Hash, + "tx_hash", pct.Hash(), ) // Change the result into an error and notify submitter. @@ -632,7 +632,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { } // Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it // can error is if the capacity is in bytes and the value size is over capacity. - _ = t.seenCache.Put(pct.Hash, publishTime) + _ = t.seenCache.Put(pct.Hash(), publishTime) } } @@ -773,7 +773,7 @@ func (t *txPool) republishWorker() { var republishedCount int nextPendingRepublish := republishInterval for _, tx := range txs { - ts, seen := t.seenCache.Peek(tx.Hash) + ts, seen := t.seenCache.Peek(tx.Hash()) if seen { sinceLast := time.Since(ts.(time.Time)) if sinceLast < republishInterval { @@ -784,7 +784,7 @@ func (t *txPool) republishWorker() { } } - if err := t.txPublisher.PublishTx(ctx, tx.Raw); err != nil { + if err := t.txPublisher.PublishTx(ctx, tx.Raw()); err != nil { t.logger.Warn("failed to publish transaction", "err", err, "tx", tx, @@ -794,7 +794,7 @@ func (t *txPool) republishWorker() { } // Update publish timestamp. - _ = t.seenCache.Put(tx.Hash, time.Now()) + _ = t.seenCache.Put(tx.Hash(), time.Now()) republishedCount++ if republishedCount > maxRepublishTxs { @@ -861,7 +861,7 @@ func (t *txPool) recheck() { if err != nil { t.logger.Warn("failed to submit transaction for recheck", "err", err, - "tx_hash", pct.Hash, + "tx_hash", pct.Hash(), ) } } diff --git a/go/worker/common/p2p/txsync/server.go b/go/worker/common/p2p/txsync/server.go index 5aeb863c586..84a6e9209e1 100644 --- a/go/worker/common/p2p/txsync/server.go +++ b/go/worker/common/p2p/txsync/server.go @@ -44,7 +44,7 @@ func (s *service) handleGetTxs(ctx context.Context, request *GetTxsRequest) (*Ge if tx == nil { continue } - rsp.Txs = append(rsp.Txs, tx.Raw) + rsp.Txs = append(rsp.Txs, tx.Raw()) } return &rsp, nil } diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go index e5ee5ea551d..239984d00a4 100644 --- a/go/worker/compute/executor/committee/batch.go +++ b/go/worker/compute/executor/committee/batch.go @@ -58,12 +58,12 @@ func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.R totalSizeBytes int ) for _, checkedTx := range resolvedBatch { - totalSizeBytes = totalSizeBytes + len(checkedTx.Raw) + totalSizeBytes = totalSizeBytes + checkedTx.Size() if ub.maxBatchSizeBytes > 0 && uint64(totalSizeBytes) > ub.maxBatchSizeBytes { return nil, fmt.Errorf("batch too large (max: %d size: >=%d)", ub.maxBatchSizeBytes, totalSizeBytes) } - batch = append(batch, checkedTx.Raw) + batch = append(batch, checkedTx.Raw()) } ub.batch = batch diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 3dc0c17c9d3..7087b8b2190 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -701,7 +701,7 @@ func (n *Node) startRuntimeBatchSchedulingLocked( initialBatch := make([][]byte, 0, len(batch)) for _, tx := range batch { - initialBatch = append(initialBatch, tx.Raw) + initialBatch = append(initialBatch, tx.Raw()) } // Ask the runtime to execute the batch. diff --git a/go/worker/compute/executor/committee/transactions.go b/go/worker/compute/executor/committee/transactions.go index f5f1ae4b2c4..8d351ff0c6e 100644 --- a/go/worker/compute/executor/committee/transactions.go +++ b/go/worker/compute/executor/committee/transactions.go @@ -70,7 +70,7 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio } for _, tx := range txs { - delete(batch.missingTxs, tx.Hash) + delete(batch.missingTxs, tx.Hash()) } if len(batch.missingTxs) == 0 { // We have all transactions, signal the node to start processing the batch.