Skip to content

Commit

Permalink
(unfinished) notes from today
Browse files Browse the repository at this point in the history
  • Loading branch information
pro-wh committed May 26, 2022
1 parent f9f688f commit a15877d
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 68 deletions.
52 changes: 9 additions & 43 deletions go/runtime/txpool/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,12 @@ import (
"github.com/oasisprotocol/oasis-core/go/runtime/host/protocol"
)

type txStatus uint8

const (
txStatusPendingCheck = iota
txStatusChecked
)

// todo: move to main_queue.go
// Transaction is a transaction in the transaction pool.
type Transaction struct {
// tx represents the raw binary transaction data.
tx []byte

// status is the transaction status.
status txStatus
// time is the timestamp when the transaction was first seen.
time time.Time
// hash is the cached transaction hash.
Expand All @@ -36,12 +28,11 @@ type Transaction struct {
senderSeq uint64
}

func newTransaction(tx []byte, status txStatus) *Transaction {
func newTransaction(tx []byte) *Transaction {
return &Transaction{
tx: tx,
status: status,
time: time.Now(),
hash: hash.NewFromBytes(tx),
tx: tx,
time: time.Now(),
hash: hash.NewFromBytes(tx),
}
}

Expand Down Expand Up @@ -85,10 +76,9 @@ func (tx *Transaction) SenderSeq() uint64 {
return tx.senderSeq
}

// todo: move this be part of OfferChecked
// setChecked populates transaction data retrieved from checks.
func (tx *Transaction) setChecked(meta *protocol.CheckTxMetadata) {
tx.status = txStatusChecked

if meta != nil {
tx.priority = meta.Priority
tx.sender = string(meta.Sender)
Expand All @@ -102,37 +92,13 @@ func (tx *Transaction) setChecked(meta *protocol.CheckTxMetadata) {
}
}

// txCheckFlags are the flags describing how transaction should be checked.
type txCheckFlags uint8

const (
// txCheckLocal is a flag indicating that the transaction was obtained from a local client.
txCheckLocal = (1 << 0)
// txCheckDiscard is a flag indicating that the transaction should be discarded after checks.
txCheckDiscard = (1 << 1)
)

func (f txCheckFlags) isLocal() bool {
return (f & txCheckLocal) != 0
}

func (f txCheckFlags) isDiscard() bool {
return (f & txCheckDiscard) != 0
}

// todo: move to check_queue.go
// PendingCheckTransaction is a transaction pending checks.
type PendingCheckTransaction struct {
tx []byte

// flags are the transaction check flags.
// todo: replace with queue reference
flags txCheckFlags
// dstQueue is where to offer the tx after checking, or nil to discard
dstQueue RecheckableTransactionStore
// notifyCh is a channel for sending back the transaction check result.
notifyCh chan *protocol.CheckTxResult
}

func (pct *PendingCheckTransaction) isRecheck() bool {
// If transaction has already been checked then the fact that it is wrapped in a pending check
// transaction again means that this is a re-check.
return pct.status == txStatusChecked
}
46 changes: 21 additions & 25 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,16 @@ func (t *txPool) SubmitTxNoWait(ctx context.Context, tx []byte, meta *Transactio
}

func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMeta, notifyCh chan *protocol.CheckTxResult) error {
tx := newTransaction(rawTx, txStatusPendingCheck)
// todo: in the new design, we'll be submitting to the check queue first, which won't have "first seen time"
// metadata. seems like it could still work out, as checking is single threaded, so we can still preserve the
// original order when we finish checking and offer to the main queue

// todo: can we go without this?
// Skip recently seen transactions.
if _, seen := t.seenCache.Peek(tx.hash); seen {
t.logger.Debug("ignoring already seen transaction", "tx_hash", tx.hash)
// todo: hash used to be cached
h := hash.NewFromBytes(rawTx)
if _, seen := t.seenCache.Peek(h); seen {
t.logger.Debug("ignoring already seen transaction", "tx_hash", h)
return fmt.Errorf("duplicate transaction")
}

Expand All @@ -247,12 +252,12 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
tx: rawTx,
notifyCh: notifyCh,
}
// todo: change flags to destination queue
if meta.Local {
pct.flags |= txCheckLocal
}
if meta.Discard {
pct.flags |= txCheckDiscard
pct.dstQueue = nil
} else if meta.Local {
pct.dstQueue = t.localQueue
} else {
pct.dstQueue = t.mainQueue
}

return t.addToCheckQueue(pct)
Expand All @@ -264,7 +269,7 @@ func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error {
t.logger.Debug("queuing transaction for check",
"tx", pct.tx,
"tx_hash", h,
"recheck", pct.isRecheck(),
"recheck", "dunno", // todo: can we live without this?
)
if err := t.checkTxQueue.add(pct); err != nil {
t.logger.Warn("unable to queue transaction",
Expand Down Expand Up @@ -510,10 +515,10 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {

newTxs := make([]*PendingCheckTransaction, 0, len(results))
batchIndices := make([]int, 0, len(results))
// todo: trying to remove this. we will unschedule all when starting to recheck, and if it doesn't pass, we won't
// put it back. may need some bookkeeping to notify the sender though
var unschedule []hash.Hash
for i, res := range results {
// todo: we used to do this after failure metrics, but now it's here first
notifySubmitter(i)

if !res.IsSuccess() {
rejectedTransactions.With(t.getMetricLabels()).Inc()
// todo: hash used to be cached
Expand All @@ -522,33 +527,24 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
"tx", batch[i].tx,
"tx_hash", h,
"result", res,
"recheck", batch[i].isRecheck(),
"recheck", "dunno", // todo: can we live without this?
)

// If this was a recheck, make sure to remove the transaction from the scheduling queue.
if batch[i].isRecheck() {
unschedule = append(unschedule, h)
}
notifySubmitter(i)
continue
}

if batch[i].flags.isDiscard() || batch[i].isRecheck() {
notifySubmitter(i)
// We won't be sending this tx on to its destination queue.
continue
}

// For any transactions that are to be queued, we defer notification until queued.

acceptedTransactions.With(t.getMetricLabels()).Inc()
// todo: need to design a way to get res.Meta to the main queue
// todo: need to design a way to get res.Meta to the main queue. probably extra parameter to OfferChecked
// batch[i].setChecked(res.Meta)
newTxs = append(newTxs, batch[i])
batchIndices = append(batchIndices, i)
}

// todo: we used to unschedule rejected txs here, but going forward we will (i) not schedule them in the first
// until they are checked and (ii) unschedule them as soon as we start rechecking them.
// place until they are checked and (ii) unschedule them as soon as we start rechecking them.

// If there are more transactions to check, make sure we check them next.
if t.checkTxQueue.size() > 0 {
Expand Down

0 comments on commit a15877d

Please sign in to comment.