Skip to content

Commit

Permalink
go/runtime/txpool: make TxQueueMeta fields private, add getters
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed Aug 22, 2022
1 parent 5d61f2a commit c072a38
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 80 deletions.
2 changes: 1 addition & 1 deletion go/runtime/registry/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (h *runtimeHostHandler) Handle(ctx context.Context, body *protocol.Body) (*
batch := txPool.GetSchedulingExtra(rq.Offset, rq.Limit)
raw := make([][]byte, 0, len(batch))
for _, tx := range batch {
raw = append(raw, tx.Raw)
raw = append(raw, tx.Raw())
}

return &protocol.Body{HostFetchTxBatchResponse: &protocol.HostFetchTxBatchResponse{
Expand Down
8 changes: 4 additions & 4 deletions go/runtime/txpool/check_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
func newPendingTx(tx []byte) *PendingCheckTransaction {
return &PendingCheckTransaction{
TxQueueMeta: &TxQueueMeta{
Raw: tx,
Hash: hash.NewFromBytes(tx),
raw: tx,
hash: hash.NewFromBytes(tx),
},
}
}
Expand All @@ -39,9 +39,9 @@ func TestCheckTxQueueBasic(t *testing.T) {
require.EqualValues(t, 10, len(batch), "Batch size")
require.EqualValues(t, 41, queue.size(), "Size")

require.EqualValues(t, batch[0].Raw, []byte("hello world"))
require.EqualValues(t, batch[0].Raw(), []byte("hello world"))
for i := 0; i < 9; i++ {
require.EqualValues(t, batch[i+1].Raw, []byte(fmt.Sprintf("call %d", i)))
require.EqualValues(t, batch[i+1].Raw(), []byte(fmt.Sprintf("call %d", i)))
}

queue.clear()
Expand Down
4 changes: 2 additions & 2 deletions go/runtime/txpool/local_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (lq *localQueue) HandleTxsUsed(hashes []hash.Hash) {
}
i := len(keptTxs)
keptTxs = append(keptTxs, tx)
lq.indexesByHash[tx.Hash] = i
lq.indexesByHash[tx.Hash()] = i
}
lq.txs = keptTxs
}
Expand All @@ -81,7 +81,7 @@ func (lq *localQueue) TakeAll() []*TxQueueMeta {
func (lq *localQueue) OfferChecked(tx *TxQueueMeta, _ *protocol.CheckTxMetadata) error {
lq.l.Lock()
defer lq.l.Unlock()
lq.indexesByHash[tx.Hash] = len(lq.txs)
lq.indexesByHash[tx.Hash()] = len(lq.txs)
lq.txs = append(lq.txs, tx)
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions go/runtime/txpool/local_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ func TestLocalQueueBasic(t *testing.T) {

// Add two transactions, with a higher priority one coming later.
rawA := []byte("a")
txA := &TxQueueMeta{Raw: rawA, Hash: hash.NewFromBytes(rawA)}
txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)}
require.NoError(t, lq.OfferChecked(txA, &protocol.CheckTxMetadata{Priority: 1}), "offer checked a")
rawB := []byte("b")
txB := &TxQueueMeta{Raw: rawB, Hash: hash.NewFromBytes(rawB)}
txB := &TxQueueMeta{raw: rawB, hash: hash.NewFromBytes(rawB)}
require.NoError(t, lq.OfferChecked(txB, &protocol.CheckTxMetadata{Priority: 5}), "offer checked a")
require.Equal(t, 2, lq.size())

Expand All @@ -28,16 +28,16 @@ func TestLocalQueueBasic(t *testing.T) {
// Schedule in original order.
require.EqualValues(t, []*TxQueueMeta{txA, txB}, lq.GetSchedulingSuggestion(50), "get scheduling suggestion")

tx := lq.GetTxByHash(txA.Hash)
tx := lq.GetTxByHash(txA.Hash())
require.EqualValues(t, txA, tx, "get tx by hash a")
hashC := hash.NewFromBytes([]byte("c"))
tx = lq.GetTxByHash(hashC)
require.Nil(t, tx, "get tx by hash c")

lq.HandleTxsUsed([]hash.Hash{hashC})
require.EqualValues(t, map[hash.Hash]int{txA.Hash: 0, txB.Hash: 1}, lq.indexesByHash, "after handle txs used absent")
lq.HandleTxsUsed([]hash.Hash{txA.Hash})
require.EqualValues(t, map[hash.Hash]int{txB.Hash: 0}, lq.indexesByHash, "after handle txs used")
require.EqualValues(t, map[hash.Hash]int{txA.Hash(): 0, txB.Hash(): 1}, lq.indexesByHash, "after handle txs used absent")
lq.HandleTxsUsed([]hash.Hash{txA.Hash()})
require.EqualValues(t, map[hash.Hash]int{txB.Hash(): 0}, lq.indexesByHash, "after handle txs used")

require.EqualValues(t, []*TxQueueMeta{txB}, lq.TakeAll(), "take all")
require.Len(t, lq.GetSchedulingSuggestion(50), 0, "after take all")
Expand Down
26 changes: 3 additions & 23 deletions go/runtime/txpool/main_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package txpool

import (
"fmt"
"time"

"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
Expand Down Expand Up @@ -35,27 +34,7 @@ func newTransaction(tx TxQueueMeta) *MainQueueTransaction {

// String returns a string representation of a transaction.
func (tx *MainQueueTransaction) String() string {
return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.TxQueueMeta.Hash, tx.TxQueueMeta.FirstSeen, tx.priority)
}

// Raw returns the raw transaction data.
func (tx *MainQueueTransaction) Raw() []byte {
return tx.TxQueueMeta.Raw
}

// Size returns the size (in bytes) of the raw transaction data.
func (tx *MainQueueTransaction) Size() int {
return len(tx.TxQueueMeta.Raw)
}

// Hash returns the hash of the transaction binary data.
func (tx *MainQueueTransaction) Hash() hash.Hash {
return tx.TxQueueMeta.Hash
}

// FirstSeen returns the time the transaction was first seen.
func (tx *MainQueueTransaction) FirstSeen() time.Time {
return tx.TxQueueMeta.FirstSeen
return fmt.Sprintf("MainQueueTransaction{hash: %s, first_seen: %s, priority: %d}", tx.Hash(), tx.FirstSeen(), tx.priority)
}

// Priority returns the transaction priority.
Expand Down Expand Up @@ -84,7 +63,8 @@ func (tx *MainQueueTransaction) setChecked(meta *protocol.CheckTxMetadata) {
// If the sender is empty (e.g. because the runtime does not support specifying a sender), we
// treat each transaction as having a unique sender. This is to allow backwards compatibility.
if len(tx.sender) == 0 {
tx.sender = string(tx.TxQueueMeta.Hash[:])
h := tx.Hash()
tx.sender = string(h[:])
}
}

Expand Down
28 changes: 24 additions & 4 deletions go/runtime/txpool/queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,33 @@ import (

// TxQueueMeta stores some queuing-related metadata alongside a raw transaction.
type TxQueueMeta struct {
Raw []byte
Hash hash.Hash
// FirstSeen is the timestamp when the transaction was first seen.
raw []byte
hash hash.Hash
// firstSeen is the timestamp when the transaction was first seen.
// We populate this in `submitTx`. Other forms of ingress (namely loading from roothash incoming messages and
// receiving from txSync) leave this in its default value. Transactions from those sources, however, only move
// through a limited area in the tx pool.
FirstSeen time.Time
firstSeen time.Time
}

// Raw returns the raw transaction data.
func (t *TxQueueMeta) Raw() []byte {
return t.raw
}

// Size returns the size (in bytes) of the raw transaction data.
func (t *TxQueueMeta) Size() int {
return len(t.Raw())
}

// Hash returns the hash of the transaction binary data.
func (t *TxQueueMeta) Hash() hash.Hash {
return t.hash
}

// FirstSeen returns the time the transaction was first seen.
func (t *TxQueueMeta) FirstSeen() time.Time {
return t.firstSeen
}

// UsableTransactionSource is a place to retrieve txs that are "good enough." "Good enough" variously means CheckTx'd,
Expand Down
4 changes: 2 additions & 2 deletions go/runtime/txpool/rim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ func (rq *rimQueue) Load(inMsgs []*message.IncomingMessage) {
for _, msg := range inMsgs {
h := hash.NewFromBytes(msg.Data)
newTxs[h] = &TxQueueMeta{
Raw: msg.Data,
Hash: h,
raw: msg.Data,
hash: h,
}
}
rq.l.Lock()
Expand Down
8 changes: 4 additions & 4 deletions go/runtime/txpool/rim_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ func TestRimQueue(t *testing.T) {
rq := newRimQueue()

rawA := []byte("a")
txA := &TxQueueMeta{Raw: rawA, Hash: hash.NewFromBytes(rawA)}
txA := &TxQueueMeta{raw: rawA, hash: hash.NewFromBytes(rawA)}
rq.Load([]*message.IncomingMessage{
{
ID: 1,
Data: rawA,
},
})
require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash: txA}, rq.txs, "after load")
require.EqualValues(t, map[hash.Hash]*TxQueueMeta{txA.Hash(): txA}, rq.txs, "after load")
require.Equal(t, 1, rq.size())

require.Nil(t, rq.GetSchedulingSuggestion(50), "get scheduling suggestion")
rq.HandleTxsUsed([]hash.Hash{txA.Hash})
rq.HandleTxsUsed([]hash.Hash{txA.Hash()})

tx := rq.GetTxByHash(txA.Hash)
tx := rq.GetTxByHash(txA.Hash())
require.EqualValues(t, txA, tx, "get tx by hash a")
hashC := hash.NewFromBytes([]byte("c"))
tx = rq.GetTxByHash(hashC)
Expand Down
9 changes: 5 additions & 4 deletions go/runtime/txpool/schedule_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (tx priorityWrappedTx) Less(other btree.Item) bool {
}
// If transactions have same priority, sort by first seen time (earlier transactions are later
// in the queue as we are iterating over the queue in descending order).
return tx.TxQueueMeta.FirstSeen.After(tx2.TxQueueMeta.FirstSeen)
return tx.FirstSeen().After(tx2.FirstSeen())
}

type scheduleQueue struct {
Expand Down Expand Up @@ -66,15 +66,15 @@ func (sq *scheduleQueue) add(tx *MainQueueTransaction) error {
sq.removeLocked(etx.MainQueueTransaction)
}

sq.all[tx.TxQueueMeta.Hash] = tx
sq.all[tx.Hash()] = tx
sq.bySender[tx.sender] = tx
sq.byPriority.ReplaceOrInsert(priorityWrappedTx{tx})

return nil
}

func (sq *scheduleQueue) removeLocked(tx *MainQueueTransaction) {
delete(sq.all, tx.TxQueueMeta.Hash)
delete(sq.all, tx.Hash())
delete(sq.bySender, tx.sender)
sq.byPriority.Delete(priorityWrappedTx{tx})
}
Expand Down Expand Up @@ -114,7 +114,8 @@ func (sq *scheduleQueue) getPrioritizedBatch(offset *hash.Hash, limit uint32) []
tx := i.(priorityWrappedTx)

// Skip the offset item itself (if specified).
if tx.TxQueueMeta.Hash.Equal(offset) {
h := tx.Hash()
if h.Equal(offset) {
return true
}

Expand Down
8 changes: 4 additions & 4 deletions go/runtime/txpool/schedule_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

func newTestTransaction(data []byte, priority uint64) *MainQueueTransaction {
tx := newTransaction(TxQueueMeta{
Raw: data,
Hash: hash.NewFromBytes(data),
FirstSeen: time.Now(),
raw: data,
hash: hash.NewFromBytes(data),
firstSeen: time.Now(),
})
tx.setChecked(&protocol.CheckTxMetadata{
Priority: priority,
Expand Down Expand Up @@ -210,6 +210,6 @@ func TestScheduleQueueSender(t *testing.T) {
require.NoError(err, "Add")
require.Equal(1, queue.size())

queue.remove([]hash.Hash{tx.TxQueueMeta.Hash})
queue.remove([]hash.Hash{tx.Hash()})
require.Equal(0, queue.size())
}
42 changes: 21 additions & 21 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,13 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio

func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error {
tx := &TxQueueMeta{
Raw: rawTx,
Hash: hash.NewFromBytes(rawTx),
FirstSeen: time.Now(),
raw: rawTx,
hash: hash.NewFromBytes(rawTx),
firstSeen: time.Now(),
}
// Skip recently seen transactions.
if _, seen := t.seenCache.Peek(tx.Hash); seen {
t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.Hash)
if _, seen := t.seenCache.Peek(tx.Hash()); seen {
t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.Hash())
return fmt.Errorf("duplicate transaction")
}

Expand All @@ -278,13 +278,13 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe

func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error {
t.logger.Debug("queuing transaction for check",
"tx", pct.Raw,
"tx_hash", pct.Hash,
"tx", pct.Raw(),
"tx_hash", pct.Hash(),
"recheck", pct.flags.isRecheck(),
)
if err := t.checkTxQueue.add(pct); err != nil {
t.logger.Warn("unable to queue transaction",
"tx_hash", pct.Hash,
"tx_hash", pct.Hash(),
"err", err,
)
return err
Expand All @@ -309,10 +309,10 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) {

for _, rawTx := range batch {
tx := &TxQueueMeta{
Raw: rawTx,
Hash: hash.NewFromBytes(rawTx),
raw: rawTx,
hash: hash.NewFromBytes(rawTx),
}
t.proposedTxs[tx.Hash] = tx
t.proposedTxs[tx.Hash()] = tx
}
}

Expand All @@ -331,7 +331,7 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
if tx == nil {
continue
}
t.proposedTxs[tx.Hash] = tx
t.proposedTxs[tx.Hash()] = tx
}
}

Expand Down Expand Up @@ -504,7 +504,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
// Check batch.
rawTxBatch := make([][]byte, 0, len(batch))
for _, pct := range batch {
rawTxBatch = append(rawTxBatch, pct.Raw)
rawTxBatch = append(rawTxBatch, pct.Raw())
}
return rr.CheckTx(checkCtx, bi.RuntimeBlock, bi.ConsensusBlock, bi.Epoch, bi.ActiveDescriptor.Executor.MaxMessages, rawTxBatch)
}()
Expand Down Expand Up @@ -559,8 +559,8 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
if !res.IsSuccess() {
rejectedTransactions.With(t.getMetricLabels()).Inc()
t.logger.Debug("check tx failed",
"tx", batch[i].Raw,
"tx_hash", batch[i].Hash,
"tx", batch[i].Raw(),
"tx_hash", batch[i].Hash(),
"result", res,
"recheck", batch[i].flags.isRecheck(),
)
Expand Down Expand Up @@ -604,7 +604,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
if err = pct.dstQueue.OfferChecked(pct.TxQueueMeta, results[batchIndices[i]].Meta); err != nil {
t.logger.Error("unable to queue transaction for scheduling",
"err", err,
"tx_hash", pct.Hash,
"tx_hash", pct.Hash(),
)

// Change the result into an error and notify submitter.
Expand Down Expand Up @@ -632,7 +632,7 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
}
// Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it
// can error is if the capacity is in bytes and the value size is over capacity.
_ = t.seenCache.Put(pct.Hash, publishTime)
_ = t.seenCache.Put(pct.Hash(), publishTime)
}
}

Expand Down Expand Up @@ -773,7 +773,7 @@ func (t *txPool) republishWorker() {
var republishedCount int
nextPendingRepublish := republishInterval
for _, tx := range txs {
ts, seen := t.seenCache.Peek(tx.Hash)
ts, seen := t.seenCache.Peek(tx.Hash())
if seen {
sinceLast := time.Since(ts.(time.Time))
if sinceLast < republishInterval {
Expand All @@ -784,7 +784,7 @@ func (t *txPool) republishWorker() {
}
}

if err := t.txPublisher.PublishTx(ctx, tx.Raw); err != nil {
if err := t.txPublisher.PublishTx(ctx, tx.Raw()); err != nil {
t.logger.Warn("failed to publish transaction",
"err", err,
"tx", tx,
Expand All @@ -794,7 +794,7 @@ func (t *txPool) republishWorker() {
}

// Update publish timestamp.
_ = t.seenCache.Put(tx.Hash, time.Now())
_ = t.seenCache.Put(tx.Hash(), time.Now())

republishedCount++
if republishedCount > maxRepublishTxs {
Expand Down Expand Up @@ -861,7 +861,7 @@ func (t *txPool) recheck() {
if err != nil {
t.logger.Warn("failed to submit transaction for recheck",
"err", err,
"tx_hash", pct.Hash,
"tx_hash", pct.Hash(),
)
}
}
Expand Down
Loading

0 comments on commit c072a38

Please sign in to comment.