diff --git a/src/nostrdb.c b/src/nostrdb.c index 5af5f68..abc066e 100644 --- a/src/nostrdb.c +++ b/src/nostrdb.c @@ -1560,7 +1560,8 @@ struct ndb_writer_msg { static inline int ndb_writer_queue_msg(struct ndb_writer *writer, struct ndb_writer_msg *msg) { - return prot_queue_push(&writer->inbox, msg); + prot_queue_push(&writer->inbox, msg); + return 1; } static int ndb_migrate_utf8_profile_names(struct ndb *ndb) @@ -1670,7 +1671,8 @@ static inline int ndb_writer_queue_msgs(struct ndb_writer *writer, struct ndb_writer_msg *msgs, int num_msgs) { - return prot_queue_push_all(&writer->inbox, msgs, num_msgs); + prot_queue_push_many(&writer->inbox, msgs, num_msgs); + return 1; } static int ndb_writer_queue_note(struct ndb_writer *writer, @@ -1682,7 +1684,8 @@ static int ndb_writer_queue_note(struct ndb_writer *writer, msg.note.note = note; msg.note.note_len = note_len; - return prot_queue_push(&writer->inbox, &msg); + prot_queue_push(&writer->inbox, &msg); + return 1; } static void ndb_writer_last_profile_fetch(struct ndb_txn *txn, @@ -3797,11 +3800,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor, if (ndb_filter_group_matches(&sub->group, note)) { ndb_debug("pushing note\n"); - if (!prot_queue_push(&sub->inbox, &written->note_id)) { - ndb_debug("couldn't push note to subscriber"); - } else { - pushed++; - } + prot_queue_push(&sub->inbox, &written->note_id); + pushed++; } else { ndb_debug("not pushing note\n"); } @@ -3838,7 +3838,7 @@ static void *ndb_writer_thread(void *data) while (!done) { txn.mdb_txn = NULL; num_notes = 0; - popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH); + popped = prot_queue_pop_many(&writer->inbox, msgs, THREAD_QUEUE_BATCH); ndb_debug("writer popped %d items\n", popped); any_note = 0; @@ -3969,7 +3969,7 @@ static void *ndb_ingester_thread(void *data) to_write = 0; any_event = 0; - popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH); + popped = prot_queue_pop_many(&thread->inbox, msgs, THREAD_QUEUE_BATCH); ndb_debug("ingester popped %d items\n", popped); for (i = 0; i < popped; i++) { @@ -4081,7 +4081,7 @@ static int ndb_writer_destroy(struct ndb_writer *writer) // kill thread msg.type = NDB_WRITER_QUIT; - if (!prot_queue_push(&writer->inbox, &msg)) { + if (!prot_queue_try_push(&writer->inbox, &msg)) { // queue is too full to push quit message. just kill it. pthread_exit(&writer->thread_id); } else { @@ -6414,7 +6414,7 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, if (!(sub = ndb_find_subscription(ndb, subid, NULL))) return 0; - return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity); + return prot_queue_try_pop_many(&sub->inbox, note_ids, note_id_capacity); } int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, @@ -6429,7 +6429,7 @@ int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids, if (!(sub = ndb_find_subscription(ndb, subid, NULL))) return 0; - return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity); + return prot_queue_pop_many(&sub->inbox, note_ids, note_id_capacity); } int ndb_unsubscribe(struct ndb *ndb, uint64_t subid) diff --git a/src/protected_queue.c b/src/protected_queue.c index 3ada518..d0ad224 100644 --- a/src/protected_queue.c +++ b/src/protected_queue.c @@ -1,153 +1,125 @@ #include "protected_queue.h" -/* - * Push an element onto the queue. - * Params: - * q - Pointer to the queue. - * data - Pointer to the data element to be pushed. - * - * Blocks if no space is available. - */ -int prot_queue_push(struct prot_queue* q, void *data) +static void copy_bytes_in(struct prot_queue *q, + const void *src, + size_t bytes) { - int cap; - - pthread_mutex_lock(&q->mutex); - - // Wait until there's room. - while ((cap = prot_queue_capacity(q)) == q->count) - pthread_cond_wait(&q->cond_removed, &q->mutex); - - memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size); - q->tail = (q->tail + 1) % cap; - q->count++; - - pthread_cond_signal(&q->cond_added); - pthread_mutex_unlock(&q->mutex); - return 1; + /* If it wraps, we do it in two parts */ + if (unlikely(q->tail + bytes > q->buflen)) { + size_t part1 = q->buflen - q->tail; + memcpy(q->buf + q->tail, src, part1); + q->tail = 0; + q->bytes += part1; + src = (const char *)src + part1; + bytes -= part1; + } + memcpy(q->buf + q->tail, src, bytes); + q->tail += bytes; + q->bytes += bytes; } -/* - * Push multiple elements onto the queue. - * Params: - * q - Pointer to the queue. - * data - Pointer to the data elements to be pushed. - * count - Number of elements to push. - * - * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. - */ -int prot_queue_push_all(struct prot_queue* q, void *data, int count) +static void copy_bytes_out(struct prot_queue *q, + void *dst, + size_t bytes) { - int cap; - int first_copy_count, second_copy_count; - - cap = prot_queue_capacity(q); - assert(count <= cap); - pthread_mutex_lock(&q->mutex); - - while (q->count + count > cap) - pthread_cond_wait(&q->cond_removed, &q->mutex); - - first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer - second_copy_count = count - first_copy_count; // Remaining elements if wrap around - - memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size); - q->tail = (q->tail + first_copy_count) % cap; - - if (second_copy_count > 0) { - // If there is a wrap around, copy the remaining elements - memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size); - q->tail = (q->tail + second_copy_count) % cap; + /* If it wraps, we do it in two parts */ + if (unlikely(q->head + bytes > q->buflen)) { + size_t part1 = q->buflen - q->head; + memcpy(dst, q->buf + q->head, part1); + q->head = 0; + q->bytes -= part1; + dst = (char *)dst + part1; + bytes -= part1; } - - q->count += count; - - pthread_cond_signal(&q->cond_added); // Signal a waiting thread - pthread_mutex_unlock(&q->mutex); - - return count; + memcpy(dst, q->buf + q->head, bytes); + q->head += bytes; + q->bytes -= bytes; } -/* - * Try to pop an element from the queue without blocking. - * Params: - * q - Pointer to the queue. - * data - Pointer to where the popped data will be stored. - * Returns 1 if successful, 0 if the queue is empty. - */ -int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) { - int items_to_pop, items_until_end; - +static bool push(struct prot_queue* q, const void *data, size_t len, + bool block) +{ pthread_mutex_lock(&q->mutex); - if (q->count == 0) { - pthread_mutex_unlock(&q->mutex); - return 0; + // Wait until there's room. + while (q->bytes + len > q->buflen) { + if (!block) { + pthread_mutex_unlock(&q->mutex); + return false; + } + pthread_cond_wait(&q->cond_removed, &q->mutex); } - items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; - items_to_pop = min(q->count, max_items); - items_to_pop = min(items_to_pop, items_until_end); - - memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); - q->head = (q->head + items_to_pop) % prot_queue_capacity(q); - q->count -= items_to_pop; + copy_bytes_in(q, data, len); - pthread_cond_signal(&q->cond_removed); // Signal a waiting thread + pthread_cond_signal(&q->cond_added); pthread_mutex_unlock(&q->mutex); - return items_to_pop; + return true; } -/* - * Wait until we have elements, and then pop multiple elements from the queue - * up to the specified maximum. - * - * Params: - * q - Pointer to the queue. - * buffer - Pointer to the buffer where popped data will be stored. - * max_items - Maximum number of items to pop from the queue. - * Returns the actual number of items popped. - */ -int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) { +static size_t pull(struct prot_queue* q, + void *data, + size_t min_len, size_t max_len, + bool block) +{ + size_t len; pthread_mutex_lock(&q->mutex); - // Wait until there's at least one item to pop - while (q->count == 0) { + // Wait until there's enough contents. + while (q->bytes < min_len) { + if (!block) { + pthread_mutex_unlock(&q->mutex); + return false; + } pthread_cond_wait(&q->cond_added, &q->mutex); } - int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size; - int items_to_pop = min(q->count, max_items); - items_to_pop = min(items_to_pop, items_until_end); - - memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size); - q->head = (q->head + items_to_pop) % prot_queue_capacity(q); - q->count -= items_to_pop; + len = q->bytes; + if (len > max_len) + len = max_len; + copy_bytes_out(q, data, len); pthread_cond_signal(&q->cond_removed); pthread_mutex_unlock(&q->mutex); + return len; +} - return items_to_pop; +/* Push an element onto the queue. Blocks if it needs to */ +void prot_queue_push(struct prot_queue* q, const void *data) +{ + prot_queue_push_many(q, data, 1); } -/* - * Pop an element from the queue. Blocks if the queue is empty. - * Params: - * q - Pointer to the queue. - * data - Pointer to where the popped data will be stored. - */ -void prot_queue_pop(struct prot_queue *q, void *data) { - pthread_mutex_lock(&q->mutex); +/* Push elements onto the queue. Blocks if it needs to */ +void prot_queue_push_many(struct prot_queue* q, + const void *data, + size_t count) +{ + push(q, data, count * q->elem_size, true); +} - while (q->count == 0) - pthread_cond_wait(&q->cond_added, &q->mutex); +/* Push an element onto the queue. Returns false if it would block. */ +bool prot_queue_try_push(struct prot_queue* q, const void *data) +{ + return push(q, data, q->elem_size, false); +} + +size_t prot_queue_try_pop_many(struct prot_queue *q, void *data, + size_t max_items) +{ + return pull(q, data, q->elem_size, max_items * q->elem_size, false) + / q->elem_size; +} - memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size); - q->head = (q->head + 1) % prot_queue_capacity(q); - q->count--; +size_t prot_queue_pop_many(struct prot_queue *q, void *data, size_t max_items) +{ + return pull(q, data, q->elem_size, max_items * q->elem_size, true) + / q->elem_size; +} - pthread_cond_signal(&q->cond_removed); - pthread_mutex_unlock(&q->mutex); +void prot_queue_pop(struct prot_queue *q, void *data) +{ + pull(q, data, q->elem_size, q->elem_size, true); } /* diff --git a/src/protected_queue.h b/src/protected_queue.h index ac19553..573a209 100644 --- a/src/protected_queue.h +++ b/src/protected_queue.h @@ -1,5 +1,5 @@ /* - * This header file provides a thread-safe queue implementation for generic +v * This header file provides a thread-safe queue implementation for generic * data elements. It uses POSIX threads (pthreads) to ensure thread safety. * The queue allows for pushing and popping elements, with the ability to * block or non-block on pop operations. Users are responsible for providing @@ -24,14 +24,18 @@ * generic data elements. */ struct prot_queue { + /* These don't change */ unsigned char *buf; size_t buflen; - - int head; - int tail; - int count; int elem_size; + /* These do: you must hold mutex! Offsets in *bytes* */ + size_t head; + size_t tail; + /* This seems redundant, but it isn't: head == tail means either + * full, or empty! */ + size_t bytes; + pthread_mutex_t mutex; /* Added */ pthread_cond_t cond_added; @@ -52,13 +56,12 @@ struct prot_queue { static inline int prot_queue_init(struct prot_queue* q, void* buf, size_t buflen, int elem_size) { - // buffer elements must fit nicely in the buffer - if (buflen == 0 || buflen % elem_size != 0) - assert(!"queue elements don't fit nicely"); + if (buflen < elem_size) + assert(!"What is this, a queue for ants?"); q->head = 0; q->tail = 0; - q->count = 0; + q->bytes = 0; q->buf = buf; q->buflen = buflen; q->elem_size = elem_size; @@ -70,35 +73,23 @@ static inline int prot_queue_init(struct prot_queue* q, void* buf, return 1; } -/* - * Return the capacity of the queue. - * q - Pointer to the queue. - */ -static inline size_t prot_queue_capacity(struct prot_queue *q) { - return q->buflen / q->elem_size; -} +/* Push an element onto the queue. Blocks if it needs to */ +void prot_queue_push(struct prot_queue* q, const void *data); -int prot_queue_push(struct prot_queue* q, void *data); +/* Push elements onto the queue. Blocks if it needs to */ +void prot_queue_push_many(struct prot_queue* q, const void *data, size_t count); -/* - * Push multiple elements onto the queue. - * Params: - * q - Pointer to the queue. - * data - Pointer to the data elements to be pushed. - * count - Number of elements to push. - * - * Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space. - */ -int prot_queue_push_all(struct prot_queue* q, void *data, int count); +/* Push an element onto the queue. Returns false if it would block. */ +bool prot_queue_try_push(struct prot_queue* q, const void *data); /* * Try to pop an element from the queue without blocking. * Params: * q - Pointer to the queue. * data - Pointer to where the popped data will be stored. - * Returns 1 if successful, 0 if the queue is empty. + * Returns number popped (<= max_items) */ -int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items); +size_t prot_queue_try_pop_many(struct prot_queue *q, void *data, size_t max_items); /* * Wait until we have elements, and then pop multiple elements from the queue @@ -108,9 +99,9 @@ int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items); * q - Pointer to the queue. * buffer - Pointer to the buffer where popped data will be stored. * max_items - Maximum number of items to pop from the queue. - * Returns the actual number of items popped. + * Returns the actual number of items popped (> 0). */ -int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items); +size_t prot_queue_pop_many(struct prot_queue *q, void *data, size_t max_items); /* * Pop an element from the queue. Blocks if the queue is empty. diff --git a/src/threadpool.h b/src/threadpool.h index c79adb1..f297ee2 100644 --- a/src/threadpool.h +++ b/src/threadpool.h @@ -73,14 +73,16 @@ static inline struct thread *threadpool_next_thread(struct threadpool *tp) static inline int threadpool_dispatch(struct threadpool *tp, void *msg) { struct thread *t = threadpool_next_thread(tp); - return prot_queue_push(&t->inbox, msg); + prot_queue_push(&t->inbox, msg); + return 1; } static inline int threadpool_dispatch_all(struct threadpool *tp, void *msgs, int num_msgs) { struct thread *t = threadpool_next_thread(tp); - return prot_queue_push_all(&t->inbox, msgs, num_msgs); + prot_queue_push_many(&t->inbox, msgs, num_msgs); + return 1; } static inline void threadpool_destroy(struct threadpool *tp) @@ -89,7 +91,7 @@ static inline void threadpool_destroy(struct threadpool *tp) for (int i = 0; i < tp->num_threads; i++) { t = &tp->pool[i]; - if (!prot_queue_push(&t->inbox, tp->quit_msg)) { + if (!prot_queue_try_push(&t->inbox, tp->quit_msg)) { pthread_exit(&t->thread_id); } else { pthread_join(t->thread_id, NULL); diff --git a/test.c b/test.c index 183963a..5e283dc 100644 --- a/test.c +++ b/test.c @@ -1235,22 +1235,22 @@ static void test_queue_init_pop_push() { // Push and Pop data = 5; - assert(prot_queue_push(&q, &data) == 1); + assert(prot_queue_try_push(&q, &data) == true); prot_queue_pop(&q, &data); assert(data == 5); // Push to full, and then fail to push for (int i = 0; i < TEST_BUF_SIZE; i++) { - assert(prot_queue_push(&q, &i) == 1); + assert(prot_queue_try_push(&q, &i) == true); } -// assert(prot_queue_push(&q, &data) == 0); // Should fail as queue is full + assert(prot_queue_try_push(&q, &data) == false); // Should fail as queue is full // Pop to empty, and then fail to pop for (int i = 0; i < TEST_BUF_SIZE; i++) { - assert(prot_queue_try_pop_all(&q, &data, 1) == 1); + assert(prot_queue_try_pop_many(&q, &data, 1) == 1); assert(data == i); } - assert(prot_queue_try_pop_all(&q, &data, 1) == 0); // Should fail as queue is empty + assert(prot_queue_try_pop_many(&q, &data, 1) == 0); // Should fail as queue is empty } // This function will be used by threads to test thread safety. @@ -1285,7 +1285,7 @@ static void test_queue_thread_safety() { // After all operations, the queue should be empty int data; - assert(prot_queue_try_pop_all(&q, &data, 1) == 0); + assert(prot_queue_try_pop_many(&q, &data, 1) == 0); } static void test_queue_boundary_conditions() { @@ -1298,35 +1298,35 @@ static void test_queue_boundary_conditions() { // Push to full for (int i = 0; i < TEST_BUF_SIZE; i++) { - assert(prot_queue_push(&q, &i) == 1); + assert(prot_queue_try_push(&q, &i) == 1); } // Try to push to a full queue int old_head = q.head; int old_tail = q.tail; - int old_count = q.count; -// assert(prot_queue_push(&q, &data) == 0); + int old_bytes = q.bytes; + assert(prot_queue_try_push(&q, &data) == 0); // Assert the queue's state has not changed assert(old_head == q.head); assert(old_tail == q.tail); - assert(old_count == q.count); + assert(old_bytes == q.bytes); // Pop to empty for (int i = 0; i < TEST_BUF_SIZE; i++) { - assert(prot_queue_try_pop_all(&q, &data, 1) == 1); + assert(prot_queue_try_pop_many(&q, &data, 1) == 1); } // Try to pop from an empty queue old_head = q.head; old_tail = q.tail; - old_count = q.count; - assert(prot_queue_try_pop_all(&q, &data, 1) == 0); + old_bytes = q.bytes; + assert(prot_queue_try_pop_many(&q, &data, 1) == 0); // Assert the queue's state has not changed assert(old_head == q.head); assert(old_tail == q.tail); - assert(old_count == q.count); + assert(old_bytes == q.bytes); } static void test_fast_strchr()