diff --git a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs index 396374bd4..e708c7120 100644 --- a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs +++ b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs @@ -24,7 +24,7 @@ import qualified HStream.Store as S data FetchLogContext = FetchLogContext { nextOffset :: Int64 -- ^ Expect next offset to be fetched - , remRecords :: Vector K.RecordFormat + , remRecords :: Vector K.Record -- ^ Remaining records of the batch } deriving (Show) diff --git a/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs b/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs index c05b634f4..967fd7ac7 100644 --- a/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs +++ b/hstream-kafka/HStream/Kafka/Common/RecordFormat.hs @@ -1,8 +1,10 @@ module HStream.Kafka.Common.RecordFormat - ( RecordFormat (..) + ( Record (..) + , RecordFormat (..) , recordBytesSize -- * Helpers , seekMessageSet + , trySeekMessageSet ) where import Control.Monad @@ -11,8 +13,18 @@ import qualified Data.ByteString as BS import Data.Int import GHC.Generics (Generic) +import qualified HStream.Logger as Log +import qualified HStream.Store as S import qualified Kafka.Protocol.Encoding as K +-- | Record is the smallest unit of data in HStream Kafka. +-- +-- For Fetch handler +data Record = Record + { recordFormat :: !RecordFormat + , recordLsn :: !S.LSN + } deriving (Show) + -- on-disk format data RecordFormat = RecordFormat { version :: {-# UNPACK #-} !Int8 @@ -45,3 +57,24 @@ seekMessageSet i bs{- MessageSet data -} = void $ K.takeBytes (fromIntegral len) in snd <$> K.runParser' parser bs {-# INLINE seekMessageSet #-} + +-- | Try to bypass the records if the fetch offset is not the first record +-- in the batch. +trySeekMessageSet + :: Record -- ^ The first record in the batch + -> Int64 -- ^ The fetch offset + -> IO (ByteString, S.LSN) +trySeekMessageSet r fetchOffset = do + let bytesOnDisk = K.unCompactBytes r.recordFormat.recordBytes + magic <- K.decodeRecordMagic bytesOnDisk + fstRecordBytes <- + if magic >= 2 + then pure bytesOnDisk + else do + let absStartOffset = r.recordFormat.offset + 1 - fromIntegral r.recordFormat.batchLength + offset = fetchOffset - absStartOffset + if offset > 0 + then do Log.debug1 $ "Seek MessageSet " <> Log.build offset + seekMessageSet (fromIntegral offset) bytesOnDisk + else pure bytesOnDisk + pure (fstRecordBytes, r.recordLsn) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index d2538be4e..8f737007f 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -47,14 +47,14 @@ type RecordTable = VS.MVector S.C_LogID V.MVector - (Vector K.RecordFormat, GV.Growing Vector GV.RealWorld K.RecordFormat) + (Vector K.Record, GV.Growing Vector GV.RealWorld K.Record) data LsnData = LsnData S.LSN S.LSN Int64 -- ^ (startLsn, tailLsn, highwaterOffset) -- -- NOTE: tailLsn is LSN_INVALID if the partition is empty - | ContReading (Vector K.RecordFormat) Int64 + | ContReading (Vector K.Record) Int64 -- ^ (remRecords, highwaterOffset) -- -- Continue reading, do not need to start reading @@ -202,7 +202,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $ M.addCounter counter (fromIntegral $ BS.length bs) M.withLabel M.topicTotalSendMessages partLabel $ \counter -> void $ do - let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v + let totalRecords = V.sum $ V.map (.recordFormat.batchLength) v M.addCounter counter (fromIntegral totalRecords) -- PartitionData pure $ K.PartitionData @@ -389,7 +389,7 @@ readMode1 r storageOpts reader = do case p.elsn of ContReading remRecords _ -> do void $ atomicFetchAddFastMut mutRemSize $ V.sum $ - V.map (BS.length . K.unCompactBytes . (.recordBytes)) remRecords + V.map (BS.length . K.unCompactBytes . (.recordFormat.recordBytes)) remRecords -- [TAG_NEV]: Make sure do not insert empty vector to the table, -- since we will assume the vector is non-empty in `encodePartition` unless (V.null remRecords) $ @@ -435,7 +435,7 @@ readMode1 r storageOpts reader = do insertRecords recordTable rs pure recordTable - insertRemRecords :: RecordTable -> S.C_LogID -> Vector K.RecordFormat -> IO () + insertRemRecords :: RecordTable -> S.C_LogID -> Vector K.Record -> IO () insertRemRecords table logid records = do (rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid) HT.insert table logid (rv <> records, v) @@ -446,7 +446,7 @@ readMode1 r storageOpts reader = do recordFormat <- K.runGet @K.RecordFormat record.recordPayload let logid = record.recordAttr.recordAttrLogID (rv, v) <- maybe ((V.empty, ) <$> GV.new) pure =<< (HT.lookup table logid) - v' <- GV.append v recordFormat + v' <- GV.append v (K.Record recordFormat (record.recordAttr.recordAttrLSN)) HT.insert table logid (rv, v') -- In kafka broker, regarding the format on disk, the broker will return @@ -466,7 +466,7 @@ encodePartition :: FastMutInt -> FastMutInt -> K.FetchPartition - -> Vector K.RecordFormat + -> Vector K.Record -> IO (ByteString, Maybe Int64, Int) -- ^ (encoded bytes, next offset, taken vector index) -- @@ -478,8 +478,18 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do where doEncode maxBytes = do isFristPartition <- readFastMutInt mutIsFirstPartition - (fstRecordBytes, vs) <- trySeek - let fstLen = BS.length fstRecordBytes + let (fstRecord :: K.Record, vs) = + -- [TAG_NEV]: This should not be Nothing, because if we found the + -- key in `readRecords`, it means we have at least one record in + -- this. + fromMaybe (error "LogicError: got empty vector value") + (V.uncons v) + -- NOTE: since we don't support RecordBatch version < 2, we don't need to + -- seek the MessageSet. + -- + -- Also see 'HStream.Kafka.Common.RecordFormat.trySeekMessageSet' + let fstRecordBytes = K.unCompactBytes fstRecord.recordFormat.recordBytes + fstLen = BS.length fstRecordBytes if isFristPartition == 1 -- First partition then do @@ -508,7 +518,7 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do (bb, lastOffset', takenVecIdx) <- vecFoldWhileM vs (BB.byteString fstBs, Left fstBs, 0) $ \(b, lb, i) r -> do -- FIXME: Does this possible be multiple BatchRecords? - let rbs = K.unCompactBytes r.recordBytes + let rbs = K.unCompactBytes r.recordFormat.recordBytes rlen = BS.length rbs curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen) curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen) @@ -529,30 +539,6 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do pure (BS.toStrict $ BB.toLazyByteString bb, lastOffset, takenVecIdx) - -- Try to bypass the records if the fetch offset is not the first record - -- in the batch. - trySeek = do - let (fstRecord :: K.RecordFormat, vs) = - -- [TAG_NEV]: This should not be Nothing, because if we found the - -- key in `readRecords`, it means we have at least one record in - -- this. - fromMaybe (error "LogicError: got empty vector value") - (V.uncons v) - bytesOnDisk = K.unCompactBytes fstRecord.recordBytes - -- only the first MessageSet need to to this seeking - magic <- K.decodeRecordMagic bytesOnDisk - fstRecordBytes <- - if | magic >= 2 -> pure bytesOnDisk - | otherwise -> do - let absStartOffset = fstRecord.offset + 1 - fromIntegral fstRecord.batchLength - offset = p.fetchOffset - absStartOffset - if offset > 0 - then do - Log.debug1 $ "Seek MessageSet " <> Log.build offset - K.seekMessageSet (fromIntegral offset) bytesOnDisk - else pure bytesOnDisk - pure (fstRecordBytes, vs) - errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData errorPartitionResponse partitionIndex ec = K.PartitionData { partitionIndex = partitionIndex diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index 23ef1045f..2fe9f4215 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -95,12 +95,9 @@ handleProduce ServerContext{..} _reqCtx req = do -- -- Currently, only support LogAppendTime catches (do - (S.AppendCompletion{..}, offset) <- + (appendCompTimestamp, offset) <- appendRecords True scLDClient scOffsetManager (topic.name, partition.index) logid recordBytes - Log.debug1 $ "Append done " <> Log.build appendCompLogID - <> ", lsn: " <> Log.build appendCompLSN - <> ", start offset: " <> Log.build offset pure $ K.PartitionProduceResponse { index = partition.index , errorCode = K.NONE @@ -148,7 +145,7 @@ appendRecords -> (Text, Int32) -> Word64 -> ByteString - -> IO (S.AppendCompletion, Int64) + -> IO (Int64, Int64) -- ^ Return (logAppendTimeMs, baseOffset) appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do batch <- K.decodeRecordBatch shouldValidateCrc bs let batchLength = batch.recordsCount @@ -196,7 +193,10 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d void $ M.addCounter counter (fromIntegral $ BS.length storedRecord) M.withLabel M.topicTotalAppendMessages partLabel $ \counter -> void $ M.addCounter counter (fromIntegral batchLength) - pure (r, startOffset) + Log.debug1 $ "Append done " <> Log.build r.appendCompLogID + <> ", lsn: " <> Log.build r.appendCompLSN + <> ", start offset: " <> Log.build startOffset + pure (r.appendCompTimestamp, startOffset) -- TODO: performance improvements --