From c15f4573f93d76e9e73b3847c8e564ca37650a81 Mon Sep 17 00:00:00 2001
From: Warren He
Date: Fri, 20 May 2022 16:02:21 -0700
Subject: [PATCH] (unfinished) start ripping

---
 go/runtime/txpool/transaction.go |   3 +-
 go/runtime/txpool/txpool.go      | 152 +++++++++++++++++++------------
 2 files changed, 98 insertions(+), 57 deletions(-)

diff --git a/go/runtime/txpool/transaction.go b/go/runtime/txpool/transaction.go
index 46fabcc3119..0f521e6cb0f 100644
--- a/go/runtime/txpool/transaction.go
+++ b/go/runtime/txpool/transaction.go
@@ -122,9 +122,10 @@ func (f txCheckFlags) isDiscard() bool {
 
 // PendingCheckTransaction is a transaction pending checks.
 type PendingCheckTransaction struct {
-	*Transaction
+	tx []byte
 
 	// flags are the transaction check flags.
+	// todo: replace with queue reference
 	flags txCheckFlags
 	// notifyCh is a channel for sending back the transaction check result.
 	notifyCh chan *protocol.CheckTxResult
diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go
index ed543b58118..96cdba8d2c7 100644
--- a/go/runtime/txpool/txpool.go
+++ b/go/runtime/txpool/txpool.go
@@ -106,9 +106,6 @@ type TransactionPool interface {
 	// WakeupScheduler explicitly notifies subscribers that they should attempt scheduling.
 	WakeupScheduler()
 
-	// Clear clears the transaction pool.
-	Clear()
-
 	// WatchScheduler subscribes to notifications about when to attempt scheduling. The emitted
 	// boolean flag indicates whether the batch flush timeout expired.
 	WatchScheduler() (pubsub.ClosableSubscription, <-chan bool)
@@ -177,12 +174,18 @@ type txPool struct {
 	checkTxNotifier *pubsub.Broker
 	recheckTxCh     *channels.RingChannel
 
-	schedulerQueue *scheduleQueue
+	usableSources        []UsableTransactionSource
+	recheckableStores    []RecheckableTransactionStore
+	republishableSources []RepublishableTransactionSource
+	rimQueue             *rimQueue
+	localQueue           *localQueue
+	mainQueue            *mainQueue
+
 	schedulerTicker   *time.Ticker
 	schedulerNotifier *pubsub.Broker
 
 	proposedTxsLock sync.Mutex
-	proposedTxs     map[hash.Hash]*Transaction
+	proposedTxs     map[hash.Hash][]byte
 
 	blockInfoLock sync.Mutex
 	blockInfo     *BlockInfo
@@ -241,9 +244,10 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
 
 	// Queue transaction for checks.
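+	// The pending-check entry now carries only the raw transaction bytes;
+	// hashes are recomputed where needed instead of being cached.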
 	pct := &PendingCheckTransaction{
-		Transaction: tx,
-		notifyCh:    notifyCh,
+		tx:       rawTx,
+		notifyCh: notifyCh,
 	}
+	// todo: change flags to destination queue
 	if meta.Local {
 		pct.flags |= txCheckLocal
 	}
@@ -255,14 +259,16 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe
 }
 
 func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error {
+	// todo: hash used to be cached
+	h := hash.NewFromBytes(pct.tx)
 	t.logger.Debug("queuing transaction for check",
 		"tx", pct.tx,
-		"tx_hash", pct.hash,
+		"tx_hash", h,
 		"recheck", pct.isRecheck(),
 	)
 	if err := t.checkTxQueue.add(pct); err != nil {
 		t.logger.Warn("unable to queue transaction",
-			"tx_hash", pct.hash,
+			"tx_hash", h,
 			"err", err,
 		)
 		return err
 	}
@@ -286,8 +292,7 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) {
 	defer t.proposedTxsLock.Unlock()
 
 	for _, rawTx := range batch {
-		tx := newTransaction(rawTx, txStatusChecked)
-		t.proposedTxs[tx.hash] = tx
+		t.proposedTxs[hash.NewFromBytes(rawTx)] = rawTx
 	}
 }
 
@@ -302,11 +307,11 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
 	t.proposedTxsLock.Lock()
 	defer t.proposedTxsLock.Unlock()
 
-	for _, tx := range txs {
-		if tx == nil {
+	for i, h := range batch {
+		if _, ok := missingTxs[h]; ok {
 			continue
 		}
-		t.proposedTxs[tx.hash] = tx
+		t.proposedTxs[h] = txs[i]
 	}
 }
 
@@ -314,21 +319,40 @@ func (t *txPool) ClearProposedBatch() {
 	t.proposedTxsLock.Lock()
 	defer t.proposedTxsLock.Unlock()
 
-	t.proposedTxs = make(map[hash.Hash]*Transaction)
+	t.proposedTxs = make(map[hash.Hash][]byte)
 }
 
-func (t *txPool) RemoveTxBatch(txs []hash.Hash) {
-	t.schedulerQueue.remove(txs)
-
-	pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
+func (t *txPool) GetSchedulingSuggestion() [][]byte {
+	var txs [][]byte
+	for _, q := range t.usableSources {
+		txs = append(txs, q.GetSchedulingSuggestion()...)
+	}
+	return txs
 }
 
-func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction {
-	return t.schedulerQueue.getPrioritizedBatch(offset, limit)
+func (t *txPool) HandleTxsUsed(hashes []hash.Hash) {
+	for _, q := range t.usableSources {
+		q.HandleTxsUsed(hashes)
+	}
+
+	// todo: metrics
+	// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
 }
 
-func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*Transaction, map[hash.Hash]int) {
-	txs, missingTxs := t.schedulerQueue.getKnownBatch(batch)
+func (t *txPool) GetKnownBatch(batch []hash.Hash) ([][]byte, map[hash.Hash]int) {
+	var txs [][]byte
+	missingTxs := make(map[hash.Hash]int)
+HASH_LOOP:
+	for i, h := range batch {
+		for _, q := range t.usableSources {
+			if tx, ok := q.GetTxByHash(h); ok {
+				txs = append(txs, tx)
+				continue HASH_LOOP
+			}
+		}
+		txs = append(txs, nil)
+		missingTxs[h] = i
+	}
 
 	// Also check the proposed transactions set.
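+	// (These may have arrived only as part of a proposed batch from another
+	// node, in which case no local queue has ever seen them.)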
 	t.proposedTxsLock.Lock()
@@ -388,16 +412,6 @@ func (t *txPool) WakeupScheduler() {
 	t.schedulerNotifier.Broadcast(false)
 }
 
-func (t *txPool) Clear() {
-	t.schedulerQueue.clear()
-	t.checkTxQueue.clear()
-
-	t.seenCache.Clear()
-	t.ClearProposedBatch()
-
-	pendingScheduleSize.With(t.getMetricLabels()).Set(0)
-}
-
 func (t *txPool) WatchScheduler() (pubsub.ClosableSubscription, <-chan bool) {
 	sub := t.schedulerNotifier.Subscribe()
 	ch := make(chan bool)
@@ -416,10 +430,6 @@ func (t *txPool) PendingCheckSize() int {
 	return t.checkTxQueue.size()
 }
 
-func (t *txPool) PendingScheduleSize() int {
-	return t.schedulerQueue.size()
-}
-
 func (t *txPool) getCurrentBlockInfo() (*BlockInfo, error) {
 	t.blockInfoLock.Lock()
 	defer t.blockInfoLock.Unlock()
@@ -500,20 +510,24 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
 
 	newTxs := make([]*PendingCheckTransaction, 0, len(results))
 	batchIndices := make([]int, 0, len(results))
+	// todo: trying to remove this. we will unschedule all when starting to recheck, and if it doesn't pass, we won't
+	// put it back. may need some bookkeeping to notify the sender though
 	var unschedule []hash.Hash
 	for i, res := range results {
 		if !res.IsSuccess() {
 			rejectedTransactions.With(t.getMetricLabels()).Inc()
+			// todo: hash used to be cached
+			h := hash.NewFromBytes(batch[i].tx)
 			t.logger.Debug("check tx failed",
 				"tx", batch[i].tx,
-				"tx_hash", batch[i].hash,
+				"tx_hash", h,
 				"result", res,
 				"recheck", batch[i].isRecheck(),
 			)
 
 			// If this was a recheck, make sure to remove the transaction from the scheduling queue.
 			if batch[i].isRecheck() {
-				unschedule = append(unschedule, batch[i].hash)
+				unschedule = append(unschedule, h)
 			}
 			notifySubmitter(i)
 			continue
@@ -527,13 +541,14 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
 
 		// For any transactions that are to be queued, we defer notification until queued.
 		acceptedTransactions.With(t.getMetricLabels()).Inc()
-		batch[i].setChecked(res.Meta)
+		// todo: need to design a way to get res.Meta to the main queue
+		// batch[i].setChecked(res.Meta)
 		newTxs = append(newTxs, batch[i])
 		batchIndices = append(batchIndices, i)
 	}
 
-	// Unschedule any transactions that are being rechecked and have failed checks.
-	t.RemoveTxBatch(unschedule)
+	// todo: we used to unschedule rejected txs here, but going forward we will (i) not schedule them in the first
+	// place until they are checked and (ii) unschedule them as soon as we start rechecking them.
 
 	// If there are more transactions to check, make sure we check them next.
 	if t.checkTxQueue.size() > 0 {
@@ -549,12 +564,19 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
 	)
 
 	// Queue checked transactions for scheduling.
+	// todo: rename tx to pct
 	for i, tx := range newTxs {
-		// NOTE: Scheduler exists as otherwise there would be no current block info above.
-		if err := t.schedulerQueue.add(tx.Transaction); err != nil {
+		// todo: hash used to be cached
+		h := hash.NewFromBytes(tx.tx)
+		// todo: get queue reference from metadata
+		var someRecheckable RecheckableTransactionStore
+		// todo: is it more efficient to offer a batch at a time? if not, change interface to take one at a time
+		err := someRecheckable.OfferChecked([][]byte{tx.tx})
+		// todo: notify submitter if it falls off the queue immediately
+		if err != nil {
 			t.logger.Error("unable to queue transaction for scheduling",
 				"err", err,
-				"tx_hash", tx.hash,
+				"tx_hash", h,
 			)
 
 			// Change the result into an error and notify submitter.
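The struct fields and call sites above imply three abstractions that this commit does not define yet. A minimal sketch of what they would have to provide, inferred only from how they are used in this patch (the method comments and parameter names are assumptions, not part of the commit):

type UsableTransactionSource interface {
	// GetSchedulingSuggestion returns transactions that are worth scheduling now.
	GetSchedulingSuggestion() [][]byte
	// GetTxByHash returns the transaction with the given hash, if this source has it.
	GetTxByHash(h hash.Hash) ([]byte, bool)
	// HandleTxsUsed is called when transactions have been used (e.g. included
	// in a block) so the source can stop suggesting them.
	HandleTxsUsed(hashes []hash.Hash)
}

type RecheckableTransactionStore interface {
	// TakeAll removes and returns all transactions, e.g. for rechecking.
	TakeAll() [][]byte
	// OfferChecked adds transactions that have passed checks; it can fail,
	// e.g. when the store is full.
	OfferChecked(txs [][]byte) error
}

type RepublishableTransactionSource interface {
	// GetTxsToPublish returns transactions to republish along with the next
	// time this source should be polled again.
	GetTxsToPublish(now time.Time) ([][]byte, time.Time)
}

The concrete rimQueue, localQueue, and mainQueue added to txPool would then each implement whichever subset of these applies to them.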
@@ -571,12 +593,13 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
 		notifySubmitter(batchIndices[i])
 
 		// Publish local transactions immediately.
+		// todo: move more of this responsibility into RepublishableTransactionSource
 		publishTime := time.Now()
 		if tx.flags.isLocal() {
 			if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil {
 				t.logger.Warn("failed to publish local transaction",
 					"err", err,
-					"tx_hash", tx.hash,
+					"tx_hash", h,
 				)
 
 				// Since publication failed, make sure we retry early.
@@ -587,14 +610,15 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
 
 		// Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it
 		// can error is if the capacity is in bytes and the value size is over capacity.
-		_ = t.seenCache.Put(tx.hash, publishTime)
+		_ = t.seenCache.Put(h, publishTime)
 	}
 
 	// Notify subscribers that we have received new transactions.
 	t.checkTxNotifier.Broadcast(newTxs)
 	t.schedulerNotifier.Broadcast(false)
 
-	pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize()))
+	// todo: metrics
+	// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize()))
 }
 
 func (t *txPool) ensureInitialized() error {
@@ -709,14 +733,24 @@ func (t *txPool) republishWorker() {
 
 		lastRepublish = time.Now()
 
-		// Get scheduled transactions.
-		txs := t.schedulerQueue.getAll()
+		// Get transactions to republish.
+		var txs [][]byte
+		for _, q := range t.republishableSources {
+			qtxs, next := q.GetTxsToPublish(lastRepublish)
+			txs = append(txs, qtxs...)
+			// todo: determine when to run again
+			_ = next
+		}
 
 		// Filter transactions based on whether they can already be republished.
 		var republishedCount int
 		nextPendingRepublish := republishInterval
 		for _, tx := range txs {
-			ts, seen := t.seenCache.Peek(tx.hash)
+			// todo: hash used to be cached
+			// although if this RepublishableTransactionSource conversion goes fine,
+			// we may be able to get rid of seenCache altogether
+			h := hash.NewFromBytes(tx)
+			ts, seen := t.seenCache.Peek(h)
 			if seen {
 				sinceLast := time.Since(ts.(time.Time))
 				if sinceLast < republishInterval {
@@ -727,7 +761,7 @@ func (t *txPool) republishWorker() {
 			}
 		}
 
-		if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil {
+		if err := t.txPublisher.PublishTx(ctx, tx); err != nil {
 			t.logger.Warn("failed to publish transaction",
 				"err", err,
 				"tx", tx,
@@ -737,7 +771,7 @@ func (t *txPool) republishWorker() {
 		}
 
 		// Update publish timestamp.
-		_ = t.seenCache.Put(tx.hash, time.Now())
+		_ = t.seenCache.Put(h, time.Now())
 
 		republishedCount++
 		if republishedCount > maxRepublishTxs {
@@ -769,7 +803,11 @@ func (t *txPool) recheckWorker() {
 		}
 
 		// Get a batch of scheduled transactions.
-		txs := t.schedulerQueue.getAll()
+		var txs [][]byte
+		for _, q := range t.recheckableStores {
+			// todo: save what queue they're from
+			txs = append(txs, q.TakeAll()...)
+		}
 
 		if len(txs) == 0 {
 			continue
@@ -778,12 +816,13 @@ func (t *txPool) recheckWorker() {
 		// Recheck all transactions in batch.
 		for _, tx := range txs {
 			err := t.addToCheckQueue(&PendingCheckTransaction{
-				Transaction: tx,
+				tx: tx,
 			})
 			if err != nil {
 				t.logger.Warn("failed to submit transaction for recheck",
 					"err", err,
-					"tx_hash", tx.hash,
+					// todo: hash used to be cached
+					"tx_hash", hash.NewFromBytes(tx),
 				)
 			}
 		}
@@ -824,6 +863,7 @@ func New(
 
 	// buffer in case the schedule queue is full and is being rechecked.
 	maxCheckTxQueueSize := int((110 * cfg.MaxPoolSize) / 100)
+	// todo: update
 	return &txPool{
 		logger:            logging.GetLogger("runtime/txpool"),
 		stopCh:            make(chan struct{}),
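The other big todo thread ("replace with queue reference", "change flags to destination queue", "get queue reference from metadata") points at routing checked transactions by a queue reference carried on the pending-check entry instead of by txCheckFlags. One possible shape, purely illustrative and not part of this commit:

// Hypothetical follow-up: PendingCheckTransaction carries its destination
// store directly, replacing the txCheckLocal/txCheckDiscard flags.
type PendingCheckTransaction struct {
	tx []byte
	// dstQueue is where the transaction is offered once it passes checks;
	// nil means check-only (discard).
	dstQueue RecheckableTransactionStore
	// notifyCh is a channel for sending back the transaction check result.
	notifyCh chan *protocol.CheckTxResult
}

checkTxBatch would then resolve the "get queue reference from metadata" todo as:

	if pct.dstQueue != nil {
		if err := pct.dstQueue.OfferChecked([][]byte{pct.tx}); err != nil {
			// The destination queue rejected it (e.g. full); report the
			// error back to the submitter instead of dropping it silently.
		}
	}

This would also give recheckWorker a natural answer to the "save what queue they're from" todo: keep the reference on the PendingCheckTransaction it rebuilds, so a passing recheck returns the transaction to the store it was taken from.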