Skip to content

Commit

Permalink
Support ApiVersion v3
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Sep 14, 2023
1 parent 2aadd70 commit 8c991b1
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 2 deletions.
72 changes: 71 additions & 1 deletion hstream-kafka/src/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ type ApiVersionV1 = ApiVersionV0

type ApiVersionV2 = ApiVersionV0

data ApiVersionV3 = ApiVersionV3
{ apiKey :: {-# UNPACK #-} !ApiKey
-- ^ The API index.
, minVersion :: {-# UNPACK #-} !Int16
-- ^ The minimum supported version, inclusive.
, maxVersion :: {-# UNPACK #-} !Int16
-- ^ The maximum supported version, inclusive.
, taggedFields :: !TaggedFields
} deriving (Show, Generic)
instance Serializable ApiVersionV3

data SupportedFeatureKeyV3 = SupportedFeatureKeyV3
{ name :: !CompactString
-- ^ The name of the feature.
, minVersion :: {-# UNPACK #-} !Int16
-- ^ The minimum supported version for the feature.
, maxVersion :: {-# UNPACK #-} !Int16
-- ^ The maximum supported version for the feature.
, taggedFields :: !TaggedFields
} deriving (Show, Generic)
instance Serializable SupportedFeatureKeyV3

data FinalizedFeatureKeyV3 = FinalizedFeatureKeyV3
{ name :: !CompactString
-- ^ The name of the feature.
, maxVersionLevel :: {-# UNPACK #-} !Int16
-- ^ The cluster-wide finalized max version level for the feature.
, minVersionLevel :: {-# UNPACK #-} !Int16
-- ^ The cluster-wide finalized min version level for the feature.
, taggedFields :: !TaggedFields
} deriving (Show, Generic)
instance Serializable FinalizedFeatureKeyV3

data CreatableReplicaAssignmentV0 = CreatableReplicaAssignmentV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand Down Expand Up @@ -428,6 +461,15 @@ type ApiVersionsRequestV1 = ApiVersionsRequestV0

type ApiVersionsRequestV2 = ApiVersionsRequestV0

data ApiVersionsRequestV3 = ApiVersionsRequestV3
{ clientSoftwareName :: !CompactString
-- ^ The name of the client.
, clientSoftwareVersion :: !CompactString
-- ^ The version of the client.
, taggedFields :: !TaggedFields
} deriving (Show, Generic)
instance Serializable ApiVersionsRequestV3

data ApiVersionsResponseV0 = ApiVersionsResponseV0
{ errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The top-level error code.
Expand All @@ -449,6 +491,18 @@ instance Serializable ApiVersionsResponseV1

type ApiVersionsResponseV2 = ApiVersionsResponseV1

data ApiVersionsResponseV3 = ApiVersionsResponseV3
{ errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The top-level error code.
, apiKeys :: !(CompactKaArray ApiVersionV3)
-- ^ The APIs supported by the broker.
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, taggedFields :: !TaggedFields
} deriving (Show, Generic)
instance Serializable ApiVersionsResponseV3

data CreateTopicsRequestV0 = CreateTopicsRequestV0
{ topics :: !(KaArray CreatableTopicV0)
-- ^ The topics to create.
Expand Down Expand Up @@ -908,6 +962,21 @@ instance HasMethodImpl HStreamKafkaV2 "apiVersions" where
type MethodInput HStreamKafkaV2 "apiVersions" = ApiVersionsRequestV2
type MethodOutput HStreamKafkaV2 "apiVersions" = ApiVersionsResponseV2

data HStreamKafkaV3

instance Service HStreamKafkaV3 where
type ServiceName HStreamKafkaV3 = "HStreamKafkaV3"
type ServiceMethods HStreamKafkaV3 =
'[ "apiVersions"
]

instance HasMethodImpl HStreamKafkaV3 "apiVersions" where
type MethodName HStreamKafkaV3 "apiVersions" = "apiVersions"
type MethodKey HStreamKafkaV3 "apiVersions" = 18
type MethodVersion HStreamKafkaV3 "apiVersions" = 3
type MethodInput HStreamKafkaV3 "apiVersions" = ApiVersionsRequestV3
type MethodOutput HStreamKafkaV3 "apiVersions" = ApiVersionsResponseV3

-------------------------------------------------------------------------------

newtype ApiKey = ApiKey Int16
Expand Down Expand Up @@ -947,7 +1016,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 14) 0 0
, ApiVersionV0 (ApiKey 15) 0 0
, ApiVersionV0 (ApiKey 16) 0 0
, ApiVersionV0 (ApiKey 18) 0 2
, ApiVersionV0 (ApiKey 18) 0 3
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 20) 0 0
]
Expand All @@ -972,6 +1041,7 @@ getHeaderVersion (ApiKey 16) 0 = (1, 0)
getHeaderVersion (ApiKey 18) 0 = (1, 0)
getHeaderVersion (ApiKey 18) 1 = (1, 0)
getHeaderVersion (ApiKey 18) 2 = (1, 0)
getHeaderVersion (ApiKey 18) 3 = (2, 0)
getHeaderVersion (ApiKey 19) 0 = (1, 0)
getHeaderVersion (ApiKey 20) 0 = (1, 0)
getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v
Expand Down
1 change: 1 addition & 0 deletions hstream/src/HStream/Server/KafkaHandler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ handlers sc =
[ K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "apiVersions") handleApiversionsV0
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "apiVersions") handleApiversionsV1
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "apiVersions") handleApiversionsV2
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "apiVersions") handleApiversionsV3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
Expand Down
19 changes: 19 additions & 0 deletions hstream/src/HStream/Server/KafkaHandler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module HStream.Server.KafkaHandler.Basic
handleApiversionsV0
, handleApiversionsV1
, handleApiversionsV2
, handleApiversionsV3
-- 3: Metadata
, handleMetadataV0
, handleMetadataV1
Expand All @@ -25,6 +26,7 @@ import HStream.Server.Types (ServerContext (..),
transToStreamName)
import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
Expand All @@ -48,6 +50,17 @@ handleApiversionsV2
:: K.RequestContext -> K.ApiVersionsRequestV2 -> IO K.ApiVersionsResponseV2
handleApiversionsV2 = handleApiversionsV1

handleApiversionsV3
:: K.RequestContext -> K.ApiVersionsRequestV3 -> IO K.ApiVersionsResponseV3
handleApiversionsV3 _ req = do
let apiKeys = K.CompactKaArray
. Just
. (V.map apiVersionV0ToV3)
. V.fromList
$ K.supportedApiVersions
pure $ K.ApiVersionsResponseV3 K.NONE apiKeys 0{- throttle_time_ms -}
K.EmptyTaggedFields

--------------------
-- 3: Metadata
--------------------
Expand Down Expand Up @@ -143,3 +156,9 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do
}
return $
K.MetadataResponseTopicV1 K.NONE topicName False (Just respPartitions)

-------------------------------------------------------------------------------

apiVersionV0ToV3 :: K.ApiVersionV0 -> K.ApiVersionV3
apiVersionV0ToV3 K.ApiVersionV0{..} =
let taggedFields = K.EmptyTaggedFields in K.ApiVersionV3{..}
2 changes: 1 addition & 1 deletion script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@

GLOBAL_API_VERSION_PATCH = (0, 0)
API_VERSION_PATCHES = {
"ApiVersions": (0, 2),
"ApiVersions": (0, 3),
"Metadata": (0, 1),
"Produce": (0, 2),
}
Expand Down

0 comments on commit 8c991b1

Please sign in to comment.