diff --git a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs index 2dc0ac555..b5f2ce744 100644 --- a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs @@ -12,10 +12,20 @@ module HStream.Kafka.Common.OffsetManager , getLatestOffset , getLatestOffsetWithLsn , getOffsetByTimestamp + + -- * SparseOffset + , getOldestSparseOffset + , getLatestHeadSparseOffsetWithLsn + , getLatestHeadSparseOffset + , getSparseOffsetByTimestamp + , composeSparseOffset + , sparseOffsetToLsn + , calNextSparseOffset ) where import Control.Concurrent import Control.Exception +import Data.Bits import qualified Data.HashTable.IO as H import Data.Int import Data.Maybe @@ -27,6 +37,7 @@ import GHC.Stack (HasCallStack) import HStream.Kafka.Common.Read import HStream.Kafka.Common.RecordFormat import qualified HStream.Store as S +import qualified HStream.Store.Internal.LogDevice as S ------------------------------------------------------------------------------- @@ -128,8 +139,6 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do else pure (lsn, tailLsn) in fmap (calOffset . third) <$> readOneRecordBypassGap store reader logid getLsn -------------------------------------------------------------------------------- - -- Suppose we have three batched records: -- -- record0, record1, record2 @@ -139,6 +148,75 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do calOffset :: RecordFormat -> Int64 calOffset RecordFormat{..} = offset + 1 - fromIntegral batchLength +------------------------------------------------------------------------------- +-- SparseOffset + +-- For performance reason(to bypass reading record), we directly use the +-- trim point as the oldest lsn. +-- +-- If the log is nonempty and was never trimmed, the trim point is LSN_INVALID(0) +-- which is different from the actual oldest lsn. 
-- | Oldest sparse offset of the log, or 'Nothing' when the log is empty.
--
-- To avoid reading a record, the trim point reported by the log head
-- attributes is used directly as the oldest lsn. For a nonempty log that was
-- never trimmed the trim point is LSN_INVALID(0), which is not the real
-- oldest lsn.
getOldestSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
getOldestSparseOffset OffsetManager{..} logid = do
  isEmpty <- S.isLogEmpty store logid
  if isEmpty
    then pure Nothing
    else do
      trimPoint <- S.getLogHeadAttrsTrimPoint =<< S.getLogHeadAttrs store logid
      pure $ Just $ composeSparseOffset trimPoint 0

-- | Head of the last SparseOffset together with the tail lsn, or 'Nothing'
-- when the log is empty.
--
-- Note this returns the start offset of the latest batch (record index 0),
-- which is different from 'getLatestOffsetWithLsn'.
getLatestHeadSparseOffsetWithLsn
  :: OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN))
getLatestHeadSparseOffsetWithLsn OffsetManager{..} logid = do
  isEmpty <- S.isLogEmpty store logid
  if isEmpty
    then pure Nothing
    else do
      tailLsn <- S.getTailLSN store logid
      pure $ Just (composeSparseOffset tailLsn 0, tailLsn)

-- | Same as 'getLatestHeadSparseOffsetWithLsn' but without the lsn.
getLatestHeadSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
getLatestHeadSparseOffset o logid =
  fmap fst <$> getLatestHeadSparseOffsetWithLsn o logid

-- | Sparse offset (record index 0) of the lsn that 'S.findTime' reports for
-- the given timestamp, using 'S.FindKeyStrict'.
--
-- Returns 'Nothing' when that lsn is greater than the current tail lsn.
getSparseOffsetByTimestamp
  :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
getSparseOffsetByTimestamp OffsetManager{..} logid timestamp = do
  lsn <- S.findTime store logid timestamp S.FindKeyStrict
  tailLsn <- S.getTailLSN store logid
  if lsn > tailLsn
    then pure Nothing
    -- FIXME: the lsn here may be a gap, do we need to handle this?
    else pure $ Just $ composeSparseOffset lsn 0

-- | Pack an lsn and a record index into a single non-negative 'Int64'.
--
-- Layout (most significant bit first):
--
-- > bit 63       : always 0, so the offset is never negative
-- > bits 44..62  : epoch        (19 bits, upper 32 bits of the lsn)
-- > bits 24..43  : esn          (20 bits, lower 32 bits of the lsn)
-- > bits  0..23  : record index (24 bits)
--
-- Each field is range-checked on its own. Checking only the raw lsn is not
-- enough: an esn of 2^20 or more would be shifted into the epoch bits and
-- make different lsns produce the same offset. A negative index is rejected
-- too, since converting it to an unsigned word would set every high bit.
--
-- Calls 'error' with @"SparseOffset overflow!"@ when any field does not fit.
composeSparseOffset :: Word64 -> Int32 -> Int64
composeSparseOffset lsn idx
  | epoch > maxEpoch || esn > maxEsn || idx < 0 || idx > maxIdx =
      error "SparseOffset overflow!"
  | otherwise =
      fromIntegral $
        (epoch `shiftL` 44) .|. (esn `shiftL` 24) .|. fromIntegral idx
  where
    epoch, esn, maxEpoch, maxEsn :: Word64
    epoch    = lsn `shiftR` 32
    esn      = lsn .&. 0xffffffff
    maxEpoch = 0x7ffff   -- 2^19 - 1
    maxEsn   = 0xfffff   -- 2^20 - 1
    maxIdx :: Int32
    maxIdx   = 0xffffff  -- 2^24 - 1

-- | Recover the lsn from an offset built by 'composeSparseOffset'.
--
-- The record index (low 24 bits) is dropped, then the 19-bit epoch and the
-- 20-bit esn are moved back to the upper and lower 32 bits of the lsn, so
-- that @sparseOffsetToLsn (composeSparseOffset lsn i) == lsn@ for every lsn
-- that 'composeSparseOffset' accepts.
sparseOffsetToLsn :: Int64 -> Word64
sparseOffsetToLsn offset =
  let packed = fromIntegral offset `shiftR` 24 :: Word64
      epoch  = packed `shiftR` 20
      esn    = packed .&. 0xfffff
   in (epoch `shiftL` 32) .|. esn

-- | Offset of record index 0 in the position that follows the given offset.
--
-- The upper 40 bits (epoch and esn as packed by 'composeSparseOffset') are
-- incremented as one number and the record index is reset to 0. Calls 'error'
-- with @"SparseOffset overflow!"@ when the increment would not fit in the
-- 39 usable bits.
calNextSparseOffset :: Int64 -> Int64
calNextSparseOffset offset
  | packed < 2 ^ (39 :: Int) - 1 = (packed + 1) `shiftL` 24
  | otherwise = error "SparseOffset overflow!"
  where
    packed = offset `shiftR` 24

-------------------------------------------------------------------------------

-- | Third component of a triple.
third :: (a, b, c) -> c
third (_, _, x) = x
{-# INLINE third #-}