Skip to content

Commit

Permalink
events: add variable entry size circular queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Dec 18, 2024
1 parent 13706e3 commit 1e27d60
Show file tree
Hide file tree
Showing 4 changed files with 391 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/disco/events/Local.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
$(call add-hdrs,fd_circq.h)
$(call add-objs,fd_circq,fd_disco)

$(call make-unit-test,test_circq,test_circq,fd_disco fd_flamenco fd_tango fd_util)
$(call run-unit-test,test_circq)
163 changes: 163 additions & 0 deletions src/disco/events/fd_circq.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#include "fd_circq.h"

struct __attribute__((aligned(8UL))) fd_circq_message_private {
ulong align;
ulong footprint;

/* Offset withn the circular buffer data region of where the next
message starts, if there is one. This is not always the same as
aligning up this message + footprint, because the next message may
have wrapped around to the start of the buffer. */
ulong next;
};

typedef struct fd_circq_message_private fd_circq_message_t;

FD_FN_CONST ulong
fd_circq_align( void ) {
return FD_CIRCQ_ALIGN;
}

FD_FN_CONST ulong
fd_circq_footprint( ulong sz ) {
return sizeof( fd_circq_t ) + sz;
}

void *
fd_circq_new( void * shmem,
ulong sz ) {
fd_circq_t * circq = (fd_circq_t *)shmem;
circq->cnt = 0UL;
circq->head = 0UL;
circq->tail = 0UL;
circq->size = sz;
return shmem;
}

fd_circq_t *
fd_circq_join( void * shbuf ) {
return (fd_circq_t *)shbuf;
}

void *
fd_circq_leave( fd_circq_t * buf ) {
return (void *)buf;
}

void *
fd_circq_delete( void * shbuf ) {
return shbuf;
}

static inline void FD_FN_UNUSED
verify( fd_circq_t * circq ) {
FD_TEST( circq->head<circq->size );
FD_TEST( circq->tail<circq->size );
FD_TEST( circq->tail!=circq->head || circq->cnt<=1 );
if( !circq->cnt ) {
FD_TEST( circq->head==0UL );
FD_TEST( circq->tail==0UL );
} else if( circq->cnt==1UL ) {
FD_TEST( circq->head==circq->tail );
}

uchar * buf = (uchar *)(circq+1);

ulong current = circq->head;
int wrapped = 0;
for( ulong i=0UL; i<circq->cnt; i++ ) {
fd_circq_message_t * message = (fd_circq_message_t *)(buf+current);
ulong start = current;
ulong end = fd_ulong_align_up( start+sizeof( fd_circq_message_t ), message->align ) + message->footprint;
if( wrapped ) FD_TEST( end<=circq->head );
FD_TEST( start<end );
FD_TEST( end<=circq->size );
current = message->next;
if( current<start ) wrapped = 1;
}
}

