diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs index b553faf47a..8f04c2ae3f 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Init.hs @@ -104,7 +104,7 @@ mkMempool :: mkMempool mpEnv = Mempool { addTx = implAddTx mpEnv , removeTxs = implRemoveTxs mpEnv - , syncWithLedger = fst <$> implSyncWithLedger mpEnv + , syncWithLedger = implSyncWithLedger mpEnv , getSnapshot = snapshotFromIS <$> readTMVar istate , getSnapshotFor = implGetSnapshotFor mpEnv , getCapacity = isCapacity <$> readTMVar istate diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs index 72c0eafa40..8514e6ef59 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Mempool/Update.hs @@ -10,14 +10,13 @@ module Ouroboros.Consensus.Mempool.Update ( import Cardano.Slotting.Slot import Control.Concurrent.Class.MonadMVar (withMVar) +import Control.Monad (void) import Control.Monad.Except (runExcept) import Control.Tracer import qualified Data.List.NonEmpty as NE import Data.Maybe (fromMaybe) import qualified Data.Measure as Measure import qualified Data.Set as Set -import Ouroboros.Consensus.Block.Abstract (castHash, castPoint, - pointHash) import Ouroboros.Consensus.HeaderValidation import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.SupportsMempool @@ -27,8 +26,9 @@ import Ouroboros.Consensus.Mempool.Capacity import Ouroboros.Consensus.Mempool.Impl.Common import Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..)) import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq -import Ouroboros.Consensus.Util (whenJust) +import Ouroboros.Consensus.Util (whenJust, withTMVarAnd) import Ouroboros.Consensus.Util.IOLike hiding (withMVar) +import Ouroboros.Network.Block {------------------------------------------------------------------------------- Add transactions @@ -158,32 +158,37 @@ doAddTx mpEnv wti tx = doAddTx' s = do traceWith trcr $ TraceMempoolAttemptingAdd tx - is <- atomically $ do - i <- takeTMVar istate + res <- withTMVarAnd istate (\is -> case s of Nothing -> pure () - Just s' -> check $ isMempoolSize i /= s' - pure i - mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) [tx] - case mTbs of - Just tbs -> do - traceWith trcr $ TraceMempoolLedgerFound (isTip is) - case pureTryAddTx cfg wti tx is tbs of - NotEnoughSpaceLeft -> do - atomically $ putTMVar istate is - doAddTx' (Just $ isMempoolSize is) - Processed outcome@(TransactionProcessingResult is' _ _) -> do - atomically $ putTMVar istate $ fromMaybe is is' - pure outcome - Nothing -> do - traceWith trcr $ TraceMempoolLedgerNotFound (isTip is) - -- We couldn't retrieve the values because the state is no longer on - -- the db. We need to resync. - atomically $ putTMVar istate is - (_, mTrace) <- implSyncWithLedger mpEnv - whenJust mTrace (traceWith trcr) + Just s' -> check $ isMempoolSize is /= s') + $ \is () -> do + mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) [tx] + case mTbs of + Just tbs -> do + traceWith trcr $ TraceMempoolLedgerFound (isTip is) + case pureTryAddTx cfg wti tx is tbs of + NotEnoughSpaceLeft -> do + pure (Retry (isMempoolSize is), is) + Processed outcome@(TransactionProcessingResult is' _ _) -> do + pure (OK outcome, fromMaybe is is') + Nothing -> do + traceWith trcr $ TraceMempoolLedgerNotFound (isTip is) + -- We couldn't retrieve the values because the state is no longer on + -- the db. We need to resync. + pure (Resync, is) + case res of + Retry s' -> doAddTx' (Just s') + OK outcome -> pure outcome + Resync -> do + void $ implSyncWithLedger mpEnv doAddTx' s +data WithTMVarOutcome retry ok = + Retry retry + | OK ok + | Resync + -- | Craft a 'TriedToAddTx' value containing the resulting state if -- applicable, the tracing event and the result of adding this transaction. See -- the documentation of 'implAddTx' for some more context. @@ -318,10 +323,8 @@ implRemoveTxs :: -> NE.NonEmpty (GenTxId blk) -> m () implRemoveTxs mpEnv toRemove = do - (is, ls) <- atomically $ do - is <- takeTMVar istate - ls <- getCurrentLedgerState ldgrInterface - pure (is, ls) + out <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) + $ \is ls -> do let toKeep = filter ( (`notElem` Set.fromList (NE.toList toRemove)) . txId @@ -333,9 +336,7 @@ implRemoveTxs mpEnv toRemove = do toKeep' = [ txForgetValidated . TxSeq.txTicketTx $ tx | tx <- toKeep ] mTbs <- getLedgerTablesAtFor ldgrInterface (castPoint (getTip ls)) toKeep' case mTbs of - Nothing -> do - atomically $ putTMVar istate is - implRemoveTxs mpEnv toRemove + Nothing -> pure (Resync, is) Just tbs -> do let (is', t) = pureRemoveTxs capacityOverride @@ -346,8 +347,14 @@ implRemoveTxs mpEnv toRemove = do (isLastTicketNo is) toKeep toRemove - atomically $ putTMVar istate is' traceWith trcr t + pure (OK (), is') + case out of + Resync -> do + void $ implSyncWithLedger mpEnv + implRemoveTxs mpEnv toRemove + OK () -> pure () + Retry _ -> error "Impossible!" where MempoolEnv { mpEnvStateVar = istate , mpEnvLedger = ldgrInterface @@ -399,47 +406,43 @@ implSyncWithLedger :: , HasTxId (GenTx blk) ) => MempoolEnv m blk - -> m (MempoolSnapshot blk, Maybe (TraceEventMempool blk)) + -> m (MempoolSnapshot blk) implSyncWithLedger mpEnv = do traceWith trcr TraceMempoolAttemptingSync - (is, ls) <- atomically $ do - is <- takeTMVar istate - ls <- getCurrentLedgerState ldgrInterface - pure (is, ls) - - let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls - - if pointHash (isTip is) == castHash (getTipHash ls) && - isSlotNo is == slot - then do - -- The tip didn't change, put the same state. - atomically $ putTMVar istate is - traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) (castPoint $ getTip ls) - pure (snapshotFromIS is, Nothing) - else do - -- We need to revalidate - let pt = castPoint (getTip ls) - txs = [ txForgetValidated . TxSeq.txTicketTx $ tx - | tx <- TxSeq.toList $ isTxs is - ] - mTbs <- getLedgerTablesAtFor ldgrInterface pt txs - case mTbs of - Just tbs -> do - let (is', mTrace) = pureSyncWithLedger - capacityOverride - cfg - slot - ls' - tbs - is - atomically $ putTMVar istate is' - whenJust mTrace (traceWith trcr) - traceWith trcr TraceMempoolSyncDone - return (snapshotFromIS is', mTrace) - Nothing -> do - -- If the point is gone, resync - atomically $ putTMVar istate is - implSyncWithLedger mpEnv + res <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $ + \is ls -> do + let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls + if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot + then do + -- The tip didn't change, put the same state. + traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) (castPoint $ getTip ls) + pure (OK (snapshotFromIS is), is) + else do + -- We need to revalidate + let pt = castPoint (getTip ls) + txs = [ txForgetValidated . TxSeq.txTicketTx $ tx + | tx <- TxSeq.toList $ isTxs is + ] + mTbs <- getLedgerTablesAtFor ldgrInterface pt txs + case mTbs of + Just tbs -> do + let (is', mTrace) = pureSyncWithLedger + capacityOverride + cfg + slot + ls' + tbs + is + whenJust mTrace (traceWith trcr) + traceWith trcr TraceMempoolSyncDone + pure (OK (snapshotFromIS is'), is') + Nothing -> do + -- If the point is gone, resync + pure (Resync, is) + case res of + OK v -> pure v + Resync -> implSyncWithLedger mpEnv + Retry _ -> error "Impossible!" where MempoolEnv { mpEnvStateVar = istate , mpEnvLedger = ldgrInterface diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util.hs index 4bee18834f..0b5c88af0c 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util.hs @@ -79,6 +79,9 @@ module Ouroboros.Consensus.Util ( , electric , newFuse , withFuse + -- * withTMVar + , withTMVar + , withTMVarAnd ) where import Cardano.Crypto.Hash (Hash, HashAlgorithm, hashFromBytes, @@ -463,3 +466,40 @@ withFuse (Fuse name m) (Electric io) = do newtype FuseBlownException = FuseBlownException Text deriving (Show) deriving anyclass (Exception) + +{------------------------------------------------------------------------------- + withTMVar +-------------------------------------------------------------------------------} + +-- | Apply @f@ with the content of @tv@ as state, restoring the original value when an +-- exception occurs +withTMVar :: + IOLike m + => StrictTMVar m a + -> (a -> m (c, a)) + -> m c +withTMVar tv f = withTMVarAnd tv (const $ pure ()) (\a -> const $ f a) + +-- | Apply @f@ with the content of @tv@ as state, restoring the original value +-- when an exception occurs. Additionally run a @STM@ action when acquiring the +-- value. +withTMVarAnd :: + IOLike m + => StrictTMVar m a + -> (a -> STM m b) -- ^ Additional STM action to run in the same atomically + -- block as the TMVar is acquired + -> (a -> b -> m (c, a)) -- ^ Action + -> m c +withTMVarAnd tv guard f = + bracketOnError + (atomically $ do + i <- takeTMVar tv + g <- guard i + pure (i, g) + ) + (atomically . putTMVar tv . fst) + (\(s, g) -> do + (x, s') <- f s g + atomically $ putTMVar tv s' + return x + )