Skip to content

Commit

Permalink
Rename vms_stream_fetch
Browse files Browse the repository at this point in the history
Reflect if it is dropping or not.
  • Loading branch information
mchalupa committed Feb 1, 2024
1 parent bf6c967 commit e269de2
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 17 deletions.
8 changes: 5 additions & 3 deletions include/vamos-buffers/core/arbiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ size_t vms_arbiter_buffer_drop_older_than(vms_arbiter_buffer *buffer,
vms_eventid id);
bool vms_arbiter_buffer_pop(vms_arbiter_buffer *q, void *buff);

void *vms_stream_fetch(vms_stream *stream, vms_arbiter_buffer *buffer);
void *vms_stream_fetch(vms_stream *stream);
void *vms_stream_fetch_dropping(vms_stream *stream, vms_arbiter_buffer *buffer);

void *vms_stream_filter_fetch(vms_stream *stream, vms_arbiter_buffer *buffer,
vms_stream_filter_fn filter);
void *vms_stream_fetch_dropping_filter(vms_stream *stream,
vms_arbiter_buffer *buffer,
vms_stream_filter_fn filter);

bool vms_arbiter_buffer_is_done(vms_arbiter_buffer *buffer);

Expand Down
17 changes: 17 additions & 0 deletions include/vamos-buffers/core/vector-macro.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@
--VEC_SIZE(vec); \
} while (0)

/**
* Remove the first occurence of `elem` from `vec`. `elem` must be a simple
* type, since we use == with it.
* IMPORTANT: The removal does shuffle the elements, in particular,
* the last element will be copied to the place of the removed one.
*/
#define VEC_UNORD_REMOVE(vec, elem) \
do { \
for (int i = 0; i < VEC_SIZE(vec); ++i) { \
if (VEC_AT(i) == elem) { \
vec[i] = vec[VEC_SIZE(vec) - 1]; \
--VEC_SIZE(vec); \
break; \
} \
} \
while (0)

/**
* Return the pointer to the top element in the vector.
* There is no check if there is any element. If there is none,
Expand Down
34 changes: 27 additions & 7 deletions src/core/arbiter.c
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,26 @@ static bool handle_dropping_event(vms_stream *stream,
}

/* wait for an event on the 'stream' */
void *vms_stream_fetch(vms_stream *stream, vms_arbiter_buffer *buffer) {
void *vms_stream_fetch(vms_stream *stream) {
void *ev;
if ((ev = get_event(stream))) {
assert(ev && "Dont have event");
assert(!vms_event_is_hole((vms_event *)ev) && "Got dropped event");
assert(vms_event_id(ev) == ++stream->last_event_id &&
"IDs are inconsistent");
/*
printf("FETCH: read event { kind = %lu, id = %lu}\n",
((vms_event*)ev)->kind,
((vms_event*)ev)->id);
*/
}

return ev;
}

/* Wait for an event on the 'stream'. If `buffer` is full, start dropping. */
void *vms_stream_fetch_dropping(vms_stream *stream,
vms_arbiter_buffer *buffer) {
void *ev;
size_t last_ev_id = 1;
while (1) {
Expand All @@ -594,13 +613,14 @@ void *vms_stream_fetch(vms_stream *stream, vms_arbiter_buffer *buffer) {
return ev;
}

/* got to next iteration to try the next event */
/* go to next iteration to try the next event */
}
}

/* FIXME: do not duplicate the code */
void *vms_stream_filter_fetch(vms_stream *stream, vms_arbiter_buffer *buffer,
vms_stream_filter_fn filter) {
void *vms_stream_fetch_dropping_filter(vms_stream *stream,
vms_arbiter_buffer *buffer,
vms_stream_filter_fn filter) {
void *ev;
size_t last_ev_id = 1;
while (1) {
Expand Down Expand Up @@ -634,7 +654,7 @@ void *vms_stream_filter_fetch(vms_stream *stream, vms_arbiter_buffer *buffer,
return ev;
}

/* got to next iteration to try the next event */
/* go to next iteration to try the next event */
}
}

Expand Down Expand Up @@ -683,10 +703,10 @@ void vms_arbiter_buffer_dump_stats(vms_arbiter_buffer *buffer) {
s->last_event_id);
COLOR_RESET
#endif
fprintf(stderr, " vms_stream_fetch() fetched %lu events\n",
fprintf(stderr, " vms_stream_fetch*() fetched %lu events\n",
s->fetched_events);
fprintf(stderr,
" vms_stream_fetch() totally dropped %lu events in %lu holes\n",
" vms_stream_fetch*() totally dropped %lu events in %lu holes\n",
buffer->total_dropped_num, buffer->total_dropped_times);
fprintf(stderr, " Last event was drop: %s\n",
buffer->last_was_drop ? "true" : "false");
Expand Down
2 changes: 1 addition & 1 deletion src/core/shamon.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static int default_buffer_manager_thrd(void *data) {

void *ev, *out;
while (1) {
ev = vms_stream_fetch(stream, buffer);
ev = vms_stream_fetch_dropping(stream, buffer);
if (!ev) {
break;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/fetch-test-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ int main(void) {

struct event* ev;
for (size_t i = 1; i <= 10000; ++i) {
ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
/* there should be no dropped event generated */
assert(vms_arbiter_buffer_size(arbiter_buffer) == 0);

Expand All @@ -121,7 +121,7 @@ int main(void) {
vms_stream_consume(stream, 1);
}

ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
assert(!ev);

main_finished = 1;
Expand Down
4 changes: 2 additions & 2 deletions tests/fetch-test-3.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int main(void) {
struct event *ev;
struct event *out;
for (size_t i = 0; i < 10000; ++i) {
ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
/* there should be no dropped event generated
* since we are outputing the events into another buffer */
assert(vms_arbiter_buffer_size(arbiter_buffer) == 0);
Expand All @@ -121,7 +121,7 @@ int main(void) {
vms_stream_consume(stream, 1);
}

ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
assert(!ev);

main_finished = 1;
Expand Down
4 changes: 2 additions & 2 deletions tests/fetch-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ int main(void) {

struct event *ev;
for (int i = 0; i < 4; ++i) {
ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
assert(ev);
assert(ev->n == i);
// printf("POP Event: {{%lu, %lu}, %d}\n", ev->base.id, ev->base.kind,
// ev->n);
vms_stream_consume(stream, 1);
}

ev = vms_stream_fetch(stream, arbiter_buffer);
ev = vms_stream_fetch_dropping(stream, arbiter_buffer);
assert(!ev);

pthread_join(tid, NULL);
Expand Down

0 comments on commit e269de2

Please sign in to comment.