Skip to content

Commit

Permalink
hstream-kafka: show request api_version in debug message (#1625)
Browse files Browse the repository at this point in the history
* kafka protocol: bump min version of Produce to 2

* Update debug message
  • Loading branch information
4eUeP authored Sep 27, 2023
1 parent 810eb7f commit 80285fb
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 92 deletions.
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ handleProduceV2
-> K.RequestContext
-> K.ProduceRequestV2
-> IO K.ProduceResponseV2
handleProduceV2 ServerContext{..} _ K.ProduceRequestV0{..} = do
handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do
-- TODO: handle request args: acks, timeoutMs

let topicData' = fromMaybe V.empty (K.unKaArray topicData)
responses <- V.forM topicData' $ \K.TopicProduceDataV0{..} -> do
responses <- V.forM topicData' $ \K.TopicProduceDataV2{..} -> do
-- A topic is a stream. Here we donot need to check the topic existence,
-- because the metadata api does(?)
let topic = S.transToTopicStreamName name
partitions <- S.listStreamPartitionsOrdered scLDClient topic
let partitionData' = fromMaybe V.empty (K.unKaArray partitionData)
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV0{..} -> do
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do
let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing
let Just recordBytes' = recordBytes -- TODO: handle Nothing

Expand Down
5 changes: 4 additions & 1 deletion hstream-kafka/protocol/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ runServer opts handlers =
case rpcHandler of
UnaryHandler rpcHandler' -> do
req <- runGet l
Log.debug $ "Received request: " <> Log.buildString' req
Log.debug $ "Received request "
<> Log.buildString' requestApiKey
<> ":v" <> Log.build requestApiVersion
<> ", payload: " <> Log.buildString' req
resp <- rpcHandler' RequestContext req
Log.debug $ "Server response: " <> Log.buildString' resp
let respBs = runPutLazy resp
Expand Down
98 changes: 18 additions & 80 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
-------------------------------------------------------------------------------
-- Autogenerated by kafka message json schema
--
-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
--
-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeFamilies #-}

-------------------------------------------------------------------------------
-- TODO: Generate by kafka message json schema

module Kafka.Protocol.Message.Struct where

import Data.ByteString (ByteString)
Expand Down Expand Up @@ -374,51 +378,21 @@ data OffsetFetchResponseTopicV0 = OffsetFetchResponseTopicV0
} deriving (Show, Eq, Generic)
instance Serializable OffsetFetchResponseTopicV0

data PartitionProduceDataV0 = PartitionProduceDataV0
data PartitionProduceDataV2 = PartitionProduceDataV2
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV0
instance Serializable PartitionProduceDataV2

