Skip to content

Commit

Permalink
kafka(cli): fix listGroups should list all groups in the cluster (#1724)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Jan 2, 2024
1 parent cc7d510 commit 876b7a2
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions hstream-kafka/HStream/Kafka/Client/Cli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -257,18 +257,23 @@ handleGroupCommand opts (GroupCommandShow n) = handleGroupShow opts n

handleGroupList :: Options -> IO ()
handleGroupList Options{..} = do
let req = K.ListGroupsRequestV0
correlationId <- getCorrelationId
resp <- KA.withSendAndRecv host port (KA.listGroups correlationId req)
when (resp.errorCode /= K.NONE) $
errorWithoutStackTrace $ "List groups failed: " <> show resp.errorCode
let titles = ["ID", "ProtocolType"]
K.NonNullKaArray groups = resp.groups
lenses = [ Text.unpack . (.groupId)
, Text.unpack . (.protocolType)
]
stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats
brokers <- findAllBrokers host port
let req = K.ListGroupsRequestV0
rows <- V.forM brokers $ \broker -> do
resp <- KA.withSendAndRecv broker.host broker.port (KA.listGroups correlationId req)
when (resp.errorCode /= K.NONE) $
errorWithoutStackTrace $ "List groups failed: " <> show resp.errorCode
let K.NonNullKaArray groups = resp.groups
lenses = [ Text.unpack . (.groupId)
, const broker.host
, Text.unpack . (.protocolType)
]
stats = (\s -> ($ s) <$> lenses) <$> (V.toList groups)
return stats

let titles = ["ID", "Host", "ProtocolType"]
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) (concat rows)

handleGroupShow :: Options -> Text -> IO ()
handleGroupShow Options{..} name = do
Expand Down Expand Up @@ -568,3 +573,19 @@ unsafeWithStdString f = do
!r <- f ptr
bs <- HsForeign.unsafePeekStdString ptr
pure (bs, r)

data Broker = Broker
{ nodeId :: !Int32
, host :: String
, port :: !Int
}

findAllBrokers :: String -> Int -> IO (V.Vector Broker)
findAllBrokers host port = do
let req = K.MetadataRequestV0 (K.KaArray Nothing)
correlationId <- getCorrelationId
resp <- KA.withSendAndRecv host port (KA.metadata correlationId req)
let K.NonNullKaArray brokers = resp.brokers
when (V.null brokers) $
errorWithoutStackTrace $ "Get empty brokers"
return $ V.map (\b -> Broker{nodeId=b.nodeId, host=(Text.unpack b.host), port=(fromIntegral b.port)}) brokers

0 comments on commit 876b7a2

Please sign in to comment.