static void
evict( fd_circq_t * circq,
ulong from,
ulong to ) {
uchar * buf = (uchar *)(circq+1);

for(;;) {
if( FD_UNLIKELY( !circq->cnt ) ) return;

fd_circq_message_t * head = (fd_circq_message_t *)(buf+circq->head);

ulong start = circq->head;
ulong end = fd_ulong_align_up( start + sizeof( fd_circq_message_t ), head->align ) + head->footprint;

if( FD_UNLIKELY( (start<to && end>from) ) ) {
circq->cnt--;
circq->metrics.drop_cnt++;
if( FD_LIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL;
else circq->head = head->next;
} else {
break;
}
}
}

uchar *
fd_circq_push_back( fd_circq_t * circq,
ulong align,
ulong footprint ) {
if( FD_UNLIKELY( !fd_ulong_is_pow2( align ) ) ) {
FD_LOG_WARNING(( "align must be a power of 2" ));
return NULL;
}
if( FD_UNLIKELY( align>FD_CIRCQ_ALIGN ) ) {
FD_LOG_WARNING(( "align must be at most %lu", FD_CIRCQ_ALIGN ));
return NULL;
}

ulong required = fd_ulong_align_up( sizeof( fd_circq_message_t ), align ) + footprint;
if( FD_UNLIKELY( required>circq->size ) ) {
FD_LOG_WARNING(( "tried to push message which was too large %lu>%lu", required, circq->size ));
return NULL;
}

uchar * buf = (uchar *)(circq+1);

ulong current = 0UL;
fd_circq_message_t * message = NULL;
if( FD_LIKELY( circq->cnt ) ) {
message = (fd_circq_message_t *)(buf+circq->tail);
current = fd_ulong_align_up( fd_ulong_align_up( circq->tail+sizeof( fd_circq_message_t ), message->align )+message->footprint, alignof( fd_circq_message_t ) );
}

if( FD_UNLIKELY( current+required>circq->size ) ) {
evict( circq, current, circq->size );
evict( circq, 0UL, required );

circq->tail = 0UL;
if( FD_LIKELY( circq->cnt && message ) ) message->next = 0UL;
} else {
evict( circq, current, current+required );

circq->tail = current;
if( FD_LIKELY( circq->cnt && message ) ) message->next = current;
}

circq->cnt++;
fd_circq_message_t * next_message = (fd_circq_message_t *)(buf+circq->tail);
next_message->align = align;
next_message->footprint = footprint;
return (uchar *)(next_message+1);
}

uchar const *
fd_circq_pop_front( fd_circq_t * circq ) {
if( FD_UNLIKELY( !circq->cnt ) ) return NULL;

circq->cnt--;
fd_circq_message_t * message = (fd_circq_message_t *)((uchar *)(circq+1)+circq->head);
if( FD_UNLIKELY( !circq->cnt ) ) circq->head = circq->tail = 0UL;
else circq->head = message->next;
FD_TEST( circq->head<circq->size );
return (uchar *)(message+1);
}
99 changes: 99 additions & 0 deletions src/disco/events/fd_circq.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#ifndef HEADER_fd_src_disco_events_fd_circq_h
#define HEADER_fd_src_disco_events_fd_circq_h

/* The circular buffer is a structure, which stores a queue of messages,
supporting two operations: push_back and pop_front. Unlike a regular
queue, the circular buffer is fixed size and push_back must always
succeed.
To ensure push_back always succeeds, the circular buffer will evict
old messages if necessary to make room for the new one.
One more complication is that the circular buffer must store
metadata about the messages in the data buffer itself, as it does not
have a separate metadata region. The structure of the buffer then
looks as follows:
+-------+-----+------+-----+-------+-----+------+-----+-------+-----+------+
+ meta0 | pad | msg0 | pad | meta1 | pad | msg1 | pad | meta2 | pad | msg2 |
+-------+-----+------+-----+-------+-----+------+-----+-------+-----+------+
^ | ^ | ^ |
| +-----next---------next--+ +------------------------+ |
| |
head tail
Here, the meta elements are fd_circq_message_t, which each point to
the next message in the queue, and head, tail are the head and tail
of the queue respectively. */

#include "../fd_disco_base.h"

#define FD_CIRCQ_ALIGN (4096UL)

struct __attribute__((aligned(FD_CIRCQ_ALIGN))) fd_circq_private {
/* Current count of elements in the queue. */
ulong cnt;

/* These are offsets relative to the end of this struct of the
metadata for the first, and last message in the queue,
respectively. */
ulong head;
ulong tail;

ulong size;

struct {
ulong drop_cnt;
} metrics;

/* padding out to 4k here ... */
};

typedef struct fd_circq_private fd_circq_t;

FD_PROTOTYPES_BEGIN

FD_FN_CONST ulong
fd_circq_align( void );

FD_FN_CONST ulong
fd_circq_footprint( ulong sz );

void *
fd_circq_new( void * shmem,
ulong sz );

fd_circq_t *
fd_circq_join( void * shbuf );

void *
fd_circq_leave( fd_circq_t * buf );

void *
fd_circq_delete( void * shbuf );

/* fd_circq_push_back appends a message of size sz into the circular
buffer, evicting any old messages if they would be overwritten when
the buffer wraps around. Returns the address of the memory contents
in the buffer on success, or NULL on failure. The only two reasons
for failure are if the requested sz (along with the message metadata)
exceeds the size of the entire buffer and can't fit, or if the
requested alignment is not a power of 2, or is larger than 4096. */

uchar *
fd_circq_push_back( fd_circq_t * circq,
ulong align,
ulong footprint );

/* fd_circq_pop_front pops the oldest message from the circular buffer
and returns the address of the memory contents in the buffer. The
memory contents are guaranteed to be valid until the next call to
fd_circq_push_back. Returns NULL if there are no messages in the
circular buffer. */

uchar const *
fd_circq_pop_front( fd_circq_t * circq );

FD_PROTOTYPES_END

#endif /* HEADER_fd_src_disco_events_fd_circq_h */
124 changes: 124 additions & 0 deletions src/disco/events/test_circq.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#include "../fd_disco.h"

#include "fd_circq.h"

void
test_simple1( void ) {
uchar buf[ 128UL+4096UL ];
fd_circq_t * circq = fd_circq_join( fd_circq_new( buf, 128 ) );
FD_TEST( circq );

fd_rng_t _rng[1];
fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 0U, 0UL ) );

