diff --git a/hstream-kafka/HStream/Kafka/Network.hs b/hstream-kafka/HStream/Kafka/Network.hs index 106407b1b..ddfe1f90e 100644 --- a/hstream-kafka/HStream/Kafka/Network.hs +++ b/hstream-kafka/HStream/Kafka/Network.hs @@ -143,7 +143,7 @@ runHsServer opts sc_ mkPreAuthedHandlers mkAuthedHandlers = doUnaryHandler l reqHeader@RequestHeader{..} rpcHandler' peer = do (req, left) <- runGet' l - when (not . BS.null $ left) $ + unless (BS.null left) $ Log.warning $ "Leftover bytes: " <> Log.buildString' left Log.debug $ "Received request " <> Log.buildString' requestApiKey @@ -237,10 +237,14 @@ runCppServer opts sc_ mkAuthedHandlers = (sc, conn) <- deRefStablePtr sptr let handlers = mkAuthedHandlers sc req <- peek request_ptr + M.incCounter M.totalRequests + (header, reqBs) <- runGet' @RequestHeader req.requestPayload + let ServiceHandler{..} = findHandler handlers header.requestApiKey header.requestApiVersion -- respBs: Nothing means some error occurred, and the server will close -- the connection respBs <- E.catch - (Just <$> handleKafkaMsg conn handlers req.requestPayload) + (Just <$> M.observeWithLabel M.handlerLatencies (Text.pack $ show header.requestApiKey) + (handleKafkaMsg conn rpcHandler header reqBs)) (\err -> do Log.fatal $ Log.buildString' (err :: E.SomeException) pure Nothing) poke response_ptr Cxx.Response{responseData = respBs} @@ -250,19 +254,21 @@ runCppServer opts sc_ mkAuthedHandlers = -- Server misc handleKafkaMsg - :: Cxx.ConnContext -> [ServiceHandler] -> ByteString -> IO ByteString -handleKafkaMsg conn handlers bs = do - (header, reqBs) <- runGet' @RequestHeader bs - let ServiceHandler{..} = findHandler handlers header.requestApiKey header.requestApiVersion + :: Cxx.ConnContext + -> RpcHandler + -> RequestHeader + -> ByteString + -> IO ByteString +handleKafkaMsg conn rpcHandler header bs = do case rpcHandler of UnaryHandler rpcHandler' -> do - (req, left) <- runGet' reqBs + (req, left) <- runGet' bs Log.debug $ "Received request " <> Log.buildString' header.requestApiKey <> ":v" <> Log.build header.requestApiVersion <> " from " <> Log.buildString' conn.peerHost <> ", payload: " <> Log.buildString' req - when (not . BS.null $ left) $ + unless (BS.null left) $ Log.warning $ "Leftover bytes: " <> Log.buildString' left let reqContext = RequestContext