Skip to content

Commit

Permalink
Remove --zkuri option && add replace it with --meta-store (#1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
Time-Hu authored and 4eUeP committed Sep 16, 2022
1 parent 50abf74 commit 883346c
Show file tree
Hide file tree
Showing 14 changed files with 175 additions and 108 deletions.
2 changes: 1 addition & 1 deletion common/HStream/Logger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ setLogLevel level withColor = do
}
Log.setDefaultLogger =<< Log.newStdLogger config

newtype Level = Level {unLevel :: Log.Level}
newtype Level = Level {unLevel :: Log.Level} deriving (Eq)

instance Show Level where
show (Level Log.CRITICAL) = "critical"
Expand Down
5 changes: 4 additions & 1 deletion conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ hserver:
# Comma separated host:port pairs, each corresponding to a zk zookeeper server.
# e.g. 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
# the value must be given and can be overwritten by cli option `--zkuri`
zkuri: "127.0.0.1:2181"

meta-store: zk://127.0.0.1:2181

connector-meta-store: zk://127.0.0.1:2181

# To start a cluster, a list of host (and port) pairs must be provided, if the port is not specified,
# the server will use `internal-port` value specified above.
Expand Down
4 changes: 2 additions & 2 deletions docker/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ services:
--server-id 100 \
--seed-nodes "hserver0:6571,hserver1:6573" \
--address $$(hostname -I | awk '{print $$1}') \
--zkuri zookeeper:2181 \
--meta-store zk://zookeeper:2181 \
--store-config /data/store/logdevice.conf \
--store-admin-host hstore --store-admin-port 6440 \
--io-tasks-path /tmp/io/tasks \
Expand Down Expand Up @@ -61,7 +61,7 @@ services:
--server-id 101 \
--seed-nodes "hserver0:6571,hserver1:6573" \
--address $$(hostname -I | awk '{print $$1}') \
--zkuri zookeeper:2181 \
--meta-store zk://zookeeper:2181 \
--store-config /data/store/logdevice.conf \
--store-admin-host hstore --store-admin-port 6440 \
--io-tasks-path /tmp/io/tasks \
Expand Down
2 changes: 1 addition & 1 deletion hstream-io/HStream/IO/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ data IOOptions = IOOptions
, optTasksPath :: T.Text
, optSourceImages :: HM.HashMap T.Text T.Text
, optSinkImages :: HM.HashMap T.Text T.Text
} deriving (Show)
} deriving (Show, Eq)

