From 99e559465ce9c64fc6bb03b2e6ddd0fbac121f2d Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Mon, 18 Sep 2023 16:51:32 +0800 Subject: [PATCH] hstream-kafka: add debugging log for request and response --- hstream-kafka/src/Kafka/Protocol/Encoding.hs | 2 ++ hstream-kafka/src/Kafka/Protocol/Service.hs | 8 +++++--- hstream-kafka/src/Kafka/Server.hs | 3 +++ hstream/src/HStream/Server/KafkaHandler/Produce.hs | 2 -- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/hstream-kafka/src/Kafka/Protocol/Encoding.hs b/hstream-kafka/src/Kafka/Protocol/Encoding.hs index 0945feca9..57c7f9717 100644 --- a/hstream-kafka/src/Kafka/Protocol/Encoding.hs +++ b/hstream-kafka/src/Kafka/Protocol/Encoding.hs @@ -203,6 +203,8 @@ newtype CompactKaArray a = CompactKaArray { unCompactKaArray :: Maybe (Vector a) } deriving newtype (Show, Eq, Ord) +newtype RecordBytes = RecordBytes (Maybe ByteString) + newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString } deriving newtype (Show, Eq, Ord) diff --git a/hstream-kafka/src/Kafka/Protocol/Service.hs b/hstream-kafka/src/Kafka/Protocol/Service.hs index 47b3033a1..4b472cbe4 100644 --- a/hstream-kafka/src/Kafka/Protocol/Service.hs +++ b/hstream-kafka/src/Kafka/Protocol/Service.hs @@ -32,7 +32,9 @@ type UnaryHandler i o = RequestContext -> i -> IO o data RpcHandler where UnaryHandler - :: (Serializable i, Serializable o) => UnaryHandler i o -> RpcHandler + :: ( Serializable i, Serializable o + , Show i, Show o + ) => UnaryHandler i o -> RpcHandler instance Show RpcHandler where show (UnaryHandler _) = "" @@ -54,8 +56,8 @@ data ServiceHandler = ServiceHandler } deriving (Show) hd :: ( HasMethod s m - , Serializable i - , Serializable o + , Serializable i, Serializable o + , Show i, Show o , MethodInput s m ~ i , MethodOutput s m ~ o ) diff --git a/hstream-kafka/src/Kafka/Server.hs b/hstream-kafka/src/Kafka/Server.hs index 35bbeb927..c6382dc87 100644 --- a/hstream-kafka/src/Kafka/Server.hs +++ b/hstream-kafka/src/Kafka/Server.hs @@ -16,6 +16,7 @@ import qualified Network.Socket as N import qualified Network.Socket.ByteString as N import qualified Network.Socket.ByteString.Lazy as NL +import qualified HStream.Logger as Log import Kafka.Protocol.Encoding import Kafka.Protocol.Message import Kafka.Protocol.Service @@ -68,7 +69,9 @@ runServer ServerOptions{..} handlers = case rpcHandler of UnaryHandler rpcHandler' -> do req <- runGet l + Log.debug $ "Received request: " <> Log.buildString' req resp <- rpcHandler' RequestContext req + Log.debug $ "Server response: " <> Log.buildString' resp let respBs = runPutLazy resp (_, respHeaderVer) = getHeaderVersion requestApiKey requestApiVersion respHeaderBs = diff --git a/hstream/src/HStream/Server/KafkaHandler/Produce.hs b/hstream/src/HStream/Server/KafkaHandler/Produce.hs index 36b573bfb..2f6aaaa23 100644 --- a/hstream/src/HStream/Server/KafkaHandler/Produce.hs +++ b/hstream/src/HStream/Server/KafkaHandler/Produce.hs @@ -41,8 +41,6 @@ handleProduceV2 -> K.ProduceRequestV2 -> IO K.ProduceResponseV2 handleProduceV2 ServerContext{..} _ req@K.ProduceRequestV0{..} = do - Log.debug $ Log.buildString $ "received request " <> show req - -- TODO: handle request args: acks, timeoutMs let topicData' = fromMaybe V.empty (K.unKaArray topicData)