Skip to content

Commit

Permalink
Update configuration after recovering BulkSync in ouroboros-network
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez authored and amesgen committed Nov 18, 2024
1 parent c2763cb commit b3aa800
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 79 deletions.
4 changes: 2 additions & 2 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ constraints: Cabal < 3.13
source-repository-package
type: git
location: https://github.com/IntersectMBO/ouroboros-network
tag: fcb842fcd6f32b43a7cdf18a4301c1659a8bb879
--sha256: kjwUrduwwxC+5QRQNJa4stEBzz7kqDJyyHOgGMfDw7s=
tag: dcc2402326ea8c33a6578babbc7a1edb20ce7f5a
--sha256: sha256-kEOkWueiv41E5A7aMu6gCckVC3Q/AOj+zWWQQgvnMtc=
subdir:
ouroboros-network
ouroboros-network-api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,8 @@ nonImmutableDbPath (MultipleDbPaths _ vol) = vol
--
-- See 'stdLowLevelRunNodeArgsIO'.
data StdRunNodeArgs m blk (p2p :: Diffusion.P2P) = StdRunNodeArgs
{ srnBfcMaxConcurrencyDeadline :: Maybe Word
{ srnBfcMaxConcurrencyBulkSync :: Maybe Word
, srnBfcMaxConcurrencyDeadline :: Maybe Word
, srnChainDbValidateOverride :: Bool
-- ^ If @True@, validate the ChainDB on init no matter what
, srnDiskPolicyArgs :: DiskPolicyArgs
Expand Down Expand Up @@ -981,6 +982,9 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo
maybe id
(\mc bfc -> bfc { bfcMaxConcurrencyDeadline = mc })
srnBfcMaxConcurrencyDeadline
. maybe id
(\mc bfc -> bfc { bfcMaxConcurrencyBulkSync = mc })
srnBfcMaxConcurrencyBulkSync
modifyMempoolCapacityOverride =
maybe id
(\mc nka -> nka { mempoolCapacityOverride = mc })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,26 @@ data GenesisConfig = GenesisConfig

-- | Genesis configuration flags and low-level args, as parsed from config file or CLI
data GenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ :: Bool
, gcfEnableLoEAndGDD :: Bool
, gcfEnableLoP :: Bool
, gcfBulkSyncGracePeriod :: Maybe Integer
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
{ gcfEnableCSJ :: Bool
, gcfEnableLoEAndGDD :: Bool
, gcfEnableLoP :: Bool
, gcfBlockFetchGracePeriod :: Maybe Integer
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
} deriving stock (Eq, Generic, Show)

defaultGenesisConfigFlags :: GenesisConfigFlags
defaultGenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ = True
, gcfEnableLoEAndGDD = True
, gcfEnableLoP = True
, gcfBulkSyncGracePeriod = Nothing
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
{ gcfEnableCSJ = True
, gcfEnableLoEAndGDD = True
, gcfEnableLoP = True
, gcfBlockFetchGracePeriod = Nothing
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
}

enableGenesisConfigDefault :: GenesisConfig
Expand All @@ -96,7 +96,7 @@ mkGenesisConfig :: Maybe GenesisConfigFlags -> GenesisConfig
mkGenesisConfig Nothing = -- disable Genesis
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod = 0 -- no grace period when Genesis is disabled
{ gbfcGracePeriod = 0 -- no grace period when Genesis is disabled
}
, gcChainSyncLoPBucketConfig = ChainSyncLoPBucketDisabled
, gcCSJConfig = CSJDisabled
Expand All @@ -106,7 +106,7 @@ mkGenesisConfig Nothing = -- disable Genesis
mkGenesisConfig (Just GenesisConfigFlags{..}) =
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod
{ gbfcGracePeriod
}
, gcChainSyncLoPBucketConfig = if gcfEnableLoP
then ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig
Expand All @@ -129,18 +129,18 @@ mkGenesisConfig (Just GenesisConfigFlags{..}) =
}
where
-- TODO justification/derivation from other parameters
defaultBulkSyncGracePeriod = 10 -- seconds
defaultCapacity = 100_000 -- number of tokens
defaultRate = 500 -- tokens per second leaking, 1/2ms
defaultBlockFetchGracePeriod = 10 -- seconds
defaultCapacity = 100_000 -- number of tokens
defaultRate = 500 -- tokens per second leaking, 1/2ms
-- 3 * 2160 * 20 works in more recent ranges of slots, but causes syncing to
-- block in byron.
defaultCSJJumpSize = 2 * 2160
defaultGDDRateLimit = 1.0 -- seconds
defaultCSJJumpSize = 2 * 2160
defaultGDDRateLimit = 1.0 -- seconds

gbfcBulkSyncGracePeriod = fromInteger $ fromMaybe defaultBulkSyncGracePeriod gcfBulkSyncGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
gbfcGracePeriod = fromInteger $ fromMaybe defaultBlockFetchGracePeriod gcfBlockFetchGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
lgpGDDRateLimit = fromMaybe defaultGDDRateLimit gcfGDDRateLimit

newtype LoEAndGDDParams = LoEAndGDDParams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (castTip, tipFromHeader)
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.ConsensusInterface
(GenesisFetchMode)
import Ouroboros.Network.ConsensusMode (ConsensusMode (..))
import Ouroboros.Network.Diffusion (PublicPeerSelectionState)
import Ouroboros.Network.NodeToNode (ConnectionId,
MiniProtocolParameters (..))
Expand Down Expand Up @@ -136,7 +139,7 @@ data NodeKernel m addrNTN addrNTC blk = NodeKernel {

-- | The fetch mode, used by diffusion.
--
, getFetchMode :: STM m FetchMode
, getFetchMode :: STM m GenesisFetchMode

-- | The GSM state, used by diffusion. A ledger judgement can be derived
-- from it with 'GSM.gsmStateToLedgerJudgement'.
Expand Down Expand Up @@ -378,6 +381,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
, mempoolCapacityOverride
, gsmArgs, getUseBootstrapPeers
, getDiffusionPipeliningSupport
, genesisArgs
} = do
varGsmState <- do
let GsmNodeKernelArgs {..} = gsmArgs
Expand All @@ -398,6 +402,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg

slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault
(toConsensusMode $ gnkaLoEAndGDDArgs genesisArgs)
btime
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
Expand All @@ -416,6 +421,11 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
peerSharingRegistry <- newPeerSharingRegistry

return IS {..}
where
toConsensusMode :: forall a. LoEAndGDDConfig a -> ConsensusMode
toConsensusMode = \case
LoEAndGDDDisabled -> PraosMode
LoEAndGDDEnabled _ -> GenesisMode

forkBlockForging ::
forall m addrNTN addrNTC blk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,10 +1013,11 @@ runThreadNetwork systemTime ThreadNetworkArgs
txSubmissionMaxUnacked = 1000 -- TODO ?
}
, blockFetchConfiguration = BlockFetchConfiguration {
bfcMaxConcurrencyDeadline = 2
bfcMaxConcurrencyBulkSync = 1
, bfcMaxConcurrencyDeadline = 2
, bfcMaxRequestsInflight = 10
, bfcDecisionLoopIntervalBulkSync = 0.0 -- Mock testsuite can use sub-second slot
, bfcDecisionLoopIntervalDeadline = 0.0 -- interval which doesn't play nice with
, bfcDecisionLoopIntervalPraos = 0.0 -- Mock testsuite can use sub-second slot
, bfcDecisionLoopIntervalGenesis = 0.0 -- interval which doesn't play nice with
-- blockfetch descision interval.
, bfcSalt = 0
, bfcGenesisBFConfig = gcBlockFetchConfig enableGenesisConfigDefault
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import Ouroboros.Consensus.Storage.ChainDB.API
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
FetchClientRegistry, FetchMode (..),
GenesisBlockFetchConfiguration (..), blockFetchLogic,
bracketFetchClient, bracketKeepAliveClient)
FetchClientRegistry, GenesisBlockFetchConfiguration (..),
blockFetchLogic, bracketFetchClient,
bracketKeepAliveClient)
import Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(GenesisFetchMode (..))
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import Ouroboros.Network.Driver (runPeer)
Expand Down Expand Up @@ -93,13 +95,13 @@ startBlockFetchLogic enableChainSelStarvation registry tracer chainDb fetchClien
-- do not serialize the blocks.
(\_hdr -> 1000)
slotForgeTime
-- This is a syncing test, so we use 'FetchModeBulkSync'.
(pure FetchModeBulkSync)
-- This is a syncing test, so we use 'FetchModeGenesis'.
(pure FetchModeGenesis)
DiffusionPipeliningOn

