Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hstream-kafka: show request api_version in debug message #1625

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ handleProduceV2
-> K.RequestContext
-> K.ProduceRequestV2
-> IO K.ProduceResponseV2
handleProduceV2 ServerContext{..} _ K.ProduceRequestV0{..} = do
handleProduceV2 ServerContext{..} _ K.ProduceRequestV2{..} = do
-- TODO: handle request args: acks, timeoutMs

let topicData' = fromMaybe V.empty (K.unKaArray topicData)
responses <- V.forM topicData' $ \K.TopicProduceDataV0{..} -> do
responses <- V.forM topicData' $ \K.TopicProduceDataV2{..} -> do
-- A topic is a stream. Here we donot need to check the topic existence,
-- because the metadata api does(?)
let topic = S.transToTopicStreamName name
partitions <- S.listStreamPartitionsOrdered scLDClient topic
let partitionData' = fromMaybe V.empty (K.unKaArray partitionData)
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV0{..} -> do
partitionResponses <- V.forM partitionData' $ \K.PartitionProduceDataV2{..} -> do
let Just (_, logid) = partitions V.!? (fromIntegral index) -- TODO: handle Nothing
let Just recordBytes' = recordBytes -- TODO: handle Nothing

Expand Down
5 changes: 4 additions & 1 deletion hstream-kafka/protocol/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ runServer opts handlers =
case rpcHandler of
UnaryHandler rpcHandler' -> do
req <- runGet l
Log.debug $ "Received request: " <> Log.buildString' req
Log.debug $ "Received request "
<> Log.buildString' requestApiKey
<> ":v" <> Log.build requestApiVersion
<> ", payload: " <> Log.buildString' req
resp <- rpcHandler' RequestContext req
Log.debug $ "Server response: " <> Log.buildString' resp
let respBs = runPutLazy resp
Expand Down
98 changes: 18 additions & 80 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
-------------------------------------------------------------------------------
-- Autogenerated by kafka message json schema
--
-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
--
-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeFamilies #-}

-------------------------------------------------------------------------------
-- TODO: Generate by kafka message json schema

module Kafka.Protocol.Message.Struct where

import Data.ByteString (ByteString)
Expand Down Expand Up @@ -374,51 +378,21 @@ data OffsetFetchResponseTopicV0 = OffsetFetchResponseTopicV0
} deriving (Show, Eq, Generic)
instance Serializable OffsetFetchResponseTopicV0

data PartitionProduceDataV0 = PartitionProduceDataV0
data PartitionProduceDataV2 = PartitionProduceDataV2
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV0
instance Serializable PartitionProduceDataV2