data IOTask = IOTask
{ taskId :: T.Text
Expand Down
2 changes: 1 addition & 1 deletion hstream-store/HStream/Store/Logger.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import qualified Text.Read as Read

type FD = CInt

newtype LDLogLevel = LDLogLevel {unLDLogLevel :: FFI.C_DBG_LEVEL}
newtype LDLogLevel = LDLogLevel {unLDLogLevel :: FFI.C_DBG_LEVEL} deriving (Eq)

instance Read LDLogLevel where
readPrec = do
Expand Down
56 changes: 35 additions & 21 deletions hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import Data.Word (Word16)
import qualified Network.GRPC.HighLevel as GRPC
import qualified Network.GRPC.HighLevel.Client as GRPC
import qualified Network.GRPC.HighLevel.Generated as GRPC
import Network.HTTP.Client (defaultManagerSettings,
newManager)
import Text.RawString.QQ (r)
import Z.Data.CBytes (CBytes)
import ZooKeeper (withResource,
Expand All @@ -38,7 +40,10 @@ import HStream.Gossip (GossipContext (..),
initGossipContext,
startGossip)
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle (..),
RHandle (..))
import HStream.Server.Config (AdvertisedListeners,
MetaStoreAddr (..),
ServerOpts (..), TlsConfig,
advertisedListenersToPB,
getConfig)
Expand All @@ -48,7 +53,8 @@ import HStream.Server.HStreamApi (NodeState (..),
import qualified HStream.Server.HStreamInternal as I
import HStream.Server.Initialization (initializeServer,
initializeTlsConfig)
import HStream.Server.MetaData (initializeAncestors)
import HStream.Server.MetaData (initializeAncestors,
initializeTables)
import HStream.Server.Types (ServerContext (..),
ServerState)
import qualified HStream.Store.Logger as Log
Expand All @@ -70,27 +76,35 @@ app config@ServerOpts{..} = do
Log.setLogDeviceDbgLevel' _ldLogLevel

-- TODO: remove me
serverState <- newMVar (EnumPB NodeStateStarting)

let zkRes = zookeeperResInit _zkUri (Just $ globalWatcherFn serverState) 5000 Nothing 0
serverHostBS = cbytes2bs _serverHost
withResource zkRes $ \zk -> do
initializeAncestors zk

let serverNode =
I.ServerNode { serverNodeId = _serverID
, serverNodeHost = encodeUtf8 . T.pack $ _serverAddress
, serverNodePort = fromIntegral _serverPort
, serverNodeGossipPort = fromIntegral _serverInternalPort
, serverNodeAdvertisedListeners = advertisedListenersToPB _serverAdvertisedListeners
}
gossipContext <- initGossipContext defaultGossipOpts mempty serverNode _seedNodes

serverContext <- initializeServer config gossipContext zk serverState
void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext)

concurrently_ (startGossip gossipContext)
(serve serverHostBS _serverPort _tlsConfig serverContext _serverAdvertisedListeners)
serverState <- newMVar (EnumPB NodeStateStarting)
case _metaStore of
ZkAddr addr -> do
let zkRes = zookeeperResInit addr (Just $ globalWatcherFn serverState) 5000 Nothing 0
withResource zkRes $ \zk ->
initializeAncestors zk >> action serverState (ZkHandle zk)
RqAddr addr -> do
m <- newManager defaultManagerSettings
let rq = RHandle m addr
initializeTables rq
action serverState $ RLHandle rq
where
action serverState h = do
let serverNode =
I.ServerNode { serverNodeId = _serverID
, serverNodeHost = encodeUtf8 . T.pack $ _serverAddress
, serverNodePort = fromIntegral _serverPort
, serverNodeGossipPort = fromIntegral _serverInternalPort
, serverNodeAdvertisedListeners = advertisedListenersToPB _serverAdvertisedListeners
}
serverHostBS = cbytes2bs _serverHost
gossipContext <- initGossipContext defaultGossipOpts mempty serverNode _seedNodes

serverContext <- initializeServer config gossipContext h serverState
void . forkIO $ updateHashRing gossipContext (loadBalanceHashRing serverContext)

concurrently_ (startGossip gossipContext)
(serve serverHostBS _serverPort _tlsConfig serverContext _serverAdvertisedListeners)

serve :: ByteString
-> Word16
Expand Down
7 changes: 7 additions & 0 deletions hstream/hstream.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ library
, microlens-aeson
, mysql-haskell
, network
, network-uri
, optparse-applicative
, proto3-suite
, proto3-wire
Expand Down Expand Up @@ -172,6 +173,7 @@ executable hstream-server
, hstream-common-stats
, hstream-gossip
, hstream-store
, http-client
, memory
, optparse-applicative
, proto3-suite
Expand Down Expand Up @@ -244,6 +246,7 @@ test-suite hstream-test
hs-source-dirs: test
build-depends:
, aeson
, aeson-pretty
, async
, base >=4.11 && <5
, bytestring
Expand All @@ -252,8 +255,11 @@ test-suite hstream-test
, grpc-haskell-core
, hspec
, hstream
, hstream-admin
, hstream-common
, hstream-common-stats
, hstream-gossip
, hstream-io
, hstream-sql
, hstream-store
, io-streams
Expand All @@ -265,6 +271,7 @@ test-suite hstream-test
, text
, unordered-containers
, vector
, yaml
, Z-Data
, Z-IO
, zoovisitor
Expand Down
80 changes: 64 additions & 16 deletions hstream/src/HStream/Server/Config.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@

module HStream.Server.Config
( ServerOpts (..)
, CliOptions (..)
, TlsConfig (..)
, AdvertisedListeners
, advertisedListenersToPB
, getConfig
, MetaStoreAddr(..)
, parseJSONToOptions
, readProtocol
) where

import Control.Exception (throwIO)
Expand All @@ -17,10 +21,12 @@ import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BSC
import qualified Data.HashMap.Strict as HM
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text as Text
import Data.Text.Encoding (encodeUtf8)
import Data.Vector (Vector)
Expand All @@ -37,17 +43,17 @@ import Options.Applicative as O (Alternative ((<|>)),
auto, defaultPrefs,
execParserPure, flag,
fullDesc, help, helper,
info, long, metavar,
option, optional,
info, long, maybeReader,
metavar, option, optional,
progDesc, renderFailure,
short, showDefault,
strOption, value, (<**>))
import System.Directory (makeAbsolute)
import System.Environment (getArgs, getProgName)
import System.Exit (exitSuccess)
import qualified Z.Data.CBytes as CB
import Z.Data.CBytes (CBytes)

import qualified Data.HashMap.Strict as HM
import qualified HStream.Admin.Store.API as AA
import HStream.Gossip (GossipOpts (..),
defaultGossipOpts)
Expand All @@ -57,27 +63,33 @@ import qualified HStream.Server.HStreamInternal as SAI
import HStream.Store (Compression (..))
import qualified HStream.Store.Logger as Log


-------------------------------------------------------------------------------

data TlsConfig = TlsConfig
{ keyPath :: String
, certPath :: String
, caPath :: Maybe String
} deriving (Show)
} deriving (Show, Eq)

