Skip to content

Commit

Permalink
add support for GetMeasures
Browse files Browse the repository at this point in the history
  • Loading branch information
fraser-iohk committed Jul 26, 2024
1 parent e28027d commit d45e534
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 35 deletions.
2 changes: 2 additions & 0 deletions ouroboros-network-protocols/ouroboros-network-protocols.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ library

build-depends: base >=4.14 && <4.20,
bytestring >=0.10 && <0.13,
containers,
cborg >=0.2.1 && <0.3,
deepseq,

Expand All @@ -105,6 +106,7 @@ library
ouroboros-network-api
^>=0.7.0,
serialise,
text,
typed-protocols ^>=0.1.1,
typed-protocols-cborg
^>=0.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PolyKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand All @@ -13,11 +14,16 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec
, codecLocalTxMonitorId
) where

import Control.Monad
import Control.Monad.Class.MonadST
import Data.Functor ((<&>))
import Data.Word (Word32)

import Network.TypedProtocol.Codec.CBOR

import Data.ByteString.Lazy (ByteString)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
Expand All @@ -44,7 +50,7 @@ codecLocalTxMonitor encodeTxId decodeTxId
mkCodecCborLazyBS encode decode
where
encode ::
forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). ()
forall (pr :: PeerRole) (st :: ptcl) (st' :: ptcl). ()
=> PeerHasAgency pr st
-> Message ptcl st st'
-> CBOR.Encoding
Expand All @@ -65,6 +71,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid
MsgGetSizes ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 9
MsgGetMeasures ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 11 -- why increments of 2?

encode (ServerAgency TokAcquiring) = \case
MsgAcquired slot ->
Expand All @@ -89,6 +97,14 @@ codecLocalTxMonitor encodeTxId decodeTxId
<> CBOR.encodeWord32 (sizeInBytes sz)
<> CBOR.encodeWord32 (numberOfTxs sz)

encode (ServerAgency (TokBusy TokGetMeasures)) = \case
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 2
<> CBOR.encodeWord 12
<> CBOR.encodeListLen 2
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)

decode ::
forall s (pr :: PeerRole) (st :: ptcl). ()
=> PeerHasAgency pr st
Expand All @@ -113,6 +129,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
return (SomeMessage (MsgHasTx txid))
(ClientAgency TokAcquired, 1, 9) ->
return (SomeMessage MsgGetSizes)
(ClientAgency TokAcquired, 1, 11) ->
return (SomeMessage MsgGetMeasures)

(ServerAgency TokAcquiring, 2, 2) -> do
slot <- decodeSlot
Expand All @@ -136,6 +154,13 @@ codecLocalTxMonitor encodeTxId decodeTxId
let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs }
return (SomeMessage (MsgReplyGetSizes sizes))

(ServerAgency (TokBusy TokGetMeasures), 2, 12) -> do
_len <- CBOR.decodeListLen
txCount <- CBOR.decodeWord32
measuresMap <- decodeMeasureMap
let measures = MempoolMeasures { txCount, measuresMap }
pure (SomeMessage (MsgReplyGetMeasures measures))

(ClientAgency TokIdle, _, _) ->
fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len)
(ClientAgency TokAcquired, _, _) ->
Expand All @@ -145,6 +170,50 @@ codecLocalTxMonitor encodeTxId decodeTxId
(ServerAgency (TokBusy _), _, _) ->
fail (printf "codecLocalTxMonitor (%s) unexpected key (%d, %d)" (show stok) key len)

encodeMeasureMap :: Map MeasureName (SizeAndCapacity Word32) -> CBOR.Encoding
encodeMeasureMap m =
CBOR.encodeMapLen (fromIntegral (Map.size m)) <>
Map.foldMapWithKey f m
where
f mn sc =
encodeMeasureName mn <> encodeSizeAndCapacity sc

decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Word32))
decodeMeasureMap = do
len <- CBOR.decodeMapLen
mapContents <- replicateM len $
(,) <$> decodeMeasureName <*> decodeSizeAndCapacity
pure $ Map.fromList mapContents

encodeMeasureName :: MeasureName -> CBOR.Encoding
encodeMeasureName = CBOR.encodeString . \case
TransactionBytes -> "transaction_bytes"
ExUnitsMemory -> "ex_units_memory"
ExUnitsSteps -> "ex_units_steps"
ReferenceScriptsBytes -> "reference_scripts_bytes"
MeasureNameFromFuture (UnknownMeasureName n) -> n

decodeMeasureName :: CBOR.Decoder s MeasureName
decodeMeasureName = CBOR.decodeString <&> \case
"transaction_bytes" -> TransactionBytes
"ex_units_memory" -> ExUnitsMemory
"ex_units_steps" -> ExUnitsSteps
"reference_scripts_bytes" -> ReferenceScriptsBytes
unknownKey -> MeasureNameFromFuture (UnknownMeasureName unknownKey)

encodeSizeAndCapacity :: SizeAndCapacity Word32 -> CBOR.Encoding
encodeSizeAndCapacity sc =
CBOR.encodeListLen 2 <>
CBOR.encodeWord32 (size sc) <>
CBOR.encodeWord32 (capacity sc)

decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Word32)
decodeSizeAndCapacity = do
_len <- CBOR.decodeListLen
size <- CBOR.decodeWord32
capacity <- CBOR.decodeWord32
pure SizeAndCapacity { size, capacity }

-- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do
-- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ data ServerStAcquired txid tx slot m a = ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a)
, recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a)
, recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a)
, recvMsgGetMeasures :: m (ServerStBusy GetMeasures txid tx slot m a)
, recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a)
, recvMsgRelease :: m (ServerStIdle txid tx slot m a)
}
Expand All @@ -94,6 +95,11 @@ data ServerStBusy (kind :: StBusyKind) txid tx slot m a where
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetSizes txid tx slot m a

