From ebc42b7a0f0d837804f76c2e9bfeeef93508622f Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Mon, 4 Sep 2023 11:04:25 +0800 Subject: [PATCH] hstream-kafka: upgrade max api version of produce to 2 --- .../src/Kafka/Protocol/Message/Struct.hs | 79 ++++++++++++++++++- script/kafka_gen.py | 2 +- 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/common/kafka/src/Kafka/Protocol/Message/Struct.hs b/common/kafka/src/Kafka/Protocol/Message/Struct.hs index 4918c496c..f2db3dab3 100644 --- a/common/kafka/src/Kafka/Protocol/Message/Struct.hs +++ b/common/kafka/src/Kafka/Protocol/Message/Struct.hs @@ -357,6 +357,14 @@ data TopicProduceDataV0 = TopicProduceDataV0 } deriving (Show, Generic) instance Serializable TopicProduceDataV0 +type PartitionProduceDataV1 = PartitionProduceDataV0 + +type TopicProduceDataV1 = TopicProduceDataV0 + +type PartitionProduceDataV2 = PartitionProduceDataV0 + +type TopicProduceDataV2 = TopicProduceDataV0 + data PartitionProduceResponseV0 = PartitionProduceResponseV0 { index :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -375,6 +383,33 @@ data TopicProduceResponseV0 = TopicProduceResponseV0 } deriving (Show, Generic) instance Serializable TopicProduceResponseV0 +type PartitionProduceResponseV1 = PartitionProduceResponseV0 + +type TopicProduceResponseV1 = TopicProduceResponseV0 + +data PartitionProduceResponseV2 = PartitionProduceResponseV2 + { index :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , baseOffset :: {-# UNPACK #-} !Int64 + -- ^ The base offset. + , logAppendTimeMs :: {-# UNPACK #-} !Int64 + -- ^ The timestamp returned by broker after appending the messages. If + -- CreateTime is used for the topic, the timestamp will be -1. If + -- LogAppendTime is used for the topic, the timestamp will be the broker + -- local time when the messages are appended. + } deriving (Show, Generic) +instance Serializable PartitionProduceResponseV2 + +data TopicProduceResponseV2 = TopicProduceResponseV2 + { name :: !Text + -- ^ The topic name + , partitionResponses :: !(KaArray PartitionProduceResponseV2) + -- ^ Each partition that we produced to within the topic. + } deriving (Show, Generic) +instance Serializable TopicProduceResponseV2 + data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0 { memberId :: !Text -- ^ The ID of the member to assign. @@ -635,11 +670,33 @@ data ProduceRequestV0 = ProduceRequestV0 } deriving (Show, Generic) instance Serializable ProduceRequestV0 +type ProduceRequestV1 = ProduceRequestV0 + +type ProduceRequestV2 = ProduceRequestV0 + newtype ProduceResponseV0 = ProduceResponseV0 { responses :: (KaArray TopicProduceResponseV0) } deriving (Show, Generic) instance Serializable ProduceResponseV0 +data ProduceResponseV1 = ProduceResponseV1 + { responses :: !(KaArray TopicProduceResponseV0) + -- ^ Each produce response + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + } deriving (Show, Generic) +instance Serializable ProduceResponseV1 + +data ProduceResponseV2 = ProduceResponseV2 + { responses :: !(KaArray TopicProduceResponseV2) + -- ^ Each produce response + , throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + } deriving (Show, Generic) +instance Serializable ProduceResponseV2 + data SyncGroupRequestV0 = SyncGroupRequestV0 { groupId :: !Text -- ^ The unique group identifier. @@ -802,10 +859,18 @@ data HStreamKafkaV1 instance Service HStreamKafkaV1 where type ServiceName HStreamKafkaV1 = "HStreamKafkaV1" type ServiceMethods HStreamKafkaV1 = - '[ "metadata" + '[ "produce" + , "metadata" , "apiVersions" ] +instance HasMethodImpl HStreamKafkaV1 "produce" where + type MethodName HStreamKafkaV1 "produce" = "produce" + type MethodKey HStreamKafkaV1 "produce" = 0 + type MethodVersion HStreamKafkaV1 "produce" = 1 + type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1 + type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1 + instance HasMethodImpl HStreamKafkaV1 "metadata" where type MethodName HStreamKafkaV1 "metadata" = "metadata" type MethodKey HStreamKafkaV1 "metadata" = 3 @@ -825,9 +890,17 @@ data HStreamKafkaV2 instance Service HStreamKafkaV2 where type ServiceName HStreamKafkaV2 = "HStreamKafkaV2" type ServiceMethods HStreamKafkaV2 = - '[ "apiVersions" + '[ "produce" + , "apiVersions" ] +instance HasMethodImpl HStreamKafkaV2 "produce" where + type MethodName HStreamKafkaV2 "produce" = "produce" + type MethodKey HStreamKafkaV2 "produce" = 0 + type MethodVersion HStreamKafkaV2 "produce" = 2 + type MethodInput HStreamKafkaV2 "produce" = ProduceRequestV2 + type MethodOutput HStreamKafkaV2 "produce" = ProduceResponseV2 + instance HasMethodImpl HStreamKafkaV2 "apiVersions" where type MethodName HStreamKafkaV2 "apiVersions" = "apiVersions" type MethodKey HStreamKafkaV2 "apiVersions" = 18 @@ -861,7 +934,7 @@ instance Show ApiKey where supportedApiVersions :: [ApiVersionV0] supportedApiVersions = - [ ApiVersionV0 (ApiKey 0) 0 0 + [ ApiVersionV0 (ApiKey 0) 0 2 , ApiVersionV0 (ApiKey 1) 0 0 , ApiVersionV0 (ApiKey 2) 0 0 , ApiVersionV0 (ApiKey 3) 0 1 diff --git a/script/kafka_gen.py b/script/kafka_gen.py index ac4aada6d..8f7a572e1 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -64,7 +64,7 @@ API_VERSION_PATCHES = { "ApiVersions": (0, 2), "Metadata": (0, 1), - "Produce": (0, 0), + "Produce": (0, 2), } # Variables