type AdvertisedListeners = Map Text (Vector SAI.Listener)

advertisedListenersToPB :: AdvertisedListeners -> Map Text (Maybe SAI.ListOfListener)
advertisedListenersToPB = Map.map $ Just . SAI.ListOfListener

data MetaStoreAddr
= ZkAddr CBytes
| RqAddr Text
deriving (Eq)

data ServerOpts = ServerOpts
{ _serverHost :: !CBytes
, _serverPort :: !Word16
, _serverInternalPort :: !Word16
, _serverAddress :: !String
, _serverAdvertisedListeners :: !AdvertisedListeners
, _serverID :: !Word32
, _zkUri :: !CBytes
, _metaStore :: !MetaStoreAddr
, _ldConfigPath :: !CBytes
, _topicRepFactor :: !Int
, _ckpRepFactor :: !Int
Expand All @@ -97,7 +109,8 @@ data ServerOpts = ServerOpts

, _gossipOpts :: !GossipOpts
, _ioOptions :: !IO.IOOptions
} deriving (Show)
, _connectorMetaStore :: !MetaStoreAddr
} deriving (Show, Eq)

getConfig :: IO ServerOpts
getConfig = do
Expand Down Expand Up @@ -134,7 +147,7 @@ data CliOptions = CliOptions
, _serverLogLevel_ :: !(Maybe Log.Level)
, _serverLogWithColor_ :: !Bool
, _compression_ :: !(Maybe Compression)
, _zkUri_ :: !(Maybe CBytes)
, _metaStore_ :: !(Maybe MetaStoreAddr)
, _seedNodes_ :: !(Maybe Text)

, _enableTls_ :: !Bool
Expand Down Expand Up @@ -168,7 +181,7 @@ cliOptionsParser = do
_ldAdminPort_ <- optional ldAdminPort
_ldAdminHost_ <- optional ldAdminHost
_ldLogLevel_ <- optional ldLogLevel
_zkUri_ <- optional zkUri
_metaStore_ <- optional metaStore
_serverLogLevel_ <- optional logLevel
_compression_ <- optional compression
_serverLogWithColor_ <- logWithColor
Expand All @@ -191,7 +204,7 @@ parseJSONToOptions CliOptions {..} obj = do
nodeInternalPort <- nodeCfgObj .:? "internal-port" .!= 6571
advertisedListeners <- nodeCfgObj .:? "advertised-listeners"

zkuri <- nodeCfgObj .: "zkuri"
nodeMetaStore <- parseMetaStoreAddr <$> nodeCfgObj .: "meta-store" :: Y.Parser MetaStoreAddr
serverCompression <- read <$> nodeCfgObj .:? "compression" .!= "lz4"
nodeLogLevel <- nodeCfgObj .:? "log-level" .!= "info"
nodeLogWithColor <- nodeCfgObj .:? "log-with-color" .!= True
Expand All @@ -208,7 +221,7 @@ parseJSONToOptions CliOptions {..} obj = do
let _serverAddress = fromMaybe nodeAddress _serverAddress_
let _serverAdvertisedListeners = fromMaybe Map.empty advertisedListeners

