hstream-kafka: decoding RecordBatch by magic value (#1600)
4eUeP authored Sep 12, 2023
1 parent 75cf875 commit 55a53ee
Showing 3 changed files with 185 additions and 30 deletions.
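The change teaches the decoder to pick a record format from the batch's magic byte. Both the legacy message-set prefix (offset, message_size, crc) and the v2 RecordBatch prefix (baseOffset, batchLength, partitionLeaderEpoch) take 8 + 4 + 4 = 16 bytes, so the magic byte sits at the same position in every version and a common header can be parsed before branching. A self-contained sketch of that dispatch idea, with illustrative names only (the commit's actual implementation is decodeBatchRecords in Kafka.Protocol.Encoding below):

import qualified Data.ByteString as BS

data Version = LegacyV0 | LegacyV1 | BatchV2 deriving (Show)

-- Peek at the magic byte (index 16 in both the legacy and the v2 layout)
-- to decide which decoder to run.
versionOf :: BS.ByteString -> Either String Version
versionOf bs
  | BS.length bs < 17 = Left "batch too short"
  | otherwise = case BS.index bs 16 of
      0 -> Right LegacyV0
      1 -> Right LegacyV1
      2 -> Right BatchV2
      m -> Left ("invalid magic " <> show m)

decodeBatchRecords below does the same thing by first parsing a shared RecordBase (offset, length, crc-or-epoch, magic) and then casing on the magic field.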
1 change: 1 addition & 0 deletions common/kafka/hstream-kafka.cabal
@@ -61,6 +61,7 @@ library
build-depends:
, base >=4.11 && <5
, bytestring
, digest
, hstream-common-base
, network
, text
213 changes: 183 additions & 30 deletions common/kafka/src/Kafka/Protocol/Encoding.hs
@@ -23,17 +23,27 @@ module Kafka.Protocol.Encoding
, CompactNullableBytes (..)
, KaArray
, CompactKaArray (..)
-- ** Records
, LegacyRecordV0 (..)
, LegacyRecordV1 (..)
, decodeLegacyRecordBatch
-- * Records
, BatchRecord (..)
, decodeBatchRecords
, encodeBatchRecords
, encodeBatchRecordsLazy
, RecordV0 (..)
, RecordV1 (..)
, RecordV2 (..)
, RecordBatch (..)
, RecordKey (..)
, RecordValue (..)
, RecordBatch (..)
, RecordArray (..)
, RecordHeader
, RecordHeaderKey (..)
, RecordHeaderValue (..)
-- ** Misc
, decodeLegacyRecordBatch
-- * Internals
, Parser
, runParser
, runParser'
, Result (..)
, Builder
, toLazyByteString
@@ -42,7 +52,9 @@ module Kafka.Protocol.Encoding
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Digest.CRC32 (crc32)
import Data.Int
import Data.String (IsString)
import Data.Text (Text)
@@ -101,23 +113,24 @@ newtype DecodeError = DecodeError String

instance Exception DecodeError

runGet :: Serializable a => ByteString -> IO a
runGet bs = do
result <- runParser get bs
runParser' :: Parser r -> ByteString -> IO (r, ByteString)
runParser' parser bs = do
result <- runParser parser bs
case result of
Done "" r -> pure r
Done l _ -> throwIO $ DecodeError $ "Done, but left " <> show l
Done l r -> pure (r, l)
Fail _ err -> throwIO $ DecodeError $ "Fail, " <> err
More _ -> throwIO $ DecodeError "Need more"
{-# INLINE runParser' #-}

runGet :: Serializable a => ByteString -> IO a
runGet bs = do
(r, l) <- runParser' get bs
if BS.null l then pure r
else throwIO $ DecodeError $ "Done, but left " <> show l
{-# INLINE runGet #-}

runGet' :: Serializable a => ByteString -> IO (a, ByteString)
runGet' bs = do
result <- runParser get bs
case result of
Done l r -> pure (r, l)
Fail _ err -> throwIO $ DecodeError $ "Fail, " <> err
More _ -> throwIO $ DecodeError "Need more"
runGet' = runParser' get
{-# INLINE runGet' #-}

runPutLazy :: Serializable a => a -> BL.ByteString
Expand Down Expand Up @@ -235,11 +248,143 @@ instance Serializable a => Serializable (RecordArray a) where
{-# INLINE put #-}

-------------------------------------------------------------------------------
-- LegacyRecordBatch
-- Records

data BatchRecord
= BatchRecordV0 RecordV0
| BatchRecordV1 RecordV1
| BatchRecordV2 RecordBatch
deriving (Show)

putBatchRecord :: BatchRecord -> Builder
putBatchRecord (BatchRecordV0 r) = put r
putBatchRecord (BatchRecordV1 r) = put r
putBatchRecord (BatchRecordV2 r) = put r

-- Internal type to help parse all Record versions.
--
-- Common Record base for all versions.
data RecordBase = RecordBase
{ baseOffset :: {-# UNPACK #-} !Int64
, batchLength :: {-# UNPACK #-} !Int32
, partitionLeaderEpochOrCrc :: {-# UNPACK #-} !Int32
-- ^ For version 0-1, this is the CRC32 of the remainder of the record.
-- For version 2, this is the partition leader epoch.
, magic :: {-# UNPACK #-} !Int8
} deriving (Generic, Show)

instance Serializable RecordBase

-- Internal type to help parse all Record versions.
--
-- RecordV0 = RecordBase + RecordBodyV0
data RecordBodyV0 = RecordBodyV0
{ attributes :: {-# UNPACK #-} !Int8
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)

instance Serializable RecordBodyV0

-- Internal type to help parse all Record versions.
--
-- RecordV1 = RecordBase + RecordBodyV1
data RecordBodyV1 = RecordBodyV1
{ attributes :: {-# UNPACK #-} !Int8
, timestamp :: {-# UNPACK #-} !Int64
, key :: !NullableBytes
, value :: !NullableBytes
} deriving (Generic, Show)

instance Serializable RecordBodyV1

-- Internal type to help parse all Record versions.
--
-- RecordBatch = RecordBase + CRC32 + RecordBodyV2
data RecordBodyV2 = RecordBodyV2
{ attributes :: {-# UNPACK #-} !Int16
, lastOffsetDelta :: {-# UNPACK #-} !Int32
, baseTimestamp :: {-# UNPACK #-} !Int64
, maxTimestamp :: {-# UNPACK #-} !Int64
, producerId :: {-# UNPACK #-} !Int64
, producerEpoch :: {-# UNPACK #-} !Int16
, baseSequence :: {-# UNPACK #-} !Int32
, records :: !(KaArray RecordV2)
} deriving (Generic, Show)

instance Serializable RecordBodyV2

decodeBatchRecords :: Bool -> ByteString -> IO (Vector BatchRecord)
decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
where
decode "" !v = Growing.unsafeFreeze v
decode !bs !v = do
(RecordBase{..}, bs') <- runGet' @RecordBase bs
case magic of
0 -> do let crc = partitionLeaderEpochOrCrc
messageSize = batchLength
when (messageSize < fromIntegral minRecordSizeV0) $
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
                  -- NOTE: pass the original input to validLegacyCrc, not bs'
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV0{..}, remainder) <- runGet' @RecordBodyV0 bs'
!v' <- Growing.append v (BatchRecordV0 RecordV0{..})
decode remainder v'
1 -> do let crc = partitionLeaderEpochOrCrc
messageSize = batchLength
when (messageSize < fromIntegral minRecordSizeV1) $
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
                  -- NOTE: pass the original input to validLegacyCrc, not bs'
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV1{..}, remainder) <- runGet' @RecordBodyV1 bs'
!v' <- Growing.append v (BatchRecordV1 RecordV1{..})
decode remainder v'
2 -> do let partitionLeaderEpoch = partitionLeaderEpochOrCrc
(crc, bs'') <- runGet' @Int32 bs'
when (shouldValidateCrc && fromIntegral (crc32 bs'') /= crc) $
throwIO $ DecodeError "Invalid CRC32"
                (RecordBodyV2{..}, remainder) <- runGet' @RecordBodyV2 bs''
!v' <- Growing.append v (BatchRecordV2 RecordBatch{..})
decode remainder v'
_ -> throwIO $ DecodeError $ "Invalid magic " <> show magic
{-# INLINABLE decodeBatchRecords #-}

validLegacyCrc :: Int -> Int32 -> ByteString -> IO ()
validLegacyCrc batchLength crc bs = do
crcPayload <- getLegacyCrcPayload batchLength bs
when (fromIntegral (crc32 crcPayload) /= crc) $
throwIO $ DecodeError "Invalid CRC32"
{-# INLINE validLegacyCrc #-}

getLegacyCrcPayload :: Int -> ByteString -> IO ByteString
getLegacyCrcPayload msgSize bs =
let parser = do void $ takeBytes 16 -- [offset(8) message_size(4) crc(4) ...]
takeBytes (msgSize - 4)
in fst <$> runParser' parser bs
{-# INLINE getLegacyCrcPayload #-}

encodeBatchRecordsLazy :: Vector BatchRecord -> BL.ByteString
encodeBatchRecordsLazy rs =
let builder = V.foldl' (\s x -> s <> putBatchRecord x) mempty rs
in toLazyByteString builder
{-# INLINABLE encodeBatchRecordsLazy #-}

encodeBatchRecords :: Vector BatchRecord -> ByteString
encodeBatchRecords = BL.toStrict . encodeBatchRecordsLazy
{-# INLINABLE encodeBatchRecords #-}

-------------------------------------------------------------------------------
-- LegacyRecord(MessageSet): v0-1
--
-- https://kafka.apache.org/documentation/#messageset
--
-- In versions prior to Kafka 0.10, the only supported message format version
-- (which is indicated in the magic value) was 0. Message format version 1 was
-- introduced with timestamp support in version 0.10.
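-- Informal byte layout of the two legacy formats, matching the record types
-- below (sizes in bytes; key and value are NullableBytes, i.e. an Int32
-- length, -1 for null, followed by that many bytes):
--
--   v0: offset(8) message_size(4) crc(4) magic(1) attributes(1) key value
--   v1: offset(8) message_size(4) crc(4) magic(1) attributes(1) timestamp(8) key value
--
-- The crc covers everything after the crc field itself, which is what
-- validLegacyCrc above recomputes and compares.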

data LegacyRecordV0 = LegacyRecordV0
data RecordV0 = RecordV0
{ baseOffset :: {-# UNPACK #-} !Int64
, messageSize :: {-# UNPACK #-} !Int32
, crc :: {-# UNPACK #-} !Int32
@@ -249,9 +394,13 @@ data LegacyRecordV0 = LegacyRecordV0
, value :: !NullableBytes
} deriving (Generic, Show)

instance Serializable LegacyRecordV0
instance Serializable RecordV0

minRecordSizeV0 :: Int
minRecordSizeV0 =
4{- crc -} + 1{- magic -} + 1{- attributes -} + 4{- key -} + 4{- value -}

data LegacyRecordV1 = LegacyRecordV1
data RecordV1 = RecordV1
{ baseOffset :: {-# UNPACK #-} !Int64
, messageSize :: {-# UNPACK #-} !Int32
, crc :: {-# UNPACK #-} !Int32
@@ -262,12 +411,17 @@ data LegacyRecordV1 = LegacyRecordV1
, value :: !NullableBytes
} deriving (Generic, Show)

instance Serializable LegacyRecordV1
instance Serializable RecordV1

minRecordSizeV1 :: Int
minRecordSizeV1 =
4{- crc -} + 1{- magic -} + 1{- attributes -}
+ 8{- timestamp -} + 4{- key -} + 4{- value -}

class Serializable a => LegacyRecord a where

instance LegacyRecord LegacyRecordV0
instance LegacyRecord LegacyRecordV1
instance LegacyRecord RecordV0
instance LegacyRecord RecordV1

-- Note that although message sets are represented as an array, they are not
-- preceded by an int32 array size like other array elements in the protocol.
@@ -280,19 +434,18 @@ decodeLegacyRecordBatch batchBs = Growing.new >>= decode batchBs
!v' <- Growing.append v r
decode l v'

encodeLegacyRecordBatch :: (LegacyRecord a) => Vector a -> ByteString
encodeLegacyRecordBatch = undefined

-------------------------------------------------------------------------------
-- RecordBatch
-- RecordBatch: v2
--
-- Ref: https://kafka.apache.org/documentation/#recordbatch
--
-- Introduced in Kafka 0.11.0

type RecordHeader = (RecordHeaderKey, RecordHeaderValue)

instance Serializable RecordHeader

data Record = Record
data RecordV2 = RecordV2
{ length :: {-# UNPACK #-} !VarInt32
, attributes :: {-# UNPACK #-} !Int8
, timestampDelta :: {-# UNPACK #-} !VarInt64
@@ -302,7 +455,7 @@ data Record = Record
, headers :: !(RecordArray RecordHeader)
} deriving (Generic, Show)

instance Serializable Record
instance Serializable RecordV2

data RecordBatch = RecordBatch
{ baseOffset :: {-# UNPACK #-} !Int64
@@ -317,7 +470,7 @@ data RecordBatch = RecordBatch
, producerId :: {-# UNPACK #-} !Int64
, producerEpoch :: {-# UNPACK #-} !Int16
, baseSequence :: {-# UNPACK #-} !Int32
, records :: !(KaArray Record)
, records :: !(KaArray RecordV2)
} deriving (Generic, Show)

instance Serializable RecordBatch
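A usage sketch of the new exports (reencode and payload are made-up names; only decodeBatchRecords, encodeBatchRecords, and the Bool CRC-validation flag come from the module):

import Data.ByteString (ByteString)
import qualified Data.Vector as V
import Kafka.Protocol.Encoding (decodeBatchRecords, encodeBatchRecords)

-- Decode every batch in a raw records payload, validating CRCs, then
-- re-encode the batches unchanged.
reencode :: ByteString -> IO (Int, ByteString)
reencode payload = do
  batches <- decodeBatchRecords True payload  -- True: validate CRC32
  pure (V.length batches, encodeBatchRecords batches)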
1 change: 1 addition & 0 deletions common/kafka/src/Kafka/Protocol/Encoding/Parser.hs
@@ -322,6 +322,7 @@ instance MonadFail Parser where
-- | Run parser with the input
runParser :: Parser r -> ByteString -> IO (Result r)
runParser s bs = run s bs (\b r -> pure $! Done b r)
{-# INLINE runParser #-}

-- | Parser continuation
type Next a r = ByteString -> a -> IO (Result r)
