diff --git a/hstream-gossip/src/HStream/Gossip/Gossip.hs b/hstream-gossip/src/HStream/Gossip/Gossip.hs index ab431a12b..f41e4f15a 100644 --- a/hstream-gossip/src/HStream/Gossip/Gossip.hs +++ b/hstream-gossip/src/HStream/Gossip/Gossip.hs @@ -21,7 +21,8 @@ import qualified HStream.Gossip.HStreamGossip as G import HStream.Gossip.Types (GossipContext (..), GossipOpts (..), RequestAction (..)) -import HStream.Gossip.Utils (getMessagesToSend, +import HStream.Gossip.Utils (foreverCatchAll, + getMessagesToSend, getOtherMembersSTM, mkClientNormalRequest) import qualified HStream.Logger as Log @@ -37,7 +38,8 @@ doGossip client msgs = do scheduleGossip :: GossipContext -> IO () scheduleGossip gc@GossipContext{..} = do _ <- readMVar clusterInited - forever $ do + -- FIXME: does catch all exception(SomeException) proper here? + foreverCatchAll True "ScheduleGossip" $ do atomically doGossip threadDelay $ gossipInterval gossipOpts where diff --git a/hstream-gossip/src/HStream/Gossip/Handlers.hs b/hstream-gossip/src/HStream/Gossip/Handlers.hs index 7ed0200ac..2651c13a9 100644 --- a/hstream-gossip/src/HStream/Gossip/Handlers.hs +++ b/hstream-gossip/src/HStream/Gossip/Handlers.hs @@ -35,15 +35,16 @@ import HStream.Gossip.Types (EventMessage (EventMessage), import qualified HStream.Gossip.Types as T import HStream.Gossip.Utils (broadcastMessage, eventNameINIT, eventNameINITED, - incrementTVar, + foreverCatchAll, incrementTVar, initServerStatus, updateLamportTime) import HStream.Gossip.Worker (addToServerList, initGossip) import qualified HStream.Logger as Log import qualified HStream.Server.HStreamInternal as I +-- FIXME: does catch all exception(SomeException) proper here? runStateHandler :: GossipContext -> IO () -runStateHandler gc@GossipContext{..} = forever $ do +runStateHandler gc@GossipContext{..} = foreverCatchAll True "GossipStateHandler" $ do newMsgs <- atomically $ do void $ peekTQueue statePool flushTQueue statePool @@ -141,8 +142,9 @@ handleStateMessage gc@GossipContext{..} msg@(T.GAlive i node@I.ServerNode{..} _n handleStateMessage _ _ = throwIOError "illegal state message" +-- FIXME: does catch all exception(SomeException) proper here? runEventHandler :: GossipContext -> IO () -runEventHandler gc@GossipContext{..} = forever $ do +runEventHandler gc@GossipContext{..} = foreverCatchAll True "GossipEventHandler" $ do newMsgs <- atomically $ do void $ peekTQueue eventPool flushTQueue eventPool diff --git a/hstream-gossip/src/HStream/Gossip/Probe.hs b/hstream-gossip/src/HStream/Gossip/Probe.hs index cfc225b67..876ee8696 100644 --- a/hstream-gossip/src/HStream/Gossip/Probe.hs +++ b/hstream-gossip/src/HStream/Gossip/Probe.hs @@ -45,6 +45,7 @@ import HStream.Gossip.Utils (ClusterInitedErr (..), ClusterReadyErr (..), broadcast, clusterInitedErr, clusterReadyErr, + foreverCatchAll, getMessagesToSend, getOtherMembersSTM, mkClientNormalRequest, @@ -158,7 +159,8 @@ doPing client gc@GossipContext{gossipOpts = GossipOpts{..}, ..} scheduleProbe :: GossipContext -> IO () scheduleProbe gc@GossipContext{..} = do _ <- readMVar clusterInited - forever $ do + -- FIXME: does catch all exception(SomeException) proper here? + foreverCatchAll True "Gossip ScheduleProbe" $ do memberMap <- atomically $ do memberMap <- getOtherMembersSTM gc check (not $ null memberMap) diff --git a/hstream-gossip/src/HStream/Gossip/Reconnect.hs b/hstream-gossip/src/HStream/Gossip/Reconnect.hs index 5805eebd0..ac67fbfeb 100644 --- a/hstream-gossip/src/HStream/Gossip/Reconnect.hs +++ b/hstream-gossip/src/HStream/Gossip/Reconnect.hs @@ -26,7 +26,8 @@ import HStream.Gossip.Types (GossipContext (..), RequestAction (..), ServerStatus (..)) import qualified HStream.Gossip.Types as T -import HStream.Gossip.Utils (getMemberList, getMsgInc, +import HStream.Gossip.Utils (foreverCatchAll, + getMemberList, getMsgInc, mkClientNormalRequest', mkGRPCClientConf') import HStream.Logger as Log @@ -35,7 +36,8 @@ import qualified HStream.Server.HStreamInternal as I scheduleReconnect :: GossipContext -> IO () scheduleReconnect gc@GossipContext{..} = do _ <- readMVar clusterInited - forever $ do + -- FIXME: does catch all exception(SomeException) proper here? + foreverCatchAll True "Gossip ScheduleReconnect" $ do memberMap <- atomically $ do memberMap <- readTVar deadServers check (not $ Map.null memberMap) diff --git a/hstream-gossip/src/HStream/Gossip/Utils.hs b/hstream-gossip/src/HStream/Gossip/Utils.hs index 93a7ba220..56f7c575a 100644 --- a/hstream-gossip/src/HStream/Gossip/Utils.hs +++ b/hstream-gossip/src/HStream/Gossip/Utils.hs @@ -15,7 +15,7 @@ import Control.Concurrent.STM (STM, TQueue, TVar, writeTVar) import Control.Exception (Handler (..)) import Control.Exception.Base -import Control.Monad (filterM, unless) +import Control.Monad (filterM, forever, unless) import Data.ByteString (ByteString) import Data.Foldable (foldl') import Data.Functor @@ -344,3 +344,10 @@ getClusterStatus gc@GossipContext {..} = do helper state node@ServerNode{..} = (serverNodeId, ServerNodeStatus { serverNodeStatusNode = Just node , serverNodeStatusState = EnumPB state}) + +-- FIXME: how about mv this to HStream.Base? +foreverCatchAll :: Bool -> String -> IO () -> IO a +foreverCatchAll shouldThrow label f = + forever $ catch f $ \(e :: SomeException) -> do + Log.fatal $ Log.buildString label <> ", error: " <> Log.buildString' e + if shouldThrow then throwIO e else threadDelay 1000000 diff --git a/hstream/app/lib/KafkaServer.hs b/hstream/app/lib/KafkaServer.hs index bb89821fb..fb8645296 100644 --- a/hstream/app/lib/KafkaServer.hs +++ b/hstream/app/lib/KafkaServer.hs @@ -120,6 +120,7 @@ app config@ServerOpts{..} = do serverContext <- initServerContext config gossipContext h putMVar scMVar serverContext + -- FIXME: safer way to handle this: what if updateHashRing failed? void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext) -- TODO: support tls (_tlsConfig) diff --git a/hstream/app/server.hs b/hstream/app/server.hs index 7b99e5cdf..79c85492b 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -151,6 +151,7 @@ app config@ServerOpts{..} = do serverContext <- initializeServer config gossipContext h db_m putMVar scMVar serverContext + -- FIXME: safer way to handle this: what if updateHashRing failed? void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext) #ifdef HStreamUseGrpcHaskell