Skip to content

Commit

Permalink
Merge branch 'main' into update-log
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian authored Apr 25, 2024
2 parents d2d98b1 + 3d22f08 commit 29eac3c
Show file tree
Hide file tree
Showing 20 changed files with 291 additions and 189 deletions.
46 changes: 46 additions & 0 deletions common/hstream/HStream/Common/ZookeeperClient.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
module HStream.Common.ZookeeperClient
( ZookeeperClient
, withZookeeperClient
, unsafeGetZHandle
) where

import Data.Word
import Foreign.C.Types
import Foreign.ForeignPtr
import Foreign.Ptr
import Unsafe.Coerce (unsafeCoerce)
import Z.Data.CBytes (CBytes, withCBytes)
import ZooKeeper.Types (ZHandle)

newtype ZookeeperClient = ZookeeperClient (Ptr CZookeeperClient)

withZookeeperClient :: CBytes -> CInt -> (ZookeeperClient -> IO a) -> IO a
withZookeeperClient quorum timeout f = do
fp <- newZookeeperClient quorum timeout
withForeignPtr fp $ f . ZookeeperClient

newZookeeperClient :: CBytes -> CInt -> IO (ForeignPtr CZookeeperClient)
newZookeeperClient quorum timeout = do
withCBytes quorum $ \quorum' -> do
client <- new_zookeeper_client quorum' timeout
newForeignPtr delete_zookeeper_client client

-- It's safe to use unsafeGetZHandle in hstream since we donot free the
-- ZookeeperClient in the server lifetime.
unsafeGetZHandle :: ZookeeperClient -> IO ZHandle
unsafeGetZHandle (ZookeeperClient ptr) =
-- TODO: let zoovisitor exports the ZHandle constructor
--
-- It's safe to use unsafeCoerce here because the ZHandle is a newtype.
unsafeCoerce <$> get_underlying_handle ptr

data CZookeeperClient

foreign import ccall safe "new_zookeeper_client"
new_zookeeper_client :: Ptr Word8 -> CInt -> IO (Ptr CZookeeperClient)

foreign import ccall unsafe "&delete_zookeeper_client"
delete_zookeeper_client :: FunPtr (Ptr CZookeeperClient -> IO ())

foreign import ccall unsafe "get_underlying_handle"
get_underlying_handle :: Ptr CZookeeperClient -> IO (Ptr ())
86 changes: 49 additions & 37 deletions common/hstream/HStream/Common/ZookeeperSlotAlloc.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,31 @@ module HStream.Common.ZookeeperSlotAlloc
, getSlotValueByName
) where

import Control.Exception (throwIO, try)
import Control.Exception (throwIO, try)
import Control.Monad
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Text (Text)
import Data.Word (Word64)
import GHC.Stack (HasCallStack)
import qualified Proto3.Suite as PB
import qualified Z.Data.Builder as ZB
import qualified Z.Data.CBytes as CBytes
import Z.Data.CBytes (CBytes)
import qualified Z.Data.Parser as ZP
import qualified Z.Data.Vector as ZV
import qualified Z.Foreign as Z
import qualified ZooKeeper as ZK
import qualified ZooKeeper.Exception as ZK
import qualified ZooKeeper.Recipe as ZK
import qualified ZooKeeper.Types as ZK

import qualified HStream.Common.ProtoTypes as HT
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Map.Strict as Map
import Data.Maybe (isJust)
import Data.Text (Text)
import Data.Word (Word64)
import GHC.Stack (HasCallStack)
import qualified Proto3.Suite as PB
import qualified Z.Data.Builder as ZB
import qualified Z.Data.CBytes as CBytes
import Z.Data.CBytes (CBytes)
import qualified Z.Data.Parser as ZP
import qualified Z.Data.Vector as ZV
import qualified Z.Foreign as Z
import qualified ZooKeeper as ZK
import qualified ZooKeeper.Exception as ZK
import qualified ZooKeeper.Recipe as ZK
import qualified ZooKeeper.Types as ZK

