diff --git a/common/base/HStream/Foreign.hs b/common/base/HStream/Foreign.hs index b041ae056..d15bcd9ce 100644 --- a/common/base/HStream/Foreign.hs +++ b/common/base/HStream/Foreign.hs @@ -6,6 +6,7 @@ module HStream.Foreign ( PeekNFun + , DeleteFun , peekN , BA# (..) , MBA# (..) @@ -24,8 +25,12 @@ module HStream.Foreign -- * List , StdVector , FollySmallVector - , peekFollySmallVectorDoubleN + , peekStdVectorWord64 + , peekStdVectorWord64Off + , peekStdVectorWord64N , peekFollySmallVectorDouble + , peekFollySmallVectorDoubleOff + , peekFollySmallVectorDoubleN -- * Map , PeekMapFun @@ -38,8 +43,9 @@ module HStream.Foreign , c_delete_vector_of_string , c_delete_vector_of_int , c_delete_vector_of_int64 + , c_delete_vector_of_uint64 , c_delete_std_vec_of_folly_small_vec_of_double - , cal_offset_std_string + , c_cal_offset_std_string , bool2cbool ) where @@ -49,12 +55,12 @@ import Data.Int (Int64) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map import Data.Primitive +import Data.Word import Foreign.C.Types import Foreign.ForeignPtr import Foreign.Ptr import Foreign.Storable import GHC.Exts -import GHC.Prim import qualified Z.Data.Array as Z import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) @@ -77,6 +83,20 @@ newtype MBA# a = MBA# (MutableByteArray# RealWorld) -- TODO: ghc-9.4+ deprecates ArrayArray#, consider using Array# instead newtype BAArray# a = BAArray# ArrayArray# +------------------------------------------------------------------------------- + +#define HS_CPP_GET_SIZE(CFUN, HSOBJ) \ + foreign import ccall unsafe "get_size_##CFUN" \ + c_get_size_##CFUN :: Ptr HSOBJ -> IO Int + +#define HS_CPP_PEEK(CFUN, HSOBJ, VAL_TYPE) \ + foreign import ccall unsafe "peek_##CFUN" \ + c_peek_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO () + +#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \ + foreign import ccall unsafe "cal_offset_##CFUN" \ + c_cal_offset_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) + ------------------------------------------------------------------------------- -- Optional @@ -115,7 +135,7 @@ peekStdStringToCBytesN len ptr peekStdStringToCBytesIdx :: Ptr Z.StdString -> Int -> IO CBytes peekStdStringToCBytesIdx p offset = do - ptr <- cal_offset_std_string p offset + ptr <- c_cal_offset_std_string p offset siz :: Int <- Z.hs_std_string_size ptr let !siz' = siz + 1 (mpa@(Z.MutablePrimArray mba#) :: Z.MutablePrimArray Z.RealWorld a) <- Z.newPrimArray siz' @@ -132,58 +152,58 @@ withStdStringUnsafe f = do else do str <- finally (peekStdStringToCBytesIdx ptr' 0) (c_delete_string ptr') pure (str, ret) +HS_CPP_CAL_OFFSET(std_string, StdString) + ------------------------------------------------------------------------------- data StdVector a - data FollySmallVector a -peekFollySmallVectorDoubleN :: Int -> Ptr (FollySmallVector Double) -> IO [[Double]] -peekFollySmallVectorDoubleN len ptr - | len <= 0 || ptr == nullPtr = return [] - | otherwise = forM [0..len-1] (peekFollySmallVectorDouble ptr) - -- TODO: use Vector or Array as returned value, so that we optimise(remove) the -- "peekN" function to copy the memory twice. -peekFollySmallVectorDouble :: Ptr (FollySmallVector Double) -> Int -> IO [Double] -peekFollySmallVectorDouble ptr offset = do - ptr' <- c_cal_offset_vec_of_folly_small_vec_of_double ptr offset - size <- c_get_size_folly_small_vec_of_double ptr' - fp <- mallocForeignPtrBytes size - withForeignPtr fp $ \data' -> do - c_peek_folly_small_vec_of_double ptr' size data' - peekN size data' - -#define HS_CPP_VEC_SIZE(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> IO Int - -#define HS_CPP_VEC_PEEK(CFUN, HSOBJ, VAL_TYPE) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO () - -#define HS_CPP_VEC_OFFSET(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - c_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) - -HS_CPP_VEC_SIZE(get_size_folly_small_vec_of_double, (FollySmallVector Double)) -HS_CPP_VEC_PEEK(peek_folly_small_vec_of_double, (FollySmallVector Double), Double) -HS_CPP_VEC_OFFSET(cal_offset_vec_of_folly_small_vec_of_double, (FollySmallVector Double)) +#define HS_PEEK(ty, a, cfun) \ + peek##ty##a##Off :: Ptr (ty a) -> Int -> IO [a]; \ + peek##ty##a##Off ptr offset = (do \ + ptr' <- c_cal_offset_##cfun ptr offset; \ + size <- c_get_size_##cfun ptr'; \ + fp <- mallocForeignPtrBytes size; \ + withForeignPtr fp (\data' -> (do c_peek_##cfun ptr' size data'; \ + peekN size data')) ); \ + \ + peek##ty##a :: Ptr (ty a) -> IO [a]; \ + peek##ty##a ptr = peek##ty##a##Off ptr 0; \ + \ + peek##ty##a##N :: Int -> Ptr (ty a) -> IO [[a]]; \ + peek##ty##a##N len ptr \ + | len <= 0 || ptr == nullPtr = return [] \ + | otherwise = forM [0..len-1] (peek##ty##a##Off ptr); + +HS_PEEK(StdVector, Word64, vec_of_uint64) +HS_PEEK(FollySmallVector, Double, folly_small_vec_of_double) + +HS_CPP_GET_SIZE(vec_of_uint64, (StdVector Word64)) +HS_CPP_GET_SIZE(folly_small_vec_of_double, (FollySmallVector Double)) + +HS_CPP_CAL_OFFSET(vec_of_uint64, (StdVector Word64)) +HS_CPP_CAL_OFFSET(folly_small_vec_of_double, (FollySmallVector Double)) + +HS_CPP_PEEK(vec_of_uint64, (StdVector Word64), Word64) +HS_CPP_PEEK(folly_small_vec_of_double, (FollySmallVector Double), Double) ------------------------------------------------------------------------------- -type PeekMapFun a dk dv ck cv +type PeekMapFun a pk pv dk dv = MBA# Int -- ^ returned map size + -> MBA# (Ptr pk) -> MBA# (Ptr pv) -> MBA# (Ptr dk) -> MBA# (Ptr dv) - -> MBA# (Ptr ck) -> MBA# (Ptr cv) -> IO a peekCppMap - :: forall a dk dv ck cv k v. Ord k - => PeekMapFun a dk dv ck cv - -> PeekNFun dk k -> DeleteFun ck - -> PeekNFun dv v -> DeleteFun cv + :: forall a pk pv dk dv k v. Ord k + => PeekMapFun a pk pv dk dv + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv -> IO (a, Map.Map k v) peekCppMap f peekKey delKey peekVal delVal = do (len, (keys_ptr, (values_ptr, (keys_vec, (values_vec, ret))))) <- @@ -194,10 +214,10 @@ peekCppMap f peekKey delKey peekVal delVal = do Z.withPrimUnsafe nullPtr $ \values_vec -> f (MBA# len) (MBA# keys) (MBA# values) (MBA# keys_vec) (MBA# values_vec) finally - (buildExtras ret len keys_ptr values_ptr) + (buildMap ret len keys_ptr values_ptr) (delKey keys_vec <> delVal values_vec) where - buildExtras ret len keys_ptr values_ptr = do + buildMap ret len keys_ptr values_ptr = do keys <- peekKey len keys_ptr values <- peekVal len values_ptr return (ret, Map.fromList $ zip keys values) @@ -231,14 +251,9 @@ HS_CPP_DELETE(delete_string, StdString) HS_CPP_DELETE(delete_vector_of_string, (StdVector StdString)) HS_CPP_DELETE(delete_vector_of_int, (StdVector CInt)) HS_CPP_DELETE(delete_vector_of_int64, (StdVector Int64)) +HS_CPP_DELETE(delete_vector_of_uint64, (StdVector Int64)) HS_CPP_DELETE(delete_std_vec_of_folly_small_vec_of_double, (StdVector (FollySmallVector Double))) -#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \ - foreign import ccall unsafe "hs_cpp_lib.h CFUN" \ - CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ) - -HS_CPP_CAL_OFFSET(cal_offset_std_string, StdString) - bool2cbool :: Bool -> CBool bool2cbool True = 1 bool2cbool False = 0 diff --git a/common/base/cbits/hs_struct.cpp b/common/base/cbits/hs_struct.cpp index e24e1ef4a..0e33abc7b 100644 --- a/common/base/cbits/hs_struct.cpp +++ b/common/base/cbits/hs_struct.cpp @@ -16,13 +16,16 @@ std::string* copy_std_string(std::string&& str) { return current + offset; \ } -#define VECTOR_SIZE(NAME, VEC_TYPE) \ - HsInt get_size_##NAME(const VEC_TYPE* vec) { return vec->size(); } +#define GET_SIZE(NAME, VAL_TYPE) \ + HsInt get_size_##NAME(const VAL_TYPE* v) { return v->size(); } + +#define DEL_FUNCTION(NAME, TYPE) \ + void delete_##NAME(TYPE* p) { delete p; } #define PEEK_VECTOR(NAME, VEC_TYPE, VAL_TYPE) \ - void peek_##NAME(const VEC_TYPE* vec, HsInt len, VAL_TYPE* vals) { \ - assert(("peek_##NAME: size mismatch!", len == vec->size())); \ - for (int i = 0; i < len; i++) { \ + void peek_##NAME(const VEC_TYPE* vec, HsInt vals_len, VAL_TYPE* vals) { \ + assert(("peek_##NAME: size mismatch!", vals_len == vec->size())); \ + for (int i = 0; i < vals_len; i++) { \ (vals)[i] = (*vec)[i]; \ } \ } @@ -32,25 +35,24 @@ extern "C" { // Unfortunately, there is no generic in c -CAL_OFFSET(std_string, std::string); -CAL_OFFSET(vec_of_folly_small_vec_of_double, - folly::small_vector); +GET_SIZE(vec_of_double, std::vector); +GET_SIZE(vec_of_uint64, std::vector); +GET_SIZE(folly_small_vec_of_double, folly::small_vector); -VECTOR_SIZE(vec_of_double, std::vector); -VECTOR_SIZE(folly_small_vec_of_double, folly::small_vector); +// Ptr a -> Int -> Ptr a +CAL_OFFSET(std_string, std::string); +CAL_OFFSET(vec_of_uint64, std::vector); +CAL_OFFSET(folly_small_vec_of_double, folly::small_vector); +PEEK_VECTOR(vec_of_uint64, std::vector, uint64_t) PEEK_VECTOR(folly_small_vec_of_double, folly::small_vector, double); -// ---------------------------------------------------------------------------- - -#define DEL_FUNCTION(NAME, TYPE) \ - void delete_##NAME(TYPE* p) { delete p; } - DEL_FUNCTION(vector_of_int, std::vector); DEL_FUNCTION(string, std::string); DEL_FUNCTION(vector_of_string, std::vector); DEL_FUNCTION(vector_of_int64, std::vector); +DEL_FUNCTION(vector_of_uint64, std::vector); DEL_FUNCTION(std_vec_of_folly_small_vec_of_double, std::vector>); diff --git a/common/hstream/cbits/hs_struct.cpp b/common/hstream/cbits/hs_struct.cpp deleted file mode 100644 index e24e1ef4a..000000000 --- a/common/hstream/cbits/hs_struct.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "hs_cpp_lib.h" - -// ---------------------------------------------------------------------------- -// StdString - -std::string* copy_std_string(std::string&& str) { - auto value = new std::string; - *value = str; - return value; -} - -// ---------------------------------------------------------------------------- - -#define CAL_OFFSET(NAME, VAL_TYPE) \ - VAL_TYPE* cal_offset_##NAME(VAL_TYPE* current, HsInt offset) { \ - return current + offset; \ - } - -#define VECTOR_SIZE(NAME, VEC_TYPE) \ - HsInt get_size_##NAME(const VEC_TYPE* vec) { return vec->size(); } - -#define PEEK_VECTOR(NAME, VEC_TYPE, VAL_TYPE) \ - void peek_##NAME(const VEC_TYPE* vec, HsInt len, VAL_TYPE* vals) { \ - assert(("peek_##NAME: size mismatch!", len == vec->size())); \ - for (int i = 0; i < len; i++) { \ - (vals)[i] = (*vec)[i]; \ - } \ - } - -extern "C" { -// ---------------------------------------------------------------------------- - -// Unfortunately, there is no generic in c - -CAL_OFFSET(std_string, std::string); -CAL_OFFSET(vec_of_folly_small_vec_of_double, - folly::small_vector); - -VECTOR_SIZE(vec_of_double, std::vector); -VECTOR_SIZE(folly_small_vec_of_double, folly::small_vector); - -PEEK_VECTOR(folly_small_vec_of_double, folly::small_vector, - double); - -// ---------------------------------------------------------------------------- - -#define DEL_FUNCTION(NAME, TYPE) \ - void delete_##NAME(TYPE* p) { delete p; } - -DEL_FUNCTION(vector_of_int, std::vector); -DEL_FUNCTION(string, std::string); -DEL_FUNCTION(vector_of_string, std::vector); -DEL_FUNCTION(vector_of_int64, std::vector); -DEL_FUNCTION(std_vec_of_folly_small_vec_of_double, - std::vector>); - -// ---------------------------------------------------------------------------- -// Extern End -} diff --git a/hstream-store/HStream/Store/Internal/Foreign.hs b/hstream-store/HStream/Store/Internal/Foreign.hs index 6d393cf8b..fee5bc138 100644 --- a/hstream-store/HStream/Store/Internal/Foreign.hs +++ b/hstream-store/HStream/Store/Internal/Foreign.hs @@ -7,11 +7,12 @@ module HStream.Store.Internal.Foreign where import Control.Concurrent (newEmptyMVar, takeMVar) -import Control.Exception (mask_, onException) +import Control.Exception (finally, mask_, onException) import Control.Monad.Primitive import Data.Primitive import Foreign.C import Foreign.ForeignPtr +import Foreign.Ptr import Foreign.StablePtr import GHC.Conc import GHC.Exts @@ -20,6 +21,7 @@ import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z import Z.Foreign (BA#, MBA#) +-- TODO: Use HStream.Foreign.BA# instead import HStream.Foreign hiding (BA#, MBA#) import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E @@ -105,6 +107,62 @@ withAsyncPrimUnsafe3' a b c f g = mask_ $ do return e return (a_, b_, c_, e_) +-- Similar to HStream.Foreign.PeekMapFun +-- +-- TODO: Use HStream.Foreign.PeekMapFun instead +type MapFun a pk pv dk dv + = MBA# Int + -- ^ returned map size + -> MBA# (Ptr pk) -> MBA# (Ptr pv) + -- ^ pointer to peek + -> MBA# (Ptr dk) -> MBA# (Ptr dv) + -- ^ pointer to delete + -> IO a + +withAsyncPrimMapUnsafe + :: (Prim p) + => p + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv + -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a k v vk vv) + -> IO (a, p, [(k, v)]) +withAsyncPrimMapUnsafe p peekk delk peekv delv f = + withAsyncPrimMapUnsafe' p peekk delk peekv delv f pure + +withAsyncPrimMapUnsafe' + :: (Prim p) + => p + -> PeekNFun pk k -> DeleteFun dk + -> PeekNFun pv v -> DeleteFun dv + -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a pk pv dk dv) + -> (a -> IO b) + -> IO (b, p, [(k, v)]) +withAsyncPrimMapUnsafe' p peekk delk peekv delv f g = mask_ $ do + mvar <- newEmptyMVar + sp <- newStablePtrPrimMVar mvar + (p_, (len_, (keys_, (values_, (keys_vec_, (values_vec_, b_)))))) <- + withPrimSafe' p $ \p' -> + withPrimSafe' (0 :: Int) $ \len -> + withPrimSafe' nullPtr $ \keys -> + withPrimSafe' nullPtr $ \values -> + withPrimSafe' nullPtr $ \keys_vec -> + withPrimSafe' nullPtr $ \values_vec -> do + (cap, _) <- threadCapability =<< myThreadId + b <- g =<< f sp cap p' len keys values keys_vec values_vec + takeMVar mvar `onException` forkIO (do takeMVar mvar + primitive_ (touch# p') + primitive_ (touch# len) + primitive_ (touch# keys) + primitive_ (touch# values) + primitive_ (touch# keys_vec) + primitive_ (touch# values_vec)) + return b + finally + (do ret_keys <- peekk len_ keys_ + ret_values <- peekv len_ values_ + return (b_, p_, zip ret_keys ret_values)) + (delk keys_vec_ <> delv values_vec_) -- delete a nullptr is OK + withAsync :: HasCallStack => Int -> (Ptr a -> IO a) -> (StablePtr PrimMVar -> Int -> Ptr a -> IO ErrorCode) diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs index d5ebc7ae1..e8ca1de53 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs @@ -18,6 +18,7 @@ import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z import Z.Foreign (BA#, MBA#) +import qualified HStream.Foreign as F import qualified HStream.Store.Exception as E import qualified HStream.Store.Internal.Foreign as FFI import HStream.Store.Internal.Types @@ -66,6 +67,22 @@ ckpStoreGetLSN store customid logid = _ <- E.throwStreamErrorIfNotOK' errno return lsn +ckpStoreGetAllCheckpoints' :: LDCheckpointStore -> CBytes -> IO [(C_LogID, LSN)] +ckpStoreGetAllCheckpoints' store customid = + ZC.withCBytesUnsafe customid $ \customid' -> + withForeignPtr store $ \store' -> do + (_, errno, ret) <- FFI.withAsyncPrimMapUnsafe + C_OK + F.peekN F.c_delete_vector_of_uint64 + F.peekN F.c_delete_vector_of_uint64 + (checkpoint_store_get_all_checkpoints store' customid') + _ <- E.throwStreamErrorIfNotOK' errno + pure ret + +ckpStoreGetAllCheckpoints :: LDCheckpointStore -> CBytes -> IO (Map C_LogID LSN) +ckpStoreGetAllCheckpoints store customid = + Map.fromList <$> ckpStoreGetAllCheckpoints' store customid + ckpStoreUpdateLSN :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO () ckpStoreUpdateLSN = ckpStoreUpdateLSN' (-1) @@ -145,6 +162,16 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_lsn" -> MBA# LSN -- ^ value out: lsn -> IO () +foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_all_checkpoints" + checkpoint_store_get_all_checkpoints + :: Ptr LogDeviceCheckpointStore + -> BA# Word8 -- ^ customer_id + -> StablePtr PrimMVar -> Int + -> MBA# ErrorCode -- ^ value out: error code + -> MBA# Int -> MBA# (Ptr C_LogID) -> MBA# (Ptr LSN) + -> MBA# (Ptr (F.StdVector C_LogID)) -> MBA# (Ptr (F.StdVector LSN)) + -> IO () + foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn" c_checkpoint_store_update_lsn :: Ptr LogDeviceCheckpointStore diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs index 1af79fdc5..3b8a442f8 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogAttributes.hs @@ -205,6 +205,7 @@ peekLogAttributes ptr = do #undef _ARG #undef _MAYBE_ARG +-- TODO: Simplify by using peekCppMap function in hstream-common-base peekLogAttributesExtras :: Ptr LogDeviceLogAttributes -> IO (Map CBytes CBytes) peekLogAttributesExtras attrs = do (len, (keys_ptr, (values_ptr, (keys_vec, (values_vec, _))))) <- diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs index cb4b21d0d..d92ea3bd6 100644 --- a/hstream-store/HStream/Store/Stream.hs +++ b/hstream-store/HStream/Store/Stream.hs @@ -82,6 +82,8 @@ module HStream.Store.Stream , LD.newRSMBasedCheckpointStore , LD.newZookeeperBasedCheckpointStore , LD.ckpStoreGetLSN + , LD.ckpStoreGetAllCheckpoints + , LD.ckpStoreGetAllCheckpoints' , LD.ckpStoreUpdateLSN , LD.ckpStoreUpdateMultiLSN , LD.ckpStoreRemoveCheckpoints diff --git a/hstream-store/cbits/logdevice/hs_checkpoint.cpp b/hstream-store/cbits/logdevice/hs_checkpoint.cpp index fbc735a10..24b988f50 100644 --- a/hstream-store/cbits/logdevice/hs_checkpoint.cpp +++ b/hstream-store/cbits/logdevice/hs_checkpoint.cpp @@ -72,6 +72,37 @@ void checkpoint_store_get_lsn(logdevice_checkpoint_store_t* store, store->rep->getLSN(customer_id_, logid_t(logid), cb); } +void checkpoint_store_get_all_checkpoints( + logdevice_checkpoint_store_t* store, const char* customer_id, + HsStablePtr mvar, HsInt cap, facebook::logdevice::Status* st_out, + HsInt* len, c_logid_t** keys_ptr, c_lsn_t** values_ptr, + std::vector** keys_, std::vector** values_) { + auto cb = [st_out, cap, mvar, len, keys_ptr, values_ptr, keys_, + values_](facebook::logdevice::Status st, + std::map& log_lsn_map) { + if (st_out) { + *st_out = st; + } + if (st == ld::Status::OK) { + std::vector* keys = new std::vector; + std::vector* values = new std::vector; + + for (const auto& [key, value] : log_lsn_map) { + keys->push_back(key); + values->push_back(value); + } + + *len = keys->size(); + *keys_ptr = keys->data(); + *values_ptr = values->data(); + *keys_ = keys; + *values_ = values; + } + hs_try_putmvar(cap, mvar); + }; + store->rep->getAllCheckpoints(std::string(customer_id), std::move(cb)); +} + void checkpoint_store_update_lsn(logdevice_checkpoint_store_t* store, const char* customer_id, c_logid_t logid, c_lsn_t lsn, HsStablePtr mvar, HsInt cap, diff --git a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs index c7a413cea..162494316 100644 --- a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs +++ b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs @@ -2,7 +2,7 @@ module HStream.Store.CheckpointStoreSpec (spec) where -import Control.Monad (void) +import Control.Monad (forM, void) import qualified Data.Map.Strict as Map import qualified Data.Vector.Primitive as VP import Test.Hspec @@ -35,6 +35,13 @@ storeSpec new_ckp_store = do S.ckpStoreUpdateLSN checkpointStore "customer1" logid 2 S.ckpStoreGetLSN checkpointStore "customer1" logid `shouldReturn` 2 + it "get all" $ do + expected <- forM [1..1000] $ \i -> do + S.ckpStoreUpdateLSN checkpointStore "customer_get_all" i i + pure (i, i) + allCkps <- S.ckpStoreGetAllCheckpoints checkpointStore "customer_get_all" + Map.toAscList allCkps `shouldBe` expected + it "update multi lsn" $ do S.ckpStoreUpdateLSN checkpointStore "customer1" 1 1 S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 1