bfcGenesisBFConfig = if enableChainSelStarvation
then GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod =
{ gbfcGracePeriod =
if enableChainSelStarvation then
10 -- default value for cardano-node at the time of writing
else
Expand All @@ -110,10 +112,11 @@ startBlockFetchLogic enableChainSelStarvation registry tracer chainDb fetchClien
-- Values taken from
-- ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs
blockFetchCfg = BlockFetchConfiguration
{ bfcMaxConcurrencyDeadline = 50 -- unused because of @pure FetchModeBulkSync@ above
{ bfcMaxConcurrencyBulkSync = 50
, bfcMaxConcurrencyDeadline = 50 -- unused because of @pure FetchModeBulkSync@ above
, bfcMaxRequestsInflight = 10
, bfcDecisionLoopIntervalBulkSync = 0
, bfcDecisionLoopIntervalDeadline = 0
, bfcDecisionLoopIntervalPraos = 0
, bfcDecisionLoopIntervalGenesis = 0
, bfcSalt = 0
, bfcGenesisBFConfig
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(BlockFetchConsensusInterface (..),
ChainSelStarvation (..), FetchMode (..),
FromConsensus (..))
FromConsensus (..), GenesisFetchMode (..), mkReadFetchMode)
import Ouroboros.Network.ConsensusMode (ConsensusMode)
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers,
requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -142,37 +143,41 @@ initSlotForgeTimeOracle cfg chainDB = do

readFetchModeDefault ::
(MonadSTM m, HasHeader blk)
=> BlockchainTime m
=> ConsensusMode
-> BlockchainTime m
-> STM m (AnchoredFragment blk)
-> STM m UseBootstrapPeers
-> STM m LedgerStateJudgement
-> STM m FetchMode
readFetchModeDefault btime getCurrentChain
getUseBootstrapPeers getLedgerStateJudgement = do
mCurSlot <- getCurrentSlot btime
usingBootstrapPeers <- requiresBootstrapPeers <$> getUseBootstrapPeers
<*> getLedgerStateJudgement
-> STM m GenesisFetchMode
readFetchModeDefault consensusMode btime getCurrentChain
getUseBootstrapPeers getLedgerStateJudgement =
mkReadFetchMode consensusMode getLedgerStateJudgement praosFetchMode
where
praosFetchMode = do
mCurSlot <- getCurrentSlot btime
usingBootstrapPeers <- requiresBootstrapPeers <$> getUseBootstrapPeers
<*> getLedgerStateJudgement

-- This logic means that when the node is using bootstrap peers and is in
-- TooOld state it will always return BulkSync. Otherwise if the node
-- isn't using bootstrap peers (i.e. has them disabled it will use the old
-- logic of returning BulkSync if behind 1000 slots
case (usingBootstrapPeers, mCurSlot) of
(True, _) -> return FetchModeBulkSync
(False, CurrentSlotUnknown) -> return FetchModeBulkSync
(False, CurrentSlot curSlot) -> do
curChainSlot <- AF.headSlot <$> getCurrentChain
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync
-- This logic means that when the node is using bootstrap peers and is in
-- TooOld state it will always return BulkSync. Otherwise if the node
-- isn't using bootstrap peers (i.e. has them disabled it will use the old
-- logic of returning BulkSync if behind 1000 slots
case (usingBootstrapPeers, mCurSlot) of
(True, _) -> return FetchModeBulkSync
(False, CurrentSlotUnknown) -> return FetchModeBulkSync
(False, CurrentSlot curSlot) -> do
curChainSlot <- AF.headSlot <$> getCurrentChain
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync

mkBlockFetchConsensusInterface ::
forall m peer blk.
Expand All @@ -188,7 +193,7 @@ mkBlockFetchConsensusInterface ::
-> (Header blk -> SizeInBytes)
-> SlotForgeTimeOracle m blk
-- ^ Slot forge time, see 'headerForgeUTCTime' and 'blockForgeUTCTime'.
-> STM m FetchMode
-> STM m GenesisFetchMode
-- ^ See 'readFetchMode'.
-> DiffusionPipeliningSupport
-> BlockFetchConsensusInterface peer (Header blk) blk m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
bracketFetchClient, bracketKeepAliveClient,
bracketSyncWithFetchClient, newFetchClientRegistry)
import Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(GenesisFetchMode (..))
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Mock.Chain (Chain)
import qualified Ouroboros.Network.Mock.Chain as Chain
Expand Down Expand Up @@ -97,8 +99,9 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
[ Map.keysSet bfcoBlockFetchResults === Map.keysSet peerUpdates
, counterexample ("Fetched blocks per peer: " <> condense bfcoFetchedBlocks) $
property $ case blockFetchMode of
FetchModeDeadline -> all (> 0) bfcoFetchedBlocks
FetchModeBulkSync -> any (> 0) bfcoFetchedBlocks
PraosFetchMode FetchModeDeadline -> all (> 0) bfcoFetchedBlocks
PraosFetchMode FetchModeBulkSync -> all (> 0) bfcoFetchedBlocks
FetchModeGenesis -> any (> 0) bfcoFetchedBlocks
]
where
BlockFetchClientOutcome{..} = runSimOrThrow $ runBlockFetchTest bfcts
Expand Down Expand Up @@ -330,7 +333,7 @@ data BlockFetchClientTestSetup = BlockFetchClientTestSetup {
-- the candidate fragments provided by the ChainSync client.
peerUpdates :: Map PeerId (Schedule ChainUpdate)
-- | BlockFetch 'FetchMode'
, blockFetchMode :: FetchMode
, blockFetchMode :: GenesisFetchMode
, blockFetchCfg :: BlockFetchConfiguration
, blockFetchPipelining :: DiffusionPipeliningSupport
}
Expand Down Expand Up @@ -362,18 +365,23 @@ instance Arbitrary BlockFetchClientTestSetup where
peerUpdates <-
Map.fromList . zip peerIds
<$> replicateM numPeers (genUpdateSchedule blockFetchPipelining)
blockFetchMode <- elements [FetchModeBulkSync, FetchModeDeadline]
blockFetchMode <- elements
[ PraosFetchMode FetchModeBulkSync
, PraosFetchMode FetchModeDeadline
, FetchModeGenesis
]
blockFetchCfg <- do
let -- ensure that we can download blocks from all peers
bfcMaxConcurrencyBulkSync = fromIntegral numPeers
bfcMaxConcurrencyDeadline = fromIntegral numPeers
-- This is used to introduce a minimal delay between BlockFetch
-- logic iterations in case the monitored state vars change too
-- fast, which we don't have to worry about in this test.
bfcDecisionLoopIntervalBulkSync = 0
bfcDecisionLoopIntervalDeadline = 0
bfcDecisionLoopIntervalGenesis = 0
bfcDecisionLoopIntervalPraos = 0
bfcMaxRequestsInflight <- chooseEnum (2, 10)
bfcSalt <- arbitrary
gbfcBulkSyncGracePeriod <- fromIntegral <$> chooseInteger (5, 60)
gbfcGracePeriod <- fromIntegral <$> chooseInteger (5, 60)
let bfcGenesisBFConfig = GenesisBlockFetchConfiguration {..}
pure BlockFetchConfiguration {..}
pure BlockFetchClientTestSetup {..}
Expand Down

0 comments on commit b3aa800

Please sign in to comment.