Skip to content

Commit

Permalink
Experimental SpareOffset 0: basic functions for SpareOffset
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 17, 2024
1 parent ed8a51a commit 10ee123
Showing 1 changed file with 80 additions and 2 deletions.
82 changes: 80 additions & 2 deletions hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,20 @@ module HStream.Kafka.Common.OffsetManager
, getLatestOffset
, getLatestOffsetWithLsn
, getOffsetByTimestamp

-- * SparseOffset
, getOldestSparseOffset
, getLatestHeadSparseOffsetWithLsn
, getLatestHeadSparseOffset
, getSparseOffsetByTimestamp
, composeSparseOffset
, sparseOffsetToLsn
, calNextSparseOffset
) where

import Control.Concurrent
import Control.Exception
import Data.Bits
import qualified Data.HashTable.IO as H
import Data.Int
import Data.Maybe
Expand All @@ -27,6 +37,7 @@ import GHC.Stack (HasCallStack)
import HStream.Kafka.Common.Read
import HStream.Kafka.Common.RecordFormat
import qualified HStream.Store as S
import qualified HStream.Store.Internal.LogDevice as S

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -128,8 +139,6 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do
else pure (lsn, tailLsn)
in fmap (calOffset . third) <$> readOneRecordBypassGap store reader logid getLsn

-------------------------------------------------------------------------------

-- Suppose we have three batched records:
--
-- record0, record1, record2
Expand All @@ -139,6 +148,75 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do
calOffset :: RecordFormat -> Int64
calOffset RecordFormat{..} = offset + 1 - fromIntegral batchLength

-------------------------------------------------------------------------------
-- SparseOffset

-- For performance reason(to bypass reading record), we directly use the
-- trim point as the oldest lsn.
--
-- If the log is nonempty and was never trimmed, the trim point is LSN_INVALID(0)
-- which is different from the actual oldest lsn.
getOldestSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
getOldestSparseOffset OffsetManager{..} logid = do
isEmpty <- S.isLogEmpty store logid
if isEmpty
then pure Nothing
else do
lsn <- S.getLogHeadAttrsTrimPoint =<< S.getLogHeadAttrs store logid
pure $ Just $ composeSparseOffset lsn 0

-- Get the head of last SparseOffset.
--
-- Note this return the start offset of the latest batch, which is diff from
-- 'getLatestOffsetWithLsn'
getLatestHeadSparseOffsetWithLsn
:: OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN))
getLatestHeadSparseOffsetWithLsn OffsetManager{..} logid = do
isEmpty <- S.isLogEmpty store logid
if isEmpty
then pure Nothing
else do
tailLsn <- S.getTailLSN store logid
pure $ Just (composeSparseOffset tailLsn 0, tailLsn)

getLatestHeadSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
getLatestHeadSparseOffset o logid =
(fmap fst) <$> getLatestHeadSparseOffsetWithLsn o logid

getSparseOffsetByTimestamp :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
getSparseOffsetByTimestamp OffsetManager{..} logid timestamp = do
lsn <- S.findTime store logid timestamp S.FindKeyStrict
tailLsn <- S.getTailLSN store logid
if lsn > tailLsn
then pure Nothing
-- FIXME: the lsn here may be a gap, do we need to handle this?
else pure $ Just $ composeSparseOffset lsn 0

-- epoch: 20bit, esn: 20bit, record_index: 24bit
--
-- Actually, epoch will use 19 bits
composeSparseOffset :: Word64 -> Int32 -> Int64
composeSparseOffset lsn idx =
let epoch = lsn `shiftR` 32
esn = lsn .&. 0xffffffff
-- We use 40 bits for lsn, but it should less than 2^39, since
-- offset is Int64 and we don't want a negative offset.
in if lsn > 2^(39 :: Int) - 1 || idx > 2^(24 :: Int) - 1
then error "SparseOffset overflow!"
else fromIntegral $ (epoch `shiftL` 44) .|. (esn `shiftL` 24) .|. (fromIntegral idx)

sparseOffsetToLsn :: Int64 -> Word64
sparseOffsetToLsn offset = fromIntegral $ offset `shiftR` 24

calNextSparseOffset :: Int64 -> Int64
calNextSparseOffset offset =
let lsn = offset `shiftR` 24
in if lsn < 2^(39 :: Int) - 1
then (offset `shiftR` 24 + 1) `shiftL` 24
else error "SparseOffset overflow!"

-------------------------------------------------------------------------------

third :: (a, b, c) -> c
third (_, _, x) = x
{-# INLINE third #-}

0 comments on commit 10ee123

Please sign in to comment.