hstream kafka: fix fetch highwater_offset (#1628)
4eUeP authored Oct 8, 2023
1 parent de9f914 commit 11aa7c9
Showing 4 changed files with 156 additions and 81 deletions.
59 changes: 17 additions & 42 deletions hstream-kafka/HStream/Kafka/Common/OffsetManager.hs
@@ -8,23 +8,20 @@ module HStream.Kafka.Common.OffsetManager
, cleanOffsetCache
, getOldestOffset
, getLatestOffset
, getLatestOffsetWithLsn
, getOffsetByTimestamp
) where

import Control.Concurrent
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.HashTable.IO as H
import Data.Int
import Data.Word
import GHC.Stack (HasCallStack)

import HStream.Kafka.Common.Read (readOneRecord)
import HStream.Kafka.Common.RecordFormat
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Store.Internal.LogDevice as S
import qualified Kafka.Protocol.Encoding as K

-------------------------------------------------------------------------------

@@ -77,53 +74,31 @@ cleanOffsetCache :: OffsetManager -> Word64 -> IO ()
cleanOffsetCache OffsetManager{..} = H.delete offsets

getOldestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
getOldestOffset OffsetManager{..} logid =
getOldestOffset OffsetManager{..} logid = do
-- Actually, we only need the first lsn, but there is no easy way to get it
(fmap offset) <$> readOneRecord store reader logid (pure (S.LSN_MIN, S.LSN_MAX))
(fmap $ offset . third) <$> readOneRecord store reader logid (pure (S.LSN_MIN, S.LSN_MAX))

getLatestOffset :: HasCallStack => OffsetManager -> Word64 -> IO (Maybe Int64)
getLatestOffset OffsetManager{..} logid =
getLatestOffset o logid = (fmap fst) <$> getLatestOffsetWithLsn o logid

getLatestOffsetWithLsn
:: HasCallStack
=> OffsetManager -> Word64 -> IO (Maybe (Int64, S.LSN))
getLatestOffsetWithLsn OffsetManager{..} logid =
let getLsn = do tailLsn <- S.getTailLSN store logid
pure (tailLsn, tailLsn)
in (fmap offset) <$> readOneRecord store reader logid getLsn
in do m <- readOneRecord store reader logid getLsn
pure $ do (lsn, _, record) <- m
pure (offset record, lsn)

getOffsetByTimestamp :: HasCallStack => OffsetManager -> Word64 -> Int64 -> IO (Maybe Int64)
getOffsetByTimestamp OffsetManager{..} logid timestamp = do
let getLsn = do lsn <- S.findTime store logid timestamp S.FindKeyStrict
pure (lsn, lsn)
in (fmap offset) <$> readOneRecord store reader logid getLsn
in (fmap $ offset . third) <$> readOneRecord store reader logid getLsn

-------------------------------------------------------------------------------

-- Return the first read RecordFormat
readOneRecord
:: HasCallStack
=> S.LDClient
-> S.LDReader
-> Word64
-> IO (S.LSN, S.LSN)
-> IO (Maybe RecordFormat)
readOneRecord store reader logid getLsn = do
-- FIXME: This method is blocking until the state can be determined or an
-- error occurred. Directly read without check isLogEmpty will also block a
-- while for the first time since the state can be determined.
isEmpty <- S.isLogEmpty store logid
if isEmpty
then pure Nothing
else do (start, end) <- getLsn
finally (acquire start end) release
where
acquire start end = do
S.readerStartReading reader logid start end
dataRecords <- S.readerReadAllowGap @ByteString reader 1
case dataRecords of
Right [S.DataRecord{..}] -> Just <$> K.runGet recordPayload
_ -> do Log.fatal $ "readOneRecord read " <> Log.build logid
<> "with lsn (" <> Log.build start <> " "
<> Log.build end <> ") "
<> "get unexpected result "
<> Log.buildString' dataRecords
ioError $ userError $ "Invalid reader result " <> show dataRecords
release = do
isReading <- S.readerIsReading reader logid
when isReading $ S.readerStopReading reader logid
third :: (a, b, c) -> c
third (_, _, x) = x
{-# INLINE third #-}
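
For orientation, here is a minimal usage sketch of the reshaped OffsetManager API (a sketch only, not part of the commit: `printOffsets` and the way the OffsetManager value is obtained are illustrative assumptions):

import Data.Word (Word64)

import HStream.Kafka.Common.OffsetManager

-- Query both ends of a log. Assumes an already constructed
-- OffsetManager `om`; its construction is elided here.
printOffsets :: OffsetManager -> Word64 -> IO ()
printOffsets om logid = do
  moldest <- getOldestOffset om logid        -- Maybe Int64
  mlatest <- getLatestOffsetWithLsn om logid -- Maybe (Int64, LSN)
  putStrLn ("oldest offset: " <> show moldest)
  case mlatest of
    Nothing         -> putStrLn "log is empty"
    Just (off, lsn) -> putStrLn ("latest offset " <> show off <> " at lsn " <> show lsn)
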
48 changes: 48 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Read.hs
@@ -0,0 +1,48 @@
module HStream.Kafka.Common.Read
( readOneRecord
) where

import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import Data.Word
import GHC.Stack (HasCallStack)

import HStream.Kafka.Common.RecordFormat
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Store.Internal.LogDevice as S
import qualified Kafka.Protocol.Encoding as K

-- Return the first RecordFormat read, together with the (start, end) LSNs it was read with
readOneRecord
:: HasCallStack
=> S.LDClient
-> S.LDReader
-> Word64
-> IO (S.LSN, S.LSN)
-> IO (Maybe (S.LSN, S.LSN, RecordFormat))
readOneRecord store reader logid getLsn = do
-- FIXME: This method blocks until the log state can be determined or an
-- error occurs. Reading directly without checking isLogEmpty would also
-- block for a while on the first read, until the state can be determined.
isEmpty <- S.isLogEmpty store logid
if isEmpty
then pure Nothing
else do (start, end) <- getLsn
finally (acquire start end) release
where
acquire start end = do
S.readerStartReading reader logid start end
dataRecords <- S.readerReadAllowGap @ByteString reader 1
case dataRecords of
Right [S.DataRecord{..}] -> (Just . (start, end, )) <$> K.runGet recordPayload
_ -> do Log.fatal $ "readOneRecord read " <> Log.build logid
<> "with lsn (" <> Log.build start <> " "
<> Log.build end <> ") "
<> "get unexpected result "
<> Log.buildString' dataRecords
ioError $ userError $ "Invalid reader result " <> show dataRecords
release = do
isReading <- S.readerIsReading reader logid
when isReading $ S.readerStopReading reader logid
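
The extracted readOneRecord now also reports the LSN range it read from, which is what getLatestOffsetWithLsn above relies on. A hedged sketch of driving it the same way that call site does (it assumes a store and reader are already set up; the setup code is not shown):

import Data.Int (Int64)
import Data.Word (Word64)

import HStream.Kafka.Common.Read (readOneRecord)
import HStream.Kafka.Common.RecordFormat (RecordFormat (..))
import qualified HStream.Store as S

-- Read the record at the tail of the log and return its kafka offset.
-- Using the tail LSN as both start and end reads exactly one record.
readTailOffset :: S.LDClient -> S.LDReader -> Word64 -> IO (Maybe Int64)
readTailOffset store reader logid = do
  let getLsn = do tailLsn <- S.getTailLSN store logid
                  pure (tailLsn, tailLsn)
  m <- readOneRecord store reader logid getLsn
  pure (fmap (\(_start, _end, record) -> offset record) m)
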
122 changes: 86 additions & 36 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -5,21 +5,22 @@ module HStream.Kafka.Server.Handler.Consume
) where

import Control.Monad
import qualified Data.ByteString as BS
import qualified Data.ByteString as BS
import Data.Int
import qualified Data.List as L
import qualified Data.List as L
import Data.Maybe
import qualified Data.Vector as V
import qualified Data.Vector as V

import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

--------------------
-- 1: Fetch
@@ -42,10 +43,13 @@ handleFetchV0 ServerContext{..} _ K.FetchRequestV0{..} = case topics of
-- Read one topic, then update total bytes left and time left
-- Note: it is important to know if this is the first topic!
(totalMaxBytes_m', timeLeftMs', resp) <-
readSingleTopic scLDClient topicReq acc_totalMaxBytes_m acc_timeLeft acc_isFirstTopic
readSingleTopic scLDClient scOffsetManager topicReq
acc_totalMaxBytes_m acc_timeLeft
acc_isFirstTopic
return ( -- `isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m` means
-- there is nothing read from this topic.
if isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m then acc_isFirstTopic else False
if isJust totalMaxBytes_m' && totalMaxBytes_m' == acc_totalMaxBytes_m
then acc_isFirstTopic else False
, totalMaxBytes_m'
, timeLeftMs'
, acc_resps ++ [resp]
@@ -58,12 +62,13 @@

readSingleTopic
:: S.LDClient
-> K.OffsetManager
-> K.FetchTopicV0
-> Maybe Int32 -- limit: total bytes left now
-> Int32 -- limit: time left now
-> Bool -- is this the first topic? (if so, omit the bytes limit of this )
-> IO (Maybe Int32, Int32, K.FetchableTopicResponseV0) -- (total bytes left, time left, response of this topic)
readSingleTopic ldclient K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTopic = case partitions of
readSingleTopic ldclient om K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTopic = case partitions of
K.KaArray Nothing -> return (totalMaxBytes_m, timeLeftMs, K.FetchableTopicResponseV0 topic (K.KaArray Nothing))
K.KaArray (Just parts) -> do
orderedParts <- S.listStreamPartitionsOrdered ldclient (S.transToTopicStreamName topic)
@@ -78,41 +83,79 @@ readSingleTopic ldclient K.FetchTopicV0{..} totalMaxBytes_m timeLeftMs isFirstTo
if acc_timeLeft <= 0 then
return (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps)
else do
let (_,logId) = orderedParts V.! fromIntegral partition
(len,timeLeftMs',resp) <-
readSinglePartition ldclient reader logId partition
fetchOffset
acc_totalMaxBytes_m
partitionMaxBytes
acc_timeLeft
acc_isFirstPartition
isFirstTopic
return ( if len > 0 then False else acc_isFirstPartition
, fmap (\x -> x - len) acc_totalMaxBytes_m
, timeLeftMs'
, acc_resps ++ [resp]
)
let (_, logId) = orderedParts V.! fromIntegral partition
mlsn <- getPartitionLsn ldclient om logId fetchOffset
case mlsn of
Nothing ->
let resp = errorPartitionResponseV0 partition K.OFFSET_OUT_OF_RANGE
in pure (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps ++ [resp])
Just (S.LSN_INVALID, S.LSN_INVALID, hioffset) ->
let resp = K.PartitionDataV0 partition K.NONE hioffset (Just "")
in pure (acc_isFirstPartition, acc_totalMaxBytes_m, acc_timeLeft, acc_resps ++ [resp])
Just (startlsn, endlsn, hioffset) -> do
(len, timeLeftMs', resp) <-
readSinglePartition reader
logId (startlsn, endlsn)
partition
fetchOffset
hioffset
acc_totalMaxBytes_m
partitionMaxBytes
acc_timeLeft
acc_isFirstPartition
isFirstTopic
return ( if len > 0 then False else acc_isFirstPartition
, fmap (\x -> x - len) acc_totalMaxBytes_m
, timeLeftMs'
, acc_resps ++ [resp]
)
) (True, totalMaxBytes_m, timeLeftMs, []) parts -- !!! FIXME: update time left!!!
return ( totalMaxBytes_m'
, timeLeftMs'
, K.FetchableTopicResponseV0 topic (K.KaArray $ Just $ V.fromList resps)
)

-- Return a tuple of (startLsn, endLsn, highwaterOffset)
getPartitionLsn
:: S.LDClient -> K.OffsetManager
-> S.C_LogID
-> Int64 -- ^ kafka start offset
-> IO (Maybe (S.LSN, S.LSN, Int64))
getPartitionLsn ldclient om logid offset = do
m <- K.getLatestOffsetWithLsn om logid
case m of
Just (latestOffset, endLsn) -> do
if | offset < latestOffset -> do
let key = U.int2cbytes offset
(_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict
pure $ Just (startLsn, endLsn, latestOffset + 1)
| offset == latestOffset ->
pure $ Just (endLsn, endLsn, latestOffset + 1)
| offset == (latestOffset + 1) ->
pure $ Just (S.LSN_INVALID, S.LSN_INVALID, latestOffset + 1)
| offset > (latestOffset + 1) -> pure Nothing
-- GHC is not smart enough to detect that this pattern matching is complete
| otherwise -> error "This should not be reached (getPartitionLsn)"
-- the log is empty, so any requested offset is out of range
Nothing -> do Log.debug "Empty LatestOffsetWithLsn"
pure Nothing

readSinglePartition
:: S.LDClient
-> S.LDReader -- the logdevice reader of this **topic**, but only one logId is read at the same time
:: S.LDReader -- the logdevice reader of this **topic**; only one logId is read at a time
-> S.C_LogID -- logId of this partition
-> (S.LSN, S.LSN) -- ^ (start_lsn, end_lsn)
-> Int32 -- partition index: 0, 1, ...
-> Int64 -- start offset (kafka)
-> Int64 -- ^ kafka read start offset
-> Int64 -- ^ kafka highwater offset
-> Maybe Int32 -- limit: total bytes left now, `Nothing` means no limit
-> Int32 -- limit: bytes left of this partition now
-> Int32 -- limit: time left now
-> Bool -- is this the first partition? (if so, return the data even if it exceeds the limit)
-> Bool -- is this the first topic? (if so and this is also the first partition, return the data even if it exceeds the limit)
-> IO (Int32, Int32, K.PartitionDataV0) -- (the number of bytes read, time left, response of this partition)
readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m partitionMaxBytes timeLeftMs isFirstPartition isFirstTopic = do
(_, startLSN) <- S.findKey ldclient logId (U.int2cbytes offset) S.FindKeyStrict
endLSN <- S.getTailLSN ldclient logId
readSinglePartition reader logId (startLSN, endLSN) partitionIndex
offset highwaterOffset totalMaxBytes_m partitionMaxBytes
timeLeftMs isFirstPartition isFirstTopic = do
S.readerSetTimeout reader timeLeftMs
S.readerSetWaitOnlyWhenNoData reader
S.readerStartReading reader logId startLSN endLSN -- FIXME: what should the end be? Is tailLSN proper?
@@ -122,7 +165,7 @@ readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m
S.readerStopReading reader logId -- FIXME: does `readerStopReading` actually stop the reading of the logId?
let returnBytes = BS.concat acc -- FIXME: we just concat the payload bytes of each record, is this proper?
returnBytesLen = BS.length returnBytes -- FIXME: is the length correct?
let resp = K.PartitionDataV0 partitionIndex K.NONE 0 (Just returnBytes) -- FIXME: exceptions?
let resp = K.PartitionDataV0 partitionIndex K.NONE highwaterOffset (Just returnBytes) -- FIXME: exceptions?
return (fromIntegral returnBytesLen, timeLeftMs', resp) -- !!! FIXME: update time left!!!
where
-- Note: `go` reads records from a logId **one by one** until the time limit or bytes limit is reached.
@@ -171,3 +214,10 @@ readSinglePartition ldclient reader logId partitionIndex offset totalMaxBytes_m
timeLeft
(partitionBytesLeft - fromIntegral recordBytesLen)
(fmap (\x -> x - fromIntegral recordBytesLen) totalBytesLeft_m)

-------------------------------------------------------------------------------

errorPartitionResponseV0 :: Int32 -> K.ErrorCode -> K.PartitionDataV0
errorPartitionResponseV0 partitionIndex ec =
K.PartitionDataV0 partitionIndex ec (-1) (Just "")
{-# INLINE errorPartitionResponseV0 #-}
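
The heart of the fix is getPartitionLsn above: it maps a Kafka fetch offset to an LSN range and, in every non-error branch, computes the highwater offset as latestOffset + 1, so fetch responses no longer report the hard-coded 0 that the old readSinglePartition produced. (Its `if |` guards are also why MultiWayIf is added to the cabal file below.) As a pure model of that decision table (a sketch only; the S.findKey lookup and tail LSN are abstracted into parameters, and the FetchPlan names are invented for illustration):

import Data.Int (Int64)
import Data.Word (Word64)

-- What a fetch at a given offset should do; LSNs are modeled as Word64.
data FetchPlan
  = ReadFrom Word64 Word64 Int64 -- (startLsn, endLsn, highwater): read records
  | CaughtUp Int64               -- consumer sits at the highwater offset: reply with empty data
  | OutOfRange                   -- requested offset is beyond the highwater offset
  deriving (Eq, Show)

planFetch
  :: (Int64 -> Word64) -- offset -> startLsn (models the S.findKey lookup)
  -> Word64            -- endLsn (models the tail LSN of the log)
  -> Int64             -- offset of the last record in the log
  -> Int64             -- requested fetch offset
  -> FetchPlan
planFetch findLsn endLsn latestOffset offset
  | offset <  latestOffset     = ReadFrom (findLsn offset) endLsn (latestOffset + 1)
  | offset == latestOffset     = ReadFrom endLsn endLsn (latestOffset + 1)
  | offset == latestOffset + 1 = CaughtUp (latestOffset + 1)
  | otherwise                  = OutOfRange

In the handler, CaughtUp corresponds to the (S.LSN_INVALID, S.LSN_INVALID, hioffset) branch and OutOfRange to the OFFSET_OUT_OF_RANGE error response.
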
8 changes: 5 additions & 3 deletions hstream-kafka/hstream-kafka.cabal
@@ -106,6 +106,7 @@ library
import: shared-properties
exposed-modules:
HStream.Kafka.Common.OffsetManager
HStream.Kafka.Common.Read
HStream.Kafka.Common.RecordFormat
HStream.Kafka.Group.GroupMetadataManager
HStream.Kafka.Group.OffsetsStore
@@ -120,9 +121,9 @@
HStream.Kafka.Server.Config.Types
HStream.Kafka.Server.Handler.Basic
HStream.Kafka.Server.Handler.Consume
HStream.Kafka.Server.Handler.Offset
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Handler.Offset

hs-source-dirs: .
build-depends:
@@ -133,8 +134,8 @@
, containers
, digest
, directory
, hashtables
, hashable
, hashtables
, hstream-api-hs
, hstream-common
, hstream-common-base
@@ -148,14 +149,15 @@
, stm
, text
, time
, unordered-containers
, vector
, yaml
, Z-Data
, unordered-containers

default-language: GHC2021
default-extensions:
DerivingStrategies
LambdaCase
MultiWayIf
OverloadedStrings
RecordWildCards
