Skip to content

Commit

Permalink
go/runtime/txpool: make TxQueueMeta fields private, add getters
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed Aug 19, 2022
1 parent b815cd5 commit 3b7b1db
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 32 deletions.
2 changes: 1 addition & 1 deletion go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*
batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit)
raw := make([][]byte, 0, len(batch))
for _, tx := range batch {
raw = append(raw, tx.Raw)
raw = append(raw, tx.Raw())
}

return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{
Expand Down
4 changes: 2 additions & 2 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
}
i := len(keptTxs)
keptTxs = append(keptTxs, tx)
lq.indexesByHash[tx.Hash] = i
lq.indexesByHash[tx.Hash()] = i
}
lq.txs = keptTxs
}
Expand All @@ -81,7 +81,7 @@ func (lq *localQueue) TakeAll() []*TxQueueMeta {
func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error {
lq.l.Lock()
defer lq.l.Unlock()
lq.indexesByHash[tx.Hash] = len(lq.txs)
lq.indexesByHash[tx.Hash()] = len(lq.txs)
lq.txs = append(lq.txs, tx)
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ func (tx *MainQueueTransaction) String() string {

// Raw returns the raw transaction data.
func (tx *MainQueueTransaction) Raw() []byte {
return tx.TxQueueMeta.Raw
return tx.TxQueueMeta.Raw()
}

// Size returns the size (in bytes) of the raw transaction data.
func (tx *MainQueueTransaction) Size() int {
return len(tx.TxQueueMeta.Raw)
return len(tx.TxQueueMeta.Raw())
}

// Hash returns the hash of the transaction binary data.
func (tx *MainQueueTransaction) Hash() hash.Hash {
return tx.TxQueueMeta.Hash
return tx.TxQueueMeta.Hash()
}

// FirstSeen returns the time the transaction was first seen.
func (tx *MainQueueTransaction) FirstSeen() time.Time {
return tx.TxQueueMeta.FirstSeen
return tx.TxQueueMeta.FirstSeen()
}

// Priority returns the transaction priority.
Expand Down Expand Up @@ -84,7 +84,8 @@ func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) {
// If the sender is empty (e.g. because the runtime does not support specifying a sender), we
// treat each transaction as having a unique sender. This is to allow backwards compatibility.
if len(tx.sender) == 0 {
tx.sender = string(tx.TxQueueMeta.Hash[:])
h := tx.TxQueueMeta.Hash()
tx.sender = string(h[:])
}
}

Expand Down
20 changes: 16 additions & 4 deletions go/runtime/txpool/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,25 @@ import (
)

type TxQueueMeta struct {
Raw []byte
Hash hash.Hash
// FirstSeen is the timestamp when the transaction was first seen.
raw []byte
hash hash.Hash
// firstSeen is the timestamp when the transaction was first seen.
// We populate this in `submitTx`. Other forms of ingress (namely loading from roothash incoming messages and
// receiving from txSync) leave this in its default value. Transactions from those sources, however, only move
// through a limited area in the tx pool.
FirstSeen time.Time
firstSeen time.Time
}

func (t *TxQueueMeta) Raw() []byte {
return t.raw
}

func (t *TxQueueMeta) Hash() hash.Hash {
return t.hash
}

func (t *TxQueueMeta) FirstSeen() time.Time {
return t.firstSeen
}

// UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd,
Expand Down
4 changes: 2 additions & 2 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) {
for _, msg := range inMsgs {
h := hash.NewFromBytes(msg.Data)
newTxs[h] = &TxQueueMeta{
Raw: msg.Data,
Hash: h,
raw: msg.Data,
hash: h,
}
}
rq.l.Lock()
Expand Down
9 changes: 5 additions & 4 deletions go/runtime/txpool/schedule_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (tx priorityWrappedTx) Less(other btree.Item) bool {
}
// If transactions have same priority, sort by first seen time (earlier transactions are later
// in the queue as we are iterating over the queue in descending order).
return tx.TxQueueMeta.FirstSeen.After(tx2.TxQueueMeta.FirstSeen)
return tx.TxQueueMeta.FirstSeen().After(tx2.TxQueueMeta.FirstSeen())
}

type scheduleQueue struct {
Expand Down Expand Up @@ -66,15 +66,15 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error {
sq.removeLocked(etx.MainQueueTransaction)
}

sq.all[tx.TxQueueMeta.Hash] = tx
sq.all[tx.TxQueueMeta.Hash()] = tx
sq.bySender[tx.sender] = tx
sq.byPriority.ReplaceOrInsert(priorityWrappedTx{tx})

return nil
}

func (sq *scheduleQueue) removeLocked(tx *MainQueueTransaction) {
delete(sq.all, tx.TxQueueMeta.Hash)
delete(sq.all, tx.TxQueueMeta.Hash())
delete(sq.bySender, tx.sender)
sq.byPriority.Delete(priorityWrappedTx{tx})
}
Expand Down Expand Up @@ -114,7 +114,8 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) []
tx := i.(priorityWrappedTx)

