diff --git a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs index 403ca832c5..01ae4a07b9 100644 --- a/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs +++ b/ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Util/LeakyBucket.hs @@ -219,28 +219,28 @@ runAgainstBucket :: (Handlers m -> m a) -> m (State m, a) runAgainstBucket config action = do - runThreadVar <- atomically newEmptyTMVar -- see note [Leaky bucket design]. + leakingPeriodVersionTMVar <- atomically newEmptyTMVar -- see note [Leaky bucket design]. tid <- myThreadId bucket <- init config - withAsync (leak runThreadVar tid bucket) $ \_ -> do - atomicallyWithMonotonicTime $ maybeStartThread Nothing runThreadVar bucket + withAsync (leak (readTMVar leakingPeriodVersionTMVar) tid bucket) $ \_ -> do + atomicallyWithMonotonicTime $ maybeStartThread Nothing leakingPeriodVersionTMVar bucket result <- action $ Handlers { fill = \r t -> (snd <$>) $ snapshotFill bucket r t, setPaused = setPaused bucket, - updateConfig = updateConfig runThreadVar bucket + updateConfig = updateConfig leakingPeriodVersionTMVar bucket } state <- atomicallyWithMonotonicTime $ snapshot bucket pure (state, result) where - -- Start the thread (that is, write to its 'runThreadVar') if it is useful. - -- Takes a potential old value of the 'runThreadVar' as first argument, + -- Start the thread (that is, write to its 'leakingPeriodVersionTMVar') if it is useful. + -- Takes a potential old value of the 'leakingPeriodVersionTMVar' as first argument, -- which will be increased to help differentiate between restarts. maybeStartThread :: Maybe Int -> StrictTMVar m Int -> Bucket m -> Time -> STM m () - maybeStartThread oldRunThread runThreadVar bucket time = do + maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time = do State {config = Config {rate}} <- snapshot bucket time - when (rate > 0) $ void $ tryPutTMVar runThreadVar $ maybe 0 (+ 1) oldRunThread + when (rate > 0) $ void $ tryPutTMVar leakingPeriodVersionTMVar $ maybe 0 (+ 1) mLeakingPeriodVersion setPaused :: Bucket m -> Bool -> Time -> STM m () setPaused bucket paused time = do @@ -253,7 +253,7 @@ runAgainstBucket config action = do ((Rational, Config m) -> (Rational, Config m)) -> Time -> STM m () - updateConfig runThreadVar bucket f time = do + updateConfig leakingPeriodVersionTMVar bucket f time = do State { level = oldLevel, paused, @@ -272,9 +272,9 @@ runAgainstBucket config action = do configGeneration = oldConfigGeneration + 1, config = newConfig } - -- Ensure that 'runThreadVar' is empty, then maybe start the thread. - oldRunThread <- tryTakeTMVar runThreadVar - maybeStartThread oldRunThread runThreadVar bucket time + -- Ensure that 'leakingPeriodVersionTMVar' is empty, then maybe start the thread. + mLeakingPeriodVersion <- tryTakeTMVar leakingPeriodVersionTMVar + maybeStartThread mLeakingPeriodVersion leakingPeriodVersionTMVar bucket time -- | Initialise a bucket given a configuration. The bucket starts full at the -- time where one calls 'init'. @@ -299,7 +299,7 @@ init config@Config {capacity} = do -- ~~~~~~~~~~~~~~~~~~~~~~~~~~ -- -- The leaky bucket works by running the given action against a thread that --- makes the bucket leak. Since that would be extremely inefficient to actually +-- makes the bucket leak. Since it would be inefficient to actually -- remove tokens one by one from the bucket, the 'leak' thread instead looks at -- the current state of the bucket, computes how much time it would take for the -- bucket to empty, and then wait that amount of time. Once the wait is over, it @@ -317,22 +317,21 @@ init config@Config {capacity} = do -- for the action to lower the waiting time by changing the bucket configuration -- to one where the rate is higher. -- --- We fix both those issues with one mechanism, the @runThreadVar@. It is an --- MVar containing an integer that tells the thread whether it should be --- running. An empty MVar means that the thread should not be running, for --- instance if the rate is null. A full MVar (no matter what the integer is) --- means that the thread should be running. When recursing, the thread blocks --- until the MVar is full, and only then proceeds as described above. +-- We fix both those issues with one mechanism, the @leakingPeriodVersionSTM@. +-- It is a computation returning an integer that identifies a version of the +-- configuration that controls the leaking period. If the computation blocks, +-- it means that no configuration has been determined yet. +-- The leak thread first waits until @leakingPeriodVersionSTM@ yields a +-- value, and only then proceeds as described above. -- Additionally, while waiting for the bucket to empty, the thread monitors --- changes to the MVar, indicating either that the thread should stop running or --- that the configuration changed as that it might have to wait less long. The --- change in configuration is detected by changes in the integer. +-- for changes to the version of the leaking period, indicating either that the +-- thread should pause running if the @leakingPeriodVersionSTM@ starts blocking +-- again or that the configuration changed as that it might have to wait less +-- long. -- --- Note that we call \“start\”/\“stop\” running the action of filling/emptying the --- MVar. This is not to mistaken for the thread actually being spawned/killed. --- | Monadic action that calls 'threadDelay' until the bucket is empty, then --- runs the 'onEmpty' action and terminates. See note [Leaky bucket design]. +-- | Neverending computation that runs 'onEmpty' whenever the bucket becomes +-- empty. See note [Leaky bucket design]. leak :: ( MonadDelay m, MonadCatch m, @@ -340,18 +339,20 @@ leak :: MonadAsync m, MonadTimer m ) => - -- | A variable indicating whether the thread should run (when it is filled) - -- or not (otherwise). The integer it carries only helps in differentiating - -- between starts and restarts. 'leak' does not modify this variable. - StrictTMVar m Int -> + -- | A computation indicating the version of the configuration affecting the + -- leaking period. Whenever the configuration changes, the returned integer + -- must be incremented. While no configuration is available, the computation + -- should block. Blocking is allowed at any time, and it will cause the + -- leaking to pause. + STM m Int -> -- | The 'ThreadId' of the action's thread, which is used to throw exceptions -- at it. ThreadId m -> Bucket m -> m () -leak runThreadVar actionThreadId bucket = forever $ do - -- Block until we are allowed to run. Do not modify the TMVar. - oldRunThread <- atomically $ readTMVar runThreadVar +leak leakingPeriodVersionSTM actionThreadId bucket = forever $ do + -- Block until we are allowed to run. + leakingPeriodVersion <- atomically leakingPeriodVersionSTM -- NOTE: It is tempting to group this @atomically@ and -- @atomicallyWithMonotonicTime@ into one; however, because the former is -- blocking, the latter could get a _very_ inaccurate time, which we @@ -377,7 +378,7 @@ leak runThreadVar actionThreadId bucket = forever $ do atomically $ (check =<< TVar.readTVar varTimeout) `orElse` - (void $ blockUntilChanged id (Just oldRunThread) $ tryReadTMVar runThreadVar) + (void $ blockUntilChanged id leakingPeriodVersion leakingPeriodVersionSTM) -- | Take a snapshot of the bucket, that is compute its state at the current -- time.