From d90666a206d14095d1b9d53642c2711a96f4cac4 Mon Sep 17 00:00:00 2001
From: William Casarin
Date: Mon, 9 Dec 2024 13:49:35 -0800
Subject: [PATCH 1/2] mem: reduce default queue size

This was overkill and was using lots of memory

Signed-off-by: William Casarin
---
 src/nostrdb.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/src/nostrdb.c b/src/nostrdb.c
index 16102cf..a6b5007 100644
--- a/src/nostrdb.c
+++ b/src/nostrdb.c
@@ -41,8 +41,7 @@
 #define MAX_FILTERS 16
 
 // the maximum size of inbox queues
-static const int DEFAULT_QUEUE_SIZE = 1000000;
-
+static const int DEFAULT_QUEUE_SIZE = 32768;
 
 // increase if we need bigger filters
 #define NDB_FILTER_PAGES 64
@@ -6654,7 +6653,7 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
 		return 0;
 
 	// 500k ought to be enough for anyone
-	buflen = sizeof(uint64_t) * 65536;
+	buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
 	buf = malloc(buflen);
 
 	if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {

From 67f3a5e4df3ecf9be0f1971d14123e7e0d17381f Mon Sep 17 00:00:00 2001
From: William Casarin
Date: Mon, 9 Dec 2024 14:48:31 -0800
Subject: [PATCH 2/2] make the subscription monitor threadsafe

This was the only thing that wasn't threadsafe. Add a simple mutex
instead of a queue so that polling is quick. This also means we can't
really return the internal subscriptions anymore, so we remove that for
now until we have a safer interface.

Fixes: https://github.com/damus-io/nostrdb/issues/55
Signed-off-by: William Casarin
---
 src/nostrdb.c | 134 ++++++++++++++++++++++++++++++++------------------
 src/nostrdb.h |   1 -
 2 files changed, 87 insertions(+), 48 deletions(-)

diff --git a/src/nostrdb.c b/src/nostrdb.c
index a6b5007..7ea5289 100644
--- a/src/nostrdb.c
+++ b/src/nostrdb.c
@@ -170,9 +170,10 @@ struct ndb_writer {
 };
 
 struct ndb_ingester {
+	struct ndb_lmdb *lmdb;
 	uint32_t flags;
 	struct threadpool tp;
-	struct ndb_writer *writer;
+	struct prot_queue *writer_inbox;
 	void *filter_context;
 	ndb_ingest_filter_fn filter;
 };
@@ -193,6 +194,11 @@ struct ndb_monitor {
 	ndb_sub_fn sub_cb;
 	void *sub_cb_ctx;
 	int num_subscriptions;
+
+	// monitor isn't a full inbox. We want pollers to be able to poll
+	// subscriptions efficiently without going through a message queue, so
+	// we use a simple mutex here.
+	pthread_mutex_t mutex;
 };
 
 struct ndb {
@@ -1721,13 +1727,6 @@ int ndb_note_verify(void *ctx, unsigned char pubkey[32], unsigned char id[32],
 	return 1;
 }
 
-static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
-					struct ndb_writer_msg *msgs,
-					int num_msgs)
-{
-	return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
-}
-
 static int ndb_writer_queue_note(struct ndb_writer *writer,
 				 struct ndb_note *note, size_t note_len)
 {
@@ -2200,7 +2199,7 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 	// we will use this to check if we already have it in the DB during
 	// ID parsing
 	controller.read_txn = read_txn;
-	controller.lmdb = ingester->writer->lmdb;
+	controller.lmdb = ingester->lmdb;
 
 	cb.fn = ndb_ingester_json_controller;
 	cb.data = &controller;
@@ -3928,6 +3927,14 @@ static void ndb_write_version(struct ndb_txn *txn, uint64_t version)
 	//fprintf(stderr, "writing version %" PRIu64 "\n", version);
 }
 
+static void ndb_monitor_lock(struct ndb_monitor *mon) {
+	pthread_mutex_lock(&mon->mutex);
+}
+
+static void ndb_monitor_unlock(struct ndb_monitor *mon) {
+	pthread_mutex_unlock(&mon->mutex);
+}
+
 struct written_note {
 	uint64_t note_id;
 	struct ndb_writer_note *note;
@@ -3945,6 +3952,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
 	struct ndb_note *note;
 	struct ndb_subscription *sub;
 
+	ndb_monitor_lock(monitor);
+
 	for (i = 0; i < monitor->num_subscriptions; i++) {
 		sub = &monitor->subscriptions[i];
 		ndb_debug("checking subscription %d, %d notes\n", i, num_notes);
@@ -3975,6 +3984,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
 			monitor->sub_cb(monitor->sub_cb_ctx, sub->subid);
 		}
 	}
+
+	ndb_monitor_unlock(monitor);
 }
 
 static void *ndb_writer_thread(void *data)
@@ -4117,7 +4128,7 @@ static void *ndb_ingester_thread(void *data)
 	secp256k1_context *ctx;
 	struct thread *thread = data;
 	struct ndb_ingester *ingester = (struct ndb_ingester *)thread->ctx;
-	struct ndb_lmdb *lmdb = ingester->writer->lmdb;
+	struct ndb_lmdb *lmdb = ingester->lmdb;
 	struct ndb_ingester_msg msgs[THREAD_QUEUE_BATCH], *msg;
 	struct ndb_writer_msg outs[THREAD_QUEUE_BATCH], *out;
 	int i, to_write, popped, done, any_event;
@@ -4172,7 +4183,7 @@ static void *ndb_ingester_thread(void *data)
 
 		if (to_write > 0) {
 			ndb_debug("pushing %d events to write queue\n", to_write);
-			if (!ndb_writer_queue_msgs(ingester->writer, outs, to_write)) {
+			if (!prot_queue_push_all(ingester->writer_inbox, outs, to_write)) {
 				ndb_debug("failed pushing %d events to write queue\n", to_write);
 			}
 		}
@@ -4212,7 +4223,8 @@ static int ndb_writer_init(struct ndb_writer *writer, struct ndb_lmdb *lmdb,
 
 // initialize the ingester queue and then spawn the thread
 static int ndb_ingester_init(struct ndb_ingester *ingester,
-			     struct ndb_writer *writer,
+			     struct ndb_lmdb *lmdb,
+			     struct prot_queue *writer_inbox,
 			     const struct ndb_config *config)
 {
 	int elem_size, num_elems;
@@ -4222,7 +4234,8 @@ static int ndb_ingester_init(struct ndb_ingester *ingester,
 	elem_size = sizeof(struct ndb_ingester_msg);
 	num_elems = DEFAULT_QUEUE_SIZE;
 
-	ingester->writer = writer;
+	ingester->writer_inbox = writer_inbox;
+	ingester->lmdb = lmdb;
 	ingester->flags = config->flags;
 	ingester->filter = config->ingest_filter;
 	ingester->filter_context = config->filter_context;
@@ -4448,6 +4461,7 @@ static void ndb_monitor_init(struct ndb_monitor *monitor, ndb_sub_fn cb,
 	monitor->num_subscriptions = 0;
 	monitor->sub_cb = cb;
 	monitor->sub_cb_ctx = sub_cb_ctx;
+	pthread_mutex_init(&monitor->mutex, NULL);
 }
 
 void ndb_filter_group_destroy(struct ndb_filter_group *group)
@@ -4499,7 +4513,7 @@ int ndb_init(struct ndb **pndb, const char *filename, const struct ndb_config *c
 		return 0;
 	}
 
-	if (!ndb_ingester_init(&ndb->ingester, &ndb->writer, config)) {
+	if (!ndb_ingester_init(&ndb->ingester, &ndb->lmdb, &ndb->writer.inbox, config)) {
 		fprintf(stderr, "failed to initialize %d ingester thread(s)\n",
 				config->ingester_threads);
 		return 0;
@@ -6544,13 +6558,15 @@ struct ndb_blocks *ndb_get_blocks_by_key(struct ndb *ndb, struct ndb_txn *txn, u
 	return blocks;
 }
 
-struct ndb_subscription *ndb_find_subscription(struct ndb *ndb, uint64_t subid, int *index)
+// please call ndb_monitor_lock before calling this
+static struct ndb_subscription *
+ndb_monitor_find_subscription(struct ndb_monitor *monitor, uint64_t subid, int *index)
 {
 	struct ndb_subscription *sub, *tsub;
 	int i;
 
-	for (i = 0, sub = NULL; i < ndb->monitor.num_subscriptions; i++) {
-		tsub = &ndb->monitor.subscriptions[i];
+	for (i = 0, sub = NULL; i < monitor->num_subscriptions; i++) {
+		tsub = &monitor->subscriptions[i];
 		if (tsub->subid == subid) {
 			sub = tsub;
 			if (index)
@@ -6566,38 +6582,63 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
 		       int note_id_capacity)
 {
 	struct ndb_subscription *sub;
+	int res;
 
 	if (subid == 0)
 		return 0;
 
-	if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
-		return 0;
+	ndb_monitor_lock(&ndb->monitor);
+
+	if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL)))
+		res = 0;
+	else
+		res = prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
+
+	ndb_monitor_unlock(&ndb->monitor);
 
-	return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
+	return res;
 }
 
 int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
-		       int note_id_capacity)
+		       int note_id_capacity)
 {
 	struct ndb_subscription *sub;
+	struct prot_queue *queue_inbox;
 
-	// this is not a valid subscription id
+	// this is not a valid subscription id
 	if (subid == 0)
 		return 0;
 
-	if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
+	ndb_monitor_lock(&ndb->monitor);
+
+	if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, NULL))) {
+		ndb_monitor_unlock(&ndb->monitor);
 		return 0;
+	}
+
+	queue_inbox = &sub->inbox;
 
-	return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
+	ndb_monitor_unlock(&ndb->monitor);
+
+	// there is technically a race condition if the thread yields at this
+	// comment and a subscription is added/removed. A deadlock in the
+	// writer queue would be much worse though. This function is dubious
+	// anyways.
+
+	return prot_queue_pop_all(queue_inbox, note_ids, note_id_capacity);
 }
 
 int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
 {
 	struct ndb_subscription *sub;
-	int index, elems_to_move;
+	int index, res, elems_to_move;
 
-	if (!(sub = ndb_find_subscription(ndb, subid, &index)))
-		return 0;
+	ndb_monitor_lock(&ndb->monitor);
+
+	if (!(sub = ndb_monitor_find_subscription(&ndb->monitor, subid, &index))) {
+		res = 0;
+		goto done;
+	}
 
 	ndb_subscription_destroy(sub);
@@ -6607,21 +6648,12 @@ int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
 			&ndb->monitor.subscriptions[index+1],
 			elems_to_move * sizeof(*sub));
 
-	return 1;
-}
+	res = 1;
 
-struct ndb_filter *ndb_subscription_filters(struct ndb *ndb, uint64_t subid, int *filters)
-{
-	struct ndb_subscription *sub;
+done:
+	ndb_monitor_unlock(&ndb->monitor);
 
-	sub = ndb_find_subscription(ndb, subid, NULL);
-	if (sub) {
-		*filters = sub->group.num_filters;
-		return sub->group.filters;
-	}
-
-	*filters = 0;
-	return NULL;
+	return res;
 }
 
 int ndb_num_subscriptions(struct ndb *ndb)
@@ -6633,24 +6665,27 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
 {
 	static uint64_t subids = 0;
 	struct ndb_subscription *sub;
-	int index;
 	size_t buflen;
 	uint64_t subid;
 	char *buf;
 
+	ndb_monitor_lock(&ndb->monitor);
+
 	if (ndb->monitor.num_subscriptions + 1 >= MAX_SUBSCRIPTIONS) {
 		fprintf(stderr, "too many subscriptions\n");
-		return 0;
+		subid = 0;
+		goto done;
 	}
 
-	index = ndb->monitor.num_subscriptions++;
-	sub = &ndb->monitor.subscriptions[index];
+	sub = &ndb->monitor.subscriptions[ndb->monitor.num_subscriptions];
 	subid = ++subids;
 	sub->subid = subid;
 
 	ndb_filter_group_init(&sub->group);
-	if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters))
-		return 0;
+	if (!ndb_filter_group_add_filters(&sub->group, filters, num_filters)) {
+		subid = 0;
+		goto done;
+	}
 
 	// 500k ought to be enough for anyone
 	buflen = sizeof(uint64_t) * DEFAULT_QUEUE_SIZE;
@@ -6658,8 +6693,13 @@ uint64_t ndb_subscribe(struct ndb *ndb, struct ndb_filter *filters, int num_filt
 	buf = malloc(buflen);
 
 	if (!prot_queue_init(&sub->inbox, buf, buflen, sizeof(uint64_t))) {
 		fprintf(stderr, "failed to push prot queue\n");
-		return 0;
+		subid = 0;
+		goto done;
 	}
+	ndb->monitor.num_subscriptions++;
+done:
+	ndb_monitor_unlock(&ndb->monitor);
+
 	return subid;
 }
diff --git a/src/nostrdb.h b/src/nostrdb.h
index f6d09b0..ec6afdf 100644
--- a/src/nostrdb.h
+++ b/src/nostrdb.h
@@ -530,7 +530,6 @@ int ndb_wait_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int not
 int ndb_poll_for_notes(struct ndb *, uint64_t subid, uint64_t *note_ids, int note_id_capacity);
 int ndb_unsubscribe(struct ndb *, uint64_t subid);
 int ndb_num_subscriptions(struct ndb *);
-struct ndb_filter *ndb_subscription_filters(struct ndb *, uint64_t subid, int *filters);
 
 // FULLTEXT SEARCH
 int ndb_text_search(struct ndb_txn *txn, const char *query, struct ndb_text_search_results *, struct ndb_text_search_config *);
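
With the monitor mutex in place, the polling side of the subscription API can
be driven from any thread. A minimal caller-side sketch follows (illustrative
only, not part of the patch; `my_ndb` is assumed to come from the usual
ndb_init() flow, and the function name is hypothetical). It uses only the
public prototypes visible in src/nostrdb.h above:

    #include <stdint.h>
    #include <stdio.h>
    #include "nostrdb.h"

    // Drain any note keys queued for a subscription, without blocking.
    // ndb_poll_for_notes() now takes the monitor mutex internally, so
    // this is safe to run concurrently with the writer thread.
    static void drain_subscription(struct ndb *my_ndb, uint64_t subid)
    {
            uint64_t note_ids[64];
            int i, n;

            n = ndb_poll_for_notes(my_ndb, subid, note_ids, 64);
            for (i = 0; i < n; i++)
                    printf("matched note key %llu\n",
                           (unsigned long long)note_ids[i]);
    }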
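A companion sketch of the full subscribe/wait/unsubscribe lifecycle under the
new locking (also illustrative; the filter is assumed to have been built
elsewhere with the ndb_filter_* API, and error handling is abbreviated). Note
the caveat in the patch itself: ndb_wait_for_notes() releases the monitor
mutex before blocking on the inbox queue, so unsubscribing from another
thread while a waiter is blocked is still racy.

    #include <stdint.h>
    #include "nostrdb.h"

    // Subscribe, block for one batch of matches, then tear down.
    // A subid of 0 means the subscription was never registered:
    // ndb_subscribe() now only bumps num_subscriptions once the
    // inbox queue has been fully initialized.
    static int watch_once(struct ndb *my_ndb, struct ndb_filter *filter)
    {
            uint64_t subid, note_ids[32];
            int n;

            if ((subid = ndb_subscribe(my_ndb, filter, 1)) == 0)
                    return 0;

            // blocks until the writer thread pushes matching note keys
            n = ndb_wait_for_notes(my_ndb, subid, note_ids, 32);

            ndb_unsubscribe(my_ndb, subid);
            return n;
    }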