Skip to content

Commit

Permalink
Kafka: let RecordBytes be a newtype
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 9, 2024
1 parent dff34b9 commit fffd241
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 15 deletions.
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -223,7 +223,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just bs)
, recordBytes = (K.RecordBytes $ Just bs)
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -572,7 +572,7 @@ errorPartitionResponse partitionIndex ec = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = (-1)
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand All @@ -585,7 +585,7 @@ partitionResponse0 partitionIndex ec hw = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = hw
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand Down
4 changes: 3 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ handleProduce ServerContext{..} _reqCtx req = do
M.totalProduceRequest
(topic.name, T.pack . show $ partition.index) $ \counter ->
void $ M.addCounter counter 1
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
let recordBytes =
fromMaybe (error "TODO: Receive empty recordBytes in ProduceRequest")
(K.unRecordBytes partition.recordBytes)
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

Expand Down
12 changes: 12 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module Kafka.Protocol.Encoding.Types
, TaggedFields (EmptyTaggedFields) -- TODO
, KaArray (..)
, CompactKaArray (..)
, RecordBytes (..)
, RecordCompactBytes (..)
, RecordKey (..)
, RecordValue (..)
, RecordHeaderKey (..)
Expand Down Expand Up @@ -159,6 +161,13 @@ newtype CompactKaArray a = CompactKaArray
instance Functor CompactKaArray where
fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs

-- | The raw record batch of a partition, carried as a nullable byte
-- sequence ('Nothing' represents a wire-level null).  Serialized as
-- Kafka NULLABLE_BYTES via @INSTANCE_NEWTYPE_1(RecordBytes, NullableBytes)@.
newtype RecordBytes = RecordBytes { unRecordBytes :: Maybe ByteString }
  deriving newtype (Show, Eq, Ord, NFData)

-- | Compact-encoded variant of 'RecordBytes', serialized as Kafka
-- COMPACT_NULLABLE_BYTES via
-- @INSTANCE_NEWTYPE_1(RecordCompactBytes, CompactNullableBytes)@.
-- NOTE(review): presumably used by flexible-version (compact) messages —
-- no consumer is visible in this view; confirm against the generated structs.
newtype RecordCompactBytes = RecordCompactBytes
  { unRecordCompactBytes :: Maybe ByteString }
  deriving newtype (Show, Eq, Ord, NFData)

-- | The (nullable) key of a single record inside a record batch,
-- serialized as varint-length-prefixed bytes via
-- @INSTANCE_NEWTYPE_1(RecordKey, VarBytesInRecord)@.
newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString }
  deriving newtype (Show, Eq, Ord, NFData)

Expand Down Expand Up @@ -209,6 +218,9 @@ INSTANCE_NEWTYPE(CompactNullableString)
INSTANCE_NEWTYPE(CompactBytes)
INSTANCE_NEWTYPE(CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordBytes, NullableBytes)
INSTANCE_NEWTYPE_1(RecordCompactBytes, CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordKey, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordValue, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordHeaderKey, VarStringInRecord)
Expand Down
8 changes: 4 additions & 4 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ data PartitionDataV0 = PartitionDataV0
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV0
Expand Down Expand Up @@ -458,7 +458,7 @@ data PartitionDataV4 = PartitionDataV4
-- offset have been decided (ABORTED or COMMITTED)
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV4
Expand Down Expand Up @@ -488,7 +488,7 @@ data PartitionDataV5 = PartitionDataV5
-- ^ The current log start offset.
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV5
Expand Down Expand Up @@ -867,7 +867,7 @@ type OffsetFetchResponseTopicV3 = OffsetFetchResponseTopicV0
data PartitionProduceDataV0 = PartitionProduceDataV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV0
Expand Down
4 changes: 2 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1641,7 +1641,7 @@ data PartitionData = PartitionData
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
, lastStableOffset :: {-# UNPACK #-} !Int64
-- ^ The last stable offset (or LSO) of the partition. This is the last
Expand Down Expand Up @@ -1731,7 +1731,7 @@ partitionDataFromV6 = partitionDataFromV5
data PartitionProduceData = PartitionProduceData
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceData
Expand Down
13 changes: 9 additions & 4 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
# -----------------------------------------------------------------------------
# Constants

# e.g.
#
# {"name": "Records", "type": "records", ...}
# -> {"name": "RecordBytes", "type": "records", ...}
# -> data ... { recordBytes :: ... }
RENAMES = {"Records": "RecordBytes"}

TYPE_MAPS = {
Expand All @@ -52,26 +57,26 @@
"string": "!Text",
"bool": "Bool",
"bytes": "!ByteString",
"records": "!ByteString",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(KaArray {})",
"errorCode": "{{-# UNPACK #-}} !ErrorCode",
"apiKey": "{{-# UNPACK #-}} !ApiKey",
}
NULLABLE_TYPE_MAPS = {
"string": "!NullableString",
"bytes": "!NullableBytes",
"records": "!NullableBytes",
"records": "!RecordBytes",
}
COMPACT_TYPE_MAPS = {
"string": "!CompactString",
"bytes": "!CompactBytes",
"records": "!CompactBytes",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(CompactKaArray {})",
}
COMPACT_NULLABLE_TYPE_MAPS = {
"string": "!CompactNullableString",
"bytes": "!CompactNullableBytes",
"records": "!CompactNullableBytes",
"records": ..., # TODO, produce version >= 9
"array": "!(CompactKaArray {})",
}

Expand Down

0 comments on commit fffd241

Please sign in to comment.