(unfinished) start ripping
pro-wh committed May 20, 2022
1 parent d278bc4 commit c15f457
Showing 2 changed files with 98 additions and 57 deletions.
3 changes: 2 additions & 1 deletion go/runtime/txpool/transaction.go
@@ -122,9 +122,10 @@ func (f txCheckFlags) isDiscard() bool {

// PendingCheckTransaction is a transaction pending checks.
type PendingCheckTransaction struct {
*Transaction
tx []byte

// flags are the transaction check flags.
// todo: replace with queue reference
flags txCheckFlags
// notifyCh is a channel for sending back the transaction check result.
notifyCh chan *protocol.CheckTxResult
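The new todo hints that `flags txCheckFlags` is a stopgap until each pending transaction carries a reference to its destination queue. A hedged sketch of that direction, reusing the `RecheckableTransactionStore` name that txpool.go below already uses; the `dstQueue` field name is hypothetical, not this commit's code:

// Sketch of a possible follow-up shape for PendingCheckTransaction.
type PendingCheckTransaction struct {
	tx []byte

	// dstQueue would identify where the transaction goes once it passes
	// checks, replacing the txCheckFlags bits.
	dstQueue RecheckableTransactionStore
	// notifyCh is a channel for sending back the transaction check result.
	notifyCh chan *protocol.CheckTxResult
}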
152 changes: 96 additions & 56 deletions go/runtime/txpool/txpool.go
@@ -106,9 +106,6 @@ type TransactionPool interface {
// WakeupScheduler explicitly notifies subscribers that they should attempt scheduling.
WakeupScheduler()

// Clear clears the transaction pool.
Clear()

// WatchScheduler subscribes to notifications about when to attempt scheduling. The emitted
// boolean flag indicates whether the batch flush timeout expired.
WatchScheduler() (pubsub.ClosableSubscription, <-chan bool)
@@ -177,12 +174,18 @@ type txPool struct {
checkTxNotifier *pubsub.Broker
recheckTxCh *channels.RingChannel

schedulerQueue *scheduleQueue
usableSources []UsableTransactionSource
recheckableStores []RecheckableTransactionStore
republishableSources []RepublishableTransactionSource
rimQueue *rimQueue
localQueue *localQueue
mainQueue *mainQueue

schedulerTicker *time.Ticker
schedulerNotifier *pubsub.Broker

proposedTxsLock sync.Mutex
proposedTxs map[hash.Hash]*Transaction
proposedTxs map[hash.Hash][]byte

blockInfoLock sync.Mutex
blockInfo *BlockInfo
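The new fields reference three source/store interfaces that this diff uses but never shows. Reconstructing them from the call sites in the hunks below (`GetSchedulingSuggestion`, `GetTxByHash`, `HandleTxsUsed`, `TakeAll`, `OfferChecked`, `GetTxsToPublish`) gives roughly the following; the method sets and signatures are inferred, not copied from the actual definitions, and the type of the second `GetTxsToPublish` return is an assumption:

// Inferred from the call sites in this commit; real definitions may differ.
type UsableTransactionSource interface {
	// GetSchedulingSuggestion returns candidate transactions for a batch.
	GetSchedulingSuggestion() [][]byte
	// GetTxByHash returns the raw transaction with the given hash, if known.
	GetTxByHash(h hash.Hash) ([]byte, bool)
	// HandleTxsUsed removes transactions that were included in a block.
	HandleTxsUsed(hashes []hash.Hash)
}

// RecheckableTransactionStore holds checked transactions that can be drained
// for rechecking and offered back afterwards.
type RecheckableTransactionStore interface {
	TakeAll() [][]byte
	OfferChecked(txs [][]byte)
}

// RepublishableTransactionSource yields transactions due for republishing;
// the second return is assumed to be when the next one becomes due.
type RepublishableTransactionSource interface {
	GetTxsToPublish(now time.Time) ([][]byte, time.Time)
}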
@@ -241,9 +244,10 @@ func (t *txPool) submitTx(ctx context.Context, rawTx []byte, meta *TransactionMe

// Queue transaction for checks.
pct := &PendingCheckTransaction{
Transaction: tx,
notifyCh: notifyCh,
tx: rawTx,
notifyCh: notifyCh,
}
// todo: change flags to destination queue
if meta.Local {
pct.flags |= txCheckLocal
}
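Per the `todo: change flags to destination queue`, the `meta.Local` branch would then select a queue directly instead of setting a flag bit. A minimal sketch, assuming the hypothetical `dstQueue` field from the transaction.go sketch and that both queues satisfy the store interface:

// Hypothetical routing once flags give way to a queue reference.
pct := &PendingCheckTransaction{
	tx:       rawTx,
	notifyCh: notifyCh,
	dstQueue: t.mainQueue, // default destination after a successful check
}
if meta.Local {
	pct.dstQueue = t.localQueue // local txs are tracked separately
}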
@@ -255,14 +259,16 @@
}

func (t *txPool) addToCheckQueue(pct *PendingCheckTransaction) error {
// todo: hash used to be cached
h := hash.NewFromBytes(pct.tx)
t.logger.Debug("queuing transaction for check",
"tx", pct.tx,
"tx_hash", pct.hash,
"tx_hash", h,
"recheck", pct.isRecheck(),
)
if err := t.checkTxQueue.add(pct); err != nil {
t.logger.Warn("unable to queue transaction",
"tx_hash", pct.hash,
"tx_hash", h,
"err", err,
)
return err
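The repeated `todo: hash used to be cached` notes (here and in checkTxBatch, republishWorker, and recheckWorker below) all stem from dropping the old `Transaction` wrapper, which memoized the hash. A small sketch of restoring that without the wrapper; `cachedHash` and `txHash` are assumed names, and this memoization is not safe for concurrent callers:

// Sketch: lazily memoize the hash on PendingCheckTransaction instead of
// recomputing hash.NewFromBytes at every log site. Not part of this commit.
func (pct *PendingCheckTransaction) txHash() hash.Hash {
	if pct.cachedHash == nil { // cachedHash *hash.Hash, nil until first use
		h := hash.NewFromBytes(pct.tx)
		pct.cachedHash = &h
	}
	return *pct.cachedHash
}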
@@ -286,8 +292,7 @@ func (t *txPool) SubmitProposedBatch(batch [][]byte) {
defer t.proposedTxsLock.Unlock()

for _, rawTx := range batch {
tx := newTransaction(rawTx, txStatusChecked)
t.proposedTxs[tx.hash] = tx
t.proposedTxs[hash.NewFromBytes(rawTx)] = rawTx
}
}

@@ -302,33 +307,52 @@ func (t *txPool) PromoteProposedBatch(batch []hash.Hash) {
t.proposedTxsLock.Lock()
defer t.proposedTxsLock.Unlock()

for _, tx := range txs {
if tx == nil {
for i, h := range batch {
if _, ok := missingTxs[h]; ok {
continue
}
t.proposedTxs[tx.hash] = tx
t.proposedTxs[h] = txs[i]
}
}

func (t *txPool) ClearProposedBatch() {
t.proposedTxsLock.Lock()
defer t.proposedTxsLock.Unlock()

t.proposedTxs = make(map[hash.Hash]*Transaction)
t.proposedTxs = make(map[hash.Hash][]byte)
}

func (t *txPool) RemoveTxBatch(txs []hash.Hash) {
t.schedulerQueue.remove(txs)

pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
func (t *txPool) GetSchedulingSuggestion() [][]byte {
var txs [][]byte
for _, q := range t.usableSources {
txs = append(txs, q.GetSchedulingSuggestion()...)
}
return txs
}

func (t *txPool) GetPrioritizedBatch(offset *hash.Hash, limit uint32) []*Transaction {
return t.schedulerQueue.getPrioritizedBatch(offset, limit)
func (t *txPool) HandleTxsUsed(hashes []hash.Hash) {
for _, q := range t.usableSources {
q.HandleTxsUsed(hashes)
}

// todo: metrics
// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.schedulerQueue.size()))
}

func (t *txPool) GetKnownBatch(batch []hash.Hash) ([]*Transaction, map[hash.Hash]int) {
txs, missingTxs := t.schedulerQueue.getKnownBatch(batch)
func (t *txPool) GetKnownBatch(batch []hash.Hash) ([][]byte, map[hash.Hash]int) {
var txs [][]byte
missingTxs := make(map[hash.Hash]int)
HASH_LOOP:
for i, h := range batch {
for _, q := range t.usableSources {
if tx, ok := q.GetTxByHash(h); ok {
txs = append(txs, tx)
continue HASH_LOOP
}
}
txs = append(txs, nil)
missingTxs[h] = i
}

// Also check the proposed transactions set.
t.proposedTxsLock.Lock()
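Taken together, `GetSchedulingSuggestion`, `HandleTxsUsed`, and `GetKnownBatch` replace the old single `schedulerQueue`: a scheduler pulls candidates from every usable source and reports back what made it into a block. A hedged caller-side sketch, assuming these methods are (or will be) exposed on the `TransactionPool` interface; `scheduleBatch` itself is hypothetical:

// Hypothetical caller: fill a batch from the pool's suggestion, then tell
// the pool which transactions were actually used.
func scheduleBatch(pool TransactionPool, maxBatch int) {
	suggestion := pool.GetSchedulingSuggestion()
	if len(suggestion) > maxBatch {
		suggestion = suggestion[:maxBatch]
	}

	used := make([]hash.Hash, 0, len(suggestion))
	for _, tx := range suggestion {
		// ... execute tx, skipping it on failure ...
		used = append(used, hash.NewFromBytes(tx))
	}

	// Drop the included transactions from every usable source.
	pool.HandleTxsUsed(used)
}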
@@ -388,16 +412,6 @@ func (t *txPool) WakeupScheduler() {
t.schedulerNotifier.Broadcast(false)
}

func (t *txPool) Clear() {
t.schedulerQueue.clear()
t.checkTxQueue.clear()

t.seenCache.Clear()
t.ClearProposedBatch()

pendingScheduleSize.With(t.getMetricLabels()).Set(0)
}

func (t *txPool) WatchScheduler() (pubsub.ClosableSubscription, <-chan bool) {
sub := t.schedulerNotifier.Subscribe()
ch := make(chan bool)
@@ -416,10 +430,6 @@ func (t *txPool) PendingCheckSize() int {
return t.checkTxQueue.size()
}

func (t *txPool) PendingScheduleSize() int {
return t.schedulerQueue.size()
}

func (t *txPool) getCurrentBlockInfo() (*BlockInfo, error) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()
@@ -500,20 +510,24 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {

newTxs := make([]*PendingCheckTransaction, 0, len(results))
batchIndices := make([]int, 0, len(results))
// todo: trying to remove this. we will unschedule all transactions when starting to recheck, and if one
// doesn't pass, we won't put it back. may need some bookkeeping to notify the sender though
var unschedule []hash.Hash
for i, res := range results {
if !res.IsSuccess() {
rejectedTransactions.With(t.getMetricLabels()).Inc()
// todo: hash used to be cached
h := hash.NewFromBytes(batch[i].tx)
t.logger.Debug("check tx failed",
"tx", batch[i].tx,
"tx_hash", batch[i].hash,
"tx_hash", h,
"result", res,
"recheck", batch[i].isRecheck(),
)

// If this was a recheck, make sure to remove the transaction from the scheduling queue.
if batch[i].isRecheck() {
unschedule = append(unschedule, batch[i].hash)
unschedule = append(unschedule, h)
}
notifySubmitter(i)
continue
@@ -527,13 +541,14 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
// For any transactions that are to be queued, we defer notification until queued.

acceptedTransactions.With(t.getMetricLabels()).Inc()
batch[i].setChecked(res.Meta)
// todo: need to design a way to get res.Meta to the main queue
// batch[i].setChecked(res.Meta)
newTxs = append(newTxs, batch[i])
batchIndices = append(batchIndices, i)
}

// Unschedule any transactions that are being rechecked and have failed checks.
t.RemoveTxBatch(unschedule)
// todo: we used to unschedule rejected txs here, but going forward we will (i) not schedule them in the
// first place until they are checked and (ii) unschedule them as soon as we start rechecking them.

// If there are more transactions to check, make sure we check them next.
if t.checkTxQueue.size() > 0 {
@@ -549,12 +564,19 @@
)

// Queue checked transactions for scheduling.
// todo: rename tx to pct
for i, tx := range newTxs {
// NOTE: Scheduler exists as otherwise there would be no current block info above.
if err := t.schedulerQueue.add(tx.Transaction); err != nil {
// todo: hash used to be cached
h := hash.NewFromBytes(tx.tx)
// todo: get queue reference from metadata
var someRecheckable RecheckableTransactionStore
// todo: is it more efficient to offer a batch at a time? if not, change interface to take one at a time
someRecheckable.OfferChecked([][]byte{tx.tx})
// todo: notify submitter if it falls off the queue immediately
if false {
t.logger.Error("unable to queue transaction for scheduling",
"err", err,
"tx_hash", tx.hash,
"tx_hash", h,
)

// Change the result into an error and notify submitter.
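As written above, `someRecheckable` is a nil interface value, so the `OfferChecked` call would panic; it is scaffolding for the queue-reference todo. One hedged way the finished path could read, assuming the hypothetical `dstQueue` and `meta` fields on `PendingCheckTransaction` (the latter populated from `res.Meta` in the check loop) and a one-at-a-time, error-returning `OfferChecked` per the two todos:

// Hypothetical finished path; none of these signatures are from this commit.
if err := tx.dstQueue.OfferChecked(tx.tx, tx.meta); err != nil {
	// The queue rejected the tx (e.g. it is full): turn the result into
	// an error and notify the submitter, as the surrounding code does.
	t.logger.Error("unable to queue transaction for scheduling",
		"err", err,
		"tx_hash", h,
	)
}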
@@ -571,12 +593,13 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {
notifySubmitter(batchIndices[i])

// Publish local transactions immediately.
// todo: move more of this responsibility into RepublishableTransactionSource
publishTime := time.Now()
if tx.flags.isLocal() {
if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil {
t.logger.Warn("failed to publish local transaction",
"err", err,
"tx_hash", tx.hash,
"tx_hash", h,
)

// Since publication failed, make sure we retry early.
@@ -587,14 +610,15 @@ func (t *txPool) checkTxBatch(ctx context.Context, rr host.RichRuntime) {

// Put cannot fail as seenCache's LRU capacity is not in bytes and the only case where it
// can error is if the capacity is in bytes and the value size is over capacity.
_ = t.seenCache.Put(tx.hash, publishTime)
_ = t.seenCache.Put(h, publishTime)
}

// Notify subscribers that we have received new transactions.
t.checkTxNotifier.Broadcast(newTxs)
t.schedulerNotifier.Broadcast(false)

pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize()))
// todo: metrics
// pendingScheduleSize.With(t.getMetricLabels()).Set(float64(t.PendingScheduleSize()))
}

func (t *txPool) ensureInitialized() error {
@@ -709,14 +733,24 @@ func (t *txPool) republishWorker() {

lastRepublish = time.Now()

// Get scheduled transactions.
txs := t.schedulerQueue.getAll()
// Get transactions to republish.
var txs [][]byte
for _, q := range t.republishableSources {
qtxs, next := q.GetTxsToPublish(lastRepublish)
txs = append(txs, qtxs...)
// todo: determine when to run again
_ = next
}

// Filter transactions based on whether they can already be republished.
var republishedCount int
nextPendingRepublish := republishInterval
for _, tx := range txs {
ts, seen := t.seenCache.Peek(tx.hash)
// todo: hash used to be cached
// although if this RepublishableTransactionSource conversion goes fine,
// we may be able to get rid of seenCache altogether
h := hash.NewFromBytes(tx)
ts, seen := t.seenCache.Peek(h)
if seen {
sinceLast := time.Since(ts.(time.Time))
if sinceLast < republishInterval {
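Stepping back to the collection loop earlier in this function: it currently discards `next` (`todo: determine when to run again`). The natural use is to fold the earliest per-source due time into the worker's wakeup; a sketch, assuming `next` is a `time.Time` and that a `time.Timer` drives the loop (both assumptions):

// Sketch: wake exactly when the earliest tx becomes republishable.
var txs [][]byte
nextRun := lastRepublish.Add(republishInterval) // fallback wakeup
for _, q := range t.republishableSources {
	qtxs, next := q.GetTxsToPublish(lastRepublish)
	txs = append(txs, qtxs...)
	if !next.IsZero() && next.Before(nextRun) {
		nextRun = next
	}
}
timer.Reset(time.Until(nextRun))
// ... txs then feed the filter-and-publish loop above ...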
@@ -727,7 +761,7 @@ func (t *txPool) republishWorker() {
}
}

if err := t.txPublisher.PublishTx(ctx, tx.tx); err != nil {
if err := t.txPublisher.PublishTx(ctx, tx); err != nil {
t.logger.Warn("failed to publish transaction",
"err", err,
"tx", tx,
Expand All @@ -737,7 +771,7 @@ func (t *txPool) republishWorker() {
}

// Update publish timestamp.
_ = t.seenCache.Put(tx.hash, time.Now())
_ = t.seenCache.Put(h, time.Now())

republishedCount++
if republishedCount > maxRepublishTxs {
@@ -769,7 +803,11 @@ func (t *txPool) recheckWorker() {
}

// Get a batch of scheduled transactions.
txs := t.schedulerQueue.getAll()
var txs [][]byte
for _, q := range t.recheckableStores {
// todo: save what queue they're from
txs = append(txs, q.TakeAll()...)
}

if len(txs) == 0 {
continue
@@ -778,12 +816,13 @@
// Recheck all transactions in batch.
for _, tx := range txs {
err := t.addToCheckQueue(&PendingCheckTransaction{
Transaction: tx,
tx: tx,
})
if err != nil {
t.logger.Warn("failed to submit transaction for recheck",
"err", err,
"tx_hash", tx.hash,
// todo: hash used to be cached
"tx_hash", hash.NewFromBytes(tx),
)
}
}
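`TakeAll` above flattens every store into one slice, losing track of which queue each transaction came from (`todo: save what queue they're from`), which the check path needs in order to put passing transactions back. A hedged sketch that keeps the pairing, again using the hypothetical `dstQueue` field:

// Sketch: drain each store separately so a rechecked transaction can be
// offered back to the queue it came from.
for _, q := range t.recheckableStores {
	for _, tx := range q.TakeAll() {
		err := t.addToCheckQueue(&PendingCheckTransaction{
			tx:       tx,
			dstQueue: q, // hypothetical field; remembers the origin queue
		})
		if err != nil {
			t.logger.Warn("failed to submit transaction for recheck",
				"err", err,
				"tx_hash", hash.NewFromBytes(tx),
			)
		}
	}
}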
@@ -824,6 +863,7 @@ func New(
// buffer in case the schedule queue is full and is being rechecked.
maxCheckTxQueueSize := int((110 * cfg.MaxPoolSize) / 100)

// todo: update
return &txPool{
logger: logging.GetLogger("runtime/txpool"),
stopCh: make(chan struct{}),