From 3d22f08281667748b741878169f92f851c68e134 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Thu, 25 Apr 2024 17:52:28 +0800 Subject: [PATCH] ZookeeperClient: support auto reconnection (#1800) * Add a ZookeeperClient to support auto reconnection * Replace ZHandle with ZookeeperClient --- .../hstream/HStream/Common/ZookeeperClient.hs | 46 ++++++++++ .../HStream/Common/ZookeeperSlotAlloc.hs | 86 +++++++++++-------- common/hstream/HStream/MetaStore/Types.hs | 61 +++++++------ common/hstream/cbits/hs_zookeeper_client.cpp | 33 +++++++ common/hstream/hstream-common.cabal | 2 + common/hstream/test/HStream/MetaStoreSpec.hs | 16 ++-- common/hstream/test/HStream/TestUtils.hs | 21 ++--- .../test/HStream/ZookeeperSlotAllocSpec.hs | 12 +-- .../server/HStream/Common/Server/MetaData.hs | 21 +++-- hstream-io/HStream/IO/Types.hs | 60 ++++++------- .../HStream/Kafka/Common/AclStore.hs | 4 +- .../HStream/Kafka/Server/MetaData.hs | 11 +-- hstream-kafka/HStream/Kafka/Server/Types.hs | 2 +- .../tests/HStream/Kafka/Common/TestUtils.hs | 9 +- hstream/app/lib/KafkaServer.hs | 10 +-- hstream/app/server.hs | 30 +++---- .../HStream/Server/Experimental/StreamV2.hs | 6 +- hstream/src/HStream/Server/MetaData/Types.hs | 20 ++--- hstream/src/HStream/Server/MetaData/Utils.hs | 4 +- hstream/src/HStream/Server/MetaData/Value.hs | 26 +++--- 20 files changed, 291 insertions(+), 189 deletions(-) create mode 100644 common/hstream/HStream/Common/ZookeeperClient.hs create mode 100644 common/hstream/cbits/hs_zookeeper_client.cpp diff --git a/common/hstream/HStream/Common/ZookeeperClient.hs b/common/hstream/HStream/Common/ZookeeperClient.hs new file mode 100644 index 000000000..227fb34f9 --- /dev/null +++ b/common/hstream/HStream/Common/ZookeeperClient.hs @@ -0,0 +1,46 @@ +module HStream.Common.ZookeeperClient + ( ZookeeperClient + , withZookeeperClient + , unsafeGetZHandle + ) where + +import Data.Word +import Foreign.C.Types +import Foreign.ForeignPtr +import Foreign.Ptr +import Unsafe.Coerce (unsafeCoerce) +import Z.Data.CBytes (CBytes, withCBytes) +import ZooKeeper.Types (ZHandle) + +newtype ZookeeperClient = ZookeeperClient (Ptr CZookeeperClient) + +withZookeeperClient :: CBytes -> CInt -> (ZookeeperClient -> IO a) -> IO a +withZookeeperClient quorum timeout f = do + fp <- newZookeeperClient quorum timeout + withForeignPtr fp $ f . ZookeeperClient + +newZookeeperClient :: CBytes -> CInt -> IO (ForeignPtr CZookeeperClient) +newZookeeperClient quorum timeout = do + withCBytes quorum $ \quorum' -> do + client <- new_zookeeper_client quorum' timeout + newForeignPtr delete_zookeeper_client client + +-- It's safe to use unsafeGetZHandle in hstream since we donot free the +-- ZookeeperClient in the server lifetime. +unsafeGetZHandle :: ZookeeperClient -> IO ZHandle +unsafeGetZHandle (ZookeeperClient ptr) = + -- TODO: let zoovisitor exports the ZHandle constructor + -- + -- It's safe to use unsafeCoerce here because the ZHandle is a newtype. + unsafeCoerce <$> get_underlying_handle ptr + +data CZookeeperClient + +foreign import ccall safe "new_zookeeper_client" + new_zookeeper_client :: Ptr Word8 -> CInt -> IO (Ptr CZookeeperClient) + +foreign import ccall unsafe "&delete_zookeeper_client" + delete_zookeeper_client :: FunPtr (Ptr CZookeeperClient -> IO ()) + +foreign import ccall unsafe "get_underlying_handle" + get_underlying_handle :: Ptr CZookeeperClient -> IO (Ptr ()) diff --git a/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs b/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs index dfe771a62..d3694253b 100644 --- a/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs +++ b/common/hstream/HStream/Common/ZookeeperSlotAlloc.hs @@ -14,29 +14,31 @@ module HStream.Common.ZookeeperSlotAlloc , getSlotValueByName ) where -import Control.Exception (throwIO, try) +import Control.Exception (throwIO, try) import Control.Monad -import qualified Data.ByteString.Lazy as BSL -import qualified Data.Map.Strict as Map -import Data.Maybe (isJust) -import Data.Text (Text) -import Data.Word (Word64) -import GHC.Stack (HasCallStack) -import qualified Proto3.Suite as PB -import qualified Z.Data.Builder as ZB -import qualified Z.Data.CBytes as CBytes -import Z.Data.CBytes (CBytes) -import qualified Z.Data.Parser as ZP -import qualified Z.Data.Vector as ZV -import qualified Z.Foreign as Z -import qualified ZooKeeper as ZK -import qualified ZooKeeper.Exception as ZK -import qualified ZooKeeper.Recipe as ZK -import qualified ZooKeeper.Types as ZK - -import qualified HStream.Common.ProtoTypes as HT -import qualified HStream.Exception as HE -import qualified HStream.Logger as Log +import qualified Data.ByteString.Lazy as BSL +import qualified Data.Map.Strict as Map +import Data.Maybe (isJust) +import Data.Text (Text) +import Data.Word (Word64) +import GHC.Stack (HasCallStack) +import qualified Proto3.Suite as PB +import qualified Z.Data.Builder as ZB +import qualified Z.Data.CBytes as CBytes +import Z.Data.CBytes (CBytes) +import qualified Z.Data.Parser as ZP +import qualified Z.Data.Vector as ZV +import qualified Z.Foreign as Z +import qualified ZooKeeper as ZK +import qualified ZooKeeper.Exception as ZK +import qualified ZooKeeper.Recipe as ZK +import qualified ZooKeeper.Types as ZK + +import qualified HStream.Common.ProtoTypes as HT +import HStream.Common.ZookeeperClient (ZookeeperClient, + unsafeGetZHandle) +import qualified HStream.Exception as HE +import qualified HStream.Logger as Log ------------------------------------------------------------------------------- @@ -47,7 +49,7 @@ type SlotAttrs = Map.Map Text Text data SlotConfig = SlotConfig { slotRoot :: {-# UNPACK #-} !CBytes -- ^ NOTE: the root path should NOT have a trailing '/' - , slotZkHandler :: {-# UNPACK #-} !ZK.ZHandle + , slotZkHandler :: {-# UNPACK #-} !ZookeeperClient , slotOffset :: {-# UNPACK #-} !Word64 , slotMaxCapbility :: {-# UNPACK #-} !Word64 } @@ -70,13 +72,16 @@ data SlotConfig = SlotConfig -- - ... initSlot :: SlotConfig -> IO () initSlot SlotConfig{..} = do - void $ ZK.zooCreateIfMissing slotZkHandler slotRoot Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent - void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/free") (Just $ encodeSlotValue 0) ZK.zooOpenAclUnsafe ZK.ZooPersistent - void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/table") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent - void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/name") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent + zh <- unsafeGetZHandle slotZkHandler + void $ ZK.zooCreateIfMissing zh slotRoot Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent + void $ ZK.zooCreateIfMissing zh (slotRoot <> "/free") (Just $ encodeSlotValue 0) ZK.zooOpenAclUnsafe ZK.ZooPersistent + void $ ZK.zooCreateIfMissing zh (slotRoot <> "/table") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent + void $ ZK.zooCreateIfMissing zh (slotRoot <> "/name") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent clearSlot :: SlotConfig -> IO () -clearSlot SlotConfig{..} = ZK.zooDeleteAll slotZkHandler slotRoot +clearSlot SlotConfig{..} = do + zh <- unsafeGetZHandle slotZkHandler + ZK.zooDeleteAll zh slotRoot allocateSlot :: HasCallStack @@ -99,7 +104,8 @@ allocateSlot c@SlotConfig{..} name attrs valAttrs = withLock c $ do let op1 = ZK.zooCreateOpInit name_path (Just $ encodeSlot slot) 0 ZK.zooOpenAclUnsafe ZK.ZooPersistent op2 = ZK.zooSetOpInit free_path (Just $ encodeSlotValue free_slot') Nothing - results <- ZK.zooMulti slotZkHandler [op1, op2] + zh <- unsafeGetZHandle slotZkHandler + results <- ZK.zooMulti zh [op1, op2] -- The length of the results should be 2 ZK.assertZooOpResultOK $ results !! 0 ZK.assertZooOpResultOK $ results !! 1 @@ -129,7 +135,8 @@ deallocateSlot c@SlotConfig{..} name = withLock c $ do (free_slot_val', table_set_ops) <- deallocate free_slot_val vals [] let op2 = ZK.zooSetOpInit free_slot_node (Just $ encodeSlotValue free_slot_val') Nothing op3 = ZK.zooDeleteOpInit name_node Nothing - results <- ZK.zooMulti slotZkHandler (table_set_ops ++ [op2, op3]) + zh <- unsafeGetZHandle slotZkHandler + results <- ZK.zooMulti zh (table_set_ops ++ [op2, op3]) forM_ results ZK.assertZooOpResultOK pure vals where @@ -143,7 +150,8 @@ deallocateSlot c@SlotConfig{..} name = withLock c $ do doesSlotExist :: SlotConfig -> CBytes -> IO Bool doesSlotExist SlotConfig{..} name = do let path = slotRoot <> "/name/" <> name - isJust <$> ZK.zooExists slotZkHandler path + zh <- unsafeGetZHandle slotZkHandler + isJust <$> ZK.zooExists zh path doesSlotValueExist :: SlotConfig -> CBytes -> SlotValue -> IO Bool doesSlotValueExist c name value = do @@ -177,10 +185,11 @@ getTable SlotConfig{..} idx = do let node = slotRoot <> "/table/" <> CBytes.buildCBytes (ZB.int idx) errmsg = "getTable failed: " <> show node - e <- try $ ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler node + zh <- unsafeGetZHandle slotZkHandler + e <- try $ ZK.dataCompletionValue <$> ZK.zooGet zh node case e of Left (_ :: ZK.ZNONODE) -> do - void $ ZK.zooCreate slotZkHandler node (Just $ encodeSlotValue $ idx + 1) ZK.zooOpenAclUnsafe ZK.ZooPersistent + void $ ZK.zooCreate zh node (Just $ encodeSlotValue $ idx + 1) ZK.zooOpenAclUnsafe ZK.ZooPersistent return (idx + 1) Right (Just a) -> decodeSlotValueThrow a Right Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg @@ -189,7 +198,8 @@ getTable SlotConfig{..} idx = do slotValueGet :: HasCallStack => SlotConfig -> CBytes -> String -> IO SlotValue slotValueGet SlotConfig{..} path errmsg = do - m_bs <- ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler path + zh <- unsafeGetZHandle slotZkHandler + m_bs <- ZK.dataCompletionValue <$> ZK.zooGet zh path case m_bs of Just bs -> decodeSlotValueThrow bs Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg @@ -197,7 +207,8 @@ slotValueGet SlotConfig{..} path errmsg = do slotGet :: HasCallStack => SlotConfig -> CBytes -> String -> IO HT.Slot slotGet SlotConfig{..} path errmsg = do - m_bs <- ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler path + zh <- unsafeGetZHandle slotZkHandler + m_bs <- ZK.dataCompletionValue <$> ZK.zooGet zh path case m_bs of Just bs -> decodeSlotThrow bs Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg @@ -226,6 +237,7 @@ decodeSlotValueThrow b = withLock :: SlotConfig -> IO a -> IO a withLock SlotConfig{..} action = do - clientid <- ZK.clientId <$> (ZK.peekClientId =<< ZK.zooClientID slotZkHandler) + zh <- unsafeGetZHandle slotZkHandler + clientid <- ZK.clientId <$> (ZK.peekClientId =<< ZK.zooClientID zh) let clientid' = CBytes.buildCBytes $ ZB.hex clientid - ZK.withLock slotZkHandler slotRoot clientid' action + ZK.withLock zh slotRoot clientid' action diff --git a/common/hstream/HStream/MetaStore/Types.hs b/common/hstream/HStream/MetaStore/Types.hs index dcf2fe4c3..2ebd67aff 100644 --- a/common/hstream/HStream/MetaStore/Types.hs +++ b/common/hstream/HStream/MetaStore/Types.hs @@ -29,8 +29,9 @@ import qualified Z.Foreign as ZF import qualified ZooKeeper as Z import ZooKeeper.Exception (ZooException) import qualified ZooKeeper.Types as Z -import ZooKeeper.Types (ZHandle) +import HStream.Common.ZookeeperClient (ZookeeperClient, + unsafeGetZHandle) import qualified HStream.MetaStore.FileUtils as File import HStream.MetaStore.RqliteUtils (ROp (..), transaction) import qualified HStream.MetaStore.RqliteUtils as RQ @@ -50,15 +51,14 @@ type Version = Int type MetaType value handle = (MetaStore value handle, HasPath value handle) type FHandle = FilePath data RHandle = RHandle Manager Url + data MetaHandle - = ZkHandle ZHandle + = ZKHandle ZookeeperClient -- ^ Zookeeper handle with auto reconnection. | RLHandle RHandle | FileHandle FHandle --- TODO --- | LocalHandle FHandle instance Show MetaHandle where - show (ZkHandle _) = "Zookeeper Handle" + show (ZKHandle _) = "Zookeeper Handle" show (RLHandle _) = "RQLite Handle" show (FileHandle _) = "LocalFile Handle" @@ -105,45 +105,49 @@ class MetaStore value handle where class MetaMulti handle where metaMulti :: [MetaOp] -> handle -> IO () -instance MetaStore value ZHandle where - myPath mid = myRootPath @value @ZHandle <> "/" <> mid - insertMeta mid x zk = RETHROW(createInsertZK zk (myPath @value @ZHandle mid) x ,ZHandle) - updateMeta mid x mv zk = RETHROW(setZkData zk (myPath @value @ZHandle mid) x mv,ZHandle) - upsertMeta mid x zk = RETHROW(upsertZkData zk (myPath @value @ZHandle mid) x ,ZHandle) - deleteMeta mid mv zk = RETHROW(deleteZkPath zk (myPath @value @ZHandle mid) mv ,ZHandle) - deleteAllMeta zk = RETHROW(deleteZkChildren zk (myRootPath @value @ZHandle) ,ZHandle) +instance MetaStore value ZookeeperClient where + myPath mid = myRootPath @value @ZookeeperClient <> "/" <> mid + insertMeta mid x zk = RETHROW(do zk' <- unsafeGetZHandle zk; createInsertZK zk' (myPath @value @ZookeeperClient mid) x ,ZookeeperClient) + updateMeta mid x mv zk = RETHROW(do zk' <- unsafeGetZHandle zk; setZkData zk' (myPath @value @ZookeeperClient mid) x mv,ZookeeperClient) + upsertMeta mid x zk = RETHROW(do zk' <- unsafeGetZHandle zk; upsertZkData zk' (myPath @value @ZookeeperClient mid) x ,ZookeeperClient) + deleteMeta mid mv zk = RETHROW(do zk' <- unsafeGetZHandle zk; deleteZkPath zk' (myPath @value @ZookeeperClient mid) mv ,ZookeeperClient) + deleteAllMeta zk = RETHROW(do zk' <- unsafeGetZHandle zk; deleteZkChildren zk' (myRootPath @value @ZookeeperClient) ,ZookeeperClient) where mid = "some of the meta when deleting" - checkMetaExists mid zk = RETHROW(isJust <$> Z.zooExists zk (textToCBytes (myPath @value @ZHandle mid)),ZHandle) - getMeta mid zk = RETHROW(decodeZNodeValue zk (myPath @value @ZHandle mid),ZHandle) - getMetaWithVer mid zk = RETHROW(action,ZHandle) + checkMetaExists mid zk = RETHROW(do zk' <- unsafeGetZHandle zk; isJust <$> Z.zooExists zk' (textToCBytes (myPath @value @ZookeeperClient mid)),ZookeeperClient) + getMeta mid zk = RETHROW(do zk' <- unsafeGetZHandle zk; decodeZNodeValue zk' (myPath @value @ZookeeperClient mid),ZookeeperClient) + getMetaWithVer mid zkclient = RETHROW(action,ZookeeperClient) where action = do - e_a <- try $ Z.zooGet zk (textToCBytes $ myPath @value @ZHandle mid) + zk <- unsafeGetZHandle zkclient + e_a <- try $ Z.zooGet zk (textToCBytes $ myPath @value @ZookeeperClient mid) case e_a of Left (_ :: ZooException) -> return Nothing Right a -> return $ (, fromIntegral . Z.statVersion . Z.dataCompletionStat $ a) <$> decodeDataCompletion a - getAllMeta zk = RETHROW(action,ZHandle) + getAllMeta zkclient = RETHROW(action,ZookeeperClient) where mid = "some of the meta when getting " action = do - let path = textToCBytes $ myRootPath @value @ZHandle + let path = textToCBytes $ myRootPath @value @ZookeeperClient + zk <- unsafeGetZHandle zkclient ids <- Z.unStrVec . Z.strsCompletionValues <$> Z.zooGetChildren zk path - idAndValues <- catMaybes <$> mapM (\x -> let x' = cBytesToText x in getMeta @value x' zk <&> fmap (x',)) ids + idAndValues <- catMaybes <$> mapM (\x -> let x' = cBytesToText x in getMeta @value x' zkclient <&> fmap (x',)) ids pure $ Map.fromList idAndValues - listMeta zk = RETHROW(action,ZHandle) + listMeta zkclient = RETHROW(action,ZookeeperClient) where mid = "some of the meta when listing" action = do - let path = textToCBytes $ myRootPath @value @ZHandle + let path = textToCBytes $ myRootPath @value @ZookeeperClient + zk <- unsafeGetZHandle zkclient ids <- Z.unStrVec . Z.strsCompletionValues <$> Z.zooGetChildren zk path - catMaybes <$> mapM (flip (getMeta @value) zk . cBytesToText) ids + catMaybes <$> mapM (flip (getMeta @value) zkclient . cBytesToText) ids -instance MetaMulti ZHandle where - metaMulti ops zk = do +instance MetaMulti ZookeeperClient where + metaMulti ops zkclient = do let zOps = map opToZ ops + zk <- unsafeGetZHandle zkclient void $ Z.zooMulti zk zOps where opToZ op = case op of @@ -209,12 +213,15 @@ instance MetaMulti FHandle where DeleteOp p k mv -> File.DeleteOp p k mv CheckOp p k v -> File.CheckOp p k v -instance (ToJSON a, FromJSON a, HasPath a ZHandle, HasPath a RHandle, HasPath a FHandle, Show a) => HasPath a MetaHandle +instance (ToJSON a, FromJSON a, HasPath a ZookeeperClient, HasPath a RHandle, HasPath a FHandle, Show a) => HasPath a MetaHandle #define USE_WHICH_HANDLE(handle, action) \ - case handle of ZkHandle zk -> action zk; RLHandle rq -> action rq; FileHandle io -> action io; + case handle of \ + ZKHandle zk -> action zk; \ + RLHandle rq -> action rq; \ + FileHandle io -> action io; -instance (HasPath value ZHandle, HasPath value RHandle, HasPath value FHandle) => MetaStore value MetaHandle where +instance (HasPath value ZookeeperClient, HasPath value RHandle, HasPath value FHandle) => MetaStore value MetaHandle where myPath = undefined listMeta h = USE_WHICH_HANDLE(h, listMeta @value) insertMeta mid x h = USE_WHICH_HANDLE(h, insertMeta mid x) diff --git a/common/hstream/cbits/hs_zookeeper_client.cpp b/common/hstream/cbits/hs_zookeeper_client.cpp new file mode 100644 index 000000000..a6956e18f --- /dev/null +++ b/common/hstream/cbits/hs_zookeeper_client.cpp @@ -0,0 +1,33 @@ +#include +#include + +// We directly use the ZookeeperClient class from LogDevice, it handles +// SessionExpire that we need to handle. +// +// TODO: +// +// 1. Currently logging messages are printed by ld_info, which is controlled by +// `--store-log-level`. We need to find a way to use the hstream logger. + +extern "C" { + +typedef struct zookeeper_client_t { + std::unique_ptr rep; +} zookeeper_client_t; + +zookeeper_client_t* new_zookeeper_client(const char* quorum, + int session_timeout) { + zookeeper_client_t* client = new zookeeper_client_t(); + client->rep = std::make_unique( + std::string(quorum), std::chrono::milliseconds(session_timeout)); + return client; +} + +zhandle_t* get_underlying_handle(zookeeper_client_t* client) { + return client->rep->getHandle().get(); +} + +void delete_zookeeper_client(zookeeper_client_t* client) { delete client; } + +// end extern "C" +} diff --git a/common/hstream/hstream-common.cabal b/common/hstream/hstream-common.cabal index fc479d1b1..fabd51977 100644 --- a/common/hstream/hstream-common.cabal +++ b/common/hstream/hstream-common.cabal @@ -55,6 +55,7 @@ library HStream.Common.GrpcHaskell HStream.Common.Query HStream.Common.Types + HStream.Common.ZookeeperClient HStream.Common.ZookeeperSlotAlloc HStream.Exception HStream.Instances @@ -86,6 +87,7 @@ library cxx-sources: cbits/hash.cpp + cbits/hs_zookeeper_client.cpp cbits/query/tables/AdminCommandTable.cpp cbits/query.cpp diff --git a/common/hstream/test/HStream/MetaStoreSpec.hs b/common/hstream/test/HStream/MetaStoreSpec.hs index 6c727c3e9..109cedcb8 100644 --- a/common/hstream/test/HStream/MetaStoreSpec.hs +++ b/common/hstream/test/HStream/MetaStoreSpec.hs @@ -15,10 +15,10 @@ import System.Environment (lookupEnv) import System.IO (hClose, openTempFile) import Test.Hspec import Test.QuickCheck (generate) -import ZooKeeper (withResource, - zookeeperResInit) -import ZooKeeper.Types (ZHandle) +import HStream.Common.ZookeeperClient (ZookeeperClient, + unsafeGetZHandle, + withZookeeperClient) import qualified HStream.Logger as Log import qualified HStream.MetaStore.FileUtils as File import HStream.MetaStore.RqliteUtils (createTable, deleteTable) @@ -45,9 +45,8 @@ spec = do -- zookeeper portZk <- runIO $ fromMaybe "2181" <$> lookupEnv "ZOOKEEPER_LOCAL_PORT" let urlZk = textToCBytes $ T.pack $ host <> ":" <> portZk - let res = zookeeperResInit urlZk Nothing 5000 Nothing 0 - runIO $ withResource res $ \zk -> do - let mHandle2 = ZkHandle zk + runIO $ withZookeeperClient urlZk 5000 $ \zk -> do + let mHandle2 = ZKHandle zk hspec $ smokeTest mHandle2 -- local file @@ -62,8 +61,9 @@ spec = do initMeta :: MetaHandle -> IO () initMeta h = case h of - ZkHandle zk -> do - tryCreate zk (textToCBytes $ myRootPath @MetaExample @ZHandle) + ZKHandle zk -> do + zh <- unsafeGetZHandle zk + tryCreate zh (textToCBytes $ myRootPath @MetaExample @ZookeeperClient) RLHandle (RHandle m url) -> do createTable m url (myRootPath @MetaExample @RHandle) FileHandle fh -> do diff --git a/common/hstream/test/HStream/TestUtils.hs b/common/hstream/test/HStream/TestUtils.hs index 2a0fd63c8..53fbdde9c 100644 --- a/common/hstream/test/HStream/TestUtils.hs +++ b/common/hstream/test/HStream/TestUtils.hs @@ -1,18 +1,19 @@ {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE TypeSynonymInstances #-} module HStream.TestUtils where -import qualified Data.Aeson as A -import qualified Data.Text as T -import GHC.Generics (Generic) -import HStream.MetaStore.Types (FHandle, HasPath (..), RHandle) -import Test.QuickCheck (Arbitrary (..), Gen, chooseEnum, - elements, frequency, listOf, listOf1, - oneof) -import ZooKeeper.Types (ZHandle) +import qualified Data.Aeson as A +import qualified Data.Text as T +import GHC.Generics (Generic) +import Test.QuickCheck (Arbitrary (..), Gen, + chooseEnum, elements, + frequency, listOf, listOf1, + oneof) + +import HStream.Common.ZookeeperClient (ZookeeperClient) +import HStream.MetaStore.Types (FHandle, HasPath (..), RHandle) data MetaExample = Meta { metaId :: T.Text @@ -21,7 +22,7 @@ data MetaExample = Meta { } deriving (Ord, Show, Eq, Generic, A.FromJSON, A.ToJSON) -instance HasPath MetaExample ZHandle where +instance HasPath MetaExample ZookeeperClient where myRootPath = "/testTemp" instance HasPath MetaExample RHandle where myRootPath = "testTemp" diff --git a/common/hstream/test/HStream/ZookeeperSlotAllocSpec.hs b/common/hstream/test/HStream/ZookeeperSlotAllocSpec.hs index 6463432a1..6aa625f52 100644 --- a/common/hstream/test/HStream/ZookeeperSlotAllocSpec.hs +++ b/common/hstream/test/HStream/ZookeeperSlotAllocSpec.hs @@ -13,6 +13,7 @@ import ZooKeeper (withResource, import ZooKeeper.Exception (ZNODEEXISTS) import ZooKeeper.Types (ZHandle) +import HStream.Common.ZookeeperClient import HStream.Common.ZookeeperSlotAlloc import HStream.Exception import HStream.Utils (textToCBytes) @@ -22,16 +23,15 @@ spec = do let host = "127.0.0.1" port <- runIO $ fromMaybe "2181" <$> lookupEnv "ZOOKEEPER_LOCAL_PORT" let url = textToCBytes $ T.pack $ host <> ":" <> port - res = zookeeperResInit url Nothing 5000 Nothing 0 - runIO $ withResource res $ \zh -> hspec $ runTests zh + runIO $ withZookeeperClient url 5000 $ \zk -> hspec $ runTests zk -- TODO: -- 1. properties tests -- 2. concurrent tests -runTests :: ZHandle -> Spec -runTests zh = describe "HStream.ZookeeperSlotAllocSpec" $ do - let c1 = SlotConfig{slotRoot="/tmp_a", slotZkHandler=zh, slotOffset=1, slotMaxCapbility=50} - c2 = SlotConfig{slotRoot="/tmp_b", slotZkHandler=zh, slotOffset=1, slotMaxCapbility=2} +runTests :: ZookeeperClient -> Spec +runTests zk = describe "HStream.ZookeeperSlotAllocSpec" $ do + let c1 = SlotConfig{slotRoot="/tmp_a", slotZkHandler=zk, slotOffset=1, slotMaxCapbility=50} + c2 = SlotConfig{slotRoot="/tmp_b", slotZkHandler=zk, slotOffset=1, slotMaxCapbility=2} valAttrs1 = SlotValueAttrs (Map.fromList [("k1", "v1")]) valAttrs2 = SlotValueAttrs (Map.fromList [("k2", "v2")]) diff --git a/common/server/HStream/Common/Server/MetaData.hs b/common/server/HStream/Common/Server/MetaData.hs index 262e9cd91..38a48c9e6 100644 --- a/common/server/HStream/Common/Server/MetaData.hs +++ b/common/server/HStream/Common/Server/MetaData.hs @@ -22,7 +22,10 @@ import Data.Aeson (FromJSON (..), ToJSON (..)) import qualified Data.Aeson as Aeson import qualified Data.ByteString.Lazy as BSL +import Data.Int (Int32) import Data.Text (Text) +import qualified Data.Text as T +import qualified Data.Vector as V import Data.Word import GHC.Generics (Generic) import GHC.Stack (HasCallStack) @@ -30,12 +33,10 @@ import System.Directory (doesFileExist) import System.FileLock (SharedExclusive (Exclusive), withTryFileLock) import Z.Data.CBytes (CBytes) -import ZooKeeper.Types (ZHandle) -import Data.Int (Int32) -import qualified Data.Text as T -import qualified Data.Vector as V import HStream.Common.Server.MetaData.Values +import HStream.Common.ZookeeperClient (ZookeeperClient, + unsafeGetZHandle) import qualified HStream.Exception as HE import qualified HStream.MetaStore.FileUtils as File import qualified HStream.MetaStore.RqliteUtils as Rqlite @@ -56,7 +57,7 @@ data TaskAllocation = TaskAllocation instance FromJSON TaskAllocation instance ToJSON TaskAllocation -instance HasPath TaskAllocation ZHandle where +instance HasPath TaskAllocation ZookeeperClient where myRootPath = rootPath <> "/taskAllocations" instance HasPath TaskAllocation RHandle where @@ -90,7 +91,7 @@ data GroupMetadataValue instance FromJSON GroupMetadataValue instance ToJSON GroupMetadataValue -instance HasPath GroupMetadataValue ZHandle where +instance HasPath GroupMetadataValue ZookeeperClient where myRootPath = rootPath <> "/groups" instance HasPath GroupMetadataValue RHandle where @@ -118,7 +119,7 @@ instance ToJSON MemberMetadataValue ------------------------------------------------------------ -- metadata for some common utils ------------------------------------------------------------ -instance HasPath Proto.Timestamp ZHandle where +instance HasPath Proto.Timestamp ZookeeperClient where myRootPath = rootPath <> "/timestamp" instance HasPath Proto.Timestamp RHandle where myRootPath = "timestamp" @@ -128,8 +129,10 @@ instance HasPath Proto.Timestamp FHandle where ------------------------------------------------------------ -- Metadata Initialization (common methods) ------------------------------------------------------------ -initializeZkPaths :: HasCallStack => ZHandle -> [CBytes] -> IO () -initializeZkPaths zk = mapM_ (ZK.tryCreate zk) +initializeZkPaths :: HasCallStack => ZookeeperClient -> [CBytes] -> IO () +initializeZkPaths zkclient paths = do + zh <- unsafeGetZHandle zkclient + mapM_ (ZK.tryCreate zh) paths initializeRqTables :: RHandle -> [Text] -> IO () initializeRqTables (RHandle m url) = mapM_ (handleExists . Rqlite.createTable m url) diff --git a/hstream-io/HStream/IO/Types.hs b/hstream-io/HStream/IO/Types.hs index 3f46a55eb..1453525df 100644 --- a/hstream-io/HStream/IO/Types.hs +++ b/hstream-io/HStream/IO/Types.hs @@ -5,33 +5,33 @@ module HStream.IO.Types where -import qualified Control.Concurrent as C -import Control.Exception (Exception) -import qualified Control.Exception as E -import qualified Data.Aeson as J -import qualified Data.Aeson.KeyMap as J -import qualified Data.Aeson.Text as J -import qualified Data.ByteString.Lazy as BSL -import qualified Data.ByteString.Lazy.Char8 as BSLC -import qualified Data.HashMap.Strict as HM -import Data.IORef (IORef) -import qualified Data.Text as T -import qualified Data.Text.Lazy as TL -import qualified Data.Vector as Vector -import GHC.Generics (Generic) -import qualified GHC.IO.Handle as IO -import qualified System.Process.Typed as TP -import ZooKeeper.Types (ZHandle) - -import qualified HStream.Exception as E -import qualified HStream.IO.LineReader as LR -import HStream.MetaStore.Types (FHandle, HasPath (..), MetaHandle, - RHandle (..)) -import qualified HStream.Server.HStreamApi as API -import qualified HStream.Stats as Stats -import qualified HStream.ThirdParty.Protobuf as Grpc -import qualified HStream.ThirdParty.Protobuf as PB -import Proto3.Suite (def) +import qualified Control.Concurrent as C +import Control.Exception (Exception) +import qualified Control.Exception as E +import qualified Data.Aeson as J +import qualified Data.Aeson.KeyMap as J +import qualified Data.Aeson.Text as J +import qualified Data.ByteString.Lazy as BSL +import qualified Data.ByteString.Lazy.Char8 as BSLC +import qualified Data.HashMap.Strict as HM +import Data.IORef (IORef) +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import qualified Data.Vector as Vector +import GHC.Generics (Generic) +import qualified GHC.IO.Handle as IO +import qualified System.Process.Typed as TP + +import HStream.Common.ZookeeperClient (ZookeeperClient) +import qualified HStream.Exception as E +import qualified HStream.IO.LineReader as LR +import HStream.MetaStore.Types (FHandle, HasPath (..), + MetaHandle, RHandle (..)) +import qualified HStream.Server.HStreamApi as API +import qualified HStream.Stats as Stats +import qualified HStream.ThirdParty.Protobuf as Grpc +import qualified HStream.ThirdParty.Protobuf as PB +import Proto3.Suite (def) data IOTaskType = SOURCE | SINK deriving (Show, Eq, Generic, J.FromJSON, J.ToJSON) @@ -130,13 +130,13 @@ newtype TaskKvMeta = TaskKvMeta ioRootPath :: T.Text ioRootPath = "/hstream/io" -instance HasPath TaskMeta ZHandle where +instance HasPath TaskMeta ZookeeperClient where myRootPath = ioRootPath <> "/tasks" -instance HasPath TaskIdMeta ZHandle where +instance HasPath TaskIdMeta ZookeeperClient where myRootPath = ioRootPath <> "/taskNames" -instance HasPath TaskKvMeta ZHandle where +instance HasPath TaskKvMeta ZookeeperClient where myRootPath = ioRootPath <> "/taskKvs" instance HasPath TaskMeta RHandle where diff --git a/hstream-kafka/HStream/Kafka/Common/AclStore.hs b/hstream-kafka/HStream/Kafka/Common/AclStore.hs index 89be63e8d..1271e0c71 100644 --- a/hstream-kafka/HStream/Kafka/Common/AclStore.hs +++ b/hstream-kafka/HStream/Kafka/Common/AclStore.hs @@ -5,15 +5,15 @@ module HStream.Kafka.Common.AclStore where import Control.Monad import qualified Data.Map.Strict as Map import qualified Data.Text as T -import ZooKeeper.Types (ZHandle) import qualified HStream.Common.Server.MetaData as Meta +import HStream.Common.ZookeeperClient (ZookeeperClient) import HStream.Kafka.Common.AclEntry import HStream.Kafka.Common.Resource import HStream.MetaStore.Types () import qualified HStream.MetaStore.Types as Meta -instance Meta.HasPath AclResourceNode ZHandle where +instance Meta.HasPath AclResourceNode ZookeeperClient where myRootPath = Meta.kafkaRootPath <> "/acls" instance Meta.HasPath AclResourceNode Meta.RHandle where myRootPath = "acls" diff --git a/hstream-kafka/HStream/Kafka/Server/MetaData.hs b/hstream-kafka/HStream/Kafka/Server/MetaData.hs index 0ee127f47..1235554ed 100644 --- a/hstream-kafka/HStream/Kafka/Server/MetaData.hs +++ b/hstream-kafka/HStream/Kafka/Server/MetaData.hs @@ -6,6 +6,7 @@ import Z.Data.CBytes (CBytes) import ZooKeeper.Types (ZHandle) import HStream.Common.Server.MetaData +import HStream.Common.ZookeeperClient (ZookeeperClient) import HStream.Kafka.Common.AclEntry import HStream.Kafka.Common.AclStore () import HStream.MetaStore.Types @@ -18,10 +19,10 @@ kafkaZkPaths :: [CBytes] kafkaZkPaths = [ textToCBytes rootPath , textToCBytes kafkaRootPath - , textToCBytes $ myRootPath @Proto.Timestamp @ZHandle - , textToCBytes $ myRootPath @TaskAllocation @ZHandle - , textToCBytes $ myRootPath @GroupMetadataValue @ZHandle - , textToCBytes $ myRootPath @AclResourceNode @ZHandle + , textToCBytes $ myRootPath @Proto.Timestamp @ZookeeperClient + , textToCBytes $ myRootPath @TaskAllocation @ZookeeperClient + , textToCBytes $ myRootPath @GroupMetadataValue @ZookeeperClient + , textToCBytes $ myRootPath @AclResourceNode @ZookeeperClient ] kafkaRqTables :: [Text] @@ -40,7 +41,7 @@ kafkaFileTables = , myRootPath @AclResourceNode @FHandle ] -initKafkaZkPaths :: HasCallStack => ZHandle -> IO () +initKafkaZkPaths :: HasCallStack => ZookeeperClient -> IO () initKafkaZkPaths zk = initializeZkPaths zk kafkaZkPaths initKafkaRqTables :: RHandle -> IO () diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index efc3b3194..b79a18b04 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -72,7 +72,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do authorizer <- case _enableAcl of False -> return $ AuthorizerObject @AuthorizerObject Nothing True -> case mh of - ZkHandle zkHandle -> do + ZKHandle zkHandle -> do x <- newAclAuthorizer (pure zkHandle) initAclAuthorizer x return $ AuthorizerObject (Just x) diff --git a/hstream-kafka/tests/HStream/Kafka/Common/TestUtils.hs b/hstream-kafka/tests/HStream/Kafka/Common/TestUtils.hs index df49a3012..e71a59818 100644 --- a/hstream-kafka/tests/HStream/Kafka/Common/TestUtils.hs +++ b/hstream-kafka/tests/HStream/Kafka/Common/TestUtils.hs @@ -28,8 +28,8 @@ import System.IO.Unsafe (unsafePerformIO) import Test.Hspec import qualified Z.Data.CBytes as CB import qualified Z.Data.CBytes as CBytes -import qualified ZooKeeper as ZK +import HStream.Common.ZookeeperClient import HStream.Kafka.Common.Acl import HStream.Kafka.Common.Authorizer import HStream.Kafka.Common.Authorizer.Class @@ -113,10 +113,9 @@ withZkBasedAclAuthorizer :: ActionWith AuthorizerObject -> IO () withZkBasedAclAuthorizer action = do zkPortStr <- fromMaybe "2181" <$> lookupEnv "ZOOKEEPER_LOCAL_PORT" let zkAddr = "127.0.0.1" <> ":" <> CB.pack zkPortStr - let res = ZK.zookeeperResInit zkAddr Nothing 5000 Nothing 0 - ZK.withResource res $ \zkHandle -> do - Meta.initKafkaZkPaths zkHandle - authorizer <- newAclAuthorizer (pure zkHandle) + withZookeeperClient zkAddr 5000 $ \zk -> do + Meta.initKafkaZkPaths zk + authorizer <- newAclAuthorizer (pure zk) initAclAuthorizer authorizer action (AuthorizerObject $ Just authorizer) diff --git a/hstream/app/lib/KafkaServer.hs b/hstream/app/lib/KafkaServer.hs index 7eec7ce01..f786ceb5b 100644 --- a/hstream/app/lib/KafkaServer.hs +++ b/hstream/app/lib/KafkaServer.hs @@ -26,20 +26,18 @@ import Data.Maybe (isJust) import qualified Data.Set as Set import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8, encodeUtf8) +import qualified Data.Vector as V import Network.HTTP.Client (defaultManagerSettings, newManager) import System.Environment (getArgs) import System.IO (hPutStrLn, stderr) -import ZooKeeper (withResource, - zookeeperResInit) - -import qualified Data.Vector as V import HStream.Base (setupFatalSignalHandler) import HStream.Common.Server.HashRing (updateHashRing) import qualified HStream.Common.Server.MetaData as M import qualified HStream.Common.Server.TaskManager as TM import HStream.Common.Types (getHStreamVersion) +import HStream.Common.ZookeeperClient (withZookeeperClient) import qualified HStream.Exception as HE import HStream.Gossip (GossipContext (..), defaultGossipOpts, @@ -90,8 +88,8 @@ app config@ServerOpts{..} = do logType _serverLogFlushImmediately case _metaStore of ZkAddr addr -> do - let zkRes = zookeeperResInit addr Nothing 5000 Nothing 0 - withResource zkRes $ \zk -> M.initKafkaZkPaths zk >> action (ZkHandle zk) + withZookeeperClient addr 5000 $ \zkclient -> + M.initKafkaZkPaths zkclient >> action (ZKHandle zkclient) RqAddr addr -> do m <- newManager defaultManagerSettings let rq = RHandle m addr diff --git a/hstream/app/server.hs b/hstream/app/server.hs index e49592559..fb1d33971 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -2,10 +2,8 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} -{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} @@ -31,14 +29,13 @@ import Network.HTTP.Client (defaultManagerSettings, newManager) import System.Environment (getArgs) import System.IO (hPutStrLn, stderr) -import ZooKeeper (withResource, - zookeeperResInit) import HStream.Base (setupFatalSignalHandler) import HStream.Common.Server.HashRing (updateHashRing) import HStream.Common.Server.MetaData (TaskAllocation (..), clusterStartTimeId) import HStream.Common.Types (getHStreamVersion) +import HStream.Common.ZookeeperClient (withZookeeperClient) import HStream.Exception import HStream.Gossip (GossipContext (..), defaultGossipOpts, @@ -115,18 +112,19 @@ app config@ServerOpts{..} = do logType _serverLogFlushImmediately bracket (openRocksDBHandle _querySnapshotPath) closeRocksDBHandle $ \db_m -> - case _metaStore of - ZkAddr addr -> do - let zkRes = zookeeperResInit addr Nothing 5000 Nothing 0 - withResource zkRes $ \zk -> initHStreamZkPaths zk >> action (ZkHandle zk) db_m - RqAddr addr -> do - m <- newManager defaultManagerSettings - let rq = RHandle m addr - initHStreamRqTables rq - action (RLHandle rq) db_m - FileAddr addr -> do - initHStreamFileTables addr - action (FileHandle addr) db_m + case _metaStore of + ZkAddr addr -> do + withZookeeperClient addr 5000 $ \zkclient -> do + initHStreamZkPaths zkclient + action (ZKHandle zkclient) db_m + RqAddr addr -> do + m <- newManager defaultManagerSettings + let rq = RHandle m addr + initHStreamRqTables rq + action (RLHandle rq) db_m + FileAddr addr -> do + initHStreamFileTables addr + action (FileHandle addr) db_m where action h db_m = do hstreamVersion <- getHStreamVersion diff --git a/hstream/src/HStream/Server/Experimental/StreamV2.hs b/hstream/src/HStream/Server/Experimental/StreamV2.hs index 5452612b9..ea1c829c2 100644 --- a/hstream/src/HStream/Server/Experimental/StreamV2.hs +++ b/hstream/src/HStream/Server/Experimental/StreamV2.hs @@ -13,6 +13,7 @@ import qualified ZooKeeper as ZK import qualified ZooKeeper.Exception as ZK import qualified ZooKeeper.Types as ZK +import HStream.Common.ZookeeperClient (unsafeGetZHandle) import qualified HStream.Common.ZookeeperSlotAlloc as Slot import qualified HStream.MetaStore.Types as Meta import qualified HStream.Server.Handler.Admin as H @@ -137,11 +138,12 @@ initDefLogGoup ServerContext{..} = do initZkSlot :: ServerContext -> IO Slot.SlotConfig initZkSlot sc@ServerContext{..} = do case metaHandle of - Meta.ZkHandle zh -> do + Meta.ZKHandle zhclient -> do + zh <- unsafeGetZHandle zhclient catch (void $ ZK.zooCreate zh "/hstream" Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent) $ \(_ :: ZK.ZNODEEXISTS) -> pure () let slotConfig = Slot.SlotConfig{ slotRoot = defZkRoot - , slotZkHandler = zh + , slotZkHandler = zhclient , slotOffset = defLogGroupStart , slotMaxCapbility = defLogGroupEnd - defLogGroupStart + 1 } diff --git a/hstream/src/HStream/Server/MetaData/Types.hs b/hstream/src/HStream/Server/MetaData/Types.hs index cfbd65bb8..0818e4b80 100644 --- a/hstream/src/HStream/Server/MetaData/Types.hs +++ b/hstream/src/HStream/Server/MetaData/Types.hs @@ -47,7 +47,9 @@ module HStream.Server.MetaData.Types ) where import Control.Exception (catches) +import Control.Monad (forM) import Data.Aeson (FromJSON (..), ToJSON (..)) +import qualified Data.Aeson as Aeson import qualified Data.HashMap.Strict as HM import Data.Int (Int64) import qualified Data.IntMap as IntMap @@ -62,11 +64,9 @@ import Data.Word (Word32, Word64) import GHC.Generics (Generic) import GHC.IO (unsafePerformIO) import GHC.Stack -import ZooKeeper.Types (ZHandle) -import Control.Monad (forM) -import qualified Data.Aeson as Aeson import HStream.Common.Server.MetaData (rootPath) +import HStream.Common.ZookeeperClient (ZookeeperClient) import qualified HStream.Logger as Log import HStream.MetaStore.Types (FHandle, HasPath (..), MetaHandle, @@ -175,22 +175,22 @@ data ShardReaderMeta = ShardReaderMeta -- ^ use to record start time offset } deriving (Show, Generic, FromJSON, ToJSON) -instance HasPath ShardReaderMeta ZHandle where +instance HasPath ShardReaderMeta ZookeeperClient where myRootPath = rootPath <> "/shardReader" myExceptionHandler = zkExceptionHandlers ResShardReader -instance HasPath SubscriptionWrap ZHandle where +instance HasPath SubscriptionWrap ZookeeperClient where myRootPath = rootPath <> "/subscriptions" myExceptionHandler = zkExceptionHandlers ResSubscription -instance HasPath QueryInfo ZHandle where +instance HasPath QueryInfo ZookeeperClient where myRootPath = rootPath <> "/queries" myExceptionHandler = zkExceptionHandlers ResQuery -instance HasPath ViewInfo ZHandle where +instance HasPath ViewInfo ZookeeperClient where myRootPath = rootPath <> "/views" myExceptionHandler = zkExceptionHandlers ResView -instance HasPath QueryStatus ZHandle where +instance HasPath QueryStatus ZookeeperClient where myRootPath = rootPath <> "/queryStatus" myExceptionHandler = zkExceptionHandlers ResQuery -instance HasPath QVRelation ZHandle where +instance HasPath QVRelation ZookeeperClient where myRootPath = rootPath <> "/qvRelation" instance HasPath ShardReaderMeta RHandle where @@ -317,7 +317,7 @@ groupbyStores = unsafePerformIO $ newIORef HM.empty -------------------------------------------------------------------------------- #ifdef HStreamEnableSchema -instance HasPath SQL.Schema ZHandle where +instance HasPath SQL.Schema ZookeeperClient where myRootPath = rootPath <> "/schemas" instance HasPath SQL.Schema FHandle where myRootPath = "schemas" diff --git a/hstream/src/HStream/Server/MetaData/Utils.hs b/hstream/src/HStream/Server/MetaData/Utils.hs index 1658739af..cfe249ede 100644 --- a/hstream/src/HStream/Server/MetaData/Utils.hs +++ b/hstream/src/HStream/Server/MetaData/Utils.hs @@ -3,11 +3,11 @@ module HStream.Server.MetaData.Utils where import Control.Monad import qualified Data.Text as T import GHC.Stack (HasCallStack) -import ZooKeeper.Types (ZHandle) import HStream.Common.Server.MetaData (initializeFileTables, initializeRqTables, initializeZkPaths) +import HStream.Common.ZookeeperClient (ZookeeperClient) import HStream.MetaStore.Types (FHandle, MetaHandle, MetaMulti (..), MetaStore (..), RHandle (..)) @@ -15,7 +15,7 @@ import HStream.Server.HStreamApi (Subscription (..)) import HStream.Server.MetaData.Value (fileTables, paths, tables) import HStream.Server.Types (SubscriptionWrap (..)) -initHStreamZkPaths :: HasCallStack => ZHandle -> IO () +initHStreamZkPaths :: HasCallStack => ZookeeperClient -> IO () initHStreamZkPaths zk = initializeZkPaths zk paths initHStreamRqTables :: RHandle -> IO () diff --git a/hstream/src/HStream/Server/MetaData/Value.hs b/hstream/src/HStream/Server/MetaData/Value.hs index 0c2e53384..06a75d7c8 100644 --- a/hstream/src/HStream/Server/MetaData/Value.hs +++ b/hstream/src/HStream/Server/MetaData/Value.hs @@ -4,9 +4,9 @@ module HStream.Server.MetaData.Value where import Data.Text (Text) import Z.Data.CBytes (CBytes) -import ZooKeeper.Types (ZHandle) import HStream.Common.Server.MetaData (TaskAllocation) +import HStream.Common.ZookeeperClient (ZookeeperClient) import HStream.IO.Types import HStream.MetaStore.Types (FHandle, HasPath (myRootPath), RHandle (..)) @@ -19,19 +19,19 @@ import HStream.Utils (textToCBytes) paths :: [CBytes] paths = [ textToCBytes rootPath , textToCBytes ioRootPath - , textToCBytes $ myRootPath @TaskIdMeta @ZHandle - , textToCBytes $ myRootPath @TaskMeta @ZHandle - , textToCBytes $ myRootPath @TaskKvMeta @ZHandle - , textToCBytes $ myRootPath @ShardReaderMeta @ZHandle - , textToCBytes $ myRootPath @QueryInfo @ZHandle - , textToCBytes $ myRootPath @QueryStatus @ZHandle - , textToCBytes $ myRootPath @ViewInfo @ZHandle - , textToCBytes $ myRootPath @SubscriptionWrap @ZHandle - , textToCBytes $ myRootPath @Proto.Timestamp @ZHandle - , textToCBytes $ myRootPath @TaskAllocation @ZHandle - , textToCBytes $ myRootPath @QVRelation @ZHandle + , textToCBytes $ myRootPath @TaskIdMeta @ZookeeperClient + , textToCBytes $ myRootPath @TaskMeta @ZookeeperClient + , textToCBytes $ myRootPath @TaskKvMeta @ZookeeperClient + , textToCBytes $ myRootPath @ShardReaderMeta @ZookeeperClient + , textToCBytes $ myRootPath @QueryInfo @ZookeeperClient + , textToCBytes $ myRootPath @QueryStatus @ZookeeperClient + , textToCBytes $ myRootPath @ViewInfo @ZookeeperClient + , textToCBytes $ myRootPath @SubscriptionWrap @ZookeeperClient + , textToCBytes $ myRootPath @Proto.Timestamp @ZookeeperClient + , textToCBytes $ myRootPath @TaskAllocation @ZookeeperClient + , textToCBytes $ myRootPath @QVRelation @ZookeeperClient #ifdef HStreamEnableSchema - , textToCBytes $ myRootPath @SQL.Schema @ZHandle + , textToCBytes $ myRootPath @SQL.Schema @ZookeeperClient #endif ]