Skip to content

Commit

Permalink
hstream-gossip: catch all asyncs
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Feb 19, 2024
1 parent 5360678 commit 30e898f
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 9 deletions.
6 changes: 4 additions & 2 deletions hstream-gossip/src/HStream/Gossip/Gossip.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import qualified HStream.Gossip.HStreamGossip as G
import HStream.Gossip.Types (GossipContext (..),
GossipOpts (..),
RequestAction (..))
import HStream.Gossip.Utils (getMessagesToSend,
import HStream.Gossip.Utils (foreverCatchAll,
getMessagesToSend,
getOtherMembersSTM,
mkClientNormalRequest)
import qualified HStream.Logger as Log
Expand All @@ -37,7 +38,8 @@ doGossip client msgs = do
scheduleGossip :: GossipContext -> IO ()
scheduleGossip gc@GossipContext{..} = do
_ <- readMVar clusterInited
forever $ do
-- FIXME: is catching all exceptions (SomeException) appropriate here?
foreverCatchAll True "ScheduleGossip" $ do
atomically doGossip
threadDelay $ gossipInterval gossipOpts
where
Expand Down
8 changes: 5 additions & 3 deletions hstream-gossip/src/HStream/Gossip/Handlers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ import HStream.Gossip.Types (EventMessage (EventMessage),
import qualified HStream.Gossip.Types as T
import HStream.Gossip.Utils (broadcastMessage,
eventNameINIT, eventNameINITED,
incrementTVar,
foreverCatchAll, incrementTVar,
initServerStatus,
updateLamportTime)
import HStream.Gossip.Worker (addToServerList, initGossip)
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamInternal as I

-- FIXME: is catching all exceptions (SomeException) appropriate here?
runStateHandler :: GossipContext -> IO ()
runStateHandler gc@GossipContext{..} = forever $ do
runStateHandler gc@GossipContext{..} = foreverCatchAll True "GossipStateHandler" $ do
newMsgs <- atomically $ do
void $ peekTQueue statePool
flushTQueue statePool
Expand Down Expand Up @@ -141,8 +142,9 @@ handleStateMessage gc@GossipContext{..} msg@(T.GAlive i [email protected]{..} _n

handleStateMessage _ _ = throwIOError "illegal state message"

-- FIXME: is catching all exceptions (SomeException) appropriate here?
runEventHandler :: GossipContext -> IO ()
runEventHandler gc@GossipContext{..} = forever $ do
runEventHandler gc@GossipContext{..} = foreverCatchAll True "GossipEventHandler" $ do
newMsgs <- atomically $ do
void $ peekTQueue eventPool
flushTQueue eventPool
Expand Down
4 changes: 3 additions & 1 deletion hstream-gossip/src/HStream/Gossip/Probe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import HStream.Gossip.Utils (ClusterInitedErr (..),
ClusterReadyErr (..),
broadcast, clusterInitedErr,
clusterReadyErr,
foreverCatchAll,
getMessagesToSend,
getOtherMembersSTM,
mkClientNormalRequest,
Expand Down Expand Up @@ -158,7 +159,8 @@ doPing client gc@GossipContext{gossipOpts = GossipOpts{..}, ..}
scheduleProbe :: GossipContext -> IO ()
scheduleProbe gc@GossipContext{..} = do
_ <- readMVar clusterInited
forever $ do
-- FIXME: is catching all exceptions (SomeException) appropriate here?
foreverCatchAll True "Gossip ScheduleProbe" $ do
memberMap <- atomically $ do
memberMap <- getOtherMembersSTM gc
check (not $ null memberMap)
Expand Down
6 changes: 4 additions & 2 deletions hstream-gossip/src/HStream/Gossip/Reconnect.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import HStream.Gossip.Types (GossipContext (..),
RequestAction (..),
ServerStatus (..))
import qualified HStream.Gossip.Types as T
import HStream.Gossip.Utils (getMemberList, getMsgInc,
import HStream.Gossip.Utils (foreverCatchAll,
getMemberList, getMsgInc,
mkClientNormalRequest',
mkGRPCClientConf')
import HStream.Logger as Log
Expand All @@ -35,7 +36,8 @@ import qualified HStream.Server.HStreamInternal as I
scheduleReconnect :: GossipContext -> IO ()
scheduleReconnect gc@GossipContext{..} = do
_ <- readMVar clusterInited
forever $ do
-- FIXME: is catching all exceptions (SomeException) appropriate here?
foreverCatchAll True "Gossip ScheduleReconnect" $ do
memberMap <- atomically $ do
memberMap <- readTVar deadServers
check (not $ Map.null memberMap)
Expand Down
9 changes: 8 additions & 1 deletion hstream-gossip/src/HStream/Gossip/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import Control.Concurrent.STM (STM, TQueue, TVar,
writeTVar)
import Control.Exception (Handler (..))
import Control.Exception.Base
import Control.Monad (filterM, unless)
import Control.Monad (filterM, forever, unless)
import Data.ByteString (ByteString)
import Data.Foldable (foldl')
import Data.Functor
Expand Down Expand Up @@ -344,3 +344,10 @@ getClusterStatus gc@GossipContext {..} = do
helper state node@ServerNode{..} =
(serverNodeId, ServerNodeStatus { serverNodeStatusNode = Just node
, serverNodeStatusState = EnumPB state})

-- FIXME: should this be moved to HStream.Base?

-- | Run an action in an endless loop, logging every exception it raises.
--
-- Each exception escaping @f@ is logged at fatal level, prefixed with @label@.
-- If @shouldThrow@ is 'True' the exception is then rethrown, which ends the
-- loop. If it is 'False' the loop pauses for one second and runs @f@ again.
--
-- Asynchronous exceptions (anything wrapped in 'SomeAsyncException', such as
-- 'ThreadKilled') are always rethrown after being logged, even when
-- @shouldThrow@ is 'False'; swallowing them would make the thread running
-- this loop impossible to cancel.
foreverCatchAll :: Bool -> String -> IO () -> IO a
foreverCatchAll shouldThrow label f =
  forever $ catch f $ \(e :: SomeException) -> do
    Log.fatal $ Log.buildString label <> ", error: " <> Log.buildString' e
    if shouldThrow || isAsyncException e
      then throwIO e
      else threadDelay retryDelayMicros
  where
    -- Pause between retries when the exception is not rethrown (1 second).
    retryDelayMicros :: Int
    retryDelayMicros = 1000000

    -- True when the exception was delivered asynchronously (cancellation,
    -- timeout, ...); such exceptions must never be swallowed by the loop.
    isAsyncException :: SomeException -> Bool
    isAsyncException ex = case fromException ex of
      Just (SomeAsyncException _) -> True
      Nothing                     -> False
1 change: 1 addition & 0 deletions hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ app config@ServerOpts{..} = do
serverContext <- initServerContext config gossipContext h
putMVar scMVar serverContext

-- FIXME: find a safer way to handle this: what if updateHashRing fails?
void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext)

-- TODO: support tls (_tlsConfig)
Expand Down
1 change: 1 addition & 0 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ app config@ServerOpts{..} = do
serverContext <- initializeServer config gossipContext h db_m
putMVar scMVar serverContext

-- FIXME: find a safer way to handle this: what if updateHashRing fails?
void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext)

#ifdef HStreamUseGrpcHaskell
Expand Down

0 comments on commit 30e898f

Please sign in to comment.