diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs index cc4eb2dce7..d0131b77e3 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Network/NodeToNode.hs @@ -213,7 +213,7 @@ mkHandlers -- ^ Peer Sharing result computation callback -> Handlers m addrNTN blk mkHandlers - NodeKernelArgs {keepAliveRng, miniProtocolParameters} + NodeKernelArgs {chainSyncFutureCheck, keepAliveRng, miniProtocolParameters} NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers} computePeers = Handlers { @@ -224,6 +224,7 @@ mkHandlers (chainSyncPipeliningHighMark miniProtocolParameters)) (contramap (TraceLabelPeer peer) (Node.chainSyncClientTracer tracers)) getTopLevelConfig + chainSyncFutureCheck (defaultChainDbView getChainDB) , hChainSyncServer = \peer _version -> chainSyncHeadersServer diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs index 8bd7b35580..f1de2b3877 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/Node.hs @@ -74,6 +74,7 @@ import Ouroboros.Consensus.Fragment.InFuture (CheckInFuture, ClockSkew) import qualified Ouroboros.Consensus.Fragment.InFuture as InFuture import Ouroboros.Consensus.Ledger.Extended (ExtLedgerState (..)) +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToClient as NTC import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.DbLock @@ -392,6 +393,7 @@ runWith RunNodeArgs{..} encAddrNtN decAddrNtN LowLevelRunNodeArgs{..} = cfg rnTraceConsensus btime + (InFutureCheck.realHeaderInFutureCheck llrnMaxClockSkew systemTime) chainDB nodeKernel <- initNodeKernel nodeKernelArgs rnNodeKernelHook registry nodeKernel @@ -639,6 +641,7 @@ mkNodeKernelArgs -> TopLevelConfig blk -> Tracers m (ConnectionId addrNTN) (ConnectionId addrNTC) blk -> BlockchainTime m + -> InFutureCheck.HeaderInFutureCheck m blk -> ChainDB m blk -> m (NodeKernelArgs m addrNTN (ConnectionId addrNTC) blk) mkNodeKernelArgs @@ -648,6 +651,7 @@ mkNodeKernelArgs cfg tracers btime + chainSyncFutureCheck chainDB = do return NodeKernelArgs @@ -657,6 +661,7 @@ mkNodeKernelArgs , btime , chainDB , initChainDB = nodeInitChainDB + , chainSyncFutureCheck , blockFetchSize = estimateBlockSize , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , miniProtocolParameters = defaultMiniProtocolParameters diff --git a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs index ff73c92667..7e226acc59 100644 --- a/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs +++ b/ouroboros-consensus-diffusion/src/ouroboros-consensus-diffusion/Ouroboros/Consensus/NodeKernel.hs @@ -55,6 +55,8 @@ import Ouroboros.Consensus.Ledger.SupportsPeerSelection import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface as BlockFetchClientInterface +import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck + (HeaderInFutureCheck) import Ouroboros.Consensus.Node.Run import Ouroboros.Consensus.Node.Tracers import Ouroboros.Consensus.Protocol.Abstract @@ -132,6 +134,7 @@ data NodeKernelArgs m addrNTN addrNTC blk = NodeKernelArgs { , btime :: BlockchainTime m , chainDB :: ChainDB m blk , initChainDB :: StorageConfig blk -> InitChainDB m blk -> m () + , chainSyncFutureCheck :: HeaderInFutureCheck m blk , blockFetchSize :: Header blk -> SizeInBytes , mempoolCapacityOverride :: MempoolCapacityBytesOverride , miniProtocolParameters :: MiniProtocolParameters diff --git a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs index e8543daa1d..8dd019d403 100644 --- a/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs +++ b/ouroboros-consensus-diffusion/src/unstable-diffusion-testlib/Test/ThreadNet/Network.hs @@ -70,6 +70,7 @@ import Ouroboros.Consensus.Ledger.SupportsMempool import Ouroboros.Consensus.Ledger.SupportsProtocol import Ouroboros.Consensus.Mempool import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client as CSClient +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import qualified Ouroboros.Consensus.Network.NodeToNode as NTN import Ouroboros.Consensus.Node.ExitPolicy import Ouroboros.Consensus.Node.InitStorage @@ -974,6 +975,10 @@ runThreadNetwork systemTime ThreadNetworkArgs , btime , chainDB , initChainDB = nodeInitChainDB + , chainSyncFutureCheck = + InFutureCheck.realHeaderInFutureCheck + InFuture.defaultClockSkew + (OracularClock.finiteSystemTime clock) , blockFetchSize = estimateBlockSize , mempoolCapacityOverride = NoMempoolCapacityBytesOverride , keepAliveRng = kaRng diff --git a/ouroboros-consensus/ouroboros-consensus.cabal b/ouroboros-consensus/ouroboros-consensus.cabal index 2ff8546700..253582952e 100644 --- a/ouroboros-consensus/ouroboros-consensus.cabal +++ b/ouroboros-consensus/ouroboros-consensus.cabal @@ -141,6 +141,7 @@ library Ouroboros.Consensus.MiniProtocol.BlockFetch.ClientInterface Ouroboros.Consensus.MiniProtocol.BlockFetch.Server Ouroboros.Consensus.MiniProtocol.ChainSync.Client + Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck Ouroboros.Consensus.MiniProtocol.ChainSync.Server Ouroboros.Consensus.MiniProtocol.LocalStateQuery.Server Ouroboros.Consensus.MiniProtocol.LocalTxMonitor.Server diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs index 90065b0a6b..0d3dffcaa3 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Fragment/InFuture.hs @@ -16,8 +16,9 @@ module Ouroboros.Consensus.Fragment.InFuture ( -- * Clock skew , clockSkewInSeconds , defaultClockSkew - -- ** opaque + -- ** not exporting the constructor , ClockSkew + , unClockSkew -- * Testing , dontCheck , miracle diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs index 3488b2e189..43c088d6e2 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client.hs @@ -36,7 +36,8 @@ module Ouroboros.Consensus.MiniProtocol.ChainSync.Client ( , TraceChainSyncClientEvent (..) ) where -import Control.Monad.Except +import Control.Monad (join) +import Control.Monad.Except (runExcept, throwError) import Control.Tracer import Data.Kind (Type) import Data.Map.Strict (Map) @@ -51,12 +52,16 @@ import NoThunks.Class (unsafeNoThunks) import Ouroboros.Consensus.Block import Ouroboros.Consensus.Config import Ouroboros.Consensus.Forecast +import Ouroboros.Consensus.HardFork.History + (PastHorizonException (PastHorizon)) import Ouroboros.Consensus.HeaderStateHistory (HeaderStateHistory (..), validateHeader) import qualified Ouroboros.Consensus.HeaderStateHistory as HeaderStateHistory import Ouroboros.Consensus.HeaderValidation hiding (validateHeader) +import Ouroboros.Consensus.Ledger.Basics (LedgerState) import Ouroboros.Consensus.Ledger.Extended import Ouroboros.Consensus.Ledger.SupportsProtocol +import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import Ouroboros.Consensus.Node.NetworkProtocolVersion import Ouroboros.Consensus.Protocol.Abstract import Ouroboros.Consensus.Storage.ChainDB (ChainDB, @@ -64,6 +69,7 @@ import Ouroboros.Consensus.Storage.ChainDB (ChainDB, import qualified Ouroboros.Consensus.Storage.ChainDB as ChainDB import Ouroboros.Consensus.Util import Ouroboros.Consensus.Util.Assert (assertWithMsg) +import qualified Ouroboros.Consensus.Util.EarlyExit as EarlyExit import Ouroboros.Consensus.Util.IOLike import Ouroboros.Consensus.Util.STM (Fingerprint, Watcher (..), WithFingerprint (..), withWatcher) @@ -426,6 +432,7 @@ chainSyncClient => MkPipelineDecision -> Tracer m (TraceChainSyncClientEvent blk) -> TopLevelConfig blk + -> InFutureCheck.HeaderInFutureCheck m blk -> ChainDbView m blk -> NodeToNodeVersion -> ControlMessageSTM m @@ -433,6 +440,13 @@ chainSyncClient -> StrictTVar m (AnchoredFragment (Header blk)) -> Consensus ChainSyncClientPipelined blk m chainSyncClient mkPipelineDecision0 tracer cfg + InFutureCheck.HeaderInFutureCheck + { -- these fields in order of use + proxyArrival = Proxy :: Proxy arrival + , recordHeaderArrival + , judgeHeaderArrival + , handleHeaderArrival + } ChainDbView { getCurrentChain , getHeaderStateHistory @@ -706,104 +720,156 @@ chainSyncClient mkPipelineDecision0 tracer cfg (ClientPipelinedStIdle n) rollForward mkPipelineDecision n hdr theirTip = Stateful $ \kis -> traceException $ do - now <- getMonotonicTime - let hdrPoint = headerPoint hdr - - isInvalidBlock <- atomically $ forgetFingerprint <$> getIsInvalidBlock - let disconnectWhenInvalid = \case - GenesisHash -> pure () - BlockHash hash -> + arrival <- recordHeaderArrival hdr + now <- getMonotonicTime + let hdrPoint = headerPoint hdr + slotNo = blockSlot hdr + + do + let scrutinee = + case isPipeliningEnabled version of + NotReceivingTentativeBlocks -> BlockHash (headerHash hdr) + -- Disconnect if the parent block of `hdr` is known to be invalid. + ReceivingTentativeBlocks -> headerPrevHash hdr + case scrutinee of + GenesisHash -> return () + BlockHash hash -> do + -- If the peer is sending headers quickly, the + -- @invalidBlockWatcher@ might miss one. So this call is a + -- lightweight supplement. Note that neither check /must/ be 100% + -- reliable. + isInvalidBlock <- atomically $ forgetFingerprint <$> getIsInvalidBlock whenJust (isInvalidBlock hash) $ \reason -> disconnect $ InvalidBlock hdrPoint hash reason - disconnectWhenInvalid $ - case isPipeliningEnabled version of - -- Disconnect if the parent block of `hdr` is known to be invalid. - ReceivingTentativeBlocks -> headerPrevHash hdr - NotReceivingTentativeBlocks -> BlockHash (headerHash hdr) - - -- Get the ledger view required to validate the header - -- NOTE: This will block if we are too far behind. - intersectCheck <- atomically $ do - -- Before obtaining a 'LedgerView', we must find the most recent - -- intersection with the current chain. Note that this is cheap when - -- the chain and candidate haven't changed. - mKis' <- intersectsWithCurrentChain kis - case mKis' of - Nothing -> return NoLongerIntersects - Just kis'@KnownIntersectionState { mostRecentIntersection } -> do - -- We're calling 'ledgerViewForecastAt' in the same STM transaction - -- as 'intersectsWithCurrentChain'. This guarantees the former's - -- precondition: the intersection is within the last @k@ blocks of - -- the current chain. - forecast <- + + mLedgerView <- EarlyExit.withEarlyExit $ do + Intersects kis2 lst <- checkArrivalTime kis arrival + Intersects kis3 ledgerView <- case projectLedgerView slotNo lst of + Just ledgerView -> pure $ Intersects kis2 ledgerView + Nothing -> readLedgerState kis2 (projectLedgerView slotNo) + pure $ Intersects kis3 ledgerView + + case mLedgerView of + + Nothing -> do + -- The above computation exited early, which means our chain (tip) + -- has changed and it no longer intersects with the candidate + -- fragment, so we have to find a new intersection. But first drain + -- the pipe. + continueWithState () + $ drainThePipe n + $ findIntersection NoMoreIntersection + + Just (Intersects kis' ledgerView) -> do + -- Our chain still intersects with the candidate fragment and we + -- have obtained a 'LedgerView' that we can use to validate @hdr@. + let KnownIntersectionState { + ourFrag + , theirFrag + , theirHeaderStateHistory + , mostRecentIntersection + } = kis' + + -- Validate header + theirHeaderStateHistory' <- + case runExcept $ validateHeader cfg ledgerView hdr theirHeaderStateHistory of + Right theirHeaderStateHistory' -> return theirHeaderStateHistory' + Left vErr -> + disconnect $ + HeaderError hdrPoint vErr (ourTipFromChain ourFrag) theirTip + + let theirFrag' = theirFrag :> hdr + -- Advance the most recent intersection if we have the same + -- header on our fragment too. This is cheaper than recomputing + -- the intersection from scratch. + mostRecentIntersection' + | Just ourSuccessor <- + AF.successorBlock (castPoint mostRecentIntersection) ourFrag + , headerHash ourSuccessor == headerHash hdr + = headerPoint hdr + | otherwise + = mostRecentIntersection + kis'' = assertKnownIntersectionInvariants (configConsensus cfg) $ + KnownIntersectionState { + theirFrag = theirFrag' + , theirHeaderStateHistory = theirHeaderStateHistory' + , ourFrag = ourFrag + , mostRecentIntersection = mostRecentIntersection' + } + atomically $ writeTVar varCandidate theirFrag' + atomically $ traceWith headerMetricsTracer (slotNo, now) + + continueWithState kis'' $ nextStep mkPipelineDecision n theirTip + + -- Used in 'rollForward': determines whether the header is from the future, + -- and handle that fact if so. Also return the ledger state used for the + -- determination. + -- + -- Relies on 'readLedgerState'. + checkArrivalTime :: KnownIntersectionState blk + -> arrival + -> EarlyExit.WithEarlyExit m (Intersects blk (LedgerState blk)) + checkArrivalTime kis arrival = do + Intersects kis' (lst, judgment) <- readLedgerState kis $ \lst -> + case runExcept $ judgeHeaderArrival (configLedger cfg) lst arrival of + Left PastHorizon{} -> Nothing + Right judgment -> Just (lst, judgment) + + -- For example, throw an exception if the header is from the far + -- future. + EarlyExit.lift $ handleHeaderArrival judgment >>= \case + Just exn -> disconnect (InFutureHeaderExceedsClockSkew exn) + Nothing -> return $ Intersects kis' lst + + -- Used in 'rollForward': block until the the ledger state at the + -- intersection with the local selection returns 'Just'. + -- + -- Exits early if the intersection no longer exists. + readLedgerState :: KnownIntersectionState blk + -> (LedgerState blk -> Maybe a) + -> EarlyExit.WithEarlyExit m (Intersects blk a) + readLedgerState kis prj = + join $ EarlyExit.lift $ readLedgerStateHelper kis prj + + readLedgerStateHelper :: KnownIntersectionState blk + -> (LedgerState blk -> Maybe a) + -> m (EarlyExit.WithEarlyExit m (Intersects blk a)) + readLedgerStateHelper kis prj = atomically $ do + -- We must first find the most recent intersection with the current + -- chain. Note that this is cheap when the chain and candidate haven't + -- changed. + intersectsWithCurrentChain kis >>= \case + Nothing -> return EarlyExit.exitEarly + Just kis' -> do + let KnownIntersectionState { mostRecentIntersection } = kis' + lst <- maybe (error $ "intersection not within last k blocks: " <> show mostRecentIntersection) - (ledgerViewForecastAt (configLedger cfg) . ledgerState) + ledgerState <$> getPastLedger mostRecentIntersection - - case runExcept $ forecastFor forecast (blockSlot hdr) of - -- The header is too far ahead of the intersection point with our - -- current chain. We have to wait until our chain and the - -- intersection have advanced far enough. This will wait on - -- changes to the current chain via the call to - -- 'intersectsWithCurrentChain' before it. - Left OutsideForecastRange{} -> - retry - Right ledgerView -> - return $ Intersects kis' ledgerView - - case intersectCheck of - NoLongerIntersects -> - -- Our chain (tip) has changed and it no longer intersects with the - -- candidate fragment, so we have to find a new intersection, but - -- first drain the pipe. - continueWithState () - $ drainThePipe n - $ findIntersection NoMoreIntersection - - Intersects kis' ledgerView -> do - -- Our chain still intersects with the candidate fragment and we - -- have obtained a 'LedgerView' that we can use to validate @hdr@. - - let KnownIntersectionState { - ourFrag - , theirFrag - , theirHeaderStateHistory - , mostRecentIntersection - } = kis' - - -- Validate header - theirHeaderStateHistory' <- - case runExcept $ validateHeader cfg ledgerView hdr theirHeaderStateHistory of - Right theirHeaderStateHistory' -> return theirHeaderStateHistory' - Left vErr -> - disconnect $ - HeaderError hdrPoint vErr (ourTipFromChain ourFrag) theirTip - - let theirFrag' = theirFrag :> hdr - -- Advance the most recent intersection if we have the same header - -- on our fragment too. This is cheaper than recomputing the - -- intersection from scratch. - mostRecentIntersection' - | Just ourSuccessor <- - AF.successorBlock (castPoint mostRecentIntersection) ourFrag - , headerHash ourSuccessor == headerHash hdr - = headerPoint hdr - | otherwise - = mostRecentIntersection - kis'' = assertKnownIntersectionInvariants (configConsensus cfg) $ - KnownIntersectionState { - theirFrag = theirFrag' - , theirHeaderStateHistory = theirHeaderStateHistory' - , ourFrag = ourFrag - , mostRecentIntersection = mostRecentIntersection' - } - atomically $ writeTVar varCandidate theirFrag' - let slotNo = blockSlot hdr - atomically $ traceWith headerMetricsTracer (slotNo, now) - - continueWithState kis'' $ nextStep mkPipelineDecision n theirTip + case prj lst of + Nothing -> retry + Just ledgerView -> return $ return $ Intersects kis' ledgerView + + -- Used in 'rollForward': returns 'Nothing' if the ledger state cannot + -- forecast the ledger view that far into the future. + projectLedgerView :: SlotNo + -> LedgerState blk + -> Maybe (LedgerView (BlockProtocol blk)) + projectLedgerView slot lst = + let forecast = ledgerViewForecastAt (configLedger cfg) lst + -- TODO cache this in the KnownIntersectionState? Or even in the + -- LedgerDB? + in + case runExcept $ forecastFor forecast slot of + -- The header is too far ahead of the intersection point with our + -- current chain. We have to wait until our chain and the + -- intersection have advanced far enough. This will wait on + -- changes to the current chain via the call to + -- 'intersectsWithCurrentChain' before it. + Left OutsideForecastRange{} -> Nothing + Right ledgerView -> Just ledgerView rollBackward :: MkPipelineDecision -> Nat n @@ -1024,16 +1090,10 @@ invalidBlockRejector tracer version getIsInvalidBlock getCandidate = throwIO ex -- | Auxiliary data type used as an intermediary result in 'rollForward'. -data IntersectCheck blk = - -- | The upstream chain no longer intersects with our current chain because - -- our current chain changed in the background. - NoLongerIntersects - -- | The upstream chain still intersects with our chain, return the - -- resulting 'KnownIntersectionState' and the 'LedgerView' corresponding to - -- the header 'rollForward' received. - | Intersects - (KnownIntersectionState blk) - (LedgerView (BlockProtocol blk)) +data Intersects blk a = + Intersects + (KnownIntersectionState blk) + a {------------------------------------------------------------------------------- Explicit state @@ -1159,6 +1219,8 @@ data ChainSyncClientException = -- different from the previous argument. (InvalidBlockReason blk) + | InFutureHeaderExceedsClockSkew !InFutureCheck.HeaderArrivalException + deriving instance Show ChainSyncClientException instance Eq ChainSyncClientException where @@ -1180,6 +1242,10 @@ instance Eq ChainSyncClientException where Just Refl -> (a, b, c) == (a', b', c') InvalidBlock{} == _ = False + InFutureHeaderExceedsClockSkew a == InFutureHeaderExceedsClockSkew a' = + a == a' + InFutureHeaderExceedsClockSkew{} == _ = False + instance Exception ChainSyncClientException {------------------------------------------------------------------------------- diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs new file mode 100644 index 0000000000..88b004a276 --- /dev/null +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/MiniProtocol/ChainSync/Client/InFutureCheck.hs @@ -0,0 +1,145 @@ +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE TypeApplications #-} + +module Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck ( + -- * Interface + HeaderInFutureCheck (..) + -- * Real Implementation + , HeaderArrivalException (..) + , realHeaderInFutureCheck + ) where + +import Control.Exception (Exception) +import Control.Monad (guard, unless) +import Control.Monad.Class.MonadTimer.SI (MonadDelay, threadDelay) +import Control.Monad.Except (Except, liftEither) +import Data.Proxy (Proxy (Proxy)) +import Data.Time.Clock (NominalDiffTime) +import Data.Type.Equality ((:~:) (Refl)) +import Data.Typeable (eqT) +import Ouroboros.Consensus.Block.Abstract (Header) +import Ouroboros.Consensus.Block.RealPoint (RealPoint, + headerRealPoint, realPointSlot) +import Ouroboros.Consensus.BlockchainTime.WallClock.Types + (RelativeTime, SystemTime, diffRelTime, systemTimeCurrent) +import Ouroboros.Consensus.Fragment.InFuture (ClockSkew, unClockSkew) +import Ouroboros.Consensus.HardFork.Abstract (HasHardForkHistory, + hardForkSummary) +import Ouroboros.Consensus.HardFork.History (PastHorizonException) +import Ouroboros.Consensus.HardFork.History.Qry (runQuery, + slotToWallclock) +import Ouroboros.Consensus.Ledger.Basics (LedgerConfig, LedgerState) +import Ouroboros.Consensus.Util.Time (nominalDelay) +import Ouroboros.Network.Block (HasHeader) + +{------------------------------------------------------------------------------- + Interface +-------------------------------------------------------------------------------} + +-- | The interface a ChainSync client needs in order to check the arrival time +-- of headers. +-- +-- Instead of alphabetical, the fields are in the order in which the ChainSync +-- client logic will invoke them for each header. +data HeaderInFutureCheck m blk = forall arrival judgment. HeaderInFutureCheck { + proxyArrival :: Proxy arrival + , + -- | This is ideally called _immediately_ upon the header arriving. + recordHeaderArrival :: Header blk -> m arrival + , + -- | Judge what to do about the header's arrival time. + -- + -- Note that this may be called after a delay, hence @arrival@ contains at + -- least the arrival time. + -- + -- In particular, such a delay might be caused by waiting for the + -- intersection with the local selection to change after this function + -- returns 'Ouroboros.Consensus.HardFork.HistoryPastHorizon'. + judgeHeaderArrival :: + LedgerConfig blk + -> LedgerState blk + -> arrival + -> Except PastHorizonException judgment + , + -- | Enact the judgment. + -- + -- If @Just@ is returned, an exception should be raised. + handleHeaderArrival :: judgment -> m (Maybe HeaderArrivalException) + } + +{------------------------------------------------------------------------------- + Real implmementation +-------------------------------------------------------------------------------} + +data HeaderArrivalException = + -- | The header arrived so early that its issuer either minted it before + -- their clock reached its slot onset or else the difference between their + -- clock and ours is more severe than we're configured to tolerate. + -- + -- INVARIANT: @'tolerableClockSkew' < negate 'ageUponArrival'@ + forall blk. HasHeader blk => FarFutureHeaderException { + ageUponArrival :: !NominalDiffTime + , + arrivedPoint :: !(RealPoint blk) + , + arrivalTime :: !RelativeTime + , + tolerableClockSkew :: !NominalDiffTime + } + +deriving instance Show HeaderArrivalException + +instance Exception HeaderArrivalException + +instance Eq HeaderArrivalException where + (==) + (FarFutureHeaderException l0 (l1 :: RealPoint l) l2 l3) + (FarFutureHeaderException r0 (r1 :: RealPoint r) r2 r3) + = case eqT @l @r of + Nothing -> False + Just Refl -> (l0, l1, l2, l3) == (r0, r1, r2, r3) + +realHeaderInFutureCheck :: + ( HasHeader blk + , HasHeader (Header blk) + , HasHardForkHistory blk + , MonadDelay m + ) + => ClockSkew -> SystemTime m -> HeaderInFutureCheck m blk +realHeaderInFutureCheck skew systemTime = HeaderInFutureCheck { + proxyArrival = Proxy + , recordHeaderArrival = \hdr -> do + (,) (headerRealPoint hdr) <$> systemTimeCurrent systemTime + , judgeHeaderArrival = \lcfg lst (p, arrivalTime_) -> do + let qry = slotToWallclock (realPointSlot p) + hfSummary = hardForkSummary lcfg lst + -- TODO cache this in the KnownIntersectionState? Or even in the + -- LedgerDB? + (onset, _slotLength) <- liftEither $ runQuery qry hfSummary + pure (p, arrivalTime_, onset) + , handleHeaderArrival = \(p, arrivalTime_, onset) -> do + let ageUponArrival_ = arrivalTime_ `diffRelTime` onset + tooEarly = unClockSkew skew < negate ageUponArrival_ + -- TODO leap seconds? + + -- this delay is the simple part of Ouroboros Chronos + unless tooEarly $ do + now <- systemTimeCurrent systemTime + let ageNow = now `diffRelTime` onset + syntheticDelay = negate ageNow + threadDelay $ nominalDelay syntheticDelay -- TODO leap seconds? + -- recall that threadDelay ignores negative arguments + + pure $ do + guard tooEarly -- no exception if within skew + pure FarFutureHeaderException { + ageUponArrival = ageUponArrival_ + , arrivedPoint = p + , arrivalTime = arrivalTime_ + , tolerableClockSkew = unClockSkew skew + } + } diff --git a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs index 041b742e9c..2528049d9e 100644 --- a/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs +++ b/ouroboros-consensus/test/consensus-test/Test/Consensus/MiniProtocol/ChainSync/Client.hs @@ -46,6 +46,7 @@ import qualified Ouroboros.Consensus.HeaderStateHistory as HeaderStateHistory import Ouroboros.Consensus.Ledger.Abstract import Ouroboros.Consensus.Ledger.Extended hiding (ledgerState) import Ouroboros.Consensus.MiniProtocol.ChainSync.Client +import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck import Ouroboros.Consensus.Node.NetworkProtocolVersion (NodeToNodeVersion) import Ouroboros.Consensus.Node.ProtocolInfo import Ouroboros.Consensus.NodeId @@ -318,6 +319,14 @@ runChainSync securityParam (ClientUpdates clientUpdates) pure $ WithFingerprint isInvalidBlock fp } + headerInFutureCheck :: InFutureCheck.HeaderInFutureCheck m blk + headerInFutureCheck = InFutureCheck.HeaderInFutureCheck + { InFutureCheck.handleHeaderArrival = \_judgment -> pure Nothing + , InFutureCheck.judgeHeaderArrival = \_cfg _lst _arrival -> pure () + , InFutureCheck.proxyArrival = Proxy + , InFutureCheck.recordHeaderArrival = \_hdr -> pure () + } + client :: StrictTVar m (AnchoredFragment (Header TestBlock)) -> Consensus ChainSyncClientPipelined TestBlock @@ -326,6 +335,7 @@ runChainSync securityParam (ClientUpdates clientUpdates) (pipelineDecisionLowHighMark 10 20) chainSyncTracer nodeCfg + headerInFutureCheck chainDbView (maxBound :: NodeToNodeVersion) (return Continue)