From 8c991b137554ab3478ca74619a04a6674d0424bb Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:23:28 +0800 Subject: [PATCH] Support ApiVersion v3 --- .../src/Kafka/Protocol/Message/Struct.hs | 72 ++++++++++++++++++- hstream/src/HStream/Server/KafkaHandler.hs | 1 + .../src/HStream/Server/KafkaHandler/Basic.hs | 19 +++++ script/kafka_gen.py | 2 +- 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/hstream-kafka/src/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/src/Kafka/Protocol/Message/Struct.hs index ad6a48c3e..0a844512c 100644 --- a/hstream-kafka/src/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/src/Kafka/Protocol/Message/Struct.hs @@ -32,6 +32,39 @@ type ApiVersionV1 = ApiVersionV0 type ApiVersionV2 = ApiVersionV0 +data ApiVersionV3 = ApiVersionV3 + { apiKey :: {-# UNPACK #-} !ApiKey + -- ^ The API index. + , minVersion :: {-# UNPACK #-} !Int16 + -- ^ The minimum supported version, inclusive. + , maxVersion :: {-# UNPACK #-} !Int16 + -- ^ The maximum supported version, inclusive. + , taggedFields :: !TaggedFields + } deriving (Show, Generic) +instance Serializable ApiVersionV3 + +data SupportedFeatureKeyV3 = SupportedFeatureKeyV3 + { name :: !CompactString + -- ^ The name of the feature. + , minVersion :: {-# UNPACK #-} !Int16 + -- ^ The minimum supported version for the feature. + , maxVersion :: {-# UNPACK #-} !Int16 + -- ^ The maximum supported version for the feature. + , taggedFields :: !TaggedFields + } deriving (Show, Generic) +instance Serializable SupportedFeatureKeyV3 + +data FinalizedFeatureKeyV3 = FinalizedFeatureKeyV3 + { name :: !CompactString + -- ^ The name of the feature. + , maxVersionLevel :: {-# UNPACK #-} !Int16 + -- ^ The cluster-wide finalized max version level for the feature. + , minVersionLevel :: {-# UNPACK #-} !Int16 + -- ^ The cluster-wide finalized min version level for the feature. + , taggedFields :: !TaggedFields + } deriving (Show, Generic) +instance Serializable FinalizedFeatureKeyV3 + data CreatableReplicaAssignmentV0 = CreatableReplicaAssignmentV0 { partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -428,6 +461,15 @@ type ApiVersionsRequestV1 = ApiVersionsRequestV0 type ApiVersionsRequestV2 = ApiVersionsRequestV0 +data ApiVersionsRequestV3 = ApiVersionsRequestV3 + { clientSoftwareName :: !CompactString + -- ^ The name of the client. + , clientSoftwareVersion :: !CompactString + -- ^ The version of the client. + , taggedFields :: !TaggedFields + } deriving (Show, Generic) +instance Serializable ApiVersionsRequestV3 + data ApiVersionsResponseV0 = ApiVersionsResponseV0 { errorCode :: {-# UNPACK #-} !ErrorCode -- ^ The top-level error code. @@ -449,6 +491,18 @@ instance Serializable ApiVersionsResponseV1 type ApiVersionsResponseV2 = ApiVersionsResponseV1 +data ApiVersionsResponseV3 = ApiVersionsResponseV3 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The top-level error code. + , apiKeys :: !(CompactKaArray ApiVersionV3) + -- ^ The APIs supported by the broker. + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , taggedFields :: !TaggedFields + } deriving (Show, Generic) +instance Serializable ApiVersionsResponseV3 + data CreateTopicsRequestV0 = CreateTopicsRequestV0 { topics :: !(KaArray CreatableTopicV0) -- ^ The topics to create. @@ -908,6 +962,21 @@ instance HasMethodImpl HStreamKafkaV2 "apiVersions" where type MethodInput HStreamKafkaV2 "apiVersions" = ApiVersionsRequestV2 type MethodOutput HStreamKafkaV2 "apiVersions" = ApiVersionsResponseV2 +data HStreamKafkaV3 + +instance Service HStreamKafkaV3 where + type ServiceName HStreamKafkaV3 = "HStreamKafkaV3" + type ServiceMethods HStreamKafkaV3 = + '[ "apiVersions" + ] + +instance HasMethodImpl HStreamKafkaV3 "apiVersions" where + type MethodName HStreamKafkaV3 "apiVersions" = "apiVersions" + type MethodKey HStreamKafkaV3 "apiVersions" = 18 + type MethodVersion HStreamKafkaV3 "apiVersions" = 3 + type MethodInput HStreamKafkaV3 "apiVersions" = ApiVersionsRequestV3 + type MethodOutput HStreamKafkaV3 "apiVersions" = ApiVersionsResponseV3 + ------------------------------------------------------------------------------- newtype ApiKey = ApiKey Int16 @@ -947,7 +1016,7 @@ supportedApiVersions = , ApiVersionV0 (ApiKey 14) 0 0 , ApiVersionV0 (ApiKey 15) 0 0 , ApiVersionV0 (ApiKey 16) 0 0 - , ApiVersionV0 (ApiKey 18) 0 2 + , ApiVersionV0 (ApiKey 18) 0 3 , ApiVersionV0 (ApiKey 19) 0 0 , ApiVersionV0 (ApiKey 20) 0 0 ] @@ -972,6 +1041,7 @@ getHeaderVersion (ApiKey 16) 0 = (1, 0) getHeaderVersion (ApiKey 18) 0 = (1, 0) getHeaderVersion (ApiKey 18) 1 = (1, 0) getHeaderVersion (ApiKey 18) 2 = (1, 0) +getHeaderVersion (ApiKey 18) 3 = (2, 0) getHeaderVersion (ApiKey 19) 0 = (1, 0) getHeaderVersion (ApiKey 20) 0 = (1, 0) getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v diff --git a/hstream/src/HStream/Server/KafkaHandler.hs b/hstream/src/HStream/Server/KafkaHandler.hs index 967bab558..065e8ac03 100644 --- a/hstream/src/HStream/Server/KafkaHandler.hs +++ b/hstream/src/HStream/Server/KafkaHandler.hs @@ -14,6 +14,7 @@ handlers sc = [ K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "apiVersions") handleApiversionsV0 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "apiVersions") handleApiversionsV1 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "apiVersions") handleApiversionsV2 + , K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "apiVersions") handleApiversionsV3 , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc) diff --git a/hstream/src/HStream/Server/KafkaHandler/Basic.hs b/hstream/src/HStream/Server/KafkaHandler/Basic.hs index 73cb25c65..4600ae0b1 100644 --- a/hstream/src/HStream/Server/KafkaHandler/Basic.hs +++ b/hstream/src/HStream/Server/KafkaHandler/Basic.hs @@ -3,6 +3,7 @@ module HStream.Server.KafkaHandler.Basic handleApiversionsV0 , handleApiversionsV1 , handleApiversionsV2 + , handleApiversionsV3 -- 3: Metadata , handleMetadataV0 , handleMetadataV1 @@ -25,6 +26,7 @@ import HStream.Server.Types (ServerContext (..), transToStreamName) import qualified HStream.Store as S import qualified HStream.Utils as Utils +import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K import qualified Kafka.Protocol.Service as K @@ -48,6 +50,17 @@ handleApiversionsV2 :: K.RequestContext -> K.ApiVersionsRequestV2 -> IO K.ApiVersionsResponseV2 handleApiversionsV2 = handleApiversionsV1 +handleApiversionsV3 + :: K.RequestContext -> K.ApiVersionsRequestV3 -> IO K.ApiVersionsResponseV3 +handleApiversionsV3 _ req = do + let apiKeys = K.CompactKaArray + . Just + . (V.map apiVersionV0ToV3) + . V.fromList + $ K.supportedApiVersions + pure $ K.ApiVersionsResponseV3 K.NONE apiKeys 0{- throttle_time_ms -} + K.EmptyTaggedFields + -------------------- -- 3: Metadata -------------------- @@ -143,3 +156,9 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do } return $ K.MetadataResponseTopicV1 K.NONE topicName False (Just respPartitions) + +------------------------------------------------------------------------------- + +apiVersionV0ToV3 :: K.ApiVersionV0 -> K.ApiVersionV3 +apiVersionV0ToV3 K.ApiVersionV0{..} = + let taggedFields = K.EmptyTaggedFields in K.ApiVersionV3{..} diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 6caad6838..507b9b6fe 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -72,7 +72,7 @@ GLOBAL_API_VERSION_PATCH = (0, 0) API_VERSION_PATCHES = { - "ApiVersions": (0, 2), + "ApiVersions": (0, 3), "Metadata": (0, 1), "Produce": (0, 2), }