Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some Praos sim fixes #68

Merged
merged 5 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions simulation/src/PraosProtocol/BlockFetch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,14 @@ longestChainSelection candidateChainVars cpsVar getHeader = do
cps <- readReadOnlyTVar cpsVar
let
chain = fmap getHeader cps.chainState
aux (mpId, c1) (pId, c2) =
let c = Chain.selectChain c1 c2
in if Chain.headPoint c == Chain.headPoint c1
then (mpId, c1)
aux x1@(_mpId, c1) (pId, c2) =
-- We use headHash to refine the order, so that we have less
-- partitioning in the network.
-- Actual implementation uses the VRF hash to be adversarial
-- resistant, but that's not a concern here.
let measure c = (Chain.headBlockNo c, Chain.headHash c)
in if measure c1 >= measure c2
then x1
else (Just pId, c2)
-- using foldl' since @selectChain@ is left biased
(selectedPeer, chain') = List.foldl' aux (Nothing, chain) candidateChains
Expand Down Expand Up @@ -512,20 +516,18 @@ updateChains ::
MonadSTM m =>
BlockFetchControllerState m ->
ChainsUpdate ->
STM m (Bool, Maybe FullTip)
STM m (Bool, Maybe (Chain Block))
updateChains BlockFetchControllerState{..} e =
case e of
FullChain fullChain -> do
FullChain !fullChain -> do
writeTVar targetChainVar Nothing
let !newTip = fullTip fullChain
modifyTVar' cpsVar (switchFork fullChain)
return (True, Just newTip)
return (True, Just fullChain)
ImprovedPrefix missingChain -> do
writeTVar targetChainVar (Just missingChain)
let improvedChain = fromMaybe (error "prefix not from Genesis") $ Chain.fromAnchoredFragment missingChain.prefix
!newTip = fullTip improvedChain
let !improvedChain = fromMaybe (error "prefix not from Genesis") $ Chain.fromAnchoredFragment missingChain.prefix
modifyTVar' cpsVar (switchFork improvedChain)
return (True, Just $ newTip)
return (True, Just improvedChain)
SamePrefix missingChain -> do
target <- readTVar targetChainVar
let useful = Just (headPointMChain missingChain) /= fmap headPointMChain target
Expand Down Expand Up @@ -557,7 +559,7 @@ addFetchedBlock tracer st pId blk = (traceNewTip tracer =<<) . atomically $ do
Just missingChain -> do
fmap snd $ updateChains st =<< fillInBlocks <$> readTVar st.blocksVar <*> pure missingChain

traceNewTip :: Monad m => Tracer m PraosNodeEvent -> Maybe FullTip -> m ()
traceNewTip :: Monad m => Tracer m PraosNodeEvent -> Maybe (Chain Block) -> m ()
traceNewTip tracer x =
case x of
Nothing -> return ()
Expand Down
6 changes: 3 additions & 3 deletions simulation/src/PraosProtocol/BlockGeneration.hs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@ blockGenerator tracer praosConfig cpsVar addBlockSt (Just nextBlock) = forever $
let block = mkBlock chain sl body
if (Chain.headSlot chain <= At sl)
then
addBlockSt block >> return (Just block)
addBlockSt block >> return (Just (block, chain))
else return Nothing
case mblk of
Nothing -> return ()
Just blk -> do
Just (blk, chain) -> do
traceWith tracer (PraosNodeEventGenerate blk)
traceWith tracer (PraosNodeEventNewTip (FullTip (blockHeader blk)))
traceWith tracer (PraosNodeEventNewTip (chain Chain.:> blk))
waitForSlot sl = do
let tgt = slotTime praosConfig.slotConfig sl
now <- getCurrentTime
Expand Down
25 changes: 17 additions & 8 deletions simulation/src/PraosProtocol/ChainSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
Expand Down Expand Up @@ -150,18 +151,24 @@ newtype ChainConsumerState m = ChainConsumerState
{ chainVar :: TVar m (Chain BlockHeader)
}

runChainConsumer :: MonadSTM m => Chan m ChainSyncMessage -> ChainConsumerState m -> m ()
runChainConsumer chan st =
void $ runPeerWithDriver (chanDriver decideChainSyncState chan) (chainConsumer st)
runChainConsumer ::
(MonadSTM m, MonadDelay m) =>
PraosConfig ->
Chan m ChainSyncMessage ->
ChainConsumerState m ->
m ()
runChainConsumer cfg chan st =
void $ runPeerWithDriver (chanDriver decideChainSyncState chan) (chainConsumer cfg st)

type ChainConsumer st m a = TC.Client ChainSyncState 'NonPipelined st m a

chainConsumer ::
forall m.
MonadSTM m =>
(MonadSTM m, MonadDelay m) =>
PraosConfig ->
ChainConsumerState m ->
ChainConsumer 'StIdle m ()
chainConsumer (ChainConsumerState hchainVar) = idle True
chainConsumer cfg (ChainConsumerState hchainVar) = idle True
where
-- NOTE: The specification says to do an initial intersection with
-- exponentially spaced points, and perform binary search to
Expand Down Expand Up @@ -194,9 +201,11 @@ chainConsumer (ChainConsumerState hchainVar) = idle True

rollForward :: BlockHeader -> ChainConsumer 'StIdle m ()
rollForward header =
TC.Effect $ atomically $ do
modifyTVar' hchainVar $ Chain.addBlock header
return $ idle False
TC.Effect $ do
threadDelaySI (cfg.headerValidationDelay header)
atomically $ do
modifyTVar' hchainVar $ Chain.addBlock header
return $ idle False

rollBackward :: Point BlockHeader -> ChainConsumer 'StIdle m ()
rollBackward hpoint =
Expand Down
18 changes: 15 additions & 3 deletions simulation/src/PraosProtocol/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module PraosProtocol.Common (
MessageSize (..),
kilobytes,
module TimeCompat,
defaultPraosConfig,
) where

import Control.Concurrent.Class.MonadSTM (
Expand Down Expand Up @@ -141,14 +142,25 @@ data PraosNodeEvent
= PraosNodeEventGenerate Block
| PraosNodeEventReceived Block
| PraosNodeEventEnterState Block
| PraosNodeEventNewTip FullTip
| PraosNodeEventNewTip (Chain Block)
deriving (Show)

data PraosConfig = PraosConfig
{ slotConfig :: SlotConfig
, blockValidationDelay :: Block -> DiffTime
{ slotConfig :: !SlotConfig
, blockValidationDelay :: !(Block -> DiffTime)
, headerValidationDelay :: !(BlockHeader -> DiffTime)
}

defaultPraosConfig :: MonadTime m => m PraosConfig
defaultPraosConfig = do
slotConfig <- slotConfigFromNow
return
PraosConfig
{ slotConfig
, blockValidationDelay = const 0.1
, headerValidationDelay = const 0.005
}

--------------------------------
---- Common Utility Types
--------------------------------
Expand Down
61 changes: 49 additions & 12 deletions simulation/src/PraosProtocol/ExamplesPraosP2P.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,35 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE NoFieldSelectors #-}

module PraosProtocol.ExamplesPraosP2P where

import ChanDriver
import Control.Monad
import Data.Aeson
import qualified Data.ByteString.Char8 as BS8
import Data.Coerce (coerce)
import Data.Functor.Contravariant (Contravariant (contramap))
import qualified Data.IntMap.Strict as IMap
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, listToMaybe)
import GHC.Generics
import Network.TypedProtocol
import P2P (P2PTopography (p2pNodes), P2PTopographyCharacteristics (..), genArbitraryP2PTopography)
import PraosProtocol.BlockFetch
import PraosProtocol.BlockGeneration (PacketGenerationPattern (..))
import PraosProtocol.Common
import PraosProtocol.Common.Chain (Chain (Genesis))
import qualified PraosProtocol.Common.Chain as Chain
import PraosProtocol.PraosNode
import PraosProtocol.SimPraos
import PraosProtocol.SimPraosP2P
import PraosProtocol.VizSimPraos (DiffusionLatencyMap, PraosVizConfig (..), accumDiffusionLatency, examplesPraosSimVizConfig, praosSimVizModel)
import PraosProtocol.VizSimPraos (ChainsMap, DiffusionLatencyMap, PraosVizConfig (..), accumChains, accumDiffusionLatency, examplesPraosSimVizConfig, praosSimVizModel)
import PraosProtocol.VizSimPraosP2P
import Sample
import SimTCPLinks (mkTcpConnProps)
Expand Down Expand Up @@ -98,6 +105,7 @@ data DiffusionData = DiffusionData
{ topography :: P2PTopographyCharacteristics
, entries :: [DiffusionEntry]
, latency_per_stake :: [LatencyPerStake]
, stable_chain_hashes :: [Int]
}
deriving (Generic, ToJSON, FromJSON)

Expand All @@ -109,29 +117,56 @@ diffusionEntryToLatencyPerStake nnodes DiffusionEntry{..} =
}
where
bins = [0.5, 0.8, 0.9, 0.92, 0.94, 0.96, 0.98, 1]
bin xs = map (\b -> let ys = takeWhile (\(_, x) -> x <= b) xs in if null ys then (Nothing, b) else (Just $ fst $ last ys, b)) $ bins
bin xs = map (\b -> (,b) $ fst <$> listToMaybe (dropWhile (\(_, x) -> x < b) xs)) $ bins

diffusionSampleModel :: P2PTopographyCharacteristics -> FilePath -> SampleModel PraosEvent DiffusionLatencyMap
diffusionSampleModel p2pTopographyCharacteristics fp = SampleModel Map.empty accumDiffusionLatency render
data DiffusionLatencyState = DiffusionLatencyState
{ chains :: !ChainsMap
, diffusions :: !DiffusionLatencyMap
}

diffusionSampleModel :: P2PTopographyCharacteristics -> FilePath -> SampleModel PraosEvent DiffusionLatencyState
diffusionSampleModel p2pTopographyCharacteristics fp = SampleModel initState accum render
where
initState = DiffusionLatencyState IMap.empty Map.empty
accum t e DiffusionLatencyState{..} =
DiffusionLatencyState
{ chains = accumChains t e chains
, diffusions = accumDiffusionLatency t e diffusions
}
nnodes = p2pNumNodes p2pTopographyCharacteristics
render result = do
render DiffusionLatencyState{..} = do
let stable_chain = fromMaybe Genesis $ do
guard $ not $ IMap.null chains
pure $ List.foldl1' aux (IMap.elems chains)
aux c1 c2 = fromMaybe Genesis $ do
p <- Chain.intersectChains c1 c2
Chain.rollback p c1
let stable_chain_hashes = coerce $ map blockHash $ Chain.toNewestFirst stable_chain
let entries =
[ DiffusionEntry
{ hash = coerce hash'
, node_id = coerce i
, created = coerce t
, arrivals = coerce ts
}
| (hash', (_, i, t, ts)) <- Map.toList result
| (hash', (_, i, t, ts)) <- Map.toList diffusions
]
let diffusionData =
DiffusionData
{ topography = p2pTopographyCharacteristics
, entries
, latency_per_stake = map (diffusionEntryToLatencyPerStake nnodes) entries
, stable_chain_hashes
}

encodeFile fp $
DiffusionData
{ topography = p2pTopographyCharacteristics
, entries
, latency_per_stake = map (diffusionEntryToLatencyPerStake nnodes) entries
}
encodeFile fp diffusionData
putStrLn $ "Diffusion data written to " ++ fp

let arrived98 = unzip [(l.hash, d) | l <- diffusionData.latency_per_stake, (Just d, p) <- l.latencies, p == 0.98]
let missing = filter (not . (`elem` fst arrived98)) diffusionData.stable_chain_hashes
putStrLn $ "Number of blocks that reached 98% stake: " ++ show (length $ fst arrived98)
putStrLn $ "with a maximum diffusion latency: " ++ show (maximum $ snd arrived98)
putStrLn $ "Blocks in longest common prefix that did not reach 98% stake: " ++ show missing

-- | Diffusion example with 1000 nodes.
example1000Diffusion ::
Expand Down Expand Up @@ -180,6 +215,7 @@ example1Trace rng0 blockInterval p2pTopography =
PraosConfig
{ slotConfig
, blockValidationDelay = const 0.1 -- 100ms
, headerValidationDelay = const 0.005 -- 5ms
}
, blockMarker = BS8.pack $ show nid ++ ": "
, chain = Genesis
Expand Down Expand Up @@ -262,6 +298,7 @@ example2 =
PraosConfig
{ slotConfig
, blockValidationDelay = const 0.1 -- 100ms
, headerValidationDelay = const 0.005 -- 5ms
}
, chain = Genesis
, blockMarker = BS8.pack $ show nid ++ ": "
Expand Down
2 changes: 1 addition & 1 deletion simulation/src/PraosProtocol/PraosNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ runPeer ::
runPeer tracer cfg st peerId chan = do
let chainConsumerState = st.chainSyncConsumerStates Map.! peerId
let blockFetchConsumerState = initBlockFetchConsumerStateForPeerId tracer peerId st.blockFetchControllerState
[ Concurrently $ runChainConsumer (protocolChainSync chan) chainConsumerState
[ Concurrently $ runChainConsumer cfg (protocolChainSync chan) chainConsumerState
, Concurrently $ runBlockFetchConsumer tracer cfg (protocolBlockFetch chan) blockFetchConsumerState
]

Expand Down
3 changes: 1 addition & 2 deletions simulation/src/PraosProtocol/SimBlockFetch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ traceRelayLink1 tcpprops =
[(NodeId 0, NodeId 1), (NodeId 1, NodeId 0)]
)
(inChan, outChan) <- newConnectionTCP (linkTracer na nb) tcpprops
slotConfig <- slotConfigFromNow
let praosConfig = PraosConfig{slotConfig, blockValidationDelay = const 0.1}
praosConfig <- defaultPraosConfig
concurrently_
(nodeA praosConfig outChan)
(nodeB inChan)
Expand Down
7 changes: 4 additions & 3 deletions simulation/src/PraosProtocol/SimChainSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,15 @@ traceRelayLink1 tcpprops =
[(NodeId 0, NodeId 1), (NodeId 1, NodeId 0)]
)
(inChan, outChan) <- newConnectionTCP (linkTracer na nb) tcpprops
praosConfig <- defaultPraosConfig
concurrently_
(consumerNode inChan)
(consumerNode praosConfig inChan)
(producerNode outChan)
return ()
where
consumerNode chan = do
consumerNode cfg chan = do
st <- ChainConsumerState <$> newTVarIO Chain.Genesis
runChainConsumer chan st
runChainConsumer cfg chan st
producerNode chan = do
let chain = mkChainSimple $ replicate 10 (BlockBody $ BS.replicate 100 0)
let (cps, fId) = initFollower GenesisPoint $ initChainProducerState chain
Expand Down
3 changes: 1 addition & 2 deletions simulation/src/PraosProtocol/SimPraos.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ traceRelayLink1 tcpprops =
( Set.fromList
[(nodeA, nodeB), (nodeB, nodeA)]
)
slotConfig <- slotConfigFromNow
let praosConfig = PraosConfig{slotConfig, blockValidationDelay = const 0.1}
praosConfig <- defaultPraosConfig
let chainA = mkChainSimple $ [BlockBody (BS.singleton word) | word <- [0 .. 9]]
let chainB = Genesis
(pA, cB) <- newConnectionBundleTCP (praosTracer nodeA nodeB) tcpprops
Expand Down
10 changes: 9 additions & 1 deletion simulation/src/PraosProtocol/VizSimPraos.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ module PraosProtocol.VizSimPraos where
import ChanDriver
import Control.Exception (assert)
import Data.Coerce (coerce)
import Data.IntMap (IntMap)
import qualified Data.IntMap.Strict as IMap
import Data.Map (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
Expand Down Expand Up @@ -133,6 +135,12 @@ data LinkPoints
{-# UNPACK #-} !Point
deriving (Show)

type ChainsMap = IntMap (Chain Block)

accumChains :: Time -> PraosEvent -> ChainsMap -> ChainsMap
accumChains _ (PraosEventNode (LabelNode nid (PraosNodeEventNewTip ch))) = IMap.insert (coerce nid) ch
accumChains _ _ = id

type DiffusionLatencyMap = Map (HeaderHash BlockHeader) (BlockHeader, NodeId, Time, [Time])

accumDiffusionLatency :: Time -> PraosEvent -> DiffusionLatencyMap -> DiffusionLatencyMap
Expand Down Expand Up @@ -202,7 +210,7 @@ praosSimVizModel =
links
}
accumEventVizState _now (PraosEventNode (LabelNode nid (PraosNodeEventNewTip tip))) vs =
vs{vizNodeTip = Map.insert nid tip (vizNodeTip vs)}
vs{vizNodeTip = Map.insert nid (fullTip tip) (vizNodeTip vs)}
accumEventVizState now (PraosEventNode (LabelNode nid (PraosNodeEventGenerate blk))) vs =
vs
{ vizMsgsAtNodeBuffer =
Expand Down
Loading