From c0f005328fb83706f3cef89fe553a7717685034c Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Thu, 28 Sep 2023 10:50:17 +0800 Subject: [PATCH] hstream-store: add ckpStoreGetAllCheckpoints (#1623) * get all checkpoints from checkpointStore * more type safe --- common/base/HStream/Foreign.hs | 115 ++++++++++-------- common/base/cbits/hs_struct.cpp | 34 +++--- common/hstream/HStream/Common/Query.hs | 2 +- common/hstream/cbits/hs_struct.cpp | 59 --------- .../HStream/Store/Internal/Foreign.hs | 74 +++++++++-- .../HStream/Store/Internal/LogDevice.hs | 9 +- .../Store/Internal/LogDevice/Checkpoint.hs | 41 +++++-- .../Store/Internal/LogDevice/LDQuery.hs | 57 +++++---- .../Store/Internal/LogDevice/LogAttributes.hs | 3 +- .../Internal/LogDevice/LogConfigTypes.hs | 83 +++++++------ .../Store/Internal/LogDevice/Reader.hs | 20 +-- .../LogDevice/VersionedConfigStore.hs | 10 +- hstream-store/HStream/Store/Stream.hs | 2 + .../cbits/logdevice/hs_checkpoint.cpp | 31 +++++ hstream-store/cbits/utils.cpp | 3 +- hstream-store/hstream-store.cabal | 2 +- .../test/HStream/Store/CheckpointStoreSpec.hs | 9 +- 17 files changed, 314 insertions(+), 240 deletions(-) delete mode 100644 common/hstream/cbits/hs_struct.cpp diff --git a/common/base/HStream/Foreign.hs b/common/base/HStream/Foreign.hs index b041ae056..7919eaaf2 100644 --- a/common/base/HStream/Foreign.hs +++ b/common/base/HStream/Foreign.hs @@ -6,6 +6,7 @@ module HStream.Foreign ( PeekNFun + , DeleteFun , peekN , BA# (..) , MBA# (..) @@ -24,8 +25,12 @@ module HStream.Foreign -- * List , StdVector , FollySmallVector - , peekFollySmallVectorDoubleN + , peekStdVectorWord64 + , peekStdVectorWord64Off + , peekStdVectorWord64N , peekFollySmallVectorDouble + , peekFollySmallVectorDoubleOff + , peekFollySmallVectorDoubleN -- * Map , PeekMapFun @@ -36,10 +41,11 @@ module HStream.Foreign -- * Misc , c_delete_string , c_delete_vector_of_string - , c_delete_vector_of_int + , c_delete_vector_of_cint , c_delete_vector_of_int64 + , c_delete_vector_of_uint64 , c_delete_std_vec_of_folly_small_vec_of_double - , cal_offset_std_string + , c_cal_offset_std_string , bool2cbool ) where @@ -49,12 +55,12 @@ import Data.Int (Int64) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Primitive +import Data.Word import Foreign.C.Types import Foreign.ForeignPtr import Foreign.Ptr import Foreign.Storable import GHC.Exts -import GHC.Prim import qualified Z.Data.Array as Z import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) @@ -77,6 +83,20 @@ newtype MBA# a = MBA# (MutableByteArray# RealWorld) -- TODO: ghc-9.4+ deprecates ArrayArray#, consider using Array# instead newtype BAArray# a = BAArray# ArrayArray# +------------------------------------------------------------------------------- + +#define HS_CPP_GET_SIZE(CFUN, HSOBJ) \ + foreign import ccall unsafe "get_size_##CFUN" \ + c_get_size_##CFUN :: Ptr HSOBJ -> IO Int + +#define HS_CPP_PEEK(CFUN, HSOBJ, VAL_TYPE) \ + foreign import ccall unsafe "peek_##CFUN" \ + c_peek_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO () + +#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \ + foreign import ccall unsafe "cal_offset_##CFUN" \ + c_cal_offset_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) + ------------------------------------------------------------------------------- -- Optional @@ -115,7 +135,7 @@ peekStdStringToCBytesN len ptr peekStdStringToCBytesIdx :: Ptr Z.StdString -> Int -> IO CBytes peekStdStringToCBytesIdx p offset = do - ptr <- cal_offset_std_string p offset + ptr <- c_cal_offset_std_string p offset siz :: Int <- Z.hs_std_string_size ptr let !siz' = siz + 1 (mpa@(Z.MutablePrimArray mba#) :: Z.MutablePrimArray Z.RealWorld a) <- Z.newPrimArray siz' @@ -132,58 +152,58 @@ withStdStringUnsafe f = do else do str <- finally (peekStdStringToCBytesIdx ptr' 0) (c_delete_string ptr') pure (str, ret) +HS_CPP_CAL_OFFSET(std_string, StdString) + ------------------------------------------------------------------------------- data StdVector a - data FollySmallVector a -peekFollySmallVectorDoubleN :: Int -> Ptr (FollySmallVector Double) -> IO [[Double]] -peekFollySmallVectorDoubleN len ptr - | len <= 0 || ptr == nullPtr = return [] - | otherwise = forM [0..len-1] (peekFollySmallVectorDouble ptr) - -- TODO: use Vector or Array as returned value, so that we optimise(remove) the -- "peekN" function to copy the memory twice. -peekFollySmallVectorDouble :: Ptr (FollySmallVector Double) -> Int -> IO [Double] -peekFollySmallVectorDouble ptr offset = do - ptr' <- c_cal_offset_vec_of_folly_small_vec_of_double ptr offset - size <- c_get_size_folly_small_vec_of_double ptr' - fp <- mallocForeignPtrBytes size - withForeignPtr fp $ \data' -> do - c_peek_folly_small_vec_of_double ptr' size data' - peekN size data' - -#define HS_CPP_VEC_SIZE(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> IO Int - -#define HS_CPP_VEC_PEEK(CFUN, HSOBJ, VAL_TYPE) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO () - -#define HS_CPP_VEC_OFFSET(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) - -HS_CPP_VEC_SIZE(get_size_folly_small_vec_of_double, (FollySmallVector Double)) -HS_CPP_VEC_PEEK(peek_folly_small_vec_of_double, (FollySmallVector Double), Double) -HS_CPP_VEC_OFFSET(cal_offset_vec_of_folly_small_vec_of_double, (FollySmallVector Double)) +#define HS_PEEK(ty, a, cfun) \ + peek##ty##a##Off :: Ptr (ty a) -> Int -> IO [a]; \ + peek##ty##a##Off ptr offset = (do \ + ptr' <- c_cal_offset_##cfun ptr offset; \ + size <- c_get_size_##cfun ptr'; \ + fp <- mallocForeignPtrBytes size; \ + withForeignPtr fp (\data' -> (do c_peek_##cfun ptr' size data'; \ + peekN size data')) ); \ + \ + peek##ty##a :: Ptr (ty a) -> IO [a]; \ + peek##ty##a ptr = peek##ty##a##Off ptr 0; \ + \ + peek##ty##a##N :: Int -> Ptr (ty a) -> IO [[a]]; \ + peek##ty##a##N len ptr \ + | len <= 0 || ptr == nullPtr = return [] \ + | otherwise = forM [0..len-1] (peek##ty##a##Off ptr); + +HS_PEEK(StdVector, Word64, vec_of_uint64) +HS_PEEK(FollySmallVector, Double, folly_small_vec_of_double) + +HS_CPP_GET_SIZE(vec_of_uint64, (StdVector Word64)) +HS_CPP_GET_SIZE(folly_small_vec_of_double, (FollySmallVector Double)) + +HS_CPP_CAL_OFFSET(vec_of_uint64, (StdVector Word64)) +HS_CPP_CAL_OFFSET(folly_small_vec_of_double, (FollySmallVector Double)) + +HS_CPP_PEEK(vec_of_uint64, (StdVector Word64), Word64) +HS_CPP_PEEK(folly_small_vec_of_double, (FollySmallVector Double), Double) ------------------------------------------------------------------------------- -type PeekMapFun a dk dv ck cv +type PeekMapFun a pk pv dk dv = MBA# Int -- ^ returned map size + -> MBA# (Ptr pk) -> MBA# (Ptr pv) -> MBA# (Ptr dk) -> MBA# (Ptr dv) - -> MBA# (Ptr ck) -> MBA# (Ptr cv) -> IO a peekCppMap - :: forall a dk dv ck cv k v. Ord k - => PeekMapFun a dk dv ck cv - -> PeekNFun dk k -> DeleteFun ck - -> PeekNFun dv v -> DeleteFun cv + :: forall a pk pv dk dv k v. Ord k + => PeekMapFun a pk pv dk dv + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv -> IO (a, Map.Map k v) peekCppMap f peekKey delKey peekVal delVal = do (len, (keys_ptr, (values_ptr, (keys_vec, (values_vec, ret))))) <- @@ -194,10 +214,10 @@ peekCppMap f peekKey delKey peekVal delVal = do Z.withPrimUnsafe nullPtr $ \values_vec -> f (MBA# len) (MBA# keys) (MBA# values) (MBA# keys_vec) (MBA# values_vec) finally - (buildExtras ret len keys_ptr values_ptr) + (buildMap ret len keys_ptr values_ptr) (delKey keys_vec <> delVal values_vec) where - buildExtras ret len keys_ptr values_ptr = do + buildMap ret len keys_ptr values_ptr = do keys <- peekKey len keys_ptr values <- peekVal len values_ptr return (ret, Map.fromList $ zip keys values) @@ -229,16 +249,11 @@ withPrimListPairUnsafe pairs f = do HS_CPP_DELETE(delete_string, StdString) HS_CPP_DELETE(delete_vector_of_string, (StdVector StdString)) -HS_CPP_DELETE(delete_vector_of_int, (StdVector CInt)) +HS_CPP_DELETE(delete_vector_of_cint, (StdVector CInt)) HS_CPP_DELETE(delete_vector_of_int64, (StdVector Int64)) +HS_CPP_DELETE(delete_vector_of_uint64, (StdVector Word64)) HS_CPP_DELETE(delete_std_vec_of_folly_small_vec_of_double, (StdVector (FollySmallVector Double))) -#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) - -HS_CPP_CAL_OFFSET(cal_offset_std_string, StdString) - bool2cbool :: Bool -> CBool bool2cbool True = 1 bool2cbool False = 0 diff --git a/common/base/cbits/hs_struct.cpp b/common/base/cbits/hs_struct.cpp index e24e1ef4a..09afb8c73 100644 --- a/common/base/cbits/hs_struct.cpp +++ b/common/base/cbits/hs_struct.cpp @@ -16,13 +16,16 @@ std::string* copy_std_string(std::string&& str) { return current + offset; \ } -#define VECTOR_SIZE(NAME, VEC_TYPE) \ - HsInt get_size_##NAME(const VEC_TYPE* vec) { return vec->size(); } +#define GET_SIZE(NAME, VAL_TYPE) \ + HsInt get_size_##NAME(const VAL_TYPE* v) { return v->size(); } + +#define DEL_FUNCTION(NAME, TYPE) \ + void delete_##NAME(TYPE* p) { delete p; } #define PEEK_VECTOR(NAME, VEC_TYPE, VAL_TYPE) \ - void peek_##NAME(const VEC_TYPE* vec, HsInt len, VAL_TYPE* vals) { \ - assert(("peek_##NAME: size mismatch!", len == vec->size())); \ - for (int i = 0; i < len; i++) { \ + void peek_##NAME(const VEC_TYPE* vec, HsInt vals_len, VAL_TYPE* vals) { \ + assert(("peek_##NAME: size mismatch!", vals_len == vec->size())); \ + for (int i = 0; i < vals_len; i++) { \ (vals)[i] = (*vec)[i]; \ } \ } @@ -32,25 +35,24 @@ extern "C" { // Unfortunately, there is no generic in c -CAL_OFFSET(std_string, std::string); -CAL_OFFSET(vec_of_folly_small_vec_of_double, - folly::small_vector); +GET_SIZE(vec_of_double, std::vector); +GET_SIZE(vec_of_uint64, std::vector); +GET_SIZE(folly_small_vec_of_double, folly::small_vector); -VECTOR_SIZE(vec_of_double, std::vector); -VECTOR_SIZE(folly_small_vec_of_double, folly::small_vector); +// Ptr a -> Int -> Ptr a +CAL_OFFSET(std_string, std::string); +CAL_OFFSET(vec_of_uint64, std::vector); +CAL_OFFSET(folly_small_vec_of_double, folly::small_vector); +PEEK_VECTOR(vec_of_uint64, std::vector, uint64_t) PEEK_VECTOR(folly_small_vec_of_double, folly::small_vector, double); -// ---------------------------------------------------------------------------- - -#define DEL_FUNCTION(NAME, TYPE) \ - void delete_##NAME(TYPE* p) { delete p; } - -DEL_FUNCTION(vector_of_int, std::vector); +DEL_FUNCTION(vector_of_cint, std::vector); DEL_FUNCTION(string, std::string); DEL_FUNCTION(vector_of_string, std::vector); DEL_FUNCTION(vector_of_int64, std::vector); +DEL_FUNCTION(vector_of_uint64, std::vector); DEL_FUNCTION(std_vec_of_folly_small_vec_of_double, std::vector>); diff --git a/common/hstream/HStream/Common/Query.hs b/common/hstream/HStream/Common/Query.hs index 4f41d6115..c4a0389f9 100644 --- a/common/hstream/HStream/Common/Query.hs +++ b/common/hstream/HStream/Common/Query.hs @@ -229,7 +229,7 @@ queryResultMetadata results idx = withForeignPtr results $ \results_ptr -> do (MBA# reason_val') (MBA# reason_del') failures <- if len >= 1 then do keys <- finally (peekN (fromIntegral len) key_val) - (c_delete_vector_of_int key_del) + (c_delete_vector_of_cint key_del) addrs <- finally (peekStdStringToCBytesN (fromIntegral len) addr_val) (c_delete_vector_of_string addr_del) reasons <- finally (peekStdStringToCBytesN (fromIntegral len) reason_val) diff --git a/common/hstream/cbits/hs_struct.cpp b/common/hstream/cbits/hs_struct.cpp deleted file mode 100644 index e24e1ef4a..000000000 --- a/common/hstream/cbits/hs_struct.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "hs_cpp_lib.h" - -// ---------------------------------------------------------------------------- -// StdString - -std::string* copy_std_string(std::string&& str) { - auto value = new std::string; - *value = str; - return value; -} - -// ---------------------------------------------------------------------------- - -#define CAL_OFFSET(NAME, VAL_TYPE) \ - VAL_TYPE* cal_offset_##NAME(VAL_TYPE* current, HsInt offset) { \ - return current + offset; \ - } - -#define VECTOR_SIZE(NAME, VEC_TYPE) \ - HsInt get_size_##NAME(const VEC_TYPE* vec) { return vec->size(); } - -#define PEEK_VECTOR(NAME, VEC_TYPE, VAL_TYPE) \ - void peek_##NAME(const VEC_TYPE* vec, HsInt len, VAL_TYPE* vals) { \ - assert(("peek_##NAME: size mismatch!", len == vec->size())); \ - for (int i = 0; i < len; i++) { \ - (vals)[i] = (*vec)[i]; \ - } \ - } - -extern "C" { -// ---------------------------------------------------------------------------- - -// Unfortunately, there is no generic in c - -CAL_OFFSET(std_string, std::string); -CAL_OFFSET(vec_of_folly_small_vec_of_double, - folly::small_vector); - -VECTOR_SIZE(vec_of_double, std::vector); -VECTOR_SIZE(folly_small_vec_of_double, folly::small_vector); - -PEEK_VECTOR(folly_small_vec_of_double, folly::small_vector, - double); - -// ---------------------------------------------------------------------------- - -#define DEL_FUNCTION(NAME, TYPE) \ - void delete_##NAME(TYPE* p) { delete p; } - -DEL_FUNCTION(vector_of_int, std::vector); -DEL_FUNCTION(string, std::string); -DEL_FUNCTION(vector_of_string, std::vector); -DEL_FUNCTION(vector_of_int64, std::vector); -DEL_FUNCTION(std_vec_of_folly_small_vec_of_double, - std::vector>); - -// ---------------------------------------------------------------------------- -// Extern End -} diff --git a/hstream-store/HStream/Store/Internal/Foreign.hs b/hstream-store/HStream/Store/Internal/Foreign.hs index 6d393cf8b..55b6292b4 100644 --- a/hstream-store/HStream/Store/Internal/Foreign.hs +++ b/hstream-store/HStream/Store/Internal/Foreign.hs @@ -7,20 +7,20 @@ module HStream.Store.Internal.Foreign where import Control.Concurrent (newEmptyMVar, takeMVar) -import Control.Exception (mask_, onException) +import Control.Exception (finally, mask_, onException) import Control.Monad.Primitive import Data.Primitive import Foreign.C import Foreign.ForeignPtr +import Foreign.Ptr import Foreign.StablePtr import GHC.Conc import GHC.Exts import GHC.Stack import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) -import HStream.Foreign hiding (BA#, MBA#) +import HStream.Foreign import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Types @@ -30,9 +30,9 @@ cbool2bool = (/= 0) {-# INLINE cbool2bool #-} unsafeFreezeBA# :: MBA# a -> BA# a -unsafeFreezeBA# mba# = +unsafeFreezeBA# (MBA# mba#) = case unsafeFreezeByteArray# mba# realWorld# of - (# _, ba# #) -> ba# + (# _, ba# #) -> BA# ba# -- Actually, these unsafe functions can be used for both unsafe & safe ffi(?). withAsyncPrimUnsafe @@ -105,6 +105,62 @@ withAsyncPrimUnsafe3' a b c f g = mask_ $ do return e return (a_, b_, c_, e_) +-- Similar to HStream.Foreign.PeekMapFun +-- +-- TODO: Use HStream.Foreign.PeekMapFun instead +type MapFun a pk pv dk dv + = MBA# Int + -- ^ returned map size + -> MBA# (Ptr pk) -> MBA# (Ptr pv) + -- ^ pointer to peek + -> MBA# (Ptr dk) -> MBA# (Ptr dv) + -- ^ pointer to delete + -> IO a + +withAsyncPrimMapUnsafe + :: (Prim p) + => p + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv + -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a pk pv dk dv) + -> IO (a, p, [(k, v)]) +withAsyncPrimMapUnsafe p peekk delk peekv delv f = + withAsyncPrimMapUnsafe' p peekk delk peekv delv f pure + +withAsyncPrimMapUnsafe' + :: (Prim p) + => p + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv + -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a pk pv dk dv) + -> (a -> IO b) + -> IO (b, p, [(k, v)]) +withAsyncPrimMapUnsafe' p peekk delk peekv delv f g = mask_ $ do + mvar <- newEmptyMVar + sp <- newStablePtrPrimMVar mvar + (p_, (len_, (keys_, (values_, (keys_vec_, (values_vec_, b_)))))) <- + withPrimSafe' p $ \p' -> + withPrimSafe' (0 :: Int) $ \len -> + withPrimSafe' nullPtr $ \keys -> + withPrimSafe' nullPtr $ \values -> + withPrimSafe' nullPtr $ \keys_vec -> + withPrimSafe' nullPtr $ \values_vec -> do + (cap, _) <- threadCapability =<< myThreadId + b <- g =<< f sp cap p' len keys values keys_vec values_vec + takeMVar mvar `onException` forkIO (do takeMVar mvar + primitive_ (touch# p') + primitive_ (touch# len) + primitive_ (touch# keys) + primitive_ (touch# values) + primitive_ (touch# keys_vec) + primitive_ (touch# values_vec)) + return b + finally + (do ret_keys <- peekk len_ keys_ + ret_values <- peekv len_ values_ + return (b_, p_, zip ret_keys ret_values)) + (delk keys_vec_ <> delv values_vec_) -- delete a nullptr is OK + withAsync :: HasCallStack => Int -> (Ptr a -> IO a) -> (StablePtr PrimMVar -> Int -> Ptr a -> IO ErrorCode) @@ -153,7 +209,7 @@ withPrimSafe' :: forall a b. Prim a => a -> (MBA# a -> IO b) -> IO (a, b) withPrimSafe' v f = do mpa@(MutablePrimArray mba#) <- newAlignedPinnedPrimArray 1 writePrimArray mpa 0 v - !b <- f mba# + !b <- f (MBA# mba#) !a <- readPrimArray mpa 0 return (a, b) {-# INLINE withPrimSafe' #-} @@ -167,12 +223,6 @@ peekVectorStringToCBytes ptr = do foreign import ccall unsafe "hs_logdevice.h hs_cal_std_string_off" hs_cal_std_string_off :: Ptr Z.StdString -> Int -> IO (Ptr Z.StdString) -foreign import ccall unsafe "hs_logdevice.h delete_vector_of_string" - delete_vector_of_string :: Ptr (StdVector Z.StdString) -> IO () - -foreign import ccall unsafe "hs_logdevice.h delete_vector_of_cint" - delete_vector_of_cint :: Ptr (StdVector CInt) -> IO () - foreign import ccall unsafe "hs_logdevice.h get_vector_of_string_size" get_vector_of_string_size :: Ptr (StdVector Z.StdString) -> IO Int diff --git a/hstream-store/HStream/Store/Internal/LogDevice.hs b/hstream-store/HStream/Store/Internal/LogDevice.hs index a7c642132..bc2f0217d 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice.hs @@ -36,12 +36,10 @@ import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import Z.Data.Vector (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, - MBA#) +import HStream.Foreign (BA# (..), + MBA# (..)) import qualified HStream.Store.Exception as E -import HStream.Store.Internal.Types - import HStream.Store.Internal.Foreign import HStream.Store.Internal.LogDevice.Checkpoint import HStream.Store.Internal.LogDevice.Configuration @@ -51,6 +49,7 @@ import HStream.Store.Internal.LogDevice.LogConfigTypes import HStream.Store.Internal.LogDevice.Reader import HStream.Store.Internal.LogDevice.VersionedConfigStore import HStream.Store.Internal.LogDevice.Writer +import HStream.Store.Internal.Types ------------------------------------------------------------------------------- -- Client @@ -260,7 +259,7 @@ findKey client logid key accuracy = withForeignPtr client $ \client' -> CBytes.withCBytesUnsafe key $ \key' -> do (errno, lo_lsn, hi_lsn, _) <- withAsyncPrimUnsafe3' (0 :: ErrorCode) LSN_INVALID LSN_INVALID - (ld_client_find_key client' logid key' $ unFindKeyAccuracy accuracy) E.throwSubmitIfNotOK + (ld_client_find_key client' logid (BA# key') $ unFindKeyAccuracy accuracy) E.throwSubmitIfNotOK void $ E.throwStreamErrorIfNotOK' errno return (lo_lsn, hi_lsn) diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs index d5ebc7ae1..ab579d258 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs @@ -16,8 +16,8 @@ import GHC.Stack (HasCallStack) import qualified Z.Data.CBytes as ZC import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) +import HStream.Foreign import qualified HStream.Store.Exception as E import qualified HStream.Store.Internal.Foreign as FFI import HStream.Store.Internal.Types @@ -33,7 +33,7 @@ import HStream.Store.Internal.Types newFileBasedCheckpointStore :: CBytes -> IO LDCheckpointStore newFileBasedCheckpointStore root_path = ZC.withCBytesUnsafe root_path $ \path' -> do - i <- c_new_file_based_checkpoint_store path' + i <- c_new_file_based_checkpoint_store (BA# path') newForeignPtr c_free_checkpoint_store_fun i newRSMBasedCheckpointStore @@ -62,10 +62,26 @@ ckpStoreGetLSN store customid logid = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, lsn, _) <- FFI.withAsyncPrimUnsafe2 (0 :: ErrorCode) LSN_INVALID $ - c_checkpoint_store_get_lsn store' customid' logid + c_checkpoint_store_get_lsn store' (BA# customid') logid _ <- E.throwStreamErrorIfNotOK' errno return lsn +ckpStoreGetAllCheckpoints' :: LDCheckpointStore -> CBytes -> IO [(C_LogID, LSN)] +ckpStoreGetAllCheckpoints' store customid = + ZC.withCBytesUnsafe customid $ \customid' -> + withForeignPtr store $ \store' -> do + (_, errno, ret) <- FFI.withAsyncPrimMapUnsafe + C_OK + peekN c_delete_vector_of_uint64 + peekN c_delete_vector_of_uint64 + (checkpoint_store_get_all_checkpoints store' (BA# customid')) + _ <- E.throwStreamErrorIfNotOK' errno + pure ret + +ckpStoreGetAllCheckpoints :: LDCheckpointStore -> CBytes -> IO (Map C_LogID LSN) +ckpStoreGetAllCheckpoints store customid = + Map.fromList <$> ckpStoreGetAllCheckpoints' store customid + ckpStoreUpdateLSN :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO () ckpStoreUpdateLSN = ckpStoreUpdateLSN' (-1) @@ -73,7 +89,7 @@ ckpStoreUpdateLSN' :: Int -> LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ckpStoreUpdateLSN' retries store customid logid sn = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do - let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' customid' logid sn + let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' (BA# customid') logid sn void $ FFI.retryWhileAgain f retries ckpStoreUpdateMultiLSN @@ -92,7 +108,8 @@ ckpStoreUpdateMultiLSN' retries store customid sns = Z.withPrimArrayUnsafe ka $ \ks' len -> Z.withPrimArrayUnsafe va $ \vs' _len -> FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_update_multi_lsn store' customid' ks' vs' (fromIntegral len) + checkpoint_store_update_multi_lsn store' + (BA# customid') (BA# ks') (BA# vs') (fromIntegral len) void $ FFI.retryWhileAgain f retries ckpStoreRemoveCheckpoints @@ -102,7 +119,7 @@ ckpStoreRemoveCheckpoints store customid (VP.Vector offset len (Z.ByteArray ba#) ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, _) <- FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_remove_checkpoints store' customid' ba# offset len + checkpoint_store_remove_checkpoints store' (BA# customid') (BA# ba#) offset len void $ E.throwStreamErrorIfNotOK' errno ckpStoreRemoveAllCheckpoints :: HasCallStack => LDCheckpointStore -> CBytes -> IO () @@ -110,7 +127,7 @@ ckpStoreRemoveAllCheckpoints store customid = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, _) <- FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_remove_all_checkpoints store' customid' + checkpoint_store_remove_all_checkpoints store' (BA# customid') void $ E.throwStreamErrorIfNotOK' errno foreign import ccall unsafe "hs_logdevice.h new_file_based_checkpoint_store" @@ -145,6 +162,16 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_lsn" -> MBA# LSN -- ^ value out: lsn -> IO () +foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_all_checkpoints" + checkpoint_store_get_all_checkpoints + :: Ptr LogDeviceCheckpointStore + -> BA# Word8 -- ^ customer_id + -> StablePtr PrimMVar -> Int + -> MBA# ErrorCode -- ^ value out: error code + -> MBA# Int -> MBA# (Ptr C_LogID) -> MBA# (Ptr LSN) + -> MBA# (Ptr (StdVector C_LogID)) -> MBA# (Ptr (StdVector LSN)) + -> IO () + foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn" c_checkpoint_store_update_lsn :: Ptr LogDeviceCheckpointStore diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LDQuery.hs b/hstream-store/HStream/Store/Internal/LogDevice/LDQuery.hs index 025a014b0..6e9287b07 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LDQuery.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LDQuery.hs @@ -3,24 +3,22 @@ module HStream.Store.Internal.LogDevice.LDQuery where import Control.Exception -import Control.Monad (forM) +import Control.Monad (forM) import Data.Int -import Data.IntMap.Strict (IntMap) -import qualified Data.IntMap.Strict as IntMap +import Data.IntMap.Strict (IntMap) +import qualified Data.IntMap.Strict as IntMap import Data.Word import Foreign.C import Foreign.ForeignPtr -import Foreign.Marshal.Alloc (free) +import Foreign.Marshal.Alloc (free) import Foreign.Ptr import GHC.Stack -import qualified Z.Data.CBytes as CBytes -import Z.Data.CBytes (CBytes) -import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) +import qualified Z.Data.CBytes as CBytes +import Z.Data.CBytes (CBytes) +import qualified Z.Foreign as Z -import HStream.Foreign hiding (BA#, MBA#) -import HStream.Store.Exception (throwStoreError) -import HStream.Store.Internal.Foreign +import HStream.Foreign +import HStream.Store.Exception (throwStoreError) data C_LDQuery data C_QueryResults @@ -66,11 +64,12 @@ showTables ldq = withForeignPtr ldq $ \ldq' -> do Z.withPrimUnsafe nullPtr $ \name_del' -> Z.withPrimUnsafe nullPtr $ \desc_val' -> Z.withPrimUnsafe nullPtr $ \desc_del' -> - ldquery_show_tables ldq' len' name_val' name_del' desc_val' desc_del' + ldquery_show_tables ldq' (MBA# len') (MBA# name_val') (MBA# name_del') + (MBA# desc_val') (MBA# desc_del') finally (zip <$> peekStdStringToCBytesN (fromIntegral len) name_val <*> peekStdStringToCBytesN (fromIntegral len) desc_val) - (delete_vector_of_string name_del <> delete_vector_of_string desc_del) + (c_delete_vector_of_string name_del <> c_delete_vector_of_string desc_del) showTableColumns :: LDQuery -> CBytes -> IO ([CBytes], [CBytes], [CBytes]) showTableColumns ldq tableName = @@ -84,16 +83,16 @@ showTableColumns ldq tableName = Z.withPrimUnsafe nullPtr $ \type_del' -> Z.withPrimUnsafe nullPtr $ \desc_val' -> Z.withPrimUnsafe nullPtr $ \desc_del' -> - ldquery_show_table_columns ldq' tableName' len' - name_val' name_del' - type_val' type_del' - desc_val' desc_del' + ldquery_show_table_columns ldq' (BA# tableName') (MBA# len') + (MBA# name_val') (MBA# name_del') + (MBA# type_val') (MBA# type_del') + (MBA# desc_val') (MBA# desc_del') finally ((,,) <$> peekStdStringToCBytesN (fromIntegral len) name_val <*> peekStdStringToCBytesN (fromIntegral len) type_val <*> peekStdStringToCBytesN (fromIntegral len) desc_val) - (delete_vector_of_string name_del <> delete_vector_of_string type_del - <> delete_vector_of_string desc_del) + (c_delete_vector_of_string name_del <> c_delete_vector_of_string type_del + <> c_delete_vector_of_string desc_del) runQuery :: LDQuery -> CBytes -> IO [QueryResult] runQuery ldq query = do @@ -125,7 +124,7 @@ queryResultHeaders :: QueryResults -> Int -> IO [CBytes] queryResultHeaders results idx = withForeignPtr results $ \results_ptr -> do (len, (val, _)) <- Z.withPrimUnsafe 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \val' -> - queryResults__headers results_ptr idx len' val' + queryResults__headers results_ptr idx (MBA# len') (MBA# val') peekStdStringToCBytesN len val queryResultRows :: QueryResults -> Int -> IO [[CBytes]] @@ -136,7 +135,7 @@ queryResultRows results idx = withForeignPtr results $ \results_ptr -> do (len, (val, _)) <- Z.withPrimUnsafe 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \val' -> - queryResults_rows_val results_ptr idx (fromIntegral rowIdx) len' val' + queryResults_rows_val results_ptr idx (fromIntegral rowIdx) (MBA# len') (MBA# val') peekStdStringToCBytesN len val else return [] @@ -144,7 +143,7 @@ queryResultColsMaxSize :: QueryResults -> Int -> IO [CSize] queryResultColsMaxSize results idx = withForeignPtr results $ \results_ptr -> do (len, (val, _)) <- Z.withPrimUnsafe @CSize 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \val' -> - queryResults__cols_max_size results_ptr idx len' val' + queryResults__cols_max_size results_ptr idx (MBA# len') (MBA# val') peekN (fromIntegral len) val queryResultMetadata :: QueryResults -> Int -> IO ActiveQueryMetadata @@ -159,17 +158,17 @@ queryResultMetadata results idx = withForeignPtr results $ \results_ptr -> do Z.withPrimUnsafe nullPtr $ \addr_del' -> Z.withPrimUnsafe nullPtr $ \reason_val' -> Z.withPrimUnsafe nullPtr $ \reason_del' -> - queryResults__metadata_failures results_ptr idx len' - key_val' key_del' - addr_val' addr_del' - reason_val' reason_del' + queryResults__metadata_failures results_ptr idx (MBA# len') + (MBA# key_val') (MBA# key_del') + (MBA# addr_val') (MBA# addr_del') + (MBA# reason_val') (MBA# reason_del') failures <- if len >= 1 then do keys <- finally (peekN (fromIntegral len) key_val) - (delete_vector_of_cint key_del) + (c_delete_vector_of_cint key_del) addrs <- finally (peekStdStringToCBytesN (fromIntegral len) addr_val) - (delete_vector_of_string addr_del) + (c_delete_vector_of_string addr_del) reasons <- finally (peekStdStringToCBytesN (fromIntegral len) reason_val) - (delete_vector_of_string reason_del) + (c_delete_vector_of_string reason_del) let details = zipWith FailedNodeDetails addrs reasons return $ IntMap.fromList $ zip keys details else return IntMap.empty diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs index 1af79fdc5..f71c62fd7 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs @@ -205,6 +205,7 @@ peekLogAttributes ptr = do #undef _ARG #undef _MAYBE_ARG +-- TODO: Simplify by using peekCppMap function in hstream-common-base peekLogAttributesExtras :: Ptr LogDeviceLogAttributes -> IO (Map CBytes CBytes) peekLogAttributesExtras attrs = do (len, (keys_ptr, (values_ptr, (keys_vec, (values_vec, _))))) <- @@ -218,7 +219,7 @@ peekLogAttributesExtras attrs = do (MBA# len) (MBA# keys) (MBA# values) (MBA# keys_vec) (MBA# values_vec) finally (buildExtras (fromIntegral len) keys_ptr values_ptr) - (delete_vector_of_string keys_vec <> delete_vector_of_string values_vec) + (c_delete_vector_of_string keys_vec <> c_delete_vector_of_string values_vec) where buildExtras len keys_ptr values_ptr = do keys <- peekStdStringToCBytesN len keys_ptr diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs index 035ad1e5f..d74c91f2a 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs @@ -29,11 +29,8 @@ import GHC.Stack (HasCallStack, import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, BAArray#, - MBA#) -import HStream.Foreign hiding (BA#, - BAArray#, MBA#) +import HStream.Foreign import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign import HStream.Store.Internal.LogDevice.LogAttributes @@ -45,8 +42,8 @@ import HStream.Store.Internal.Types getLogAttrsExtra :: LDLogAttrs -> CBytes -> IO (Maybe CBytes) getLogAttrsExtra attrs key = withForeignPtr attrs $ \attrs' -> CBytes.withCBytesUnsafe key $ \key' -> do - et <- c_exist_log_attrs_extras attrs' key' - if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' key') + et <- c_exist_log_attrs_extras attrs' (BA# key') + if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' (BA# key')) else return Nothing updateLogAttrsExtrasPtr @@ -59,7 +56,7 @@ updateLogAttrsExtrasPtr attrs' logExtraAttrs = do vs = map (CBytes.rawPrimArray . snd) extras Z.withPrimArrayListUnsafe ks $ \ks' l -> Z.withPrimArrayListUnsafe vs $ \vs' _ -> do - i <- c_update_log_attrs_extras attrs' l ks' vs' + i <- c_update_log_attrs_extras attrs' l (BAArray# ks') (BAArray# vs') newForeignPtr c_free_log_attributes_fun i getAttrsExtrasFromPtr :: Ptr LogDeviceLogAttributes -> IO (Map.Map CBytes CBytes) @@ -70,10 +67,11 @@ getAttrsExtrasFromPtr attrs = do Z.withPrimUnsafe nullPtr $ \values -> Z.withPrimUnsafe nullPtr $ \keys_vec -> Z.withPrimUnsafe nullPtr $ \values_vec -> - c_get_attribute_extras attrs len keys values keys_vec values_vec + c_get_attribute_extras attrs (MBA# len) (MBA# keys) (MBA# values) + (MBA# keys_vec) (MBA# values_vec) finally (buildExtras (fromIntegral len) keys_ptr values_ptr) - (delete_vector_of_string keys_vec <> delete_vector_of_string values_vec) + (c_delete_vector_of_string keys_vec <> c_delete_vector_of_string values_vec) where buildExtras len keys_ptr values_ptr = do keys <- peekStdStringToCBytesN len keys_ptr @@ -86,10 +84,10 @@ getAttrsReplicationFactorFromPtr attrs = fromIntegral <$> c_get_replication_fact foreign import ccall unsafe "hs_logdevice.h get_attribute_extras" c_get_attribute_extras :: Ptr LogDeviceLogAttributes -> MBA# CSize - -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr (StdVector Z.StdString)) - -> MBA# (Ptr (StdVector Z.StdString)) + -> MBA# (Ptr StdString) + -> MBA# (Ptr StdString) + -> MBA# (Ptr (StdVector StdString)) + -> MBA# (Ptr (StdVector StdString)) -> IO () foreign import ccall unsafe "hs_logdevice.h get_replication_factor" @@ -117,7 +115,7 @@ foreign import ccall unsafe "hs_logdevice.h get_log_attrs_extra" c_get_log_attrs_extra :: Ptr LogDeviceLogAttributes -> BA# Word8 - -> IO (Ptr Z.StdString) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h free_log_attributes" c_free_log_attributes :: Ptr LogDeviceLogAttributes -> IO () @@ -217,7 +215,7 @@ getLogDirectory :: HasCallStack => LDClient -> CBytes -> IO LDDirectory getLogDirectory client path = CBytes.withCBytesUnsafe path $ \path' -> withForeignPtr client $ \client' -> do - let cfun = c_ld_client_get_directory client' path' + let cfun = c_ld_client_get_directory client' (BA# path') (errno, dir, _) <- withAsyncPrimUnsafe2' (0 :: ErrorCode) nullPtr cfun (E.throwSubmitIfNotOK . fromIntegral) _ <- E.throwStreamErrorIfNotOK' errno @@ -236,8 +234,9 @@ logDirectoryGetLogsName recursive dir = withForeignPtr dir $ \dir' -> do (len, (names_ptr, stdvec_ptr)) <- Z.withPrimUnsafe 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - fst <$> Z.withPrimUnsafe nullPtr (c_ld_logdirectory_get_logs_name dir' recursive len' names') - finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string stdvec_ptr) + fst <$> Z.withPrimUnsafe nullPtr (\p -> + c_ld_logdirectory_get_logs_name dir' recursive (MBA# len') (MBA# names') (MBA# p)) + finally (peekStdStringToCBytesN len names_ptr) (c_delete_vector_of_string stdvec_ptr) logDirectoryGetVersion :: LDDirectory -> IO C_LogsConfigVersion logDirectoryGetVersion dir = withForeignPtr dir c_ld_logdirectory_get_version @@ -253,7 +252,7 @@ makeLogDirectory client path attrs mkParent = do withForeignPtr client $ \client' -> withForeignPtr logAttrs $ \attrs' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_make_directory client' path' mkParent attrs' + let cfun = c_ld_client_make_directory client' (BA# path') mkParent attrs' MakeDirectoryCbData errno directory _ <- withAsync makeDirectoryCbDataSize peekMakeDirectoryCbData cfun void $ E.throwStreamErrorIfNotOK' errno @@ -265,7 +264,7 @@ removeLogDirectory client path recursive = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_remove_directory client' path' recursive + cfun = c_ld_client_remove_directory client' (BA# path') recursive LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -275,28 +274,28 @@ logDirChildrenNames dir = withForeignPtr dir $ \dir' -> do (len, (raw_ptr, names_ptr)) <- Z.withPrimUnsafe @Int 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - c_ld_logdir_children_keys dir' len' names' - finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string raw_ptr) + c_ld_logdir_children_keys dir' (MBA# len') (MBA# names') + finally (peekStdStringToCBytesN len names_ptr) (c_delete_vector_of_string raw_ptr) logDirLogsNames :: LDDirectory -> IO [CBytes] logDirLogsNames dir = withForeignPtr dir $ \dir' -> do (len, (raw_ptr, names_ptr)) <- Z.withPrimUnsafe @Int 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - c_ld_logdir_logs_keys dir' len' names' - finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string raw_ptr) + c_ld_logdir_logs_keys dir' (MBA# len') (MBA# names') + finally (peekStdStringToCBytesN len names_ptr) (c_delete_vector_of_string raw_ptr) logDirChildFullName :: LDDirectory -> CBytes -> IO CBytes logDirChildFullName dir name = withForeignPtr dir $ \dir' -> CBytes.withCBytesUnsafe name $ \name' -> - CBytes.fromCString =<< c_ld_logdir_child_full_name dir' name' + CBytes.fromCString =<< c_ld_logdir_child_full_name dir' (BA# name') logDirLogFullName :: LDDirectory -> CBytes -> IO CBytes logDirLogFullName dir name = withForeignPtr dir $ \dir' -> CBytes.withCBytesUnsafe name $ \name' -> - CBytes.fromCString =<< c_ld_logdir_log_full_name dir' name' + CBytes.fromCString =<< c_ld_logdir_log_full_name dir' (BA# name') -- Note that this pointer is only valid if LogDirectory is valid. logDirectoryGetAttrsPtr :: LDDirectory -> IO (Ptr LogDeviceLogAttributes) @@ -321,14 +320,14 @@ foreign import ccall unsafe "hs_logdevice.h ld_logdir_log_full_name" foreign import ccall unsafe "hs_logdevice.h ld_logdir_children_keys" c_ld_logdir_children_keys :: Ptr LogDeviceLogDirectory - -> MBA# CSize -> MBA# (Ptr (StdVector Z.StdString)) - -> IO (Ptr Z.StdString) + -> MBA# Int -> MBA# (Ptr (StdVector StdString)) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h ld_logdir_logs_keys" c_ld_logdir_logs_keys :: Ptr LogDeviceLogDirectory - -> MBA# CSize -> MBA# (Ptr (StdVector Z.StdString)) - -> IO (Ptr Z.StdString) + -> MBA# Int -> MBA# (Ptr (StdVector StdString)) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h ld_client_make_directory_sync" c_ld_client_make_directory_sync @@ -388,8 +387,8 @@ foreign import ccall unsafe "hs_logdevice.h ld_logdirectory_get_logs_name" c_ld_logdirectory_get_logs_name :: Ptr LogDeviceLogDirectory -> Bool -- ^ recursive - -> MBA# CSize -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr (StdVector Z.StdString)) + -> MBA# CSize -> MBA# (Ptr StdString) + -> MBA# (Ptr (StdVector StdString)) -> IO () ------------------------------------------------------------------------------- @@ -418,7 +417,7 @@ makeLogGroupSync client path start end attrs mkParent = do CBytes.withCBytesUnsafe path $ \path' -> do (group', _) <- Z.withPrimUnsafe nullPtr $ \group'' -> void $ E.throwStreamErrorIfNotOK $ - c_ld_client_make_loggroup_sync client' path' start end attrs' mkParent group'' + c_ld_client_make_loggroup_sync client' (BA# path') start end attrs' mkParent (MBA# group'') newForeignPtr c_free_logdevice_loggroup_fun group' makeLogGroup @@ -434,7 +433,7 @@ makeLogGroup client path start end attrs mkParent = do withForeignPtr client $ \client' -> withForeignPtr logAttrs $ \attrs' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_make_loggroup client' path' start end attrs' mkParent + let cfun = c_ld_client_make_loggroup client' (BA# path') start end attrs' mkParent MakeLogGroupCbData errno group _ <- withAsync makeLogGroupCbDataSize peekMakeLogGroupCbData cfun void $ E.throwStreamErrorIfNotOK' errno @@ -445,7 +444,7 @@ getLogGroup :: HasCallStack => LDClient -> CBytes -> IO LDLogGroup getLogGroup client path = withForeignPtr client $ \client' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_get_loggroup client' path' + let cfun = c_ld_client_get_loggroup client' (BA# path') (errno, group_ptr, _) <- withAsyncPrimUnsafe2 (0 :: ErrorCode) nullPtr cfun void $ E.throwStreamErrorIfNotOK' errno newForeignPtr c_free_logdevice_loggroup_fun group_ptr @@ -496,7 +495,7 @@ renameLogGroup client from_path to_path = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_rename client' from_path_ to_path_ + cfun = c_ld_client_rename client' (BA# from_path_) (BA# to_path_) LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -521,7 +520,7 @@ removeLogGroup client path = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_remove_loggroup client' path_ + cfun = c_ld_client_remove_loggroup client' (BA# path_) LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -532,7 +531,7 @@ logGroupGetRange group = (start_ret, (end_ret, _)) <- Z.withPrimUnsafe C_LOGID_MIN_INVALID $ \start' -> Z.withPrimUnsafe C_LOGID_MIN_INVALID $ \end' -> - c_ld_loggroup_get_range group' start' end' + c_ld_loggroup_get_range group' (MBA# start') (MBA# end') return (start_ret, end_ret) {-# INLINE logGroupGetRange #-} @@ -560,8 +559,8 @@ logGroupGetExtraAttr :: LDLogGroup -> CBytes -> IO (Maybe CBytes) logGroupGetExtraAttr group key = withForeignPtr group $ \group' -> CBytes.withCBytesUnsafe key $ \key' -> do attrs' <- c_ld_loggroup_get_attrs group' - et <- c_exist_log_attrs_extras attrs' key' - if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' key') + et <- c_exist_log_attrs_extras attrs' (BA# key') + if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' (BA# key')) else return Nothing {-# INLINE logGroupGetExtraAttr #-} @@ -578,7 +577,7 @@ logGroupUpdateExtraAttrs client group extraAttrs = Z.withPrimArrayListUnsafe vs $ \vs' _ -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_loggroup_update_extra_attrs client' group' l ks' vs' + cfun = c_ld_loggroup_update_extra_attrs client' group' l (BAArray# ks') (BAArray# vs') LogsConfigStatusCbData errno version _failure_reason <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno syncLogsConfigVersion client version @@ -611,7 +610,7 @@ logGroupSetRange client path (start, end) = CBytes.withCBytesUnsafe path $ \path' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_set_log_group_range client' path' start end + cfun = c_ld_client_set_log_group_range client' (BA# path') start end LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -749,7 +748,7 @@ ldWriteAttributes client path attrs' = CBytes.withCBytesUnsafe path $ \path' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_set_attributes client' path' attrs' + cfun = c_ld_client_set_attributes client' (BA# path') attrs' LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs index dd5e281c6..f2fa7fa5e 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs @@ -22,8 +22,8 @@ import qualified Z.Data.CBytes as ZC import Z.Data.CBytes (CBytes) import Z.Data.Vector.Base (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) +import HStream.Foreign (BA# (..), MBA# (..)) import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign (cbool2bool, retryWhileAgain, @@ -66,7 +66,7 @@ newLDSyncCkpReader name reader store = -- FIXME: The number of retries when synchronously writing checkpoints. -- We only use the async cpp function, so this option has no means currently let retries = 10 - i <- c_new_logdevice_sync_checkpointed_reader name' reader' store' retries + i <- c_new_logdevice_sync_checkpointed_reader (BA# name') reader' store' retries newForeignPtr c_free_sync_checkpointed_reader_fun i -- | Start reading a log. @@ -293,7 +293,8 @@ writeCheckpoints reader sns retries = va = Z.primArrayFromList $ map snd xs Z.withPrimArrayUnsafe ka $ \ks' len -> Z.withPrimArrayUnsafe va $ \vs' _ -> do - let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ crb_write_checkpoints reader' ks' vs' (fromIntegral len) + let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ + crb_write_checkpoints reader' (BA# ks') (BA# vs') (fromIntegral len) retryWhileAgain f retries {-# INLINABLE writeCheckpoints #-} @@ -302,7 +303,8 @@ writeLastCheckpoints reader xs retries = withForeignPtr reader $ \reader' -> do let topicIDs = Z.primArrayFromList xs Z.withPrimArrayUnsafe topicIDs $ \id' len -> do - let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ crb_write_last_read_checkpoints reader' id' (fromIntegral len) + let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ + crb_write_last_read_checkpoints reader' (BA# id') (fromIntegral len) retryWhileAgain f retries {-# INLINABLE writeLastCheckpoints #-} @@ -310,7 +312,7 @@ removeCheckpoints :: HasCallStack => LDSyncCkpReader -> [C_LogID] -> IO () removeCheckpoints reader xs = withForeignPtr reader $ \reader' -> do let logids = Z.primArrayFromList xs Z.withPrimArrayUnsafe logids $ \id' len -> do - let f = crb_asyncRemoveCheckpoints reader' id' (fromIntegral len) + let f = crb_asyncRemoveCheckpoints reader' (BA# id') (fromIntegral len) (err, _ret) <- withAsyncPrimUnsafe (0 :: ErrorCode) f void $ E.throwStreamErrorIfNotOK' err @@ -479,7 +481,7 @@ foreign import ccall unsafe "hs_logdevice.h crb_write_checkpoints" -> BA# LSN -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_write_last_read_checkpoints" @@ -488,7 +490,7 @@ foreign import ccall unsafe "hs_logdevice.h crb_write_last_read_checkpoints" -> BA# C_LogID -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveCheckpoints" @@ -496,14 +498,14 @@ foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveCheckpoints" :: Ptr LogDeviceSyncCheckpointedReader -> BA# C_LogID -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveAllCheckpoints" crb_asyncRemoveAllCheckpoints :: Ptr LogDeviceSyncCheckpointedReader -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall safe "hs_logdevice.h sync_write_last_read_checkpoints" diff --git a/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs b/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs index c387cb83a..8a328af2f 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs @@ -14,8 +14,8 @@ import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import Z.Data.Vector (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#) +import HStream.Foreign (BA# (..), MBA# (..)) import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign @@ -56,7 +56,7 @@ vcsGetConfig -> IO Bytes vcsGetConfig vcs key m_base_version auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> - go vcs' key' auto_retries + go vcs' (BA# key') auto_retries where go vcs' key' retries = do cbData <- run vcs' key' @@ -71,7 +71,7 @@ vcsGetConfig vcs key m_base_version auto_retries = run vcs' key' = case m_base_version of Just bv -> fmap snd $ Z.withPrimUnsafe bv $ \ver_ -> - f $ c_logdevice_vcs_get_config vcs' key' (unsafeFreezeBA# ver_) + f $ c_logdevice_vcs_get_config vcs' key' (unsafeFreezeBA# (MBA# ver_)) Nothing -> f $ c_logdevice_vcs_get_config' vcs' key' nullPtr f = withAsyncVoid vcsValueCallbackDataSize peekVcsValueCallbackData @@ -91,7 +91,7 @@ ldVcsGetLatestConfig -> IO Bytes ldVcsGetLatestConfig vcs key auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> - go vcs' key' auto_retries + go vcs' (BA# key') auto_retries where go vcs' key' retries = do cbData <- withAsyncVoid vcsValueCallbackDataSize peekVcsValueCallbackData @@ -149,7 +149,7 @@ ldVcsUpdateConfig vcs key val cond auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> Z.withPrimVectorUnsafe val $ \val' offset len -> - go vcs' key' val' offset len auto_retries + go vcs' (BA# key') (BA# val') offset len auto_retries where go vcs' key' val' offset len retries = do cbData <- withAsyncVoid vcsWriteCallbackDataSize peekVcsWriteCallbackData diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs index cb4b21d0d..d92ea3bd6 100644 --- a/hstream-store/HStream/Store/Stream.hs +++ b/hstream-store/HStream/Store/Stream.hs @@ -82,6 +82,8 @@ module HStream.Store.Stream , LD.newRSMBasedCheckpointStore , LD.newZookeeperBasedCheckpointStore , LD.ckpStoreGetLSN + , LD.ckpStoreGetAllCheckpoints + , LD.ckpStoreGetAllCheckpoints' , LD.ckpStoreUpdateLSN , LD.ckpStoreUpdateMultiLSN , LD.ckpStoreRemoveCheckpoints diff --git a/hstream-store/cbits/logdevice/hs_checkpoint.cpp b/hstream-store/cbits/logdevice/hs_checkpoint.cpp index fbc735a10..24b988f50 100644 --- a/hstream-store/cbits/logdevice/hs_checkpoint.cpp +++ b/hstream-store/cbits/logdevice/hs_checkpoint.cpp @@ -72,6 +72,37 @@ void checkpoint_store_get_lsn(logdevice_checkpoint_store_t* store, store->rep->getLSN(customer_id_, logid_t(logid), cb); } +void checkpoint_store_get_all_checkpoints( + logdevice_checkpoint_store_t* store, const char* customer_id, + HsStablePtr mvar, HsInt cap, facebook::logdevice::Status* st_out, + HsInt* len, c_logid_t** keys_ptr, c_lsn_t** values_ptr, + std::vector** keys_, std::vector** values_) { + auto cb = [st_out, cap, mvar, len, keys_ptr, values_ptr, keys_, + values_](facebook::logdevice::Status st, + std::map& log_lsn_map) { + if (st_out) { + *st_out = st; + } + if (st == ld::Status::OK) { + std::vector* keys = new std::vector; + std::vector* values = new std::vector; + + for (const auto& [key, value] : log_lsn_map) { + keys->push_back(key); + values->push_back(value); + } + + *len = keys->size(); + *keys_ptr = keys->data(); + *values_ptr = values->data(); + *keys_ = keys; + *values_ = values; + } + hs_try_putmvar(cap, mvar); + }; + store->rep->getAllCheckpoints(std::string(customer_id), std::move(cb)); +} + void checkpoint_store_update_lsn(logdevice_checkpoint_store_t* store, const char* customer_id, c_logid_t logid, c_lsn_t lsn, HsStablePtr mvar, HsInt cap, diff --git a/hstream-store/cbits/utils.cpp b/hstream-store/cbits/utils.cpp index 7497e66e1..02180ebfe 100644 --- a/hstream-store/cbits/utils.cpp +++ b/hstream-store/cbits/utils.cpp @@ -34,12 +34,11 @@ template char* copyString(const T& payload) { extern "C" { +// TODO: remove std::string* hs_cal_std_string_off(std::string* str, HsInt idx) { return str + idx; } -void delete_vector_of_cint(std::vector* ss) { delete ss; } - HsInt get_vector_of_string_size(std::vector* ss) { return ss->size(); } diff --git a/hstream-store/hstream-store.cabal b/hstream-store/hstream-store.cabal index 1f4370cb9..8bb2f8a99 100644 --- a/hstream-store/hstream-store.cabal +++ b/hstream-store/hstream-store.cabal @@ -90,10 +90,10 @@ library , hashable >=1.3.5 && <1.5 , hstream-common-base , primitive ^>=0.7 + , text , vector >=0.12 && <0.14 , vector-algorithms ^>=0.9 , Z-Data - , text default-language: Haskell2010 default-extensions: diff --git a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs index c7a413cea..162494316 100644 --- a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs +++ b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs @@ -2,7 +2,7 @@ module HStream.Store.CheckpointStoreSpec (spec) where -import Control.Monad (void) +import Control.Monad (forM, void) import qualified Data.Map.Strict as Map import qualified Data.Vector.Primitive as VP import Test.Hspec @@ -35,6 +35,13 @@ storeSpec new_ckp_store = do S.ckpStoreUpdateLSN checkpointStore "customer1" logid 2 S.ckpStoreGetLSN checkpointStore "customer1" logid `shouldReturn` 2 + it "get all" $ do + expected <- forM [1..1000] $ \i -> do + S.ckpStoreUpdateLSN checkpointStore "customer_get_all" i i + pure (i, i) + allCkps <- S.ckpStoreGetAllCheckpoints checkpointStore "customer_get_all" + Map.toAscList allCkps `shouldBe` expected + it "update multi lsn" $ do S.ckpStoreUpdateLSN checkpointStore "customer1" 1 1 S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 1