Skip to content

Commit

Permalink
hstream-store: add ckpStoreGetAllCheckpoints (#1623)
Browse files Browse the repository at this point in the history
* get all checkpoints from checkpointStore

* more type safe
  • Loading branch information
4eUeP authored Sep 28, 2023
1 parent 80285fb commit c0f0053
Show file tree
Hide file tree
Showing 17 changed files with 314 additions and 240 deletions.
115 changes: 65 additions & 50 deletions common/base/HStream/Foreign.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

module HStream.Foreign
( PeekNFun
, DeleteFun
, peekN
, BA# (..)
, MBA# (..)
Expand All @@ -24,8 +25,12 @@ module HStream.Foreign
-- * List
, StdVector
, FollySmallVector
, peekFollySmallVectorDoubleN
, peekStdVectorWord64
, peekStdVectorWord64Off
, peekStdVectorWord64N
, peekFollySmallVectorDouble
, peekFollySmallVectorDoubleOff
, peekFollySmallVectorDoubleN

-- * Map
, PeekMapFun
Expand All @@ -36,10 +41,11 @@ module HStream.Foreign
-- * Misc
, c_delete_string
, c_delete_vector_of_string
, c_delete_vector_of_int
, c_delete_vector_of_cint
, c_delete_vector_of_int64
, c_delete_vector_of_uint64
, c_delete_std_vec_of_folly_small_vec_of_double
, cal_offset_std_string
, c_cal_offset_std_string
, bool2cbool
) where

Expand All @@ -49,12 +55,12 @@ import Data.Int (Int64)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Primitive
import Data.Word
import Foreign.C.Types
import Foreign.ForeignPtr
import Foreign.Ptr
import Foreign.Storable
import GHC.Exts
import GHC.Prim
import qualified Z.Data.Array as Z
import qualified Z.Data.CBytes as CBytes
import Z.Data.CBytes (CBytes)
Expand All @@ -77,6 +83,20 @@ newtype MBA# a = MBA# (MutableByteArray# RealWorld)
-- TODO: ghc-9.4+ deprecates ArrayArray#, consider using Array# instead
newtype BAArray# a = BAArray# ArrayArray#

-------------------------------------------------------------------------------

#define HS_CPP_GET_SIZE(CFUN, HSOBJ) \
foreign import ccall unsafe "get_size_##CFUN" \
c_get_size_##CFUN :: Ptr HSOBJ -> IO Int

#define HS_CPP_PEEK(CFUN, HSOBJ, VAL_TYPE) \
foreign import ccall unsafe "peek_##CFUN" \
c_peek_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO ()

#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \
foreign import ccall unsafe "cal_offset_##CFUN" \
c_cal_offset_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ)

-------------------------------------------------------------------------------
-- Optional

Expand Down Expand Up @@ -115,7 +135,7 @@ peekStdStringToCBytesN len ptr

