Skip to content

Commit

Permalink
TOSQUASH a round of polishing
Browse files Browse the repository at this point in the history
  • Loading branch information
nfrisby committed Jul 2, 2024
1 parent df2bd10 commit 604c811
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module Ouroboros.Consensus.Mempool (
-- * Mempool API
-- ** Mempool
Mempool (..)
, MempoolCapacity (..)
, worstCaseCapacity
-- ** Transaction adding
, MempoolAddTxResult (..)
, addLocalTxs
Expand Down Expand Up @@ -40,9 +42,10 @@ module Ouroboros.Consensus.Mempool (

import Ouroboros.Consensus.Mempool.API (ForgeLedgerState (..),
Mempool (..), MempoolAddTxResult (..),
MempoolSnapshot (..), TicketNo, TxSizeInBytes,
TxTicket (..), addLocalTxs, addTxs, isMempoolTxAdded,
isMempoolTxRejected, mempoolTxAddedToMaybe, snapshotTxs,
MempoolCapacity (..), MempoolSnapshot (..), TicketNo,
TxSizeInBytes, TxTicket (..), addLocalTxs, addTxs,
isMempoolTxAdded, isMempoolTxRejected,
mempoolTxAddedToMaybe, snapshotTxs, worstCaseCapacity,
zeroTicketNo)
import Ouroboros.Consensus.Mempool.Capacity (MempoolSize (..),
TxOverrides (..), applyOverrides, mkOverrides,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Exposes the @'Mempool'@ datatype which captures the public API of the
Expand All @@ -12,6 +14,8 @@
module Ouroboros.Consensus.Mempool.API (
-- * Mempool
Mempool (..)
, MempoolCapacity (..)
, worstCaseCapacity
-- * Transaction adding
, AddTxOnBehalfOf (..)
, MempoolAddTxResult (..)
Expand All @@ -32,7 +36,10 @@ module Ouroboros.Consensus.Mempool.API (
, zeroTicketNo
) where

import Data.DerivingVia (InstantiatedAt (..))
import qualified Data.Measure as Measure
import Data.Semigroup (stimes)
import GHC.Generics (Generic)
import Ouroboros.Consensus.Block (SlotNo)
import Ouroboros.Consensus.Ledger.Abstract
import Ouroboros.Consensus.Ledger.SupportsMempool
Expand Down Expand Up @@ -198,18 +205,64 @@ data Mempool m blk = Mempool {
-- This does not update the state of the mempool.
, getSnapshotFor :: ForgeLedgerState blk -> STM m (MempoolSnapshot blk)

-- | Get the mempool's capacity.
-- | Get the current 'MempoolCapacity'.
--
-- Note that the capacity of the Mempool, unless it is overridden, can
-- dynamically change when the ledger state is updated.
--
-- When the capacity happens to shrink at some point, we /do not/ remove
-- transactions from the Mempool to satisfy this new lower limit.
-- Instead, we treat it the same way as a Mempool which is /at/
-- capacity, i.e., we won't admit new transactions until some have been
-- removed because they have become invalid.
, getCapacity :: STM m (TxMeasure blk, Int)
-- This might change if the mempool is synchronized with the node's
-- latest selection.
, getCapacity :: STM m (MempoolCapacity blk)
}

-- | The capacity of a mempool.
--
-- When the capacity happens to shrink at some point, we /do not/ remove
-- transactions from the mempool to satisfy this new lower limit. Instead, we
-- treat it the same way as a mempool which is /at/ capacity, ie we won't admit
-- new transactions until some have been removed because they have become
-- invalid.
--
-- Cardano governance tends to only change this limit based on ticking across
-- some slot boundary. The mempool cannot know the slot of whatever block these
-- transactions will end up in. And so we cannot know what the actual block
-- capacity will be.
--
-- As long as the block capacity is not changed severely and abruptly, then it
-- is an effective approximation to use the capacity of whatever ledger state
-- the mempool was most recently synchronized against.
data MempoolCapacity blk = MempoolCapacity {
-- | The anticipated limits of the next block to be minted.
mcBlockCapacity :: !(TxMeasure blk)

-- | How many 'mcBlockCapacity'-maximized blocks could be cut from the
-- sequence of txs in a full mempool.
, mcBlockMultiplicity :: !Int
}
deriving Generic

-- | The largest the mempool could be along each of the dimensions.
--
-- EG if the mempool contained only transactions that only had one non-trivial
-- component of their size measures, then that component of the mempool's
-- capacity could be up to 'mcBlockMultiplicity' times that component of
-- 'mcBlockCapacity'.
worstCaseCapacity ::
Measure.Measure (TxMeasure blk)
=> MempoolCapacity blk
-> TxMeasure blk
worstCaseCapacity capacity =
x
where
MempoolCapacity {
mcBlockCapacity = cap
, mcBlockMultiplicity = mult
} = capacity

InstantiatedAt x =
stimes mult $ InstantiatedAt @Measure.Measure cap

instance NoThunks (TxMeasure blk) => NoThunks (MempoolCapacity blk)

deriving instance Eq (TxMeasure blk) => Eq (MempoolCapacity blk)
deriving instance Show (TxMeasure blk) => Show (MempoolCapacity blk)

{-------------------------------------------------------------------------------
Result of adding a transaction to the mempool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,35 +67,30 @@ import Ouroboros.Consensus.Util.IOLike hiding (newMVar)
data InternalState blk = IS {
-- | Transactions currently in the mempool
--
-- NOTE: the total size of the transactions in 'isTxs' may exceed the
-- current capacity ('isCapacity'). When the capacity computed from the
-- ledger has shrunk, we don't remove transactions from the Mempool to
-- satisfy the new lower limit. We let the transactions get removed in
-- the normal way: by becoming invalid w.r.t. the updated ledger state.
-- We treat a Mempool /over/ capacity in the same way as a Mempool /at/
-- capacity.
isTxs :: !(TxSeq (TxMeasure blk) (Validated (GenTx blk)))
-- See the note on 'MempoolCapacity' about the mempool possibly being
-- over-capacity.
isTxs :: !(TxSeq (TxMeasure blk) (Validated (GenTx blk)))

-- | The cached IDs of transactions currently in the mempool.
--
-- This allows one to more quickly lookup transactions by ID from a
-- 'MempoolSnapshot' (see 'snapshotHasTx').
--
-- This should always be in-sync with the transactions in 'isTxs'.
, isTxIds :: !(Set (GenTxId blk))
, isTxIds :: !(Set (GenTxId blk))

-- | The cached ledger state after applying the transactions in the
-- Mempool against the chain's ledger state. New transactions will be
-- validated against this ledger.
--
-- INVARIANT: 'isLedgerState' is the ledger resulting from applying the
-- transactions in 'isTxs' against the ledger identified 'isTip' as tip.
, isLedgerState :: !(TickedLedgerState blk)
, isLedgerState :: !(TickedLedgerState blk)

-- | The tip of the chain that 'isTxs' was validated against
--
-- This comes from the underlying ledger state ('tickedLedgerState')
, isTip :: !(ChainHash blk)
, isTip :: !(ChainHash blk)

-- | The most recent 'SlotNo' that 'isTxs' was validated against
--
Expand All @@ -104,27 +99,16 @@ data InternalState blk = IS {
-- slot, see 'tickLedgerState') and 'isSlotNo' will be set to @succ s@,
-- which is different from the slot of the original ledger state, which
-- will remain in 'isTip'.
, isSlotNo :: !SlotNo
, isSlotNo :: !SlotNo

-- | The mempool 'TicketNo' counter.
--
-- See 'vrLastTicketNo' for more information.
, isLastTicketNo :: !TicketNo

-- | The mempool will refuse additional transactions when it already
-- contains enough to _fill_ this many (or more) blocks, each of size up
-- to @isCapacity@.
--
-- There might be a transaction in the Mempool triggering a change in the
-- maximum transaction capacity of a block, which would change the
-- Mempool's capacity. We don't want the Mempool's capacity to depend on
-- its contents. Any changes caused by those txs will take effect after
-- applying the block they end up in.
, isMultiplicity :: !Int

-- | The capacity of a block according to the last ledger state the
-- mempool was synchronized with.
, isCapacity :: !(TxMeasure blk)
-- | The capacity of a block according to the ledger state the mempool
-- was most recently synchronized with.
, isCapacity :: !(MempoolCapacity blk)
}
deriving (Generic)

Expand Down Expand Up @@ -159,8 +143,11 @@ initInternalState capacityOverride lastTicketNo cfg slot st = IS {
, isTip = castHash (getTipHash st)
, isSlotNo = slot
, isLastTicketNo = lastTicketNo
, isCapacity = capacityOverride `applyOverrides` blockTxCapacity cfg st
, isMultiplicity = 2
, isCapacity = MempoolCapacity {
mcBlockCapacity =
capacityOverride `applyOverrides` blockTxCapacity cfg st
, mcBlockMultiplicity = 2
}
}

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -199,7 +186,6 @@ data MempoolEnv m blk = MempoolEnv {

initMempoolEnv :: ( IOLike m
, NoThunks (GenTxId blk)
, NoThunks (TxMeasure blk)
, LedgerSupportsMempool blk
, ValidateEnvelope blk
)
Expand Down Expand Up @@ -260,10 +246,9 @@ data ValidationResult invalidTx blk = ValidationResult {
-- | The slot number of the (imaginary) block the txs will be placed in
, vrSlotNo :: SlotNo

-- | Capacity of the Mempool. Corresponds to 'vrBeforeTip' and
-- 'vrBeforeSlotNo', /not/ 'vrAfter'.
, vrBeforeCapacity :: TxMeasure blk
, vrMultiplicity :: Int
-- | The capacity of the mempool according to the ledger state of
-- 'vrBeforeTip' and 'vrBeforeSlotNo', /not/ the 'vrAfter' ledger state.
, vrBeforeCapacity :: MempoolCapacity blk

-- | The transactions that were found to be valid (oldest to newest)
, vrValid :: TxSeq (TxMeasure blk) (Validated (GenTx blk))
Expand Down Expand Up @@ -382,14 +367,12 @@ internalStateFromVR vr = IS {
, isSlotNo = vrSlotNo
, isLastTicketNo = vrLastTicketNo
, isCapacity = vrBeforeCapacity
, isMultiplicity = vrMultiplicity
}
where
ValidationResult {
vrBeforeTip
, vrSlotNo
, vrBeforeCapacity
, vrMultiplicity
, vrValid
, vrValidTxIds
, vrAfter
Expand All @@ -402,7 +385,6 @@ validationResultFromIS is = ValidationResult {
vrBeforeTip = isTip
, vrSlotNo = isSlotNo
, vrBeforeCapacity = isCapacity
, vrMultiplicity = isMultiplicity
, vrValid = isTxs
, vrValidTxIds = isTxIds
, vrNewValid = Nothing
Expand All @@ -419,7 +401,6 @@ validationResultFromIS is = ValidationResult {
, isSlotNo
, isLastTicketNo
, isCapacity
, isMultiplicity
} = is

-- | Create a Mempool Snapshot from a given Internal State of the mempool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ module Ouroboros.Consensus.Mempool.Init (
, openMempoolWithoutSyncThread
) where

import Control.Arrow ((&&&))
import Control.Monad (void)
import Control.Tracer
import Ouroboros.Consensus.Block
Expand Down Expand Up @@ -108,7 +107,7 @@ mkMempool mpEnv = Mempool
, syncWithLedger = implSyncWithLedger mpEnv
, getSnapshot = snapshotFromIS <$> readTVar istate
, getSnapshotFor = \fls -> pureGetSnapshotFor cfg fls co <$> readTVar istate
, getCapacity = (isCapacity &&& isMultiplicity) <$> readTVar istate
, getCapacity = isCapacity <$> readTVar istate
}
where MempoolEnv { mpEnvStateVar = istate
, mpEnvAddTxsRemoteFifo = remoteFifo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pureTryAddTx ::
-- ^ The current internal state of the mempool.
-> TryAddTx blk
pureTryAddTx cfg wti tx is
| willItFit is (txInBlockSize cfg (isLedgerState is) tx)
| shouldTryToAdd is (txInBlockSize cfg (isLedgerState is) tx)
=
case eVtx of
-- We only extended the ValidationResult with a single transaction
Expand Down Expand Up @@ -208,20 +208,30 @@ pureTryAddTx cfg wti tx is
(eVtx, vr) = extendVRNew cfg wti tx $ validationResultFromIS is
is' = internalStateFromVR vr

willItFit ::
-- | Should the mempool admit this tx?
--
-- There's a simple rule, but one exception to that rule. Let this transaction
-- in if and only if the resulting mempool would not be over capacity. However,
-- if the tx alone is too big to fit into even a block that contained no other
-- txs, then try to add it to the mempool; it will be immediately recognized as
-- invalid.
shouldTryToAdd ::
Measure (TxMeasure blk)
=> InternalState blk -> TxMeasure blk -> Bool
willItFit is tx
shouldTryToAdd is tx
| not (tx Measure.<= cap) = True -- let it be found invalid
| otherwise =
go (isTxs is) (max 0 $ isMultiplicity is - 1)
go (isTxs is) (max 0 $ mult - 1)
where
cap = isCapacity is
MempoolCapacity {
mcBlockCapacity = cap
, mcBlockMultiplicity = mult
} = isCapacity is

go !txseq = \case
0 -> msSize (TxSeq.toMempoolSize txseq) `Measure.plus` tx Measure.<= cap
n -> case txseq of
TxSeq.Empty -> go txseq 0
TxSeq.Empty -> True -- guard above ensures 0 + tx <= cap
_ -> go (snd $ TxSeq.splitAfterTxSize txseq cap) (n - 1)

{-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE ViewPatterns #-}

module Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server (localTxMonitorServer) where

import Data.DerivingVia (InstantiatedAt (..))
import Data.Measure (Measure)
import Data.Semigroup (stimes)
import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Ledger.SupportsMempool
import Ouroboros.Consensus.Mempool
Expand Down Expand Up @@ -89,10 +84,8 @@ localTxMonitorServer mempool =
&&
snapshotSlotNo a == snapshotSlotNo b

query :: STM m (TxMeasure blk, MempoolSnapshot blk)
query = do
(capacity, mult) <- getCapacity mempool
capacity <- worstCaseCapacity <$> getCapacity mempool
snapshot <- getSnapshot mempool
let InstantiatedAt capacity' =
stimes mult
$ InstantiatedAt @Measure capacity
pure (capacity', snapshot)
pure (capacity, snapshot)
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,8 @@ prop_Mempool_semigroup_removeTxs (TestSetupWithTxsInMempool testSetup txsToRemov
prop_Mempool_getCapacity :: MempoolCapTestSetup -> Property
prop_Mempool_getCapacity mcts =
withTestMempool testSetup $ \TestMempool{mempool} -> do
(actualCapacity, _mult) <- atomically $ getCapacity mempool
pure (actualCapacity === min testCapacity simpleBlockTxCapacity )
actualCapacity <- atomically $ worstCaseCapacity <$> getCapacity mempool
pure (actualCapacity === 2 * min testCapacity simpleBlockTxCapacity )
where
Just testCapacity = testMempoolCapOverride testSetup
MempoolCapTestSetup (TestSetupWithTxs testSetup _txsToAdd) = mcts
Expand Down

0 comments on commit 604c811

Please sign in to comment.