Skip to content

Commit

Permalink
hstream-kafka: handle empty Fetch topic gracefully (#1650)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Oct 19, 2023
1 parent 2c97d71 commit 07e2236
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ module HStream.Kafka.Server.Handler.Consume
( handleFetchV2
) where

import Control.Exception
import Control.Monad
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.Either (isRight)
import Data.Int
import Data.Maybe
import qualified Data.Vector as V
Expand All @@ -25,6 +27,8 @@ import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

type RecordTable =
HT.Dictionary (HT.PrimState IO)
VS.MVector
Expand All @@ -36,7 +40,7 @@ type RecordTable =
handleFetchV2
:: ServerContext -> K.RequestContext
-> K.FetchRequestV2 -> IO K.FetchResponseV2
handleFetchV2 ServerContext{..} _ r = do
handleFetchV2 ServerContext{..} _ r = catchFetchV2 $ do
-- kafka broker just throw java.lang.RuntimeException if topics is null, here
-- we do the same.
let K.NonNullKaArray topicReqs = r.topics
Expand All @@ -49,8 +53,19 @@ handleFetchV2 ServerContext{..} _ r = do
pure (logid, elsn, p)
pure (topic, ps)

let numOfReads = V.sum $
V.map (V.length . (V.filter $ \(_, x, _) -> isRight x) . snd) topics
when (numOfReads < 1) $ do
respTopics <- V.forM topics $ \(topic, partitions) -> do
respPartitionDatas <- V.forM partitions $ \(_, elsn, _) -> do
case elsn of
Left pd -> pure pd
Right _ -> error "LogicError: this should not be right"
pure $ K.FetchableTopicResponseV2 topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponseV2 0{- TODO: throttleTimeMs -} (K.NonNullKaArray respTopics)
throwIO $ RetFetchRespV2 resp

-- New reader
let numOfReads = V.sum $ V.map (V.length . snd) topics
reader <- S.newLDReader scLDClient (fromIntegral numOfReads) Nothing

-- Start reading
Expand Down Expand Up @@ -116,6 +131,17 @@ handleFetchV2 ServerContext{..} _ r = do

-------------------------------------------------------------------------------

-- TODO: move to Kafka.Protocol.Message.Struct
newtype RetFetchRespV2 = RetFetchRespV2 K.FetchResponseV2
deriving (Show, Eq)

instance Exception RetFetchRespV2

catchFetchV2 :: IO K.FetchResponseV2 -> IO K.FetchResponseV2
catchFetchV2 act = act `catch` \(RetFetchRespV2 resp) -> pure resp

-------------------------------------------------------------------------------

-- Return tuple of (startLsn, tailLsn, highwaterOffset)
--
-- NOTE: tailLsn is LSN_INVALID if the partition is empty
Expand Down

0 comments on commit 07e2236

Please sign in to comment.