From c35d1f9d1b7265a4507c005b72482db8e075bb4f Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Thu, 28 Sep 2023 14:57:14 +0800 Subject: [PATCH] hstream-kafka-client: init --- .../client/HStream/Kafka/Client/Network.hs | 2 +- .../protocol/Kafka/Protocol/Message.hs | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/hstream-kafka/client/HStream/Kafka/Client/Network.hs b/hstream-kafka/client/HStream/Kafka/Client/Network.hs index e235f2090..cc96b92cb 100644 --- a/hstream-kafka/client/HStream/Kafka/Client/Network.hs +++ b/hstream-kafka/client/HStream/Kafka/Client/Network.hs @@ -51,7 +51,7 @@ recvByteString sock = do recvResp :: forall a. K.Serializable a => NW.Socket -> IO a recvResp sock = do bs <- recvByteString sock - ret <- K.runParser @K.ResponseHeader K.get bs + ret <- K.runParser K.getResponseHeaderV0 bs (l, header) <- case ret of K.Done l header -> pure (l, header) K.More k -> undefined diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message.hs b/hstream-kafka/protocol/Kafka/Protocol/Message.hs index 15ac7ac1f..323764053 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message.hs @@ -3,6 +3,8 @@ module Kafka.Protocol.Message ( RequestHeader (..) , ResponseHeader (..) + , getResponseHeaderV0 + , getResponseHeaderV1 , putResponseHeader , runPutResponseHeaderLazy , Unsupported (..) @@ -62,13 +64,17 @@ data ResponseHeader = ResponseHeader , responseTaggedFields :: !(Either Unsupported TaggedFields) } deriving (Show, Eq) -instance Serializable ResponseHeader where - get = do - responseCorrelationId <- get - responseTaggedFields <- pure $ Left Unsupported -- FIXME - pure ResponseHeader{..} +getResponseHeaderV0 :: Parser ResponseHeader +getResponseHeaderV0 = do + responseCorrelationId <- get + responseTaggedFields <- pure $ Left Unsupported + pure ResponseHeader{..} - put = putResponseHeader +getResponseHeaderV1 :: Parser ResponseHeader +getResponseHeaderV1 = do + responseCorrelationId <- get + responseTaggedFields <- getEither True + pure ResponseHeader{..} putResponseHeader :: ResponseHeader -> Builder putResponseHeader ResponseHeader{..} =