diff --git a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
index f482e66dc..e73a81255 100644
--- a/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
+++ b/hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
@@ -8,23 +8,20 @@ module HStream.Kafka.Common.OffsetManager
   , cleanOffsetCache
   , getOldestOffset
   , getLatestOffset
+  , getLatestOffsetWithLsn
   , getOffsetByTimestamp
   ) where

 import Control.Concurrent
 import Control.Exception
-import Control.Monad
-import Data.ByteString (ByteString)
 import qualified Data.HashTable.IO as H
 import Data.Int
 import Data.Word
 import GHC.Stack (HasCallStack)

+import HStream.Kafka.Common.Read (readOneRecord)
 import HStream.Kafka.Common.RecordFormat
-import qualified HStream.Logger as Log
 import qualified HStream.Store as S
-import qualified HStream.Store.Internal.LogDevice as S
-import qualified Kafka.Protocol.Encoding as K

 -------------------------------------------------------------------------------

@@ -77,53 +74,31 @@ cleanOffsetCache :: OffsetManager -> Word64 -> IO ()
 cleanOffsetCache OffsetManager{..} = H.delete offsets

 getOldestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
-getOldestOffset OffsetManager{..} logid =
+getOldestOffset OffsetManager{..} logid = do
   -- Actually, we only need the first lsn but there is no easy way to get it
-  (fmap offset) <$> readOneRecord store reader logid (pure (S.LSN_MIN, S.LSN_MAX))
+  (fmap $ offset . third) <$> readOneRecord store reader logid (pure (S.LSN_MIN, S.LSN_MAX))

 getLatestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
-getLatestOffset OffsetManager{..} logid =
+getLatestOffset o logid = (fmap fst) <$> getLatestOffsetWithLsn o logid
+
+getLatestOffsetWithLsn
+  :: HasCallStack
+  => OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN))
+getLatestOffsetWithLsn OffsetManager{..} logid =
   let getLsn = do tailLsn <- S.getTailLSN store logid
                   pure (tailLsn, tailLsn)
-   in (fmap offset) <$> readOneRecord store reader logid getLsn
+   in do m <- readOneRecord store reader logid getLsn
+         pure $ do (lsn, _, record) <- m
+                   pure (offset record, lsn)

 getOffsetByTimestamp :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
 getOffsetByTimestamp OffsetManager{..} logid timestamp = do
   let getLsn = do lsn <- S.findTime store logid timestamp S.FindKeyStrict
                   pure (lsn, lsn)
-   in (fmap offset) <$> readOneRecord store reader logid getLsn
+   in (fmap $ offset . third) <$> readOneRecord store reader logid getLsn

 -------------------------------------------------------------------------------

--- Return the first read RecordFormat
-readOneRecord
-  :: HasCallStack
-  => S.LDClient
-  -> S.LDReader
-  -> Word64
-  -> IO (S.LSN, S.LSN)
-  -> IO (Maybe RecordFormat)
-readOneRecord store reader logid getLsn = do
-  -- FIXME: This method is blocking until the state can be determined or an
-  -- error occurred. Directly read without check isLogEmpty will also block a
-  -- while for the first time since the state can be determined.
-  isEmpty <- S.isLogEmpty store logid
-  if isEmpty
-     then pure Nothing
-     else do (start, end) <- getLsn
-             finally (acquire start end) release
-  where
-    acquire start end = do
-      S.readerStartReading reader logid start end
-      dataRecords <- S.readerReadAllowGap @ByteString reader 1
-      case dataRecords of
-        Right [S.DataRecord{..}] -> Just <$> K.runGet recordPayload
-        _ -> do Log.fatal $ "readOneRecord read " <> Log.build logid
-                         <> "with lsn (" <> Log.build start <> " "
-                         <> Log.build end <> ") "
-                         <> "get unexpected result "
-                         <> Log.buildString' dataRecords
-                ioError $ userError $ "Invalid reader result " <> show dataRecords
-    release = do
-      isReading <- S.readerIsReading reader logid
-      when isReading $ S.readerStopReading reader logid
+third :: (a, b, c) -> c
+third (_, _, x) = x
+{-# INLINE third #-}
diff --git a/hstream-kafka/HStream/Kafka/Common/Read.hs b/hstream-kafka/HStream/Kafka/Common/Read.hs
new file mode 100644
index 000000000..863d0633b
--- /dev/null
+++ b/hstream-kafka/HStream/Kafka/Common/Read.hs
@@ -0,0 +1,48 @@
+module HStream.Kafka.Common.Read
+  ( readOneRecord
+  ) where
+
+import Control.Exception
+import Control.Monad
+import Data.ByteString (ByteString)
+import Data.Word
+import GHC.Stack (HasCallStack)
+
+import HStream.Kafka.Common.RecordFormat
+import qualified HStream.Logger as Log
+import qualified HStream.Store as S
+import qualified HStream.Store.Internal.LogDevice as S
+import qualified Kafka.Protocol.Encoding as K
+
+-- Return the first read RecordFormat
+readOneRecord
+  :: HasCallStack
+  => S.LDClient
+  -> S.LDReader
+  -> Word64
+  -> IO (S.LSN, S.LSN)
+  -> IO (Maybe (S.LSN, S.LSN, RecordFormat))
+readOneRecord store reader logid getLsn = do
+  -- FIXME: This method blocks until the log state can be determined or an
+  -- error occurs. Reading directly without checking isLogEmpty would also
+  -- block for a while on the first read, since the state must be determined.
+  isEmpty <- S.isLogEmpty store logid
+  if isEmpty
+     then pure Nothing
+     else do (start, end) <- getLsn
+             finally (acquire start end) release
+  where
+    acquire start end = do
+      S.readerStartReading reader logid start end
+      dataRecords <- S.readerReadAllowGap @ByteString reader 1
+      case dataRecords of
+        Right [S.DataRecord{..}] -> (Just . (start, end, )) <$> K.runGet recordPayload
+        _ -> do Log.fatal $ "readOneRecord read " <> Log.build logid
+                         <> " with lsn (" <> Log.build start <> " "
+                         <> Log.build end <> ") "
+                         <> "got unexpected result "
+                         <> Log.buildString' dataRecords
+                ioError $ userError $ "Invalid reader result " <> show dataRecords
+    release = do
+      isReading <- S.readerIsReading reader logid
+      when isReading $ S.readerStopReading reader logid
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
index 441925529..e75f4eb41 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -5,21 +5,22 @@ module HStream.Kafka.Server.Handler.Consume
   ) where

 import           Control.Monad
-import qualified Data.ByteString as BS
+import qualified Data.ByteString                    as BS
 import           Data.Int
-import qualified Data.List as L
+import qualified Data.List                          as L
 import           Data.Maybe
-import qualified Data.Vector as V
+import qualified Data.Vector                        as V

-import qualified HStream.Kafka.Common.RecordFormat as K
-import           HStream.Kafka.Server.Types (ServerContext (..))
-import qualified HStream.Logger as Log
-import qualified HStream.Store as S
-import qualified HStream.Utils as U
-import qualified Kafka.Protocol.Encoding as K
-import qualified Kafka.Protocol.Error as K
-import qualified Kafka.Protocol.Message as K
-import qualified Kafka.Protocol.Service as K
+import qualified HStream.Kafka.Common.OffsetManager as K
+import qualified HStream.Kafka.Common.RecordFormat  as K
+import           HStream.Kafka.Server.Types         (ServerContext (..))
+import qualified HStream.Logger                     as Log
+import qualified HStream.Store                      as S
+import qualified HStream.Utils                      as U
+import qualified Kafka.Protocol.Encoding            as K
+import qualified Kafka.Protocol.Error               as K
+import qualified Kafka.Protocol.Message             as K
+import qualified Kafka.Protocol.Service             as K

 --------------------
 -- 1: Fetch
@@ -42,10 +43,13 @@ handleFetchV0 ServerContext{..} _ K.FetchRequestV0{..} = case topics of
         -- Read one topic, then update total bytes left and time left
         -- Note: it is important to know if this is the first topic!
         (totalMaxBytes_m', timeLeftMs', resp) <-
-          readSingleTopic scLDClient topicReq acc_totalMaxBytes_m acc_timeLeft acc_isFirstTopic
+          readSingleTopic scLDClient scOffsetManager topicReq
+                          acc_totalMaxBytes_m acc_timeLeft
+                          acc_isFirstTopic
         return ( -- `isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m` means
                  -- there is nothing read from this topic.
-                 if isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m then acc_isFirstTopic else False
+                 if isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m
+                   then acc_isFirstTopic else False
               , totalMaxBytes_m'
               , timeLeftMs'
               , acc_resps ++ [resp]
@@ -58,12 +62,13 @@ handleFetchV0 ServerContext{..} _ K.FetchRequestV0{..} = case topics of
 readSingleTopic
   :: S.LDClient
+  -> K.OffsetManager
   -> K.FetchTopicV0
   -> Maybe Int32 -- limit: total bytes left now
   -> Int32 -- limit: time left now
   -> Bool -- is this the first topic? (if so, omit the bytes limit of this topic)
   -> IO (Maybe Int32, Int32, K.FetchableTopicResponseV0) -- (total bytes left, time left, response of this topic)
-readSingleTopic ldclient K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTopic = case partitions of
+readSingleTopic ldclient om K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTopic = case partitions of
   K.KaArray Nothing -> return (totalMaxBytes_m, timeLeftMs, K.FetchableTopicResponseV0 topic (K.KaArray Nothing))
   K.KaArray (Just parts) -> do
     orderedParts <- S.listStreamPartitionsOrdered ldclient (S.transToTopicStreamName topic)
@@ -78,41 +83,79 @@ readSingleTopic ldclient K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTo
           if acc_timeLeft <= 0
             then return (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps)
             else do
-              let (_,logId) = orderedParts V.! fromIntegral partition
-              (len,timeLeftMs',resp) <-
-                readSinglePartition ldclient reader logId partition
-                                    fetchOffset
-                                    acc_totalMaxBytes_m
-                                    partitionMaxBytes
-                                    acc_timeLeft
-                                    acc_isFirstPartition
-                                    isFirstTopic
-              return ( if len > 0 then False else acc_isFirstPartition
-                     , fmap (\x -> x - len) acc_totalMaxBytes_m
-                     , timeLeftMs'
-                     , acc_resps ++ [resp]
-                     )
+              let (_, logId) = orderedParts V.! fromIntegral partition
+              mlsn <- getPartitionLsn ldclient om logId fetchOffset
+              case mlsn of
+                Nothing ->
+                  let resp = errorPartitionResponseV0 partition K.OFFSET_OUT_OF_RANGE
+                   in pure (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps ++ [resp])
+                Just (S.LSN_INVALID, S.LSN_INVALID, hioffset) ->
+                  let resp = K.PartitionDataV0 partition K.NONE hioffset (Just "")
+                   in pure (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps ++ [resp])
+                Just (startlsn, endlsn, hioffset) -> do
+                  (len, timeLeftMs', resp) <-
+                    readSinglePartition reader
+                                        logId (startlsn, endlsn)
+                                        partition
+                                        fetchOffset
+                                        hioffset
+                                        acc_totalMaxBytes_m
+                                        partitionMaxBytes
+                                        acc_timeLeft
+                                        acc_isFirstPartition
+                                        isFirstTopic
+                  return ( if len > 0 then False else acc_isFirstPartition
+                         , fmap (\x -> x - len) acc_totalMaxBytes_m
+                         , timeLeftMs'
+                         , acc_resps ++ [resp]
+                         )
         )
         (True, totalMaxBytes_m, timeLeftMs, [])
         parts
       -- !!! FIXME: update time left!!!
       return ( totalMaxBytes_m'
              , timeLeftMs'
              , K.FetchableTopicResponseV0 topic (K.KaArray $ Just $ V.fromList resps)
              )

+-- Return tuple of (startLsn, endLsn, highwaterOffset)
+getPartitionLsn
+  :: S.LDClient -> K.OffsetManager
+  -> S.C_LogID
+  -> Int64 -- ^ kafka start offset
+  -> IO (Maybe (S.LSN, S.LSN, Int64))
+getPartitionLsn ldclient om logid offset = do
+  m <- K.getLatestOffsetWithLsn om logid
+  case m of
+    Just (latestOffset, endLsn) -> do
+      if | offset < latestOffset -> do
+             let key = U.int2cbytes offset
+             (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict
+             pure $ Just (startLsn, endLsn, latestOffset + 1)
+         | offset == latestOffset ->
+             pure $ Just (endLsn, endLsn, latestOffset + 1)
+         | offset == (latestOffset + 1) ->
+             pure $ Just (S.LSN_INVALID, S.LSN_INVALID, latestOffset + 1)
+         | offset > (latestOffset + 1) -> pure Nothing
+         -- GHC is not smart enough to detect that this pattern matching is complete
+         | otherwise -> error "This should not be reached (getPartitionLsn)"
+    -- log is empty, which means any offsets are out of range
+    Nothing -> do Log.debug "Empty LatestOffsetWithLsn"
+                  pure Nothing
+
 readSinglePartition
-  :: S.LDClient
-  -> S.LDReader -- the logdevice reader of this **topic**, but only one logId is read at the same time
+  :: S.LDReader -- the logdevice reader of this **topic**, but only one logId is read at the same time
   -> S.C_LogID -- logId of this partition
+  -> (S.LSN, S.LSN) -- ^ (start_lsn, end_lsn)
   -> Int32 -- partition index: 0, 1, ...
-  -> Int64 -- start offset (kafka)
+  -> Int64 -- ^ kafka read start offset
+  -> Int64 -- ^ kafka highwater offset
   -> Maybe Int32 -- limit: total bytes left now, `Nothing` means no limit
   -> Int32 -- limit: bytes left of this partition now
   -> Int32 -- limit: time left now
   -> Bool -- is this the first partition? (if so, return the data even if it exceeds the limit)
   -> Bool -- is this the first topic? (if so and this is also the first partition, return the data even if it exceeds the limit)
   -> IO (Int32, Int32, K.PartitionDataV0) -- (the number of bytes read, time left, response of this partition)
-readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m partitionMaxBytes timeLeftMs isFirstPartition isFirstTopic = do
-  (_, startLSN) <- S.findKey ldclient logId (U.int2cbytes offset) S.FindKeyStrict
-  endLSN <- S.getTailLSN ldclient logId
+readSinglePartition reader logId (startLSN, endLSN) partitionIndex
+                    offset highwaterOffset totalMaxBytes_m partitionMaxBytes
+                    timeLeftMs isFirstPartition isFirstTopic = do
   S.readerSetTimeout reader timeLeftMs
   S.readerSetWaitOnlyWhenNoData reader
   S.readerStartReading reader logId startLSN endLSN -- FIXME: what should the end be? Is tailLSN proper?
@@ -122,7 +165,7 @@ readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m
   S.readerStopReading reader logId -- FIXME: does `readerStopReading` actually stop the reading of the logId?
   let returnBytes = BS.concat acc -- FIXME: we just concat the payload bytes of each record, is this proper?
       returnBytesLen = BS.length returnBytes -- FIXME: is the length correct?
-  let resp = K.PartitionDataV0 partitionIndex K.NONE 0 (Just returnBytes) -- FIXME: exceptions?
+  let resp = K.PartitionDataV0 partitionIndex K.NONE highwaterOffset (Just returnBytes) -- FIXME: exceptions?
   return (fromIntegral returnBytesLen, timeLeftMs', resp) -- !!! FIXME: update time left!!!
   where
     -- Note: `go` reads records from a logId **one by one** until the time limit or bytes limit is reached.
@@ -171,3 +214,10 @@ readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m
            timeLeft
            (partitionBytesLeft - fromIntegral recordBytesLen)
            (fmap (\x -> x - fromIntegral recordBytesLen) totalBytesLeft_m)
+
+-------------------------------------------------------------------------------
+
+errorPartitionResponseV0 :: Int32 -> K.ErrorCode -> K.PartitionDataV0
+errorPartitionResponseV0 partitionIndex ec =
+  K.PartitionDataV0 partitionIndex ec (-1) (Just "")
+{-# INLINE errorPartitionResponseV0 #-}
diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal
index 554dd1188..c74497f19 100644
--- a/hstream-kafka/hstream-kafka.cabal
+++ b/hstream-kafka/hstream-kafka.cabal
@@ -106,6 +106,7 @@ library
   import:             shared-properties
   exposed-modules:
     HStream.Kafka.Common.OffsetManager
+    HStream.Kafka.Common.Read
    HStream.Kafka.Common.RecordFormat
    HStream.Kafka.Group.GroupMetadataManager
    HStream.Kafka.Group.OffsetsStore
@@ -120,9 +121,9 @@ library
    HStream.Kafka.Server.Config.Types
    HStream.Kafka.Server.Handler.Basic
    HStream.Kafka.Server.Handler.Consume
+    HStream.Kafka.Server.Handler.Offset
    HStream.Kafka.Server.Handler.Produce
    HStream.Kafka.Server.Handler.Topic
-    HStream.Kafka.Server.Handler.Offset

  hs-source-dirs:     .
  build-depends:
@@ -133,8 +134,8 @@ library
    , containers
    , digest
    , directory
-    , hashtables
    , hashable
+    , hashtables
    , hstream-api-hs
    , hstream-common
    , hstream-common-base
@@ -148,14 +149,15 @@ library
    , stm
    , text
    , time
+    , unordered-containers
    , vector
    , yaml
    , Z-Data
-    , unordered-containers

  default-language:   GHC2021
  default-extensions:
    DerivingStrategies
    LambdaCase
+    MultiWayIf
    OverloadedStrings
    RecordWildCards
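
Note on the tuple section in Read.hs: `(start, end, )` is shorthand for `\x -> (start, end, x)`, so the `Right` branch decodes the payload and wraps it as `Just (start, end, record)`. This assumes the TupleSections extension is enabled for the package (it is not visible in the cabal hunk above). A minimal standalone demonstration, where `example` is a hypothetical name and not code from this patch:

{-# LANGUAGE TupleSections #-}

-- Demonstrates the tuple-section pattern used in readOneRecord's Right branch.
example :: Maybe (Int, Int, String)
example = (Just . (1, 2, )) "payload"  -- evaluates to Just (1, 2, "payload")

main :: IO ()
main = print example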
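For review, here is a minimal, self-contained sketch of the offset-to-LSN case analysis that getPartitionLsn performs. `Lsn`, `lsnInvalid`, `findStart`, and `partitionLsn` are hypothetical stand-ins for S.LSN, S.LSN_INVALID, S.findKey, and the patched function (modeling LSN_INVALID as 0 is an assumption); only the branch structure mirrors the patch.

{-# LANGUAGE MultiWayIf #-}

import Data.Int  (Int64)
import Data.Word (Word64)

-- Hypothetical stand-in for S.LSN; LSN_INVALID is modeled as 0 (assumption).
type Lsn = Word64

lsnInvalid :: Lsn
lsnInvalid = 0

-- Pure model of getPartitionLsn's branching: classify a requested fetch
-- offset against the latest offset stored in the log.
partitionLsn
  :: (Int64 -> Lsn)          -- offset -> start LSN, stand-in for S.findKey
  -> Int64                   -- latest offset stored in the log
  -> Lsn                     -- LSN of that latest offset (the tail LSN)
  -> Int64                   -- requested fetch offset
  -> Maybe (Lsn, Lsn, Int64) -- (startLsn, endLsn, high-watermark offset)
partitionLsn findStart latestOffset endLsn offset = if
  | offset <  latestOffset     -> Just (findStart offset, endLsn, hw)
  | offset == latestOffset     -> Just (endLsn, endLsn, hw)
  | offset == latestOffset + 1 -> Just (lsnInvalid, lsnInvalid, hw)
  | otherwise                  -> Nothing  -- maps to OFFSET_OUT_OF_RANGE
  where hw = latestOffset + 1

main :: IO ()
main = do
  let findStart o = fromIntegral o * 10    -- fake offset->LSN mapping
  print (partitionLsn findStart 5 50 3)    -- Just (30,50,6): read from LSN 30 to the tail
  print (partitionLsn findStart 5 50 6)    -- Just (0,0,6): caller is at the high watermark
  print (partitionLsn findStart 5 50 9)    -- Nothing: offset is out of range

The `offset == latestOffset` branch re-reads the tail record so a fetch at the last stored offset still returns data, while the `latestOffset + 1` branch hands back LSN_INVALID so the handler can reply with an empty batch (the `Just (S.LSN_INVALID, S.LSN_INVALID, hioffset)` case in Consume.hs) instead of an error.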