diff --git a/hstream-kafka/HStream/Kafka/Client/Cli.hs b/hstream-kafka/HStream/Kafka/Client/Cli.hs index c79a8c681..1e1da764d 100644 --- a/hstream-kafka/HStream/Kafka/Client/Cli.hs +++ b/hstream-kafka/HStream/Kafka/Client/Cli.hs @@ -257,18 +257,23 @@ handleGroupCommand opts (GroupCommandShow n) = handleGroupShow opts n handleGroupList :: Options -> IO () handleGroupList Options{..} = do - let req = K.ListGroupsRequestV0 correlationId <- getCorrelationId - resp <- KA.withSendAndRecv host port (KA.listGroups correlationId req) - when (resp.errorCode /= K.NONE) $ - errorWithoutStackTrace $ "List groups failed: " <> show resp.errorCode - let titles = ["ID", "ProtocolType"] - K.NonNullKaArray groups = resp.groups - lenses = [ Text.unpack . (.groupId) - , Text.unpack . (.protocolType) - ] - stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups) - putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats + brokers <- findAllBrokers host port + let req = K.ListGroupsRequestV0 + rows <- V.forM brokers $ \broker -> do + resp <- KA.withSendAndRecv broker.host broker.port (KA.listGroups correlationId req) + when (resp.errorCode /= K.NONE) $ + errorWithoutStackTrace $ "List groups failed: " <> show resp.errorCode + let K.NonNullKaArray groups = resp.groups + lenses = [ Text.unpack . (.groupId) + , const broker.host + , Text.unpack . (.protocolType) + ] + stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups) + return stats + + let titles = ["ID", "Host", "ProtocolType"] + putStrLn $ simpleShowTable (map (, 30, Table.left) titles) (concat rows) handleGroupShow :: Options -> Text -> IO () handleGroupShow Options{..} name = do @@ -568,3 +573,19 @@ unsafeWithStdString f = do !r <- f ptr bs <- HsForeign.unsafePeekStdString ptr pure (bs, r) + +data Broker = Broker + { nodeId :: !Int32 + , host :: String + , port :: !Int + } + +findAllBrokers :: String -> Int -> IO (V.Vector Broker) +findAllBrokers host port = do + let req = K.MetadataRequestV0 (K.KaArray Nothing) + correlationId <- getCorrelationId + resp <- KA.withSendAndRecv host port (KA.metadata correlationId req) + let K.NonNullKaArray brokers = resp.brokers + when (V.null brokers) $ + errorWithoutStackTrace $ "Get empty brokers" + return $ V.map (\b -> Broker{nodeId=b.nodeId, host=(Text.unpack b.host), port=(fromIntegral b.port)}) brokers