From 75cf8751cae5b25336945e493fe4453a745af9cd Mon Sep 17 00:00:00 2001 From: Commelina Date: Thu, 7 Sep 2023 16:30:25 +0800 Subject: [PATCH] kafka: return all topics when received `[]` in `Metadata` handler (#1597) --- .../src/HStream/Server/KafkaHandler/Basic.hs | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/hstream/src/HStream/Server/KafkaHandler/Basic.hs b/hstream/src/HStream/Server/KafkaHandler/Basic.hs index 176591fca..8993f690c 100644 --- a/hstream/src/HStream/Server/KafkaHandler/Basic.hs +++ b/hstream/src/HStream/Server/KafkaHandler/Basic.hs @@ -76,17 +76,24 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do let ctlId = fromIntegral serverID let (K.MetadataRequestV0 reqTopics) = req case reqTopics of - Nothing -> do + Nothing -> returnAllTopics respBrokers ctlId + Just v + | V.null v -> returnAllTopics respBrokers ctlId + | otherwise -> do + let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v + respTopics <- forM topicNames getRespTopic + return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) + where + returnAllTopics :: V.Vector K.MetadataResponseBrokerV1 + -> Int32 + -> IO K.MetadataResponseV1 + returnAllTopics respBrokers_ ctlId_ = do -- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`, -- causing a potential overflow. allStreamNames <- S.findStreams scLDClient S.StreamTypeStream <&> (fmap (Utils.cBytesToText . S.streamName)) respTopics <- forM allStreamNames getRespTopic <&> V.fromList - return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) - Just v -> do - let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v - respTopics <- forM topicNames getRespTopic - return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) - where + return $ K.MetadataResponseV1 (Just respBrokers_) ctlId_ (Just respTopics) + getBrokers :: IO (V.Vector K.MetadataResponseBrokerV1) getBrokers = do GRPC.DescribeClusterResponse{..} <- Core.describeCluster ctx @@ -131,8 +138,8 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do { errorCode = K.NONE , partitionIndex = (fromIntegral idx) , leaderId = theNodeId - , replicaNodes = Just (V.singleton 0) -- FIXME: hardcoded - , isrNodes = Just (V.singleton 0) -- FIXME: hardcoded + , replicaNodes = Just (V.singleton theNodeId) -- FIXME: what should it be? + , isrNodes = Just (V.singleton theNodeId) -- FIXME: what should it be? } return $ K.MetadataResponseTopicV1 K.NONE topicName False (Just respPartitions)