Skip to content

Commit

Permalink
refactor(logging): Only display RecordBytes length instead of contents (
Browse files Browse the repository at this point in the history
#1814)

* code refactoring: rename functions

* Kafka: let RecordBytes be a newtype

* Show instance for RecordBytes
  • Loading branch information
4eUeP authored May 9, 2024
1 parent 3309ba7 commit 2e4894f
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 34 deletions.
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -223,7 +223,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just bs)
, recordBytes = (K.RecordBytes $ Just bs)
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -572,7 +572,7 @@ errorPartitionResponse partitionIndex ec = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = (-1)
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand All @@ -585,7 +585,7 @@ partitionResponse0 partitionIndex ec hw = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = hw
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand Down
9 changes: 8 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ handleProduce ServerContext{..} _reqCtx req = do
M.totalProduceRequest
(topic.name, T.pack . show $ partition.index) $ \counter ->
void $ M.addCounter counter 1
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
let recordBytes =
fromMaybe (error "TODO: Receive empty recordBytes in ProduceRequest")
(K.unRecordBytes partition.recordBytes)
-- Trace raw record bytes of the request
--
-- Note that the Show instance of RecordBytes type will only show the
-- length of the ByteString. So here we pass the ByteString to the Log
Log.trace $ "Received recordBytes: " <> Log.buildString' (recordBytes :: ByteString)
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

Expand Down
16 changes: 8 additions & 8 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Encode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ module Kafka.Protocol.Encoding.Encode
, putCompactBytes
, putNullableBytes
, putCompactNullableBytes
, putRecordNullableBytes
, putRecordString
, putVarBytesInRecord
, putVarStringInRecord
) where

import Data.Bits
Expand Down Expand Up @@ -164,21 +164,21 @@ putCompactNullableBytes :: Maybe ByteString -> Builder
putCompactNullableBytes Nothing = putVarWord32 0
putCompactNullableBytes (Just x) = putCompactBytes x

-- Record key or value
-- | Record key or value
--
-- ref: https://kafka.apache.org/documentation/#record
putRecordNullableBytes :: Maybe ByteString -> Builder
putRecordNullableBytes Nothing = putVarInt32 (-1)
putRecordNullableBytes (Just bs) =
putVarBytesInRecord :: Maybe ByteString -> Builder
putVarBytesInRecord Nothing = putVarInt32 (-1)
putVarBytesInRecord (Just bs) =
let !l = BS.length bs
b = Builder (Sum (fromIntegral l)) (BB.byteStringCopy bs)
in putVarInt32 (fromIntegral l) <> b

