Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LocalTxMonitor: Add support for GetMeasures #4918

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ouroboros-network-protocols/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Revision history for ouroboros-network-protocols

### Breaking changes

* Added new `GetMeasures` message to `LocalTxMonitor`

## 0.12.0.0 -- 2024-10-17

### Breaking changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ library
base >=4.14 && <4.21,
bytestring >=0.10 && <0.13,
cborg >=0.2.1 && <0.3,
containers,
deepseq,
io-classes ^>=1.5.0,
nothunks,
Expand All @@ -108,6 +109,7 @@ library
serialise,
si-timers,
singletons,
text,
typed-protocols ^>=0.3,
typed-protocols-cborg ^>=0.3,
typed-protocols-stateful,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec
, codecLocalTxMonitorId
) where

import Control.Monad
import Control.Monad.Class.MonadST

import Network.TypedProtocol.Codec.CBOR
import Network.TypedProtocol.Core

import Data.ByteString.Lazy (ByteString)
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map

import Codec.CBOR.Decoding qualified as CBOR
import Codec.CBOR.Encoding qualified as CBOR
Expand Down Expand Up @@ -64,6 +67,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid
MsgGetSizes ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 9
MsgGetMeasures ->
CBOR.encodeListLen 1 <> CBOR.encodeWord 11
MsgAcquired slot ->
CBOR.encodeListLen 2 <> CBOR.encodeWord 2 <> encodeSlot slot
MsgReplyNextTx Nothing ->
Expand All @@ -79,6 +84,12 @@ codecLocalTxMonitor encodeTxId decodeTxId
<> CBOR.encodeWord32 (capacityInBytes sz)
<> CBOR.encodeWord32 (sizeInBytes sz)
<> CBOR.encodeWord32 (numberOfTxs sz)
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 2
<> CBOR.encodeWord 12
<> CBOR.encodeListLen 2
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)
Comment on lines +87 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

Suggested change
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 2
<> CBOR.encodeWord 12
<> CBOR.encodeListLen 2
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)
MsgReplyGetMeasures measures ->
CBOR.encodeListLen 3
<> CBOR.encodeWord 12
<> CBOR.encodeWord32 (txCount measures)
<> encodeMeasureMap (measuresMap measures)


decode ::
forall s (st :: ptcl).
Expand All @@ -105,6 +116,8 @@ codecLocalTxMonitor encodeTxId decodeTxId
return (SomeMessage (MsgHasTx txid))
(SingAcquired, 1, 9) ->
return (SomeMessage MsgGetSizes)
(SingAcquired, 1, 11) ->
return (SomeMessage MsgGetMeasures)

(SingAcquiring, 2, 2) -> do
slot <- decodeSlot
Expand All @@ -128,12 +141,53 @@ codecLocalTxMonitor encodeTxId decodeTxId
let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs }
return (SomeMessage (MsgReplyGetSizes sizes))

(SingBusy SingGetMeasures, 2, 12) -> do
_len <- CBOR.decodeListLen
txCount <- CBOR.decodeWord32
measuresMap <- decodeMeasureMap
let measures = MempoolMeasures { txCount, measuresMap }
pure (SomeMessage (MsgReplyGetMeasures measures))

(SingDone, _, _) -> notActiveState stok

(_, _, _) ->
fail (printf "codecLocalTxMonitor (%s, %s) unexpected key (%d, %d)"
(show (activeAgency :: ActiveAgency st)) (show stok) key len)

encodeMeasureMap :: Map MeasureName (SizeAndCapacity Integer) -> CBOR.Encoding
encodeMeasureMap m =
CBOR.encodeMapLen (fromIntegral (Map.size m)) <>
Map.foldMapWithKey f m
where
f mn sc =
encodeMeasureName mn <> encodeSizeAndCapacity sc
Comment on lines +162 to +163
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be ⬇️ for it to produce valid CBOR?

