Skip to content

Commit

Permalink
Support all header versions
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Sep 14, 2023
1 parent eda3685 commit 2aadd70
Show file tree
Hide file tree
Showing 5 changed files with 344 additions and 91 deletions.
21 changes: 21 additions & 0 deletions hstream-kafka/src/Kafka/Protocol/Encoding.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

module Kafka.Protocol.Encoding
( Serializable (..)
, putEither
, getEither
, runGet
, runGet'
, runPut
Expand Down Expand Up @@ -108,6 +110,25 @@ instance (Serializable a) => GSerializable (K1 i a) where
gget = K1 <$> get
gput (K1 x) = put x

-- There is no easy way to support Sum types for Generic instance.
--
-- So here we give a special case for Either
putEither :: (Serializable a, Serializable b) => Either a b -> Builder
putEither (Left x) = put x
putEither (Right x) = put x
{-# INLINE putEither #-}

-- There is no way to support Sum types for Generic instance.
--
-- So here we give a special case for Either
getEither
:: (Serializable a, Serializable b)
=> Bool -- ^ True for Right, False for Left
-> Parser (Either a b)
getEither True = Right <$> get
getEither False = Left <$> get
{-# INLINE getEither #-}

-------------------------------------------------------------------------------

newtype DecodeError = DecodeError String
Expand Down
68 changes: 56 additions & 12 deletions hstream-kafka/src/Kafka/Protocol/Message.hs
Original file line number Diff line number Diff line change
@@ -1,29 +1,73 @@
{-# LANGUAGE DuplicateRecordFields #-}

module Kafka.Protocol.Message
( RequestHeader (..)
, ResponseHeader (..)
, putResponseHeader
, runPutResponseHeaderLazy
, Unsupported (..)

, module Kafka.Protocol.Message.Struct
) where

import qualified Data.ByteString.Lazy as BL
import Data.Int
import Data.Text (Text)
import GHC.Generics

import Kafka.Protocol.Encoding
import Kafka.Protocol.Message.Struct

-- TODO: Support Optional Tagged Fields
data Unsupported = Unsupported
deriving (Show, Eq, Generic)

instance Serializable Unsupported

data RequestHeader = RequestHeader
{ requestApiKey :: !ApiKey
, requestApiVersion :: !Int16
, requestCorrelationId :: !Int32
, requestClientId :: !(Maybe Text)
} deriving (Show, Eq, Generic)
{ requestApiKey :: {-# UNPACK #-} !ApiKey
, requestApiVersion :: {-# UNPACK #-} !Int16
, requestCorrelationId :: {-# UNPACK #-} !Int32
, requestClientId :: !(Either Unsupported NullableString)
, requesteTaggedFields :: !(Either Unsupported TaggedFields)
} deriving (Show, Eq)

instance Serializable RequestHeader where
get = do
requestApiKey <- get
requestApiVersion <- get
requestCorrelationId <- get
let (reqHeaderVer, _) = getHeaderVersion requestApiKey requestApiVersion
case reqHeaderVer of
2 -> do requestClientId <- getEither True
requesteTaggedFields <- getEither True
pure RequestHeader{..}
1 -> do requestClientId <- getEither True
let requesteTaggedFields = Left Unsupported
in pure RequestHeader{..}
0 -> let requestClientId = Left Unsupported
requesteTaggedFields = Left Unsupported
in pure RequestHeader{..}
v -> error $ "Unknown request header version" <> show v
{-# INLINE get #-}

put RequestHeader{..} =
put requestApiKey
<> put requestApiVersion
<> put requestCorrelationId
<> putEither requestClientId
<> putEither requesteTaggedFields
{-# INLINE put #-}

instance Serializable RequestHeader
data ResponseHeader = ResponseHeader
{ responseCorrelationId :: {-# UNPACK #-} !Int32
, responseTaggedFields :: !(Either Unsupported TaggedFields)
} deriving (Show, Eq)

newtype ResponseHeader = ResponseHeader
{ responseCorrelationId :: Int32
} deriving (Show, Eq, Generic)
putResponseHeader :: ResponseHeader -> Builder
putResponseHeader ResponseHeader{..} =
put responseCorrelationId
<> putEither responseTaggedFields
{-# INLINE putResponseHeader #-}

instance Serializable ResponseHeader
runPutResponseHeaderLazy :: ResponseHeader -> BL.ByteString
runPutResponseHeaderLazy = toLazyByteString . putResponseHeader
{-# INLINE runPutResponseHeaderLazy #-}
25 changes: 25 additions & 0 deletions hstream-kafka/src/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -951,3 +951,28 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 20) 0 0
]

getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16)
getHeaderVersion (ApiKey 0) 0 = (1, 0)
getHeaderVersion (ApiKey 0) 1 = (1, 0)
getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 2) 0 = (1, 0)
getHeaderVersion (ApiKey 3) 0 = (1, 0)
getHeaderVersion (ApiKey 3) 1 = (1, 0)
getHeaderVersion (ApiKey 8) 0 = (1, 0)
getHeaderVersion (ApiKey 9) 0 = (1, 0)
getHeaderVersion (ApiKey 10) 0 = (1, 0)
getHeaderVersion (ApiKey 11) 0 = (1, 0)
getHeaderVersion (ApiKey 12) 0 = (1, 0)
getHeaderVersion (ApiKey 13) 0 = (1, 0)
getHeaderVersion (ApiKey 14) 0 = (1, 0)
getHeaderVersion (ApiKey 15) 0 = (1, 0)
getHeaderVersion (ApiKey 16) 0 = (1, 0)
getHeaderVersion (ApiKey 18) 0 = (1, 0)
getHeaderVersion (ApiKey 18) 1 = (1, 0)
getHeaderVersion (ApiKey 18) 2 = (1, 0)
getHeaderVersion (ApiKey 19) 0 = (1, 0)
getHeaderVersion (ApiKey 20) 0 = (1, 0)
getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v
{-# INLINE getHeaderVersion #-}
17 changes: 11 additions & 6 deletions hstream-kafka/src/Kafka/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,29 @@ runServer ServerOptions{..} handlers =
runHandler reqBs = do
headerResult <- runParser @RequestHeader get reqBs
case headerResult of
Done l requestHeader -> do
let ServiceHandler{..} = findHandler requestHeader
Done l RequestHeader{..} -> do
let ServiceHandler{..} = findHandler requestApiKey requestApiVersion
case rpcHandler of
UnaryHandler rpcHandler' -> do
req <- runGet l
resp <- rpcHandler' RequestContext req
let respBs = runPutLazy resp
respHeaderBs = runPutLazy $ ResponseHeader (requestCorrelationId requestHeader)
(_, respHeaderVer) = getHeaderVersion requestApiKey requestApiVersion
respHeaderBs =
case respHeaderVer of
0 -> runPutResponseHeaderLazy $ ResponseHeader requestCorrelationId (Left Unsupported)
1 -> runPutResponseHeaderLazy $ ResponseHeader requestCorrelationId (Right EmptyTaggedFields)
_ -> error $ "Unknown response header version" <> show respHeaderVer
let len = BSL.length (respHeaderBs <> respBs)
lenBs = runPutLazy @Int32 (fromIntegral len)
pure $ lenBs <> respHeaderBs <> respBs
Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err
More _ -> E.throwIO $ DecodeError $ "More"

findHandler RequestHeader{..} = do
findHandler (ApiKey key) version = do
let m_handler = find (\ServiceHandler{..} ->
rpcMethod == (fromIntegral requestApiKey, requestApiVersion)) handlers
errmsg = "NotImplemented: " <> show requestApiKey <> ":v" <> show requestApiVersion
rpcMethod == (key, version)) handlers
errmsg = "NotImplemented: " <> show key <> ":v" <> show version
fromMaybe (error errmsg) m_handler

-- from the "network-run" package.
Expand Down
Loading

0 comments on commit 2aadd70

Please sign in to comment.