Skip to content

Commit

Permalink
fix mem error while session event can be triggered multi-times
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Mar 22, 2022
1 parent 603a12e commit 88d543c
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 63 deletions.
87 changes: 74 additions & 13 deletions cbits/hs_zk.c
Original file line number Diff line number Diff line change
@@ -1,17 +1,31 @@
#include "hs_zk.h"

// ----------------------------------------------------------------------------
// Helpers
// ----------------------------------------------------------------------------

const stat_t* dup_stat(const stat_t* old_stat);
const string_vector_t* dup_string_vector(const string_vector_t* old_strings);
const acl_vector_t* dup_acl_vector(const acl_vector_t* old_acls);

// ----------------------------------------------------------------------------
// Callback Functions
// ----------------------------------------------------------------------------

void hs_zookeeper_watcher_fn(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx) {
hs_watcher_ctx_t* watcher_ctx = (hs_watcher_ctx_t*)watcherCtx;
watcher_ctx->zh = zh;
watcher_ctx->type = type;
watcher_ctx->state = state;
watcher_ctx->path = strdup(path);
hs_try_putmvar(watcher_ctx->cap, watcher_ctx->mvar);
void hs_zookeeper_node_watcher_fn(zhandle_t* zh, int type, int state,
const char* path, void* watcherCtx) {
if (type != ZOO_SESSION_EVENT) {
hs_watcher_ctx_t* watcher_ctx = (hs_watcher_ctx_t*)watcherCtx;
watcher_ctx->zh = zh;
watcher_ctx->type = type;
watcher_ctx->state = state;
watcher_ctx->path = strdup(path);
hs_try_putmvar(watcher_ctx->cap, watcher_ctx->mvar);
} else {
fprintf(stderr,
"zoovisitor.hs_zookeeper_node_watcher_fn: ignore event type: %d\n",
type);
}
}

/**
Expand Down Expand Up @@ -262,7 +276,7 @@ int hs_zoo_awget(zhandle_t* zh, const char* path, HsStablePtr mvar_w,
watcher_ctx->cap = cap;
data_completion->mvar = mvar_f;
data_completion->cap = cap;
return zoo_awget(zh, path, hs_zookeeper_watcher_fn, watcher_ctx,
return zoo_awget(zh, path, hs_zookeeper_node_watcher_fn, watcher_ctx,
hs_data_completion_fn, data_completion);
}

Expand Down Expand Up @@ -298,7 +312,7 @@ int hs_zoo_awexists(zhandle_t* zh, const char* path, HsStablePtr mvar_w,
watcher_ctx->cap = cap;
stat_completion->mvar = mvar_f;
stat_completion->cap = cap;
return zoo_awexists(zh, path, hs_zookeeper_watcher_fn, watcher_ctx,
return zoo_awexists(zh, path, hs_zookeeper_node_watcher_fn, watcher_ctx,
hs_stat_completion_fn, stat_completion);
}

Expand All @@ -319,7 +333,7 @@ int hs_zoo_awget_children(zhandle_t* zh, const char* path, HsStablePtr mvar_w,
watcher_ctx->cap = cap;
strings_completion->mvar = mvar_f;
strings_completion->cap = cap;
return zoo_awget_children(zh, path, hs_zookeeper_watcher_fn, watcher_ctx,
return zoo_awget_children(zh, path, hs_zookeeper_node_watcher_fn, watcher_ctx,
hs_strings_completion_fn, strings_completion);
}

Expand All @@ -340,8 +354,9 @@ int hs_zoo_awget_children2(zhandle_t* zh, const char* path, HsStablePtr mvar_w,
watcher_ctx->cap = cap;
strings_stat->mvar = mvar_f;
strings_stat->cap = cap;
return zoo_awget_children2(zh, path, hs_zookeeper_watcher_fn, watcher_ctx,
hs_strings_stat_completion_fn, strings_stat);
return zoo_awget_children2(zh, path, hs_zookeeper_node_watcher_fn,
watcher_ctx, hs_strings_stat_completion_fn,
strings_stat);
}

int hs_zoo_aget_acl(zhandle_t* zh, const char* path, HsStablePtr mvar,
Expand Down Expand Up @@ -373,3 +388,49 @@ void hs_zoo_set_op_init(zoo_op_t* op, const char* path, const char* value,
stat_t* stat) {
zoo_set_op_init(op, path, value + valoffset, valuelen, version, stat);
}

// ----------------------------------------------------------------------------
// Helpers

const stat_t* dup_stat(const stat_t* old_stat) {
stat_t* new_stat = (stat_t*)malloc(sizeof(stat_t));
new_stat = memcpy(new_stat, old_stat, sizeof(stat_t));
return new_stat;
}

const string_vector_t* dup_string_vector(const string_vector_t* old_strings) {
int count = old_strings->count;
if (count < 0) {
fprintf(stderr, "dup_string_vector error: count %d\n", count);
return NULL;
}
string_vector_t* new_strings =
(string_vector_t*)malloc(sizeof(string_vector_t));
char** vals = malloc(count * sizeof(char*));
for (int i = 0; i < count; ++i) {
vals[i] = strdup(old_strings->data[i]);
}
new_strings->count = count;
new_strings->data = vals;
return new_strings;
}

const acl_vector_t* dup_acl_vector(const acl_vector_t* old_acls) {
int count = old_acls->count;
if (count < 0) {
fprintf(stderr, "dup_acl_vector error: count %d\n", count);
return NULL;
}
acl_t* data = (acl_t*)malloc(count * sizeof(acl_t));
acl_t* old_data = old_acls->data;
for (int i = 0; i < count; ++i) {
data[i].perms = old_data[i].perms;
data[i].id.scheme = strdup(old_data[i].id.scheme);
data[i].id.id = strdup(old_data[i].id.id);
}

acl_vector_t* new_acls = (acl_vector_t*)malloc(sizeof(acl_vector_t));
new_acls->count = count;
new_acls->data = data;
return new_acls;
}
43 changes: 0 additions & 43 deletions include/hs_zk.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,49 +12,6 @@ typedef struct String_vector string_vector_t;
typedef struct ACL acl_t;
typedef struct ACL_vector acl_vector_t;

const stat_t* dup_stat(const stat_t* old_stat) {
stat_t* new_stat = (stat_t*)malloc(sizeof(stat_t));
new_stat = memcpy(new_stat, old_stat, sizeof(stat_t));
return new_stat;
}

const string_vector_t* dup_string_vector(const string_vector_t* old_strings) {
int count = old_strings->count;
if (count < 0) {
fprintf(stderr, "dup_string_vector error: count %d\n", count);
return NULL;
}
string_vector_t* new_strings =
(string_vector_t*)malloc(sizeof(string_vector_t));
char** vals = malloc(count * sizeof(char*));
for (int i = 0; i < count; ++i) {
vals[i] = strdup(old_strings->data[i]);
}
new_strings->count = count;
new_strings->data = vals;
return new_strings;
}

const acl_vector_t* dup_acl_vector(const acl_vector_t* old_acls) {
int count = old_acls->count;
if (count < 0) {
fprintf(stderr, "dup_acl_vector error: count %d\n", count);
return NULL;
}
acl_t* data = (acl_t*)malloc(count * sizeof(acl_t));
acl_t* old_data = old_acls->data;
for (int i = 0; i < count; ++i) {
data[i].perms = old_data[i].perms;
data[i].id.scheme = strdup(old_data[i].id.scheme);
data[i].id.id = strdup(old_data[i].id.id);
}

acl_vector_t* new_acls = (acl_vector_t*)malloc(sizeof(acl_vector_t));
new_acls->count = count;
new_acls->data = data;
return new_acls;
}

typedef struct hs_watcher_ctx_t {
HsStablePtr mvar;
HsInt cap;
Expand Down
20 changes: 16 additions & 4 deletions src/ZooKeeper.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ import Foreign.C (CInt)
import Foreign.Ptr (FunPtr, Ptr, freeHaskellFunPtr,
nullFunPtr, nullPtr, plusPtr)
import GHC.Stack (HasCallStack)
import Z.Data.CBytes (CBytes)
import qualified Z.Data.CBytes as CBytes
import Z.Data.CBytes (CBytes)
import Z.Data.Vector (Bytes)
import qualified Z.Foreign as Z
import qualified Z.IO.FileSystem.FilePath as ZF
Expand All @@ -66,7 +66,8 @@ zookeeperResInit
-- server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-> Maybe T.WatcherFn
-- ^ the global watcher callback function. When notifications are
-- triggered this function will be invoked.
-- triggered this function will be invoked. FIXME: Calling any zoo operations
-- (e.g. zooGet) will cause an infinite block.
-> CInt
-- ^ session expiration time in milliseconds
-> Maybe T.ClientID
Expand Down Expand Up @@ -205,8 +206,10 @@ zooWatchGet
-> CBytes
-> (T.HsWatcherCtx -> IO ())
-- ^ The watcher callback.
--
-- A watch will be set at the server to notify the client if the node changes.
--
-- /NOTE: this works different with c client. You will only receive data
-- and child watches, no session watches./
-> (T.DataCompletion -> IO ())
-- ^ The result callback when the request completes.
--
Expand Down Expand Up @@ -319,6 +322,9 @@ zooWatchExists
-- A watch will set on the specified znode on the server. The watch will be
-- set even if the node does not exist. This allows clients to watch for
-- nodes to appear.
--
-- /NOTE: this works different with c client. You will only receive data
-- and child watches, no session watches./
-> (Maybe T.StatCompletion -> IO ())
-- ^ The result callback when the request completes. Nothing means
-- the node does not exist.
Expand Down Expand Up @@ -386,6 +392,9 @@ zooWatchGetChildren
-> (T.HsWatcherCtx -> IO ())
-- ^ The watcher callback. A watch will be set at the server to notify
-- the client if the node changes.
--
-- /NOTE: this works different with c client. You will only receive data
-- and child watches, no session watches./
-> (T.StringsCompletion -> IO ())
-- ^ The result callback when the request completes.
--
Expand Down Expand Up @@ -454,6 +463,8 @@ zooWatchGetChildren2
-> (T.HsWatcherCtx -> IO ())
-- ^ The watcher callback. A watch will be set at the server to notify
-- the client if the node changes.
-- /NOTE: this works different with c client. You will only receive data
-- and child watches, no session watches./
-> (T.StringsStatCompletion -> IO ())
-- ^ The result callback when the request completes.
--
Expand Down Expand Up @@ -654,7 +665,8 @@ zookeeperInit
-- server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-> Maybe T.WatcherFn
-- ^ the global watcher callback function. When notifications are
-- triggered this function will be invoked.
-- triggered this function will be invoked. FIXME: Calling any zoo operations
-- (e.g. zooGet) will cause an infinite block.
-> CInt
-- ^ session expiration time in milliseconds
-> Maybe T.ClientID
Expand Down
7 changes: 4 additions & 3 deletions src/ZooKeeper/Internal/FFI.hsc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CApiFFI #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE UnliftedFFITypes #-}

Expand Down Expand Up @@ -228,10 +229,10 @@ foreign import ccall unsafe "hs_zk.h hs_zoo_set_op_init"
foreign import ccall safe "zookeeper.h zoo_set_watcher"
zoo_set_watcher :: ZHandle -> FunPtr CWatcherFn -> IO (FunPtr CWatcherFn)

foreign import ccall unsafe "hs_zk.h zoo_check_op_init"
foreign import ccall unsafe "zookeeper.h zoo_check_op_init"
c_zoo_check_op_init :: Ptr CZooOp -> BA## Word8 -> CInt -> IO ()

foreign import ccall unsafe "hs_zk.h is_unrecoverable"
foreign import ccall unsafe "zookeeper.h is_unrecoverable"
c_is_unrecoverable :: ZHandle -> IO CInt

-------------------------------------------------------------------------------
Expand Down Expand Up @@ -296,4 +297,4 @@ withZKAsync2 size1 peekRet1 peekData1 f1 size2 peekRet2 peekData2 f2 g = mask_ $
case rc1 of
CZOK -> f1 =<< Right <$> peekData1 data1'
_ -> f1 $ Left rc1
{-# INLINE withZKAsync2 #-}
{-# INLINABLE withZKAsync2 #-}

0 comments on commit 88d543c

Please sign in to comment.