Skip to content

Commit

Permalink
kafka: return all topics when received [] in Metadata handler (#1597
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Commelina authored Sep 7, 2023
1 parent b97cc91 commit 75cf875
Showing 1 changed file with 16 additions and 9 deletions.
25 changes: 16 additions & 9 deletions hstream/src/HStream/Server/KafkaHandler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,24 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do
let ctlId = fromIntegral serverID
let (K.MetadataRequestV0 reqTopics) = req
case reqTopics of
Nothing -> do
Nothing -> returnAllTopics respBrokers ctlId
Just v
| V.null v -> returnAllTopics respBrokers ctlId
| otherwise -> do
let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v
respTopics <- forM topicNames getRespTopic
return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics)
where
returnAllTopics :: V.Vector K.MetadataResponseBrokerV1
-> Int32
-> IO K.MetadataResponseV1
returnAllTopics respBrokers_ ctlId_ = do
-- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`,
-- causing a potential overflow.
allStreamNames <- S.findStreams scLDClient S.StreamTypeStream <&> (fmap (Utils.cBytesToText . S.streamName))
respTopics <- forM allStreamNames getRespTopic <&> V.fromList
return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics)
Just v -> do
let topicNames = V.map (\K.MetadataRequestTopicV0{..} -> name) v
respTopics <- forM topicNames getRespTopic
return $ K.MetadataResponseV1 (Just respBrokers) ctlId (Just respTopics)
where
return $ K.MetadataResponseV1 (Just respBrokers_) ctlId_ (Just respTopics)

getBrokers :: IO (V.Vector K.MetadataResponseBrokerV1)
getBrokers = do
GRPC.DescribeClusterResponse{..} <- Core.describeCluster ctx
Expand Down Expand Up @@ -131,8 +138,8 @@ handleMetadataV1 ctx@ServerContext{..} _ req = do
{ errorCode = K.NONE
, partitionIndex = (fromIntegral idx)
, leaderId = theNodeId
, replicaNodes = Just (V.singleton 0) -- FIXME: hardcoded
, isrNodes = Just (V.singleton 0) -- FIXME: hardcoded
, replicaNodes = Just (V.singleton theNodeId) -- FIXME: what should it be?
, isrNodes = Just (V.singleton theNodeId) -- FIXME: what should it be?
}
return $
K.MetadataResponseTopicV1 K.NONE topicName False (Just respPartitions)

0 comments on commit 75cf875

Please sign in to comment.