for( ulong i=0UL; i<8192UL*8192UL; i++ ) {
uchar * msg = fd_circq_push_back( circq, fd_ulong_pow2( (int)fd_rng_ulong_roll( rng, 5 ) ), 1UL+fd_rng_ulong_roll( rng, 64 ) );
FD_TEST( msg );
}
}

void
test_simple2( void ) {
uchar buf[ 32UL+1024UL ];
fd_circq_t * circq = fd_circq_join( fd_circq_new( buf, 64UL ) );
FD_TEST( circq );

uchar * msg = fd_circq_push_back( circq, 1UL, 8UL );
msg[ 0 ] = 'X';
msg[ 7 ] = 'A';

uchar const * msg2 = fd_circq_pop_front( circq );

FD_TEST( msg2 );
FD_TEST( msg2[0]=='X' );
FD_TEST( msg2[7]=='A' );

FD_TEST( !fd_circq_pop_front( circq ) );
FD_TEST( !fd_circq_pop_front( circq ) );
FD_TEST( !fd_circq_pop_front( circq ) );

msg = fd_circq_push_back( circq, 1UL, 8UL );
msg[ 0 ] = 'X';
msg[ 7 ] = 'A';

msg = fd_circq_push_back( circq, 1UL, 8UL );
FD_TEST( circq->cnt==2UL );
msg[ 0 ] = '2';
msg[ 7 ] = '3';

msg2 = fd_circq_pop_front( circq );
FD_TEST( msg2 );
FD_TEST( msg2[0]=='X' );
FD_TEST( msg2[7]=='A' );

msg2 = fd_circq_pop_front( circq );
FD_TEST( msg2 );
FD_TEST( msg2[0]=='2' );
FD_TEST( msg2[7]=='3' );

FD_TEST( !fd_circq_pop_front( circq ) );

msg = fd_circq_push_back( circq, 1UL, 9UL );
msg[ 0 ] = 'X';
msg[ 7 ] = 'A';

msg = fd_circq_push_back( circq, 1UL, 8UL );
msg[ 0 ] = '2';
msg[ 7 ] = '3';

msg2 = fd_circq_pop_front( circq );
FD_TEST( msg2 );
FD_TEST( msg2[0]=='2' );
FD_TEST( msg2[7]=='3' );

FD_TEST( !fd_circq_pop_front( circq ) );
}

void
test_simple3( void ) {
uchar buf[ 32UL+1024UL ];
fd_circq_t * circq = fd_circq_join( fd_circq_new( buf, 128 ) );
FD_TEST( circq );

fd_rng_t _rng[1];
fd_rng_t * rng = fd_rng_join( fd_rng_new( _rng, 6U, 0UL ) );

for( ulong i=0UL; i<8192UL; i++ ) {
if( 0UL==fd_rng_ulong_roll( rng, 2 ) ) fd_circq_pop_front( circq );
fd_circq_push_back( circq, 1+fd_rng_ulong_roll( rng, 256UL ), 1UL+fd_rng_ulong_roll( rng, 25UL ) );
}
}

void
test_bounds( void ) {
uchar buf[ 32UL+1024UL ];
fd_circq_t * circq = fd_circq_join( fd_circq_new( buf, 1024UL ) );
FD_TEST( circq );

FD_TEST( fd_circq_push_back( circq, 1UL, 1024UL-25UL ) );
FD_TEST( fd_circq_push_back( circq, 1UL, 1024UL-24UL ) );
FD_TEST( fd_circq_push_back( circq, 8UL, 1024UL-24UL ) );
FD_TEST( !fd_circq_push_back( circq, 1UL, 1024UL-23UL ) );
}

int
main( int argc,
char ** argv ) {
fd_boot( &argc, &argv );

test_simple1();
FD_LOG_NOTICE(( "OK: simple1"));

test_simple2();
FD_LOG_NOTICE(( "OK: simple2"));

test_simple3();
FD_LOG_NOTICE(( "OK: simple3"));

test_bounds();
FD_LOG_NOTICE(( "OK: bounds"));

FD_LOG_NOTICE(( "pass" ));
fd_halt();
return 0;
}

0 comments on commit 1e27d60

Please sign in to comment.