From d45e534151c1196fa4d556a9d264ae6968455caf Mon Sep 17 00:00:00 2001 From: Fraser Murray Date: Fri, 26 Jul 2024 16:14:33 +0100 Subject: [PATCH] add support for GetMeasures --- .../ouroboros-network-protocols.cabal | 2 + .../Network/Protocol/LocalTxMonitor/Codec.hs | 71 ++++++++++++++- .../Network/Protocol/LocalTxMonitor/Server.hs | 17 ++++ .../Network/Protocol/LocalTxMonitor/Type.hs | 87 ++++++++++++++----- .../Protocol/LocalTxMonitor/Examples.hs | 6 ++ .../Network/Protocol/LocalTxMonitor/Test.hs | 67 +++++++++++--- 6 files changed, 215 insertions(+), 35 deletions(-) diff --git a/ouroboros-network-protocols/ouroboros-network-protocols.cabal b/ouroboros-network-protocols/ouroboros-network-protocols.cabal index 8ee30220e99..90b8f52c44b 100644 --- a/ouroboros-network-protocols/ouroboros-network-protocols.cabal +++ b/ouroboros-network-protocols/ouroboros-network-protocols.cabal @@ -96,6 +96,7 @@ library build-depends: base >=4.14 && <4.20, bytestring >=0.10 && <0.13, + containers, cborg >=0.2.1 && <0.3, deepseq, @@ -105,6 +106,7 @@ library ouroboros-network-api ^>=0.7.0, serialise, + text, typed-protocols ^>=0.1.1, typed-protocols-cborg ^>=0.1 diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs index 67974580bec..068dd0cbcbe 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Codec.hs @@ -2,6 +2,7 @@ {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PolyKinds #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -13,11 +14,16 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec , codecLocalTxMonitorId ) where +import Control.Monad import Control.Monad.Class.MonadST +import Data.Functor ((<&>)) +import Data.Word (Word32) import Network.TypedProtocol.Codec.CBOR import Data.ByteString.Lazy (ByteString) +import Data.Map.Strict (Map) +import Data.Map.Strict qualified as Map import Codec.CBOR.Decoding qualified as CBOR import Codec.CBOR.Encoding qualified as CBOR @@ -44,7 +50,7 @@ codecLocalTxMonitor encodeTxId decodeTxId mkCodecCborLazyBS encode decode where encode :: - forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). () + forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). () => PeerHasAgency pr st -> Message ptcl st st' -> CBOR.Encoding @@ -65,6 +71,8 @@ codecLocalTxMonitor encodeTxId decodeTxId CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid MsgGetSizes -> CBOR.encodeListLen 1 <> CBOR.encodeWord 9 + MsgGetMeasures -> + CBOR.encodeListLen 1 <> CBOR.encodeWord 11 -- why increments of 2? encode (ServerAgency TokAcquiring) = \case MsgAcquired slot -> @@ -89,6 +97,14 @@ codecLocalTxMonitor encodeTxId decodeTxId <> CBOR.encodeWord32 (sizeInBytes sz) <> CBOR.encodeWord32 (numberOfTxs sz) + encode (ServerAgency (TokBusy TokGetMeasures)) = \case + MsgReplyGetMeasures measures -> + CBOR.encodeListLen 2 + <> CBOR.encodeWord 12 + <> CBOR.encodeListLen 2 + <> CBOR.encodeWord32 (txCount measures) + <> encodeMeasureMap (measuresMap measures) + decode :: forall s (pr :: PeerRole) (st :: ptcl). () => PeerHasAgency pr st @@ -113,6 +129,8 @@ codecLocalTxMonitor encodeTxId decodeTxId return (SomeMessage (MsgHasTx txid)) (ClientAgency TokAcquired, 1, 9) -> return (SomeMessage MsgGetSizes) + (ClientAgency TokAcquired, 1, 11) -> + return (SomeMessage MsgGetMeasures) (ServerAgency TokAcquiring, 2, 2) -> do slot <- decodeSlot @@ -136,6 +154,13 @@ codecLocalTxMonitor encodeTxId decodeTxId let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs } return (SomeMessage (MsgReplyGetSizes sizes)) + (ServerAgency (TokBusy TokGetMeasures), 2, 12) -> do + _len <- CBOR.decodeListLen + txCount <- CBOR.decodeWord32 + measuresMap <- decodeMeasureMap + let measures = MempoolMeasures { txCount, measuresMap } + pure (SomeMessage (MsgReplyGetMeasures measures)) + (ClientAgency TokIdle, _, _) -> fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len) (ClientAgency TokAcquired, _, _) -> @@ -145,6 +170,50 @@ codecLocalTxMonitor encodeTxId decodeTxId (ServerAgency (TokBusy _), _, _) -> fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len) +encodeMeasureMap :: Map MeasureName (SizeAndCapacity Word32) -> CBOR.Encoding +encodeMeasureMap m = + CBOR.encodeMapLen (fromIntegral (Map.size m)) <> + Map.foldMapWithKey f m + where + f mn sc = + encodeMeasureName mn <> encodeSizeAndCapacity sc + +decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Word32)) +decodeMeasureMap = do + len <- CBOR.decodeMapLen + mapContents <- replicateM len $ + (,) <$> decodeMeasureName <*> decodeSizeAndCapacity + pure $ Map.fromList mapContents + +encodeMeasureName :: MeasureName -> CBOR.Encoding +encodeMeasureName = CBOR.encodeString . \case + TransactionBytes -> "transaction_bytes" + ExUnitsMemory -> "ex_units_memory" + ExUnitsSteps -> "ex_units_steps" + ReferenceScriptsBytes -> "reference_scripts_bytes" + MeasureNameFromFuture (UnknownMeasureName n) -> n + +decodeMeasureName :: CBOR.Decoder s MeasureName +decodeMeasureName = CBOR.decodeString <&> \case + "transaction_bytes" -> TransactionBytes + "ex_units_memory" -> ExUnitsMemory + "ex_units_steps" -> ExUnitsSteps + "reference_scripts_bytes" -> ReferenceScriptsBytes + unknownKey -> MeasureNameFromFuture (UnknownMeasureName unknownKey) + +encodeSizeAndCapacity :: SizeAndCapacity Word32 -> CBOR.Encoding +encodeSizeAndCapacity sc = + CBOR.encodeListLen 2 <> + CBOR.encodeWord32 (size sc) <> + CBOR.encodeWord32 (capacity sc) + +decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Word32) +decodeSizeAndCapacity = do + _len <- CBOR.decodeListLen + size <- CBOR.decodeWord32 + capacity <- CBOR.decodeWord32 + pure SizeAndCapacity { size, capacity } + -- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do -- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'. -- diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs index c8925fc1e79..d4b54cc108c 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Server.hs @@ -68,6 +68,7 @@ data ServerStAcquired txid tx slot m a = ServerStAcquired { recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a) , recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a) , recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a) + , recvMsgGetMeasures :: m (ServerStBusy GetMeasures txid tx slot m a) , recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a) , recvMsgRelease :: m (ServerStIdle txid tx slot m a) } @@ -94,6 +95,11 @@ data ServerStBusy (kind :: StBusyKind) txid tx slot m a where -> ServerStAcquired txid tx slot m a -> ServerStBusy GetSizes txid tx slot m a + SendMsgReplyGetMeasures + :: MempoolMeasures + -> ServerStAcquired txid tx slot m a + -> ServerStBusy GetMeasures txid tx slot m a + -- | Interpret a 'LocalTxMonitorServer' action sequence as a 'Peer' on the -- client-side of the 'LocalTxMonitor' protocol. -- @@ -133,6 +139,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = { recvMsgNextTx , recvMsgHasTx , recvMsgGetSizes + , recvMsgGetMeasures , recvMsgAwaitAcquire , recvMsgRelease } -> Await (ClientAgency TokAcquired) $ \case @@ -142,6 +149,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = Effect $ handleHasTx <$> recvMsgHasTx txid MsgGetSizes -> Effect $ handleGetSizes <$> recvMsgGetSizes + MsgGetMeasures -> + Effect $ handleGetMeasures <$> recvMsgGetMeasures MsgAwaitAcquire -> Effect $ handleStAcquiring <$> recvMsgAwaitAcquire MsgRelease -> @@ -170,3 +179,11 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) = SendMsgReplyGetSizes sizes serverStAcquired -> Yield (ServerAgency (TokBusy TokGetSizes)) (MsgReplyGetSizes sizes) $ handleStAcquired serverStAcquired + + handleGetMeasures :: + ServerStBusy GetMeasures txid tx slot m a + -> Peer (LocalTxMonitor txid tx slot) AsServer (StBusy GetMeasures) m a + handleGetMeasures = \case + SendMsgReplyGetMeasures measures serverStAcquired -> + Yield (ServerAgency (TokBusy TokGetMeasures)) (MsgReplyGetMeasures measures) $ + handleStAcquired serverStAcquired diff --git a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs index 2ced578153e..3e0e792a3f4 100644 --- a/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs +++ b/ouroboros-network-protocols/src/Ouroboros/Network/Protocol/LocalTxMonitor/Type.hs @@ -45,7 +45,8 @@ -- @ module Ouroboros.Network.Protocol.LocalTxMonitor.Type where - +import Data.Map.Strict (Map) +import Data.Text (Text) import Data.Word import GHC.Generics (Generic) @@ -109,6 +110,9 @@ data StBusyKind where -- | The server is busy looking for the current size and max capacity of the -- mempool GetSizes :: StBusyKind + -- | The server is busy looking for the current size and max capacity of the + -- mempool + GetMeasures :: StBusyKind -- | Describes the MemPool sizes and capacity for a given snapshot. data MempoolSizeAndCapacity = MempoolSizeAndCapacity @@ -121,6 +125,30 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity -- ^ The number of transactions in the mempool } deriving (Generic, Eq, Show, NFData) +data SizeAndCapacity a = SizeAndCapacity + { size :: !a + , capacity :: !a + } deriving (Generic, Eq, Show, NFData) + +instance Functor SizeAndCapacity where + fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c) + +data MeasureName + = TransactionBytes + | ExUnitsMemory + | ExUnitsSteps + | ReferenceScriptsBytes + | MeasureNameFromFuture !UnknownMeasureName + deriving (Generic, Eq, Ord, Show, NFData) + +newtype UnknownMeasureName = UnknownMeasureName Text + deriving (Generic, Eq, Ord, Show, NFData) + +data MempoolMeasures = MempoolMeasures + { txCount :: !Word32 + , measuresMap :: !(Map MeasureName (SizeAndCapacity Word32)) + } deriving (Generic, Eq, Show, NFData) + instance Protocol (LocalTxMonitor txid tx slot) where -- | The messages in the transaction monitoring protocol. @@ -201,6 +229,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where :: MempoolSizeAndCapacity -> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired + -- | The client asks the server about the mempool current size and max + -- capacity + -- + MsgGetMeasures + :: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures) + + MsgReplyGetMeasures + :: MempoolMeasures + -> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired + -- | Release the acquired snapshot, in order to loop back to the idle state. -- MsgRelease @@ -249,27 +287,31 @@ instance ( NFData txid , NFData tx , NFData slot ) => NFData (Message (LocalTxMonitor txid tx slot) from to) where - rnf MsgAcquire = () - rnf (MsgAcquired slot) = rnf slot - rnf MsgAwaitAcquire = () - rnf MsgNextTx = () - rnf (MsgReplyNextTx mbTx) = rnf mbTx - rnf (MsgHasTx txid) = rnf txid - rnf (MsgReplyHasTx b) = rnf b - rnf MsgGetSizes = () - rnf (MsgReplyGetSizes msc) = rnf msc - rnf MsgRelease = () - rnf MsgDone = () + rnf MsgAcquire = () + rnf (MsgAcquired slot) = rnf slot + rnf MsgAwaitAcquire = () + rnf MsgNextTx = () + rnf (MsgReplyNextTx mbTx) = rnf mbTx + rnf (MsgHasTx txid) = rnf txid + rnf (MsgReplyHasTx b) = rnf b + rnf MsgGetSizes = () + rnf (MsgReplyGetSizes msc) = rnf msc + rnf MsgGetMeasures = () + rnf (MsgReplyGetMeasures msc) = rnf msc + rnf MsgRelease = () + rnf MsgDone = () data TokBusyKind (k :: StBusyKind) where - TokNextTx :: TokBusyKind NextTx - TokHasTx :: TokBusyKind HasTx - TokGetSizes :: TokBusyKind GetSizes + TokNextTx :: TokBusyKind NextTx + TokHasTx :: TokBusyKind HasTx + TokGetSizes :: TokBusyKind GetSizes + TokGetMeasures :: TokBusyKind GetMeasures instance NFData (TokBusyKind k) where - rnf TokNextTx = () - rnf TokHasTx = () - rnf TokGetSizes = () + rnf TokNextTx = () + rnf TokHasTx = () + rnf TokGetSizes = () + rnf TokGetMeasures = () deriving instance (Show txid, Show tx, Show slot) => Show (Message (LocalTxMonitor txid tx slot) from to) @@ -281,7 +323,8 @@ instance Show (ClientHasAgency (st :: LocalTxMonitor txid tx slot)) where instance Show (ServerHasAgency (st :: LocalTxMonitor txid tx slot)) where show = \case - TokAcquiring -> "TokAcquiring" - TokBusy TokNextTx -> "TokBusy TokNextTx" - TokBusy TokHasTx -> "TokBusy TokHasTx" - TokBusy TokGetSizes -> "TokBusy TokGetSizes" + TokAcquiring -> "TokAcquiring" + TokBusy TokNextTx -> "TokBusy TokNextTx" + TokBusy TokHasTx -> "TokBusy TokHasTx" + TokBusy TokGetSizes -> "TokBusy TokGetSizes" + TokBusy TokGetMeasures -> "TokBusy TokGetMeasures" diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs index 7223411a6ed..4ad98d8b383 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Examples.hs @@ -99,4 +99,10 @@ localTxMonitorServer txId (slot, allTxs) = , numberOfTxs = fromIntegral (length allTxs) } in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs) + , recvMsgGetMeasures = + let measures = MempoolMeasures + { txCount = fromIntegral (length allTxs) + , measuresMap = mempty + } + in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs) } diff --git a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs index 67c1c4c8927..f21da46c9dc 100644 --- a/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs +++ b/ouroboros-network-protocols/testlib/Ouroboros/Network/Protocol/LocalTxMonitor/Test.hs @@ -35,6 +35,7 @@ import Ouroboros.Network.Protocol.LocalTxMonitor.Examples import Ouroboros.Network.Protocol.LocalTxMonitor.Server import Ouroboros.Network.Protocol.LocalTxMonitor.Type +import Data.Text qualified as Text import Test.ChainGenerators () import Test.Ouroboros.Network.Testing.Utils (prop_codec_cborM, prop_codec_valid_cbor_encoding, splits2, splits3) @@ -209,11 +210,51 @@ instance (Arbitrary txid, Arbitrary tx, Arbitrary slot) , AnyMessageAndAgency (ClientAgency TokAcquired) . MsgHasTx <$> arbitrary , AnyMessageAndAgency (ServerAgency (TokBusy TokHasTx)) . MsgReplyHasTx <$> arbitrary , pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgGetSizes + , pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgGetMeasures , AnyMessageAndAgency (ServerAgency (TokBusy TokGetSizes)) . MsgReplyGetSizes <$> arbitrary + , AnyMessageAndAgency (ServerAgency (TokBusy TokGetMeasures)) . MsgReplyGetMeasures <$> arbitrary , pure $ AnyMessageAndAgency (ClientAgency TokAcquired) MsgRelease , pure $ AnyMessageAndAgency (ClientAgency TokIdle) MsgDone ] +instance Arbitrary MempoolMeasures where + arbitrary = + MempoolMeasures + <$> arbitrary + <*> arbitrary + +instance Arbitrary MeasureName where + arbitrary = frequency + [ (5, pure TransactionBytes) + , (5, pure ExUnitsMemory) + , (5, pure ExUnitsSteps) + , (5, pure ReferenceScriptsBytes) + , (1, MeasureNameFromFuture <$> genUnknownMeasureName) + ] + where + knownMeasureNames = + [ "transaction_bytes" + , "reference_scripts" + , "ex_units_memory" + , "ex_units_steps" + ] + + -- We need to generate a measure name that is currently unknown (because + -- we support forward-compatibility in the protocol), because accidentally + -- generating a known measure name with `arbitrary` will cause the + -- deserialization to fail to roundtrip + genUnknownMeasureName = do + name <- arbitrary + if name `elem` knownMeasureNames + then discard + else pure $ UnknownMeasureName $ Text.pack name + +instance Arbitrary a => Arbitrary (SizeAndCapacity a) where + arbitrary = + SizeAndCapacity + <$> arbitrary + <*> arbitrary + instance Arbitrary MempoolSizeAndCapacity where arbitrary = MempoolSizeAndCapacity @@ -224,15 +265,17 @@ instance Arbitrary MempoolSizeAndCapacity where instance (Eq txid, Eq tx, Eq slot) => Eq (AnyMessage (LocalTxMonitor txid tx slot)) where - AnyMessage MsgAcquire == AnyMessage MsgAcquire = True - AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b - AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True - AnyMessage MsgNextTx == AnyMessage MsgNextTx = True - AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b - AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b - AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b - AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True - AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b - AnyMessage MsgRelease == AnyMessage MsgRelease = True - AnyMessage MsgDone == AnyMessage MsgDone = True - AnyMessage _ == AnyMessage _ = False + AnyMessage MsgAcquire == AnyMessage MsgAcquire = True + AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b + AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True + AnyMessage MsgNextTx == AnyMessage MsgNextTx = True + AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b + AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b + AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b + AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True + AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b + AnyMessage MsgGetMeasures == AnyMessage MsgGetMeasures = True + AnyMessage (MsgReplyGetMeasures a) == AnyMessage (MsgReplyGetMeasures b) = a == b + AnyMessage MsgRelease == AnyMessage MsgRelease = True + AnyMessage MsgDone == AnyMessage MsgDone = True + AnyMessage _ == AnyMessage _ = False