Skip to content

Commit

Permalink
refactor(mempool): relaxed locking of mempool (#737)
Browse files Browse the repository at this point in the history
* refactor(mempool): relaxed locking of mempool addNewTransaction

* fix: wrong condition

* chore: fix condition again

* chore: self-review
  • Loading branch information
lklimek authored Mar 6, 2024
1 parent b0b8d66 commit 0872595
Showing 1 changed file with 100 additions and 42 deletions.
142 changes: 100 additions & 42 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,29 @@ func (txmp *TxMempool) Update(
// transactions are evicted.
//
// Finally, the new transaction is added and size stats updated.
//
// Note: due to locking appoach we take, it is possible that meanwhile another thread evicted the same items.
// This means we can put put slightly more items into the mempool, but it has significant performance impact
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

var err error
// RLock here.

// When the mempool is full, we we don't need a writable lock. RLocking here should add significant
// performance boost in this case, as threads will not need to wait to obtain writable lock.
//
// A disadvantage is that we need to relock RW when we need to evict peers, what introduces race condition
// when two threads want to evict the same transactions. We choose to manage that race condition to gain some
// performance.
txmp.mtx.RLock()
rlocked := true
defer func() {
if rlocked {
txmp.mtx.RUnlock()
} else {
txmp.mtx.Unlock()
}
}()

if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, checkTxRes)
}
Expand Down Expand Up @@ -524,10 +542,43 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
}
}

haveSpace := victimBytes >= wtx.Size()
if haveSpace {
// Sort lowest priority items first so they will be evicted first. Break
// ties in favor of newer items (to maintain FIFO semantics in a group).
sort.Slice(victims, func(i, j int) bool {
iw := victims[i].Value.(*WrappedTx)
jw := victims[j].Value.(*WrappedTx)
if iw.Priority() == jw.Priority() {
return iw.timestamp.After(jw.timestamp)
}
return iw.Priority() < jw.Priority()
})

txmp.logger.Debug("evicting lower-priority transactions",
"new_tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"new_priority", priority)

// Evict as many of the victims as necessary to make room.
// We need to drop RLock and Lock here, as from now on, we will be modifying the mempool.
// This introduces race condition which we handle inside evict()
if rlocked {
txmp.mtx.RUnlock()
txmp.mtx.Lock()
rlocked = false
}

haveSpace = txmp.evict(wtx.Size(), victims)

if !haveSpace {
txmp.logger.Debug("unexpected mempool eviction failure - possibly concurrent eviction happened")
}
}

// If there are no suitable eviction candidates, or the total size of
// those candidates is not enough to make room for the new transaction,
// drop the new one.
if len(victims) == 0 || victimBytes < wtx.Size() {
if !haveSpace {
txmp.cache.Remove(wtx.tx)
txmp.logger.Error(
"rejected valid incoming transaction; mempool is full",
Expand All @@ -540,49 +591,19 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
// fmt.Errorf("transaction rejected: mempool is full (%X)", wtx.tx.Hash())
return nil
}

txmp.logger.Debug("evicting lower-priority transactions",
"new_tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"new_priority", priority,
)

// Sort lowest priority items first so they will be evicted first. Break
// ties in favor of newer items (to maintain FIFO semantics in a group).
sort.Slice(victims, func(i, j int) bool {
iw := victims[i].Value.(*WrappedTx)
jw := victims[j].Value.(*WrappedTx)
if iw.Priority() == jw.Priority() {
return iw.timestamp.After(jw.timestamp)
}
return iw.Priority() < jw.Priority()
})

// Evict as many of the victims as necessary to make room.
var evictedBytes int64
for _, vic := range victims {
w := vic.Value.(*WrappedTx)

txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
"old_tx", tmstrings.LazySprintf("%X", w.tx.Hash()),
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
evictedBytes += w.Size()
if evictedBytes >= wtx.Size() {
break
}
}
}

wtx.SetGasWanted(checkTxRes.GasWanted)
wtx.SetPriority(priority)
wtx.SetSender(sender)

// Ensure we have writable lock
if rlocked {
txmp.mtx.RUnlock()
txmp.mtx.Lock()
rlocked = false
}

txmp.insertTx(wtx)

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
Expand All @@ -594,10 +615,47 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
"height", txmp.height,
"num_txs", txmp.Size(),
)

txmp.notifyTxsAvailable()

return nil
}

// Remove victims from the mempool until we fee up to <size> bytes
// Returns true when enough victims were removed.
//
// Caller should hold writable lock
func (txmp *TxMempool) evict(size int64, victims []*clist.CElement) bool {
var evictedBytes int64
for _, vic := range victims {
w := vic.Value.(*WrappedTx)

if vic.Removed() {
// Race condition - some other thread already removed this item
// We handle it by just skipping this tx
continue
}

txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
"old_tx", tmstrings.LazySprintf("%X", w.tx.Hash()),
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
evictedBytes += w.Size()
if evictedBytes >= size {
return true
}
}

return false
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
elt := txmp.txs.PushBack(wtx)
txmp.txByKey[wtx.tx.Key()] = elt
Expand Down

0 comments on commit 0872595

Please sign in to comment.