Skip to content

Commit

Permalink
Define withTMVar and use it in Mempool
Browse files Browse the repository at this point in the history
  • Loading branch information
jasagredo committed Oct 4, 2024
1 parent a27dc3f commit 3f16a57
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mkMempool ::
mkMempool mpEnv = Mempool
{ addTx = implAddTx mpEnv
, removeTxs = implRemoveTxs mpEnv
, syncWithLedger = fst <$> implSyncWithLedger mpEnv
, syncWithLedger = implSyncWithLedger mpEnv
, getSnapshot = snapshotFromIS <$> readTMVar istate
, getSnapshotFor = implGetSnapshotFor mpEnv
, getCapacity = isCapacity <$> readTMVar istate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ module Ouroboros.Consensus.Mempool.Update (

import Cardano.Slotting.Slot
import Control.Concurrent.Class.MonadMVar (withMVar)
import Control.Monad (void)
import Control.Monad.Except (runExcept)
import Control.Tracer
import qualified Data.List.NonEmpty as NE
import Data.Maybe (fromMaybe)
import qualified Data.Measure as Measure
import qualified Data.Set as Set
import Ouroboros.Consensus.Block.Abstract (castHash, castPoint,
pointHash)
import Ouroboros.Consensus.HeaderValidation
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
Expand All @@ -27,8 +26,9 @@ import Ouroboros.Consensus.Mempool.Capacity
import Ouroboros.Consensus.Mempool.Impl.Common
import Ouroboros.Consensus.Mempool.TxSeq (TxTicket (..))
import qualified Ouroboros.Consensus.Mempool.TxSeq as TxSeq
import Ouroboros.Consensus.Util (whenJust)
import Ouroboros.Consensus.Util (whenJust, withTMVarAnd)
import Ouroboros.Consensus.Util.IOLike hiding (withMVar)
import Ouroboros.Network.Block

{-------------------------------------------------------------------------------
Add transactions
Expand Down Expand Up @@ -158,32 +158,37 @@ doAddTx mpEnv wti tx =

doAddTx' s = do
traceWith trcr $ TraceMempoolAttemptingAdd tx
is <- atomically $ do
i <- takeTMVar istate
res <- withTMVarAnd istate (\is ->
case s of
Nothing -> pure ()
Just s' -> check $ isMempoolSize i /= s'
pure i
mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) [tx]
case mTbs of
Just tbs -> do
traceWith trcr $ TraceMempoolLedgerFound (isTip is)
case pureTryAddTx cfg wti tx is tbs of
NotEnoughSpaceLeft -> do
atomically $ putTMVar istate is
doAddTx' (Just $ isMempoolSize is)
Processed outcome@(TransactionProcessingResult is' _ _) -> do
atomically $ putTMVar istate $ fromMaybe is is'
pure outcome
Nothing -> do
traceWith trcr $ TraceMempoolLedgerNotFound (isTip is)
-- We couldn't retrieve the values because the state is no longer on
-- the db. We need to resync.
atomically $ putTMVar istate is
(_, mTrace) <- implSyncWithLedger mpEnv
whenJust mTrace (traceWith trcr)
Just s' -> check $ isMempoolSize is /= s')
$ \is () -> do
mTbs <- getLedgerTablesAtFor ldgrInterface (isTip is) [tx]
case mTbs of
Just tbs -> do
traceWith trcr $ TraceMempoolLedgerFound (isTip is)
case pureTryAddTx cfg wti tx is tbs of
NotEnoughSpaceLeft -> do
pure (Retry (isMempoolSize is), is)
Processed outcome@(TransactionProcessingResult is' _ _) -> do
pure (OK outcome, fromMaybe is is')
Nothing -> do
traceWith trcr $ TraceMempoolLedgerNotFound (isTip is)
-- We couldn't retrieve the values because the state is no longer on
-- the db. We need to resync.
pure (Resync, is)
case res of
Retry s' -> doAddTx' (Just s')
OK outcome -> pure outcome
Resync -> do
void $ implSyncWithLedger mpEnv
doAddTx' s

data WithTMVarOutcome retry ok =
Retry retry
| OK ok
| Resync

-- | Craft a 'TriedToAddTx' value containing the resulting state if
-- applicable, the tracing event and the result of adding this transaction. See
-- the documentation of 'implAddTx' for some more context.
Expand Down Expand Up @@ -318,10 +323,8 @@ implRemoveTxs ::
-> NE.NonEmpty (GenTxId blk)
-> m ()
implRemoveTxs mpEnv toRemove = do
(is, ls) <- atomically $ do
is <- takeTMVar istate
ls <- getCurrentLedgerState ldgrInterface
pure (is, ls)
out <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface)
$ \is ls -> do
let toKeep = filter
( (`notElem` Set.fromList (NE.toList toRemove))
. txId
Expand All @@ -333,9 +336,7 @@ implRemoveTxs mpEnv toRemove = do
toKeep' = [ txForgetValidated . TxSeq.txTicketTx $ tx | tx <- toKeep ]
mTbs <- getLedgerTablesAtFor ldgrInterface (castPoint (getTip ls)) toKeep'
case mTbs of
Nothing -> do
atomically $ putTMVar istate is
implRemoveTxs mpEnv toRemove
Nothing -> pure (Resync, is)
Just tbs -> do
let (is', t) = pureRemoveTxs
capacityOverride
Expand All @@ -346,8 +347,14 @@ implRemoveTxs mpEnv toRemove = do
(isLastTicketNo is)
toKeep
toRemove
atomically $ putTMVar istate is'
traceWith trcr t
pure (OK (), is')
case out of
Resync -> do
void $ implSyncWithLedger mpEnv
implRemoveTxs mpEnv toRemove
OK () -> pure ()
Retry _ -> error "Impossible!"
where
MempoolEnv { mpEnvStateVar = istate
, mpEnvLedger = ldgrInterface
Expand Down Expand Up @@ -399,47 +406,43 @@ implSyncWithLedger ::
, HasTxId (GenTx blk)
)
=> MempoolEnv m blk
-> m (MempoolSnapshot blk, Maybe (TraceEventMempool blk))
-> m (MempoolSnapshot blk)
implSyncWithLedger mpEnv = do
traceWith trcr TraceMempoolAttemptingSync
(is, ls) <- atomically $ do
is <- takeTMVar istate
ls <- getCurrentLedgerState ldgrInterface
pure (is, ls)

let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls

if pointHash (isTip is) == castHash (getTipHash ls) &&
isSlotNo is == slot
then do
-- The tip didn't change, put the same state.
atomically $ putTMVar istate is
traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) (castPoint $ getTip ls)
pure (snapshotFromIS is, Nothing)
else do
-- We need to revalidate
let pt = castPoint (getTip ls)
txs = [ txForgetValidated . TxSeq.txTicketTx $ tx
| tx <- TxSeq.toList $ isTxs is
]
mTbs <- getLedgerTablesAtFor ldgrInterface pt txs
case mTbs of
Just tbs -> do
let (is', mTrace) = pureSyncWithLedger
capacityOverride
cfg
slot
ls'
tbs
is
atomically $ putTMVar istate is'
whenJust mTrace (traceWith trcr)
traceWith trcr TraceMempoolSyncDone
return (snapshotFromIS is', mTrace)
Nothing -> do
-- If the point is gone, resync
atomically $ putTMVar istate is
implSyncWithLedger mpEnv
res <- withTMVarAnd istate (const $ getCurrentLedgerState ldgrInterface) $
\is ls -> do
let (slot, ls') = tickLedgerState cfg $ ForgeInUnknownSlot ls
if pointHash (isTip is) == castHash (getTipHash ls) && isSlotNo is == slot
then do
-- The tip didn't change, put the same state.
traceWith trcr $ TraceMempoolSyncNotNeeded (isTip is) (castPoint $ getTip ls)
pure (OK (snapshotFromIS is), is)
else do
-- We need to revalidate
let pt = castPoint (getTip ls)
txs = [ txForgetValidated . TxSeq.txTicketTx $ tx
| tx <- TxSeq.toList $ isTxs is
]
mTbs <- getLedgerTablesAtFor ldgrInterface pt txs
case mTbs of
Just tbs -> do
let (is', mTrace) = pureSyncWithLedger
capacityOverride
cfg
slot
ls'
tbs
is
whenJust mTrace (traceWith trcr)
traceWith trcr TraceMempoolSyncDone
pure (OK (snapshotFromIS is'), is')
Nothing -> do
-- If the point is gone, resync
pure (Resync, is)
case res of
OK v -> pure v
Resync -> implSyncWithLedger mpEnv
Retry _ -> error "Impossible!"
where
MempoolEnv { mpEnvStateVar = istate
, mpEnvLedger = ldgrInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ module Ouroboros.Consensus.Util (
, electric
, newFuse
, withFuse
-- * withTMVar
, withTMVar
, withTMVarAnd
) where

import Cardano.Crypto.Hash (Hash, HashAlgorithm, hashFromBytes,
Expand Down Expand Up @@ -463,3 +466,40 @@ withFuse (Fuse name m) (Electric io) = do
newtype FuseBlownException = FuseBlownException Text
deriving (Show)
deriving anyclass (Exception)

{-------------------------------------------------------------------------------
withTMVar
-------------------------------------------------------------------------------}

-- | Apply @f@ with the content of @tv@ as state, restoring the original value when an
-- exception occurs
withTMVar ::
IOLike m
=> StrictTMVar m a
-> (a -> m (c, a))
-> m c
withTMVar tv f = withTMVarAnd tv (const $ pure ()) (\a -> const $ f a)

-- | Apply @f@ with the content of @tv@ as state, restoring the original value
-- when an exception occurs. Additionally run a @STM@ action when acquiring the
-- value.
withTMVarAnd ::
IOLike m
=> StrictTMVar m a
-> (a -> STM m b) -- ^ Additional STM action to run in the same atomically
-- block as the TMVar is acquired
-> (a -> b -> m (c, a)) -- ^ Action
-> m c
withTMVarAnd tv guard f =
bracketOnError
(atomically $ do
i <- takeTMVar tv
g <- guard i
pure (i, g)
)
(atomically . putTMVar tv . fst)
(\(s, g) -> do
(x, s') <- f s g
atomically $ putTMVar tv s'
return x
)

0 comments on commit 3f16a57

Please sign in to comment.