Skip to content

Commit

Permalink
hstream-kafka-client: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alissa-tung committed Sep 27, 2023
1 parent b0f39ee commit 4f0c277
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
9 changes: 5 additions & 4 deletions hstream-kafka/client/HStream/Kafka/Client/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ getAllMetadata opts = do
, K.requestApiVersion = 1
, K.requestCorrelationId = 1 -- FIXME
, K.requestClientId = requestClientId
, K.requesteTaggedFields = Right K.EmptyTaggedFields
, K.requesteTaggedFields = Left K.Unsupported
}
reqBody = K.MetadataRequestV0
(K.KaArray $ Just V.empty)
Expand All @@ -38,6 +38,7 @@ getAllMetadata opts = do
Log.debug . Log.buildString $ "getAllMetadata: ready to send"
ret <- withSock opts $ \sock -> do
Log.debug . Log.buildString $ "getAllMetadata: conn"
print reqBs
NW.sendAll sock reqBs
Log.debug . Log.buildString $ "getAllMetadata: req sent"
recvResp @K.MetadataResponseV1 sock
Expand All @@ -59,7 +60,7 @@ handleCmdNodes opts = do
brokers' = let K.KaArray (Just xs) = brokers
in V.toList xs
stats = (\s -> ($ s) <$> lenses) <$> brokers'
print $ simpleShowTable (map (, 30, Table.left) titles) stats
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) stats

-- "NAME\tPARTITIONS\tREPLICAS\t\n"
-- topic.name, topic.NumPartitions, topic.ReplicationFactor
Expand Down Expand Up @@ -101,7 +102,7 @@ handleCreateTopic opts topicName numPartitions replicationFactor = do
, \(K.CreatableTopicResultV0 name errorCode) -> show errorCode
]
stats = (\s -> ($ s) <$> lenses) <$> rets
print $ simpleShowTable (map (, 30, Table.left) titles) (V.toList stats)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) (V.toList stats)

handleDeleteTopic :: Options -> String -> IO ()
handleDeleteTopic opts topicName = do
Expand All @@ -123,4 +124,4 @@ handleDeleteTopic opts topicName = do
, \(K.DeletableTopicResultV0 name errorCode) -> show errorCode
]
stats = (\s -> ($ s) <$> lenses) <$> rets
print $ simpleShowTable (map (, 30, Table.left) titles) (V.toList stats)
putStrLn $ simpleShowTable (map (, 30, Table.left) titles) (V.toList stats)
2 changes: 1 addition & 1 deletion hstream-kafka/client/HStream/Kafka/Client/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Options.Applicative as AP
import HStream.Kafka.Client.CliParser

defaultAddr = "127.0.0.1"
defaultPort = 44243
defaultPort = 46721

withSock :: Options -> (NW.Socket -> IO a) -> IO a
withSock opts talk = do
Expand Down
1 change: 1 addition & 0 deletions hstream-kafka/protocol/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ runServer opts handlers =
Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err

runHandler reqBs = do
Log.debug . Log.buildString $ "Kafka runHandler: parsing RequestHeader"
headerResult <- runParser @RequestHeader get reqBs
case headerResult of
Done l RequestHeader{..} -> do
Expand Down

0 comments on commit 4f0c277

Please sign in to comment.