Skip to content

Commit

Permalink
kafka: add handler-level metrics for cpp server
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed May 10, 2024
1 parent 35d0792 commit 05bc9df
Showing 1 changed file with 14 additions and 8 deletions.
22 changes: 14 additions & 8 deletions hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ runHsServer opts sc_ mkPreAuthedHandlers mkAuthedHandlers =

doUnaryHandler l reqHeader@RequestHeader{..} rpcHandler' peer = do
(req, left) <- runGet' l
when (not . BS.null $ left) $
unless (BS.null left) $
Log.warning $ "Leftover bytes: " <> Log.buildString' left
Log.debug $ "Received request "
<> Log.buildString' requestApiKey
Expand Down Expand Up @@ -237,10 +237,14 @@ runCppServer opts sc_ mkAuthedHandlers =
(sc, conn) <- deRefStablePtr sptr
let handlers = mkAuthedHandlers sc
req <- peek request_ptr
M.incCounter M.totalRequests
(header, reqBs) <- runGet' @RequestHeader req.requestPayload
let ServiceHandler{..} = findHandler handlers header.requestApiKey header.requestApiVersion
-- respBs: Nothing means some error occurred, and the server will close
-- the connection
respBs <- E.catch
(Just <$> handleKafkaMsg conn handlers req.requestPayload)
(Just <$> M.observeWithLabel M.handlerLatencies (Text.pack $ show header.requestApiKey)
(handleKafkaMsg conn rpcHandler header reqBs))
(\err -> do Log.fatal $ Log.buildString' (err :: E.SomeException)
pure Nothing)
poke response_ptr Cxx.Response{responseData = respBs}
Expand All @@ -250,19 +254,21 @@ runCppServer opts sc_ mkAuthedHandlers =
-- Server misc

handleKafkaMsg
:: Cxx.ConnContext -> [ServiceHandler] -> ByteString -> IO ByteString
handleKafkaMsg conn handlers bs = do
(header, reqBs) <- runGet' @RequestHeader bs
let ServiceHandler{..} = findHandler handlers header.requestApiKey header.requestApiVersion
:: Cxx.ConnContext
-> RpcHandler
-> RequestHeader
-> ByteString
-> IO ByteString
handleKafkaMsg conn rpcHandler header bs = do
case rpcHandler of
UnaryHandler rpcHandler' -> do
(req, left) <- runGet' reqBs
(req, left) <- runGet' bs
Log.debug $ "Received request "
<> Log.buildString' header.requestApiKey
<> ":v" <> Log.build header.requestApiVersion
<> " from " <> Log.buildString' conn.peerHost
<> ", payload: " <> Log.buildString' req
when (not . BS.null $ left) $
unless (BS.null left) $
Log.warning $ "Leftover bytes: " <> Log.buildString' left
let reqContext =
RequestContext
Expand Down

0 comments on commit 05bc9df

Please sign in to comment.