From 3b7b1db0b54ba8ac1d3c0bd01cf905dfa3c3ac5b Mon Sep 17 00:00:00 2001 From: Warren He Date: Fri, 19 Aug 2022 13:34:39 -0700 Subject: [PATCH] go/runtime/txpool: make TxQueueMeta fields private, add getters --- go/runtime/registry/host.go | 2 +- go/runtime/txpool/local_queue.go | 4 ++-- go/runtime/txpool/main_queue.go | 11 +++++----- go/runtime/txpool/queues.go | 20 +++++++++++++++---- go/runtime/txpool/rim_queue.go | 4 ++-- go/runtime/txpool/schedule_queue.go | 9 +++++---- go/runtime/txpool/txpool.go | 18 ++++++++--------- go/worker/common/p2p/txsync/server.go | 2 +- go/worker/compute/executor/committee/batch.go | 4 ++-- go/worker/compute/executor/committee/node.go | 2 +- .../executor/committee/transactions.go | 2 +- 11 files changed, 46 insertions(+), 32 deletions(-) diff --git a/go/runtime/registry/host.go b/go/runtime/registry/host.go index 0bc0ec23bde..e6c12e137a9 100644 --- a/go/runtime/registry/host.go +++ b/go/runtime/registry/host.go @@ -252,7 +252,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (* batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit) raw := make([][]byte, 0, len(batch)) for _, tx := range batch { - raw = append(raw, tx.Raw) + raw = append(raw, tx.Raw()) } return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{ diff --git a/go/runtime/txpool/local_queue.go b/go/runtime/txpool/local_queue.go index 8f8ef2add46..47c71f6feda 100644 --- a/go/runtime/txpool/local_queue.go +++ b/go/runtime/txpool/local_queue.go @@ -64,7 +64,7 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) { } i := len(keptTxs) keptTxs = append(keptTxs, tx) - lq.indexesByHash[tx.Hash] = i + lq.indexesByHash[tx.Hash()] = i } lq.txs = keptTxs } @@ -81,7 +81,7 @@ func (lq *localQueue) TakeAll() []*TxQueueMeta { func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error { lq.l.Lock() defer lq.l.Unlock() - lq.indexesByHash[tx.Hash] = len(lq.txs) + lq.indexesByHash[tx.Hash()] = len(lq.txs) lq.txs = append(lq.txs, tx) return nil } diff --git a/go/runtime/txpool/main_queue.go b/go/runtime/txpool/main_queue.go index 625fbccf3dc..9af8432ff45 100644 --- a/go/runtime/txpool/main_queue.go +++ b/go/runtime/txpool/main_queue.go @@ -40,22 +40,22 @@ func (tx *MainQueueTransaction) String() string { // Raw returns the raw transaction data. func (tx *MainQueueTransaction) Raw() []byte { - return tx.TxQueueMeta.Raw + return tx.TxQueueMeta.Raw() } // Size returns the size (in bytes) of the raw transaction data. func (tx *MainQueueTransaction) Size() int { - return len(tx.TxQueueMeta.Raw) + return len(tx.TxQueueMeta.Raw()) } // Hash returns the hash of the transaction binary data. func (tx *MainQueueTransaction) Hash() hash.Hash { - return tx.TxQueueMeta.Hash + return tx.TxQueueMeta.Hash() } // FirstSeen returns the time the transaction was first seen. func (tx *MainQueueTransaction) FirstSeen() time.Time { - return tx.TxQueueMeta.FirstSeen + return tx.TxQueueMeta.FirstSeen() } // Priority returns the transaction priority. @@ -84,7 +84,8 @@ func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) { // If the sender is empty (e.g. because the runtime does not support specifying a sender), we // treat each transaction as having a unique sender. This is to allow backwards compatibility. if len(tx.sender) == 0 { - tx.sender = string(tx.TxQueueMeta.Hash[:]) + h := tx.TxQueueMeta.Hash() + tx.sender = string(h[:]) } } diff --git a/go/runtime/txpool/queues.go b/go/runtime/txpool/queues.go index 3277f81e9fa..591410deea4 100644 --- a/go/runtime/txpool/queues.go +++ b/go/runtime/txpool/queues.go @@ -8,13 +8,25 @@ import ( ) type TxQueueMeta struct { - Raw []byte - Hash hash.Hash - // FirstSeen is the timestamp when the transaction was first seen. + raw []byte + hash hash.Hash + // firstSeen is the timestamp when the transaction was first seen. // We populate this in `submitTx`. Other forms of ingress (namely loading from roothash incoming messages and // receiving from txSync) leave this in its default value. Transactions from those sources, however, only move // through a limited area in the tx pool. - FirstSeen time.Time + firstSeen time.Time +} + +func (t *TxQueueMeta) Raw() []byte { + return t.raw +} + +func (t *TxQueueMeta) Hash() hash.Hash { + return t.hash +} + +func (t *TxQueueMeta) FirstSeen() time.Time { + return t.firstSeen } // UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd, diff --git a/go/runtime/txpool/rim_queue.go b/go/runtime/txpool/rim_queue.go index 57b1fee7768..1347b1a5382 100644 --- a/go/runtime/txpool/rim_queue.go +++ b/go/runtime/txpool/rim_queue.go @@ -42,8 +42,8 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) { for _, msg := range inMsgs { h := hash.NewFromBytes(msg.Data) newTxs[h] = &TxQueueMeta{ - Raw: msg.Data, - Hash: h, + raw: msg.Data, + hash: h, } } rq.l.Lock() diff --git a/go/runtime/txpool/schedule_queue.go b/go/runtime/txpool/schedule_queue.go index bd1346225fd..8de7ce0467d 100644 --- a/go/runtime/txpool/schedule_queue.go +++ b/go/runtime/txpool/schedule_queue.go @@ -28,7 +28,7 @@ func (tx priorityWrappedTx) Less(other btree.Item) bool { } // If transactions have same priority, sort by first seen time (earlier transactions are later // in the queue as we are iterating over the queue in descending order). - return tx.TxQueueMeta.FirstSeen.After(tx2.TxQueueMeta.FirstSeen) + return tx.TxQueueMeta.FirstSeen().After(tx2.TxQueueMeta.FirstSeen()) } type scheduleQueue struct { @@ -66,7 +66,7 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error { sq.removeLocked(etx.MainQueueTransaction) } - sq.all[tx.TxQueueMeta.Hash] = tx + sq.all[tx.TxQueueMeta.Hash()] = tx sq.bySender[tx.sender] = tx sq.byPriority.ReplaceOrInsert(priorityWrappedTx{tx}) @@ -74,7 +74,7 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error { } func (sq *scheduleQueue) removeLocked(tx *MainQueueTransaction) { - delete(sq.all, tx.TxQueueMeta.Hash) + delete(sq.all, tx.TxQueueMeta.Hash()) delete(sq.bySender, tx.sender) sq.byPriority.Delete(priorityWrappedTx{tx}) } @@ -114,7 +114,8 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) [] tx := i.(priorityWrappedTx) // Skip the offset item itself (if specified). - if tx.TxQueueMeta.Hash.Equal(offset) { + h := tx.TxQueueMeta.Hash() + if h.Equal(offset) { return true } diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 89fc895debc..070a7ec8cff 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -250,9 +250,9 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error { tx := &TxQueueMeta{ - Raw: rawTx, - Hash: hash.NewFromBytes(rawTx), - FirstSeen: time.Now(), + raw: rawTx, + hash: hash.NewFromBytes(rawTx), + firstSeen: time.Now(), } // Skip recently seen transactions. if _, seen := t.seenCache.Peek(tx.Hash); seen { @@ -309,10 +309,10 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) { for _, rawTx := range batch { tx := &TxQueueMeta{ - Raw: rawTx, - Hash: hash.NewFromBytes(rawTx), + raw: rawTx, + hash: hash.NewFromBytes(rawTx), } - t.proposedTxs[tx.Hash] = tx + t.proposedTxs[tx.Hash()] = tx } } @@ -331,7 +331,7 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) { if tx == nil { continue } - t.proposedTxs[tx.Hash] = tx + t.proposedTxs[tx.Hash()] = tx } } @@ -504,7 +504,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) { // Check batch. rawTxBatch := make([][]byte, 0, len(batch)) for _, pct := range batch { - rawTxBatch = append(rawTxBatch, pct.Raw) + rawTxBatch = append(rawTxBatch, pct.Raw()) } return rr.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch) }() @@ -784,7 +784,7 @@ func (t *txPool) republishWorker() { } } - if err := t.txPublisher.PublishTx(ctx, tx.Raw); err != nil { + if err := t.txPublisher.PublishTx(ctx, tx.Raw()); err != nil { t.logger.Warn("failed to publish transaction", "err", err, "tx", tx, diff --git a/go/worker/common/p2p/txsync/server.go b/go/worker/common/p2p/txsync/server.go index 5aeb863c586..84a6e9209e1 100644 --- a/go/worker/common/p2p/txsync/server.go +++ b/go/worker/common/p2p/txsync/server.go @@ -44,7 +44,7 @@ func (s *service) handleGetTxs(ctx context.Context, request *GetTxsRequest) (*Ge if tx == nil { continue } - rsp.Txs = append(rsp.Txs, tx.Raw) + rsp.Txs = append(rsp.Txs, tx.Raw()) } return &rsp, nil } diff --git a/go/worker/compute/executor/committee/batch.go b/go/worker/compute/executor/committee/batch.go index e5ee5ea551d..2645a40cc1d 100644 --- a/go/worker/compute/executor/committee/batch.go +++ b/go/worker/compute/executor/committee/batch.go @@ -58,12 +58,12 @@ func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.R totalSizeBytes int ) for _, checkedTx := range resolvedBatch { - totalSizeBytes = totalSizeBytes + len(checkedTx.Raw) + totalSizeBytes = totalSizeBytes + len(checkedTx.Raw()) if ub.maxBatchSizeBytes > 0 && uint64(totalSizeBytes) > ub.maxBatchSizeBytes { return nil, fmt.Errorf("batch too large (max: %d size: >=%d)", ub.maxBatchSizeBytes, totalSizeBytes) } - batch = append(batch, checkedTx.Raw) + batch = append(batch, checkedTx.Raw()) } ub.batch = batch diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 3dc0c17c9d3..7087b8b2190 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -701,7 +701,7 @@ func (n *Node) startRuntimeBatchSchedulingLocked( initialBatch := make([][]byte, 0, len(batch)) for _, tx := range batch { - initialBatch = append(initialBatch, tx.Raw) + initialBatch = append(initialBatch, tx.Raw()) } // Ask the runtime to execute the batch. diff --git a/go/worker/compute/executor/committee/transactions.go b/go/worker/compute/executor/committee/transactions.go index f5f1ae4b2c4..8d351ff0c6e 100644 --- a/go/worker/compute/executor/committee/transactions.go +++ b/go/worker/compute/executor/committee/transactions.go @@ -70,7 +70,7 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio } for _, tx := range txs { - delete(batch.missingTxs, tx.Hash) + delete(batch.missingTxs, tx.Hash()) } if len(batch.missingTxs) == 0 { // We have all transactions, signal the node to start processing the batch.