Suggested change
f mn sc =
encodeMeasureName mn <> encodeSizeAndCapacity sc
f mn sc =
encodeListLen 2
<> encodeMeasureName mn
<> encodeSizeAndCapacity sc


decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Integer))
decodeMeasureMap = do
len <- CBOR.decodeMapLen
mapContents <- replicateM len $
(,) <$> decodeMeasureName <*> decodeSizeAndCapacity
pure $ Map.fromList mapContents

encodeMeasureName :: MeasureName -> CBOR.Encoding
encodeMeasureName (MeasureName t) = CBOR.encodeString t

decodeMeasureName :: CBOR.Decoder s MeasureName
decodeMeasureName = MeasureName <$> CBOR.decodeString

encodeSizeAndCapacity :: SizeAndCapacity Integer -> CBOR.Encoding
encodeSizeAndCapacity sc =
CBOR.encodeListLen 2 <>
CBOR.encodeInteger (size sc) <>
CBOR.encodeInteger (capacity sc)

decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Integer)
decodeSizeAndCapacity = do
_len <- CBOR.decodeListLen
size <- CBOR.decodeInteger
capacity <- CBOR.decodeInteger
pure SizeAndCapacity { size, capacity }

-- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do
-- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'.
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ data ServerStAcquired txid tx slot m a = ServerStAcquired
{ recvMsgNextTx :: m (ServerStBusy NextTx txid tx slot m a)
, recvMsgHasTx :: txid -> m (ServerStBusy HasTx txid tx slot m a)
, recvMsgGetSizes :: m (ServerStBusy GetSizes txid tx slot m a)
, recvMsgGetMeasures :: m (ServerStBusy GetMeasures txid tx slot m a)
, recvMsgAwaitAcquire :: m (ServerStAcquiring txid tx slot m a)
, recvMsgRelease :: m (ServerStIdle txid tx slot m a)
}
Expand All @@ -95,6 +96,11 @@ data ServerStBusy (kind :: StBusyKind) txid tx slot m a where
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetSizes txid tx slot m a

SendMsgReplyGetMeasures
:: MempoolMeasures
-> ServerStAcquired txid tx slot m a
-> ServerStBusy GetMeasures txid tx slot m a

-- | Interpret a 'LocalTxMonitorServer' action sequence as a 'Peer' on the
-- client-side of the 'LocalTxMonitor' protocol.
--
Expand Down Expand Up @@ -134,6 +140,7 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
{ recvMsgNextTx
, recvMsgHasTx
, recvMsgGetSizes
, recvMsgGetMeasures
, recvMsgAwaitAcquire
, recvMsgRelease
} -> Await $ \case
Expand All @@ -143,6 +150,8 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
Effect $ handleHasTx <$> recvMsgHasTx txid
MsgGetSizes ->
Effect $ handleGetSizes <$> recvMsgGetSizes
MsgGetMeasures ->
Effect $ handleGetMeasures <$> recvMsgGetMeasures
MsgAwaitAcquire ->
Effect $ handleStAcquiring <$> recvMsgAwaitAcquire
MsgRelease ->
Expand Down Expand Up @@ -171,3 +180,11 @@ localTxMonitorServerPeer (LocalTxMonitorServer mServer) =
SendMsgReplyGetSizes sizes serverStAcquired ->
Yield (MsgReplyGetSizes sizes) $
handleStAcquired serverStAcquired

handleGetMeasures ::
ServerStBusy GetMeasures txid tx slot m a
-> Server (LocalTxMonitor txid tx slot) NonPipelined (StBusy GetMeasures) m a
handleGetMeasures = \case
SendMsgReplyGetMeasures measures serverStAcquired ->
Yield (MsgReplyGetMeasures measures) $
handleStAcquired serverStAcquired
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Type where


import Data.Kind
import Data.Map.Strict (Map)
import Data.Singletons
import Data.Text (Text)
import Data.Word
import GHC.Generics (Generic)

