From fa2fa44188485c664f0587b23fc7b3b284017368 Mon Sep 17 00:00:00 2001 From: Alissa Tung Date: Tue, 26 Sep 2023 11:03:18 +0800 Subject: [PATCH] hstream-kafka-client: wip --- hstream-kafka/app/Main.hs | 43 ++++--------------- .../client/HStream/Kafka/Client/Core.hs | 29 +++++++++++++ hstream-kafka/hstream-kafka.cabal | 1 + 3 files changed, 39 insertions(+), 34 deletions(-) create mode 100644 hstream-kafka/client/HStream/Kafka/Client/Core.hs diff --git a/hstream-kafka/app/Main.hs b/hstream-kafka/app/Main.hs index 911b75418..a0a19219a 100644 --- a/hstream-kafka/app/Main.hs +++ b/hstream-kafka/app/Main.hs @@ -12,43 +12,18 @@ import qualified Network.Socket.ByteString as NW import Options.Applicative as AP import HStream.Kafka.Client.CliParser +import HStream.Kafka.Client.Core import HStream.Kafka.Client.Network defaultAddr = "127.0.0.1" defaultPort = 51801 - main :: IO () -main = withSock defaultAddr defaultPort $ \sock -> do - let reqHeader = K.RequestHeader - (K.ApiKey 18) 3 1 - (Right $ Just "hstream_kafka_client") - (Right K.EmptyTaggedFields) - reqBody = K.ApiVersionsRequestV3 - (K.CompactString "") - (K.CompactString "") - K.EmptyTaggedFields - reqBs = K.runPut reqHeader <> K.runPut reqBody - req = K.runPut (fromIntegral (BS.length reqBs) :: Int32) <> reqBs - NW.sendAll sock req - print =<< getRespApiVersionsResponseV3 sock - - - --- nodeLsCommand -> getClusterAdmin -> DescribeCluster -handleListNodes :: Options -> IO () -handleListNodes opts = withSock defaultAddr defaultPort $ \sock -> do - let reqHeader = K.RequestHeader - (K.ApiKey 18) 3 1 - (Right $ Just "hstream_kafka_client") - (Right K.EmptyTaggedFields) - reqBody = K.ApiVersionsRequestV3 - (K.CompactString "") - (K.CompactString "") - K.EmptyTaggedFields - reqBs = K.runPut reqHeader <> K.runPut reqBody - reqBs' = K.runPut (fromIntegral (BS.length reqBs) :: Int32) <> reqBs - NW.sendAll sock reqBs' - -getRespApiVersionsResponseV3 :: NW.Socket -> IO K.ApiVersionsResponseV3 -getRespApiVersionsResponseV3 = recvResp @K.ApiVersionsResponseV3 +main = do + (cmd, opts) <- runKafkaCliParser + case cmd of + Nodes -> handleCmdNodes opts + Topics -> handleCmdTopics opts + DescribeTopic topic -> handleCmdDescribeTopic opts topic + Groups -> handleCmdGroups opts + DescribeGroup group -> handleCmdDescribeGroup opts group diff --git a/hstream-kafka/client/HStream/Kafka/Client/Core.hs b/hstream-kafka/client/HStream/Kafka/Client/Core.hs new file mode 100644 index 000000000..8c4eff93d --- /dev/null +++ b/hstream-kafka/client/HStream/Kafka/Client/Core.hs @@ -0,0 +1,29 @@ +module HStream.Kafka.Client.Core where + +import Control.Exception +import Control.Monad +import qualified Data.ByteString as BS +import Data.Int +import qualified Data.Vector as V +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Message as K +import qualified Network.Socket as NW +import qualified Network.Socket.ByteString as NW +import Options.Applicative as AP + +import HStream.Kafka.Client.CliParser + +handleCmdNodes :: Options -> IO () +handleCmdNodes = undefined + +handleCmdTopics :: Options -> IO () +handleCmdTopics = undefined + +handleCmdDescribeTopic :: Options -> String -> IO () +handleCmdDescribeTopic = undefined + +handleCmdGroups :: Options -> IO () +handleCmdGroups = undefined + +handleCmdDescribeGroup :: Options -> String -> IO () +handleCmdDescribeGroup = undefined diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index bf69541a2..ace0f72fc 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -197,6 +197,7 @@ library hstream-kafka-client exposed-modules: HStream.Kafka.Client.CliParser + HStream.Kafka.Client.Core HStream.Kafka.Client.Network hs-source-dirs: client