diff --git a/hstream/src/HStream/Server/KafkaHandler/Basic.hs b/hstream/src/HStream/Server/KafkaHandler/Basic.hs index 176591fca..8993f690c 100644 --- a/hstream/src/HStream/Server/KafkaHandler/Basic.hs +++ b/hstream/src/HStream/Server/KafkaHandler/Basic.hs @@ -76,17 +76,24 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do let ctlId = fromIntegral serverID let (K.MetadataRequestV0 reqTopics) = req case reqTopics of - Nothing -> do + Nothing -> returnAllTopics respBrokers ctlId + Just v + | V.null v -> returnAllTopics respBrokers ctlId + | otherwise -> do + let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v + respTopics <- forM topicNames getRespTopic + return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) + where + returnAllTopics :: V.Vector K.MetadataResponseBrokerV1 + -> Int32 + -> IO K.MetadataResponseV1 + returnAllTopics respBrokers_ ctlId_ = do -- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`, -- causing a potential overflow. allStreamNames <- S.findStreams scLDClient S.StreamTypeStream <&> (fmap (Utils.cBytesToText . S.streamName)) respTopics <- forM allStreamNames getRespTopic <&> V.fromList - return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) - Just v -> do - let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v - respTopics <- forM topicNames getRespTopic - return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics) - where + return $ K.MetadataResponseV1 (Just respBrokers_) ctlId_ (Just respTopics) + getBrokers :: IO (V.Vector K.MetadataResponseBrokerV1) getBrokers = do GRPC.DescribeClusterResponse{..} <- Core.describeCluster ctx @@ -131,8 +138,8 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do { errorCode = K.NONE , partitionIndex = (fromIntegral idx) , leaderId = theNodeId - , replicaNodes = Just (V.singleton 0) -- FIXME: hardcoded - , isrNodes = Just (V.singleton 0) -- FIXME: hardcoded + , replicaNodes = Just (V.singleton theNodeId) -- FIXME: what should it be? + , isrNodes = Just (V.singleton theNodeId) -- FIXME: what should it be? } return $ K.MetadataResponseTopicV1 K.NONE topicName False (Just respPartitions)