fix: resolve error processing Kafka on-disk record format (#1730)
4eUeP authored Jan 5, 2024
1 parent b8e4d72 commit c9f41ae
Showing 3 changed files with 27 additions and 13 deletions.
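For orientation (not part of the commit message): the fix revolves around the fixed-width header that precedes the recordBytes payload in the on-disk RecordFormat, whose field widths are visible in the diff below. A minimal sketch of that size arithmetic:

-- Hypothetical illustration only; field widths taken from the diff below.
onDiskHeaderSize :: Int
onDiskHeaderSize = 1 {- version :: Int8 -}
                 + 8 {- offset :: Int64 -}
                 + 4 {- batchLength :: Int32 -}
-- onDiskHeaderSize == 13. The old recordBytesSize subtracted only 12,
-- i.e. it ignored the leading version byte; the commit corrects this to 13.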
31 changes: 20 additions & 11 deletions hstream-kafka/HStream/Kafka/Common/RecordFormat.hs
@@ -1,7 +1,8 @@
 module HStream.Kafka.Common.RecordFormat
   ( RecordFormat (..)
-  , seekBatch
   , recordBytesSize
+    -- * Helpers
+  , seekMessageSet
   ) where
 
 import Control.Monad
@@ -15,22 +16,30 @@ import qualified Kafka.Protocol.Encoding as K
 -- on-disk format
 data RecordFormat = RecordFormat
   { version     :: {-# UNPACK #-} !Int8
+    -- ^ Currently, the version is always 0.
   , offset      :: {-# UNPACK #-} !Int64
+    -- ^ Max offset in the batch
   , batchLength :: {-# UNPACK #-} !Int32
-  , recordBytes :: !K.CompactBytes
+    -- ^ Total number of records in the batch.
+  , recordBytes :: {-# UNPACK #-} !K.CompactBytes
+    -- ^ The BatchRecords data.
   } deriving (Generic, Show)
 
 instance K.Serializable RecordFormat
 
-seekBatch :: Int32 -> ByteString -> IO ByteString
-seekBatch i bs =
+-- Real size of the recordBytes.
+recordBytesSize :: ByteString -> Int
+recordBytesSize bs = BS.length bs - 13{- 1(version) + 8(offset) + 4(batchLength) -}
+{-# INLINE recordBytesSize #-}
+
+-- Only MessageSet need to be seeked.
+--
+-- https://kafka.apache.org/documentation/#messageset
+seekMessageSet :: Int32 -> ByteString -> IO ByteString
+seekMessageSet i bs{- MessageSet data -} =
   let parser = replicateM_ (fromIntegral i) $ do
-                 void $ K.takeBytes 9{- version(1) + offset(8) -}
-                 len <- K.get @Int32
+                 void $ K.takeBytes 8{- MessageSet.offset(8) -}
+                 len <- K.get @Int32 {- MessageSet.message_size(4) -}
                  void $ K.takeBytes (fromIntegral len)
    in snd <$> K.runParser' parser bs
-{-# INLINE seekBatch #-}
-
-recordBytesSize :: ByteString -> Int
-recordBytesSize bs = BS.length bs - 12{- 8(baseOffset) + 4(batchLength) -}
-{-# INLINE recordBytesSize #-}
+{-# INLINE seekMessageSet #-}
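For readers unfamiliar with the legacy MessageSet layout referenced above (an 8-byte offset, a 4-byte message_size, then message_size bytes of message data), here is a standalone, illustrative re-implementation of the skipping logic; the real seekMessageSet uses the project's Kafka.Protocol.Encoding parser instead:

module SeekMessageSetSketch where

import           Data.Binary.Get      (getInt32be, runGet)
import qualified Data.ByteString      as BS
import qualified Data.ByteString.Lazy as BL
import           Data.Int             (Int32)

-- Illustrative only: drop the first n MessageSet entries and return the rest.
skipMessages :: Int32 -> BS.ByteString -> BS.ByteString
skipMessages 0 bs = bs
skipMessages n bs =
  let afterOffset = BS.drop 8 bs                        -- MessageSet.offset(8)
      msgLen      = runGet getInt32be                   -- MessageSet.message_size(4)
                           (BL.fromStrict (BS.take 4 afterOffset))
   in skipMessages (n - 1) (BS.drop (4 + fromIntegral msgLen) afterOffset)

Calling, say, skipMessages 2 on the stored MessageSet bytes yields the data starting at the third entry, which is what the fetch path needs when a client's fetch offset falls inside a stored batch.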
4 changes: 3 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -339,7 +339,9 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do
       let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength
           offset = p.fetchOffset - absStartOffset
       if offset > 0
-         then K.seekBatch (fromIntegral offset) bytesOnDisk
+         then do
+           Log.debug1 $ "Seek MessageSet " <> Log.build offset
+           K.seekMessageSet (fromIntegral offset) bytesOnDisk
          else pure bytesOnDisk
       pure (fstRecordBytes, vs)

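Plugging hypothetical numbers into the arithmetic above may help: a stored batch whose max offset (rf.offset) is 104 and which holds 5 records (rf.batchLength) starts at absolute offset 100, so a fetch at offset 102 has to skip the first two MessageSet entries:

-- Hypothetical values, only to illustrate the seek arithmetic in encodePartition.
exampleEntriesToSkip :: Int
exampleEntriesToSkip =
  let batchMaxOffset = 104                            -- rf.offset
      batchLen       = 5                              -- rf.batchLength
      fetchOffset    = 102                            -- p.fetchOffset
      absStartOffset = batchMaxOffset + 1 - batchLen  -- 100
   in fetchOffset - absStartOffset                    -- 2, passed to K.seekMessageSet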
5 changes: 4 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -140,9 +140,12 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
           storedBs = K.encodeBatchRecords records'
           -- FIXME unlikely overflow: convert batchLength from Int to Int32
           storedRecord = K.runPut $ K.RecordFormat 0{- version -}
-                                                   o (fromIntegral batchLength)
+                                                   o
+                                                   (fromIntegral batchLength)
                                                    (K.CompactBytes storedBs)
       Log.debug1 $ "Append key " <> Log.buildString' appendKey
+                <> ", write offset " <> Log.build o
+                <> ", batch length " <> Log.build batchLength
       r <- M.observeWithLabel M.topicWriteStoreLatency streamName $
              S.appendCompressedBS ldclient logid storedRecord S.CompressionNone
                                   appendAttrs
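The FIXME retained in the hunk above points out that fromIntegral truncates silently if a batch length ever exceeds the Int32 range. A checked conversion along these lines (hypothetical, not part of the commit) would surface that unlikely overflow instead of writing a corrupted batchLength:

import Data.Int (Int32)

-- Hypothetical helper: reject batch lengths outside the Int32 range instead of
-- letting fromIntegral wrap around.
toBatchLength :: Int -> Either String Int32
toBatchLength n
  | n < 0 || n > fromIntegral (maxBound :: Int32) =
      Left ("batch length out of Int32 range: " <> show n)
  | otherwise = Right (fromIntegral n)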
