Skip to content

Commit

Permalink
BlockFetch client test: allow invalid peer behavior
Browse files Browse the repository at this point in the history
Right now, the peer disconnection logic relies on the interplay of the
BlockFetch and the ChainSync client to catch invalid behavior, so this commit
adds an actual ChainSync client for every peer.

Concretely, consider the case when a peer wants to extend an invalid block. In
that case, the ChainSync client will disconnect, either when the extending
header is received, or via the invalid block rejector in a background thread.

In contrast, when we simply add a block (together with a punishment) to the
ChainDB, this punishment will *not* be enacted, as the block is not validated as
it is not reachable via any (valid) block in the VolDB.
  • Loading branch information
amesgen committed Jul 21, 2022
1 parent 0c1b1f5 commit f0aa7d0
Showing 1 changed file with 147 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import Control.Monad.Class.MonadTime
import Control.Monad.IOSim (runSimOrThrow)
import Control.Tracer (Tracer (..), nullTracer, traceWith)
import Data.Bifunctor (first)
import Data.Either (isLeft)
import Data.Functor.Contravariant ((>$<))
import Data.Hashable (Hashable)
import Data.List (intercalate)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Traversable (for)
Expand All @@ -50,6 +53,7 @@ import Ouroboros.Network.Channel (createConnectedChannels)
import qualified Ouroboros.Network.Driver.Simple as Driver
import Ouroboros.Network.MockChain.Chain (Chain)
import qualified Ouroboros.Network.MockChain.Chain as Chain
import qualified Ouroboros.Network.MockChain.ProducerState as CPS
import qualified Ouroboros.Network.Mux as Mux
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.Protocol.BlockFetch.Codec (codecBlockFetchId)
Expand All @@ -59,6 +63,15 @@ import Ouroboros.Network.Protocol.BlockFetch.Server
BlockFetchServer (..), blockFetchServerPeer)
import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..),
Message (MsgBlock))
import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
(chainSyncClientPeerPipelined)
import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
import Ouroboros.Network.Protocol.ChainSync.Examples
(chainSyncServerExample)
import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
(pipelineDecisionLowHighMark)
import Ouroboros.Network.Protocol.ChainSync.Server
(chainSyncServerPeer)

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.Config
Expand All @@ -69,6 +82,10 @@ import Ouroboros.Consensus.Fragment.Validated
import Ouroboros.Consensus.HardFork.History.EraParams
(EraParams (eraEpochSize))
import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
(ChainSyncClientException, bracketChainSyncClient,
chainSyncClient)
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as ChainSyncClient
import Ouroboros.Consensus.Node.ProtocolInfo (NumCoreNodes (..))
import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
import Ouroboros.Consensus.Storage.ChainDB.Impl (ChainDbArgs (..))
Expand All @@ -80,10 +97,10 @@ import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy
defaultDiskPolicy)
import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
import Ouroboros.Consensus.Util.Condense (Condense (..))
import Ouroboros.Consensus.Util.Exception (catchAlsoLinked)
import Ouroboros.Consensus.Util.IOLike
import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (blockUntilJust,
forkLinkedWatcher)
import Ouroboros.Consensus.Util.STM (forkLinkedWatcher)

