Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(logging): Only display RecordBytes length instead of contents #1814

Merged
merged 3 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -223,7 +223,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just bs)
, recordBytes = (K.RecordBytes $ Just bs)
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
Expand Down Expand Up @@ -572,7 +572,7 @@ errorPartitionResponse partitionIndex ec = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = (-1)
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand All @@ -585,7 +585,7 @@ partitionResponse0 partitionIndex ec hw = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = hw
, recordBytes = (Just "")
, recordBytes = (K.RecordBytes $ Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
Expand Down
9 changes: 8 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ handleProduce ServerContext{..} _reqCtx req = do
M.totalProduceRequest
(topic.name, T.pack . show $ partition.index) $ \counter ->
void $ M.addCounter counter 1
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
let recordBytes =
fromMaybe (error "TODO: Receive empty recordBytes in ProduceRequest")
(K.unRecordBytes partition.recordBytes)
-- Trace raw record bytes of the request
--
-- Note that the Show instance of RecordBytes type will only show the
-- length of the ByteString. So here we pass the ByteString to the Log
Log.trace $ "Received recordBytes: " <> Log.buildString' (recordBytes :: ByteString)
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

Expand Down
16 changes: 8 additions & 8 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Encode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ module Kafka.Protocol.Encoding.Encode
, putCompactBytes
, putNullableBytes
, putCompactNullableBytes
, putRecordNullableBytes
, putRecordString
, putVarBytesInRecord
, putVarStringInRecord
) where

import Data.Bits
Expand Down Expand Up @@ -164,21 +164,21 @@ putCompactNullableBytes :: Maybe ByteString -> Builder
putCompactNullableBytes Nothing = putVarWord32 0
putCompactNullableBytes (Just x) = putCompactBytes x

-- Record key or value
-- | Record key or value
--
-- ref: https://kafka.apache.org/documentation/#record
putRecordNullableBytes :: Maybe ByteString -> Builder
putRecordNullableBytes Nothing = putVarInt32 (-1)
putRecordNullableBytes (Just bs) =
putVarBytesInRecord :: Maybe ByteString -> Builder
putVarBytesInRecord Nothing = putVarInt32 (-1)
putVarBytesInRecord (Just bs) =
let !l = BS.length bs
b = Builder (Sum (fromIntegral l)) (BB.byteStringCopy bs)
in putVarInt32 (fromIntegral l) <> b

