Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal monad schedule class #376

Draft
wants to merge 22 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ cabal.sandbox.config
.stack-work/
cabal.project.local
result
.direnv/
3 changes: 3 additions & 0 deletions automaton/automaton.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ common opts
simple-affine-space ^>=0.2,
these >=1.1 && <=1.3,
transformers >=0.5,
sop-core ^>=0.5,
free >= 5.1,

if flag(dev)
ghc-options: -Werror
Expand Down Expand Up @@ -65,6 +67,7 @@ library
exposed-modules:
Data.Automaton
Data.Automaton.Recursive
Data.Automaton.Schedule
Data.Automaton.Trans.Accum
Data.Automaton.Trans.Except
Data.Automaton.Trans.Maybe
Expand Down
27 changes: 26 additions & 1 deletion automaton/src/Data/Automaton.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import Data.Semialign (Align (..), Semialign (..))

-- automaton
import Data.Stream (StreamT (..), fixStream)
import Data.Stream qualified as StreamT
import Data.Stream.Internal (JointState (..))
import Data.Stream.Optimized (
OptimizedStreamT (..),
Expand Down Expand Up @@ -257,6 +258,12 @@ instance (Monad m) => ArrowChoice (Automaton m) where
right (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT $! either (pure . Left) (fmap Right . runReaderT ma)
{-# INLINE right #-}

f ||| g = f +++ g >>> arr untag
where
untag (Left x) = x
untag (Right y) = y
{-# INLINE (|||) #-}

-- | Caution, this can make your program hang. Try to use 'feedback' or 'unfold' where possible, or combine 'loop' with 'delay'.
instance (MonadFix m) => ArrowLoop (Automaton m) where
loop (Automaton (Stateless ma)) = Automaton $! Stateless $! ReaderT (\b -> fst <$> mfix ((. snd) $ ($ b) $ curry $ runReaderT ma))
Expand Down Expand Up @@ -374,11 +381,16 @@ embed (Automaton (Stateless m)) = mapM $ runReaderT m

-- * Modifying automata

-- | Change the output type and effect of an automaton without changing its state type.
-- | Change the input and output type and effect of an automaton without changing its state type.
withAutomaton :: (Functor m1, Functor m2) => (forall s. (a1 -> m1 (Result s b1)) -> (a2 -> m2 (Result s b2))) -> Automaton m1 a1 b1 -> Automaton m2 a2 b2
withAutomaton f = Automaton . StreamOptimized.mapOptimizedStreamT (ReaderT . f . runReaderT) . getAutomaton
{-# INLINE withAutomaton #-}

-- | Change the output type and effect of an automaton without changing its state type.
withAutomaton_ :: (Functor m1, Functor m2) => (forall s. m1 (Result s b1) -> m2 (Result s b2)) -> Automaton m1 a b1 -> Automaton m2 a b2
withAutomaton_ f = Automaton . StreamOptimized.mapOptimizedStreamT (mapReaderT f) . getAutomaton
{-# INLINE withAutomaton_ #-}

instance (Monad m) => Profunctor (Automaton m) where
dimap f g Automaton {getAutomaton} = Automaton $ g <$> hoist (withReaderT f) getAutomaton
lmap f Automaton {getAutomaton} = Automaton $ hoist (withReaderT f) getAutomaton
Expand Down Expand Up @@ -519,11 +531,24 @@ sumS = sumFrom zeroVector
-- | Sum up all inputs so far, initialised at 0.
sumN :: (Monad m, Num a) => Automaton m a a
sumN = arr Sum >>> mappendS >>> arr getSum
{-# INLINE sumN #-}

-- | Count the natural numbers, beginning at 1.
count :: (Num n, Monad m) => Automaton m a n
count = feedback 0 $! arr (\(_, n) -> let n' = n + 1 in (n', n'))
{-# INLINE count #-}

-- | Remembers the last 'Just' value, defaulting to the given initialisation value.
lastS :: (Monad m) => a -> Automaton m (Maybe a) a
lastS a = arr Last >>> mappendS >>> arr (getLast >>> fromMaybe a)
{-# INLINE lastS #-}

-- | Call the monadic action once on the first tick and provide its result indefinitely.
initialised :: (Monad m) => (a -> m b) -> Automaton m a b
initialised = Automaton . Stateful . StreamT.initialised . ReaderT
{-# INLINE initialised #-}

-- | Like 'initialised_', but ignores the input.
initialised_ :: (Monad m) => m b -> Automaton m a b
initialised_ = initialised . const
{-# INLINE initialised_ #-}
186 changes: 186 additions & 0 deletions automaton/src/Data/Automaton/Schedule.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

-- FIXME haddocks
module Data.Automaton.Schedule where

-- base
import Control.Arrow
import Control.Concurrent (forkIO, newEmptyMVar, putMVar, takeMVar, tryTakeMVar, readMVar)
import Control.Monad (forM_, void)
import Data.List.NonEmpty as N
import Control.Monad.Identity (Identity (..))
import Data.Function ((&))
import Data.Functor ((<&>))
import Data.Functor.Compose (Compose (..))
import Data.Kind (Type)
import Control.Monad.IO.Class (MonadIO)
import Data.Maybe (maybeToList, fromMaybe)


import Data.Foldable1 (Foldable1(foldrMap1))

-- transformers
import Control.Monad.Trans.Accum (AccumT (..), runAccumT)
import Control.Monad.Trans.Except (ExceptT (..))
import Control.Monad.Trans.Reader (ReaderT (..))
import Control.Monad.Trans.Writer.CPS qualified as CPS
import Control.Monad.Trans.Writer.Strict qualified as Strict
import Control.Monad.Trans.Class (MonadTrans (..))

-- sop-core
import Data.SOP (I (..), NP (..), SListI, hzipWith, HSequence (htraverse'), hmap, K (..), HCollapse (hcollapse))

-- free
import Control.Monad.Trans.Free (FreeT (..), FreeF (..), liftF, iterT)

-- automaton
import Data.Automaton (Automaton (..), arrM, constM, initialised_, reactimate, withAutomaton_, handleAutomaton, liftS, feedback)
import Data.Automaton.Trans.Except (exceptS)
import Data.Automaton.Trans.Reader (readerS, runReaderS)
import Data.Stream ( StreamT(..), concatS )
import Data.Stream.Result
import Data.Stream.Optimized (toStreamT, OptimizedStreamT (Stateful))
import qualified Data.Automaton as Automaton

class MonadSchedule m where
-- | Run a nonempty list of automata concurrently.
schedule :: NonEmpty (Automaton m a b) -> Automaton m a b

instance MonadSchedule IO where
schedule automata = proc a -> do
(output, input) <- initialised_ startStreams -< ()
arrM $ void . tryTakeMVar -< input
arrM $ uncurry putMVar -< (input, a)
arrM takeMVar -< output
where
startStreams = do
output <- newEmptyMVar
input <- newEmptyMVar
forM_ automata $ \automaton -> forkIO $ reactimate $ lastMVarValue input >>> automaton >>> arrM (putMVar output)
return (output, input)
lastMVarValue var = feedback Nothing $ proc ((), aMaybe) -> do
case aMaybe of
Nothing -> do
a <- constM $ readMVar var -< ()
returnA -< (a, Just a)
Just a -> do
aNewMaybe <- constM $ tryTakeMVar var -< ()
let aNew = fromMaybe a aNewMaybe
returnA -< (aNew, aNewMaybe)

instance (Monad m, MonadSchedule m) => MonadSchedule (ReaderT r m) where
schedule =
fmap runReaderS
>>> schedule
>>> readerS

instance (Monad m, MonadSchedule m) => MonadSchedule (ExceptT e m) where
schedule =
fmap exceptS
>>> schedule
>>> withAutomaton_ (fmap sequenceA >>> ExceptT)

instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (CPS.WriterT w m) where
schedule =
fmap (withAutomaton_ (CPS.runWriterT >>> fmap (\(Result s a, w) -> Result s (a, w))))
>>> schedule
>>> withAutomaton_ (fmap (\(Result s (a, w)) -> (Result s a, w)) >>> CPS.writerT)

instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (Strict.WriterT w m) where
schedule =
fmap (withAutomaton_ (Strict.runWriterT >>> fmap (\(Result s a, w) -> Result s (a, w))))
>>> schedule
>>> withAutomaton_ (fmap (\(Result s (a, w)) -> (Result s a, w)) >>> Strict.WriterT)

-- FIXME this needs a unit test
instance (Monoid w, Monad m, MonadSchedule m) => MonadSchedule (AccumT w m) where
schedule =
fmap (withAutomaton_ (runAccumT >>> ReaderT >>> CPS.writerT))
>>> schedule
>>> withAutomaton_ (CPS.runWriterT >>> runReaderT >>> AccumT)


-- FIXME MaybeT, other WriterT
instance MonadSchedule Identity where
schedule = fmap (getAutomaton >>> toStreamT)
>>> foldrMap1 buildStreams consStreams
>>> roundRobinStreams >>> fmap N.toList >>> concatS >>> Stateful >>> Automaton
where
buildStreams :: StreamT m b -> Streams m b
buildStreams StreamT {state, step} = Streams
{ states = I state :* Nil
, steps = Step (ResultStateT step) :* Nil
}

consStreams :: StreamT m b -> Streams m b -> Streams m b
consStreams StreamT {state, step} Streams {states, steps} =Streams
{ states = I state :* states
, steps = Step (ResultStateT step) :* steps
}

-- FIXME take care to reverse & test

roundRobinStreams :: (Functor m, Applicative m) => Streams m b -> StreamT m (NonEmpty b)
roundRobinStreams Streams {states, steps} =
StreamT
{ state = states
, step = \s ->
s
& hzipWith (\Step {getStep} (I s) -> getResultStateT getStep s <&> RunningResult & Compose) steps
& htraverse' getCompose
<&> (\results -> Result
(results & hmap (getRunningResult >>> resultState >>> I))
(results & hmap (getRunningResult >>> output >>> K) & hnonemptycollapse))
}

hnonemptycollapse :: SListI as => NP (K b) (a ': as) -> NonEmpty b
hnonemptycollapse (K a :* as) = a :| hcollapse as

-- | A nonempty list of 'StreamT's, unzipped into their states and their steps.
data Streams m b = forall state (states :: [Type]).
(SListI states) =>
Streams
{ states :: NP I (state ': states)
, steps :: NP (Step m b) (state ': states)
}

-- | One step of a stream, with the state type argument going last, so it is usable with sop-core.
newtype Step m b state = Step {getStep :: ResultStateT state m b}

-- | The result of a stream, with the type arguments swapped, so it's usable with sop-core
newtype RunningResult b state = RunningResult {getRunningResult :: Result state b}

-- * Symbolic yielding/suspension operation

newtype YieldT m a = YieldT {getYieldT :: FreeT Identity m a}
deriving newtype (Functor, Applicative, Monad, MonadTrans, MonadIO)

type Yield = YieldT Identity

yieldAutomaton :: (Functor m, Monad m) => Automaton (YieldT m) a b -> Automaton m a (Maybe b)
yieldAutomaton = handleAutomaton $ \StreamT {state, step} -> StreamT
{state = step state
, step = \s -> ReaderT $ \a -> do
oneTick <- runFreeT $ getYieldT $ runReaderT s a
return $ case oneTick of
Pure (Result s' b) -> Result (step s') (Just b)
Free (Identity cont) -> Result (lift $ YieldT cont) Nothing
}-- FIXME Could do without do. Or maybe just use applicative do?

instance (Monad m, MonadSchedule m) => MonadSchedule (YieldT m) where
schedule = fmap yieldAutomaton >>> schedule >>> fmap maybeToList >>> Automaton.concatS >>> liftS

yield :: Monad m => YieldT m ()
yield = YieldT $ liftF $ pure ()

runYieldT :: Monad m => YieldT m a -> m a
runYieldT = iterT runIdentity . getYieldT

runYieldTWith :: Monad m => m () -> YieldT m a -> m a
runYieldTWith action = iterT (\ima -> action >> runIdentity ima) . getYieldT

runYield :: Yield a -> a
runYield = runIdentity . runYieldT
13 changes: 13 additions & 0 deletions automaton/src/Data/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ constM :: (Functor m) => m a -> StreamT m a
constM ma = StreamT () $ const $ Result () <$> ma
{-# INLINE constM #-}

-- | Call the monadic action once on the first tick and provide its result indefinitely.
initialised :: (Monad m) => m a -> StreamT m a
initialised action =
let step mr@(Just r) = pure $! Result mr r
step Nothing = (step . Just =<< action)
in StreamT
{ state = Nothing
, step
}
{-# INLINE initialised #-}

instance (Functor m) => Functor (StreamT m) where
fmap f StreamT {state, step} = StreamT state $! fmap (fmap f) <$> step
{-# INLINE fmap #-}
Expand Down Expand Up @@ -232,6 +243,8 @@ withStreamT f StreamT {state, step} = StreamT state $ fmap f step
This function lets a stream control the speed at which it produces data,
since it can decide to produce any amount of output at every step.
-}
-- FIXME this reverses? doc?
-- FIXME generalise to traversable?
concatS :: (Monad m) => StreamT m [a] -> StreamT m a
concatS StreamT {state, step} =
StreamT
Expand Down
4 changes: 3 additions & 1 deletion automaton/src/Data/Stream/Result.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE StrictData #-}

module Data.Stream.Result where
Expand All @@ -15,7 +17,7 @@ This type is used in streams and automata to encode the result of a state transi
The new state should always be strict to avoid space leaks.
-}
data Result s a = Result {resultState :: s, output :: ~a}
deriving (Functor)
deriving (Functor, Foldable, Traversable)

instance Bifunctor Result where
second = fmap
Expand Down
21 changes: 0 additions & 21 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@

inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
monad-schedule = {
url = "github:turion/monad-schedule";
inputs.nixpkgs.follows = "nixpkgs";
};
};

outputs = inputs:
Expand Down Expand Up @@ -64,9 +60,6 @@
}
{ };
})
(hfinal: hprev: lib.optionalAttrs prev.stdenv.isDarwin {
monad-schedule = dontCheck hprev.monad-schedule;
})
(hfinal: hprev: lib.optionalAttrs (lib.versionOlder hprev.ghc.version "9.4") {
time-domain = doJailbreak hprev.time-domain;
})
Expand Down Expand Up @@ -144,7 +137,6 @@

overlay = lib.composeManyExtensions
[
inputs.monad-schedule.overlays.default
localOverlay
];

Expand Down
Loading
Loading