Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hstream-store: CheckpointStore support updating the LSNs for many logs #1618

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 82 additions & 48 deletions hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ ckpStoreGetLSN store customid logid =
_ <- E.throwStreamErrorIfNotOK' errno
return lsn

ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN
ckpStoreGetLSNSync store customid logid =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
(ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' ->
E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn'
return ret_lsn

ckpStoreUpdateLSN :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSN = ckpStoreUpdateLSN' (-1)

Expand All @@ -84,6 +76,25 @@ ckpStoreUpdateLSN' retries store customid logid sn =
let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' customid' logid sn
void $ FFI.retryWhileAgain f retries

ckpStoreUpdateMultiLSN
:: LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO ()
ckpStoreUpdateMultiLSN = ckpStoreUpdateMultiLSN' (-1)

ckpStoreUpdateMultiLSN'
:: Int -> LDCheckpointStore -> CBytes -> Map C_LogID LSN -> IO ()
ckpStoreUpdateMultiLSN' retries store customid sns =
ZC.withCBytesUnsafe customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
let f =
Z.withPrimArrayUnsafe ka $ \ks' len ->
Z.withPrimArrayUnsafe va $ \vs' _len ->
FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $
checkpoint_store_update_multi_lsn store' customid' ks' vs' (fromIntegral len)
void $ FFI.retryWhileAgain f retries

ckpStoreRemoveCheckpoints
:: HasCallStack
=> LDCheckpointStore -> CBytes -> VP.Vector C_LogID -> IO ()
Expand All @@ -102,27 +113,6 @@ ckpStoreRemoveAllCheckpoints store customid =
checkpoint_store_remove_all_checkpoints store' customid'
void $ E.throwStreamErrorIfNotOK' errno

ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSNSync store customid logid sn =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn

updateMultiSequenceNumSync
:: LDCheckpointStore
-> CBytes
-> Map C_LogID LSN
-> IO ()
updateMultiSequenceNumSync store customid sns =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
let ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
Z.withPrimArraySafe ka $ \ks' len ->
Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $
c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len)

foreign import ccall unsafe "hs_logdevice.h new_file_based_checkpoint_store"
c_new_file_based_checkpoint_store :: BA# Word8 -> IO (Ptr LogDeviceCheckpointStore)

Expand All @@ -145,40 +135,33 @@ foreign import ccall safe "hs_logdevice.h free_checkpoint_store"
foreign import ccall safe "hs_logdevice.h &free_checkpoint_store"
c_free_checkpoint_store_fun :: FunPtr (Ptr LogDeviceCheckpointStore -> IO ())

foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync"
c_checkpoint_store_get_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> Ptr LSN -- ^ value out
-> IO ErrorCode

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_lsn"
c_checkpoint_store_get_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> C_LogID
-> StablePtr PrimMVar -> Int
-> MBA# Word8 -- ^ ErrorCode
-> MBA# Word8 -- ^ value out
-> MBA# ErrorCode -- ^ value out: error code
-> MBA# LSN -- ^ value out: lsn
-> IO ()

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync"
c_checkpoint_store_update_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> IO ErrorCode

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn"
c_checkpoint_store_update_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> StablePtr PrimMVar -> Int
-> MBA# Word8
-> MBA# ErrorCode
-> IO ()

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_multi_lsn"
checkpoint_store_update_multi_lsn
:: Ptr LogDeviceCheckpointStore
-> BA# Word8 -- ^ customer_id
-> BA# C_LogID -> BA# LSN -> Word -- ^ map of (logid, lsn)
-> StablePtr PrimMVar -> Int
-> MBA# ErrorCode
-> IO ()

foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_checkpoints"
Expand All @@ -198,6 +181,57 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_remove_all_checkpoi
-> MBA# ErrorCode
-> IO ()

-------------------------------------------------------------------------------
-- DEPRECATED

