Skip to content

Commit

Permalink
hstream-kafka-client: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alissa-tung committed Sep 26, 2023
1 parent a03ccc9 commit fa2fa44
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 34 deletions.
43 changes: 9 additions & 34 deletions hstream-kafka/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,43 +12,18 @@ import qualified Network.Socket.ByteString as NW
import Options.Applicative as AP

import HStream.Kafka.Client.CliParser
import HStream.Kafka.Client.Core
import HStream.Kafka.Client.Network

defaultAddr = "127.0.0.1"
defaultPort = 51801


main :: IO ()
main = withSock defaultAddr defaultPort $ \sock -> do
let reqHeader = K.RequestHeader
(K.ApiKey 18) 3 1
(Right $ Just "hstream_kafka_client")
(Right K.EmptyTaggedFields)
reqBody = K.ApiVersionsRequestV3
(K.CompactString "")
(K.CompactString "")
K.EmptyTaggedFields
reqBs = K.runPut reqHeader <> K.runPut reqBody
req = K.runPut (fromIntegral (BS.length reqBs) :: Int32) <> reqBs
NW.sendAll sock req
print =<< getRespApiVersionsResponseV3 sock



-- nodeLsCommand -> getClusterAdmin -> DescribeCluster
handleListNodes :: Options -> IO ()
handleListNodes opts = withSock defaultAddr defaultPort $ \sock -> do
let reqHeader = K.RequestHeader
(K.ApiKey 18) 3 1
(Right $ Just "hstream_kafka_client")
(Right K.EmptyTaggedFields)
reqBody = K.ApiVersionsRequestV3
(K.CompactString "")
(K.CompactString "")
K.EmptyTaggedFields
reqBs = K.runPut reqHeader <> K.runPut reqBody
reqBs' = K.runPut (fromIntegral (BS.length reqBs) :: Int32) <> reqBs
NW.sendAll sock reqBs'

getRespApiVersionsResponseV3 :: NW.Socket -> IO K.ApiVersionsResponseV3
getRespApiVersionsResponseV3 = recvResp @K.ApiVersionsResponseV3
main = do
(cmd, opts) <- runKafkaCliParser
case cmd of
Nodes -> handleCmdNodes opts
Topics -> handleCmdTopics opts
DescribeTopic topic -> handleCmdDescribeTopic opts topic
Groups -> handleCmdGroups opts
DescribeGroup group -> handleCmdDescribeGroup opts group
29 changes: 29 additions & 0 deletions hstream-kafka/client/HStream/Kafka/Client/Core.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module HStream.Kafka.Client.Core where

import Control.Exception
import Control.Monad
import qualified Data.ByteString as BS
import Data.Int
import qualified Data.Vector as V
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Message as K
import qualified Network.Socket as NW
import qualified Network.Socket.ByteString as NW
import Options.Applicative as AP

import HStream.Kafka.Client.CliParser

handleCmdNodes :: Options -> IO ()
handleCmdNodes = undefined

handleCmdTopics :: Options -> IO ()
handleCmdTopics = undefined

handleCmdDescribeTopic :: Options -> String -> IO ()
handleCmdDescribeTopic = undefined

handleCmdGroups :: Options -> IO ()
handleCmdGroups = undefined

handleCmdDescribeGroup :: Options -> String -> IO ()
handleCmdDescribeGroup = undefined
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ library hstream-kafka-client

exposed-modules:
HStream.Kafka.Client.CliParser
HStream.Kafka.Client.Core
HStream.Kafka.Client.Network

hs-source-dirs: client
Expand Down

0 comments on commit fa2fa44

Please sign in to comment.