import Test.Util.ChainUpdates
import qualified Test.Util.FS.Sim.MockFS as MockFS
Expand All @@ -106,10 +123,25 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
counterexample ("Trace:\n" <> unlines (ppTrace <$> bfcoTrace)) $
counterexample (condense bfcts) $
conjoin $
[ noException ("BlockFetch client " <> condense peerId) res
| (peerId, res) <- Map.toList bfcoBlockFetchResults
[ counterexample ("Classified behavior: " <> show classifiedBehavior) $
case behaviorValidity classifiedBehavior of
Valid -> conjoin
[ noException ("BlockFetch client " <> condense peerId) blockFetchRes
, noException ("ChainSync client " <> condense peerId) chainSyncRes
]
Invalid ->
counterexample "Invalid behavior not caught" $
tabulateFailureMode $
property (isLeft blockFetchRes || isLeft chainSyncRes)
| (peerId, PeerOutcome{..}) <- Map.toList bfcoPeerOutcomes
, let classifiedBehavior =
classifyBehavior (joinSchedule $ peerUpdates Map.! peerId)
tabulateFailureMode = tabulate "Expected failure due to" $
pure . intercalate "," $
["BlockFetch" | isLeft blockFetchRes]
<> ["ChainSync" | isLeft chainSyncRes]
] <>
[ Map.keysSet bfcoBlockFetchResults === Map.keysSet peerUpdates
[ Map.keysSet bfcoPeerOutcomes === Map.keysSet peerUpdates
, counterexample ("Fetched blocks per peer: " <> condense bfcoFetchedBlocks) $
property $ all (> 0) bfcoFetchedBlocks
]
Expand All @@ -128,9 +160,14 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
-------------------------------------------------------------------------------}

data BlockFetchClientOutcome = BlockFetchClientOutcome {
bfcoBlockFetchResults :: Map PeerId (Either SomeException ())
, bfcoFetchedBlocks :: Map PeerId Word
, bfcoTrace :: [(Tick, String)]
bfcoPeerOutcomes :: Map PeerId PeerOutcome
, bfcoFetchedBlocks :: Map PeerId Word
, bfcoTrace :: [(Tick, String)]
}

data PeerOutcome = PeerOutcome {
blockFetchRes :: Either SomeException ()
, chainSyncRes :: Either ChainSyncClientException ()
}

runBlockFetchTest ::
Expand All @@ -139,23 +176,25 @@ runBlockFetchTest ::
=> BlockFetchClientTestSetup
-> m BlockFetchClientOutcome
runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
varChains <- uncheckedNewTVarM Map.empty
varControlMessage <- uncheckedNewTVarM Mux.Continue
varFetchedBlocks <- uncheckedNewTVarM (0 <$ peerUpdates)
varCandidates <- uncheckedNewTVarM Map.empty
varControlMessage <- uncheckedNewTVarM Mux.Continue
varFetchedBlocks <- uncheckedNewTVarM (0 <$ peerUpdates)
varChains <- uncheckedNewTVarM (Chain.Genesis <$ peerUpdates)
varChainSyncRes <- uncheckedNewTVarM (Right () <$ peerUpdates)
producerStateVars <- for peerUpdates $ const $
uncheckedNewTVarM $ CPS.initChainProducerState Chain.Genesis

fetchClientRegistry <- newFetchClientRegistry
clock <- LogicalClock.new registry $
LogicalClock.sufficientTimeFor $ lastTick <$> Map.elems peerUpdates
(tracer, getTrace) <-
first (LogicalClock.tickTracer clock) <$> recordingTracerTVar
chainDbView <- mkChainDbView registry tracer

let getCandidates = Map.map chainToAnchoredFragment <$> readTVar varChains
chainDB <- mkChainDB registry tracer

blockFetchConsensusInterface =
let blockFetchConsensusInterface =
mkTestBlockFetchConsensusInterface
(Map.map (AF.mapAnchoredFragment getHeader) <$> getCandidates)
chainDbView
(readTVar varCandidates >>= traverse readTVar)
chainDB

_ <- forkLinkedThread registry "BlockFetchLogic" $
blockFetchLogic
Expand All @@ -175,7 +214,9 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
bfServer =
blockFetchServerPeer $ mockBlockFetchServer getCurrentChain
where
getCurrentChain = atomically $ (Map.! peerId) <$> getCandidates
getCurrentChain =
chainToAnchoredFragment . (Map.! peerId)
<$> readTVarIO varChains

blockFetchTracer = Tracer \case
(Driver.Client, ev) -> do
Expand All @@ -193,8 +234,6 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
bfClient
bfServer

-- On every tick, we schedule updates to the shared chain fragment
-- (mocking ChainSync).
forkTicking peerId =
forkLinkedWatcher registry ("TickWatcher " <> condense peerId) $
LogicalClock.tickWatcher clock \tick -> atomically do
Expand All @@ -205,19 +244,52 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
case Chain.applyChainUpdates updates chain of
Just chain' -> chain'
Nothing -> error "Chain update failed"
-- Block until our "ChainSync" thread registered itself to the
-- FetchClientRegistry, see 'forkChainSync' below.
_ <- blockUntilJust $ Map.lookup peerId <$> readTVar varChains
updateCPS cps =
case CPS.applyChainUpdates (fmap TestHeader <$> updates) cps of
Just cps' -> cps'
Nothing -> error "Chain update failed"
modifyTVar varChains $ Map.adjust updateChain peerId
modifyTVar (producerStateVars Map.! peerId) updateCPS

forkChainSync peerId =
forkLinkedThread registry ("BracketSync" <> condense peerId) $
forkThread registry ("BracketSync" <> condense peerId) $
recordChainSyncExceptions $
bracketSyncWithFetchClient fetchClientRegistry peerId $ do
let modifyChains = atomically . modifyTVar varChains
bracket_
(modifyChains $ Map.insert peerId Chain.Genesis)
(modifyChains $ Map.delete peerId)
(forkTicking peerId >>= waitThread)
let chainSyncTracer =
(\ev -> show peerId <> ": ChainSyncClient: " <> show ev)
>$< tracer
chainDbView = (ChainSyncClient.defaultChainDbView chainDB) {
ChainSyncClient.getCurrentChain =
pure $ AF.Empty AF.AnchorGenesis
}
client =
chainSyncClient
(pipelineDecisionLowHighMark 10 20)
chainSyncTracer
topLevelConfig
chainDbView
ntnVersion
(pure Mux.Continue)
nullTracer
server =
chainSyncServerExample () (producerStateVars Map.! peerId)
bracketChainSyncClient
chainSyncTracer
chainDbView
varCandidates
peerId
ntnVersion $ \varCandidate ->
Driver.runConnectedPeersPipelined
createConnectedChannels
nullTracer
codecChainSyncId
(chainSyncClientPeerPipelined $ client varCandidate)
(chainSyncServerPeer server)
where
recordChainSyncExceptions = flip catchAlsoLinked $ \ex -> do
atomically $ modifyTVar varChainSyncRes $
Map.insert peerId (Left ex)
throwIO ex

-- The BlockFetch logic requires initializing the KeepAlive
-- miniprotocol, even if it does not do anything.
Expand All @@ -227,6 +299,7 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
infiniteDelay

blockFetchThreads <- Map.fromList <$> for peerIds \peerId -> do
_ <- forkTicking peerId
_ <- forkChainSync peerId
_ <- forkKeepAlive peerId
fmap (peerId,) $
Expand All @@ -236,20 +309,29 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
LogicalClock.waitUntilDone clock
atomically $ writeTVar varControlMessage Mux.Terminate

bfcoBlockFetchResults <- traverse waitThread blockFetchThreads
bfcoFetchedBlocks <- readTVarIO varFetchedBlocks
bfcoTrace <- getTrace
bfcoPeerOutcomes <-
flip Map.traverseWithKey blockFetchThreads \peerId threadId -> do
blockFetchRes <- waitThread threadId
chainSyncRes <- (Map.! peerId) <$> readTVarIO varChainSyncRes
pure PeerOutcome {..}
bfcoFetchedBlocks <- readTVarIO varFetchedBlocks
bfcoTrace <- getTrace
pure BlockFetchClientOutcome {..}
where
peerIds = Map.keys peerUpdates

numCoreNodes = NumCoreNodes $ fromIntegral $ Map.size peerUpdates + 1

mkChainDbView ::
-- Needs to be larger than any chain length in this test, to ensure that
-- switching to any chain is never too deep.
securityParam = SecurityParam 1000
topLevelConfig = singleNodeTestConfigWithK securityParam

mkChainDB ::
ResourceRegistry m
-> Tracer m String
-> m (BlockFetchClientInterface.ChainDbView m TestBlock)
mkChainDbView registry tracer = do
-> m (ChainDB.ChainDB m TestBlock)
mkChainDB registry tracer = do
chainDbArgs <- do
cdbHasFSImmutableDB <- mockedHasFS
cdbHasFSVolatileDB <- mockedHasFS
Expand Down Expand Up @@ -278,20 +360,9 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
(\_ -> ChainDBImpl.openDBInternal chainDbArgs False)
(ChainDB.closeDB . fst)
_ <- forkLinkedThread registry "AddBlockRunner" intAddBlockRunner

let -- Always return the empty chain such that the BlockFetch logic
-- downloads all chains.
getCurrentChain = pure $ AF.Empty AF.AnchorGenesis
getIsFetched = ChainDB.getIsFetched chainDB
getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
pure BlockFetchClientInterface.ChainDbView {..}
pure chainDB
where
-- Needs to be larger than any chain length in this test, to ensure that
-- switching to any chain is never too deep.
securityParam = SecurityParam 1000
topLevelConfig = singleNodeTestConfigWithK securityParam
epochSize = eraEpochSize $ topLevelConfigLedger topLevelConfig
epochSize = eraEpochSize $ topLevelConfigLedger topLevelConfig

mockedHasFS = SomeHasFS . simHasFS <$> newTVarIO MockFS.empty

Expand All @@ -305,9 +376,9 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do

mkTestBlockFetchConsensusInterface ::
STM m (Map PeerId (AnchoredFragment (Header TestBlock)))
-> BlockFetchClientInterface.ChainDbView m TestBlock
-> ChainDB.ChainDB m TestBlock
-> BlockFetchConsensusInterface PeerId (Header TestBlock) TestBlock m
mkTestBlockFetchConsensusInterface getCandidates chainDbView =
mkTestBlockFetchConsensusInterface getCandidates chainDB =
BlockFetchClientInterface.mkBlockFetchConsensusInterface
(TestBlockConfig numCoreNodes)
chainDbView
Expand All @@ -321,6 +392,15 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
slotForgeTime :: BlockFetchClientInterface.SlotForgeTimeOracle m blk
slotForgeTime _ = pure dawnOfTime

chainDbView = BlockFetchClientInterface.ChainDbView {..}
where
-- Always return the empty chain such that the BlockFetch logic
-- downloads all chains.
getCurrentChain = pure $ AF.Empty AF.AnchorGenesis
getIsFetched = ChainDB.getIsFetched chainDB
getMaxSlotNo = ChainDB.getMaxSlotNo chainDB
addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB

mockBlockFetchServer ::
forall m blk.
(Monad m, HasHeader blk)
Expand Down Expand Up @@ -375,11 +455,19 @@ instance Condense BlockFetchClientTestSetup where

instance Arbitrary BlockFetchClientTestSetup where
arbitrary = do
numPeers <- chooseInt (1, 3)
behavior <- elements
[ SelectedChainBehavior
, TentativeChainBehavior
, InvalidChainBehavior
]
numPeers <- case behaviorValidity behavior of
-- Interaction of multiple clients can hide invalid behavior.
Invalid -> pure 1
Valid -> chooseInt (1, 3)
let peerIds = PeerId <$> [1 .. numPeers]
peerUpdates <-
Map.fromList . zip peerIds
<$> replicateM numPeers genUpdateSchedule
<$> replicateM numPeers (genUpdateSchedule behavior)
blockFetchMode <- elements [FetchModeBulkSync, FetchModeDeadline]
blockFetchCfg <- do
let -- ensure that we can download blocks from all peers
Expand All @@ -394,9 +482,13 @@ instance Arbitrary BlockFetchClientTestSetup where
pure BlockFetchConfiguration {..}
pure BlockFetchClientTestSetup {..}
where
genUpdateSchedule =
genChainUpdates TentativeChainBehavior maxRollback 20
>>= genSchedule DefaultSchedulingStrategy
genUpdateSchedule behavior =
genChainUpdates behavior maxRollback 20 >>= genSchedule strat
where
strat = case behaviorValidity behavior of
-- Multiple updates per tick can hide invalid behavior.
Invalid -> SingleItemPerTickStrategy
Valid -> DefaultSchedulingStrategy

-- Only use a small k to avoid rolling forward by a big chain.
maxRollback = SecurityParam 5
Expand Down

0 comments on commit f0aa7d0

Please sign in to comment.