let _zkUri = fromMaybe zkuri _zkUri_
let _metaStore = fromMaybe nodeMetaStore _metaStore_
let _serverLogLevel = fromMaybe (read nodeLogLevel) _serverLogLevel_
let _serverLogWithColor = nodeLogWithColor || _serverLogWithColor_
let _compression = fromMaybe serverCompression _compression_
Expand Down Expand Up @@ -261,7 +274,13 @@ parseJSONToOptions CliOptions {..} obj = do
(_, _, Nothing) -> errorWithoutStackTrace "enable-tls=true, but tls-cert-path is empty"
(_, Just kp, Just cp) -> Just $ TlsConfig kp cp _tlsCaPath


-- hstream io config
-- FIXME: connector meta store should be part of ioOpts
_connectorMetaStore <- parseMetaStoreAddr <$> nodeCfgObj .:? "connector-meta-store" .!= T.pack (show nodeMetaStore)
case _connectorMetaStore of
ZkAddr _ -> return ()
_ -> error "Currently only support connectors with zookeeper meta store"
nodeIOCfg <- nodeCfgObj .:? "hstream-io" .!= mempty
nodeIOTasksPath <- nodeIOCfg .:? "tasks-path" .!= "/tmp/io/tasks"
nodeIOTasksNetwork <- nodeIOCfg .:? "tasks-network" .!= "host"
Expand Down Expand Up @@ -352,13 +371,12 @@ ldLogLevel = option auto
<> metavar "[critical|error|warning|notify|info|debug|spew]"
<> help "Store log level"

zkUri :: O.Parser CBytes
zkUri = strOption
$ long "zkuri"
metaStore :: O.Parser MetaStoreAddr
metaStore = option (O.maybeReader (Just . parseMetaStoreAddr . T.pack))
$ long "meta-store"
<> metavar "STR"
<> help ( "comma separated host:port pairs, each corresponding"
<> "to a zk zookeeper server. "
<> "e.g. \"127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183\"")
<> help ( "Meta store address, currently support zookeeper and rqlite"
<> "such as \"zk://127.0.0.1:2181,127.0.0.1:2182 , \"rq://127.0.0.1:4001\"")

--TODO: This option will be removed once we can get config from admin server.
storeConfigPath :: O.Parser CBytes
Expand Down Expand Up @@ -416,3 +434,33 @@ parseHostPorts = AP.parseOnly (hostPortParser `AP.sepBy` AP.char ',')
host <- encodeUtf8 <$> AP.takeTill (`elem` [':', ','])
port <- optional (AP.char ':' *> AP.decimal)
return (host, port)

parseMetaStoreAddr :: Text -> MetaStoreAddr
parseMetaStoreAddr t =
case AP.parseOnly metaStoreP t of
Right (s, ip)
| s == "zk" -> ZkAddr . CB.pack .T.unpack $ ip
| s == "rq" -> RqAddr ip
| otherwise -> errorWithoutStackTrace $ "Invalid meta store address, unsupported scheme: " <> show s
Left eMsg -> errorWithoutStackTrace eMsg

metaStoreP :: AP.Parser (Text, Text)
metaStoreP = do
scheme <- AP.takeTill (== ':')
AP.string "://"
ip <- AP.takeText
return (scheme, ip)

-- TODO: Haskell libraries does not support the case where multiple auths exist
-- case parseURI str of
-- Just URI{..} -> case uriAuthority of
-- Just URIAuth{..}
-- | uriScheme == "zk:" -> ZkAddr . CB.pack $ uriRegName <> uriPort
-- | uriScheme == "rq:" -> RqAddr . T.pack $ uriRegName <> uriPort
-- | otherwise -> errorWithoutStackTrace $ "Invalid meta store address, unsupported scheme: " <> uriScheme
-- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no Auth: " <> str
-- Nothing -> errorWithoutStackTrace $ "Invalid meta store address, no parse: " <> str

instance Show MetaStoreAddr where
show (ZkAddr addr) = "zk://" <> CB.unpack addr
show (RqAddr addr) = "rq://" <> T.unpack addr
Loading

0 comments on commit 883346c

Please sign in to comment.