diff --git a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs index e708c7120..2db49daac 100644 --- a/hstream-kafka/HStream/Kafka/Common/FetchManager.hs +++ b/hstream-kafka/HStream/Kafka/Common/FetchManager.hs @@ -22,9 +22,9 @@ import qualified HStream.Kafka.Common.RecordFormat as K import qualified HStream.Store as S data FetchLogContext = FetchLogContext - { nextOffset :: Int64 + { expectedOffset :: Int64 -- ^ Expect next offset to be fetched - , remRecords :: Vector K.Record + , remRecords :: Vector K.Record -- ^ Remaining records of the batch } deriving (Show) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 08bf5e220..2790b178f 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -1,8 +1,10 @@ +#ifndef HSTREAM_SPARSE_OFFSET {-# LANGUAGE OverloadedRecordDot #-} module HStream.Kafka.Server.Handler.Consume ( handleFetch ) where +#endif import Control.Exception import Control.Monad @@ -17,6 +19,7 @@ import Data.Vector (Vector) import qualified Data.Vector as V import qualified Data.Vector.Hashtables as HT import qualified Data.Vector.Storable as VS +import Data.Word import GHC.Data.FastMutInt import GHC.Stack (HasCallStack) @@ -152,172 +155,27 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do --------------------------------------- -- * Generate response --------------------------------------- - mutMaxBytes <- newFastMutInt $ fromIntegral r.maxBytes - mutIsFirstPartition <- newFastMutInt 1 -- TODO: improve this - respTopics <- V.forM r.topics $ \(topic, partitions) -> do - respPartitionDatas <- V.forM partitions $ \partition -> do - let request = partition.request - let e_hioffset = extractHiOffset partition.elsn - case e_hioffset of - Left pd -> do - Log.debug1 $ "Response for (" <> Log.build topic - <> "," <> Log.build request.partition - <> "), log " <> Log.build partition.logid - <> ", error: " <> Log.buildString' pd.errorCode - pure pd - Right hioffset -> do - mgv <- HT.lookup readRecords partition.logid - case mgv of - Nothing -> do - Log.debug1 $ "Response for (" <> Log.build topic - <> "," <> Log.build request.partition - <> "), log " <> Log.build partition.logid - <> ", empty." - -- Cache the context. - -- - -- It's safe to set the remRecords to empty, because "mgv" is - -- Nothing, which means no remaining records in the table. - K.setFetchLogCtx - fetchCtx - partition.logid - K.FetchLogContext{ nextOffset = request.fetchOffset - , remRecords = V.empty - } - pure $ K.PartitionData - { partitionIndex = request.partition - , errorCode = K.NONE - , highWatermark = hioffset - , recordBytes = (K.RecordBytes $ Just "") - , lastStableOffset = (-1) -- TODO - , abortedTransactions = K.NonNullKaArray V.empty -- TODO - -- TODO: for performance reason, we don't implement - -- logStartOffset now - , logStartOffset = (-1) - } - Just (remv, gv) -> do - v <- if V.null remv - then GV.unsafeFreeze gv - -- TODO PERF - else (remv <>) <$> GV.unsafeFreeze gv - (bs, m_offset, tokenIdx) <- encodePartition mutMaxBytes mutIsFirstPartition request v - Log.debug1 $ "Response for (" <> Log.build topic - <> "," <> Log.build request.partition - <> "), log " <> Log.build partition.logid - <> ", " <> Log.build (BS.length bs) <> " bytes" - -- Cache the context - K.setFetchLogCtx - fetchCtx - partition.logid - K.FetchLogContext{ nextOffset = fromMaybe (-1) m_offset - , remRecords = V.drop (tokenIdx + 1) v - } - -- Stats - let partLabel = (topic, T.pack . show $ request.partition) - M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $ - M.addCounter counter (fromIntegral $ BS.length bs) - M.withLabel M.topicTotalSendMessages partLabel $ \counter -> void $ do - let totalRecords = V.sum $ V.map (.recordFormat.batchLength) v - M.addCounter counter (fromIntegral totalRecords) - -- PartitionData - pure $ K.PartitionData - { partitionIndex = request.partition - , errorCode = K.NONE - , highWatermark = hioffset - , recordBytes = (K.RecordBytes $ Just bs) - , lastStableOffset = (-1) -- TODO - , abortedTransactions = K.NonNullKaArray V.empty -- TODO - -- TODO: for performance reason, we don't implement - -- logStartOffset now - , logStartOffset = (-1) - } - pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) - pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} + generateResponse fetchCtx r readRecords ------------------------------------------------------------------------------- -preProcessRequest :: ServerContext -> K.RequestContext -> K.FetchRequest -> IO ReFetchRequest -preProcessRequest ServerContext{..} reqCtx r = do - -- kafka broker just throw java.lang.RuntimeException if topics is null, here - -- we do the same. - let K.NonNullKaArray topicReqs = r.topics - mutContFetch <- newFastMutInt 1 -- Bool - mutNumOfReads <- newFastMutInt 0 -- Total number of reads - topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do - -- [ACL] check [READ TOPIC] - -- TODO: In kafka, check [CLUSTER_ACTION CLUSTER] instead if the request is from follower. - -- Of course, we do not consider this now. - isTopicAuthzed <- K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) authorizer K.Res_TOPIC t.topic K.AclOp_READ - -- Partition should be non-empty - let K.NonNullKaArray partitionReqs = t.partitions - -- FIXME: we can also cache this in FetchContext, however, we need to - -- consider the following: what if someone delete the topic? - orderedParts <- S.listStreamPartitionsOrderedByName scLDClient - (S.transToTopicStreamName t.topic) - ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do - M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $ - \counter -> void $ M.addCounter counter 1 - -- FIXME: too deep nesting... - if not isTopicAuthzed then do - let elsn = ErrPartitionData $ - errorPartitionResponse p.partition K.TOPIC_AUTHORIZATION_FAILED - pure $ Partition 0 elsn p - else do - let m_logid = orderedParts V.!? fromIntegral p.partition - case m_logid of - Nothing -> do - let elsn = ErrPartitionData $ - errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION - -- Actually, the logid should be Nothing but 0, however, we won't - -- use it, so just set it to 0 - pure $ Partition 0 elsn p - Just (_, logid) -> do - void $ atomicFetchAddFastMut mutNumOfReads 1 - contFetch <- readFastMutInt mutContFetch - elsn <- - if contFetch == 0 - then getPartitionLsn scLDClient scOffsetManager logid p.partition - p.fetchOffset - else do - m_logCtx <- K.getFetchLogCtx fetchCtx logid - case m_logCtx of - Nothing -> do -- Cache miss - Log.debug1 $ "ContFetch: cache miss" - writeFastMutInt mutContFetch 0 - getPartitionLsn scLDClient scOffsetManager - logid p.partition p.fetchOffset - Just logCtx -> - if (logCtx.nextOffset /= p.fetchOffset) -- Cache hit but not match - then do - Log.debug1 $ "ContFetch: cache hit but not match" - writeFastMutInt mutContFetch 0 - getPartitionLsn scLDClient scOffsetManager logid p.partition - p.fetchOffset - else do - m <- K.getLatestOffsetWithLsn scOffsetManager logid - case m of - Just (latestOffset, _tailLsn) -> do - Log.debug1 $ "ContFetch: Continue reading" - let highwaterOffset = latestOffset + 1 - pure $ ContReading logCtx.remRecords highwaterOffset - Nothing -> do - Log.debug1 $ "ContFetch: Continue reading, but logid " - <> Log.build logid <> " is empty" - -- We can quick return here, because the partition is empty - if p.fetchOffset == 0 - then pure $ ErrPartitionData $ - partitionResponse0 p.partition K.NONE 0 - else pure $ ErrPartitionData $ - errorPartitionResponse p.partition K.OFFSET_OUT_OF_RANGE - pure $ Partition logid elsn p - pure (t.topic, ps) - contFetch <- readFastMutInt mutContFetch - numOfReads <- readFastMutInt mutNumOfReads +-- | Preprocess the request +preProcessRequest + :: ServerContext -> K.RequestContext -> K.FetchRequest + -> IO ReFetchRequest +preProcessRequest sc@ServerContext{..} reqCtx r = do + (topics, numOfReads, contFetch) <- preProcessTopics sc r.topics reqCtx + + let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd) -- TODO PERF: We can bybass loop all topics(using a global mutAllError). -- However, this will make the code more complex. - let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd) - -- Kafka: fetchMaxBytes = Math.min( - -- Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), - -- maxQuotaWindowBytes) + let allError = doesAllError topics + + -- Kafka: fetchMaxBytes = + -- Math.min( + -- Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), + -- maxQuotaWindowBytes + -- ) let fetchMaxBytes = min r.maxBytes (fromIntegral kafkaBrokerConfigs.fetchMaxBytes._value) Log.debug1 $ "Received fetchMaxBytes " <> Log.build fetchMaxBytes -- Kafka: fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) @@ -331,7 +189,7 @@ preProcessRequest ServerContext{..} reqCtx r = do , maxWaitMs = r.maxWaitMs , contFetch = False , totalReads = numOfReads - , allError = doesAllError topics + , allError = allError } else do cacheNumOfReads <- length <$> K.getAllFetchLogs fetchCtx if numOfReads == cacheNumOfReads @@ -342,7 +200,7 @@ preProcessRequest ServerContext{..} reqCtx r = do , maxWaitMs = r.maxWaitMs , contFetch = True , totalReads = numOfReads - , allError = doesAllError topics + , allError = allError } else do ts <- forM topics $ \(tn, ps) -> do @@ -364,38 +222,101 @@ preProcessRequest ServerContext{..} reqCtx r = do , allError = doesAllError ts } -getPartitionLsn - :: S.LDClient - -> K.OffsetManager - -> S.C_LogID -> Int32 - -> Int64 -- ^ kafka start offset - -> IO LsnData -getPartitionLsn ldclient om logid partition offset = do - m <- K.getLatestOffsetWithLsn om logid - case m of - Just (latestOffset, tailLsn) -> do - let highwaterOffset = latestOffset + 1 - if | offset < latestOffset -> do - let key = U.intToCBytesWithPadding offset - Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid " - <> Log.build logid - (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict - Log.debug1 $ "FindKey result " <> Log.build logid <> ": " - <> Log.build startLsn - pure $ LsnData startLsn tailLsn highwaterOffset - | offset == latestOffset -> - pure $ LsnData tailLsn tailLsn highwaterOffset - | offset == highwaterOffset -> - pure $ LsnData (tailLsn + 1) tailLsn highwaterOffset - | offset > highwaterOffset -> - pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE - -- ghc is not smart enough to detact my partten matching is complete - | otherwise -> error "This should not be reached (getPartitionLsn)" - Nothing -> do - Log.debug $ "Partition " <> Log.build logid <> " is empty" - if offset == 0 - then pure $ LsnData S.LSN_MIN S.LSN_INVALID 0 - else pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE +data PreProcessTopicVar = PreProcessTopicVar + { mutNumOfReads :: !FastMutInt -- ^ Total number of reads + , mutContFetch :: !FastMutInt -- ^ Continue fetch, Bool + } + +preProcessTopics + :: ServerContext -> K.KaArray K.FetchTopic -> K.RequestContext + -> IO (Vector (Text, Vector Partition), Int, Int) +preProcessTopics ServerContext{..} fetchTopics reqCtx = do + topicVar <- PreProcessTopicVar + <$> newFastMutInt 0 -- mutNumOfReads + <*> newFastMutInt 1 -- mutContFetch + -- kafka broker just throw java.lang.RuntimeException if topics is null, here + -- we do the same. + let K.NonNullKaArray topicReqs = fetchTopics + topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do + -- Partition should be non-null + let K.NonNullKaArray partitionReqs = t.partitions + -- [ACL] check [READ TOPIC] + -- TODO: In kafka, check [CLUSTER_ACTION CLUSTER] instead if the request is from follower. + -- Of course, we do not consider this now. + isTopicAuthzed <- K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) authorizer K.Res_TOPIC t.topic K.AclOp_READ + parts <- + if isTopicAuthzed + then preProcessTopic t.topic partitionReqs topicVar + else V.forM partitionReqs $ \p{- K.FetchPartition -} -> do + let elsn = ErrPartitionData $ + errorPartitionResponse p.partition K.TOPIC_AUTHORIZATION_FAILED + pure $ Partition 0 elsn p + pure (t.topic, parts) + + contFetch <- readFastMutInt topicVar.mutContFetch + numOfReads <- readFastMutInt topicVar.mutNumOfReads + pure $ (topics, numOfReads, contFetch) + + where + preProcessTopic + :: Text -> Vector K.FetchPartition -> PreProcessTopicVar + -> IO (Vector Partition) + preProcessTopic topicName partitionReqs topicVar = do + -- FIXME: we can also cache this in FetchContext, however, we need to + -- consider the following: what if someone delete the topic? + orderedParts <- S.listStreamPartitionsOrderedByName scLDClient + (S.transToTopicStreamName topicName) + V.forM partitionReqs $ \p{- K.FetchPartition -} -> do + preProcessPartition orderedParts p topicVar + + preProcessPartition partitions p topicVar = do + let m_logid = partitions V.!? fromIntegral p.partition + case m_logid of + Nothing -> do + let elsn = ErrPartitionData $ + errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION + -- Actually, the logid should be Nothing, however, we won't + -- use it, so just set it to 0 for convenience. + pure $ Partition 0 elsn p + Just (_, logid) -> do + void $ atomicFetchAddFastMut topicVar.mutNumOfReads 1 + contFetch <- readFastMutInt topicVar.mutContFetch + elsn <- + if contFetch == 0 + then getPartitionLsn scLDClient scOffsetManager logid p.partition + p.fetchOffset + else do + m_logCtx <- K.getFetchLogCtx fetchCtx logid + case m_logCtx of + Nothing -> do -- Cache miss + Log.debug1 $ "ContFetch: cache miss" + writeFastMutInt topicVar.mutContFetch 0 + getPartitionLsn scLDClient scOffsetManager + logid p.partition p.fetchOffset + Just logCtx -> + if (logCtx.expectedOffset /= p.fetchOffset) -- Cache hit but not match + then do + Log.debug1 $ "ContFetch: cache hit but not match" + writeFastMutInt topicVar.mutContFetch 0 + getPartitionLsn scLDClient scOffsetManager + logid p.partition p.fetchOffset + else do + m <- getLatestOffsetWithLsn scOffsetManager logid + case m of + Just (latestOffset, _tailLsn) -> do + Log.debug1 $ "ContFetch: Continue reading" + let highwaterOffset = calculateNextOffset latestOffset + pure $ ContReading logCtx.remRecords highwaterOffset + Nothing -> do + Log.debug1 $ "ContFetch: Continue reading, but logid " + <> Log.build logid <> " is empty" + -- We can quick return here, because the partition is empty + if p.fetchOffset == 0 + then pure $ ErrPartitionData $ + partitionResponse0 p.partition K.NONE 0 + else pure $ ErrPartitionData $ + errorPartitionResponse p.partition K.OFFSET_OUT_OF_RANGE + pure $ Partition logid elsn p readMode1 :: ReFetchRequest @@ -477,6 +398,94 @@ readMode1 r storageOpts reader = do v' <- GV.append v (K.Record recordFormat (record.recordAttr.recordAttrLSN)) HT.insert table logid (rv, v') +generateResponse :: K.FetchContext -> ReFetchRequest -> RecordTable + -> IO K.FetchResponse +generateResponse fetchCtx r readRecords = do + mutMaxBytes <- newFastMutInt $ fromIntegral r.maxBytes + mutIsFirstPartition <- newFastMutInt 1 -- TODO: improve this + respTopics <- V.forM r.topics $ \(topic, partitions) -> do + respPartitionDatas <- V.forM partitions $ \partition -> do + let request = partition.request + let e_hioffset = extractHiOffset partition.elsn + case e_hioffset of + Left pd -> do + Log.debug1 $ "Response for (" <> Log.build topic + <> "," <> Log.build request.partition + <> "), log " <> Log.build partition.logid + <> ", error: " <> Log.buildString' pd.errorCode + pure pd + Right hioffset -> do + mgv <- HT.lookup readRecords partition.logid + case mgv of + Nothing -> do + Log.debug1 $ "Response for (" <> Log.build topic + <> "," <> Log.build request.partition + <> "), log " <> Log.build partition.logid + <> ", empty." + -- Cache the context. + -- + -- It's safe to set the remRecords to empty, because "mgv" is + -- Nothing, which means no remaining records in the table. + K.setFetchLogCtx + fetchCtx + partition.logid + K.FetchLogContext{ expectedOffset = request.fetchOffset + , remRecords = V.empty + } + pure $ K.PartitionData + { partitionIndex = request.partition + , errorCode = K.NONE + , highWatermark = hioffset + , recordBytes = (K.RecordBytes $ Just "") + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement + -- logStartOffset now + , logStartOffset = (-1) + } + Just (remv, gv) -> do + v <- if V.null remv + then GV.unsafeFreeze gv + -- TODO PERF + else (remv <>) <$> GV.unsafeFreeze gv + (bs, m_offset, tokenIdx) <- encodePartition mutMaxBytes mutIsFirstPartition request v + Log.debug1 $ "Response for (" <> Log.build topic + <> "," <> Log.build request.partition + <> "), log " <> Log.build partition.logid + <> ", " <> Log.build (BS.length bs) <> " bytes" + -- Cache the context + K.setFetchLogCtx + fetchCtx + partition.logid + -- FIXME: does this correct? + -- + -- Always expect the (last_offset + 1) to be fetched next + -- even with HSTREAM_SPARSE_OFFSET enabled. + K.FetchLogContext{ expectedOffset = maybe (-1) (+ 1) m_offset + , remRecords = V.drop (tokenIdx + 1) v + } + -- Stats + let partLabel = (topic, T.pack . show $ request.partition) + M.withLabel M.topicTotalSendBytes partLabel $ \counter -> void $ + M.addCounter counter (fromIntegral $ BS.length bs) + M.withLabel M.topicTotalSendMessages partLabel $ \counter -> void $ do + let totalRecords = V.sum $ V.map (.recordFormat.batchLength) v + M.addCounter counter (fromIntegral totalRecords) + -- PartitionData + pure $ K.PartitionData + { partitionIndex = request.partition + , errorCode = K.NONE + , highWatermark = hioffset + , recordBytes = (K.RecordBytes $ Just bs) + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement + -- logStartOffset now + , logStartOffset = (-1) + } + pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) + pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} + -- In kafka broker, regarding the format on disk, the broker will return -- the message format according to the fetch api version. Which means -- @@ -496,7 +505,7 @@ encodePartition -> K.FetchPartition -> Vector K.Record -> IO (ByteString, Maybe Int64, Int) - -- ^ (encoded bytes, next offset, taken vector index) + -- ^ (encoded bytes, encoded last offset, taken vector index) -- -- taken vector index: -1 means no vector taken, otherwise, the index of -- the vector taken @@ -517,24 +526,46 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do -- -- Also see 'HStream.Kafka.Common.RecordFormat.trySeekMessageSet' let fstRecordBytes = K.unCompactBytes fstRecord.recordFormat.recordBytes - fstLen = BS.length fstRecordBytes + fstRecordLen = BS.length fstRecordBytes if isFristPartition == 1 -- First partition then do writeFastMutInt mutIsFirstPartition 0 -- next partition should not be the first - if fstLen >= maxBytes + if fstRecordLen >= maxBytes then do writeFastMutInt mutMaxBytes (-1) - mo <- K.decodeNextRecordOffset fstRecordBytes +#ifndef HSTREAM_SPARSE_OFFSET + mo <- decodeLastBatchOffset fstRecordBytes +#else + -- TODO: improve, + -- add a new function unsafeUpdateRecordBatchBaseOffset' + -- which return the last offset + unsafeUpdateRecordBatchBaseOffset fstRecordBytes fstRecord.recordLsn + mo <- decodeLastBatchOffset fstRecordBytes +#endif pure (fstRecordBytes, mo, 0) - else if fstLen >= (fromIntegral p.partitionMaxBytes) - then do void $ atomicFetchAddFastMut mutMaxBytes (-fstLen) - mo <- K.decodeNextRecordOffset fstRecordBytes + else if fstRecordLen >= (fromIntegral p.partitionMaxBytes) + then do void $ atomicFetchAddFastMut mutMaxBytes (-fstRecordLen) +#ifndef HSTREAM_SPARSE_OFFSET + mo <- decodeLastBatchOffset fstRecordBytes +#else + -- TODO: improve + unsafeUpdateRecordBatchBaseOffset fstRecordBytes fstRecord.recordLsn + mo <- decodeLastBatchOffset fstRecordBytes +#endif pure (fstRecordBytes, mo, 0) - else doEncodeElse fstRecordBytes vs + else do +#ifdef HSTREAM_SPARSE_OFFSET + unsafeUpdateRecordBatchBaseOffset fstRecordBytes fstRecord.recordLsn +#endif + doEncodeElse fstRecordBytes vs -- Not the first partition else do - if fstLen <= maxBytes - then doEncodeElse fstRecordBytes vs + if fstRecordLen <= maxBytes + then do +#ifdef HSTREAM_SPARSE_OFFSET + unsafeUpdateRecordBatchBaseOffset fstRecordBytes fstRecord.recordLsn +#endif + doEncodeElse fstRecordBytes vs else pure ("", Nothing, (-1)) doEncodeElse fstBs vs = do @@ -548,6 +579,9 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do -- FIXME: Does this possible be multiple BatchRecords? let rbs = K.unCompactBytes r.recordFormat.recordBytes rlen = BS.length rbs +#ifdef HSTREAM_SPARSE_OFFSET + unsafeUpdateRecordBatchBaseOffset rbs r.recordLsn +#endif curMaxBytes <- atomicFetchAddFastMut mutMaxBytes (-rlen) curPartMaxBytes <- atomicFetchAddFastMut mutPartitionMaxBytes (-rlen) let capLen = min curPartMaxBytes curMaxBytes @@ -556,14 +590,14 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do b' = b <> BB.byteString rbs' if capLen < rlen then do - mo1 <- K.decodeNextRecordOffset rbs' + mo1 <- decodeLastBatchOffset rbs' case mo1 of Just _ -> pure ((b', Right mo1, i), False) Nothing -> do - mo2 <- K.decodeNextRecordOffset (fromLeft' lb) + mo2 <- decodeLastBatchOffset (fromLeft' lb) pure ((b', Right mo2, i), False) else pure ((b', Left rbs, i + 1), True) - lastOffset <- either K.decodeNextRecordOffset pure lastOffset' + lastOffset <- either decodeLastBatchOffset pure lastOffset' pure (BS.toStrict $ BB.toLazyByteString bb, lastOffset, takenVecIdx) @@ -595,6 +629,79 @@ partitionResponse0 partitionIndex ec hw = K.PartitionData ------------------------------------------------------------------------------- +getLatestOffsetWithLsn :: K.OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN)) +getLatestOffsetWithLsn = +#ifndef HSTREAM_SPARSE_OFFSET + K.getLatestOffsetWithLsn +#else + K.getLatestSparseOffsetWithLsn +#endif + +decodeLastBatchOffset :: ByteString -> IO (Maybe Int64) +decodeLastBatchOffset bs = +#ifndef HSTREAM_SPARSE_OFFSET + -- TODO: use decodeRecordBatchOffset instead + (fmap (subtract 1)) <$> K.decodeNextRecordOffset bs +#else + let f (baseOffset, recordsLen) = baseOffset + fromIntegral recordsLen - 1 + in (fmap f) <$> K.decodeRecordBatchOffset bs +#endif + +#ifdef HSTREAM_SPARSE_OFFSET +unsafeUpdateRecordBatchBaseOffset :: ByteString -> S.LSN -> IO () +unsafeUpdateRecordBatchBaseOffset bs lsn = + K.unsafeUpdateRecordBatchBaseOffset bs (+ (K.composeSparseOffset lsn 0)) +#endif + +calculateNextOffset :: Int64 -> Int64 +calculateNextOffset = +#ifndef HSTREAM_SPARSE_OFFSET + (+ 1) +#else + K.getNextSparseOffset +#endif + +getPartitionLsn + :: S.LDClient + -> K.OffsetManager + -> S.C_LogID -> Int32 + -> Int64 -- ^ kafka start offset + -> IO LsnData +getPartitionLsn ldclient om logid partition offset = do + m <- getLatestOffsetWithLsn om logid + case m of + Just (latestOffset, tailLsn) -> do + let highwaterOffset = calculateNextOffset latestOffset +#ifndef HSTREAM_SPARSE_OFFSET + if | offset < latestOffset -> do + let key = U.intToCBytesWithPadding offset + Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid " + <> Log.build logid + (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict + Log.debug1 $ "FindKey result " <> Log.build logid <> ": " + <> Log.build startLsn + pure $ LsnData startLsn tailLsn highwaterOffset + | offset == latestOffset -> + pure $ LsnData tailLsn tailLsn highwaterOffset +#else + if | offset < highwaterOffset -> do + let startLsn = K.sparseOffsetToLsn offset + pure $ LsnData startLsn tailLsn highwaterOffset +#endif + | offset == highwaterOffset -> + pure $ LsnData (tailLsn + 1) tailLsn highwaterOffset + | offset > highwaterOffset -> + pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE + -- ghc is not smart enough to detact my partten matching is complete + | otherwise -> error "This should not be reached (getPartitionLsn)" + Nothing -> do + Log.debug1 $ "Log " <> Log.build logid <> " is empty" + if offset == 0 + then pure $ LsnData S.LSN_MIN S.LSN_INVALID 0 + else pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE + +------------------------------------------------------------------------------- + -- NOTE: condition is True -> continue; False -> break vecFoldWhileM :: Monad m => Vector b -> a -> (a -> b -> m (a, Bool)) -> m a vecFoldWhileM !bs !a !f = diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Consume.hs new file mode 100644 index 000000000..541c9a6c9 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Consume.hs @@ -0,0 +1,10 @@ +{-# LANGUAGE PatternSynonyms #-} +{-# OPTIONS_GHC -Wno-unused-top-binds #-} + +module HStream.Kafka.Server.Handler.SparseOffset.Consume + ( handleFetch + ) where + +#define HSTREAM_SPARSE_OFFSET +#include "HStream/Kafka/Server/Handler/Consume.hs" +#undef HSTREAM_SPARSE_OFFSET diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 57188b8bd..dd577aaa0 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -173,6 +173,7 @@ library HStream.Kafka.Server.Handler.Group HStream.Kafka.Server.Handler.Offset HStream.Kafka.Server.Handler.Produce + HStream.Kafka.Server.Handler.SparseOffset.Consume HStream.Kafka.Server.Handler.SparseOffset.Offset HStream.Kafka.Server.Handler.SparseOffset.Produce HStream.Kafka.Server.Handler.Topic diff --git a/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs b/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs index f8e5de605..e3bcfb9b1 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Encoding.hs @@ -13,6 +13,7 @@ module Kafka.Protocol.Encoding -- * Message Format , RecordBatch (..) , decodeRecordBatch + , decodeRecordBatchOffset , updateRecordBatchBaseOffset , unsafeUpdateRecordBatchBaseOffset -- ** Attributes @@ -214,6 +215,35 @@ decodeRecordBatch shouldValidateCrc bs = do else throwIO $ DecodeError (INVALID_RECORD, "There are some bytes left") _ -> throwIO $ DecodeError $ (CORRUPT_MESSAGE, "Invalid magic " <> show magic) +-- | Get the offset from the batch bs. +-- +-- Return (baseOffset, batchRecordsLen) +decodeRecordBatchOffset :: ByteString -> IO (Maybe (Int64, Int32)) +decodeRecordBatchOffset bs = fst <$> runParser' parser bs + where + parser = do + let totalLen = BS.length bs + -- FailFast: batch is incomplete + if totalLen <= 12{- baseOffset, batchLength -} then pure Nothing else do + baseOffset <- get @Int64 + batchLength <- get @Int32 + let remainingLen = fromIntegral totalLen - 12 - batchLength + -- batch is incomplete or more + if remainingLen /= 0 then pure Nothing else do + directDropBytes 4 -- partitionLeaderEpoch: int32 + magic <- get @Int8 + case magic of + 2 -> do + -- crc: int32 + attributes: int16 + lastOffsetDelta: int32 + + -- baseTimestamp: int64 + maxTimestamp: int64 + + -- producerId: int64 + producerEpoch: int16 + + -- baseSequence: int32 + directDropBytes 40 + batchRecordsLen <- get @Int32 + pure $ Just (baseOffset, batchRecordsLen) + _ -> fail $ "Invalid magic " <> show magic +{-# INLINE decodeRecordBatchOffset #-} + -- Be sure to use this function after the calling of 'decodeRecordBatch', -- since we do not check the bounds. updateRecordBatchBaseOffset :: ByteString -> (Int64 -> Int64) -> IO ByteString