Skip to content

Commit

Permalink
Merge pull request #20 from savonarola/0409-fix-concurrency-issues
Browse files Browse the repository at this point in the history
chore: improve concurrency handling
  • Loading branch information
savonarola authored Apr 11, 2024
2 parents c0fcb4d + 5ac3426 commit 72e7014
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
41 changes: 36 additions & 5 deletions c_src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <stdio.h>

#include "erl_nif.h"
#include "erl_driver.h"
#include "fdb.h"

#include "atoms.h"
Expand Down Expand Up @@ -45,6 +46,8 @@ typedef struct _ErlFDBSt
ErlNifTid network_tid;
ErlNifMutex* lock;
ErlNifCond* cond;
ErlDrvTSDKey future_proc_env_key;

} ErlFDBSt;


Expand Down Expand Up @@ -76,19 +79,30 @@ erlfdb_future_cb(FDBFuture* fdb_future, void* data)
{
ErlFDBFuture* future = (ErlFDBFuture*) data;
ERL_NIF_TERM msg;
ErlNifEnv* proc_env;
bool cancelled;

enif_mutex_lock(future->cancel_lock);
cancelled = future->cancelled;
enif_mutex_unlock(future->cancel_lock);

if(!cancelled) {
enif_mutex_lock(future->msg_lock);
msg = T2(future->msg_env, future->msg_ref, ATOM_ready);
enif_send(NULL, &(future->pid), future->msg_env, msg);
enif_mutex_unlock(future->msg_lock);
enif_mutex_lock(future->msg_lock);

if(future->msg_env != NULL) {

if(!cancelled) {
proc_env = (ErlNifEnv*) erl_drv_tsd_get(future->future_proc_env_key);
msg = T2(future->msg_env, future->msg_ref, ATOM_ready);
enif_send(proc_env, &(future->pid), future->msg_env, msg);
} else {
enif_free_env(future->msg_env);
}

future->msg_env = NULL;
}

enif_mutex_unlock(future->msg_lock);

// We're now done with this future which means we need
// to release our handle to it. See erlfdb_create_future
// for more on why this happens here.
Expand All @@ -106,6 +120,8 @@ erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureGetter gette
ERL_NIF_TERM ref = enif_make_ref(env);
ERL_NIF_TERM ret;
fdb_error_t err;
ErlFDBSt* st = (ErlFDBSt*) enif_priv_data(env);


// TODO: check results of each operation.
// TODO: return error if any operation fails and clean previously allocated resources.
Expand All @@ -121,6 +137,8 @@ erlfdb_create_future(ErlNifEnv* env, FDBFuture* future, ErlFDBFutureGetter gette
f->cancelled = false;
f->cancel_lock = enif_mutex_create("fdb:future_cancel_lock");

f->future_proc_env_key = st->future_proc_env_key;

// This resource reference counting dance is a bit
// awkward as erlfdb_future_cb can be called both
// synchronously and asynchronously.
Expand Down Expand Up @@ -343,6 +361,13 @@ static int
erlfdb_load(ErlNifEnv* env, void** priv, ERL_NIF_TERM num_schedulers)
{
ErlFDBSt* st = (ErlFDBSt*) enif_alloc(sizeof(ErlFDBSt));
int key_create_ret;

key_create_ret = erl_drv_tsd_key_create("erlfdb_future_proc_env_key", &(st->future_proc_env_key));

if(key_create_ret != 0) {
return 1;
}

erlfdb_init_atoms(env);

Expand All @@ -353,6 +378,7 @@ erlfdb_load(ErlNifEnv* env, void** priv, ERL_NIF_TERM num_schedulers)
st->lock = enif_mutex_create("fdb:st_lock");
st->cond = enif_cond_create("fdb:st_cond");


st->lib_state = ErlFDB_LOADED;
*priv = st;

Expand Down Expand Up @@ -388,8 +414,10 @@ erlfdb_unload(ErlNifEnv* env, void* priv)

enif_mutex_destroy(st->lock);
enif_cond_destroy(st->cond);
erl_drv_tsd_key_destroy(st->future_proc_env_key);
enif_free(priv);


return;
}

Expand Down Expand Up @@ -567,8 +595,11 @@ erlfdb_future_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
future->cancelled = true;
enif_mutex_unlock(future->cancel_lock);

erl_drv_tsd_set(st->future_proc_env_key, env);

fdb_future_cancel(future->future);

erl_drv_tsd_set(st->future_proc_env_key, NULL);

return ATOM_ok;
}
Expand Down
3 changes: 2 additions & 1 deletion c_src/resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <stdbool.h>

#include "erl_nif.h"
#include "erl_driver.h"
#include "fdb.h"


Expand All @@ -36,7 +37,7 @@ struct _ErlFDBFuture
ErlNifEnv* msg_env;
ERL_NIF_TERM msg_ref;
ErlNifMutex* msg_lock;

ErlDrvTSDKey future_proc_env_key;
bool cancelled;
ErlNifMutex* cancel_lock;
};
Expand Down

0 comments on commit 72e7014

Please sign in to comment.