Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make struct ndb threadsafe #56

Merged
merged 2 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 89 additions & 50 deletions src/nostrdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
#define MAX_FILTERS 16

// the maximum size of inbox queues
static const int DEFAULT_QUEUE_SIZE = 1000000;

static const int DEFAULT_QUEUE_SIZE = 32768;

// increase if we need bigger filters
#define NDB_FILTER_PAGES 64
Expand Down Expand Up @@ -171,9 +170,10 @@ struct ndb_writer {
};

struct ndb_ingester {
struct ndb_lmdb *lmdb;
uint32_t flags;
struct threadpool tp;
struct ndb_writer *writer;
struct prot_queue *writer_inbox;
void *filter_context;
ndb_ingest_filter_fn filter;
};
Expand All @@ -194,6 +194,11 @@ struct ndb_monitor {
ndb_sub_fn sub_cb;
void *sub_cb_ctx;
int num_subscriptions;

// monitor isn't a full inbox. We want pollers to be able to poll
// subscriptions efficiently without going through a message queue, so
// we use a simple mutex here.
pthread_mutex_t mutex;
};

struct ndb {
Expand Down Expand Up @@ -1722,13 +1727,6 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
return 1;
}

static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
struct ndb_writer_msg *msgs,
int num_msgs)
{
return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
}

static int ndb_writer_queue_note(struct ndb_writer *writer,
struct ndb_note *note, size_t note_len)
{
Expand Down Expand Up @@ -2201,7 +2199,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
// we will use this to check if we already have it in the DB during
// ID parsing
controller.read_txn = read_txn;
controller.lmdb = ingester->writer->lmdb;
controller.lmdb = ingester->lmdb;
cb.fn = ndb_ingester_json_controller;
cb.data = &controller;

Expand Down Expand Up @@ -3929,6 +3927,14 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
//fprintf(stderr, "writing version %" PRIu64 "\n", version);
}

static void ndb_monitor_lock(struct ndb_monitor *mon) {
pthread_mutex_lock(&mon->mutex);
}

static void ndb_monitor_unlock(struct ndb_monitor *mon) {
pthread_mutex_unlock(&mon->mutex);
}

struct written_note {
uint64_t note_id;
struct ndb_writer_note *note;
Expand All @@ -3946,6 +3952,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
struct ndb_note *note;
struct ndb_subscription *sub;

ndb_monitor_lock(monitor);

for (i = 0; i < monitor->num_subscriptions; i++) {
sub = &monitor->subscriptions[i];
ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
Expand Down Expand Up @@ -3976,6 +3984,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
}
}

ndb_monitor_unlock(monitor);
}

static void *ndb_writer_thread(void *data)
Expand Down Expand Up @@ -4118,7 +4128,7 @@ static void *ndb_ingester_thread(void *data)
secp256k1_context *ctx;
struct thread *thread = data;
struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
struct ndb_lmdb *lmdb = ingester->writer->lmdb;
struct ndb_lmdb *lmdb = ingester->lmdb;
struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
int i, to_write, popped, done, any_event;
Expand Down Expand Up @@ -4173,7 +4183,7 @@ static void *ndb_ingester_thread(void *data)

if (to_write > 0) {
ndb_debug("pushing %d events to write queue\n", to_write);
if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) {
ndb_debug("failed pushing %d events to write queue\n", to_write);
}
}
Expand Down Expand Up @@ -4213,7 +4223,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,

// initialize the ingester queue and then spawn the thread
static int ndb_ingester_init(struct ndb_ingester *ingester,
struct ndb_writer *writer,
struct ndb_lmdb *lmdb,
struct prot_queue *writer_inbox,
const struct ndb_config *config)
{
int elem_size, num_elems;
Expand All @@ -4223,7 +4234,8 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
elem_size = sizeof(struct ndb_ingester_msg);
num_elems = DEFAULT_QUEUE_SIZE;

ingester->writer = writer;
ingester->writer_inbox = writer_inbox;
ingester->lmdb = lmdb;
ingester->flags = config->flags;
ingester->filter = config->ingest_filter;
ingester->filter_context = config->filter_context;
Expand Down Expand Up @@ -4449,6 +4461,7 @@ static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
monitor->num_subscriptions = 0;
monitor->sub_cb = cb;
monitor->sub_cb_ctx = sub_cb_ctx;
pthread_mutex_init(&monitor->mutex, NULL);
}

