Skip to content

Commit

Permalink
hstream-kafka-client: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alissa-tung committed Sep 27, 2023
1 parent 72fbd59 commit 5bfb75a
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 4 deletions.
5 changes: 4 additions & 1 deletion hstream-kafka/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import Control.Monad
import qualified Data.ByteString as BS
import Data.Int
import qualified Data.Vector as V
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Message as K
import qualified Network.Socket as NW
Expand All @@ -15,9 +16,11 @@ import HStream.Kafka.Client.CliParser
import HStream.Kafka.Client.Core
import HStream.Kafka.Client.Network


main :: IO ()
main = do
(cmd, opts) <- runKafkaCliParser
parsed@(cmd, opts) <- runKafkaCliParser
Log.debug . Log.buildString $ "command and options: " <> show parsed
case cmd of
Nodes -> handleCmdNodes opts
Topics -> handleCmdTopics opts
Expand Down
2 changes: 2 additions & 0 deletions hstream-kafka/client/HStream/Kafka/Client/CliParser.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ data Command
| DeleteTopic String
| Groups
| DescribeGroup String
deriving (Show)

data Options = Options
{ brokers :: Maybe String,
Expand All @@ -27,6 +28,7 @@ data Options = Options
schemaRegistry :: Maybe String,
verbose :: Bool
}
deriving (Show)

parseCommand :: Parser Command
parseCommand =
Expand Down
12 changes: 10 additions & 2 deletions hstream-kafka/client/HStream/Kafka/Client/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import qualified Data.ByteString as BS
import Data.Int
import qualified Data.Text as T
import qualified Data.Vector as V
import qualified HStream.Logger as Log
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Message as K
import qualified Network.Socket as NW
Expand All @@ -21,6 +22,7 @@ defaultTimeoutMs = 5000

getAllMetadata :: Options -> IO K.MetadataResponseV1
getAllMetadata opts = do
Log.debug . Log.buildString $ "getAllMetadata: begin"
let reqHeader = K.RequestHeader
{ K.requestApiKey = 3
, K.requestApiVersion = 1
Expand All @@ -32,16 +34,22 @@ getAllMetadata opts = do
(K.KaArray $ Just V.empty)
reqBs' = K.runPut reqHeader <> K.runPut @K.MetadataRequestV1 reqBody
reqLen = K.runPut @Int32 $ fromIntegral (BS.length reqBs')
reqBs = reqLen <> reqBs
withSock opts $ \sock -> do
reqBs = reqLen <> reqBs'
Log.debug . Log.buildString $ "getAllMetadata: ready to send"
ret <- withSock opts $ \sock -> do
Log.debug . Log.buildString $ "getAllMetadata: conn"
NW.sendAll sock reqBs
Log.debug . Log.buildString $ "getAllMetadata: req sent"
recvResp @K.MetadataResponseV1 sock
Log.debug . Log.buildString $ "getAllMetadata: end"
pure ret

-- describeConfigs :: Options -> IO K.DescribeConfigsV4
-- describeConfigs opts = undefined

handleCmdNodes :: Options -> IO ()
handleCmdNodes opts = do
Log.debug . Log.buildString $ "handleCmdNodes: begin"
K.MetadataResponseV1 brokers controllerId _topics <- getAllMetadata opts
let titles = ["ID", "ADDRESS", "CONTROLLER"]
lenses = [ \(K.MetadataResponseBrokerV1 nodeId host port rack) -> show nodeId -- broker.nodeId
Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/client/HStream/Kafka/Client/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import Options.Applicative as AP
import HStream.Kafka.Client.CliParser

defaultAddr = "127.0.0.1"
defaultPort = 33389
defaultPort = 44243

withSock :: Options -> (NW.Socket -> IO a) -> IO a
withSock opts talk = do
Expand Down

0 comments on commit 5bfb75a

Please sign in to comment.