From fffd241b32f602518934b4b4c9c4d14bb6cb464c Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Wed, 8 May 2024 17:20:23 +0800 Subject: [PATCH] Kafka: let RecordBytes be a newtype --- .../HStream/Kafka/Server/Handler/Consume.hs | 8 ++++---- .../HStream/Kafka/Server/Handler/Produce.hs | 4 +++- .../protocol/Kafka/Protocol/Encoding/Types.hs | 12 ++++++++++++ .../protocol/Kafka/Protocol/Message/Struct.hs | 8 ++++---- .../protocol/Kafka/Protocol/Message/Total.hs | 4 ++-- script/kafka_gen.py | 13 +++++++++---- 6 files changed, 34 insertions(+), 15 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index d93920ecb..08bf5e220 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -187,7 +187,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do { partitionIndex = request.partition , errorCode = K.NONE , highWatermark = hioffset - , recordBytes = (Just "") + , recordBytes = (K.RecordBytes $ Just "") , lastStableOffset = (-1) -- TODO , abortedTransactions = K.NonNullKaArray V.empty -- TODO -- TODO: for performance reason, we don't implement @@ -223,7 +223,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do { partitionIndex = request.partition , errorCode = K.NONE , highWatermark = hioffset - , recordBytes = (Just bs) + , recordBytes = (K.RecordBytes $ Just bs) , lastStableOffset = (-1) -- TODO , abortedTransactions = K.NonNullKaArray V.empty -- TODO -- TODO: for performance reason, we don't implement @@ -572,7 +572,7 @@ errorPartitionResponse partitionIndex ec = K.PartitionData { partitionIndex = partitionIndex , errorCode = ec , highWatermark = (-1) - , recordBytes = (Just "") + , recordBytes = (K.RecordBytes $ Just "") , lastStableOffset = (-1) -- TODO , abortedTransactions = K.NonNullKaArray V.empty -- TODO -- TODO: for performance reason, we don't implement logStartOffset now @@ -585,7 +585,7 @@ partitionResponse0 partitionIndex ec hw = K.PartitionData { partitionIndex = partitionIndex , errorCode = ec , highWatermark = hw - , recordBytes = (Just "") + , recordBytes = (K.RecordBytes $ Just "") , lastStableOffset = (-1) -- TODO , abortedTransactions = K.NonNullKaArray V.empty -- TODO -- TODO: for performance reason, we don't implement logStartOffset now diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index a641f7c2c..f12bd4856 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -72,7 +72,9 @@ handleProduce ServerContext{..} _reqCtx req = do M.totalProduceRequest (topic.name, T.pack . show $ partition.index) $ \counter -> void $ M.addCounter counter 1 - let Just recordBytes = partition.recordBytes -- TODO: handle Nothing + let recordBytes = + fromMaybe (error "TODO: Receive empty recordBytes in ProduceRequest") + (K.unRecordBytes partition.recordBytes) Log.debug1 $ "Try to append to logid " <> Log.build logid <> "(" <> Log.build partition.index <> ")" diff --git a/hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs b/hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs index a825372d2..68714a9c8 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs @@ -21,6 +21,8 @@ module Kafka.Protocol.Encoding.Types , TaggedFields (EmptyTaggedFields) -- TODO , KaArray (..) , CompactKaArray (..) + , RecordBytes (..) + , RecordCompactBytes (..) , RecordKey (..) , RecordValue (..) , RecordHeaderKey (..) @@ -159,6 +161,13 @@ newtype CompactKaArray a = CompactKaArray instance Functor CompactKaArray where fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs +newtype RecordBytes = RecordBytes { unRecordBytes :: Maybe ByteString } + deriving newtype (Show, Eq, Ord, NFData) + +newtype RecordCompactBytes = RecordCompactBytes + { unRecordCompactBytes :: Maybe ByteString } + deriving newtype (Show, Eq, Ord, NFData) + newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString } deriving newtype (Show, Eq, Ord, NFData) @@ -209,6 +218,9 @@ INSTANCE_NEWTYPE(CompactNullableString) INSTANCE_NEWTYPE(CompactBytes) INSTANCE_NEWTYPE(CompactNullableBytes) +INSTANCE_NEWTYPE_1(RecordBytes, NullableBytes) +INSTANCE_NEWTYPE_1(RecordCompactBytes, CompactNullableBytes) + INSTANCE_NEWTYPE_1(RecordKey, VarBytesInRecord) INSTANCE_NEWTYPE_1(RecordValue, VarBytesInRecord) INSTANCE_NEWTYPE_1(RecordHeaderKey, VarStringInRecord) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 26c28128d..000ea55bd 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -412,7 +412,7 @@ data PartitionDataV0 = PartitionDataV0 -- ^ The error code, or 0 if there was no fetch error. , highWatermark :: {-# UNPACK #-} !Int64 -- ^ The current high water mark. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data. } deriving (Show, Eq, Generic) instance Serializable PartitionDataV0 @@ -458,7 +458,7 @@ data PartitionDataV4 = PartitionDataV4 -- offset have been decided (ABORTED or COMMITTED) , abortedTransactions :: !(KaArray AbortedTransactionV4) -- ^ The aborted transactions. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data. } deriving (Show, Eq, Generic) instance Serializable PartitionDataV4 @@ -488,7 +488,7 @@ data PartitionDataV5 = PartitionDataV5 -- ^ The current log start offset. , abortedTransactions :: !(KaArray AbortedTransactionV4) -- ^ The aborted transactions. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data. } deriving (Show, Eq, Generic) instance Serializable PartitionDataV5 @@ -867,7 +867,7 @@ type OffsetFetchResponseTopicV3 = OffsetFetchResponseTopicV0 data PartitionProduceDataV0 = PartitionProduceDataV0 { index :: {-# UNPACK #-} !Int32 -- ^ The partition index. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data to be produced. } deriving (Show, Eq, Generic) instance Serializable PartitionProduceDataV0 diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 4a22b1f5f..f066e4af2 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -1641,7 +1641,7 @@ data PartitionData = PartitionData -- ^ The error code, or 0 if there was no fetch error. , highWatermark :: {-# UNPACK #-} !Int64 -- ^ The current high water mark. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data. , lastStableOffset :: {-# UNPACK #-} !Int64 -- ^ The last stable offset (or LSO) of the partition. This is the last @@ -1731,7 +1731,7 @@ partitionDataFromV6 = partitionDataFromV5 data PartitionProduceData = PartitionProduceData { index :: {-# UNPACK #-} !Int32 -- ^ The partition index. - , recordBytes :: !NullableBytes + , recordBytes :: !RecordBytes -- ^ The record data to be produced. } deriving (Show, Eq, Generic) instance Serializable PartitionProduceData diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 7924f8145..fe44214a1 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -41,6 +41,11 @@ # ----------------------------------------------------------------------------- # Constants +# e.g. +# +# {"name": "Records", "type": "records", ...} +# -> {"name": "RecordBytes", "type": "records", ...} +# -> data ... { recordBytes :: ... } RENAMES = {"Records": "RecordBytes"} TYPE_MAPS = { @@ -52,7 +57,7 @@ "string": "!Text", "bool": "Bool", "bytes": "!ByteString", - "records": "!ByteString", + "records": None, # Records should be NULLABLE_BYTES "array": "!(KaArray {})", "errorCode": "{{-# UNPACK #-}} !ErrorCode", "apiKey": "{{-# UNPACK #-}} !ApiKey", @@ -60,18 +65,18 @@ NULLABLE_TYPE_MAPS = { "string": "!NullableString", "bytes": "!NullableBytes", - "records": "!NullableBytes", + "records": "!RecordBytes", } COMPACT_TYPE_MAPS = { "string": "!CompactString", "bytes": "!CompactBytes", - "records": "!CompactBytes", + "records": None, # Records should be NULLABLE_BYTES "array": "!(CompactKaArray {})", } COMPACT_NULLABLE_TYPE_MAPS = { "string": "!CompactNullableString", "bytes": "!CompactNullableBytes", - "records": "!CompactNullableBytes", + "records": ..., # TODO, produce version >= 9 "array": "!(CompactKaArray {})", }