Skip to content

Commit

Permalink
WIP resbuf initial
Browse files Browse the repository at this point in the history
  • Loading branch information
turion committed May 10, 2024
1 parent 2256c24 commit 0e7501b
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 86 deletions.
1 change: 1 addition & 0 deletions automaton/src/Data/Stream/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE StrictData #-}

-- | Helper functions and types for Data.Stream. You will typically not need them.
module Data.Stream.Internal where

-- | A strict tuple type
Expand Down
15 changes: 8 additions & 7 deletions rhine/src/FRP/Rhine/Reactimation/ClockErasure.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import Control.Monad (join)

-- automaton
import Data.Automaton.Trans.Reader
import Data.Stream.Result (Result (..))

-- rhine
import FRP.Rhine.ClSF hiding (runReaderS)
Expand Down Expand Up @@ -98,25 +99,25 @@ eraseClockSN initialTime (Precompose clsf sn) =
proc (time, tag, aMaybe) -> do
bMaybe <- mapMaybeS $ eraseClockClSF (inProxy proxy) initialTime clsf -< (time,,) <$> inTag proxy tag <*> aMaybe
eraseClockSN initialTime sn -< (time, tag, bMaybe)
eraseClockSN initialTime (Feedback buf0 sn) =
eraseClockSN initialTime (Feedback ResamplingBuffer {buffer, put, get} sn) =
let
proxy = toClockProxy sn
in
feedback buf0 $ proc ((time, tag, aMaybe), buf) -> do
feedback buffer $ proc ((time, tag, aMaybe), buf) -> do
(cMaybe, buf') <- case inTag proxy tag of
Nothing -> do
returnA -< (Nothing, buf)
Just tagIn -> do
timeInfo <- genTimeInfo (inProxy proxy) initialTime -< (time, tagIn)
(c, buf') <- arrM $ uncurry get -< (buf, timeInfo)
Result buf' c <- arrM $ uncurry get -< (timeInfo, buf)
returnA -< (Just c, buf')
bdMaybe <- eraseClockSN initialTime sn -< (time, tag, (,) <$> aMaybe <*> cMaybe)
case (,) <$> outTag proxy tag <*> bdMaybe of
Nothing -> do
returnA -< (Nothing, buf')
Just (tagOut, (b, d)) -> do
timeInfo <- genTimeInfo (outProxy proxy) initialTime -< (time, tagOut)
buf'' <- arrM $ uncurry $ uncurry put -< ((buf', timeInfo), d)
buf'' <- arrM $ uncurry $ uncurry put -< ((timeInfo, d), buf')
returnA -< (Just b, buf'')
eraseClockSN initialTime (FirstResampling sn buf) =
let
Expand Down Expand Up @@ -149,14 +150,14 @@ eraseClockResBuf ::
Time cl1 ->
ResBuf m cl1 cl2 a b ->
Automaton m (Either (Time cl1, Tag cl1, a) (Time cl2, Tag cl2)) (Maybe b)
eraseClockResBuf proxy1 proxy2 initialTime resBuf0 = feedback resBuf0 $ proc (input, resBuf) -> do
eraseClockResBuf proxy1 proxy2 initialTime ResamplingBuffer {buffer, put, get} = feedback buffer $ proc (input, resBuf) -> do
case input of
Left (time1, tag1, a) -> do
timeInfo1 <- genTimeInfo proxy1 initialTime -< (time1, tag1)
resBuf' <- arrM (uncurry $ uncurry put) -< ((resBuf, timeInfo1), a)
resBuf' <- arrM (uncurry $ uncurry put) -< ((timeInfo1, a), resBuf)
returnA -< (Nothing, resBuf')
Right (time2, tag2) -> do
timeInfo2 <- genTimeInfo proxy2 initialTime -< (time2, tag2)
(b, resBuf') <- arrM (uncurry get) -< (resBuf, timeInfo2)
Result resBuf' b <- arrM (uncurry get) -< (timeInfo2, resBuf)
returnA -< (Just b, resBuf')
{-# INLINE eraseClockResBuf #-}
29 changes: 17 additions & 12 deletions rhine/src/FRP/Rhine/ResamplingBuffer.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
Expand All @@ -15,10 +16,8 @@ module FRP.Rhine.ResamplingBuffer (
)
where

-- base
import Control.Arrow

-- rhine
import Data.Stream.Result
import FRP.Rhine.Clock

-- A quick note on naming conventions, to whoever cares:
Expand All @@ -39,18 +38,23 @@ or specific to certain clocks.
* 'a': The input type
* 'b': The output type
-}
data ResamplingBuffer m cla clb a b = ResamplingBuffer
{ put ::
data ResamplingBuffer m cla clb a b = forall s.
ResamplingBuffer
{ buffer :: s
-- ^ The internal state of the buffer.
, put ::
TimeInfo cla ->
a ->
m (ResamplingBuffer m cla clb a b)
s ->
m s
-- ^ Store one input value of type 'a' at a given time stamp,
-- and return a continuation.
-- and return an updated state.
, get ::
TimeInfo clb ->
m (b, ResamplingBuffer m cla clb a b)
s ->
m (Result s b)
-- ^ Retrieve one output value of type 'b' at a given time stamp,
-- and a continuation.
-- and an updated state.
}

-- | A type synonym to allow for abbreviation.
Expand All @@ -62,8 +66,9 @@ hoistResamplingBuffer ::
(forall c. m1 c -> m2 c) ->
ResamplingBuffer m1 cla clb a b ->
ResamplingBuffer m2 cla clb a b
hoistResamplingBuffer hoist ResamplingBuffer {..} =
hoistResamplingBuffer morph ResamplingBuffer {..} =
ResamplingBuffer
{ put = (((hoistResamplingBuffer hoist <$>) . hoist) .) . put
, get = (second (hoistResamplingBuffer hoist) <$>) . hoist . get
{ put = ((morph .) .) . put
, get = (morph .) . get
, buffer
}
25 changes: 12 additions & 13 deletions rhine/src/FRP/Rhine/ResamplingBuffer/ClSF.hs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{-# LANGUAGE RecordWildCards #-}

{- |
Collect and process all incoming values statefully and with time stamps.
-}
module FRP.Rhine.ResamplingBuffer.ClSF where

-- transformers
import Control.Monad.Trans.Reader (runReaderT)
import Control.Monad.Trans.Reader (ReaderT, runReaderT)

-- automaton
import Data.Automaton
import Data.Stream.Result
import Data.Stream
import Data.Stream.Optimized (toStreamT)
import Data.Stream.Result (mapResultState)

-- rhine
import FRP.Rhine.ClSF.Core
Expand All @@ -30,16 +30,15 @@ clsfBuffer ::
-- The list will contain the /newest/ element in the head.
ClSF m cl2 [(TimeInfo cl1, a)] b ->
ResamplingBuffer m cl1 cl2 a b
clsfBuffer = clsfBuffer' []
clsfBuffer = clsfBuffer' . toStreamT . getAutomaton
where
clsfBuffer' ::
(Monad m) =>
[(TimeInfo cl1, a)] ->
ClSF m cl2 [(TimeInfo cl1, a)] b ->
StreamT (ReaderT [(TimeInfo cl1, a)] (ReaderT (TimeInfo cl2) m)) b ->
ResamplingBuffer m cl1 cl2 a b
clsfBuffer' as automaton = ResamplingBuffer {..}
where
put ti1 a = return $ clsfBuffer' ((ti1, a) : as) automaton
get ti2 = do
Result automaton' b <- runReaderT (stepAutomaton automaton as) ti2
return (b, clsfBuffer automaton')
clsfBuffer' StreamT {state, step} =
ResamplingBuffer
{ buffer = (state, [])
, put = \ti1 a (s, as) -> return (s, (ti1, a) : as)
, get = \ti2 (s, as) -> mapResultState (,[]) <$> runReaderT (runReaderT (step s) as) ti2
}
9 changes: 5 additions & 4 deletions rhine/src/FRP/Rhine/ResamplingBuffer/Collect.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module FRP.Rhine.ResamplingBuffer.Collect where
import Data.Sequence

-- rhine
import Data.Stream.Result (Result (..))
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless

Expand All @@ -21,7 +22,7 @@ collect :: (Monad m) => ResamplingBuffer m cl1 cl2 a [a]
collect = timelessResamplingBuffer AsyncMealy {..} []
where
amPut as a = return $ a : as
amGet as = return (as, [])
amGet as = return $! Result [] as

{- | Reimplementation of 'collect' with sequences,
which gives a performance benefit if the sequence needs to be reversed or searched.
Expand All @@ -30,7 +31,7 @@ collectSequence :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Seq a)
collectSequence = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = return (as, empty)
amGet as = return $! Result empty as

{- | 'pureBuffer' collects all input values lazily in a list
and processes it when output is required.
Expand All @@ -41,7 +42,7 @@ pureBuffer :: (Monad m) => ([a] -> b) -> ResamplingBuffer m cl1 cl2 a b
pureBuffer f = timelessResamplingBuffer AsyncMealy {..} []
where
amPut as a = return (a : as)
amGet as = return (f as, [])
amGet as = return $! Result [] $! f as

-- TODO Test whether strictness works here, or consider using deepSeq

Expand All @@ -58,4 +59,4 @@ foldBuffer ::
foldBuffer f = timelessResamplingBuffer AsyncMealy {..}
where
amPut b a = let !b' = f a b in return b'
amGet b = return (b, b)
amGet b = return $! Result b b
14 changes: 8 additions & 6 deletions rhine/src/FRP/Rhine/ResamplingBuffer/FIFO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import Prelude hiding (length, take)
import Data.Sequence

-- rhine

import Data.Stream.Result (Result (..))
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless

Expand All @@ -25,8 +27,8 @@ fifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewr as of
EmptyR -> return (Nothing, empty)
as' :> a -> return (Just a, as')
EmptyR -> return $! Result empty Nothing
as' :> a -> return $! Result as' (Just a)

{- | A bounded FIFO buffer that forgets the oldest values when the size is above a given threshold.
If the buffer is empty, it will return 'Nothing'.
Expand All @@ -36,14 +38,14 @@ fifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ take threshold $ a <| as
amGet as = case viewr as of
EmptyR -> return (Nothing, empty)
as' :> a -> return (Just a, as')
EmptyR -> return $! Result empty Nothing
as' :> a -> return $! Result as' (Just a)

-- | An unbounded FIFO buffer that also returns its current size.
fifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int)
fifoWatch = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewr as of
EmptyR -> return ((Nothing, 0), empty)
as' :> a -> return ((Just a, length as'), as')
EmptyR -> return $! Result empty (Nothing, 0)
as' :> a -> return $! Result as' (Just a, length as')
3 changes: 2 additions & 1 deletion rhine/src/FRP/Rhine/ResamplingBuffer/KeepLast.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ A buffer keeping the last value, or zero-order hold.
-}
module FRP.Rhine.ResamplingBuffer.KeepLast where

import Data.Stream.Result (Result (..))
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless

Expand All @@ -16,5 +17,5 @@ import FRP.Rhine.ResamplingBuffer.Timeless
keepLast :: (Monad m) => a -> ResamplingBuffer m cl1 cl2 a a
keepLast = timelessResamplingBuffer AsyncMealy {..}
where
amGet a = return (a, a)
amGet a = return $! Result a a
amPut _ = return
13 changes: 7 additions & 6 deletions rhine/src/FRP/Rhine/ResamplingBuffer/LIFO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import Prelude hiding (length, take)
import Data.Sequence

-- rhine
import Data.Stream.Result (Result (..))
import FRP.Rhine.ResamplingBuffer
import FRP.Rhine.ResamplingBuffer.Timeless

Expand All @@ -25,8 +26,8 @@ lifoUnbounded = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewl as of
EmptyL -> return (Nothing, empty)
a :< as' -> return (Just a, as')
EmptyL -> return $! Result empty Nothing
a :< as' -> return $! Result as' (Just a)

{- | A bounded LIFO buffer that forgets the oldest values when the size is above a given threshold.
If the buffer is empty, it will return 'Nothing'.
Expand All @@ -36,14 +37,14 @@ lifoBounded threshold = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ take threshold $ a <| as
amGet as = case viewl as of
EmptyL -> return (Nothing, empty)
a :< as' -> return (Just a, as')
EmptyL -> return $! Result empty Nothing
a :< as' -> return $! Result as' (Just a)

-- | An unbounded LIFO buffer that also returns its current size.
lifoWatch :: (Monad m) => ResamplingBuffer m cl1 cl2 a (Maybe a, Int)
lifoWatch = timelessResamplingBuffer AsyncMealy {..} empty
where
amPut as a = return $ a <| as
amGet as = case viewl as of
EmptyL -> return ((Nothing, 0), empty)
a :< as' -> return ((Just a, length as'), as')
EmptyL -> return $! Result empty (Nothing, 0)
a :< as' -> return $! Result as' (Just a, length as')
21 changes: 8 additions & 13 deletions rhine/src/FRP/Rhine/ResamplingBuffer/Timeless.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ These are used in many other modules implementing 'ResamplingBuffer's.
-}
module FRP.Rhine.ResamplingBuffer.Timeless where

import Data.Stream.Result
import FRP.Rhine.ResamplingBuffer

{- | An asynchronous, effectful Mealy machine description.
Expand All @@ -16,7 +17,7 @@ import FRP.Rhine.ResamplingBuffer
data AsyncMealy m s a b = AsyncMealy
{ amPut :: s -> a -> m s
-- ^ Given the previous state and an input value, return the new state.
, amGet :: s -> m (b, s)
, amGet :: s -> m (Result s b)
-- ^ Given the previous state, return an output value and a new state.
}
{- FOURMOLU_ENABLE -}
Expand All @@ -30,28 +31,22 @@ data AsyncMealy m s a b = AsyncMealy
-}
timelessResamplingBuffer ::
(Monad m) =>
AsyncMealy m s a b -> -- The asynchronous Mealy machine from which the buffer is built

-- | The asynchronous Mealy machine from which the buffer is built
AsyncMealy m s a b ->
-- | The initial state
s ->
ResamplingBuffer m cl1 cl2 a b
timelessResamplingBuffer AsyncMealy {..} = go
timelessResamplingBuffer AsyncMealy {..} buffer = ResamplingBuffer {..}
where
go s =
let
put _ a = go <$> amPut s a
get _ = do
(b, s') <- amGet s
return (b, go s')
in
ResamplingBuffer {..}
put _ a s = amPut s a
get _ = amGet

-- | A resampling buffer that only accepts and emits units.
trivialResamplingBuffer :: (Monad m) => ResamplingBuffer m cl1 cl2 () ()
trivialResamplingBuffer =
timelessResamplingBuffer
AsyncMealy
{ amPut = const (const (return ()))
, amGet = const (return ((), ()))
, amGet = const (return $! Result () ())
}
()
Loading

0 comments on commit 0e7501b

Please sign in to comment.