SendMsgReplyGetMeasures
:: MempoolMeasures
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetMeasures txid tx slot m a

-- | Interpret a 'LocalTxMonitorServer' action sequence as a 'Peer' on the
-- client-side of the 'LocalTxMonitor' protocol.
--
Expand Down Expand Up @@ -133,6 +139,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
{ recvMsgNextTx
, recvMsgHasTx
, recvMsgGetSizes
, recvMsgGetMeasures
, recvMsgAwaitAcquire
, recvMsgRelease
} -> Await (ClientAgency TokAcquired) $ \case
Expand All @@ -142,6 +149,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
Effect $ handleHasTx <$> recvMsgHasTx txid
MsgGetSizes ->
Effect $ handleGetSizes <$> recvMsgGetSizes
MsgGetMeasures ->
Effect $ handleGetMeasures <$> recvMsgGetMeasures
MsgAwaitAcquire ->
Effect $ handleStAcquiring <$> recvMsgAwaitAcquire
MsgRelease ->
Expand Down Expand Up @@ -170,3 +179,11 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
SendMsgReplyGetSizes sizes serverStAcquired ->
Yield (ServerAgency (TokBusy TokGetSizes)) (MsgReplyGetSizes sizes) $
handleStAcquired serverStAcquired

handleGetMeasures ::
ServerStBusy GetMeasures txid tx slot m a
-> Peer (LocalTxMonitor txid tx slot) AsServer (StBusy GetMeasures) m a
handleGetMeasures = \case
SendMsgReplyGetMeasures measures serverStAcquired ->
Yield (ServerAgency (TokBusy TokGetMeasures)) (MsgReplyGetMeasures measures) $
handleStAcquired serverStAcquired
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
-- @
module Ouroboros.Network.Protocol.LocalTxMonitor.Type where


import Data.Map.Strict (Map)
import Data.Text (Text)
import Data.Word
import GHC.Generics (Generic)

Expand Down Expand Up @@ -109,6 +110,9 @@ data StBusyKind where
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetSizes :: StBusyKind
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetMeasures :: StBusyKind

-- | Describes the MemPool sizes and capacity for a given snapshot.
data MempoolSizeAndCapacity = MempoolSizeAndCapacity
Expand All @@ -121,6 +125,30 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity
-- ^ The number of transactions in the mempool
} deriving (Generic, Eq, Show, NFData)

data SizeAndCapacity a = SizeAndCapacity
{ size :: !a
, capacity :: !a
} deriving (Generic, Eq, Show, NFData)

instance Functor SizeAndCapacity where
fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c)

data MeasureName
= TransactionBytes
| ExUnitsMemory
| ExUnitsSteps
| ReferenceScriptsBytes
| MeasureNameFromFuture !UnknownMeasureName
deriving (Generic, Eq, Ord, Show, NFData)

newtype UnknownMeasureName = UnknownMeasureName Text
deriving (Generic, Eq, Ord, Show, NFData)

data MempoolMeasures = MempoolMeasures
{ txCount :: !Word32
, measuresMap :: !(Map MeasureName (SizeAndCapacity Word32))
} deriving (Generic, Eq, Show, NFData)

instance Protocol (LocalTxMonitor txid tx slot) where

-- | The messages in the transaction monitoring protocol.
Expand Down Expand Up @@ -201,6 +229,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where
:: MempoolSizeAndCapacity
-> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired

-- | The client asks the server about the mempool current size and max
-- capacity
--
MsgGetMeasures
:: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures)

MsgReplyGetMeasures
:: MempoolMeasures
-> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired

-- | Release the acquired snapshot, in order to loop back to the idle state.
--
MsgRelease
Expand Down Expand Up @@ -249,27 +287,31 @@ instance ( NFData txid
, NFData tx
, NFData slot
) => NFData (Message (LocalTxMonitor txid tx slot) from to) where
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgGetMeasures = ()
rnf (MsgReplyGetMeasures msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()

data TokBusyKind (k :: StBusyKind) where
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokGetMeasures :: TokBusyKind GetMeasures

instance NFData (TokBusyKind k) where
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokGetMeasures = ()

deriving instance (Show txid, Show tx, Show slot)
=> Show (Message (LocalTxMonitor txid tx slot) from to)
Expand All @@ -281,7 +323,8 @@ instance Show (ClientHasAgency (st :: LocalTxMonitor txid tx slot)) where

instance Show (ServerHasAgency (st :: LocalTxMonitor txid tx slot)) where
show = \case
TokAcquiring -> "TokAcquiring"
TokBusy TokNextTx -> "TokBusy TokNextTx"
TokBusy TokHasTx -> "TokBusy TokHasTx"
TokBusy TokGetSizes -> "TokBusy TokGetSizes"
TokAcquiring -> "TokAcquiring"
TokBusy TokNextTx -> "TokBusy TokNextTx"
TokBusy TokHasTx -> "TokBusy TokHasTx"
TokBusy TokGetSizes -> "TokBusy TokGetSizes"
TokBusy TokGetMeasures -> "TokBusy TokGetMeasures"
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,10 @@ localTxMonitorServer txId (slot, allTxs) =
, numberOfTxs = fromIntegral (length allTxs)
}
in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs)
, recvMsgGetMeasures =
let measures = MempoolMeasures
{ txCount = fromIntegral (length allTxs)
, measuresMap = mempty
}
in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs)
}
Loading

0 comments on commit d45e534

Please sign in to comment.