Skip to content


WIP: going back and forth on how to manage state in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cardenaso11 committed Feb 19, 2024
1 parent 1fef506 commit 906091d
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 4 deletions.
84 changes: 82 additions & 2 deletions hydra-node/src/Hydra/Persistence.hs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ putEventToSinks sinks e = forM_ sinks (\sink -> putEvent' sink e)
putEventsToSinks :: forall m e. (Monad m, ToJSON e) => NonEmpty (EventSink e m) -> [e] -> m ()
putEventsToSinks sinks es = forM_ es (\e -> putEventToSinks sinks e)

-- FIXME(Elaine): this needs to be the reverse, since we need to keep track of the eventID
-- FIXME(Elaine): neither this nor the opposite direction can handle re-submission properly without keeping track of the state separately in step/hydranode &
-- so this means removing the old persistence for the purpose of statechanged events is more urgent
eventPairFromPersistenceIncremental :: PersistenceIncremental a m -> (EventSource a m, EventSink a m)
Expand Down Expand Up @@ -141,7 +140,88 @@ createNewPersistenceIncremental fp = do
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let stateChangeID = (undefined a) :: Word64
let stateChangeID = undefined a
-- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type
-- or we can have separate versions of this for StateChanged, and for network functionality etc

let outgoingStateChangeId = stateChangeID
-- outgoingStateChangeId <- readTVar $ stateChangeID -- this is the ID of the state change we just got from the node
-- it's not actually written to disk yet until this function is over

id <- readTVar nextId
writeTVar authorizedThread $ Just threadId
modifyTVar' nextId succ
pure $ outgoingStateChangeId `compare` id
let bytes = toStrict $ Aeson.encode a <> "\n"
case isEventNew of
-- event already persisted
LT -> pure ()
-- event is as new as expected
EQ -> liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes)
-- event is newer than expected,
GT -> do
liftIO $ putStrLn "ELAINE: this shouldn't happen with my current understanding of stuff"
liftIO $ withBinaryFile fp AppendMode (`BS.hPut` bytes) -- FIXME(Elaine): maybe error ? shouldnt really happen
eventSinks = eventSink :| []
pure NewPersistenceIncremental{eventSource, eventSinks, lastStateChangeId}

createNewPersistenceIncrementalStateChanged ::
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
FilePath ->
m (NewPersistenceIncremental (StateChanged a) m)
createNewPersistenceIncrementalStateChanged = createNewPersistenceIncrementalGeneric getStateChangeID

--FIXME(Elaine): find a better name
createNewPersistenceIncrementalGeneric ::
(MonadIO m, MonadThrow m, MonadSTM m, MonadThread m, MonadThrow (STM m)) =>
(a -> Word64) ->
FilePath ->
m (NewPersistenceIncremental a m)
createNewPersistenceIncrementalGeneric getID fp = do
liftIO . createDirectoryIfMissing True $ takeDirectory fp
authorizedThread <- newTVarIO Nothing
lastStateChangeId <- newTVarIO (0 :: Word64)
-- FIXME(Elaine): eventid too general for this, at least not without writing the eventids to disk, but even then, hacky
-- we'll have a new ID for each statechanged event,
-- i think this is probablyh fine and doesn't need fixing, but wanted to write it down first
-- the eventId here is a monotonically increasing integer, and it lets us keep track of how "far along" we are in the persistence
-- we can use this to skip resubmitting events
-- more complicated solutions would be possible, in particular, rolling hash / merkle chain might be more resilient to corruption
-- but given that persistence is already atomic and only needs to be consistent within a single node, it should suffice
nextId <- newTVarIO (0 :: Word64)
let eventSource =
{ getEvents' = do
tid <- myThreadId
atomically $ do
authTid <- readTVar authorizedThread
when (isJust authTid && authTid /= Just tid) $
throwSTM (IncorrectAccessException $ "Trying to load persisted data in " <> fp <> " from different thread")

liftIO (doesFileExist fp) >>= \case
False -> pure []
True -> do
bs <- readFileBS fp
-- NOTE: We require the whole file to be loadable. It might
-- happen that the data written by 'append' is only there
-- partially and then this will fail (which we accept now).
result <- case forM (C8.lines bs) Aeson.eitherDecodeStrict' of
Left e -> throwIO $ PersistenceException e
Right decoded -> pure decoded
-- set initial nextId (zero-indexed) based on how many state change events we have
atomically $ do
writeTVar lastStateChangeId $ fromIntegral $ length result
writeTVar nextId . fromIntegral $ length result

pure result
eventSink =
{ putEvent' = \a -> do
threadId <- myThreadId
isEventNew <- atomically $ do
let stateChangeID = getID a
-- FIXME(Elaine): we need to put getStateChangeID into a typeclass and add that constraint to a, in the EventSink type
-- or we can have separate versions of this for StateChanged, and for network functionality etc

Expand Down
8 changes: 6 additions & 2 deletions hydra-node/test/Hydra/HeadLogicSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ data StepState tx = StepState
{ headState :: HeadState tx
, env :: Environment
, ledger :: Ledger tx
, lastStateChangeId :: Word64
-- TODO(Elaine): should this be folded into HeadState?
-- the type of aggregate kinda suggests it

-- | Retrieves the latest 'HeadState' from within 'runEvents'.
Expand All @@ -728,8 +731,9 @@ step ::
Event tx ->
m (Outcome tx)
step event = do
StepState{headState, env, ledger} <- get
let outcome = update env ledger headState event
StepState{headState, env, ledger, lastStateChangeId} <- get
let nextStateChangeID = succ lastStateChangeId
let outcome = update env ledger nextStateChangeID headState event
let headState' = aggregateState headState outcome
put StepState{env, ledger, headState = headState'}
pure outcome
Expand Down

0 comments on commit 906091d

Please sign in to comment.