diff --git a/hstream-kafka/app/Main.hs b/hstream-kafka/app/Main.hs index d89861791..7619d091f 100644 --- a/hstream-kafka/app/Main.hs +++ b/hstream-kafka/app/Main.hs @@ -5,6 +5,7 @@ import Control.Monad import qualified Data.ByteString as BS import Data.Int import qualified Data.Vector as V +import qualified HStream.Logger as Log import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Message as K import qualified Network.Socket as NW @@ -15,9 +16,11 @@ import HStream.Kafka.Client.CliParser import HStream.Kafka.Client.Core import HStream.Kafka.Client.Network + main :: IO () main = do - (cmd, opts) <- runKafkaCliParser + parsed@(cmd, opts) <- runKafkaCliParser + Log.debug . Log.buildString $ "command and options: " <> show parsed case cmd of Nodes -> handleCmdNodes opts Topics -> handleCmdTopics opts diff --git a/hstream-kafka/client/HStream/Kafka/Client/CliParser.hs b/hstream-kafka/client/HStream/Kafka/Client/CliParser.hs index 2442809d2..4deddec6b 100644 --- a/hstream-kafka/client/HStream/Kafka/Client/CliParser.hs +++ b/hstream-kafka/client/HStream/Kafka/Client/CliParser.hs @@ -19,6 +19,7 @@ data Command | DeleteTopic String | Groups | DescribeGroup String + deriving (Show) data Options = Options { brokers :: Maybe String, @@ -27,6 +28,7 @@ data Options = Options schemaRegistry :: Maybe String, verbose :: Bool } + deriving (Show) parseCommand :: Parser Command parseCommand = diff --git a/hstream-kafka/client/HStream/Kafka/Client/Core.hs b/hstream-kafka/client/HStream/Kafka/Client/Core.hs index 8ef94876b..3aaa995b2 100644 --- a/hstream-kafka/client/HStream/Kafka/Client/Core.hs +++ b/hstream-kafka/client/HStream/Kafka/Client/Core.hs @@ -6,6 +6,7 @@ import qualified Data.ByteString as BS import Data.Int import qualified Data.Text as T import qualified Data.Vector as V +import qualified HStream.Logger as Log import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Message as K import qualified Network.Socket as NW @@ -21,6 +22,7 @@ defaultTimeoutMs = 5000 getAllMetadata :: Options -> IO K.MetadataResponseV1 getAllMetadata opts = do + Log.debug . Log.buildString $ "getAllMetadata: begin" let reqHeader = K.RequestHeader { K.requestApiKey = 3 , K.requestApiVersion = 1 @@ -32,16 +34,22 @@ getAllMetadata opts = do (K.KaArray $ Just V.empty) reqBs' = K.runPut reqHeader <> K.runPut @K.MetadataRequestV1 reqBody reqLen = K.runPut @Int32 $ fromIntegral (BS.length reqBs') - reqBs = reqLen <> reqBs - withSock opts $ \sock -> do + reqBs = reqLen <> reqBs' + Log.debug . Log.buildString $ "getAllMetadata: ready to send" + ret <- withSock opts $ \sock -> do + Log.debug . Log.buildString $ "getAllMetadata: conn" NW.sendAll sock reqBs + Log.debug . Log.buildString $ "getAllMetadata: req sent" recvResp @K.MetadataResponseV1 sock + Log.debug . Log.buildString $ "getAllMetadata: end" + pure ret -- describeConfigs :: Options -> IO K.DescribeConfigsV4 -- describeConfigs opts = undefined handleCmdNodes :: Options -> IO () handleCmdNodes opts = do + Log.debug . Log.buildString $ "handleCmdNodes: begin" K.MetadataResponseV1 brokers controllerId _topics <- getAllMetadata opts let titles = ["ID", "ADDRESS", "CONTROLLER"] lenses = [ \(K.MetadataResponseBrokerV1 nodeId host port rack) -> show nodeId -- broker.nodeId diff --git a/hstream-kafka/client/HStream/Kafka/Client/Network.hs b/hstream-kafka/client/HStream/Kafka/Client/Network.hs index 3470f6f66..1c0a8a0e4 100644 --- a/hstream-kafka/client/HStream/Kafka/Client/Network.hs +++ b/hstream-kafka/client/HStream/Kafka/Client/Network.hs @@ -14,7 +14,7 @@ import Options.Applicative as AP import HStream.Kafka.Client.CliParser defaultAddr = "127.0.0.1" -defaultPort = 33389 +defaultPort = 44243 withSock :: Options -> (NW.Socket -> IO a) -> IO a withSock opts talk = do