data TopicProduceDataV0 = TopicProduceDataV0
data TopicProduceDataV2 = TopicProduceDataV2
{ name :: !Text
-- ^ The topic name.
, partitionData :: !(KaArray PartitionProduceDataV0)
, partitionData :: !(KaArray PartitionProduceDataV2)
-- ^ Each partition to produce to.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceDataV0

type PartitionProduceDataV1 = PartitionProduceDataV0

type TopicProduceDataV1 = TopicProduceDataV0

type PartitionProduceDataV2 = PartitionProduceDataV0

type TopicProduceDataV2 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, baseOffset :: {-# UNPACK #-} !Int64
-- ^ The base offset.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceResponseV0

data TopicProduceResponseV0 = TopicProduceResponseV0
{ name :: !Text
-- ^ The topic name
, partitionResponses :: !(KaArray PartitionProduceResponseV0)
-- ^ Each partition that we produced to within the topic.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceResponseV0

type PartitionProduceResponseV1 = PartitionProduceResponseV0

type TopicProduceResponseV1 = TopicProduceResponseV0
instance Serializable TopicProduceDataV2

data PartitionProduceResponseV2 = PartitionProduceResponseV2
{ index :: {-# UNPACK #-} !Int32
Expand Down Expand Up @@ -712,35 +686,17 @@ newtype OffsetFetchResponseV0 = OffsetFetchResponseV0
} deriving (Show, Eq, Generic)
instance Serializable OffsetFetchResponseV0

data ProduceRequestV0 = ProduceRequestV0
data ProduceRequestV2 = ProduceRequestV2
{ acks :: {-# UNPACK #-} !Int16
-- ^ The number of acknowledgments the producer requires the leader to have
-- received before considering a request complete. Allowed values: 0 for no
-- acknowledgments, 1 for only the leader and -1 for the full ISR.
, timeoutMs :: {-# UNPACK #-} !Int32
-- ^ The timeout to await a response in milliseconds.
, topicData :: !(KaArray TopicProduceDataV0)
, topicData :: !(KaArray TopicProduceDataV2)
-- ^ Each topic to produce to.
} deriving (Show, Eq, Generic)
instance Serializable ProduceRequestV0

type ProduceRequestV1 = ProduceRequestV0

type ProduceRequestV2 = ProduceRequestV0

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV0

data ProduceResponseV1 = ProduceResponseV1
{ responses :: !(KaArray TopicProduceResponseV0)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV1
instance Serializable ProduceRequestV2

data ProduceResponseV2 = ProduceResponseV2
{ responses :: !(KaArray TopicProduceResponseV2)
Expand Down Expand Up @@ -778,8 +734,7 @@ data HStreamKafkaV0
instance Service HStreamKafkaV0 where
type ServiceName HStreamKafkaV0 = "HStreamKafkaV0"
type ServiceMethods HStreamKafkaV0 =
'[ "produce"
, "fetch"
'[ "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
Expand All @@ -796,13 +751,6 @@ instance Service HStreamKafkaV0 where
, "deleteTopics"
]

instance HasMethodImpl HStreamKafkaV0 "produce" where
type MethodName HStreamKafkaV0 "produce" = "produce"
type MethodKey HStreamKafkaV0 "produce" = 0
type MethodVersion HStreamKafkaV0 "produce" = 0
type MethodInput HStreamKafkaV0 "produce" = ProduceRequestV0
type MethodOutput HStreamKafkaV0 "produce" = ProduceResponseV0

instance HasMethodImpl HStreamKafkaV0 "fetch" where
type MethodName HStreamKafkaV0 "fetch" = "fetch"
type MethodKey HStreamKafkaV0 "fetch" = 1
Expand Down Expand Up @@ -913,18 +861,10 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "produce"
, "metadata"
'[ "metadata"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
type MethodName HStreamKafkaV1 "produce" = "produce"
type MethodKey HStreamKafkaV1 "produce" = 0
type MethodVersion HStreamKafkaV1 "produce" = 1
type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1
type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1

instance HasMethodImpl HStreamKafkaV1 "metadata" where
type MethodName HStreamKafkaV1 "metadata" = "metadata"
type MethodKey HStreamKafkaV1 "metadata" = 3
Expand Down Expand Up @@ -1003,7 +943,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 2
[ ApiVersionV0 (ApiKey 0) 2 2
, ApiVersionV0 (ApiKey 1) 0 0
, ApiVersionV0 (ApiKey 2) 0 0
, ApiVersionV0 (ApiKey 3) 0 1
Expand All @@ -1022,8 +962,6 @@ supportedApiVersions =
]

getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16)
getHeaderVersion (ApiKey 0) 0 = (1, 0)
getHeaderVersion (ApiKey 0) 1 = (1, 0)
getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 2) 0 = (1, 0)
Expand Down
21 changes: 13 additions & 8 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
API_VERSION_PATCHES = {
"ApiVersions": (0, 3),
"Metadata": (0, 1),
"Produce": (0, 2),
"Produce": (2, 2),
}

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -508,13 +508,17 @@ def parse(msg):

def gen_haskell_header():
return """
-------------------------------------------------------------------------------
-- Autogenerated by kafka message json schema
--
-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
--
-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeFamilies #-}
-------------------------------------------------------------------------------
-- TODO: Generate by kafka message json schema
module Kafka.Protocol.Message.Struct where
import Data.ByteString (ByteString)
Expand Down Expand Up @@ -571,13 +575,12 @@ def gen_supported_api_versions():


def gen_services():
max_api_version = max(x.max_version for x in API_VERSIONS)
services = []
srv_methods = lambda v: format_hs_list(
(
'"' + lower_fst(api.api_name) + '"'
for api in sorted(API_VERSIONS)
if v <= api.max_version
if api.min_version <= v <= api.max_version
),
indent=4,
prefix="'",
Expand All @@ -592,11 +595,13 @@ def gen_services():
type MethodOutput {srv_name} "{lower_fst(api.api_name)}" = {api.api_name}ResponseV{v}
"""
for api in sorted(API_VERSIONS)
if v <= api.max_version
if api.min_version <= v <= api.max_version
)

# for all supported api_version
for v in range(max_api_version + 1):
_glo_max_version = max(x.max_version for x in API_VERSIONS)
_glo_min_version = min(x.min_version for x in API_VERSIONS)
for v in range(_glo_min_version, _glo_max_version + 1):
srv_name = f"HStreamKafkaV{v}"
srv = f"""
data {srv_name}
Expand Down

0 comments on commit 80285fb

Please sign in to comment.