Skip to content

Commit

Permalink
Update configuration after recovering BulkSync in ouroboros-network
Browse files Browse the repository at this point in the history
  • Loading branch information
facundominguez authored and neilmayhew committed Dec 18, 2024
1 parent 2adf35b commit f252906
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ nonImmutableDbPath (MultipleDbPaths _ vol) = vol
--
-- See 'stdLowLevelRunNodeArgsIO'.
data StdRunNodeArgs m blk (p2p :: Diffusion.P2P) = StdRunNodeArgs
{ srnBfcMaxConcurrencyDeadline :: Maybe Word
{ srnBfcMaxConcurrencyBulkSync :: Maybe Word
, srnBfcMaxConcurrencyDeadline :: Maybe Word
, srnChainDbValidateOverride :: Bool
-- ^ If @True@, validate the ChainDB on init no matter what
, srnDiskPolicyArgs :: DiskPolicyArgs
Expand Down Expand Up @@ -985,6 +986,9 @@ stdLowLevelRunNodeArgsIO RunNodeArgs{ rnProtocolInfo
maybe id
(\mc bfc -> bfc { bfcMaxConcurrencyDeadline = mc })
srnBfcMaxConcurrencyDeadline
. maybe id
(\mc bfc -> bfc { bfcMaxConcurrencyBulkSync = mc })
srnBfcMaxConcurrencyBulkSync
modifyMempoolCapacityOverride =
maybe id
(\mc nka -> nka { mempoolCapacityOverride = mc })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,26 +66,26 @@ data GenesisConfig = GenesisConfig

-- | Genesis configuration flags and low-level args, as parsed from config file or CLI
data GenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ :: Bool
, gcfEnableLoEAndGDD :: Bool
, gcfEnableLoP :: Bool
, gcfBulkSyncGracePeriod :: Maybe Integer
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
{ gcfEnableCSJ :: Bool
, gcfEnableLoEAndGDD :: Bool
, gcfEnableLoP :: Bool
, gcfBlockFetchGracePeriod :: Maybe Integer
, gcfBucketCapacity :: Maybe Integer
, gcfBucketRate :: Maybe Integer
, gcfCSJJumpSize :: Maybe Integer
, gcfGDDRateLimit :: Maybe DiffTime
} deriving stock (Eq, Generic, Show)

defaultGenesisConfigFlags :: GenesisConfigFlags
defaultGenesisConfigFlags = GenesisConfigFlags
{ gcfEnableCSJ = True
, gcfEnableLoEAndGDD = True
, gcfEnableLoP = True
, gcfBulkSyncGracePeriod = Nothing
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
{ gcfEnableCSJ = True
, gcfEnableLoEAndGDD = True
, gcfEnableLoP = True
, gcfBlockFetchGracePeriod = Nothing
, gcfBucketCapacity = Nothing
, gcfBucketRate = Nothing
, gcfCSJJumpSize = Nothing
, gcfGDDRateLimit = Nothing
}

enableGenesisConfigDefault :: GenesisConfig
Expand All @@ -99,7 +99,7 @@ mkGenesisConfig :: Maybe GenesisConfigFlags -> GenesisConfig
mkGenesisConfig Nothing = -- disable Genesis
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod = 0 -- no grace period when Genesis is disabled
{ gbfcGracePeriod = 0 -- no grace period when Genesis is disabled
}
, gcChainSyncLoPBucketConfig = ChainSyncLoPBucketDisabled
, gcCSJConfig = CSJDisabled
Expand All @@ -109,7 +109,7 @@ mkGenesisConfig Nothing = -- disable Genesis
mkGenesisConfig (Just GenesisConfigFlags{..}) =
GenesisConfig
{ gcBlockFetchConfig = GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod
{ gbfcGracePeriod
}
, gcChainSyncLoPBucketConfig = if gcfEnableLoP
then ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig
Expand All @@ -134,7 +134,7 @@ mkGenesisConfig (Just GenesisConfigFlags{..}) =
-- The minimum amount of time during which the Genesis BlockFetch logic will
-- download blocks from a specific peer (even if it is not performing well
-- during that period).
defaultBulkSyncGracePeriod = 10 -- seconds
defaultBlockFetchGracePeriod = 10 -- seconds

-- LoP parameters. Empirically, it takes less than 1ms to validate a header,
-- so leaking one token per 2ms is conservative. The capacity of 100_000
Expand All @@ -153,11 +153,11 @@ mkGenesisConfig (Just GenesisConfigFlags{..}) =
-- Limiting the performance impact of the GDD.
defaultGDDRateLimit = 1.0 -- seconds

gbfcBulkSyncGracePeriod = fromInteger $ fromMaybe defaultBulkSyncGracePeriod gcfBulkSyncGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
lgpGDDRateLimit = fromMaybe defaultGDDRateLimit gcfGDDRateLimit
gbfcGracePeriod = fromInteger $ fromMaybe defaultBlockFetchGracePeriod gcfBlockFetchGracePeriod
csbcCapacity = fromInteger $ fromMaybe defaultCapacity gcfBucketCapacity
csbcRate = fromInteger $ fromMaybe defaultRate gcfBucketRate
csjcJumpSize = fromInteger $ fromMaybe defaultCSJJumpSize gcfCSJJumpSize
lgpGDDRateLimit = fromMaybe defaultGDDRateLimit gcfGDDRateLimit

newtype LoEAndGDDParams = LoEAndGDDParams
{ -- | How often to evaluate GDD. 0 means as soon as possible.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.Block (castTip, tipFromHeader)
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.ConsensusMode (ConsensusMode (..))
import Ouroboros.Network.Diffusion (PublicPeerSelectionState)
import Ouroboros.Network.NodeToNode (ConnectionId,
MiniProtocolParameters (..))
Expand Down Expand Up @@ -378,6 +379,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
, mempoolCapacityOverride
, gsmArgs, getUseBootstrapPeers
, getDiffusionPipeliningSupport
, genesisArgs
} = do
varGsmState <- do
let GsmNodeKernelArgs {..} = gsmArgs
Expand All @@ -398,6 +400,7 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg

slotForgeTimeOracle <- BlockFetchClientInterface.initSlotForgeTimeOracle cfg chainDB
let readFetchMode = BlockFetchClientInterface.readFetchModeDefault
(toConsensusMode $ gnkaLoEAndGDDArgs genesisArgs)
btime
(ChainDB.getCurrentChain chainDB)
getUseBootstrapPeers
Expand All @@ -416,6 +419,11 @@ initInternalState NodeKernelArgs { tracers, chainDB, registry, cfg
peerSharingRegistry <- newPeerSharingRegistry

return IS {..}
where
toConsensusMode :: forall a. LoEAndGDDConfig a -> ConsensusMode
toConsensusMode = \case
LoEAndGDDDisabled -> PraosMode
LoEAndGDDEnabled _ -> GenesisMode

forkBlockForging ::
forall m addrNTN addrNTC blk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,10 +1013,11 @@ runThreadNetwork systemTime ThreadNetworkArgs
txSubmissionMaxUnacked = 1000 -- TODO ?
}
, blockFetchConfiguration = BlockFetchConfiguration {
bfcMaxConcurrencyDeadline = 2
bfcMaxConcurrencyBulkSync = 1
, bfcMaxConcurrencyDeadline = 2
, bfcMaxRequestsInflight = 10
, bfcDecisionLoopIntervalBulkSync = 0.0 -- Mock testsuite can use sub-second slot
, bfcDecisionLoopIntervalDeadline = 0.0 -- interval which doesn't play nice with
, bfcDecisionLoopIntervalPraos = 0.0 -- Mock testsuite can use sub-second slot
, bfcDecisionLoopIntervalGenesis = 0.0 -- interval which doesn't play nice with
-- blockfetch descision interval.
, bfcSalt = 0
, bfcGenesisBFConfig = gcBlockFetchConfig enableGenesisConfigDefault
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import Ouroboros.Consensus.Storage.ChainDB.API
import Ouroboros.Consensus.Util (ShowProxy)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
FetchClientRegistry, FetchMode (..),
GenesisBlockFetchConfiguration (..), blockFetchLogic,
bracketFetchClient, bracketKeepAliveClient)
FetchClientRegistry, GenesisBlockFetchConfiguration (..),
blockFetchLogic, bracketFetchClient,
bracketKeepAliveClient)
import Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(FetchMode (..))
import Ouroboros.Network.Channel (Channel)
import Ouroboros.Network.ControlMessage (ControlMessageSTM)
import Ouroboros.Network.Driver (runPeer)
Expand Down Expand Up @@ -93,13 +95,13 @@ startBlockFetchLogic enableChainSelStarvation registry tracer chainDb fetchClien
-- do not serialize the blocks.
(\_hdr -> 1000)
slotForgeTime
-- This is a syncing test, so we use 'FetchModeBulkSync'.
(pure FetchModeBulkSync)
-- This is a syncing test, so we use 'FetchModeGenesis'.
(pure FetchModeGenesis)
DiffusionPipeliningOn

bfcGenesisBFConfig = if enableChainSelStarvation
then GenesisBlockFetchConfiguration
{ gbfcBulkSyncGracePeriod =
{ gbfcGracePeriod =
if enableChainSelStarvation then
10 -- default value for cardano-node at the time of writing
else
Expand All @@ -110,10 +112,11 @@ startBlockFetchLogic enableChainSelStarvation registry tracer chainDb fetchClien
-- Values taken from
-- ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs
blockFetchCfg = BlockFetchConfiguration
{ bfcMaxConcurrencyDeadline = 50 -- unused because of @pure FetchModeBulkSync@ above
{ bfcMaxConcurrencyBulkSync = 50
, bfcMaxConcurrencyDeadline = 50 -- unused because of @pure FetchModeBulkSync@ above
, bfcMaxRequestsInflight = 10
, bfcDecisionLoopIntervalBulkSync = 0
, bfcDecisionLoopIntervalDeadline = 0
, bfcDecisionLoopIntervalPraos = 0
, bfcDecisionLoopIntervalGenesis = 0
, bfcSalt = 0
, bfcGenesisBFConfig
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import Ouroboros.Network.Block (MaxSlotNo)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(BlockFetchConsensusInterface (..),
ChainSelStarvation (..), FetchMode (..),
FromConsensus (..))
FromConsensus (..), PraosFetchMode (..), mkReadFetchMode)
import Ouroboros.Network.ConsensusMode (ConsensusMode)
import Ouroboros.Network.PeerSelection.Bootstrap (UseBootstrapPeers,
requiresBootstrapPeers)
import Ouroboros.Network.PeerSelection.LedgerPeers.Type
Expand Down Expand Up @@ -142,37 +143,41 @@ initSlotForgeTimeOracle cfg chainDB = do

readFetchModeDefault ::
(MonadSTM m, HasHeader blk)
=> BlockchainTime m
=> ConsensusMode
-> BlockchainTime m
-> STM m (AnchoredFragment blk)
-> STM m UseBootstrapPeers
-> STM m LedgerStateJudgement
-> STM m FetchMode
readFetchModeDefault btime getCurrentChain
getUseBootstrapPeers getLedgerStateJudgement = do
mCurSlot <- getCurrentSlot btime
usingBootstrapPeers <- requiresBootstrapPeers <$> getUseBootstrapPeers
<*> getLedgerStateJudgement
readFetchModeDefault consensusMode btime getCurrentChain
getUseBootstrapPeers getLedgerStateJudgement =
mkReadFetchMode consensusMode getLedgerStateJudgement praosFetchMode
where
praosFetchMode = do
mCurSlot <- getCurrentSlot btime
usingBootstrapPeers <- requiresBootstrapPeers <$> getUseBootstrapPeers
<*> getLedgerStateJudgement

-- This logic means that when the node is using bootstrap peers and is in
-- TooOld state it will always return BulkSync. Otherwise if the node
-- isn't using bootstrap peers (i.e. has them disabled it will use the old
-- logic of returning BulkSync if behind 1000 slots
case (usingBootstrapPeers, mCurSlot) of
(True, _) -> return FetchModeBulkSync
(False, CurrentSlotUnknown) -> return FetchModeBulkSync
(False, CurrentSlot curSlot) -> do
curChainSlot <- AF.headSlot <$> getCurrentChain
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync
-- This logic means that when the node is using bootstrap peers and is in
-- TooOld state it will always return BulkSync. Otherwise if the node
-- isn't using bootstrap peers (i.e. has them disabled it will use the old
-- logic of returning BulkSync if behind 1000 slots
case (usingBootstrapPeers, mCurSlot) of
(True, _) -> return FetchModeBulkSync
(False, CurrentSlotUnknown) -> return FetchModeBulkSync
(False, CurrentSlot curSlot) -> do
curChainSlot <- AF.headSlot <$> getCurrentChain
let slotsBehind = case curChainSlot of
-- There's nothing in the chain. If the current slot is 0, then
-- we're 1 slot behind.
Origin -> unSlotNo curSlot + 1
NotOrigin slot -> unSlotNo curSlot - unSlotNo slot
maxSlotsBehind = 1000
return $ if slotsBehind < maxSlotsBehind
-- When the current chain is near to "now", use deadline mode,
-- when it is far away, use bulk sync mode.
then FetchModeDeadline
else FetchModeBulkSync

mkBlockFetchConsensusInterface ::
forall m peer blk.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
bracketFetchClient, bracketKeepAliveClient,
bracketSyncWithFetchClient, newFetchClientRegistry)
import Ouroboros.Network.BlockFetch.Client (blockFetchClient)
import Ouroboros.Network.BlockFetch.ConsensusInterface
(PraosFetchMode (..))
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.Mock.Chain (Chain)
import qualified Ouroboros.Network.Mock.Chain as Chain
Expand Down Expand Up @@ -97,8 +99,9 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
[ Map.keysSet bfcoBlockFetchResults === Map.keysSet peerUpdates
, counterexample ("Fetched blocks per peer: " <> condense bfcoFetchedBlocks) $
property $ case blockFetchMode of
FetchModeDeadline -> all (> 0) bfcoFetchedBlocks
FetchModeBulkSync -> any (> 0) bfcoFetchedBlocks
PraosFetchMode FetchModeDeadline -> all (> 0) bfcoFetchedBlocks
PraosFetchMode FetchModeBulkSync -> all (> 0) bfcoFetchedBlocks
FetchModeGenesis -> any (> 0) bfcoFetchedBlocks
]
where
BlockFetchClientOutcome{..} = runSimOrThrow $ runBlockFetchTest bfcts
Expand Down Expand Up @@ -361,18 +364,23 @@ instance Arbitrary BlockFetchClientTestSetup where
peerUpdates <-
Map.fromList . zip peerIds
<$> replicateM numPeers (genUpdateSchedule blockFetchPipelining)
blockFetchMode <- elements [FetchModeBulkSync, FetchModeDeadline]
blockFetchMode <- elements
[ PraosFetchMode FetchModeBulkSync
, PraosFetchMode FetchModeDeadline
, FetchModeGenesis
]
blockFetchCfg <- do
let -- ensure that we can download blocks from all peers
bfcMaxConcurrencyBulkSync = fromIntegral numPeers
bfcMaxConcurrencyDeadline = fromIntegral numPeers
-- This is used to introduce a minimal delay between BlockFetch
-- logic iterations in case the monitored state vars change too
-- fast, which we don't have to worry about in this test.
bfcDecisionLoopIntervalBulkSync = 0
bfcDecisionLoopIntervalDeadline = 0
bfcDecisionLoopIntervalGenesis = 0
bfcDecisionLoopIntervalPraos = 0
bfcMaxRequestsInflight <- chooseEnum (2, 10)
bfcSalt <- arbitrary
gbfcBulkSyncGracePeriod <- fromIntegral <$> chooseInteger (5, 60)
gbfcGracePeriod <- fromIntegral <$> chooseInteger (5, 60)
let bfcGenesisBFConfig = GenesisBlockFetchConfiguration {..}
pure BlockFetchConfiguration {..}
pure BlockFetchClientTestSetup {..}
Expand Down

0 comments on commit f252906

Please sign in to comment.