Expand Down Expand Up @@ -131,17 +133,22 @@ data StBusyKind where
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetSizes :: StBusyKind
-- | The server is busy looking for the current size and max capacity of the
-- mempool
GetMeasures :: StBusyKind

type SingBusyKind :: StBusyKind -> Type
data SingBusyKind st where
SingNextTx :: SingBusyKind NextTx
SingHasTx :: SingBusyKind HasTx
SingGetSizes :: SingBusyKind GetSizes
SingNextTx :: SingBusyKind NextTx
SingHasTx :: SingBusyKind HasTx
SingGetSizes :: SingBusyKind GetSizes
SingGetMeasures :: SingBusyKind GetMeasures

type instance Sing = SingBusyKind
instance SingI NextTx where sing = SingNextTx
instance SingI HasTx where sing = SingHasTx
instance SingI GetSizes where sing = SingGetSizes
instance SingI NextTx where sing = SingNextTx
instance SingI HasTx where sing = SingHasTx
instance SingI GetSizes where sing = SingGetSizes
instance SingI GetMeasures where sing = SingGetMeasures

deriving instance Show (SingBusyKind st)

Expand All @@ -156,6 +163,22 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity
-- ^ The number of transactions in the mempool
} deriving (Generic, Eq, Show, NFData)

data SizeAndCapacity a = SizeAndCapacity
{ size :: !a
, capacity :: !a
} deriving (Generic, Eq, Show, NFData)

instance Functor SizeAndCapacity where
fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c)

newtype MeasureName = MeasureName Text
deriving (Generic, Eq, Ord, Show, NFData)

data MempoolMeasures = MempoolMeasures
{ txCount :: !Word32
, measuresMap :: !(Map MeasureName (SizeAndCapacity Integer))
} deriving (Generic, Eq, Show, NFData)

instance Protocol (LocalTxMonitor txid tx slot) where

-- | The messages in the transaction monitoring protocol.
Expand Down Expand Up @@ -236,6 +259,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where
:: MempoolSizeAndCapacity
-> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired

-- | The client asks the server about the mempool current size and max
-- capacity
--
MsgGetMeasures
:: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures)

MsgReplyGetMeasures
:: MempoolMeasures
-> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired

-- | Release the acquired snapshot, in order to loop back to the idle state.
--
MsgRelease
Expand All @@ -259,27 +292,31 @@ instance ( NFData txid
, NFData tx
, NFData slot
) => NFData (Message (LocalTxMonitor txid tx slot) from to) where
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()
rnf MsgAcquire = ()
rnf (MsgAcquired slot) = rnf slot
rnf MsgAwaitAcquire = ()
rnf MsgNextTx = ()
rnf (MsgReplyNextTx mbTx) = rnf mbTx
rnf (MsgHasTx txid) = rnf txid
rnf (MsgReplyHasTx b) = rnf b
rnf MsgGetSizes = ()
rnf (MsgReplyGetSizes msc) = rnf msc
rnf MsgGetMeasures = ()
rnf (MsgReplyGetMeasures msc) = rnf msc
rnf MsgRelease = ()
rnf MsgDone = ()

data TokBusyKind (k :: StBusyKind) where
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokNextTx :: TokBusyKind NextTx
TokHasTx :: TokBusyKind HasTx
TokGetSizes :: TokBusyKind GetSizes
TokGetMeasures :: TokBusyKind GetMeasures
Comment on lines 309 to +313
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have SingBusyKind; I suspect TokBusyKind is not needed at all (either I missed removing it, or it resurfaced after a rebase).


instance NFData (TokBusyKind k) where
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokNextTx = ()
rnf TokHasTx = ()
rnf TokGetSizes = ()
rnf TokGetMeasures = ()

