diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index aa43bb024..c10944f5b 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -38,17 +38,17 @@ handleProduceV2 -> K.RequestContext -> K.ProduceRequestV2 -> IO K.ProduceResponseV2 -handleProduceV2 ServerContext{..} _ K.ProduceRequestV0{..} = do +handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do -- TODO: handle request args: acks, timeoutMs let topicData' = fromMaybe V.empty (K.unKaArray topicData) - responses <- V.forM topicData' $ \K.TopicProduceDataV0{..} -> do + responses <- V.forM topicData' $ \K.TopicProduceDataV2{..} -> do -- A topic is a stream. Here we donot need to check the topic existence, -- because the metadata api does(?) let topic = S.transToTopicStreamName name partitions <- S.listStreamPartitionsOrdered scLDClient topic let partitionData' = fromMaybe V.empty (K.unKaArray partitionData) - partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV0{..} -> do + partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing let Just recordBytes' = recordBytes -- TODO: handle Nothing diff --git a/hstream-kafka/protocol/Kafka/Network.hs b/hstream-kafka/protocol/Kafka/Network.hs index fbfe6376d..3f9d742a7 100644 --- a/hstream-kafka/protocol/Kafka/Network.hs +++ b/hstream-kafka/protocol/Kafka/Network.hs @@ -74,7 +74,10 @@ runServer opts handlers = case rpcHandler of UnaryHandler rpcHandler' -> do req <- runGet l - Log.debug $ "Received request: " <> Log.buildString' req + Log.debug $ "Received request " + <> Log.buildString' requestApiKey + <> ":v" <> Log.build requestApiVersion + <> ", payload: " <> Log.buildString' req resp <- rpcHandler' RequestContext req Log.debug $ "Server response: " <> Log.buildString' resp let respBs = runPutLazy resp diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 7d5c506f1..2b7051b43 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -1,10 +1,14 @@ +------------------------------------------------------------------------------- +-- Autogenerated by kafka message json schema +-- +-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +-- +-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE TypeFamilies #-} -------------------------------------------------------------------------------- --- TODO: Generate by kafka message json schema - module Kafka.Protocol.Message.Struct where import Data.ByteString (ByteString) @@ -374,51 +378,21 @@ data OffsetFetchResponseTopicV0 = OffsetFetchResponseTopicV0 } deriving (Show, Eq, Generic) instance Serializable OffsetFetchResponseTopicV0 -data PartitionProduceDataV0 = PartitionProduceDataV0 +data PartitionProduceDataV2 = PartitionProduceDataV2 { index :: {-# UNPACK #-} !Int32 -- ^ The partition index. , recordBytes :: !NullableBytes -- ^ The record data to be produced. } deriving (Show, Eq, Generic) -instance Serializable PartitionProduceDataV0 +instance Serializable PartitionProduceDataV2 -data TopicProduceDataV0 = TopicProduceDataV0 +data TopicProduceDataV2 = TopicProduceDataV2 { name :: !Text -- ^ The topic name. - , partitionData :: !(KaArray PartitionProduceDataV0) + , partitionData :: !(KaArray PartitionProduceDataV2) -- ^ Each partition to produce to. } deriving (Show, Eq, Generic) -instance Serializable TopicProduceDataV0 - -type PartitionProduceDataV1 = PartitionProduceDataV0 - -type TopicProduceDataV1 = TopicProduceDataV0 - -type PartitionProduceDataV2 = PartitionProduceDataV0 - -type TopicProduceDataV2 = TopicProduceDataV0 - -data PartitionProduceResponseV0 = PartitionProduceResponseV0 - { index :: {-# UNPACK #-} !Int32 - -- ^ The partition index. - , errorCode :: {-# UNPACK #-} !ErrorCode - -- ^ The error code, or 0 if there was no error. - , baseOffset :: {-# UNPACK #-} !Int64 - -- ^ The base offset. - } deriving (Show, Eq, Generic) -instance Serializable PartitionProduceResponseV0 - -data TopicProduceResponseV0 = TopicProduceResponseV0 - { name :: !Text - -- ^ The topic name - , partitionResponses :: !(KaArray PartitionProduceResponseV0) - -- ^ Each partition that we produced to within the topic. - } deriving (Show, Eq, Generic) -instance Serializable TopicProduceResponseV0 - -type PartitionProduceResponseV1 = PartitionProduceResponseV0 - -type TopicProduceResponseV1 = TopicProduceResponseV0 +instance Serializable TopicProduceDataV2 data PartitionProduceResponseV2 = PartitionProduceResponseV2 { index :: {-# UNPACK #-} !Int32 @@ -712,35 +686,17 @@ newtype OffsetFetchResponseV0 = OffsetFetchResponseV0 } deriving (Show, Eq, Generic) instance Serializable OffsetFetchResponseV0 -data ProduceRequestV0 = ProduceRequestV0 +data ProduceRequestV2 = ProduceRequestV2 { acks :: {-# UNPACK #-} !Int16 -- ^ The number of acknowledgments the producer requires the leader to have -- received before considering a request complete. Allowed values: 0 for no -- acknowledgments, 1 for only the leader and -1 for the full ISR. , timeoutMs :: {-# UNPACK #-} !Int32 -- ^ The timeout to await a response in milliseconds. - , topicData :: !(KaArray TopicProduceDataV0) + , topicData :: !(KaArray TopicProduceDataV2) -- ^ Each topic to produce to. } deriving (Show, Eq, Generic) -instance Serializable ProduceRequestV0 - -type ProduceRequestV1 = ProduceRequestV0 - -type ProduceRequestV2 = ProduceRequestV0 - -newtype ProduceResponseV0 = ProduceResponseV0 - { responses :: (KaArray TopicProduceResponseV0) - } deriving (Show, Eq, Generic) -instance Serializable ProduceResponseV0 - -data ProduceResponseV1 = ProduceResponseV1 - { responses :: !(KaArray TopicProduceResponseV0) - -- ^ Each produce response - , throttleTimeMs :: {-# UNPACK #-} !Int32 - -- ^ The duration in milliseconds for which the request was throttled due - -- to a quota violation, or zero if the request did not violate any quota. - } deriving (Show, Eq, Generic) -instance Serializable ProduceResponseV1 +instance Serializable ProduceRequestV2 data ProduceResponseV2 = ProduceResponseV2 { responses :: !(KaArray TopicProduceResponseV2) @@ -778,8 +734,7 @@ data HStreamKafkaV0 instance Service HStreamKafkaV0 where type ServiceName HStreamKafkaV0 = "HStreamKafkaV0" type ServiceMethods HStreamKafkaV0 = - '[ "produce" - , "fetch" + '[ "fetch" , "listOffsets" , "metadata" , "offsetCommit" @@ -796,13 +751,6 @@ instance Service HStreamKafkaV0 where , "deleteTopics" ] -instance HasMethodImpl HStreamKafkaV0 "produce" where - type MethodName HStreamKafkaV0 "produce" = "produce" - type MethodKey HStreamKafkaV0 "produce" = 0 - type MethodVersion HStreamKafkaV0 "produce" = 0 - type MethodInput HStreamKafkaV0 "produce" = ProduceRequestV0 - type MethodOutput HStreamKafkaV0 "produce" = ProduceResponseV0 - instance HasMethodImpl HStreamKafkaV0 "fetch" where type MethodName HStreamKafkaV0 "fetch" = "fetch" type MethodKey HStreamKafkaV0 "fetch" = 1 @@ -913,18 +861,10 @@ data HStreamKafkaV1 instance Service HStreamKafkaV1 where type ServiceName HStreamKafkaV1 = "HStreamKafkaV1" type ServiceMethods HStreamKafkaV1 = - '[ "produce" - , "metadata" + '[ "metadata" , "apiVersions" ] -instance HasMethodImpl HStreamKafkaV1 "produce" where - type MethodName HStreamKafkaV1 "produce" = "produce" - type MethodKey HStreamKafkaV1 "produce" = 0 - type MethodVersion HStreamKafkaV1 "produce" = 1 - type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1 - type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1 - instance HasMethodImpl HStreamKafkaV1 "metadata" where type MethodName HStreamKafkaV1 "metadata" = "metadata" type MethodKey HStreamKafkaV1 "metadata" = 3 @@ -1003,7 +943,7 @@ instance Show ApiKey where supportedApiVersions :: [ApiVersionV0] supportedApiVersions = - [ ApiVersionV0 (ApiKey 0) 0 2 + [ ApiVersionV0 (ApiKey 0) 2 2 , ApiVersionV0 (ApiKey 1) 0 0 , ApiVersionV0 (ApiKey 2) 0 0 , ApiVersionV0 (ApiKey 3) 0 1 @@ -1022,8 +962,6 @@ supportedApiVersions = ] getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16) -getHeaderVersion (ApiKey 0) 0 = (1, 0) -getHeaderVersion (ApiKey 0) 1 = (1, 0) getHeaderVersion (ApiKey 0) 2 = (1, 0) getHeaderVersion (ApiKey 1) 0 = (1, 0) getHeaderVersion (ApiKey 2) 0 = (1, 0) diff --git a/script/kafka_gen.py b/script/kafka_gen.py index c8f0011e3..8c61e9cc5 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -74,7 +74,7 @@ API_VERSION_PATCHES = { "ApiVersions": (0, 3), "Metadata": (0, 1), - "Produce": (0, 2), + "Produce": (2, 2), } # ----------------------------------------------------------------------------- @@ -508,13 +508,17 @@ def parse(msg): def gen_haskell_header(): return """ +------------------------------------------------------------------------------- +-- Autogenerated by kafka message json schema +-- +-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +-- +-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + {-# LANGUAGE DataKinds #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE TypeFamilies #-} -------------------------------------------------------------------------------- --- TODO: Generate by kafka message json schema - module Kafka.Protocol.Message.Struct where import Data.ByteString (ByteString) @@ -571,13 +575,12 @@ def gen_supported_api_versions(): def gen_services(): - max_api_version = max(x.max_version for x in API_VERSIONS) services = [] srv_methods = lambda v: format_hs_list( ( '"' + lower_fst(api.api_name) + '"' for api in sorted(API_VERSIONS) - if v <= api.max_version + if api.min_version <= v <= api.max_version ), indent=4, prefix="'", @@ -592,11 +595,13 @@ def gen_services(): type MethodOutput {srv_name} "{lower_fst(api.api_name)}" = {api.api_name}ResponseV{v} """ for api in sorted(API_VERSIONS) - if v <= api.max_version + if api.min_version <= v <= api.max_version ) # for all supported api_version - for v in range(max_api_version + 1): + _glo_max_version = max(x.max_version for x in API_VERSIONS) + _glo_min_version = min(x.min_version for x in API_VERSIONS) + for v in range(_glo_min_version, _glo_max_version + 1): srv_name = f"HStreamKafkaV{v}" srv = f""" data {srv_name}