diff --git a/c_src/main.c b/c_src/main.c index 08372d8..11ac73b 100644 --- a/c_src/main.c +++ b/c_src/main.c @@ -15,6 +15,7 @@ #include #include "erl_nif.h" +#include "erl_driver.h" #include "fdb.h" #include "atoms.h" @@ -45,6 +46,8 @@ typedef struct _ErlFDBSt ErlNifTid network_tid; ErlNifMutex* lock; ErlNifCond* cond; + ErlDrvTSDKey future_proc_env_key; + } ErlFDBSt; @@ -76,19 +79,30 @@ erlfdb_future_cb(FDBFuture* fdb_future, void* data) { ErlFDBFuture* future = (ErlFDBFuture*) data; ERL_NIF_TERM msg; + ErlNifEnv* proc_env; bool cancelled; enif_mutex_lock(future->cancel_lock); cancelled = future->cancelled; enif_mutex_unlock(future->cancel_lock); - if(!cancelled) { - enif_mutex_lock(future->msg_lock); - msg = T2(future->msg_env, future->msg_ref, ATOM_ready); - enif_send(NULL, &(future->pid), future->msg_env, msg); - enif_mutex_unlock(future->msg_lock); + enif_mutex_lock(future->msg_lock); + + if(future->msg_env != NULL) { + + if(!cancelled) { + proc_env = (ErlNifEnv*) erl_drv_tsd_get(future->future_proc_env_key); + msg = T2(future->msg_env, future->msg_ref, ATOM_ready); + enif_send(proc_env, &(future->pid), future->msg_env, msg); + } else { + enif_free_env(future->msg_env); + } + + future->msg_env = NULL; } + enif_mutex_unlock(future->msg_lock); + // We're now done with this future which means we need // to release our handle to it. See erlfdb_create_future // for more on why this happens here. @@ -106,6 +120,8 @@ erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureGetter gette ERL_NIF_TERM ref = enif_make_ref(env); ERL_NIF_TERM ret; fdb_error_t err; + ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env); + // TODO: check results of each operation. // TODO: return error if any operation fails and clean previously allocated resources. @@ -121,6 +137,8 @@ erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureGetter gette f->cancelled = false; f->cancel_lock = enif_mutex_create("fdb:future_cancel_lock"); + f->future_proc_env_key = st->future_proc_env_key; + // This resource reference counting dance is a bit // awkward as erlfdb_future_cb can be called both // synchronously and asynchronously. @@ -343,6 +361,13 @@ static int erlfdb_load(ErlNifEnv* env, void** priv, ERL_NIF_TERM num_schedulers) { ErlFDBSt* st = (ErlFDBSt*) enif_alloc(sizeof(ErlFDBSt)); + int key_create_ret; + + key_create_ret = erl_drv_tsd_key_create("erlfdb_future_proc_env_key", &(st->future_proc_env_key)); + + if(key_create_ret != 0) { + return 1; + } erlfdb_init_atoms(env); @@ -353,6 +378,7 @@ erlfdb_load(ErlNifEnv* env, void** priv, ERL_NIF_TERM num_schedulers) st->lock = enif_mutex_create("fdb:st_lock"); st->cond = enif_cond_create("fdb:st_cond"); + st->lib_state = ErlFDB_LOADED; *priv = st; @@ -390,6 +416,8 @@ erlfdb_unload(ErlNifEnv* env, void* priv) enif_cond_destroy(st->cond); enif_free(priv); + erl_drv_tsd_key_destroy(st->future_proc_env_key); + return; } @@ -567,8 +595,11 @@ erlfdb_future_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) future->cancelled = true; enif_mutex_unlock(future->cancel_lock); + erl_drv_tsd_set(st->future_proc_env_key, env); + fdb_future_cancel(future->future); + erl_drv_tsd_set(st->future_proc_env_key, NULL); return ATOM_ok; } diff --git a/c_src/resources.h b/c_src/resources.h index be02c7f..129dc33 100644 --- a/c_src/resources.h +++ b/c_src/resources.h @@ -16,6 +16,7 @@ #include #include "erl_nif.h" +#include "erl_driver.h" #include "fdb.h" @@ -36,7 +37,7 @@ struct _ErlFDBFuture ErlNifEnv* msg_env; ERL_NIF_TERM msg_ref; ErlNifMutex* msg_lock; - + ErlDrvTSDKey future_proc_env_key; bool cancelled; ErlNifMutex* cancel_lock; };