-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
LocalTxMonitor: Add support for GetMeasures #4918
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -14,12 +14,15 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Codec | |||||||||||||
, codecLocalTxMonitorId | ||||||||||||||
) where | ||||||||||||||
|
||||||||||||||
import Control.Monad | ||||||||||||||
import Control.Monad.Class.MonadST | ||||||||||||||
|
||||||||||||||
import Network.TypedProtocol.Codec.CBOR | ||||||||||||||
import Network.TypedProtocol.Core | ||||||||||||||
|
||||||||||||||
import Data.ByteString.Lazy (ByteString) | ||||||||||||||
import Data.Map.Strict (Map) | ||||||||||||||
import Data.Map.Strict qualified as Map | ||||||||||||||
|
||||||||||||||
import Codec.CBOR.Decoding qualified as CBOR | ||||||||||||||
import Codec.CBOR.Encoding qualified as CBOR | ||||||||||||||
|
@@ -64,6 +67,8 @@ codecLocalTxMonitor encodeTxId decodeTxId | |||||||||||||
CBOR.encodeListLen 2 <> CBOR.encodeWord 7 <> encodeTxId txid | ||||||||||||||
MsgGetSizes -> | ||||||||||||||
CBOR.encodeListLen 1 <> CBOR.encodeWord 9 | ||||||||||||||
MsgGetMeasures -> | ||||||||||||||
CBOR.encodeListLen 1 <> CBOR.encodeWord 11 | ||||||||||||||
MsgAcquired slot -> | ||||||||||||||
CBOR.encodeListLen 2 <> CBOR.encodeWord 2 <> encodeSlot slot | ||||||||||||||
MsgReplyNextTx Nothing -> | ||||||||||||||
|
@@ -79,6 +84,12 @@ codecLocalTxMonitor encodeTxId decodeTxId | |||||||||||||
<> CBOR.encodeWord32 (capacityInBytes sz) | ||||||||||||||
<> CBOR.encodeWord32 (sizeInBytes sz) | ||||||||||||||
<> CBOR.encodeWord32 (numberOfTxs sz) | ||||||||||||||
MsgReplyGetMeasures measures -> | ||||||||||||||
CBOR.encodeListLen 2 | ||||||||||||||
<> CBOR.encodeWord 12 | ||||||||||||||
<> CBOR.encodeListLen 2 | ||||||||||||||
<> CBOR.encodeWord32 (txCount measures) | ||||||||||||||
<> encodeMeasureMap (measuresMap measures) | ||||||||||||||
|
||||||||||||||
decode :: | ||||||||||||||
forall s (st :: ptcl). | ||||||||||||||
|
@@ -105,6 +116,8 @@ codecLocalTxMonitor encodeTxId decodeTxId | |||||||||||||
return (SomeMessage (MsgHasTx txid)) | ||||||||||||||
(SingAcquired, 1, 9) -> | ||||||||||||||
return (SomeMessage MsgGetSizes) | ||||||||||||||
(SingAcquired, 1, 11) -> | ||||||||||||||
return (SomeMessage MsgGetMeasures) | ||||||||||||||
|
||||||||||||||
(SingAcquiring, 2, 2) -> do | ||||||||||||||
slot <- decodeSlot | ||||||||||||||
|
@@ -128,12 +141,53 @@ codecLocalTxMonitor encodeTxId decodeTxId | |||||||||||||
let sizes = MempoolSizeAndCapacity { capacityInBytes, sizeInBytes, numberOfTxs } | ||||||||||||||
return (SomeMessage (MsgReplyGetSizes sizes)) | ||||||||||||||
|
||||||||||||||
(SingBusy SingGetMeasures, 2, 12) -> do | ||||||||||||||
_len <- CBOR.decodeListLen | ||||||||||||||
txCount <- CBOR.decodeWord32 | ||||||||||||||
measuresMap <- decodeMeasureMap | ||||||||||||||
let measures = MempoolMeasures { txCount, measuresMap } | ||||||||||||||
pure (SomeMessage (MsgReplyGetMeasures measures)) | ||||||||||||||
|
||||||||||||||
(SingDone, _, _) -> notActiveState stok | ||||||||||||||
|
||||||||||||||
(_, _, _) -> | ||||||||||||||
fail (printf "codecLocalTxMonitor (%s, %s) unexpected key (%d, %d)" | ||||||||||||||
(show (activeAgency :: ActiveAgency st)) (show stok) key len) | ||||||||||||||
|
||||||||||||||
encodeMeasureMap :: Map MeasureName (SizeAndCapacity Integer) -> CBOR.Encoding | ||||||||||||||
encodeMeasureMap m = | ||||||||||||||
CBOR.encodeMapLen (fromIntegral (Map.size m)) <> | ||||||||||||||
Map.foldMapWithKey f m | ||||||||||||||
where | ||||||||||||||
f mn sc = | ||||||||||||||
encodeMeasureName mn <> encodeSizeAndCapacity sc | ||||||||||||||
Comment on lines
+162
to
+163
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't it be ⬇️ for it to produce valid CBOR?
Suggested change
|
||||||||||||||
|
||||||||||||||
decodeMeasureMap :: CBOR.Decoder s (Map MeasureName (SizeAndCapacity Integer)) | ||||||||||||||
decodeMeasureMap = do | ||||||||||||||
len <- CBOR.decodeMapLen | ||||||||||||||
mapContents <- replicateM len $ | ||||||||||||||
(,) <$> decodeMeasureName <*> decodeSizeAndCapacity | ||||||||||||||
pure $ Map.fromList mapContents | ||||||||||||||
|
||||||||||||||
encodeMeasureName :: MeasureName -> CBOR.Encoding | ||||||||||||||
encodeMeasureName (MeasureName t) = CBOR.encodeString t | ||||||||||||||
|
||||||||||||||
decodeMeasureName :: CBOR.Decoder s MeasureName | ||||||||||||||
decodeMeasureName = MeasureName <$> CBOR.decodeString | ||||||||||||||
|
||||||||||||||
encodeSizeAndCapacity :: SizeAndCapacity Integer -> CBOR.Encoding | ||||||||||||||
encodeSizeAndCapacity sc = | ||||||||||||||
CBOR.encodeListLen 2 <> | ||||||||||||||
CBOR.encodeInteger (size sc) <> | ||||||||||||||
CBOR.encodeInteger (capacity sc) | ||||||||||||||
|
||||||||||||||
decodeSizeAndCapacity :: CBOR.Decoder s (SizeAndCapacity Integer) | ||||||||||||||
decodeSizeAndCapacity = do | ||||||||||||||
_len <- CBOR.decodeListLen | ||||||||||||||
size <- CBOR.decodeInteger | ||||||||||||||
capacity <- CBOR.decodeInteger | ||||||||||||||
pure SizeAndCapacity { size, capacity } | ||||||||||||||
|
||||||||||||||
-- | An identity 'Codec' for the 'LocalTxMonitor' protocol. It does not do | ||||||||||||||
-- any serialisation. It keeps the typed messages, wrapped in 'AnyMessage'. | ||||||||||||||
-- | ||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,7 +48,9 @@ module Ouroboros.Network.Protocol.LocalTxMonitor.Type where | |
|
||
|
||
import Data.Kind | ||
import Data.Map.Strict (Map) | ||
import Data.Singletons | ||
import Data.Text (Text) | ||
import Data.Word | ||
import GHC.Generics (Generic) | ||
|
||
|
@@ -131,17 +133,22 @@ data StBusyKind where | |
-- | The server is busy looking for the current size and max capacity of the | ||
-- mempool | ||
GetSizes :: StBusyKind | ||
-- | The server is busy looking for the current size and max capacity of the | ||
-- mempool | ||
GetMeasures :: StBusyKind | ||
|
||
type SingBusyKind :: StBusyKind -> Type | ||
data SingBusyKind st where | ||
SingNextTx :: SingBusyKind NextTx | ||
SingHasTx :: SingBusyKind HasTx | ||
SingGetSizes :: SingBusyKind GetSizes | ||
SingNextTx :: SingBusyKind NextTx | ||
SingHasTx :: SingBusyKind HasTx | ||
SingGetSizes :: SingBusyKind GetSizes | ||
SingGetMeasures :: SingBusyKind GetMeasures | ||
|
||
type instance Sing = SingBusyKind | ||
instance SingI NextTx where sing = SingNextTx | ||
instance SingI HasTx where sing = SingHasTx | ||
instance SingI GetSizes where sing = SingGetSizes | ||
instance SingI NextTx where sing = SingNextTx | ||
instance SingI HasTx where sing = SingHasTx | ||
instance SingI GetSizes where sing = SingGetSizes | ||
instance SingI GetMeasures where sing = SingGetMeasures | ||
|
||
deriving instance Show (SingBusyKind st) | ||
|
||
|
@@ -156,6 +163,22 @@ data MempoolSizeAndCapacity = MempoolSizeAndCapacity | |
-- ^ The number of transactions in the mempool | ||
} deriving (Generic, Eq, Show, NFData) | ||
|
||
data SizeAndCapacity a = SizeAndCapacity | ||
{ size :: !a | ||
, capacity :: !a | ||
} deriving (Generic, Eq, Show, NFData) | ||
|
||
instance Functor SizeAndCapacity where | ||
fmap f (SizeAndCapacity s c) = SizeAndCapacity (f s) (f c) | ||
|
||
newtype MeasureName = MeasureName Text | ||
deriving (Generic, Eq, Ord, Show, NFData) | ||
|
||
data MempoolMeasures = MempoolMeasures | ||
{ txCount :: !Word32 | ||
, measuresMap :: !(Map MeasureName (SizeAndCapacity Integer)) | ||
} deriving (Generic, Eq, Show, NFData) | ||
|
||
instance Protocol (LocalTxMonitor txid tx slot) where | ||
|
||
-- | The messages in the transaction monitoring protocol. | ||
|
@@ -236,6 +259,16 @@ instance Protocol (LocalTxMonitor txid tx slot) where | |
:: MempoolSizeAndCapacity | ||
-> Message (LocalTxMonitor txid tx slot) (StBusy GetSizes) StAcquired | ||
|
||
-- | The client asks the server about the mempool current size and max | ||
-- capacity | ||
-- | ||
MsgGetMeasures | ||
:: Message (LocalTxMonitor txid tx slot) StAcquired (StBusy GetMeasures) | ||
|
||
MsgReplyGetMeasures | ||
:: MempoolMeasures | ||
-> Message (LocalTxMonitor txid tx slot) (StBusy GetMeasures) StAcquired | ||
|
||
-- | Release the acquired snapshot, in order to loop back to the idle state. | ||
-- | ||
MsgRelease | ||
|
@@ -259,27 +292,31 @@ instance ( NFData txid | |
, NFData tx | ||
, NFData slot | ||
) => NFData (Message (LocalTxMonitor txid tx slot) from to) where | ||
rnf MsgAcquire = () | ||
rnf (MsgAcquired slot) = rnf slot | ||
rnf MsgAwaitAcquire = () | ||
rnf MsgNextTx = () | ||
rnf (MsgReplyNextTx mbTx) = rnf mbTx | ||
rnf (MsgHasTx txid) = rnf txid | ||
rnf (MsgReplyHasTx b) = rnf b | ||
rnf MsgGetSizes = () | ||
rnf (MsgReplyGetSizes msc) = rnf msc | ||
rnf MsgRelease = () | ||
rnf MsgDone = () | ||
rnf MsgAcquire = () | ||
rnf (MsgAcquired slot) = rnf slot | ||
rnf MsgAwaitAcquire = () | ||
rnf MsgNextTx = () | ||
rnf (MsgReplyNextTx mbTx) = rnf mbTx | ||
rnf (MsgHasTx txid) = rnf txid | ||
rnf (MsgReplyHasTx b) = rnf b | ||
rnf MsgGetSizes = () | ||
rnf (MsgReplyGetSizes msc) = rnf msc | ||
rnf MsgGetMeasures = () | ||
rnf (MsgReplyGetMeasures msc) = rnf msc | ||
rnf MsgRelease = () | ||
rnf MsgDone = () | ||
|
||
data TokBusyKind (k :: StBusyKind) where | ||
TokNextTx :: TokBusyKind NextTx | ||
TokHasTx :: TokBusyKind HasTx | ||
TokGetSizes :: TokBusyKind GetSizes | ||
TokNextTx :: TokBusyKind NextTx | ||
TokHasTx :: TokBusyKind HasTx | ||
TokGetSizes :: TokBusyKind GetSizes | ||
TokGetMeasures :: TokBusyKind GetMeasures | ||
Comment on lines
309
to
+313
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have |
||
|
||
instance NFData (TokBusyKind k) where | ||
rnf TokNextTx = () | ||
rnf TokHasTx = () | ||
rnf TokGetSizes = () | ||
rnf TokNextTx = () | ||
rnf TokHasTx = () | ||
rnf TokGetSizes = () | ||
rnf TokGetMeasures = () | ||
|
||
deriving instance (Show txid, Show tx, Show slot) | ||
=> Show (Message (LocalTxMonitor txid tx slot) from to) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,4 +99,10 @@ localTxMonitorServer txId (slot, allTxs) = | |
, numberOfTxs = fromIntegral (length allTxs) | ||
} | ||
in pure $ SendMsgReplyGetSizes sizes (serverStAcquired txs) | ||
, recvMsgGetMeasures = | ||
let measures = MempoolMeasures | ||
{ txCount = fromIntegral (length allTxs) | ||
, measuresMap = mempty | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
in pure $ SendMsgReplyGetMeasures measures (serverStAcquired txs) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not?