Skip to content

Commit

Permalink
hstream-kafka: upgrade max api version of produce to 2
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Sep 12, 2023
1 parent 55a53ee commit ebc42b7
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
79 changes: 76 additions & 3 deletions common/kafka/src/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ data TopicProduceDataV0 = TopicProduceDataV0
} deriving (Show, Generic)
instance Serializable TopicProduceDataV0

type PartitionProduceDataV1 = PartitionProduceDataV0

type TopicProduceDataV1 = TopicProduceDataV0

type PartitionProduceDataV2 = PartitionProduceDataV0

type TopicProduceDataV2 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand All @@ -375,6 +383,33 @@ data TopicProduceResponseV0 = TopicProduceResponseV0
} deriving (Show, Generic)
instance Serializable TopicProduceResponseV0

type PartitionProduceResponseV1 = PartitionProduceResponseV0

type TopicProduceResponseV1 = TopicProduceResponseV0

data PartitionProduceResponseV2 = PartitionProduceResponseV2
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, baseOffset :: {-# UNPACK #-} !Int64
-- ^ The base offset.
, logAppendTimeMs :: {-# UNPACK #-} !Int64
-- ^ The timestamp returned by broker after appending the messages. If
-- CreateTime is used for the topic, the timestamp will be -1. If
-- LogAppendTime is used for the topic, the timestamp will be the broker
-- local time when the messages are appended.
} deriving (Show, Generic)
instance Serializable PartitionProduceResponseV2

data TopicProduceResponseV2 = TopicProduceResponseV2
{ name :: !Text
-- ^ The topic name
, partitionResponses :: !(KaArray PartitionProduceResponseV2)
-- ^ Each partition that we produced to within the topic.
} deriving (Show, Generic)
instance Serializable TopicProduceResponseV2

data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0
{ memberId :: !Text
-- ^ The ID of the member to assign.
Expand Down Expand Up @@ -635,11 +670,33 @@ data ProduceRequestV0 = ProduceRequestV0
} deriving (Show, Generic)
instance Serializable ProduceRequestV0

type ProduceRequestV1 = ProduceRequestV0

type ProduceRequestV2 = ProduceRequestV0

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Generic)
instance Serializable ProduceResponseV0

data ProduceResponseV1 = ProduceResponseV1
{ responses :: !(KaArray TopicProduceResponseV0)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Generic)
instance Serializable ProduceResponseV1

data ProduceResponseV2 = ProduceResponseV2
{ responses :: !(KaArray TopicProduceResponseV2)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Generic)
instance Serializable ProduceResponseV2

data SyncGroupRequestV0 = SyncGroupRequestV0
{ groupId :: !Text
-- ^ The unique group identifier.
Expand Down Expand Up @@ -802,10 +859,18 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "metadata"
'[ "produce"
, "metadata"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
type MethodName HStreamKafkaV1 "produce" = "produce"
type MethodKey HStreamKafkaV1 "produce" = 0
type MethodVersion HStreamKafkaV1 "produce" = 1
type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1
type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1

instance HasMethodImpl HStreamKafkaV1 "metadata" where
type MethodName HStreamKafkaV1 "metadata" = "metadata"
type MethodKey HStreamKafkaV1 "metadata" = 3
Expand All @@ -825,9 +890,17 @@ data HStreamKafkaV2
instance Service HStreamKafkaV2 where
type ServiceName HStreamKafkaV2 = "HStreamKafkaV2"
type ServiceMethods HStreamKafkaV2 =
'[ "apiVersions"
'[ "produce"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV2 "produce" where
type MethodName HStreamKafkaV2 "produce" = "produce"
type MethodKey HStreamKafkaV2 "produce" = 0
type MethodVersion HStreamKafkaV2 "produce" = 2
type MethodInput HStreamKafkaV2 "produce" = ProduceRequestV2
type MethodOutput HStreamKafkaV2 "produce" = ProduceResponseV2

instance HasMethodImpl HStreamKafkaV2 "apiVersions" where
type MethodName HStreamKafkaV2 "apiVersions" = "apiVersions"
type MethodKey HStreamKafkaV2 "apiVersions" = 18
Expand Down Expand Up @@ -861,7 +934,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 0
[ ApiVersionV0 (ApiKey 0) 0 2
, ApiVersionV0 (ApiKey 1) 0 0
, ApiVersionV0 (ApiKey 2) 0 0
, ApiVersionV0 (ApiKey 3) 0 1
Expand Down
2 changes: 1 addition & 1 deletion script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
API_VERSION_PATCHES = {
"ApiVersions": (0, 2),
"Metadata": (0, 1),
"Produce": (0, 0),
"Produce": (0, 2),
}

# Variables
Expand Down

0 comments on commit ebc42b7

Please sign in to comment.