Skip to content

Commit

Permalink
protected_queue: rework for simplicity.
Browse files Browse the repository at this point in the history
1. Write generic routines to copy bytes in and out of ring buffer.
2. Write generic routines to pull/pull with/without blocking.
3. Implemented *_many and *_try functions in these terms.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell committed Aug 27, 2024
1 parent 29ee224 commit ff577a3
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 180 deletions.
26 changes: 13 additions & 13 deletions src/nostrdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1560,7 +1560,8 @@ struct ndb_writer_msg {
static inline int ndb_writer_queue_msg(struct ndb_writer *writer,
struct ndb_writer_msg *msg)
{
return prot_queue_push(&writer->inbox, msg);
prot_queue_push(&writer->inbox, msg);
return 1;
}

static int ndb_migrate_utf8_profile_names(struct ndb *ndb)
Expand Down Expand Up @@ -1670,7 +1671,8 @@ static inline int ndb_writer_queue_msgs(struct ndb_writer *writer,
struct ndb_writer_msg *msgs,
int num_msgs)
{
return prot_queue_push_all(&writer->inbox, msgs, num_msgs);
prot_queue_push_many(&writer->inbox, msgs, num_msgs);
return 1;
}

static int ndb_writer_queue_note(struct ndb_writer *writer,
Expand All @@ -1682,7 +1684,8 @@ static int ndb_writer_queue_note(struct ndb_writer *writer,
msg.note.note = note;
msg.note.note_len = note_len;

return prot_queue_push(&writer->inbox, &msg);
prot_queue_push(&writer->inbox, &msg);
return 1;
}

static void ndb_writer_last_profile_fetch(struct ndb_txn *txn,
Expand Down Expand Up @@ -3797,11 +3800,8 @@ static void ndb_notify_subscriptions(struct ndb_monitor *monitor,
if (ndb_filter_group_matches(&sub->group, note)) {
ndb_debug("pushing note\n");

if (!prot_queue_push(&sub->inbox, &written->note_id)) {
ndb_debug("couldn't push note to subscriber");
} else {
pushed++;
}
prot_queue_push(&sub->inbox, &written->note_id);
pushed++;
} else {
ndb_debug("not pushing note\n");
}
Expand Down Expand Up @@ -3838,7 +3838,7 @@ static void *ndb_writer_thread(void *data)
while (!done) {
txn.mdb_txn = NULL;
num_notes = 0;
popped = prot_queue_pop_all(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
popped = prot_queue_pop_many(&writer->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("writer popped %d items\n", popped);

any_note = 0;
Expand Down Expand Up @@ -3969,7 +3969,7 @@ static void *ndb_ingester_thread(void *data)
to_write = 0;
any_event = 0;

popped = prot_queue_pop_all(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
popped = prot_queue_pop_many(&thread->inbox, msgs, THREAD_QUEUE_BATCH);
ndb_debug("ingester popped %d items\n", popped);

for (i = 0; i < popped; i++) {
Expand Down Expand Up @@ -4081,7 +4081,7 @@ static int ndb_writer_destroy(struct ndb_writer *writer)

// kill thread
msg.type = NDB_WRITER_QUIT;
if (!prot_queue_push(&writer->inbox, &msg)) {
if (!prot_queue_try_push(&writer->inbox, &msg)) {
// queue is too full to push quit message. just kill it.
pthread_exit(&writer->thread_id);
} else {
Expand Down Expand Up @@ -6414,7 +6414,7 @@ int ndb_poll_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
return 0;

return prot_queue_try_pop_all(&sub->inbox, note_ids, note_id_capacity);
return prot_queue_try_pop_many(&sub->inbox, note_ids, note_id_capacity);
}

int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
Expand All @@ -6429,7 +6429,7 @@ int ndb_wait_for_notes(struct ndb *ndb, uint64_t subid, uint64_t *note_ids,
if (!(sub = ndb_find_subscription(ndb, subid, NULL)))
return 0;

return prot_queue_pop_all(&sub->inbox, note_ids, note_id_capacity);
return prot_queue_pop_many(&sub->inbox, note_ids, note_id_capacity);
}

int ndb_unsubscribe(struct ndb *ndb, uint64_t subid)
Expand Down
210 changes: 91 additions & 119 deletions src/protected_queue.c
Original file line number Diff line number Diff line change
@@ -1,153 +1,125 @@
#include "protected_queue.h"

/*
* Push an element onto the queue.
* Params:
* q - Pointer to the queue.
* data - Pointer to the data element to be pushed.
*
* Blocks if no space is available.
*/
int prot_queue_push(struct prot_queue* q, void *data)
static void copy_bytes_in(struct prot_queue *q,
const void *src,
size_t bytes)
{
int cap;

pthread_mutex_lock(&q->mutex);

// Wait until there's room.
while ((cap = prot_queue_capacity(q)) == q->count)
pthread_cond_wait(&q->cond_removed, &q->mutex);

memcpy(&q->buf[q->tail * q->elem_size], data, q->elem_size);
q->tail = (q->tail + 1) % cap;
q->count++;

pthread_cond_signal(&q->cond_added);
pthread_mutex_unlock(&q->mutex);
return 1;
/* If it wraps, we do it in two parts */
if (unlikely(q->tail + bytes > q->buflen)) {
size_t part1 = q->buflen - q->tail;
memcpy(q->buf + q->tail, src, part1);
q->tail = 0;
q->bytes += part1;
src = (const char *)src + part1;
bytes -= part1;
}
memcpy(q->buf + q->tail, src, bytes);
q->tail += bytes;
q->bytes += bytes;
}

/*
* Push multiple elements onto the queue.
* Params:
* q - Pointer to the queue.
* data - Pointer to the data elements to be pushed.
* count - Number of elements to push.
*
* Returns the number of elements successfully pushed, 0 if the queue is full or if there is not enough contiguous space.
*/
int prot_queue_push_all(struct prot_queue* q, void *data, int count)
static void copy_bytes_out(struct prot_queue *q,
void *dst,
size_t bytes)
{
int cap;
int first_copy_count, second_copy_count;

cap = prot_queue_capacity(q);
assert(count <= cap);
pthread_mutex_lock(&q->mutex);

while (q->count + count > cap)
pthread_cond_wait(&q->cond_removed, &q->mutex);

first_copy_count = min(count, cap - q->tail); // Elements until the end of the buffer
second_copy_count = count - first_copy_count; // Remaining elements if wrap around

memcpy(&q->buf[q->tail * q->elem_size], data, first_copy_count * q->elem_size);
q->tail = (q->tail + first_copy_count) % cap;

if (second_copy_count > 0) {
// If there is a wrap around, copy the remaining elements
memcpy(&q->buf[q->tail * q->elem_size], (char *)data + first_copy_count * q->elem_size, second_copy_count * q->elem_size);
q->tail = (q->tail + second_copy_count) % cap;
/* If it wraps, we do it in two parts */
if (unlikely(q->head + bytes > q->buflen)) {
size_t part1 = q->buflen - q->head;
memcpy(dst, q->buf + q->head, part1);
q->head = 0;
q->bytes -= part1;
dst = (char *)dst + part1;
bytes -= part1;
}

q->count += count;

pthread_cond_signal(&q->cond_added); // Signal a waiting thread
pthread_mutex_unlock(&q->mutex);

return count;
memcpy(dst, q->buf + q->head, bytes);
q->head += bytes;
q->bytes -= bytes;
}

/*
* Try to pop an element from the queue without blocking.
* Params:
* q - Pointer to the queue.
* data - Pointer to where the popped data will be stored.
* Returns 1 if successful, 0 if the queue is empty.
*/
int prot_queue_try_pop_all(struct prot_queue *q, void *data, int max_items) {
int items_to_pop, items_until_end;

static bool push(struct prot_queue* q, const void *data, size_t len,
bool block)
{
pthread_mutex_lock(&q->mutex);

if (q->count == 0) {
pthread_mutex_unlock(&q->mutex);
return 0;
// Wait until there's room.
while (q->bytes + len > q->buflen) {
if (!block) {
pthread_mutex_unlock(&q->mutex);
return false;
}
pthread_cond_wait(&q->cond_removed, &q->mutex);
}

items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
items_to_pop = min(q->count, max_items);
items_to_pop = min(items_to_pop, items_until_end);

memcpy(data, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
q->count -= items_to_pop;
copy_bytes_in(q, data, len);

pthread_cond_signal(&q->cond_removed); // Signal a waiting thread
pthread_cond_signal(&q->cond_added);
pthread_mutex_unlock(&q->mutex);
return items_to_pop;
return true;
}

/*
* Wait until we have elements, and then pop multiple elements from the queue
* up to the specified maximum.
*
* Params:
* q - Pointer to the queue.
* buffer - Pointer to the buffer where popped data will be stored.
* max_items - Maximum number of items to pop from the queue.
* Returns the actual number of items popped.
*/
int prot_queue_pop_all(struct prot_queue *q, void *dest, int max_items) {
static size_t pull(struct prot_queue* q,
void *data,
size_t min_len, size_t max_len,
bool block)
{
size_t len;
pthread_mutex_lock(&q->mutex);

// Wait until there's at least one item to pop
while (q->count == 0) {
// Wait until there's enough contents.
while (q->bytes < min_len) {
if (!block) {
pthread_mutex_unlock(&q->mutex);
return false;
}
pthread_cond_wait(&q->cond_added, &q->mutex);
}

int items_until_end = (q->buflen - q->head * q->elem_size) / q->elem_size;
int items_to_pop = min(q->count, max_items);
items_to_pop = min(items_to_pop, items_until_end);

memcpy(dest, &q->buf[q->head * q->elem_size], items_to_pop * q->elem_size);
q->head = (q->head + items_to_pop) % prot_queue_capacity(q);
q->count -= items_to_pop;
len = q->bytes;
if (len > max_len)
len = max_len;
copy_bytes_out(q, data, len);

pthread_cond_signal(&q->cond_removed);
pthread_mutex_unlock(&q->mutex);
return len;
}

return items_to_pop;
/* Push an element onto the queue. Blocks if it needs to */
void prot_queue_push(struct prot_queue* q, const void *data)
{
prot_queue_push_many(q, data, 1);
}

/*
* Pop an element from the queue. Blocks if the queue is empty.
* Params:
* q - Pointer to the queue.
* data - Pointer to where the popped data will be stored.
*/
void prot_queue_pop(struct prot_queue *q, void *data) {
pthread_mutex_lock(&q->mutex);
/* Push elements onto the queue. Blocks if it needs to */
void prot_queue_push_many(struct prot_queue* q,
const void *data,
size_t count)
{
push(q, data, count * q->elem_size, true);
}

while (q->count == 0)
pthread_cond_wait(&q->cond_added, &q->mutex);
/* Push an element onto the queue. Returns false if it would block. */
bool prot_queue_try_push(struct prot_queue* q, const void *data)
{
return push(q, data, q->elem_size, false);
}

size_t prot_queue_try_pop_many(struct prot_queue *q, void *data,
size_t max_items)
{
return pull(q, data, q->elem_size, max_items * q->elem_size, false)
/ q->elem_size;
}

memcpy(data, &q->buf[q->head * q->elem_size], q->elem_size);
q->head = (q->head + 1) % prot_queue_capacity(q);
q->count--;
size_t prot_queue_pop_many(struct prot_queue *q, void *data, size_t max_items)
{
return pull(q, data, q->elem_size, max_items * q->elem_size, true)
/ q->elem_size;
}

pthread_cond_signal(&q->cond_removed);
pthread_mutex_unlock(&q->mutex);
void prot_queue_pop(struct prot_queue *q, void *data)
{
pull(q, data, q->elem_size, q->elem_size, true);
}

/*
Expand Down
Loading

0 comments on commit ff577a3

Please sign in to comment.