peekStdStringToCBytesIdx :: Ptr Z.StdString -> Int -> IO CBytes
peekStdStringToCBytesIdx p offset = do
ptr <- cal_offset_std_string p offset
ptr <- c_cal_offset_std_string p offset
siz :: Int <- Z.hs_std_string_size ptr
let !siz' = siz + 1
(mpa@(Z.MutablePrimArray mba#) :: Z.MutablePrimArray Z.RealWorld a) <- Z.newPrimArray siz'
Expand All @@ -132,58 +152,58 @@ withStdStringUnsafe f = do
else do str <- finally (peekStdStringToCBytesIdx ptr' 0) (c_delete_string ptr')
pure (str, ret)

HS_CPP_CAL_OFFSET(std_string, StdString)

-------------------------------------------------------------------------------

data StdVector a

data FollySmallVector a

peekFollySmallVectorDoubleN :: Int -> Ptr (FollySmallVector Double) -> IO [[Double]]
peekFollySmallVectorDoubleN len ptr
| len <= 0 || ptr == nullPtr = return []
| otherwise = forM [0..len-1] (peekFollySmallVectorDouble ptr)

-- TODO: use Vector or Array as returned value, so that we optimise(remove) the
-- "peekN" function to copy the memory twice.
peekFollySmallVectorDouble :: Ptr (FollySmallVector Double) -> Int -> IO [Double]
peekFollySmallVectorDouble ptr offset = do
ptr' <- c_cal_offset_vec_of_folly_small_vec_of_double ptr offset
size <- c_get_size_folly_small_vec_of_double ptr'
fp <- mallocForeignPtrBytes size
withForeignPtr fp $ \data' -> do
c_peek_folly_small_vec_of_double ptr' size data'
peekN size data'

#define HS_CPP_VEC_SIZE(CFUN, HSOBJ) \
foreign import ccall unsafe "hs_cpp_lib.h CFUN" \
c_##CFUN :: Ptr HSOBJ -> IO Int

#define HS_CPP_VEC_PEEK(CFUN, HSOBJ, VAL_TYPE) \
foreign import ccall unsafe "hs_cpp_lib.h CFUN" \
c_##CFUN :: Ptr HSOBJ -> Int -> Ptr VAL_TYPE -> IO ()

#define HS_CPP_VEC_OFFSET(CFUN, HSOBJ) \
foreign import ccall unsafe "hs_cpp_lib.h CFUN" \
c_##CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ)

HS_CPP_VEC_SIZE(get_size_folly_small_vec_of_double, (FollySmallVector Double))
HS_CPP_VEC_PEEK(peek_folly_small_vec_of_double, (FollySmallVector Double), Double)
HS_CPP_VEC_OFFSET(cal_offset_vec_of_folly_small_vec_of_double, (FollySmallVector Double))
#define HS_PEEK(ty, a, cfun) \
peek##ty##a##Off :: Ptr (ty a) -> Int -> IO [a]; \
peek##ty##a##Off ptr offset = (do \
ptr' <- c_cal_offset_##cfun ptr offset; \
size <- c_get_size_##cfun ptr'; \
fp <- mallocForeignPtrBytes size; \
withForeignPtr fp (\data' -> (do c_peek_##cfun ptr' size data'; \
peekN size data')) ); \
\
peek##ty##a :: Ptr (ty a) -> IO [a]; \
peek##ty##a ptr = peek##ty##a##Off ptr 0; \
\
peek##ty##a##N :: Int -> Ptr (ty a) -> IO [[a]]; \
peek##ty##a##N len ptr \
| len <= 0 || ptr == nullPtr = return [] \
| otherwise = forM [0..len-1] (peek##ty##a##Off ptr);

HS_PEEK(StdVector, Word64, vec_of_uint64)
HS_PEEK(FollySmallVector, Double, folly_small_vec_of_double)

HS_CPP_GET_SIZE(vec_of_uint64, (StdVector Word64))
HS_CPP_GET_SIZE(folly_small_vec_of_double, (FollySmallVector Double))

HS_CPP_CAL_OFFSET(vec_of_uint64, (StdVector Word64))
HS_CPP_CAL_OFFSET(folly_small_vec_of_double, (FollySmallVector Double))

HS_CPP_PEEK(vec_of_uint64, (StdVector Word64), Word64)
HS_CPP_PEEK(folly_small_vec_of_double, (FollySmallVector Double), Double)

-------------------------------------------------------------------------------

type PeekMapFun a dk dv ck cv
type PeekMapFun a pk pv dk dv
= MBA# Int
-- ^ returned map size
-> MBA# (Ptr pk) -> MBA# (Ptr pv)
-> MBA# (Ptr dk) -> MBA# (Ptr dv)
-> MBA# (Ptr ck) -> MBA# (Ptr cv)
-> IO a

peekCppMap
:: forall a dk dv ck cv k v. Ord k
=> PeekMapFun a dk dv ck cv
-> PeekNFun dk k -> DeleteFun ck
-> PeekNFun dv v -> DeleteFun cv
:: forall a pk pv dk dv k v. Ord k
=> PeekMapFun a pk pv dk dv
-> PeekNFun pk k -> DeleteFun dk
-> PeekNFun pv v -> DeleteFun dv
-> IO (a, Map.Map k v)
peekCppMap f peekKey delKey peekVal delVal = do
(len, (keys_ptr, (values_ptr, (keys_vec, (values_vec, ret))))) <-
Expand All @@ -194,10 +214,10 @@ peekCppMap f peekKey delKey peekVal delVal = do
Z.withPrimUnsafe nullPtr $ \values_vec ->
f (MBA# len) (MBA# keys) (MBA# values) (MBA# keys_vec) (MBA# values_vec)
finally
(buildExtras ret len keys_ptr values_ptr)
(buildMap ret len keys_ptr values_ptr)
(delKey keys_vec <> delVal values_vec)
where
buildExtras ret len keys_ptr values_ptr = do
buildMap ret len keys_ptr values_ptr = do
keys <- peekKey len keys_ptr
values <- peekVal len values_ptr
return (ret, Map.fromList $ zip keys values)
Expand Down Expand Up @@ -229,16 +249,11 @@ withPrimListPairUnsafe pairs f = do

HS_CPP_DELETE(delete_string, StdString)
HS_CPP_DELETE(delete_vector_of_string, (StdVector StdString))
HS_CPP_DELETE(delete_vector_of_int, (StdVector CInt))
HS_CPP_DELETE(delete_vector_of_cint, (StdVector CInt))
HS_CPP_DELETE(delete_vector_of_int64, (StdVector Int64))
HS_CPP_DELETE(delete_vector_of_uint64, (StdVector Word64))
HS_CPP_DELETE(delete_std_vec_of_folly_small_vec_of_double, (StdVector (FollySmallVector Double)))

#define HS_CPP_CAL_OFFSET(CFUN, HSOBJ) \
foreign import ccall unsafe "hs_cpp_lib.h CFUN" \
CFUN :: Ptr HSOBJ -> Int -> IO (Ptr HSOBJ)

HS_CPP_CAL_OFFSET(cal_offset_std_string, StdString)

bool2cbool :: Bool -> CBool
bool2cbool True = 1
bool2cbool False = 0
34 changes: 18 additions & 16 deletions common/base/cbits/hs_struct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ std::string* copy_std_string(std::string&& str) {
return current + offset; \
}

#define VECTOR_SIZE(NAME, VEC_TYPE) \
HsInt get_size_##NAME(const VEC_TYPE* vec) { return vec->size(); }
#define GET_SIZE(NAME, VAL_TYPE) \
HsInt get_size_##NAME(const VAL_TYPE* v) { return v->size(); }

#define DEL_FUNCTION(NAME, TYPE) \
void delete_##NAME(TYPE* p) { delete p; }

#define PEEK_VECTOR(NAME, VEC_TYPE, VAL_TYPE) \
void peek_##NAME(const VEC_TYPE* vec, HsInt len, VAL_TYPE* vals) { \
assert(("peek_##NAME: size mismatch!", len == vec->size())); \
for (int i = 0; i < len; i++) { \
void peek_##NAME(const VEC_TYPE* vec, HsInt vals_len, VAL_TYPE* vals) { \
assert(("peek_##NAME: size mismatch!", vals_len == vec->size())); \
for (int i = 0; i < vals_len; i++) { \
(vals)[i] = (*vec)[i]; \
} \
}
Expand All @@ -32,25 +35,24 @@ extern "C" {

// Unfortunately, there is no generic in c

CAL_OFFSET(std_string, std::string);
CAL_OFFSET(vec_of_folly_small_vec_of_double,
folly::small_vector<double COMMA 4>);
GET_SIZE(vec_of_double, std::vector<double>);
GET_SIZE(vec_of_uint64, std::vector<uint64_t>);
GET_SIZE(folly_small_vec_of_double, folly::small_vector<double COMMA 4>);

VECTOR_SIZE(vec_of_double, std::vector<double>);
VECTOR_SIZE(folly_small_vec_of_double, folly::small_vector<double COMMA 4>);
// Ptr a -> Int -> Ptr a
CAL_OFFSET(std_string, std::string);
CAL_OFFSET(vec_of_uint64, std::vector<uint64_t>);
CAL_OFFSET(folly_small_vec_of_double, folly::small_vector<double COMMA 4>);

PEEK_VECTOR(vec_of_uint64, std::vector<uint64_t>, uint64_t)
PEEK_VECTOR(folly_small_vec_of_double, folly::small_vector<double COMMA 4>,
double);

// ----------------------------------------------------------------------------

#define DEL_FUNCTION(NAME, TYPE) \
void delete_##NAME(TYPE* p) { delete p; }

DEL_FUNCTION(vector_of_int, std::vector<int>);
DEL_FUNCTION(vector_of_cint, std::vector<int>);
DEL_FUNCTION(string, std::string);
DEL_FUNCTION(vector_of_string, std::vector<std::string>);
DEL_FUNCTION(vector_of_int64, std::vector<int64_t>);
DEL_FUNCTION(vector_of_uint64, std::vector<uint64_t>);
DEL_FUNCTION(std_vec_of_folly_small_vec_of_double,
std::vector<folly::small_vector<double COMMA 4>>);

Expand Down
2 changes: 1 addition & 1 deletion common/hstream/HStream/Common/Query.hs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ queryResultMetadata results idx = withForeignPtr results $ \results_ptr -> do
(MBA# reason_val') (MBA# reason_del')
failures <- if len >= 1
then do keys <- finally (peekN (fromIntegral len) key_val)
(c_delete_vector_of_int key_del)
(c_delete_vector_of_cint key_del)
addrs <- finally (peekStdStringToCBytesN (fromIntegral len) addr_val)
(c_delete_vector_of_string addr_del)
reasons <- finally (peekStdStringToCBytesN (fromIntegral len) reason_val)
Expand Down
59 changes: 0 additions & 59 deletions common/hstream/cbits/hs_struct.cpp

This file was deleted.

Loading

0 comments on commit c0f0053

Please sign in to comment.