Skip to content

Commit

Permalink
hserver: catch exception for stats handler and gossip nodeChangeEvent (
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Feb 7, 2024
1 parent 78d505d commit 5360678
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
21 changes: 18 additions & 3 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}

import Control.Concurrent (forkIO, newEmptyMVar,
import Control.Concurrent (MVar, forkIO, newEmptyMVar,
putMVar, readMVar)
import qualified Control.Concurrent.Async as Async
import Control.Exception (bracket, handle)
import Control.Exception (SomeException, bracket,
catch, handle)
import Control.Monad (forM, forM_, join, void,
when)
import Data.ByteString (ByteString)
Expand Down Expand Up @@ -91,6 +92,8 @@ import qualified Network.GRPC.HighLevel.Generated as GRPC
import Text.RawString.QQ (r)
#endif

import qualified HStream.Gossip.Types as Gossip

-------------------------------------------------------------------------------

main :: IO ()
Expand Down Expand Up @@ -143,7 +146,7 @@ app config@ServerOpts{..} = do
}

scMVar <- newEmptyMVar
gossipContext <- initGossipContext defaultGossipOpts mempty (Just $ Cluster.nodeChangeEventHandler scMVar) serverNode _seedNodes
gossipContext <- initGossipContext defaultGossipOpts mempty (Just $ nodeChangeEventHd scMVar) serverNode _seedNodes

serverContext <- initializeServer config gossipContext h db_m
putMVar scMVar serverContext
Expand Down Expand Up @@ -342,3 +345,15 @@ showVersion = do
API.HStreamVersion{..} <- getHStreamVersion
putStrLn $ "version: " <> T.unpack hstreamVersionVersion
<> " (" <> T.unpack hstreamVersionCommit <> ")"

-- TODO: here, we just catch `SomeException` and then show some error messages.
--
-- 1. There may need to be a uniform way to handle all gossip handles(threads).
-- 2. Retry retriable actions
nodeChangeEventHd
:: MVar ServerContext
-> Gossip.ServerState
-> I.ServerNode
-> IO ()
nodeChangeEventHd m s n = catch (Cluster.nodeChangeEventHandler m s n) $
\(e :: SomeException) -> Log.fatal $ "handle node change event error: " <> Log.buildString' e
8 changes: 4 additions & 4 deletions hstream/src/HStream/Server/Handler/ShardReader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,15 @@ createShardReaderHandler
-> ServerRequest 'Normal CreateShardReaderRequest CreateShardReaderResponse
-> IO (ServerResponse 'Normal CreateShardReaderResponse)
createShardReaderHandler sc (ServerNormalRequest _metadata request) = defaultExceptionHandle $ do
Log.debug $ "Receive Create ShardReader Request" <> Log.buildString' (show request)
Log.info $ "Receive Create ShardReader Request" <> Log.buildString' (show request)
validateCreateShardReader request
C.createShardReader sc request >>= returnResp

handleCreateShardReader
:: ServerContext
-> G.UnaryHandler CreateShardReaderRequest CreateShardReaderResponse
handleCreateShardReader sc _ req = catchDefaultEx $ do
Log.debug $ "Receive Create ShardReader Request" <> Log.buildString' (show req)
Log.info $ "Receive Create ShardReader Request" <> Log.buildString' (show req)
validateCreateShardReader req
C.createShardReader sc req

Expand All @@ -77,7 +77,7 @@ deleteShardReaderHandler
-> ServerRequest 'Normal DeleteShardReaderRequest Empty
-> IO (ServerResponse 'Normal Empty)
deleteShardReaderHandler sc@ServerContext{..} (ServerNormalRequest _metadata request@DeleteShardReaderRequest{..}) = defaultExceptionHandle $ do
Log.debug $ "Receive Delete ShardReader Request" <> Log.buildString' (show request)
Log.info $ "Receive Delete ShardReader Request" <> Log.buildString' (show request)
validateNameAndThrow ResShardReader deleteShardReaderRequestReaderId
ServerNode{..} <- lookupResource sc ResShardReader deleteShardReaderRequestReaderId
unless (serverNodeId == serverID) $
Expand All @@ -88,7 +88,7 @@ handleDeleteShardReader
:: ServerContext
-> G.UnaryHandler DeleteShardReaderRequest Empty
handleDeleteShardReader sc@ServerContext{..} _ req@DeleteShardReaderRequest{..} = catchDefaultEx $ do
Log.debug $ "Receive Delete ShardReader Request" <> Log.buildString' (show req)
Log.info $ "Receive Delete ShardReader Request" <> Log.buildString' (show req)
validateNameAndThrow ResShardReader deleteShardReaderRequestReaderId
ServerNode{..} <- lookupResource sc ResShardReader deleteShardReaderRequestReaderId
unless (serverNodeId == serverID) $
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Handler/Stats.hs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ getStatsHandler sc (ServerNormalRequest _ (API.GetStatsRequest mstats)) = defaul
handleGetStats
:: ServerContext
-> G.UnaryHandler API.GetStatsRequest API.GetStatsResponse
handleGetStats sc _ (API.GetStatsRequest mstats) = do
handleGetStats sc _ (API.GetStatsRequest mstats) = catchDefaultEx $ do
(failed, suc) <- getStats mstats sc
pure $ API.GetStatsResponse {getStatsResponseStatsValues = V.fromList suc, getStatsResponseErrors = V.fromList failed}

Expand Down

0 comments on commit 5360678

Please sign in to comment.