-- | Record header key
--
-- ref: https://kafka.apache.org/documentation/#record
putRecordString :: Text -> Builder
putRecordString x =
putVarStringInRecord :: Text -> Builder
putVarStringInRecord x =
let !bs = Text.encodeUtf8 x
!l = BS.length bs
b = Builder (Sum (fromIntegral l)) (BB.byteStringCopy bs)
Expand Down
14 changes: 7 additions & 7 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ module Kafka.Protocol.Encoding.Parser
, getCompactBytes
, getNullableBytes
, getCompactNullableBytes
, getRecordNullableBytes
, getRecordString
, getVarBytesInRecord
, getVarStringInRecord
-- * Internals
, takeBytes
, dropBytes
Expand Down Expand Up @@ -276,19 +276,19 @@ getCompactNullableBytes = do
-- | Record key or value
--
-- ref: https://kafka.apache.org/documentation/#record
getRecordNullableBytes :: Parser (Maybe ByteString)
getRecordNullableBytes = do
getVarBytesInRecord :: Parser (Maybe ByteString)
getVarBytesInRecord = do
!n <- fromIntegral <$!> getVarInt32
if n >= 0
then Just <$> takeBytes n
else pure Nothing
{-# INLINE getRecordNullableBytes #-}
{-# INLINE getVarBytesInRecord #-}

-- | Record header key
--
-- ref: https://kafka.apache.org/documentation/#record
getRecordString :: Parser Text
getRecordString = do
getVarStringInRecord :: Parser Text
getVarStringInRecord = do
n <- fromIntegral <$> getVarInt32
if n >= 0
then decodeUtf8 $! takeBytes n
Expand Down
29 changes: 25 additions & 4 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module Kafka.Protocol.Encoding.Types
, TaggedFields (EmptyTaggedFields) -- TODO
, KaArray (..)
, CompactKaArray (..)
, RecordBytes (..)
, RecordCompactBytes (..)
, RecordKey (..)
, RecordValue (..)
, RecordHeaderKey (..)
Expand All @@ -31,6 +33,7 @@ module Kafka.Protocol.Encoding.Types
import Control.DeepSeq (NFData)
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import Data.String (IsString)
import Data.Text (Text)
Expand Down Expand Up @@ -159,6 +162,21 @@ newtype CompactKaArray a = CompactKaArray
instance Functor CompactKaArray where
fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs

newtype RecordBytes = RecordBytes { unRecordBytes :: Maybe ByteString }
deriving newtype (Eq, Ord, NFData)

instance Show RecordBytes where
show (RecordBytes (Just bs)) = "<RecordBytes@" <> show (BS.length bs) <> ">"
show (RecordBytes Nothing) = "<RecordBytes@NULL>"

newtype RecordCompactBytes = RecordCompactBytes
{ unRecordCompactBytes :: Maybe ByteString }
deriving newtype (Eq, Ord, NFData)

instance Show RecordCompactBytes where
show (RecordCompactBytes (Just bs)) = "<RecordCompactBytes@" <> show (BS.length bs) <> ">"
show (RecordCompactBytes Nothing) = "<RecordCompactBytes@NULL>"

newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString }
deriving newtype (Show, Eq, Ord, NFData)

Expand Down Expand Up @@ -209,10 +227,13 @@ INSTANCE_NEWTYPE(CompactNullableString)
INSTANCE_NEWTYPE(CompactBytes)
INSTANCE_NEWTYPE(CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordKey, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordValue, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordHeaderKey, RecordString)
INSTANCE_NEWTYPE_1(RecordHeaderValue, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordBytes, NullableBytes)
INSTANCE_NEWTYPE_1(RecordCompactBytes, CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordKey, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordValue, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordHeaderKey, VarStringInRecord)
INSTANCE_NEWTYPE_1(RecordHeaderValue, VarBytesInRecord)

instance Serializable TaggedFields where
get = do !n <- fromIntegral <$> getVarWord32
Expand Down
8 changes: 4 additions & 4 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ data PartitionDataV0 = PartitionDataV0
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV0
Expand Down Expand Up @@ -458,7 +458,7 @@ data PartitionDataV4 = PartitionDataV4
-- offset have been decided (ABORTED or COMMITTED)
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV4
Expand Down Expand Up @@ -488,7 +488,7 @@ data PartitionDataV5 = PartitionDataV5
-- ^ The current log start offset.
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV5
Expand Down Expand Up @@ -867,7 +867,7 @@ type OffsetFetchResponseTopicV3 = OffsetFetchResponseTopicV0
data PartitionProduceDataV0 = PartitionProduceDataV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV0
Expand Down
4 changes: 2 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1641,7 +1641,7 @@ data PartitionData = PartitionData
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
, lastStableOffset :: {-# UNPACK #-} !Int64
-- ^ The last stable offset (or LSO) of the partition. This is the last
Expand Down Expand Up @@ -1731,7 +1731,7 @@ partitionDataFromV6 = partitionDataFromV5
data PartitionProduceData = PartitionProduceData
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceData
Expand Down
13 changes: 9 additions & 4 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
# -----------------------------------------------------------------------------
# Constants

# e.g.
#
# {"name": "Records", "type": "records", ...}
# -> {"name": "RecordBytes", "type": "records", ...}
# -> data ... { recordBytes :: ... }
RENAMES = {"Records": "RecordBytes"}

TYPE_MAPS = {
Expand All @@ -52,26 +57,26 @@
"string": "!Text",
"bool": "Bool",
"bytes": "!ByteString",
"records": "!ByteString",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(KaArray {})",
"errorCode": "{{-# UNPACK #-}} !ErrorCode",
"apiKey": "{{-# UNPACK #-}} !ApiKey",
}
NULLABLE_TYPE_MAPS = {
"string": "!NullableString",
"bytes": "!NullableBytes",
"records": "!NullableBytes",
"records": "!RecordBytes",
}
COMPACT_TYPE_MAPS = {
"string": "!CompactString",
"bytes": "!CompactBytes",
"records": "!CompactBytes",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(CompactKaArray {})",
}
COMPACT_NULLABLE_TYPE_MAPS = {
"string": "!CompactNullableString",
"bytes": "!CompactNullableBytes",
"records": "!CompactNullableBytes",
"records": ..., # TODO, produce version >= 9
"array": "!(CompactKaArray {})",
}

Expand Down

0 comments on commit 2e4894f

Please sign in to comment.