void ndb_filter_group_destroy(struct ndb_filter_group *group)
Expand Down Expand Up @@ -4500,7 +4513,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
return 0;
}

if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) {
fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
config->ingester_threads);
return 0;
Expand Down Expand Up @@ -6545,13 +6558,15 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
return blocks;
}

struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
// please call ndb_monitor_lock before calling this
static struct ndb_subscription *
ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index)
{
struct ndb_subscription *sub, *tsub;
int i;

for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) {
tsub = &ndb->monitor.subscriptions[i];
for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) {
tsub = &monitor->subscriptions[i];
if (tsub->subid == subid) {
sub = tsub;
if (index)
Expand All @@ -6567,38 +6582,63 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
int note_id_capacity)
{
struct ndb_subscription *sub;
int res;

if (subid == 0)
return 0;

if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
return 0;
ndb_monitor_lock(&ndb->monitor);

if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL)))
res = 0;
else
res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);

return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
ndb_monitor_unlock(&ndb->monitor);

return res;
}

int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
int note_id_capacity)
int note_id_capacity)
{
struct ndb_subscription *sub;
struct prot_queue *queue_inbox;

// this is not a valid subscription id
// this is not a valid subscription id
if (subid == 0)
return 0;

if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
ndb_monitor_lock(&ndb->monitor);

if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) {
ndb_monitor_unlock(&ndb->monitor);
return 0;
}

return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
queue_inbox = &sub->inbox;

ndb_monitor_unlock(&ndb->monitor);

// there is technically a race condition if the thread yeilds at this
// comment and a subscription is added/removed. A deadlock in the
// writer queue would be much worse though. This function is dubious
// anyways.

return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity);
}

int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
{
struct ndb_subscription *sub;
int index, elems_to_move;
int index, res, elems_to_move;

if (!(sub = ndb_find_subscription(ndb, subid, &index)))
return 0;
ndb_monitor_lock(&ndb->monitor);

if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) {
res = 0;
goto done;
}

ndb_subscription_destroy(sub);

Expand All @@ -6608,21 +6648,12 @@ int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
&ndb->monitor.subscriptions[index+1],
elems_to_move * sizeof(*sub));

return 1;
}

struct ndb_filter *ndb_subscription_filters(struct ndb *ndb, uint64_t subid, int *filters)
{
struct ndb_subscription *sub;
res = 1;

sub = ndb_find_subscription(ndb, subid, NULL);
if (sub) {
*filters = sub->group.num_filters;
return sub->group.filters;
}
done:
ndb_monitor_unlock(&ndb->monitor);

*filters = 0;
return NULL;
return res;
}

int ndb_num_subscriptions(struct ndb *ndb)
Expand All @@ -6634,33 +6665,41 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
{
static uint64_t subids = 0;
struct ndb_subscription *sub;
int index;
size_t buflen;
uint64_t subid;
char *buf;

ndb_monitor_lock(&ndb->monitor);

if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
fprintf(stderr, "too many subscriptions\n");
return 0;
subid = 0;
goto done;
}

index = ndb->monitor.num_subscriptions++;
sub = &ndb->monitor.subscriptions[index];
sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions];
subid = ++subids;
sub->subid = subid;

ndb_filter_group_init(&sub->group);
if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters))
return 0;
if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) {
subid = 0;
goto done;
}

// 500k ought to be enough for anyone
buflen = sizeof(uint64_t) * 65536;
buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
buf = malloc(buflen);

if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
fprintf(stderr, "failed to push prot queue\n");
return 0;
subid = 0;
goto done;
}

ndb->monitor.num_subscriptions++;
done:
ndb_monitor_unlock(&ndb->monitor);

return subid;
}
1 change: 0 additions & 1 deletion src/nostrdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,6 @@ int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int not
int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
int ndb_unsubscribe(struct ndb *, uint64_t subid);
int ndb_num_subscriptions(struct ndb *);
struct ndb_filter *ndb_subscription_filters(struct ndb *, uint64_t subid, int *filters);

// FULLTEXT SEARCH
int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
Expand Down
Loading