From f0aa7d0ebc6a751f8978d8c6e835cea80f5034f3 Mon Sep 17 00:00:00 2001
From: Alexander Esgen
Date: Mon, 4 Jul 2022 09:46:01 +0200
Subject: [PATCH] BlockFetch client test: allow invalid peer behavior

Right now, the peer disconnection logic relies on the interplay of the
BlockFetch and ChainSync clients to catch invalid behavior, so this
commit adds an actual ChainSync client for every peer.

Concretely, consider the case where a peer wants to extend an invalid
block. In that case, the ChainSync client will disconnect, either when
the extending header is received, or via the invalid block rejector in a
background thread.

In contrast, when we simply add a block (together with a punishment) to
the ChainDB, this punishment will *not* be enacted: the block is never
validated, as it is not reachable via any (valid) block in the
VolatileDB.
---
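Note (not part of the commit): the essence of the new per-peer check in
prop_blockFetch below, as a simplified, self-contained sketch. The helper
name and the plain-String result types (instead of SomeException and
ChainSyncClientException) are hypothetical simplifications:

    import Data.Either (isLeft)
    import Test.QuickCheck

    data Validity = Valid | Invalid

    peerOutcomeProp :: Either String () -> Either String () -> Validity -> Property
    peerOutcomeProp blockFetchRes chainSyncRes validity = case validity of
      -- A peer that only ever served valid updates: neither mini-protocol
      -- client may have thrown an exception.
      Valid -> conjoin
        [ counterexample "BlockFetch client threw" $ not $ isLeft blockFetchRes
        , counterexample "ChainSync client threw"  $ not $ isLeft chainSyncRes
        ]
      -- A peer that exhibited invalid behavior: at least one of the two
      -- clients has to disconnect, i.e. end in an exception.
      Invalid -> counterexample "Invalid behavior not caught" $
        property $ isLeft blockFetchRes || isLeft chainSyncRes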
 .../MiniProtocol/BlockFetch/Client.hs         | 202 +++++++++++++-----
 1 file changed, 147 insertions(+), 55 deletions(-)

diff --git a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/BlockFetch/Client.hs b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/BlockFetch/Client.hs
index 8df7d13fad8..4016cc44deb 100644
--- a/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/BlockFetch/Client.hs
+++ b/ouroboros-consensus-test/test-consensus/Test/Consensus/MiniProtocol/BlockFetch/Client.hs
@@ -27,7 +27,10 @@ import Control.Monad.Class.MonadTime
 import Control.Monad.IOSim (runSimOrThrow)
 import Control.Tracer (Tracer (..), nullTracer, traceWith)
 import Data.Bifunctor (first)
+import Data.Either (isLeft)
+import Data.Functor.Contravariant ((>$<))
 import Data.Hashable (Hashable)
+import Data.List (intercalate)
 import Data.Map.Strict (Map)
 import qualified Data.Map.Strict as Map
 import Data.Traversable (for)
@@ -50,6 +53,7 @@ import Ouroboros.Network.Channel (createConnectedChannels)
 import qualified Ouroboros.Network.Driver.Simple as Driver
 import Ouroboros.Network.MockChain.Chain (Chain)
 import qualified Ouroboros.Network.MockChain.Chain as Chain
+import qualified Ouroboros.Network.MockChain.ProducerState as CPS
 import qualified Ouroboros.Network.Mux as Mux
 import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
 import Ouroboros.Network.Protocol.BlockFetch.Codec (codecBlockFetchId)
@@ -59,6 +63,15 @@ import Ouroboros.Network.Protocol.BlockFetch.Server
          (BlockFetchServer (..), blockFetchServerPeer)
 import Ouroboros.Network.Protocol.BlockFetch.Type (ChainRange (..),
          Message (MsgBlock))
+import Ouroboros.Network.Protocol.ChainSync.ClientPipelined
+         (chainSyncClientPeerPipelined)
+import Ouroboros.Network.Protocol.ChainSync.Codec (codecChainSyncId)
+import Ouroboros.Network.Protocol.ChainSync.Examples
+         (chainSyncServerExample)
+import Ouroboros.Network.Protocol.ChainSync.PipelineDecision
+         (pipelineDecisionLowHighMark)
+import Ouroboros.Network.Protocol.ChainSync.Server
+         (chainSyncServerPeer)

 import Ouroboros.Consensus.Block
 import Ouroboros.Consensus.Config
@@ -69,6 +82,10 @@ import Ouroboros.Consensus.Fragment.Validated
 import Ouroboros.Consensus.HardFork.History.EraParams
          (EraParams (eraEpochSize))
 import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface
+import Ouroboros.Consensus.MiniProtocol.ChainSync.Client
+         (ChainSyncClientException, bracketChainSyncClient,
+         chainSyncClient)
+import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as ChainSyncClient
 import Ouroboros.Consensus.Node.ProtocolInfo (NumCoreNodes (..))
 import qualified Ouroboros.Consensus.Storage.ChainDB.API as ChainDB
 import Ouroboros.Consensus.Storage.ChainDB.Impl (ChainDbArgs (..))
@@ -80,10 +97,10 @@ import Ouroboros.Consensus.Storage.LedgerDB.DiskPolicy
          defaultDiskPolicy)
 import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
 import Ouroboros.Consensus.Util.Condense (Condense (..))
+import Ouroboros.Consensus.Util.Exception (catchAlsoLinked)
 import Ouroboros.Consensus.Util.IOLike
 import Ouroboros.Consensus.Util.ResourceRegistry
-import Ouroboros.Consensus.Util.STM (blockUntilJust,
-         forkLinkedWatcher)
+import Ouroboros.Consensus.Util.STM (forkLinkedWatcher)

 import Test.Util.ChainUpdates
 import qualified Test.Util.FS.Sim.MockFS as MockFS
@@ -106,10 +123,25 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
     counterexample ("Trace:\n" <> unlines (ppTrace <$> bfcoTrace)) $
     counterexample (condense bfcts) $
     conjoin $
-      [ noException ("BlockFetch client " <> condense peerId) res
-      | (peerId, res) <- Map.toList bfcoBlockFetchResults
+      [ counterexample ("Classified behavior: " <> show classifiedBehavior) $
+        case behaviorValidity classifiedBehavior of
+          Valid -> conjoin
+            [ noException ("BlockFetch client " <> condense peerId) blockFetchRes
+            , noException ("ChainSync client " <> condense peerId) chainSyncRes
+            ]
+          Invalid ->
+            counterexample "Invalid behavior not caught" $
+            tabulateFailureMode $
+            property (isLeft blockFetchRes || isLeft chainSyncRes)
+      | (peerId, PeerOutcome{..}) <- Map.toList bfcoPeerOutcomes
+      , let classifiedBehavior =
+              classifyBehavior (joinSchedule $ peerUpdates Map.! peerId)
+            tabulateFailureMode = tabulate "Expected failure due to" $
+              pure . intercalate "," $
+                   ["BlockFetch" | isLeft blockFetchRes]
+                <> ["ChainSync" | isLeft chainSyncRes]
       ] <>
-      [ Map.keysSet bfcoBlockFetchResults === Map.keysSet peerUpdates
+      [ Map.keysSet bfcoPeerOutcomes === Map.keysSet peerUpdates
       , counterexample ("Fetched blocks per peer: " <> condense bfcoFetchedBlocks) $
         property $ all (> 0) bfcoFetchedBlocks
       ]
@@ -128,9 +160,14 @@
 -------------------------------------------------------------------------------}

 data BlockFetchClientOutcome = BlockFetchClientOutcome {
-    bfcoBlockFetchResults :: Map PeerId (Either SomeException ())
-  , bfcoFetchedBlocks     :: Map PeerId Word
-  , bfcoTrace             :: [(Tick, String)]
+    bfcoPeerOutcomes  :: Map PeerId PeerOutcome
+  , bfcoFetchedBlocks :: Map PeerId Word
+  , bfcoTrace         :: [(Tick, String)]
+  }
+
+data PeerOutcome = PeerOutcome {
+    blockFetchRes :: Either SomeException ()
+  , chainSyncRes  :: Either ChainSyncClientException ()
   }

 runBlockFetchTest ::
@@ -139,23 +176,25 @@ runBlockFetchTest ::
      forall m. (IOLike m, MonadTime m)
   => BlockFetchClientTestSetup -> m BlockFetchClientOutcome
 runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
-    varChains         <- uncheckedNewTVarM Map.empty
-    varControlMessage <- uncheckedNewTVarM Mux.Continue
-    varFetchedBlocks  <- uncheckedNewTVarM (0 <$ peerUpdates)
+    varCandidates     <- uncheckedNewTVarM Map.empty
+    varControlMessage <- uncheckedNewTVarM Mux.Continue
+    varFetchedBlocks  <- uncheckedNewTVarM (0 <$ peerUpdates)
+    varChains         <- uncheckedNewTVarM (Chain.Genesis <$ peerUpdates)
+    varChainSyncRes   <- uncheckedNewTVarM (Right () <$ peerUpdates)
+    producerStateVars <- for peerUpdates $ const $
+      uncheckedNewTVarM $ CPS.initChainProducerState Chain.Genesis
     fetchClientRegistry <- newFetchClientRegistry
     clock <- LogicalClock.new registry $
       LogicalClock.sufficientTimeFor $ lastTick <$> Map.elems peerUpdates
     (tracer, getTrace) <- first (LogicalClock.tickTracer clock) <$> recordingTracerTVar
-    chainDbView <- mkChainDbView registry tracer
-
-    let getCandidates = Map.map chainToAnchoredFragment <$> readTVar varChains
+    chainDB <- mkChainDB registry tracer

-        blockFetchConsensusInterface =
+    let blockFetchConsensusInterface =
           mkTestBlockFetchConsensusInterface
-            (Map.map (AF.mapAnchoredFragment getHeader) <$> getCandidates)
-            chainDbView
+            (readTVar varCandidates >>= traverse readTVar)
+            chainDB

     _ <- forkLinkedThread registry "BlockFetchLogic" $
            blockFetchLogic
@@ -175,7 +214,9 @@
       bfServer = blockFetchServerPeer $ mockBlockFetchServer getCurrentChain
         where
-          getCurrentChain = atomically $ (Map.! peerId) <$> getCandidates
+          getCurrentChain =
+            chainToAnchoredFragment . (Map.! peerId)
+              <$> readTVarIO varChains

       blockFetchTracer = Tracer \case
         (Driver.Client, ev) -> do
@@ -193,8 +234,6 @@
             bfClient
             bfServer

-      -- On every tick, we schedule updates to the shared chain fragment
-      -- (mocking ChainSync).
       forkTicking peerId =
         forkLinkedWatcher registry ("TickWatcher " <> condense peerId) $
           LogicalClock.tickWatcher clock \tick -> atomically do
@@ -205,19 +244,52 @@
             case Chain.applyChainUpdates updates chain of
               Just chain' -> chain'
               Nothing     -> error "Chain update failed"
-          -- Block until our "ChainSync" thread registered itself to the
-          -- FetchClientRegistry, see 'forkChainSync' below.
-          _ <- blockUntilJust $ Map.lookup peerId <$> readTVar varChains
+          updateCPS cps =
+            case CPS.applyChainUpdates (fmap TestHeader <$> updates) cps of
+              Just cps' -> cps'
+              Nothing   -> error "Chain update failed"
           modifyTVar varChains $ Map.adjust updateChain peerId
+          modifyTVar (producerStateVars Map.! peerId) updateCPS

       forkChainSync peerId =
-        forkLinkedThread registry ("BracketSync" <> condense peerId) $
+        forkThread registry ("BracketSync" <> condense peerId) $
+          recordChainSyncExceptions $
           bracketSyncWithFetchClient fetchClientRegistry peerId $ do
-            let modifyChains = atomically . modifyTVar varChains
-            bracket_
-              (modifyChains $ Map.insert peerId Chain.Genesis)
-              (modifyChains $ Map.delete peerId)
-              (forkTicking peerId >>= waitThread)
+            let chainSyncTracer =
+                  (\ev -> show peerId <> ": ChainSyncClient: " <> show ev)
+                    >$< tracer
+                chainDbView = (ChainSyncClient.defaultChainDbView chainDB) {
+                    ChainSyncClient.getCurrentChain =
+                      pure $ AF.Empty AF.AnchorGenesis
+                  }
+                client =
+                  chainSyncClient
+                    (pipelineDecisionLowHighMark 10 20)
+                    chainSyncTracer
+                    topLevelConfig
+                    chainDbView
+                    ntnVersion
+                    (pure Mux.Continue)
+                    nullTracer
+                server =
+                  chainSyncServerExample () (producerStateVars Map.! peerId)
+            bracketChainSyncClient
+              chainSyncTracer
+              chainDbView
+              varCandidates
+              peerId
+              ntnVersion $ \varCandidate ->
+                Driver.runConnectedPeersPipelined
+                  createConnectedChannels
+                  nullTracer
+                  codecChainSyncId
+                  (chainSyncClientPeerPipelined $ client varCandidate)
+                  (chainSyncServerPeer server)
+        where
+          recordChainSyncExceptions = flip catchAlsoLinked $ \ex -> do
+            atomically $ modifyTVar varChainSyncRes $
+              Map.insert peerId (Left ex)
+            throwIO ex

       -- The BlockFetch logic requires initializing the KeepAlive
       -- miniprotocol, even if it does not do anything.
@@ -227,6 +299,7 @@
             infiniteDelay

     blockFetchThreads <- Map.fromList <$> for peerIds \peerId -> do
+      _ <- forkTicking peerId
       _ <- forkChainSync peerId
       _ <- forkKeepAlive peerId
       fmap (peerId,) $
@@ -236,20 +309,29 @@
     LogicalClock.waitUntilDone clock
     atomically $ writeTVar varControlMessage Mux.Terminate
-    bfcoBlockFetchResults <- traverse waitThread blockFetchThreads
-    bfcoFetchedBlocks     <- readTVarIO varFetchedBlocks
-    bfcoTrace             <- getTrace
+    bfcoPeerOutcomes <-
+      flip Map.traverseWithKey blockFetchThreads \peerId threadId -> do
+        blockFetchRes <- waitThread threadId
+        chainSyncRes  <- (Map.! peerId) <$> readTVarIO varChainSyncRes
+        pure PeerOutcome {..}
+    bfcoFetchedBlocks <- readTVarIO varFetchedBlocks
+    bfcoTrace         <- getTrace
     pure BlockFetchClientOutcome {..}
   where
    peerIds = Map.keys peerUpdates

    numCoreNodes = NumCoreNodes $ fromIntegral $ Map.size peerUpdates + 1

-    mkChainDbView ::
+    -- Needs to be larger than any chain length in this test, to ensure that
+    -- switching to any chain is never too deep.
+    securityParam  = SecurityParam 1000
+    topLevelConfig = singleNodeTestConfigWithK securityParam
+
+    mkChainDB ::
         ResourceRegistry m
      -> Tracer m String
-      -> m (BlockFetchClientInterface.ChainDbView m TestBlock)
-    mkChainDbView registry tracer = do
+      -> m (ChainDB.ChainDB m TestBlock)
+    mkChainDB registry tracer = do
      chainDbArgs <- do
        cdbHasFSImmutableDB <- mockedHasFS
        cdbHasFSVolatileDB  <- mockedHasFS
@@ -278,20 +360,9 @@
          (\_ -> ChainDBImpl.openDBInternal chainDbArgs False)
          (ChainDB.closeDB . fst)
      _ <- forkLinkedThread registry "AddBlockRunner" intAddBlockRunner
-
-      let -- Always return the empty chain such that the BlockFetch logic
-          -- downloads all chains.
-          getCurrentChain           = pure $ AF.Empty AF.AnchorGenesis
-          getIsFetched              = ChainDB.getIsFetched chainDB
-          getMaxSlotNo              = ChainDB.getMaxSlotNo chainDB
-          addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
-      pure BlockFetchClientInterface.ChainDbView {..}
+      pure chainDB
      where
-        -- Needs to be larger than any chain length in this test, to ensure that
-        -- switching to any chain is never too deep.
-        securityParam = SecurityParam 1000
-        topLevelConfig = singleNodeTestConfigWithK securityParam
-        epochSize = eraEpochSize $ topLevelConfigLedger topLevelConfig
+        epochSize = eraEpochSize $ topLevelConfigLedger topLevelConfig

        mockedHasFS = SomeHasFS . simHasFS <$> newTVarIO MockFS.empty
@@ -305,9 +376,9 @@
    mkTestBlockFetchConsensusInterface ::
         STM m (Map PeerId (AnchoredFragment (Header TestBlock)))
-      -> BlockFetchClientInterface.ChainDbView m TestBlock
+      -> ChainDB.ChainDB m TestBlock
      -> BlockFetchConsensusInterface PeerId (Header TestBlock) TestBlock m
-    mkTestBlockFetchConsensusInterface getCandidates chainDbView =
+    mkTestBlockFetchConsensusInterface getCandidates chainDB =
      BlockFetchClientInterface.mkBlockFetchConsensusInterface
        (TestBlockConfig numCoreNodes)
        chainDbView
@@ -321,6 +392,15 @@
        slotForgeTime :: BlockFetchClientInterface.SlotForgeTimeOracle m blk
        slotForgeTime _ = pure dawnOfTime

+        chainDbView = BlockFetchClientInterface.ChainDbView {..}
+          where
+            -- Always return the empty chain such that the BlockFetch logic
+            -- downloads all chains.
+            getCurrentChain           = pure $ AF.Empty AF.AnchorGenesis
+            getIsFetched              = ChainDB.getIsFetched chainDB
+            getMaxSlotNo              = ChainDB.getMaxSlotNo chainDB
+            addBlockWaitWrittenToDisk = ChainDB.addBlockWaitWrittenToDisk chainDB
+
 mockBlockFetchServer ::
      forall m blk.
      (Monad m, HasHeader blk)
@@ -375,11 +455,19 @@ instance Condense BlockFetchClientTestSetup where
 instance Arbitrary BlockFetchClientTestSetup where
   arbitrary = do
-    numPeers <- chooseInt (1, 3)
+    behavior <- elements
+      [ SelectedChainBehavior
+      , TentativeChainBehavior
+      , InvalidChainBehavior
+      ]
+    numPeers <- case behaviorValidity behavior of
+      -- Interaction of multiple clients can hide invalid behavior.
+      Invalid -> pure 1
+      Valid   -> chooseInt (1, 3)
     let peerIds = PeerId <$> [1 .. numPeers]
     peerUpdates <- Map.fromList . zip peerIds
-                     <$> replicateM numPeers genUpdateSchedule
+                     <$> replicateM numPeers (genUpdateSchedule behavior)
     blockFetchMode <- elements [FetchModeBulkSync, FetchModeDeadline]
     blockFetchCfg <- do
       let -- ensure that we can download blocks from all peers
@@ -394,9 +482,13 @@
       pure BlockFetchConfiguration {..}
     pure BlockFetchClientTestSetup {..}
    where
-      genUpdateSchedule =
-        genChainUpdates TentativeChainBehavior maxRollback 20
-          >>= genSchedule DefaultSchedulingStrategy
+      genUpdateSchedule behavior =
+        genChainUpdates behavior maxRollback 20 >>= genSchedule strat
+        where
+          strat = case behaviorValidity behavior of
+            -- Multiple updates per tick can hide invalid behavior.
+            Invalid -> SingleItemPerTickStrategy
+            Valid   -> DefaultSchedulingStrategy

      -- Only use a small k to avoid rolling forward by a big chain.
      maxRollback = SecurityParam 5
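Note (not part of the commit): recordChainSyncExceptions in the hunk for
forkChainSync is an instance of a small record-then-rethrow pattern. A
stand-alone sketch, assuming catchAlsoLinked from
Ouroboros.Consensus.Util.Exception has the usual catch-like signature
(per its use above, it also catches exceptions rethrown from linked
threads); the helper name recordInto is hypothetical:

    import Ouroboros.Consensus.Util.Exception (catchAlsoLinked)
    import Ouroboros.Consensus.Util.IOLike

    recordInto ::
         (IOLike m, Exception e)
      => StrictTVar m (Either e ()) -> m () -> m ()
    recordInto var action =
      action `catchAlsoLinked` \ex -> do
        -- Record the exception for the final test outcome ...
        atomically $ writeTVar var (Left ex)
        -- ... but still rethrow it, so the enclosing thread fails as usual.
        throwIO ex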