From 35676800a0368d3e4242c593b6a823b295b7a2fa Mon Sep 17 00:00:00 2001
From: mu <59917266+4eUeP@users.noreply.github.com>
Date: Wed, 15 May 2024 14:15:53 +0800
Subject: [PATCH] Kafka SparseOffset: ListOffsets handler

---
 .stylish-haskell.yaml                          |  2 +
 .../HStream/Kafka/Common/OffsetManager.hs      | 66 ++++++++++++++++++-
 .../HStream/Kafka/Server/Handler/Offset.hs     | 31 ++++++---
 .../Server/Handler/SparseOffset/Offset.hs      | 10 +++
 hstream-kafka/hstream-kafka.cabal              |  6 +-
 5 files changed, 101 insertions(+), 14 deletions(-)
 create mode 100644 hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs

diff --git a/.stylish-haskell.yaml b/.stylish-haskell.yaml
index 42703ca8e..57a780ba0 100644
--- a/.stylish-haskell.yaml
+++ b/.stylish-haskell.yaml
@@ -272,6 +272,8 @@ language_extensions:
   - TemplateHaskell
   - TypeApplications
   - QuasiQuotes
+  - CPP
+  - PatternSynonyms
 
 # Attempt to find the cabal file in ancestors of the current directory, and
 # parse options (currently only language extensions) from that.
diff --git a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
index 2dc0ac555..74308f9d3 100644
--- a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
+++ b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
@@ -12,10 +12,20 @@ module HStream.Kafka.Common.OffsetManager
   , getLatestOffset
   , getLatestOffsetWithLsn
   , getOffsetByTimestamp
+
+    -- * SparseOffset
+  , getOldestSparseOffset
+  , getLatestSparseOffsetWithLsn
+  , getLatestSparseOffset
+  , getNextSparseOffset
+  , getSparseOffsetByTimestamp
+  , composeSparseOffset
+  , sparseOffsetToLsn
   ) where
 
 import           Control.Concurrent
 import           Control.Exception
+import           Data.Bits
 import qualified Data.HashTable.IO as H
 import           Data.Int
 import           Data.Maybe
@@ -27,6 +37,7 @@ import           GHC.Stack (HasCallStack)
 import           HStream.Kafka.Common.Read
 import           HStream.Kafka.Common.RecordFormat
 import qualified HStream.Store as S
+import qualified HStream.Store.Internal.LogDevice as S
 
 -------------------------------------------------------------------------------
 
@@ -128,8 +139,6 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do
             else pure (lsn, tailLsn)
    in fmap (calOffset . third) <$> readOneRecordBypassGap store reader logid getLsn
 
--------------------------------------------------------------------------------
-
 -- Suppose we have three batched records:
 --
 -- record0, record1, record2
@@ -139,6 +148,59 @@ getOffsetByTimestamp OffsetManager{..} logid timestamp = do
 calOffset :: RecordFormat -> Int64
 calOffset RecordFormat{..} = offset + 1 - fromIntegral batchLength
 
+-------------------------------------------------------------------------------
+
+getOldestSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
+getOldestSparseOffset OffsetManager{..} logid = do
+  isEmpty <- S.isLogEmpty store logid
+  if isEmpty
+     then pure Nothing
+     else do
+       lsn <- S.getLogHeadAttrsTrimPoint =<< S.getLogHeadAttrs store logid
+       pure $ Just $ composeSparseOffset lsn 0
+
+getLatestSparseOffsetWithLsn
+  :: OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN))
+getLatestSparseOffsetWithLsn OffsetManager{..} logid = do
+  isEmpty <- S.isLogEmpty store logid
+  if isEmpty
+     then pure Nothing
+     else do
+       tailLsn <- S.getTailLSN store logid
+       pure $ Just (composeSparseOffset tailLsn 0, tailLsn)
+
+getLatestSparseOffset :: OffsetManager -> Word64 -> IO (Maybe Int64)
+getLatestSparseOffset o logid = (fmap fst) <$> getLatestSparseOffsetWithLsn o logid
+
+-- TODO: check overflow
+getNextSparseOffset :: Int64 -> Int64
+getNextSparseOffset offset = (offset `shiftR` 24 + 1) `shiftL` 24
+
+getSparseOffsetByTimestamp :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
+getSparseOffsetByTimestamp OffsetManager{..} logid timestamp = do
+  lsn <- S.findTime store logid timestamp S.FindKeyStrict
+  tailLsn <- S.getTailLSN store logid
+  if lsn > tailLsn
+     then pure Nothing
+     -- FIXME: the lsn here may be a gap, do we need to handle this?
+     else pure $ Just $ composeSparseOffset lsn 0
+
+-- (20bit, 20bit, 24bit)
+composeSparseOffset :: Word64 -> Int32 -> Int64
+composeSparseOffset lsn idx =
+  let epoch = lsn `shiftR` 32
+      esn = lsn .&. 0xffffffff
+      -- We use 20 bits for epoch, but it should be at most 2^19 - 1, since
+      -- offset is Int64 and we don't want a negative offset.
+   in if (epoch > 2^(19 :: Int) - 1 || esn > 2^(20 :: Int) - 1 || idx > 2^(24 :: Int) - 1)
+        then error "SparseOffset overflow!"
+        else fromIntegral $ (epoch `shiftL` 44) .|. (esn `shiftL` 24) .|. (fromIntegral idx)
+
+sparseOffsetToLsn :: Int64 -> Word64
+sparseOffsetToLsn offset = fromIntegral $ offset `shiftR` 24
+
+-------------------------------------------------------------------------------
+
 third :: (a, b, c) -> c
 third (_, _, x) = x
 {-# INLINE third #-}
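
For reference, the sparse offset introduced above packs a LogDevice LSN (epoch in the high 32 bits, ESN in the low 32 bits) together with a batch-local index into a single non-negative Int64, using 20 bits for the epoch, 20 bits for the ESN, and 24 bits for the index. Below is a minimal standalone sketch of that layout; packSparse, unpackSparse, and nextSparse are illustrative names and are not part of this patch.

module SparseOffsetSketch where

import Data.Bits (shiftL, shiftR, (.&.), (.|.))
import Data.Int  (Int64)
import Data.Word (Word64)

-- Pack an LSN and a batch-local index into one Int64:
-- bits 44..62 hold the epoch, bits 24..43 the ESN, bits 0..23 the index.
packSparse :: Word64 -> Int -> Int64
packSparse lsn idx =
  let epoch = lsn `shiftR` 32
      esn   = lsn .&. 0xffffffff
   in fromIntegral $ (epoch `shiftL` 44) .|. (esn `shiftL` 24) .|. fromIntegral idx

-- Recover the three fields, mirroring the masks used when packing.
unpackSparse :: Int64 -> (Word64, Word64, Int)
unpackSparse o =
  ( fromIntegral (o `shiftR` 44)               -- epoch (19 usable bits)
  , fromIntegral ((o `shiftR` 24) .&. 0xfffff) -- ESN (20 bits)
  , fromIntegral (o .&. 0xffffff)              -- batch index (24 bits)
  )

-- Same shape as getNextSparseOffset: drop the 24-bit index, bump the
-- LSN part by one, and shift back, landing on the next LSN's first offset.
nextSparse :: Int64 -> Int64
nextSparse offset = ((offset `shiftR` 24) + 1) `shiftL` 24

main :: IO ()
main = do
  let lsn = (7 `shiftL` 32) .|. 1024 :: Word64          -- epoch 7, ESN 1024
  print (unpackSparse (packSparse lsn 3))               -- (7,1024,3)
  print (unpackSparse (nextSparse (packSparse lsn 3)))  -- (7,1025,0)
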
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
index 30bce055c..e8f9b623e 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
@@ -1,12 +1,12 @@
-{-# LANGUAGE OverloadedRecordDot #-}
-{-# LANGUAGE PatternSynonyms     #-}
+#ifndef HSTREAM_SPARSE_OFFSET
+{-# LANGUAGE PatternSynonyms #-}
 
 module HStream.Kafka.Server.Handler.Offset
   ( handleOffsetCommit
   , handleOffsetFetch
   , handleListOffsets
-  )
-where
+  ) where
+#endif
 
 import qualified Control.Exception as E
 import           Data.Int          (Int64)
@@ -17,9 +17,7 @@ import           HStream.Kafka.Common.Acl
 import           HStream.Kafka.Common.Authorizer.Class
 import qualified HStream.Kafka.Common.KafkaException as K
 import qualified HStream.Kafka.Common.Metrics as Metrics
-import           HStream.Kafka.Common.OffsetManager (getLatestOffset,
-                                                     getOffsetByTimestamp,
-                                                     getOldestOffset)
+import qualified HStream.Kafka.Common.OffsetManager as K
 import           HStream.Kafka.Common.Resource
 import           HStream.Kafka.Common.Utils (forKaArray, forKaArrayM)
 import qualified HStream.Kafka.Group.Group as G
@@ -109,18 +107,31 @@ handleListOffsets sc reqCtx req = do
       , name = listOffsetsTopic.name
       }
 
+#ifndef HSTREAM_SPARSE_OFFSET
   -- NOTE: The last offset of a partition is the offset of the upcoming
   -- message, i.e. the offset of the last available message + 1.
   getOffset logid LatestTimestamp =
-    maybe 0 (+ 1) <$> getLatestOffset sc.scOffsetManager logid
+    maybe 0 (+ 1) <$> K.getLatestOffset sc.scOffsetManager logid
   getOffset logid EarliestTimestamp =
-    fromMaybe 0 <$> getOldestOffset sc.scOffsetManager logid
+    fromMaybe 0 <$> K.getOldestOffset sc.scOffsetManager logid
   -- Return the earliest offset whose timestamp is greater than or equal to
   -- the given timestamp.
   --
   -- TODO: actually, this is not supported currently.
   getOffset logid timestamp =
-    fromMaybe (-1) <$> getOffsetByTimestamp sc.scOffsetManager logid timestamp
+    fromMaybe (-1) <$> K.getOffsetByTimestamp sc.scOffsetManager logid timestamp
+#else
+  getOffset logid LatestTimestamp =
+    maybe 0 K.getNextSparseOffset <$> K.getLatestSparseOffset sc.scOffsetManager logid
+  getOffset logid EarliestTimestamp =
+    fromMaybe 0 <$> K.getOldestSparseOffset sc.scOffsetManager logid
+  -- Return the earliest offset whose timestamp is greater than or equal to
+  -- the given timestamp.
+  --
+  -- TODO: actually, this is not supported currently.
+  getOffset logid timestamp =
+    fromMaybe (-1) <$> K.getSparseOffsetByTimestamp sc.scOffsetManager logid timestamp
+#endif
 
 --------------------
 -- 8: OffsetCommit
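
LatestTimestamp and EarliestTimestamp matched above are pattern synonyms for the sentinel timestamps in Kafka's ListOffsets protocol (-1 requests the latest offset, -2 the earliest), which is why PatternSynonyms stays enabled for this module. The sketch below shows the general shape of that dispatch with illustrative local definitions and stubbed results; it does not reuse the actual definitions from hstream-kafka.

{-# LANGUAGE PatternSynonyms #-}
module ListOffsetsSentinelSketch where

import Data.Int (Int64)

-- Kafka's ListOffsets request reserves negative timestamps as sentinels.
pattern LatestTimestamp :: Int64
pattern LatestTimestamp = (-1)

pattern EarliestTimestamp :: Int64
pattern EarliestTimestamp = (-2)

-- A toy resolver with the same shape as getOffset above; the returned
-- numbers are stand-ins, not real offsets.
resolveOffset :: Int64 -> Int64
resolveOffset LatestTimestamp   = 42    -- "next" offset of the partition
resolveOffset EarliestTimestamp = 0     -- oldest retained offset
resolveOffset _timestamp        = -1    -- timestamp lookup not supported
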
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs
new file mode 100644
index 000000000..f6d607a93
--- /dev/null
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/SparseOffset/Offset.hs
@@ -0,0 +1,10 @@
+{-# LANGUAGE PatternSynonyms #-}
+{-# OPTIONS_GHC -Wno-unused-top-binds #-}
+
+module HStream.Kafka.Server.Handler.SparseOffset.Offset
+  ( handleListOffsets
+  ) where
+
+#define HSTREAM_SPARSE_OFFSET
+#include "HStream/Kafka/Server/Handler/Offset.hs"
+#undef HSTREAM_SPARSE_OFFSET
diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal
index 31eef9df9..43882b375 100644
--- a/hstream-kafka/hstream-kafka.cabal
+++ b/hstream-kafka/hstream-kafka.cabal
@@ -173,6 +173,7 @@ library
     HStream.Kafka.Server.Handler.Group
     HStream.Kafka.Server.Handler.Offset
     HStream.Kafka.Server.Handler.Produce
+    HStream.Kafka.Server.Handler.SparseOffset.Offset
    HStream.Kafka.Server.Handler.Topic
     HStream.Kafka.Server.Security.SASL
 
@@ -193,7 +194,7 @@ library
 
   extra-libraries: rdkafka++
   include-dirs:
-    include /usr/local/include external/asio/asio/include include
+    . include /usr/local/include external/asio/asio/include
   extra-lib-dirs: /usr/local/lib
   hs-source-dirs: .
 
@@ -245,6 +246,7 @@ library
 
   default-language:   GHC2021
   default-extensions:
+    CPP
     DerivingStrategies
     LambdaCase
     MultiWayIf
@@ -262,9 +264,9 @@ test-suite hstream-kafka-test
     HStream.Kafka.Common.AclEntrySpec
     HStream.Kafka.Common.AclSpec
     HStream.Kafka.Common.AuthorizerSpec
+    HStream.Kafka.Common.ConfigSpec
     HStream.Kafka.Common.OffsetManagerSpec
     HStream.Kafka.Common.TestUtils
-    HStream.Kafka.Common.ConfigSpec
   hs-source-dirs:     tests
   build-depends:
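
The new SparseOffset module above relies on a one-source, two-modules trick: Handler/Offset.hs guards its own module header behind #ifndef HSTREAM_SPARSE_OFFSET, so the SparseOffset variant can define the macro and textually #include the same source, compiling the #else branches instead. The cabal changes support this by enabling CPP by default and adding the package root (.) to include-dirs so the #include path resolves. Below is a toy sketch of the pattern with made-up module names, assuming CPP is enabled for both files and the directory containing Impl.hs is on the include path.

-- Impl.hs: the shared source.  Compiled standalone it is module Impl and
-- uses the default branch; the module header is skipped when a variant
-- defines MY_VARIANT before including this file.
#ifndef MY_VARIANT
module Impl (answer) where
#endif

answer :: Int
#ifndef MY_VARIANT
answer = 1        -- default behaviour
#else
answer = 2        -- variant behaviour
#endif

-- Variant/Impl.hs: defines the macro, then includes the shared source,
-- yielding a second module built from the #else branches.
module Variant.Impl (answer) where

#define MY_VARIANT
#include "Impl.hs"
#undef MY_VARIANT
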