import qualified HStream.Common.ProtoTypes as HT
import HStream.Common.ZookeeperClient (ZookeeperClient,
unsafeGetZHandle)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log

-------------------------------------------------------------------------------

Expand All @@ -47,7 +49,7 @@ type SlotAttrs = Map.Map Text Text
data SlotConfig = SlotConfig
{ slotRoot :: {-# UNPACK #-} !CBytes
-- ^ NOTE: the root path should NOT have a trailing '/'
, slotZkHandler :: {-# UNPACK #-} !ZK.ZHandle
, slotZkHandler :: {-# UNPACK #-} !ZookeeperClient
, slotOffset :: {-# UNPACK #-} !Word64
, slotMaxCapbility :: {-# UNPACK #-} !Word64
}
Expand All @@ -70,13 +72,16 @@ data SlotConfig = SlotConfig
-- - ...
initSlot :: SlotConfig -> IO ()
initSlot SlotConfig{..} = do
void $ ZK.zooCreateIfMissing slotZkHandler slotRoot Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/free") (Just $ encodeSlotValue 0) ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/table") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing slotZkHandler (slotRoot <> "/name") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent
zh <- unsafeGetZHandle slotZkHandler
void $ ZK.zooCreateIfMissing zh slotRoot Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing zh (slotRoot <> "/free") (Just $ encodeSlotValue 0) ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing zh (slotRoot <> "/table") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreateIfMissing zh (slotRoot <> "/name") Nothing ZK.zooOpenAclUnsafe ZK.ZooPersistent

clearSlot :: SlotConfig -> IO ()
clearSlot SlotConfig{..} = ZK.zooDeleteAll slotZkHandler slotRoot
clearSlot SlotConfig{..} = do
zh <- unsafeGetZHandle slotZkHandler
ZK.zooDeleteAll zh slotRoot

allocateSlot
:: HasCallStack
Expand All @@ -99,7 +104,8 @@ allocateSlot c@SlotConfig{..} name attrs valAttrs = withLock c $ do
let op1 = ZK.zooCreateOpInit name_path (Just $ encodeSlot slot) 0 ZK.zooOpenAclUnsafe ZK.ZooPersistent
op2 = ZK.zooSetOpInit free_path (Just $ encodeSlotValue free_slot') Nothing

results <- ZK.zooMulti slotZkHandler [op1, op2]
zh <- unsafeGetZHandle slotZkHandler
results <- ZK.zooMulti zh [op1, op2]
-- The length of the results should be 2
ZK.assertZooOpResultOK $ results !! 0
ZK.assertZooOpResultOK $ results !! 1
Expand Down Expand Up @@ -129,7 +135,8 @@ deallocateSlot c@SlotConfig{..} name = withLock c $ do
(free_slot_val', table_set_ops) <- deallocate free_slot_val vals []
let op2 = ZK.zooSetOpInit free_slot_node (Just $ encodeSlotValue free_slot_val') Nothing
op3 = ZK.zooDeleteOpInit name_node Nothing
results <- ZK.zooMulti slotZkHandler (table_set_ops ++ [op2, op3])
zh <- unsafeGetZHandle slotZkHandler
results <- ZK.zooMulti zh (table_set_ops ++ [op2, op3])
forM_ results ZK.assertZooOpResultOK
pure vals
where
Expand All @@ -143,7 +150,8 @@ deallocateSlot c@SlotConfig{..} name = withLock c $ do
doesSlotExist :: SlotConfig -> CBytes -> IO Bool
doesSlotExist SlotConfig{..} name = do
let path = slotRoot <> "/name/" <> name
isJust <$> ZK.zooExists slotZkHandler path
zh <- unsafeGetZHandle slotZkHandler
isJust <$> ZK.zooExists zh path

doesSlotValueExist :: SlotConfig -> CBytes -> SlotValue -> IO Bool
doesSlotValueExist c name value = do
Expand Down Expand Up @@ -177,10 +185,11 @@ getTable SlotConfig{..} idx = do

let node = slotRoot <> "/table/" <> CBytes.buildCBytes (ZB.int idx)
errmsg = "getTable failed: " <> show node
e <- try $ ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler node
zh <- unsafeGetZHandle slotZkHandler
e <- try $ ZK.dataCompletionValue <$> ZK.zooGet zh node
case e of
Left (_ :: ZK.ZNONODE) -> do
void $ ZK.zooCreate slotZkHandler node (Just $ encodeSlotValue $ idx + 1) ZK.zooOpenAclUnsafe ZK.ZooPersistent
void $ ZK.zooCreate zh node (Just $ encodeSlotValue $ idx + 1) ZK.zooOpenAclUnsafe ZK.ZooPersistent
return (idx + 1)
Right (Just a) -> decodeSlotValueThrow a
Right Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg
Expand All @@ -189,15 +198,17 @@ getTable SlotConfig{..} idx = do

slotValueGet :: HasCallStack => SlotConfig -> CBytes -> String -> IO SlotValue
slotValueGet SlotConfig{..} path errmsg = do
m_bs <- ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler path
zh <- unsafeGetZHandle slotZkHandler
m_bs <- ZK.dataCompletionValue <$> ZK.zooGet zh path
case m_bs of
Just bs -> decodeSlotValueThrow bs
Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg
{-# INLINABLE slotValueGet #-}

slotGet :: HasCallStack => SlotConfig -> CBytes -> String -> IO HT.Slot
slotGet SlotConfig{..} path errmsg = do
m_bs <- ZK.dataCompletionValue <$> ZK.zooGet slotZkHandler path
zh <- unsafeGetZHandle slotZkHandler
m_bs <- ZK.dataCompletionValue <$> ZK.zooGet zh path
case m_bs of
Just bs -> decodeSlotThrow bs
Nothing -> throwIO $ HE.SlotAllocDecodeError errmsg
Expand Down Expand Up @@ -226,6 +237,7 @@ decodeSlotValueThrow b =

withLock :: SlotConfig -> IO a -> IO a
withLock SlotConfig{..} action = do
clientid <- ZK.clientId <$> (ZK.peekClientId =<< ZK.zooClientID slotZkHandler)
zh <- unsafeGetZHandle slotZkHandler
clientid <- ZK.clientId <$> (ZK.peekClientId =<< ZK.zooClientID zh)
let clientid' = CBytes.buildCBytes $ ZB.hex clientid
ZK.withLock slotZkHandler slotRoot clientid' action
ZK.withLock zh slotRoot clientid' action
61 changes: 34 additions & 27 deletions common/hstream/HStream/MetaStore/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import qualified Z.Foreign as ZF
import qualified ZooKeeper as Z
import ZooKeeper.Exception (ZooException)
import qualified ZooKeeper.Types as Z
import ZooKeeper.Types (ZHandle)

import HStream.Common.ZookeeperClient (ZookeeperClient,
unsafeGetZHandle)
import qualified HStream.MetaStore.FileUtils as File
import HStream.MetaStore.RqliteUtils (ROp (..), transaction)
import qualified HStream.MetaStore.RqliteUtils as RQ
Expand All @@ -50,15 +51,14 @@ type Version = Int
type MetaType value handle = (MetaStore value handle, HasPath value handle)
type FHandle = FilePath
data RHandle = RHandle Manager Url

data MetaHandle
= ZkHandle ZHandle
= ZKHandle ZookeeperClient -- ^ Zookeeper handle with auto reconnection.
| RLHandle RHandle
| FileHandle FHandle
-- TODO
-- | LocalHandle FHandle

instance Show MetaHandle where
show (ZkHandle _) = "Zookeeper Handle"
show (ZKHandle _) = "Zookeeper Handle"
show (RLHandle _) = "RQLite Handle"
show (FileHandle _) = "LocalFile Handle"

Expand Down Expand Up @@ -105,45 +105,49 @@ class MetaStore value handle where
class MetaMulti handle where
metaMulti :: [MetaOp] -> handle -> IO ()

instance MetaStore value ZHandle where
myPath mid = myRootPath @value @ZHandle <> "/" <> mid
insertMeta mid x zk = RETHROW(createInsertZK zk (myPath @value @ZHandle mid) x ,ZHandle)
updateMeta mid x mv zk = RETHROW(setZkData zk (myPath @value @ZHandle mid) x mv,ZHandle)
upsertMeta mid x zk = RETHROW(upsertZkData zk (myPath @value @ZHandle mid) x ,ZHandle)
deleteMeta mid mv zk = RETHROW(deleteZkPath zk (myPath @value @ZHandle mid) mv ,ZHandle)
deleteAllMeta zk = RETHROW(deleteZkChildren zk (myRootPath @value @ZHandle) ,ZHandle)
instance MetaStore value ZookeeperClient where
myPath mid = myRootPath @value @ZookeeperClient <> "/" <> mid
insertMeta mid x zk = RETHROW(do zk' <- unsafeGetZHandle zk; createInsertZK zk' (myPath @value @ZookeeperClient mid) x ,ZookeeperClient)
updateMeta mid x mv zk = RETHROW(do zk' <- unsafeGetZHandle zk; setZkData zk' (myPath @value @ZookeeperClient mid) x mv,ZookeeperClient)
upsertMeta mid x zk = RETHROW(do zk' <- unsafeGetZHandle zk; upsertZkData zk' (myPath @value @ZookeeperClient mid) x ,ZookeeperClient)
deleteMeta mid mv zk = RETHROW(do zk' <- unsafeGetZHandle zk; deleteZkPath zk' (myPath @value @ZookeeperClient mid) mv ,ZookeeperClient)
deleteAllMeta zk = RETHROW(do zk' <- unsafeGetZHandle zk; deleteZkChildren zk' (myRootPath @value @ZookeeperClient) ,ZookeeperClient)
where
mid = "some of the meta when deleting"

checkMetaExists mid zk = RETHROW(isJust <$> Z.zooExists zk (textToCBytes (myPath @value @ZHandle mid)),ZHandle)
getMeta mid zk = RETHROW(decodeZNodeValue zk (myPath @value @ZHandle mid),ZHandle)
getMetaWithVer mid zk = RETHROW(action,ZHandle)
checkMetaExists mid zk = RETHROW(do zk' <- unsafeGetZHandle zk; isJust <$> Z.zooExists zk' (textToCBytes (myPath @value @ZookeeperClient mid)),ZookeeperClient)
getMeta mid zk = RETHROW(do zk' <- unsafeGetZHandle zk; decodeZNodeValue zk' (myPath @value @ZookeeperClient mid),ZookeeperClient)
getMetaWithVer mid zkclient = RETHROW(action,ZookeeperClient)
where
action = do
e_a <- try $ Z.zooGet zk (textToCBytes $ myPath @value @ZHandle mid)
zk <- unsafeGetZHandle zkclient
e_a <- try $ Z.zooGet zk (textToCBytes $ myPath @value @ZookeeperClient mid)
case e_a of
Left (_ :: ZooException) -> return Nothing
Right a -> return $ (, fromIntegral . Z.statVersion . Z.dataCompletionStat $ a) <$> decodeDataCompletion a

getAllMeta zk = RETHROW(action,ZHandle)
getAllMeta zkclient = RETHROW(action,ZookeeperClient)
where
mid = "some of the meta when getting "
action = do
let path = textToCBytes $ myRootPath @value @ZHandle
let path = textToCBytes $ myRootPath @value @ZookeeperClient
zk <- unsafeGetZHandle zkclient
ids <- Z.unStrVec . Z.strsCompletionValues <$> Z.zooGetChildren zk path
idAndValues <- catMaybes <$> mapM (\x -> let x' = cBytesToText x in getMeta @value x' zk <&> fmap (x',)) ids
idAndValues <- catMaybes <$> mapM (\x -> let x' = cBytesToText x in getMeta @value x' zkclient <&> fmap (x',)) ids
pure $ Map.fromList idAndValues
listMeta zk = RETHROW(action,ZHandle)
listMeta zkclient = RETHROW(action,ZookeeperClient)
where
mid = "some of the meta when listing"
action = do
let path = textToCBytes $ myRootPath @value @ZHandle
let path = textToCBytes $ myRootPath @value @ZookeeperClient
zk <- unsafeGetZHandle zkclient
ids <- Z.unStrVec . Z.strsCompletionValues <$> Z.zooGetChildren zk path
catMaybes <$> mapM (flip (getMeta @value) zk . cBytesToText) ids
catMaybes <$> mapM (flip (getMeta @value) zkclient . cBytesToText) ids

instance MetaMulti ZHandle where
metaMulti ops zk = do
instance MetaMulti ZookeeperClient where
metaMulti ops zkclient = do
let zOps = map opToZ ops
zk <- unsafeGetZHandle zkclient
void $ Z.zooMulti zk zOps
where
opToZ op = case op of
Expand Down Expand Up @@ -209,12 +213,15 @@ instance MetaMulti FHandle where
DeleteOp p k mv -> File.DeleteOp p k mv
CheckOp p k v -> File.CheckOp p k v

instance (ToJSON a, FromJSON a, HasPath a ZHandle, HasPath a RHandle, HasPath a FHandle, Show a) => HasPath a MetaHandle
instance (ToJSON a, FromJSON a, HasPath a ZookeeperClient, HasPath a RHandle, HasPath a FHandle, Show a) => HasPath a MetaHandle

#define USE_WHICH_HANDLE(handle, action) \
case handle of ZkHandle zk -> action zk; RLHandle rq -> action rq; FileHandle io -> action io;
case handle of \
ZKHandle zk -> action zk; \
RLHandle rq -> action rq; \
FileHandle io -> action io;

instance (HasPath value ZHandle, HasPath value RHandle, HasPath value FHandle) => MetaStore value MetaHandle where
instance (HasPath value ZookeeperClient, HasPath value RHandle, HasPath value FHandle) => MetaStore value MetaHandle where
myPath = undefined
listMeta h = USE_WHICH_HANDLE(h, listMeta @value)
insertMeta mid x h = USE_WHICH_HANDLE(h, insertMeta mid x)
Expand Down
33 changes: 33 additions & 0 deletions common/hstream/cbits/hs_zookeeper_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <logdevice/common/ZookeeperClient.h>
#include <zookeeper/zookeeper.h>

// We directly use the ZookeeperClient class from LogDevice, it handles
// SessionExpire that we need to handle.
//
// TODO:
//
// 1. Currently logging messages are printed by ld_info, which is controlled by
// `--store-log-level`. We need to find a way to use the hstream logger.

extern "C" {

typedef struct zookeeper_client_t {
std::unique_ptr<facebook::logdevice::ZookeeperClient> rep;
} zookeeper_client_t;

zookeeper_client_t* new_zookeeper_client(const char* quorum,
int session_timeout) {
zookeeper_client_t* client = new zookeeper_client_t();
client->rep = std::make_unique<facebook::logdevice::ZookeeperClient>(
std::string(quorum), std::chrono::milliseconds(session_timeout));
return client;
}

zhandle_t* get_underlying_handle(zookeeper_client_t* client) {
return client->rep->getHandle().get();
}

void delete_zookeeper_client(zookeeper_client_t* client) { delete client; }

// end extern "C"
}
2 changes: 2 additions & 0 deletions common/hstream/hstream-common.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
HStream.Common.GrpcHaskell
HStream.Common.Query
HStream.Common.Types
HStream.Common.ZookeeperClient
HStream.Common.ZookeeperSlotAlloc
HStream.Exception
HStream.Instances
Expand Down Expand Up @@ -86,6 +87,7 @@ library

cxx-sources:
cbits/hash.cpp
cbits/hs_zookeeper_client.cpp
cbits/query/tables/AdminCommandTable.cpp
cbits/query.cpp

Expand Down
Loading

0 comments on commit 29eac3c

Please sign in to comment.