{-# DEPRECATED ckpStoreGetLSNSync "Use ckpStoreGetLSN instead" #-}
ckpStoreGetLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> IO LSN
ckpStoreGetLSNSync store customid logid =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
(ret_lsn, _) <- Z.withPrimSafe LSN_INVALID $ \sn' ->
E.throwStreamErrorIfNotOK $ c_checkpoint_store_get_lsn_sync_safe store' customid' logid sn'
return ret_lsn

{-# DEPRECATED ckpStoreUpdateLSNSync "Use ckpStoreUpdateLSN instead" #-}
ckpStoreUpdateLSNSync :: LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ()
ckpStoreUpdateLSNSync store customid logid sn =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
void $ E.throwStreamErrorIfNotOK $ c_checkpoint_store_update_lsn_sync_safe store' customid' logid sn

{-# DEPRECATED updateMultiSequenceNumSync "Use ckpStoreUpdateMultiLSN instead" #-}
updateMultiSequenceNumSync
:: LDCheckpointStore
-> CBytes
-> Map C_LogID LSN
-> IO ()
updateMultiSequenceNumSync store customid sns =
ZC.withCBytes customid $ \customid' ->
withForeignPtr store $ \store' -> do
let xs = Map.toList sns
let ka = Z.primArrayFromList $ map fst xs
va = Z.primArrayFromList $ map snd xs
Z.withPrimArraySafe ka $ \ks' len ->
Z.withPrimArraySafe va $ \vs' _len -> void $ E.throwStreamErrorIfNotOK $
c_checkpoint_store_update_multi_lsn_sync_safe store' customid' ks' vs' (fromIntegral len)

foreign import ccall safe "hs_logdevice.h checkpoint_store_get_lsn_sync"
c_checkpoint_store_get_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> Ptr LSN -- ^ value out
-> IO ErrorCode

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_lsn_sync"
c_checkpoint_store_update_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
-> Ptr Word8 -- ^ customer_id
-> C_LogID
-> LSN
-> IO ErrorCode

foreign import ccall safe "hs_logdevice.h checkpoint_store_update_multi_lsn_sync"
c_checkpoint_store_update_multi_lsn_sync_safe
:: Ptr LogDeviceCheckpointStore
Expand Down
2 changes: 1 addition & 1 deletion hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE PatternSynonyms #-}

Expand Down Expand Up @@ -80,6 +79,7 @@ module HStream.Store.Stream
, LD.newZookeeperBasedCheckpointStore
, LD.ckpStoreGetLSN
, LD.ckpStoreUpdateLSN
, LD.ckpStoreUpdateMultiLSN
, LD.ckpStoreRemoveCheckpoints
, LD.ckpStoreRemoveAllCheckpoints
--
Expand Down
19 changes: 19 additions & 0 deletions hstream-store/cbits/logdevice/hs_checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,25 @@ void checkpoint_store_update_lsn(logdevice_checkpoint_store_t* store,
store->rep->updateLSN(customer_id_, logid_t(logid), lsn, cb);
}

void checkpoint_store_update_multi_lsn(logdevice_checkpoint_store_t* store,
const char* customer_id,
c_logid_t* logids, c_lsn_t* lsns,
size_t len, HsStablePtr mvar, HsInt cap,
facebook::logdevice::Status* st_out) {
std::string customer_id_ = std::string(customer_id);
std::map<logid_t, lsn_t> checkpoints;
for (int i = 0; i < len; ++i)
checkpoints[logid_t(logids[i])] = lsns[i];
auto cb = [st_out, cap, mvar](facebook::logdevice::Status st) {
if (st_out) {
*st_out = st;
}
hs_try_putmvar(cap, mvar);
};

return store->rep->updateLSN(customer_id_, checkpoints, cb);
}

void checkpoint_store_remove_checkpoints(logdevice_checkpoint_store_t* store,
const char* customer_id,
c_logid_t* logids, HsInt logid_offset,
Expand Down
10 changes: 10 additions & 0 deletions hstream-store/test/HStream/Store/CheckpointStoreSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module HStream.Store.CheckpointStoreSpec (spec) where

import Control.Monad (void)
import qualified Data.Map.Strict as Map
import qualified Data.Vector.Primitive as VP
import Test.Hspec

Expand Down Expand Up @@ -34,6 +35,15 @@ storeSpec new_ckp_store = do
S.ckpStoreUpdateLSN checkpointStore "customer1" logid 2
S.ckpStoreGetLSN checkpointStore "customer1" logid `shouldReturn` 2

it "update multi lsn" $ do
S.ckpStoreUpdateLSN checkpointStore "customer1" 1 1
S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 1
S.ckpStoreUpdateMultiLSN checkpointStore "customer1" $
Map.fromList [(1, 2), (2, 3), (3, 5)]
S.ckpStoreGetLSN checkpointStore "customer1" 1 `shouldReturn` 2
S.ckpStoreGetLSN checkpointStore "customer1" 2 `shouldReturn` 3
S.ckpStoreGetLSN checkpointStore "customer1" 3 `shouldReturn` 5

it "remove checkpoints" $ do
S.ckpStoreGetLSN checkpointStore "customer2" logid `shouldThrow` S.isNOTFOUND
S.ckpStoreUpdateLSN checkpointStore "customer2" logid 2
Expand Down