data TopicProduceDataV0 = TopicProduceDataV0
data TopicProduceDataV2 = TopicProduceDataV2
{ name :: !Text
-- ^ The topic name.
, partitionData :: !(KaArray PartitionProduceDataV0)
, partitionData :: !(KaArray PartitionProduceDataV2)
-- ^ Each partition to produce to.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceDataV0

type PartitionProduceDataV1 = PartitionProduceDataV0

type TopicProduceDataV1 = TopicProduceDataV0

type PartitionProduceDataV2 = PartitionProduceDataV0

type TopicProduceDataV2 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, baseOffset :: {-# UNPACK #-} !Int64
-- ^ The base offset.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceResponseV0

data TopicProduceResponseV0 = TopicProduceResponseV0
{ name :: !Text
-- ^ The topic name
, partitionResponses :: !(KaArray PartitionProduceResponseV0)
-- ^ Each partition that we produced to within the topic.
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceResponseV0

type PartitionProduceResponseV1 = PartitionProduceResponseV0

type TopicProduceResponseV1 = TopicProduceResponseV0
instance Serializable TopicProduceDataV2

data PartitionProduceResponseV2 = PartitionProduceResponseV2
{ index :: {-# UNPACK #-} !Int32
Expand Down Expand Up @@ -712,35 +686,17 @@ newtype OffsetFetchResponseV0 = OffsetFetchResponseV0
} deriving (Show, Eq, Generic)
instance Serializable OffsetFetchResponseV0

data ProduceRequestV0 = ProduceRequestV0
data ProduceRequestV2 = ProduceRequestV2
{ acks :: {-# UNPACK #-} !Int16
-- ^ The number of acknowledgments the producer requires the leader to have
-- received before considering a request complete. Allowed values: 0 for no
-- acknowledgments, 1 for only the leader and -1 for the full ISR.
, timeoutMs :: {-# UNPACK #-} !Int32
-- ^ The timeout to await a response in milliseconds.
, topicData :: !(KaArray TopicProduceDataV0)
, topicData :: !(KaArray TopicProduceDataV2)
-- ^ Each topic to produce to.
} deriving (Show, Eq, Generic)
instance Serializable ProduceRequestV0

type ProduceRequestV1 = ProduceRequestV0

type ProduceRequestV2 = ProduceRequestV0

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV0

data ProduceResponseV1 = ProduceResponseV1
{ responses :: !(KaArray TopicProduceResponseV0)
-- ^ Each produce response
, throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV1
instance Serializable ProduceRequestV2

data ProduceResponseV2 = ProduceResponseV2
{ responses :: !(KaArray TopicProduceResponseV2)
Expand Down Expand Up @@ -778,8 +734,7 @@ data HStreamKafkaV0
instance Service HStreamKafkaV0 where
type ServiceName HStreamKafkaV0 = "HStreamKafkaV0"
type ServiceMethods HStreamKafkaV0 =
'[ "produce"
, "fetch"
'[ "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
Expand All @@ -796,13 +751,6 @@ instance Service HStreamKafkaV0 where
, "deleteTopics"
]

instance HasMethodImpl HStreamKafkaV0 "produce" where
type MethodName HStreamKafkaV0 "produce" = "produce"
type MethodKey HStreamKafkaV0 "produce" = 0
type MethodVersion HStreamKafkaV0 "produce" = 0
type MethodInput HStreamKafkaV0 "produce" = ProduceRequestV0
type MethodOutput HStreamKafkaV0 "produce" = ProduceResponseV0

instance HasMethodImpl HStreamKafkaV0 "fetch" where
type MethodName HStreamKafkaV0 "fetch" = "fetch"
type MethodKey HStreamKafkaV0 "fetch" = 1
Expand Down Expand Up @@ -913,18 +861,10 @@ data HStreamKafkaV1
instance Service HStreamKafkaV1 where
type ServiceName HStreamKafkaV1 = "HStreamKafkaV1"
type ServiceMethods HStreamKafkaV1 =
'[ "produce"
, "metadata"
'[ "metadata"
, "apiVersions"
]

instance HasMethodImpl HStreamKafkaV1 "produce" where
type MethodName HStreamKafkaV1 "produce" = "produce"
type MethodKey HStreamKafkaV1 "produce" = 0
type MethodVersion HStreamKafkaV1 "produce" = 1
type MethodInput HStreamKafkaV1 "produce" = ProduceRequestV1
type MethodOutput HStreamKafkaV1 "produce" = ProduceResponseV1

instance HasMethodImpl HStreamKafkaV1 "metadata" where
type MethodName HStreamKafkaV1 "metadata" = "metadata"
type MethodKey HStreamKafkaV1 "metadata" = 3
Expand Down Expand Up @@ -1003,7 +943,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 2
[ ApiVersionV0 (ApiKey 0) 2 2
, ApiVersionV0 (ApiKey 1) 0 0
, ApiVersionV0 (ApiKey 2) 0 0
, ApiVersionV0 (ApiKey 3) 0 1
Expand All @@ -1022,8 +962,6 @@ supportedApiVersions =
]

getHeaderVersion :: ApiKey -> Int16 -> (Int16, Int16)
getHeaderVersion (ApiKey 0) 0 = (1, 0)
getHeaderVersion (ApiKey 0) 1 = (1, 0)
getHeaderVersion (ApiKey 0) 2 = (1, 0)
getHeaderVersion (ApiKey 1) 0 = (1, 0)
getHeaderVersion (ApiKey 2) 0 = (1, 0)
Expand Down
21 changes: 13 additions & 8 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
API_VERSION_PATCHES = {
"ApiVersions": (0, 3),
"Metadata": (0, 1),
"Produce": (0, 2),
"Produce": (2, 2),
}

# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -508,13 +508,17 @@ def parse(msg):

def gen_haskell_header():
return """
-------------------------------------------------------------------------------
-- Autogenerated by kafka message json schema
--
-- $ ./script/kafka_gen.py run > hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
--
-- DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE TypeFamilies #-}

-------------------------------------------------------------------------------
-- TODO: Generate by kafka message json schema

module Kafka.Protocol.Message.Struct where

import Data.ByteString (ByteString)
Expand Down Expand Up @@ -571,13 +575,12 @@ def gen_supported_api_versions():


def gen_services():
max_api_version = max(x.max_version for x in API_VERSIONS)
services = []
srv_methods = lambda v: format_hs_list(
(
'"' + lower_fst(api.api_name) + '"'
for api in sorted(API_VERSIONS)
if v <= api.max_version
if api.min_version <= v <= api.max_version
),
indent=4,
prefix="'",
Expand All @@ -592,11 +595,13 @@ def gen_services():
type MethodOutput {srv_name} "{lower_fst(api.api_name)}" = {api.api_name}ResponseV{v}
"""
for api in sorted(API_VERSIONS)
if v <= api.max_version
if api.min_version <= v <= api.max_version
)

# for all supported api_version
for v in range(max_api_version + 1):
_glo_max_version = max(x.max_version for x in API_VERSIONS)
_glo_min_version = min(x.min_version for x in API_VERSIONS)
for v in range(_glo_min_version, _glo_max_version + 1):
srv_name = f"HStreamKafkaV{v}"
srv = f"""
data {srv_name}
Expand Down
Loading