// Skip the offset item itself (if specified).
if tx.TxQueueMeta.Hash.Equal(offset) {
h := tx.TxQueueMeta.Hash()
if h.Equal(offset) {
return true
}

Expand Down
18 changes: 9 additions & 9 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,9 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio

func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error {
tx := &TxQueueMeta{
Raw: rawTx,
Hash: hash.NewFromBytes(rawTx),
FirstSeen: time.Now(),
raw: rawTx,
hash: hash.NewFromBytes(rawTx),
firstSeen: time.Now(),
}
// Skip recently seen transactions.
if _, seen := t.seenCache.Peek(tx.Hash); seen {
Expand Down Expand Up @@ -309,10 +309,10 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) {

for _, rawTx := range batch {
tx := &TxQueueMeta{
Raw: rawTx,
Hash: hash.NewFromBytes(rawTx),
raw: rawTx,
hash: hash.NewFromBytes(rawTx),
}
t.proposedTxs[tx.Hash] = tx
t.proposedTxs[tx.Hash()] = tx
}
}

Expand All @@ -331,7 +331,7 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
if tx == nil {
continue
}
t.proposedTxs[tx.Hash] = tx
t.proposedTxs[tx.Hash()] = tx
}
}

Expand Down Expand Up @@ -504,7 +504,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
// Check batch.
rawTxBatch := make([][]byte, 0, len(batch))
for _, pct := range batch {
rawTxBatch = append(rawTxBatch, pct.Raw)
rawTxBatch = append(rawTxBatch, pct.Raw())
}
return rr.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch)
}()
Expand Down Expand Up @@ -784,7 +784,7 @@ func (t *txPool) republishWorker() {
}
}

if err := t.txPublisher.PublishTx(ctx, tx.Raw); err != nil {
if err := t.txPublisher.PublishTx(ctx, tx.Raw()); err != nil {
t.logger.Warn("failed to publish transaction",
"err", err,
"tx", tx,
Expand Down
2 changes: 1 addition & 1 deletion go/worker/common/p2p/txsync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (s *service) handleGetTxs(ctx context.Context, request *GetTxsRequest) (*Ge
if tx == nil {
continue
}
rsp.Txs = append(rsp.Txs, tx.Raw)
rsp.Txs = append(rsp.Txs, tx.Raw())
}
return &rsp, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/worker/compute/executor/committee/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func (ub *unresolvedBatch) resolve(txPool txpool.TransactionPool) (transaction.R
totalSizeBytes int
)
for _, checkedTx := range resolvedBatch {
totalSizeBytes = totalSizeBytes + len(checkedTx.Raw)
totalSizeBytes = totalSizeBytes + len(checkedTx.Raw())
if ub.maxBatchSizeBytes > 0 && uint64(totalSizeBytes) > ub.maxBatchSizeBytes {
return nil, fmt.Errorf("batch too large (max: %d size: >=%d)", ub.maxBatchSizeBytes, totalSizeBytes)
}

batch = append(batch, checkedTx.Raw)
batch = append(batch, checkedTx.Raw())
}
ub.batch = batch

Expand Down
2 changes: 1 addition & 1 deletion go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func (n *Node) startRuntimeBatchSchedulingLocked(

initialBatch := make([][]byte, 0, len(batch))
for _, tx := range batch {
initialBatch = append(initialBatch, tx.Raw)
initialBatch = append(initialBatch, tx.Raw())
}

// Ask the runtime to execute the batch.
Expand Down
2 changes: 1 addition & 1 deletion go/worker/compute/executor/committee/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (n *Node) handleNewCheckedTransactions(txs []*txpool.PendingCheckTransactio
}

for _, tx := range txs {
delete(batch.missingTxs, tx.Hash)
delete(batch.missingTxs, tx.Hash())
}
if len(batch.missingTxs) == 0 {
// We have all transactions, signal the node to start processing the batch.
Expand Down

0 comments on commit 3b7b1db

Please sign in to comment.