Skip to content

Commit

Permalink
Rename getEvents' -> getEvents and putEvent' -> putEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
ch1bo committed Mar 4, 2024
1 parent 21dff6a commit 9b9da57
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 25 deletions.
23 changes: 15 additions & 8 deletions hydra-node/src/Hydra/Events.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
-- | This module defines the types and functions for creating Hydra
-- 'EventSource' and 'EventSink' instances and is intended to be used as an
-- extension point. A single 'EventSource' and zero or more 'EventSink' handles
-- are used by the main 'HydraNode' handle to load and send out events.
-- | This module defines the types and functions for creating 'EventSource' and
-- 'EventSink' instances and is intended to be used as an extension point.
--
-- A single 'EventSource' and zero or more 'EventSink' handles are used by the
-- main 'HydraNode' handle to load and send out events.
--
-- TODO: add an example event source sink (on top of the persistence one)
module Hydra.Events where
Expand All @@ -19,12 +20,18 @@ class HasEventId a where
instance HasEventId (EventId, a) where
getEventId = fst

newtype EventSource e m = EventSource {getEvents' :: HasEventId e => m [e]}
newtype EventSource e m = EventSource
{ getEvents :: HasEventId e => m [e]
-- ^ Retrieve all events from the event source.
}

newtype EventSink e m = EventSink {putEvent' :: HasEventId e => e -> m ()}
newtype EventSink e m = EventSink
{ putEvent :: HasEventId e => e -> m ()
-- ^ Send a single event to the event sink.
}

putEventToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> e -> m ()
putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)
putEventToSinks sinks e = forM_ sinks $ \sink -> putEvent sink e

putEventsToSinks :: (Monad m, HasEventId e) => [EventSink e m] -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)
putEventsToSinks sinks = mapM_ (putEventToSinks sinks)
2 changes: 1 addition & 1 deletion hydra-node/src/Hydra/HeadLogic/Outcome.hs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ instance HasEventId (StateChanged tx) where
TickObserved{stateChangeID} -> stateChangeID

-- FIXME(Elaine): these stateChangeID fields were added in an attempt to make every StateChanged keep track of its ID
-- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent' can be kept simple, and shouldn't do harm
-- it's not clear how to handle the state for this. but for now the field is kept so that the type of putEvent can be kept simple, and shouldn't do harm

instance (IsTx tx, Arbitrary (HeadState tx), Arbitrary (ChainStateType tx)) => Arbitrary (StateChanged tx) where
arbitrary = genericArbitrary
Expand Down
18 changes: 13 additions & 5 deletions hydra-node/src/Hydra/Node.hs
Original file line number Diff line number Diff line change
Expand Up @@ -253,20 +253,28 @@ processEffects ::
Word64 ->
Outcome tx ->
m ()
processEffects HydraNode{hn, oc = Chain{postTx}, server, eq, env = Environment{party}} tracer eventId outcome = do
processEffects node tracer eventId outcome = do
mapM_ processEffect $ zip (collectEffects outcome) [0 ..]
where
processEffect (effect, effectId) = do
traceWith tracer $ BeginEffect party eventId effectId effect
case effect of
ClientEffect i -> sendOutput server i
NetworkEffect msg -> broadcast hn msg >> putEvent eq (NetworkEvent defaultTTL party msg)
NetworkEffect msg -> broadcast hn msg >> putEvent (NetworkEvent defaultTTL party msg)
OnChainEffect{postChainTx} ->
postTx postChainTx
`catch` \(postTxError :: PostTxError tx) ->
putEvent eq $ PostTxError{postChainTx, postTxError}
putEvent $ PostTxError{postChainTx, postTxError}
traceWith tracer $ EndEffect party eventId effectId

HydraNode
{ hn
, oc = Chain{postTx}
, server
, eq = EventQueue{putEvent}
, env = Environment{party}
} = node

-- ** Manage state