deriving instance (Show txid, Show tx, Show slot)
=> Show (Message (LocalTxMonitor txid tx slot) from to)
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,10 @@ localTxMonitorServer txId (slot, allTxs) =
, numberOfTxs = fromIntegral (length allTxs)
}
in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs)
, recvMsgGetMeasures =
let measures = MempoolMeasures
{ txCount = fromIntegral (length allTxs)
, measuresMap = mempty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mempty? Is it unfinished?

}
in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Ouroboros.Network.Protocol.LocalTxMonitor.Examples
import Ouroboros.Network.Protocol.LocalTxMonitor.Server
import Ouroboros.Network.Protocol.LocalTxMonitor.Type

import Data.Text qualified as Text
import Test.ChainGenerators ()
import Test.Ouroboros.Network.Testing.Utils (prop_codec_cborM,
prop_codec_valid_cbor_encoding, splits2, splits3)
Expand Down Expand Up @@ -210,10 +211,39 @@ instance (Arbitrary txid, Arbitrary tx, Arbitrary slot)
, AnyMessage . MsgReplyHasTx <$> arbitrary
, pure $ AnyMessage MsgGetSizes
, AnyMessage . MsgReplyGetSizes <$> arbitrary
, pure $ AnyMessage MsgGetMeasures
, AnyMessage . MsgReplyGetMeasures <$> arbitrary
, pure $ AnyMessage MsgRelease
, pure $ AnyMessage MsgDone
]

instance Arbitrary MempoolMeasures where
arbitrary =
MempoolMeasures
<$> arbitrary
<*> arbitrary

instance Arbitrary MeasureName where
arbitrary = MeasureName <$> frequency
[ (9, genKnownMeasureName)
, (1, genUnknownMeasureName)
]
where
genKnownMeasureName =
Text.pack <$> elements
[ "transaction_bytes"
, "reference_scripts"
, "ex_units_memory"
, "ex_units_steps"
]
genUnknownMeasureName = Text.pack <$> arbitrary

instance Arbitrary a => Arbitrary (SizeAndCapacity a) where
arbitrary =
SizeAndCapacity
<$> arbitrary
<*> arbitrary

instance Arbitrary MempoolSizeAndCapacity where
arbitrary =
MempoolSizeAndCapacity
Expand All @@ -224,15 +254,17 @@ instance Arbitrary MempoolSizeAndCapacity where
instance (Eq txid, Eq tx, Eq slot)
=> Eq (AnyMessage (LocalTxMonitor txid tx slot))
where
AnyMessage MsgAcquire == AnyMessage MsgAcquire = True
AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b
AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True
AnyMessage MsgNextTx == AnyMessage MsgNextTx = True
AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b
AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b
AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b
AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True
AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b
AnyMessage MsgRelease == AnyMessage MsgRelease = True
AnyMessage MsgDone == AnyMessage MsgDone = True
AnyMessage _ == AnyMessage _ = False
AnyMessage MsgAcquire == AnyMessage MsgAcquire = True
AnyMessage (MsgAcquired a) == AnyMessage (MsgAcquired b) = a == b
AnyMessage MsgAwaitAcquire == AnyMessage MsgAwaitAcquire = True
AnyMessage MsgNextTx == AnyMessage MsgNextTx = True
AnyMessage (MsgReplyNextTx a) == AnyMessage (MsgReplyNextTx b) = a == b
AnyMessage (MsgHasTx a) == AnyMessage (MsgHasTx b) = a == b
AnyMessage (MsgReplyHasTx a) == AnyMessage (MsgReplyHasTx b) = a == b
AnyMessage MsgGetSizes == AnyMessage MsgGetSizes = True
AnyMessage (MsgReplyGetSizes a) == AnyMessage (MsgReplyGetSizes b) = a == b
AnyMessage MsgGetMeasures == AnyMessage MsgGetMeasures = True
AnyMessage (MsgReplyGetMeasures a) == AnyMessage (MsgReplyGetMeasures b) = a == b
AnyMessage MsgRelease == AnyMessage MsgRelease = True
AnyMessage MsgDone == AnyMessage MsgDone = True
AnyMessage _ == AnyMessage _ = False
Loading