From 209b4099040efa8ca275a2947add3413ce540aed Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Wed, 27 Sep 2023 10:56:32 +0800 Subject: [PATCH] more type safe --- common/base/HStream/Foreign.hs | 6 +- common/hstream/HStream/Common/Query.hs | 2 +- .../HStream/Store/Internal/Foreign.hs | 12 ++- .../HStream/Store/Internal/LogDevice.hs | 9 +-- .../Store/Internal/LogDevice/Checkpoint.hs | 24 +++--- .../Internal/LogDevice/LogConfigTypes.hs | 75 +++++++++---------- .../Store/Internal/LogDevice/Reader.hs | 20 ++--- .../LogDevice/VersionedConfigStore.hs | 10 +-- 8 files changed, 78 insertions(+), 80 deletions(-) diff --git a/common/base/HStream/Foreign.hs b/common/base/HStream/Foreign.hs index d15bcd9ce..7919eaaf2 100644 --- a/common/base/HStream/Foreign.hs +++ b/common/base/HStream/Foreign.hs @@ -41,7 +41,7 @@ module HStream.Foreign -- * Misc , c_delete_string , c_delete_vector_of_string - , c_delete_vector_of_int + , c_delete_vector_of_cint , c_delete_vector_of_int64 , c_delete_vector_of_uint64 , c_delete_std_vec_of_folly_small_vec_of_double @@ -249,9 +249,9 @@ withPrimListPairUnsafe pairs f = do HS_CPP_DELETE(delete_string, StdString) HS_CPP_DELETE(delete_vector_of_string, (StdVector StdString)) -HS_CPP_DELETE(delete_vector_of_int, (StdVector CInt)) +HS_CPP_DELETE(delete_vector_of_cint, (StdVector CInt)) HS_CPP_DELETE(delete_vector_of_int64, (StdVector Int64)) -HS_CPP_DELETE(delete_vector_of_uint64, (StdVector Int64)) +HS_CPP_DELETE(delete_vector_of_uint64, (StdVector Word64)) HS_CPP_DELETE(delete_std_vec_of_folly_small_vec_of_double, (StdVector (FollySmallVector Double))) bool2cbool :: Bool -> CBool diff --git a/common/hstream/HStream/Common/Query.hs b/common/hstream/HStream/Common/Query.hs index 4f41d6115..c4a0389f9 100644 --- a/common/hstream/HStream/Common/Query.hs +++ b/common/hstream/HStream/Common/Query.hs @@ -229,7 +229,7 @@ queryResultMetadata results idx = withForeignPtr results $ \results_ptr -> do (MBA# reason_val') (MBA# reason_del') failures <- if len >= 1 then do keys <- finally (peekN (fromIntegral len) key_val) - (c_delete_vector_of_int key_del) + (c_delete_vector_of_cint key_del) addrs <- finally (peekStdStringToCBytesN (fromIntegral len) addr_val) (c_delete_vector_of_string addr_del) reasons <- finally (peekStdStringToCBytesN (fromIntegral len) reason_val) diff --git a/hstream-store/HStream/Store/Internal/Foreign.hs b/hstream-store/HStream/Store/Internal/Foreign.hs index fee5bc138..e81113c24 100644 --- a/hstream-store/HStream/Store/Internal/Foreign.hs +++ b/hstream-store/HStream/Store/Internal/Foreign.hs @@ -19,10 +19,8 @@ import GHC.Exts import GHC.Stack import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) --- TODO: Use HStream.Foreign.BA# instead -import HStream.Foreign hiding (BA#, MBA#) +import HStream.Foreign import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Types @@ -32,9 +30,9 @@ cbool2bool = (/= 0) {-# INLINE cbool2bool #-} unsafeFreezeBA# :: MBA# a -> BA# a -unsafeFreezeBA# mba# = +unsafeFreezeBA# (MBA# mba#) = case unsafeFreezeByteArray# mba# realWorld# of - (# _, ba# #) -> ba# + (# _, ba# #) -> BA# ba# -- Actually, these unsafe functions can be used for both unsafe & safe ffi(?). withAsyncPrimUnsafe @@ -124,7 +122,7 @@ withAsyncPrimMapUnsafe => p -> PeekNFun pk k -> DeleteFun dk -> PeekNFun pv v -> DeleteFun dv - -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a k v vk vv) + -> (StablePtr PrimMVar -> Int -> MBA# p -> MapFun a pk pv dk dv) -> IO (a, p, [(k, v)]) withAsyncPrimMapUnsafe p peekk delk peekv delv f = withAsyncPrimMapUnsafe' p peekk delk peekv delv f pure @@ -211,7 +209,7 @@ withPrimSafe' :: forall a b. Prim a => a -> (MBA# a -> IO b) -> IO (a, b) withPrimSafe' v f = do mpa@(MutablePrimArray mba#) <- newAlignedPinnedPrimArray 1 writePrimArray mpa 0 v - !b <- f mba# + !b <- f (MBA# mba#) !a <- readPrimArray mpa 0 return (a, b) {-# INLINE withPrimSafe' #-} diff --git a/hstream-store/HStream/Store/Internal/LogDevice.hs b/hstream-store/HStream/Store/Internal/LogDevice.hs index a7c642132..bc2f0217d 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice.hs @@ -36,12 +36,10 @@ import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import Z.Data.Vector (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, - MBA#) +import HStream.Foreign (BA# (..), + MBA# (..)) import qualified HStream.Store.Exception as E -import HStream.Store.Internal.Types - import HStream.Store.Internal.Foreign import HStream.Store.Internal.LogDevice.Checkpoint import HStream.Store.Internal.LogDevice.Configuration @@ -51,6 +49,7 @@ import HStream.Store.Internal.LogDevice.LogConfigTypes import HStream.Store.Internal.LogDevice.Reader import HStream.Store.Internal.LogDevice.VersionedConfigStore import HStream.Store.Internal.LogDevice.Writer +import HStream.Store.Internal.Types ------------------------------------------------------------------------------- -- Client @@ -260,7 +259,7 @@ findKey client logid key accuracy = withForeignPtr client $ \client' -> CBytes.withCBytesUnsafe key $ \key' -> do (errno, lo_lsn, hi_lsn, _) <- withAsyncPrimUnsafe3' (0 :: ErrorCode) LSN_INVALID LSN_INVALID - (ld_client_find_key client' logid key' $ unFindKeyAccuracy accuracy) E.throwSubmitIfNotOK + (ld_client_find_key client' logid (BA# key') $ unFindKeyAccuracy accuracy) E.throwSubmitIfNotOK void $ E.throwStreamErrorIfNotOK' errno return (lo_lsn, hi_lsn) diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs index e8ca1de53..ab579d258 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Checkpoint.hs @@ -16,9 +16,8 @@ import GHC.Stack (HasCallStack) import qualified Z.Data.CBytes as ZC import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) -import qualified HStream.Foreign as F +import HStream.Foreign import qualified HStream.Store.Exception as E import qualified HStream.Store.Internal.Foreign as FFI import HStream.Store.Internal.Types @@ -34,7 +33,7 @@ import HStream.Store.Internal.Types newFileBasedCheckpointStore :: CBytes -> IO LDCheckpointStore newFileBasedCheckpointStore root_path = ZC.withCBytesUnsafe root_path $ \path' -> do - i <- c_new_file_based_checkpoint_store path' + i <- c_new_file_based_checkpoint_store (BA# path') newForeignPtr c_free_checkpoint_store_fun i newRSMBasedCheckpointStore @@ -63,7 +62,7 @@ ckpStoreGetLSN store customid logid = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, lsn, _) <- FFI.withAsyncPrimUnsafe2 (0 :: ErrorCode) LSN_INVALID $ - c_checkpoint_store_get_lsn store' customid' logid + c_checkpoint_store_get_lsn store' (BA# customid') logid _ <- E.throwStreamErrorIfNotOK' errno return lsn @@ -73,9 +72,9 @@ ckpStoreGetAllCheckpoints' store customid = withForeignPtr store $ \store' -> do (_, errno, ret) <- FFI.withAsyncPrimMapUnsafe C_OK - F.peekN F.c_delete_vector_of_uint64 - F.peekN F.c_delete_vector_of_uint64 - (checkpoint_store_get_all_checkpoints store' customid') + peekN c_delete_vector_of_uint64 + peekN c_delete_vector_of_uint64 + (checkpoint_store_get_all_checkpoints store' (BA# customid')) _ <- E.throwStreamErrorIfNotOK' errno pure ret @@ -90,7 +89,7 @@ ckpStoreUpdateLSN' :: Int -> LDCheckpointStore -> CBytes -> C_LogID -> LSN -> IO ckpStoreUpdateLSN' retries store customid logid sn = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do - let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' customid' logid sn + let f = FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ c_checkpoint_store_update_lsn store' (BA# customid') logid sn void $ FFI.retryWhileAgain f retries ckpStoreUpdateMultiLSN @@ -109,7 +108,8 @@ ckpStoreUpdateMultiLSN' retries store customid sns = Z.withPrimArrayUnsafe ka $ \ks' len -> Z.withPrimArrayUnsafe va $ \vs' _len -> FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_update_multi_lsn store' customid' ks' vs' (fromIntegral len) + checkpoint_store_update_multi_lsn store' + (BA# customid') (BA# ks') (BA# vs') (fromIntegral len) void $ FFI.retryWhileAgain f retries ckpStoreRemoveCheckpoints @@ -119,7 +119,7 @@ ckpStoreRemoveCheckpoints store customid (VP.Vector offset len (Z.ByteArray ba#) ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, _) <- FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_remove_checkpoints store' customid' ba# offset len + checkpoint_store_remove_checkpoints store' (BA# customid') (BA# ba#) offset len void $ E.throwStreamErrorIfNotOK' errno ckpStoreRemoveAllCheckpoints :: HasCallStack => LDCheckpointStore -> CBytes -> IO () @@ -127,7 +127,7 @@ ckpStoreRemoveAllCheckpoints store customid = ZC.withCBytesUnsafe customid $ \customid' -> withForeignPtr store $ \store' -> do (errno, _) <- FFI.withAsyncPrimUnsafe (0 :: ErrorCode) $ - checkpoint_store_remove_all_checkpoints store' customid' + checkpoint_store_remove_all_checkpoints store' (BA# customid') void $ E.throwStreamErrorIfNotOK' errno foreign import ccall unsafe "hs_logdevice.h new_file_based_checkpoint_store" @@ -169,7 +169,7 @@ foreign import ccall unsafe "hs_logdevice.h checkpoint_store_get_all_checkpoints -> StablePtr PrimMVar -> Int -> MBA# ErrorCode -- ^ value out: error code -> MBA# Int -> MBA# (Ptr C_LogID) -> MBA# (Ptr LSN) - -> MBA# (Ptr (F.StdVector C_LogID)) -> MBA# (Ptr (F.StdVector LSN)) + -> MBA# (Ptr (StdVector C_LogID)) -> MBA# (Ptr (StdVector LSN)) -> IO () foreign import ccall unsafe "hs_logdevice.h checkpoint_store_update_lsn" diff --git a/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs b/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs index 035ad1e5f..12080774f 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/LogConfigTypes.hs @@ -29,11 +29,8 @@ import GHC.Stack (HasCallStack, import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, BAArray#, - MBA#) -import HStream.Foreign hiding (BA#, - BAArray#, MBA#) +import HStream.Foreign import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign import HStream.Store.Internal.LogDevice.LogAttributes @@ -45,8 +42,8 @@ import HStream.Store.Internal.Types getLogAttrsExtra :: LDLogAttrs -> CBytes -> IO (Maybe CBytes) getLogAttrsExtra attrs key = withForeignPtr attrs $ \attrs' -> CBytes.withCBytesUnsafe key $ \key' -> do - et <- c_exist_log_attrs_extras attrs' key' - if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' key') + et <- c_exist_log_attrs_extras attrs' (BA# key') + if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' (BA# key')) else return Nothing updateLogAttrsExtrasPtr @@ -59,7 +56,7 @@ updateLogAttrsExtrasPtr attrs' logExtraAttrs = do vs = map (CBytes.rawPrimArray . snd) extras Z.withPrimArrayListUnsafe ks $ \ks' l -> Z.withPrimArrayListUnsafe vs $ \vs' _ -> do - i <- c_update_log_attrs_extras attrs' l ks' vs' + i <- c_update_log_attrs_extras attrs' l (BAArray# ks') (BAArray# vs') newForeignPtr c_free_log_attributes_fun i getAttrsExtrasFromPtr :: Ptr LogDeviceLogAttributes -> IO (Map.Map CBytes CBytes) @@ -70,7 +67,8 @@ getAttrsExtrasFromPtr attrs = do Z.withPrimUnsafe nullPtr $ \values -> Z.withPrimUnsafe nullPtr $ \keys_vec -> Z.withPrimUnsafe nullPtr $ \values_vec -> - c_get_attribute_extras attrs len keys values keys_vec values_vec + c_get_attribute_extras attrs (MBA# len) (MBA# keys) (MBA# values) + (MBA# keys_vec) (MBA# values_vec) finally (buildExtras (fromIntegral len) keys_ptr values_ptr) (delete_vector_of_string keys_vec <> delete_vector_of_string values_vec) @@ -86,10 +84,10 @@ getAttrsReplicationFactorFromPtr attrs = fromIntegral <$> c_get_replication_fact foreign import ccall unsafe "hs_logdevice.h get_attribute_extras" c_get_attribute_extras :: Ptr LogDeviceLogAttributes -> MBA# CSize - -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr (StdVector Z.StdString)) - -> MBA# (Ptr (StdVector Z.StdString)) + -> MBA# (Ptr StdString) + -> MBA# (Ptr StdString) + -> MBA# (Ptr (StdVector StdString)) + -> MBA# (Ptr (StdVector StdString)) -> IO () foreign import ccall unsafe "hs_logdevice.h get_replication_factor" @@ -117,7 +115,7 @@ foreign import ccall unsafe "hs_logdevice.h get_log_attrs_extra" c_get_log_attrs_extra :: Ptr LogDeviceLogAttributes -> BA# Word8 - -> IO (Ptr Z.StdString) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h free_log_attributes" c_free_log_attributes :: Ptr LogDeviceLogAttributes -> IO () @@ -217,7 +215,7 @@ getLogDirectory :: HasCallStack => LDClient -> CBytes -> IO LDDirectory getLogDirectory client path = CBytes.withCBytesUnsafe path $ \path' -> withForeignPtr client $ \client' -> do - let cfun = c_ld_client_get_directory client' path' + let cfun = c_ld_client_get_directory client' (BA# path') (errno, dir, _) <- withAsyncPrimUnsafe2' (0 :: ErrorCode) nullPtr cfun (E.throwSubmitIfNotOK . fromIntegral) _ <- E.throwStreamErrorIfNotOK' errno @@ -236,7 +234,8 @@ logDirectoryGetLogsName recursive dir = withForeignPtr dir $ \dir' -> do (len, (names_ptr, stdvec_ptr)) <- Z.withPrimUnsafe 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - fst <$> Z.withPrimUnsafe nullPtr (c_ld_logdirectory_get_logs_name dir' recursive len' names') + fst <$> Z.withPrimUnsafe nullPtr (\p -> + c_ld_logdirectory_get_logs_name dir' recursive (MBA# len') (MBA# names') (MBA# p)) finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string stdvec_ptr) logDirectoryGetVersion :: LDDirectory -> IO C_LogsConfigVersion @@ -253,7 +252,7 @@ makeLogDirectory client path attrs mkParent = do withForeignPtr client $ \client' -> withForeignPtr logAttrs $ \attrs' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_make_directory client' path' mkParent attrs' + let cfun = c_ld_client_make_directory client' (BA# path') mkParent attrs' MakeDirectoryCbData errno directory _ <- withAsync makeDirectoryCbDataSize peekMakeDirectoryCbData cfun void $ E.throwStreamErrorIfNotOK' errno @@ -265,7 +264,7 @@ removeLogDirectory client path recursive = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_remove_directory client' path' recursive + cfun = c_ld_client_remove_directory client' (BA# path') recursive LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -275,7 +274,7 @@ logDirChildrenNames dir = withForeignPtr dir $ \dir' -> do (len, (raw_ptr, names_ptr)) <- Z.withPrimUnsafe @Int 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - c_ld_logdir_children_keys dir' len' names' + c_ld_logdir_children_keys dir' (MBA# len') (MBA# names') finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string raw_ptr) logDirLogsNames :: LDDirectory -> IO [CBytes] @@ -283,20 +282,20 @@ logDirLogsNames dir = withForeignPtr dir $ \dir' -> do (len, (raw_ptr, names_ptr)) <- Z.withPrimUnsafe @Int 0 $ \len' -> Z.withPrimUnsafe nullPtr $ \names' -> - c_ld_logdir_logs_keys dir' len' names' + c_ld_logdir_logs_keys dir' (MBA# len') (MBA# names') finally (peekStdStringToCBytesN len names_ptr) (delete_vector_of_string raw_ptr) logDirChildFullName :: LDDirectory -> CBytes -> IO CBytes logDirChildFullName dir name = withForeignPtr dir $ \dir' -> CBytes.withCBytesUnsafe name $ \name' -> - CBytes.fromCString =<< c_ld_logdir_child_full_name dir' name' + CBytes.fromCString =<< c_ld_logdir_child_full_name dir' (BA# name') logDirLogFullName :: LDDirectory -> CBytes -> IO CBytes logDirLogFullName dir name = withForeignPtr dir $ \dir' -> CBytes.withCBytesUnsafe name $ \name' -> - CBytes.fromCString =<< c_ld_logdir_log_full_name dir' name' + CBytes.fromCString =<< c_ld_logdir_log_full_name dir' (BA# name') -- Note that this pointer is only valid if LogDirectory is valid. logDirectoryGetAttrsPtr :: LDDirectory -> IO (Ptr LogDeviceLogAttributes) @@ -321,14 +320,14 @@ foreign import ccall unsafe "hs_logdevice.h ld_logdir_log_full_name" foreign import ccall unsafe "hs_logdevice.h ld_logdir_children_keys" c_ld_logdir_children_keys :: Ptr LogDeviceLogDirectory - -> MBA# CSize -> MBA# (Ptr (StdVector Z.StdString)) - -> IO (Ptr Z.StdString) + -> MBA# Int -> MBA# (Ptr (StdVector StdString)) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h ld_logdir_logs_keys" c_ld_logdir_logs_keys :: Ptr LogDeviceLogDirectory - -> MBA# CSize -> MBA# (Ptr (StdVector Z.StdString)) - -> IO (Ptr Z.StdString) + -> MBA# Int -> MBA# (Ptr (StdVector StdString)) + -> IO (Ptr StdString) foreign import ccall unsafe "hs_logdevice.h ld_client_make_directory_sync" c_ld_client_make_directory_sync @@ -388,8 +387,8 @@ foreign import ccall unsafe "hs_logdevice.h ld_logdirectory_get_logs_name" c_ld_logdirectory_get_logs_name :: Ptr LogDeviceLogDirectory -> Bool -- ^ recursive - -> MBA# CSize -> MBA# (Ptr Z.StdString) - -> MBA# (Ptr (StdVector Z.StdString)) + -> MBA# CSize -> MBA# (Ptr StdString) + -> MBA# (Ptr (StdVector StdString)) -> IO () ------------------------------------------------------------------------------- @@ -418,7 +417,7 @@ makeLogGroupSync client path start end attrs mkParent = do CBytes.withCBytesUnsafe path $ \path' -> do (group', _) <- Z.withPrimUnsafe nullPtr $ \group'' -> void $ E.throwStreamErrorIfNotOK $ - c_ld_client_make_loggroup_sync client' path' start end attrs' mkParent group'' + c_ld_client_make_loggroup_sync client' (BA# path') start end attrs' mkParent (MBA# group'') newForeignPtr c_free_logdevice_loggroup_fun group' makeLogGroup @@ -434,7 +433,7 @@ makeLogGroup client path start end attrs mkParent = do withForeignPtr client $ \client' -> withForeignPtr logAttrs $ \attrs' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_make_loggroup client' path' start end attrs' mkParent + let cfun = c_ld_client_make_loggroup client' (BA# path') start end attrs' mkParent MakeLogGroupCbData errno group _ <- withAsync makeLogGroupCbDataSize peekMakeLogGroupCbData cfun void $ E.throwStreamErrorIfNotOK' errno @@ -445,7 +444,7 @@ getLogGroup :: HasCallStack => LDClient -> CBytes -> IO LDLogGroup getLogGroup client path = withForeignPtr client $ \client' -> CBytes.withCBytesUnsafe path $ \path' -> do - let cfun = c_ld_client_get_loggroup client' path' + let cfun = c_ld_client_get_loggroup client' (BA# path') (errno, group_ptr, _) <- withAsyncPrimUnsafe2 (0 :: ErrorCode) nullPtr cfun void $ E.throwStreamErrorIfNotOK' errno newForeignPtr c_free_logdevice_loggroup_fun group_ptr @@ -496,7 +495,7 @@ renameLogGroup client from_path to_path = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_rename client' from_path_ to_path_ + cfun = c_ld_client_rename client' (BA# from_path_) (BA# to_path_) LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -521,7 +520,7 @@ removeLogGroup client path = withForeignPtr client $ \client' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_remove_loggroup client' path_ + cfun = c_ld_client_remove_loggroup client' (BA# path_) LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -532,7 +531,7 @@ logGroupGetRange group = (start_ret, (end_ret, _)) <- Z.withPrimUnsafe C_LOGID_MIN_INVALID $ \start' -> Z.withPrimUnsafe C_LOGID_MIN_INVALID $ \end' -> - c_ld_loggroup_get_range group' start' end' + c_ld_loggroup_get_range group' (MBA# start') (MBA# end') return (start_ret, end_ret) {-# INLINE logGroupGetRange #-} @@ -560,8 +559,8 @@ logGroupGetExtraAttr :: LDLogGroup -> CBytes -> IO (Maybe CBytes) logGroupGetExtraAttr group key = withForeignPtr group $ \group' -> CBytes.withCBytesUnsafe key $ \key' -> do attrs' <- c_ld_loggroup_get_attrs group' - et <- c_exist_log_attrs_extras attrs' key' - if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' key') + et <- c_exist_log_attrs_extras attrs' (BA# key') + if et then Just . CBytes.fromBytes <$> Z.fromStdString (c_get_log_attrs_extra attrs' (BA# key')) else return Nothing {-# INLINE logGroupGetExtraAttr #-} @@ -578,7 +577,7 @@ logGroupUpdateExtraAttrs client group extraAttrs = Z.withPrimArrayListUnsafe vs $ \vs' _ -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_loggroup_update_extra_attrs client' group' l ks' vs' + cfun = c_ld_loggroup_update_extra_attrs client' group' l (BAArray# ks') (BAArray# vs') LogsConfigStatusCbData errno version _failure_reason <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno syncLogsConfigVersion client version @@ -611,7 +610,7 @@ logGroupSetRange client path (start, end) = CBytes.withCBytesUnsafe path $ \path' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_set_log_group_range client' path' start end + cfun = c_ld_client_set_log_group_range client' (BA# path') start end LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version @@ -749,7 +748,7 @@ ldWriteAttributes client path attrs' = CBytes.withCBytesUnsafe path $ \path' -> do let size = logsConfigStatusCbDataSize peek_data = peekLogsConfigStatusCbData - cfun = c_ld_client_set_attributes client' path' attrs' + cfun = c_ld_client_set_attributes client' (BA# path') attrs' LogsConfigStatusCbData errno version _ <- withAsync size peek_data cfun void $ E.throwStreamErrorIfNotOK' errno return version diff --git a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs index dd5e281c6..f2fa7fa5e 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/Reader.hs @@ -22,8 +22,8 @@ import qualified Z.Data.CBytes as ZC import Z.Data.CBytes (CBytes) import Z.Data.Vector.Base (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#, MBA#) +import HStream.Foreign (BA# (..), MBA# (..)) import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign (cbool2bool, retryWhileAgain, @@ -66,7 +66,7 @@ newLDSyncCkpReader name reader store = -- FIXME: The number of retries when synchronously writing checkpoints. -- We only use the async cpp function, so this option has no means currently let retries = 10 - i <- c_new_logdevice_sync_checkpointed_reader name' reader' store' retries + i <- c_new_logdevice_sync_checkpointed_reader (BA# name') reader' store' retries newForeignPtr c_free_sync_checkpointed_reader_fun i -- | Start reading a log. @@ -293,7 +293,8 @@ writeCheckpoints reader sns retries = va = Z.primArrayFromList $ map snd xs Z.withPrimArrayUnsafe ka $ \ks' len -> Z.withPrimArrayUnsafe va $ \vs' _ -> do - let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ crb_write_checkpoints reader' ks' vs' (fromIntegral len) + let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ + crb_write_checkpoints reader' (BA# ks') (BA# vs') (fromIntegral len) retryWhileAgain f retries {-# INLINABLE writeCheckpoints #-} @@ -302,7 +303,8 @@ writeLastCheckpoints reader xs retries = withForeignPtr reader $ \reader' -> do let topicIDs = Z.primArrayFromList xs Z.withPrimArrayUnsafe topicIDs $ \id' len -> do - let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ crb_write_last_read_checkpoints reader' id' (fromIntegral len) + let f = withAsyncPrimUnsafe (0 :: ErrorCode) $ + crb_write_last_read_checkpoints reader' (BA# id') (fromIntegral len) retryWhileAgain f retries {-# INLINABLE writeLastCheckpoints #-} @@ -310,7 +312,7 @@ removeCheckpoints :: HasCallStack => LDSyncCkpReader -> [C_LogID] -> IO () removeCheckpoints reader xs = withForeignPtr reader $ \reader' -> do let logids = Z.primArrayFromList xs Z.withPrimArrayUnsafe logids $ \id' len -> do - let f = crb_asyncRemoveCheckpoints reader' id' (fromIntegral len) + let f = crb_asyncRemoveCheckpoints reader' (BA# id') (fromIntegral len) (err, _ret) <- withAsyncPrimUnsafe (0 :: ErrorCode) f void $ E.throwStreamErrorIfNotOK' err @@ -479,7 +481,7 @@ foreign import ccall unsafe "hs_logdevice.h crb_write_checkpoints" -> BA# LSN -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_write_last_read_checkpoints" @@ -488,7 +490,7 @@ foreign import ccall unsafe "hs_logdevice.h crb_write_last_read_checkpoints" -> BA# C_LogID -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveCheckpoints" @@ -496,14 +498,14 @@ foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveCheckpoints" :: Ptr LogDeviceSyncCheckpointedReader -> BA# C_LogID -> Word -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall unsafe "hs_logdevice.h crb_asyncRemoveAllCheckpoints" crb_asyncRemoveAllCheckpoints :: Ptr LogDeviceSyncCheckpointedReader -> StablePtr PrimMVar -> Int - -> MBA# Word8 + -> MBA# ErrorCode -> IO () foreign import ccall safe "hs_logdevice.h sync_write_last_read_checkpoints" diff --git a/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs b/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs index c387cb83a..8a328af2f 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice/VersionedConfigStore.hs @@ -14,8 +14,8 @@ import qualified Z.Data.CBytes as CBytes import Z.Data.CBytes (CBytes) import Z.Data.Vector (Bytes) import qualified Z.Foreign as Z -import Z.Foreign (BA#) +import HStream.Foreign (BA# (..), MBA# (..)) import qualified HStream.Logger as Log import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign @@ -56,7 +56,7 @@ vcsGetConfig -> IO Bytes vcsGetConfig vcs key m_base_version auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> - go vcs' key' auto_retries + go vcs' (BA# key') auto_retries where go vcs' key' retries = do cbData <- run vcs' key' @@ -71,7 +71,7 @@ vcsGetConfig vcs key m_base_version auto_retries = run vcs' key' = case m_base_version of Just bv -> fmap snd $ Z.withPrimUnsafe bv $ \ver_ -> - f $ c_logdevice_vcs_get_config vcs' key' (unsafeFreezeBA# ver_) + f $ c_logdevice_vcs_get_config vcs' key' (unsafeFreezeBA# (MBA# ver_)) Nothing -> f $ c_logdevice_vcs_get_config' vcs' key' nullPtr f = withAsyncVoid vcsValueCallbackDataSize peekVcsValueCallbackData @@ -91,7 +91,7 @@ ldVcsGetLatestConfig -> IO Bytes ldVcsGetLatestConfig vcs key auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> - go vcs' key' auto_retries + go vcs' (BA# key') auto_retries where go vcs' key' retries = do cbData <- withAsyncVoid vcsValueCallbackDataSize peekVcsValueCallbackData @@ -149,7 +149,7 @@ ldVcsUpdateConfig vcs key val cond auto_retries = withForeignPtr vcs $ \vcs' -> CBytes.withCBytesUnsafe key $ \key' -> Z.withPrimVectorUnsafe val $ \val' offset len -> - go vcs' key' val' offset len auto_retries + go vcs' (BA# key') (BA# val') offset len auto_retries where go vcs' key' val' offset len retries = do cbData <- withAsyncVoid vcsWriteCallbackDataSize peekVcsWriteCallbackData