diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs index 1a6ae0199..d5ebc7ae1 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs @@ -66,14 +66,6 @@ ckpStoreGetLSN store customid logid = _ <- E.throwStreamErrorIfNotOK' errno return lsn -ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN -ckpStoreGetLSNSync store customid logid = - ZC.withCBytes customid $ \customid' -> - withForeignPtr store $ \store' -> do - (ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' -> - E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn' - return ret_lsn - ckpStoreUpdateLSN :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO () ckpStoreUpdateLSN = ckpStoreUpdateLSN' (-1) @@ -84,6 +76,25 @@ ckpStoreUpdateLSN' retries store customid logid sn = let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' customid' logid sn void $ FFI.retryWhileAgain f retries +ckpStoreUpdateMultiLSN + :: LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO () +ckpStoreUpdateMultiLSN = ckpStoreUpdateMultiLSN' (-1) + +ckpStoreUpdateMultiLSN' + :: Int -> LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO () +ckpStoreUpdateMultiLSN' retries store customid sns = + ZC.withCBytesUnsafe customid $ \customid' -> + withForeignPtr store $ \store' -> do + let xs = Map.toList sns + ka = Z.primArrayFromList $ map fst xs + va = Z.primArrayFromList $ map snd xs + let f = + Z.withPrimArrayUnsafe ka $ \ks' len -> + Z.withPrimArrayUnsafe va $ \vs' _len -> + FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ + checkpoint_store_update_multi_lsn store' customid' ks' vs' (fromIntegral len) + void $ FFI.retryWhileAgain f retries + ckpStoreRemoveCheckpoints :: HasCallStack => LDCheckpointStore -> CBytes -> VP.Vector C_LogID -> IO () @@ -102,27 +113,6 @@ ckpStoreRemoveAllCheckpoints store customid = checkpoint_store_remove_all_checkpoints store' customid' void $ E.throwStreamErrorIfNotOK' errno -ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO () -ckpStoreUpdateLSNSync store customid logid sn = - ZC.withCBytes customid $ \customid' -> - withForeignPtr store $ \store' -> do - void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn - -updateMultiSequenceNumSync - :: LDCheckpointStore - -> CBytes - -> Map C_LogID LSN - -> IO () -updateMultiSequenceNumSync store customid sns = - ZC.withCBytes customid $ \customid' -> - withForeignPtr store $ \store' -> do - let xs = Map.toList sns - let ka = Z.primArrayFromList $ map fst xs - va = Z.primArrayFromList $ map snd xs - Z.withPrimArraySafe ka $ \ks' len -> - Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $ - c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len) - foreign import ccall unsafe "hs_logdevice.h new_file_based_checkpoint_store" c_new_file_based_checkpoint_store :: BA# Word8 -> IO (Ptr LogDeviceCheckpointStore) @@ -145,32 +135,16 @@ foreign import ccall safe "hs_logdevice.h free_checkpoint_store" foreign import ccall safe "hs_logdevice.h &free_checkpoint_store" c_free_checkpoint_store_fun :: FunPtr (Ptr LogDeviceCheckpointStore -> IO ()) -foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync" - c_checkpoint_store_get_lsn_sync_safe - :: Ptr LogDeviceCheckpointStore - -> Ptr Word8 -- ^ customer_id - -> C_LogID - -> Ptr LSN -- ^ value out - -> IO ErrorCode - foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_lsn" c_checkpoint_store_get_lsn :: Ptr LogDeviceCheckpointStore -> BA# Word8 -- ^ customer_id -> C_LogID -> StablePtr PrimMVar -> Int - -> MBA# Word8 -- ^ ErrorCode - -> MBA# Word8 -- ^ value out + -> MBA# ErrorCode -- ^ value out: error code + -> MBA# LSN -- ^ value out: lsn -> IO () -foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync" - c_checkpoint_store_update_lsn_sync_safe - :: Ptr LogDeviceCheckpointStore - -> Ptr Word8 -- ^ customer_id - -> C_LogID - -> LSN - -> IO ErrorCode - foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn" c_checkpoint_store_update_lsn :: Ptr LogDeviceCheckpointStore @@ -178,7 +152,16 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn" -> C_LogID -> LSN -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode + -> IO () + +foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_multi_lsn" + checkpoint_store_update_multi_lsn + :: Ptr LogDeviceCheckpointStore + -> BA# Word8 -- ^ customer_id + -> BA# C_LogID -> BA# LSN -> Word -- ^ map of (logid, lsn) + -> StablePtr PrimMVar -> Int + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_checkpoints" @@ -198,6 +181,57 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_all_checkpoi -> MBA# ErrorCode -> IO () +------------------------------------------------------------------------------- +-- DEPRECATED + +{-# DEPRECATED ckpStoreGetLSNSync "Use ckpStoreGetLSN instead" #-} +ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN +ckpStoreGetLSNSync store customid logid = + ZC.withCBytes customid $ \customid' -> + withForeignPtr store $ \store' -> do + (ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' -> + E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn' + return ret_lsn + +{-# DEPRECATED ckpStoreUpdateLSNSync "Use ckpStoreUpdateLSN instead" #-} +ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO () +ckpStoreUpdateLSNSync store customid logid sn = + ZC.withCBytes customid $ \customid' -> + withForeignPtr store $ \store' -> do + void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn + +{-# DEPRECATED updateMultiSequenceNumSync "Use ckpStoreUpdateMultiLSN instead" #-} +updateMultiSequenceNumSync + :: LDCheckpointStore + -> CBytes + -> Map C_LogID LSN + -> IO () +updateMultiSequenceNumSync store customid sns = + ZC.withCBytes customid $ \customid' -> + withForeignPtr store $ \store' -> do + let xs = Map.toList sns + let ka = Z.primArrayFromList $ map fst xs + va = Z.primArrayFromList $ map snd xs + Z.withPrimArraySafe ka $ \ks' len -> + Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $ + c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len) + +foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync" + c_checkpoint_store_get_lsn_sync_safe + :: Ptr LogDeviceCheckpointStore + -> Ptr Word8 -- ^ customer_id + -> C_LogID + -> Ptr LSN -- ^ value out + -> IO ErrorCode + +foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync" + c_checkpoint_store_update_lsn_sync_safe + :: Ptr LogDeviceCheckpointStore + -> Ptr Word8 -- ^ customer_id + -> C_LogID + -> LSN + -> IO ErrorCode + foreign import ccall safe "hs_logdevice.h checkpoint_store_update_multi_lsn_sync" c_checkpoint_store_update_multi_lsn_sync_safe :: Ptr LogDeviceCheckpointStore diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs index e7e09b739..9535522e2 100644 --- a/hstream-store/HStream/Store/Stream.hs +++ b/hstream-store/HStream/Store/Stream.hs @@ -1,6 +1,5 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} -{-# LANGUAGE MagicHash #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE PatternSynonyms #-} @@ -80,6 +79,7 @@ module HStream.Store.Stream , LD.newZookeeperBasedCheckpointStore , LD.ckpStoreGetLSN , LD.ckpStoreUpdateLSN + , LD.ckpStoreUpdateMultiLSN , LD.ckpStoreRemoveCheckpoints , LD.ckpStoreRemoveAllCheckpoints -- diff --git a/hstream-store/cbits/logdevice/hs_checkpoint.cpp b/hstream-store/cbits/logdevice/hs_checkpoint.cpp index 195d15e26..fbc735a10 100644 --- a/hstream-store/cbits/logdevice/hs_checkpoint.cpp +++ b/hstream-store/cbits/logdevice/hs_checkpoint.cpp @@ -86,6 +86,25 @@ void checkpoint_store_update_lsn(logdevice_checkpoint_store_t* store, store->rep->updateLSN(customer_id_, logid_t(logid), lsn, cb); } +void checkpoint_store_update_multi_lsn(logdevice_checkpoint_store_t* store, + const char* customer_id, + c_logid_t* logids, c_lsn_t* lsns, + size_t len, HsStablePtr mvar, HsInt cap, + facebook::logdevice::Status* st_out) { + std::string customer_id_ = std::string(customer_id); + std::map checkpoints; + for (int i = 0; i < len; ++i) + checkpoints[logid_t(logids[i])] = lsns[i]; + auto cb = [st_out, cap, mvar](facebook::logdevice::Status st) { + if (st_out) { + *st_out = st; + } + hs_try_putmvar(cap, mvar); + }; + + return store->rep->updateLSN(customer_id_, checkpoints, cb); +} + void checkpoint_store_remove_checkpoints(logdevice_checkpoint_store_t* store, const char* customer_id, c_logid_t* logids, HsInt logid_offset, diff --git a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs index af50b6473..c7a413cea 100644 --- a/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs +++ b/hstream-store/test/HStream/Store/CheckpointStoreSpec.hs @@ -3,6 +3,7 @@ module HStream.Store.CheckpointStoreSpec (spec) where import Control.Monad (void) +import qualified Data.Map.Strict as Map import qualified Data.Vector.Primitive as VP import Test.Hspec @@ -34,6 +35,15 @@ storeSpec new_ckp_store = do S.ckpStoreUpdateLSN checkpointStore "customer1" logid 2 S.ckpStoreGetLSN checkpointStore "customer1" logid `shouldReturn` 2 + it "update multi lsn" $ do + S.ckpStoreUpdateLSN checkpointStore "customer1" 1 1 + S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 1 + S.ckpStoreUpdateMultiLSN checkpointStore "customer1" $ + Map.fromList [(1, 2), (2, 3), (3, 5)] + S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 2 + S.ckpStoreGetLSN checkpointStore "customer1" 2 `shouldReturn` 3 + S.ckpStoreGetLSN checkpointStore "customer1" 3 `shouldReturn` 5 + it "remove checkpoints" $ do S.ckpStoreGetLSN checkpointStore "customer2" logid `shouldThrow` S.isNOTFOUND S.ckpStoreUpdateLSN checkpointStore "customer2" logid 2