-- | Record header key
--
-- ref: https://kafka.apache.org/documentation/#record
putRecordString :: Text -> Builder
putRecordString x =
putVarStringInRecord :: Text -> Builder
putVarStringInRecord x =
let !bs = Text.encodeUtf8 x
!l = BS.length bs
b = Builder (Sum (fromIntegral l)) (BB.byteStringCopy bs)
Expand Down
14 changes: 7 additions & 7 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Parser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ module Kafka.Protocol.Encoding.Parser
, getCompactBytes
, getNullableBytes
, getCompactNullableBytes
, getRecordNullableBytes
, getRecordString
, getVarBytesInRecord
, getVarStringInRecord
-- * Internals
, takeBytes
, dropBytes
Expand Down Expand Up @@ -276,19 +276,19 @@ getCompactNullableBytes = do
-- | Record key or value
--
-- ref: https://kafka.apache.org/documentation/#record
getRecordNullableBytes :: Parser (Maybe ByteString)
getRecordNullableBytes = do
getVarBytesInRecord :: Parser (Maybe ByteString)
getVarBytesInRecord = do
!n <- fromIntegral <$!> getVarInt32
if n >= 0
then Just <$> takeBytes n
else pure Nothing
{-# INLINE getRecordNullableBytes #-}
{-# INLINE getVarBytesInRecord #-}

-- | Record header key
--
-- ref: https://kafka.apache.org/documentation/#record
getRecordString :: Parser Text
getRecordString = do
getVarStringInRecord :: Parser Text
getVarStringInRecord = do
n <- fromIntegral <$> getVarInt32
if n >= 0
then decodeUtf8 $! takeBytes n
Expand Down
29 changes: 25 additions & 4 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ module Kafka.Protocol.Encoding.Types
, TaggedFields (EmptyTaggedFields) -- TODO
, KaArray (..)
, CompactKaArray (..)
, RecordBytes (..)
, RecordCompactBytes (..)
, RecordKey (..)
, RecordValue (..)
, RecordHeaderKey (..)
Expand All @@ -31,6 +33,7 @@ module Kafka.Protocol.Encoding.Types
import Control.DeepSeq (NFData)
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import Data.String (IsString)
import Data.Text (Text)
Expand Down Expand Up @@ -159,6 +162,21 @@ newtype CompactKaArray a = CompactKaArray
instance Functor CompactKaArray where
fmap f (CompactKaArray xs) = CompactKaArray $ fmap f <$> xs

newtype RecordBytes = RecordBytes { unRecordBytes :: Maybe ByteString }
deriving newtype (Eq, Ord, NFData)

instance Show RecordBytes where
show (RecordBytes (Just bs)) = "<RecordBytes@" <> show (BS.length bs) <> ">"
show (RecordBytes Nothing) = "<RecordBytes@NULL>"

newtype RecordCompactBytes = RecordCompactBytes
{ unRecordCompactBytes :: Maybe ByteString }
deriving newtype (Eq, Ord, NFData)

instance Show RecordCompactBytes where
show (RecordCompactBytes (Just bs)) = "<RecordCompactBytes@" <> show (BS.length bs) <> ">"
show (RecordCompactBytes Nothing) = "<RecordCompactBytes@NULL>"

newtype RecordKey = RecordKey { unRecordKey :: Maybe ByteString }
deriving newtype (Show, Eq, Ord, NFData)

Expand Down Expand Up @@ -209,10 +227,13 @@ INSTANCE_NEWTYPE(CompactNullableString)
INSTANCE_NEWTYPE(CompactBytes)
INSTANCE_NEWTYPE(CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordKey, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordValue, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordHeaderKey, RecordString)
INSTANCE_NEWTYPE_1(RecordHeaderValue, RecordNullableBytes)
INSTANCE_NEWTYPE_1(RecordBytes, NullableBytes)
INSTANCE_NEWTYPE_1(RecordCompactBytes, CompactNullableBytes)

INSTANCE_NEWTYPE_1(RecordKey, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordValue, VarBytesInRecord)
INSTANCE_NEWTYPE_1(RecordHeaderKey, VarStringInRecord)
INSTANCE_NEWTYPE_1(RecordHeaderValue, VarBytesInRecord)

instance Serializable TaggedFields where
get = do !n <- fromIntegral <$> getVarWord32
Expand Down
8 changes: 4 additions & 4 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ data PartitionDataV0 = PartitionDataV0
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV0
Expand Down Expand Up @@ -458,7 +458,7 @@ data PartitionDataV4 = PartitionDataV4
-- offset have been decided (ABORTED or COMMITTED)
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV4
Expand Down Expand Up @@ -488,7 +488,7 @@ data PartitionDataV5 = PartitionDataV5
-- ^ The current log start offset.
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV5
Expand Down Expand Up @@ -867,7 +867,7 @@ type OffsetFetchResponseTopicV3 = OffsetFetchResponseTopicV0
data PartitionProduceDataV0 = PartitionProduceDataV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceDataV0
Expand Down
4 changes: 2 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1641,7 +1641,7 @@ data PartitionData = PartitionData
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data.
, lastStableOffset :: {-# UNPACK #-} !Int64
-- ^ The last stable offset (or LSO) of the partition. This is the last
Expand Down Expand Up @@ -1731,7 +1731,7 @@ partitionDataFromV6 = partitionDataFromV5
data PartitionProduceData = PartitionProduceData
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, recordBytes :: !NullableBytes
, recordBytes :: !RecordBytes
-- ^ The record data to be produced.
} deriving (Show, Eq, Generic)
instance Serializable PartitionProduceData
Expand Down
13 changes: 9 additions & 4 deletions script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
# -----------------------------------------------------------------------------
# Constants

# e.g.
#
# {"name": "Records", "type": "records", ...}
# -> {"name": "RecordBytes", "type": "records", ...}
# -> data ... { recordBytes :: ... }
RENAMES = {"Records": "RecordBytes"}

TYPE_MAPS = {
Expand All @@ -52,26 +57,26 @@
"string": "!Text",
"bool": "Bool",
"bytes": "!ByteString",
"records": "!ByteString",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(KaArray {})",
"errorCode": "{{-# UNPACK #-}} !ErrorCode",
"apiKey": "{{-# UNPACK #-}} !ApiKey",
}
NULLABLE_TYPE_MAPS = {
"string": "!NullableString",
"bytes": "!NullableBytes",
"records": "!NullableBytes",
"records": "!RecordBytes",
}
COMPACT_TYPE_MAPS = {
"string": "!CompactString",
"bytes": "!CompactBytes",
"records": "!CompactBytes",
"records": None, # Records should be NULLABLE_BYTES
"array": "!(CompactKaArray {})",
}
COMPACT_NULLABLE_TYPE_MAPS = {
"string": "!CompactNullableString",
"bytes": "!CompactNullableBytes",
"records": "!CompactNullableBytes",
"records": ..., # TODO, produce version >= 9
"array": "!(CompactKaArray {})",
}

Expand Down
Loading