Skip to content

Commit

Permalink
hstream-store: CheckpointStore support updating the LSNs for many logs (
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Sep 22, 2023
1 parent 6cff5f3 commit ce4a91d
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 49 deletions.
130 changes: 82 additions & 48 deletions hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ ckpStoreGetLSN store customid logid =
_ <- E.throwStreamErrorIfNotOK' errno
return lsn

ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN
ckpStoreGetLSNSync store customid logid =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
(ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' ->
E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn'
return ret_lsn

ckpStoreUpdateLSN :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSN = ckpStoreUpdateLSN' (-1)

Expand All @@ -84,6 +76,25 @@ ckpStoreUpdateLSN' retries store customid logid sn =
let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' customid' logid sn
void $ FFI.retryWhileAgain f retries

ckpStoreUpdateMultiLSN
:: LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO ()
ckpStoreUpdateMultiLSN = ckpStoreUpdateMultiLSN' (-1)

ckpStoreUpdateMultiLSN'
:: Int -> LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO ()
ckpStoreUpdateMultiLSN' retries store customid sns =
ZC.withCBytesUnsafe customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
let f =
Z.withPrimArrayUnsafe ka $ \ks' len ->
Z.withPrimArrayUnsafe va $ \vs' _len ->
FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $
checkpoint_store_update_multi_lsn store' customid' ks' vs' (fromIntegral len)
void $ FFI.retryWhileAgain f retries

ckpStoreRemoveCheckpoints
:: HasCallStack
=> LDCheckpointStore -> CBytes -> VP.Vector C_LogID -> IO ()
Expand All @@ -102,27 +113,6 @@ ckpStoreRemoveAllCheckpoints store customid =
checkpoint_store_remove_all_checkpoints store' customid'
void $ E.throwStreamErrorIfNotOK' errno

ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSNSync store customid logid sn =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn

updateMultiSequenceNumSync
:: LDCheckpointStore
-> CBytes
-> Map C_LogID LSN
-> IO ()
updateMultiSequenceNumSync store customid sns =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
let ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
Z.withPrimArraySafe ka $ \ks' len ->
Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $
c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len)

foreign import ccall unsafe "hs_logdevice.h new_file_based_checkpoint_store"
c_new_file_based_checkpoint_store :: BA# Word8 -> IO (Ptr LogDeviceCheckpointStore)

Expand All @@ -145,40 +135,33 @@ foreign import ccall safe "hs_logdevice.h free_checkpoint_store"
foreign import ccall safe "hs_logdevice.h &free_checkpoint_store"
c_free_checkpoint_store_fun :: FunPtr (Ptr LogDeviceCheckpointStore -> IO ())

foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync"
c_checkpoint_store_get_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> Ptr LSN -- ^ value out
-> IO ErrorCode

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_lsn"
c_checkpoint_store_get_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> C_LogID
-> StablePtr PrimMVar -> Int
-> MBA# Word8 -- ^ ErrorCode
-> MBA# Word8 -- ^ value out
-> MBA# ErrorCode -- ^ value out: error code
-> MBA# LSN -- ^ value out: lsn
-> IO ()

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync"
c_checkpoint_store_update_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> IO ErrorCode

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn"
c_checkpoint_store_update_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> StablePtr PrimMVar -> Int
-> MBA# Word8
-> MBA# ErrorCode
-> IO ()

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_multi_lsn"
checkpoint_store_update_multi_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> BA# C_LogID -> BA# LSN -> Word -- ^ map of (logid, lsn)
-> StablePtr PrimMVar -> Int
-> MBA# ErrorCode
-> IO ()

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_checkpoints"
Expand All @@ -198,6 +181,57 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_all_checkpoi
-> MBA# ErrorCode
-> IO ()

-------------------------------------------------------------------------------
-- DEPRECATED

{-# DEPRECATED ckpStoreGetLSNSync "Use ckpStoreGetLSN instead" #-}
ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN
ckpStoreGetLSNSync store customid logid =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
(ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' ->
E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn'
return ret_lsn

{-# DEPRECATED ckpStoreUpdateLSNSync "Use ckpStoreUpdateLSN instead" #-}
ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSNSync store customid logid sn =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn

{-# DEPRECATED updateMultiSequenceNumSync "Use ckpStoreUpdateMultiLSN instead" #-}
updateMultiSequenceNumSync
:: LDCheckpointStore
-> CBytes
-> Map C_LogID LSN
-> IO ()
updateMultiSequenceNumSync store customid sns =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
let ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
Z.withPrimArraySafe ka $ \ks' len ->
Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $
c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len)

foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync"
c_checkpoint_store_get_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> Ptr LSN -- ^ value out
-> IO ErrorCode

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync"
c_checkpoint_store_update_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> IO ErrorCode

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_multi_lsn_sync"
c_checkpoint_store_update_multi_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
Expand Down
2 changes: 1 addition & 1 deletion hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE PatternSynonyms #-}

Expand Down Expand Up @@ -80,6 +79,7 @@ module HStream.Store.Stream
, LD.newZookeeperBasedCheckpointStore
, LD.ckpStoreGetLSN
, LD.ckpStoreUpdateLSN
, LD.ckpStoreUpdateMultiLSN
, LD.ckpStoreRemoveCheckpoints
, LD.ckpStoreRemoveAllCheckpoints
--
Expand Down
19 changes: 19 additions & 0 deletions hstream-store/cbits/logdevice/hs_checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,25 @@ void checkpoint_store_update_lsn(logdevice_checkpoint_store_t* store,
store->rep->updateLSN(customer_id_, logid_t(logid), lsn, cb);
}

void checkpoint_store_update_multi_lsn(logdevice_checkpoint_store_t* store,
const char* customer_id,
c_logid_t* logids, c_lsn_t* lsns,
size_t len, HsStablePtr mvar, HsInt cap,
facebook::logdevice::Status* st_out) {
std::string customer_id_ = std::string(customer_id);
std::map<logid_t, lsn_t> checkpoints;
for (int i = 0; i < len; ++i)
checkpoints[logid_t(logids[i])] = lsns[i];
auto cb = [st_out, cap, mvar](facebook::logdevice::Status st) {
if (st_out) {
*st_out = st;
}
hs_try_putmvar(cap, mvar);
};

return store->rep->updateLSN(customer_id_, checkpoints, cb);
}

void checkpoint_store_remove_checkpoints(logdevice_checkpoint_store_t* store,
const char* customer_id,
c_logid_t* logids, HsInt logid_offset,
Expand Down
10 changes: 10 additions & 0 deletions hstream-store/test/HStream/Store/CheckpointStoreSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module HStream.Store.CheckpointStoreSpec (spec) where

import Control.Monad (void)
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Primitive as VP
import Test.Hspec

Expand Down Expand Up @@ -34,6 +35,15 @@ storeSpec new_ckp_store = do
S.ckpStoreUpdateLSN checkpointStore "customer1" logid 2
S.ckpStoreGetLSN checkpointStore "customer1" logid `shouldReturn` 2

it "update multi lsn" $ do
S.ckpStoreUpdateLSN checkpointStore "customer1" 1 1
S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 1
S.ckpStoreUpdateMultiLSN checkpointStore "customer1" $
Map.fromList [(1, 2), (2, 3), (3, 5)]
S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 2
S.ckpStoreGetLSN checkpointStore "customer1" 2 `shouldReturn` 3
S.ckpStoreGetLSN checkpointStore "customer1" 3 `shouldReturn` 5

it "remove checkpoints" $ do
S.ckpStoreGetLSN checkpointStore "customer2" logid `shouldThrow` S.isNOTFOUND
S.ckpStoreUpdateLSN checkpointStore "customer2" logid 2
Expand Down

0 comments on commit ce4a91d

Please sign in to comment.