-- | Handle to access and modify the state in the Hydra Node.
Expand Down Expand Up @@ -294,7 +302,7 @@ loadState ::
ChainStateType tx ->
m (HeadState tx, ChainStateHistory tx)
loadState tracer eventSource defaultChainState = do
events <- getEvents' eventSource
events <- getEvents eventSource
traceWith tracer LoadedState{numberOfEvents = fromIntegral $ length events}
let headState = recoverState initialState events
chainStateHistory = recoverChainStateHistory defaultChainState events
Expand All @@ -310,7 +318,7 @@ loadStateEventSource ::
ChainStateType tx ->
m (HeadState tx, ChainStateHistory tx)
loadStateEventSource tracer eventSource eventSinks defaultChainState = do
events <- getEvents' eventSource
events <- getEvents eventSource
traceWith tracer LoadedState{numberOfEvents = fromIntegral $ length events}
let headState = recoverState initialState events
chainStateHistory = recoverChainStateHistory defaultChainState events
Expand Down
4 changes: 2 additions & 2 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ eventPairFromPersistenceIncremental ::
PersistenceIncremental a m ->
(EventSource a m, EventSink a m)
eventPairFromPersistenceIncremental PersistenceIncremental{append, loadAll} =
let eventSource = EventSource{getEvents' = loadAll}
eventSink = EventSink{putEvent' = append}
let eventSource = EventSource{getEvents = loadAll}
eventSink = EventSink{putEvent = append}
in (eventSource, eventSink)

-- | Initialize persistence handle for given type 'a' at given file path.
Expand Down
8 changes: 4 additions & 4 deletions hydra-node/test/Hydra/NodeSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ createHydraNode =
where
append = const $ pure ()
loadAll = pure []
eventSource = EventSource{getEvents' = loadAll}
eventSink = EventSink{putEvent' = append}
eventSource = EventSource{getEvents = loadAll}
eventSink = EventSink{putEvent = append}

createHydraNode' ::
(MonadDelay m, MonadAsync m, MonadLabelledSTM m, MonadThrow m) =>
Expand Down Expand Up @@ -311,12 +311,12 @@ recordPersistedItems node = do
(record, query) <- messageRecorder
lastStateChangeId <- newTVarIO 0
-- pure (node{persistence = PersistenceIncremental{append = record, loadAll = pure []}}, query)
let putEvent' = \e -> do
let putEvent = \e -> do
atomically $ modifyTVar lastStateChangeId succ
record e
pure
( node
{ eventSinks = eventSinks node <> [EventSink{putEvent'}]
{ eventSinks = eventSinks node <> [EventSink{putEvent}]
}
, query
)
Expand Down
10 changes: 5 additions & 5 deletions hydra-node/test/Hydra/PersistenceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Test.Hydra.Prelude
import Data.Aeson (Value (..))
import Data.Aeson qualified as Aeson
import Data.Text qualified as Text
import Hydra.Events (EventId, getEvents', putEventsToSinks)
import Hydra.Events (EventId, getEvents, putEventsToSinks)
import Hydra.Persistence (Persistence (..), PersistenceException (..), PersistenceIncremental (..), createPersistence, createPersistenceIncremental, eventPairFromPersistenceIncremental)
import Test.QuickCheck (checkCoverage, cover, elements, oneof, suchThat, (===))
import Test.QuickCheck.Gen (listOf)
Expand Down Expand Up @@ -96,13 +96,13 @@ spec = do
-- but it is an okay reference point
-- maybe in node?
-- test for loadStateEventSource
getEvents' eventSource >>= putEventsToSinks eventSinks
getEvents eventSource >>= putEventsToSinks eventSinks

-- after loading our node, all sinks should recieved the same events
getEvents' sink1Source `shouldReturn` items
getEvents' sink2Source `shouldReturn` items
getEvents sink1Source `shouldReturn` items
getEvents sink2Source `shouldReturn` items
-- including the event source itself, which will now have duplicated events, at least by current definition
getEvents' eventSource `shouldReturn` (items <> items)
getEvents eventSource `shouldReturn` (items <> items)
pure ()

genPersistenceItem :: Gen Aeson.Value
Expand Down

0 comments on commit 9b9da57

Please sign in to comment.