diff --git a/src/waltz/quic/fd_quic.c b/src/waltz/quic/fd_quic.c index eee27185bc..bded7c3085 100644 --- a/src/waltz/quic/fd_quic.c +++ b/src/waltz/quic/fd_quic.c @@ -38,6 +38,19 @@ #define PRQ_TIMEOUT_T ulong #include "../../util/tmpl/fd_prq.c" +/* Using RB tree as priority queue for time based processing */ +#define REDBLK_T fd_quic_rb_event_t +#define REDBLK_NAME rb_service_queue +#include "../../util/tmpl/fd_redblack.c" + +long +rb_service_queue_compare( fd_quic_rb_event_t * left, + fd_quic_rb_event_t * right) { + long dt = (long)( left->timeout - right->timeout ); + long di = (long)( left->conn_idx - right->conn_idx ); + return dt == 0 ? di : dt; +} + /* Declare map type for stream_id -> stream* */ #define MAP_NAME fd_quic_stream_map #define MAP_KEY stream_id @@ -163,9 +176,9 @@ fd_quic_footprint_ext( fd_quic_limits_t const * limits, offs += conn_map_footprint; /* allocate space for events priority queue */ - offs = fd_ulong_align_up( offs, service_queue_align() ); + offs = fd_ulong_align_up( offs, rb_service_queue_align() ); layout->event_queue_off = offs; - ulong event_queue_footprint = service_queue_footprint( conn_cnt + 1 ); + ulong event_queue_footprint = rb_service_queue_footprint( conn_cnt+1U ); if( FD_UNLIKELY( !event_queue_footprint ) ) { FD_LOG_WARNING(( "invalid service_queue_footprint" )); return 0UL; } offs += event_queue_footprint; @@ -474,12 +487,13 @@ fd_quic_init( fd_quic_t * quic ) { /* State: Initialize service queue */ ulong service_queue_laddr = (ulong)quic + layout.event_queue_off; - void * v_service_queue = service_queue_new( (void *)service_queue_laddr, limits->conn_cnt+1U ); - state->service_queue = service_queue_join( v_service_queue ); - if( FD_UNLIKELY( !state->service_queue ) ) { + void * v_service_queue = rb_service_queue_new( (void *)service_queue_laddr, limits->conn_cnt+1U ); + state->rb_service_queue = rb_service_queue_join( v_service_queue ); + if( FD_UNLIKELY( !state->rb_service_queue ) ) { FD_LOG_WARNING(( "NULL service_queue" )); return NULL; } + state->rb_service_queue_root = NULL; /* Check TX AIO */ @@ -860,8 +874,9 @@ fd_quic_fini( fd_quic_t * quic ) { /* Delete service queue */ - service_queue_delete( service_queue_leave( state->service_queue ) ); - state->service_queue = NULL; + rb_service_queue_delete( rb_service_queue_leave( state->rb_service_queue ) ); + state->rb_service_queue = NULL; + state->rb_service_queue_root = NULL; /* Delete conn ID map */ @@ -2150,45 +2165,69 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, void fd_quic_schedule_conn( fd_quic_conn_t * conn ) { - fd_quic_t * quic = conn->quic; fd_quic_state_t * state = fd_quic_get_state( quic ); ulong timeout = conn->next_service_time; + fd_quic_rb_event_t * rb_service_queue = state->rb_service_queue; + + fd_quic_rb_event_t * event = NULL; + /* scheduled? */ - if( conn->in_service ) { + if( conn->in_schedule ) { + fd_quic_rb_event_t key[1] = {{ .timeout = conn->sched_service_time, + .conn_idx = conn->conn_idx }}; + /* find conn in events, then remove, update, insert */ - fd_quic_event_t * event = NULL; - ulong event_idx = 0; - ulong cnt = service_queue_cnt( state->service_queue ); - for( ulong j = 0; j < cnt; ++j ) { - fd_quic_event_t * cur_event = state->service_queue + j; - if( cur_event->conn == conn ) { - event = cur_event; - event_idx = j; - break; - } - } + event = rb_service_queue_find( rb_service_queue, + state->rb_service_queue_root, + key ); - if( FD_LIKELY( event ) ) { - /* remove key */ - service_queue_remove( state->service_queue, event_idx ); + /* sanity check */ + if( FD_UNLIKELY( !event ) ) { + FD_LOG_ERR(( "event expected in service queue, but not found" )); + } + } - /* TODO can use a priority queue key-reduce operation, which may be done more - quickly than remove, insert */ + if( !event ) { + event = rb_service_queue_acquire( rb_service_queue ); + if( FD_UNLIKELY( !event ) ) { + FD_LOG_ERR(( "service_queue pool empty - this should be logically impossible" )); } + } else { + /* remove from the service queue */ + rb_service_queue_remove( rb_service_queue, + &state->rb_service_queue_root, + event ); } timeout = fd_ulong_max( timeout, state->now + 1UL ); + event->conn = conn; + event->timeout = timeout; + event->conn_idx = conn->conn_idx; + + /* only one entry allowed at each key */ + /* sanity check */ + /* this shouldn't be necessary, as conn_idx in the key should make */ + /* it unique */ + fd_quic_rb_event_t * find_event = + rb_service_queue_find( rb_service_queue, + state->rb_service_queue_root, + event ); + if( FD_UNLIKELY( find_event ) ) { + FD_LOG_ERR(( "event should not be in schedule, but is" )); + } + /* insert key */ - fd_quic_event_t event[1] = {{ .timeout = timeout, .conn = conn }}; - service_queue_insert( state->service_queue, event ); + rb_service_queue_insert( rb_service_queue, + &state->rb_service_queue_root, + event ); conn->sched_service_time = timeout; conn->next_service_time = timeout; - conn->in_service = 1; + conn->in_schedule = 1; } /* get the service interval, while ensuring the value @@ -2216,44 +2255,16 @@ fd_quic_reschedule_conn( fd_quic_conn_t * conn, timeout = fd_ulong_min( timeout, now + service_interval ); timeout = fd_ulong_max( timeout, now + 1UL ); - /* scheduled? */ - if( conn->in_service ) { - timeout = fd_ulong_min( timeout, conn->next_service_time ); - - /* in the queue, but already scheduled sooner */ - if( timeout >= conn->sched_service_time ) { - return; - } - - /* find conn in events, then remove, update, insert */ - fd_quic_event_t * event = NULL; - ulong event_idx = 0; - ulong cnt = service_queue_cnt( state->service_queue ); - for( ulong j = 0; j < cnt; ++j ) { - fd_quic_event_t * cur_event = state->service_queue + j; - if( cur_event->conn == conn ) { - event = cur_event; - event_idx = j; - break; - } - } - - if( FD_LIKELY( event ) ) { - /* remove key */ - service_queue_remove( state->service_queue, event_idx ); - - /* TODO can use a priority queue key-reduce operation, which may be done more - quickly than remove, insert */ - } - + if( conn->in_schedule ) { + if( timeout >= conn->sched_service_time ) return; conn->next_service_time = timeout; fd_quic_schedule_conn( conn ); - - return; + } else { + if( timeout >= conn->next_service_time ) return; + conn->next_service_time = timeout; + /* if we're not in the schedule, fd_quic_conn_service will */ + /* insert us */ } - - /* since we're not in the service queue, just set the next_service_time */ - conn->next_service_time = timeout; } /* fd_quic_ack_join is used to reduce the resources consumed managing acks */ @@ -3289,10 +3300,15 @@ fd_quic_service( fd_quic_t * quic ) { fd_quic_assign_streams( quic ); } + fd_quic_rb_event_t * rb_service_queue = state->rb_service_queue; + /* service events */ fd_quic_conn_t * conn = NULL; - while( service_queue_cnt( state->service_queue ) ) { - fd_quic_event_t * event = &state->service_queue[0]; + while( 1 ) { + fd_quic_rb_event_t * event = + rb_service_queue_minimum( rb_service_queue, + state->rb_service_queue_root ); + if( !event ) break; /* copy before removing event */ conn = event->conn; @@ -3306,10 +3322,14 @@ fd_quic_service( fd_quic_t * quic ) { conn->next_service_time = now + fd_quic_get_service_interval( quic ); /* remove event, later reinserted at new time */ - service_queue_remove_min( state->service_queue ); + rb_service_queue_remove( rb_service_queue, + &state->rb_service_queue_root, + event ); + rb_service_queue_release( rb_service_queue, event ); /* unset "in service queue" */ - conn->in_service = 0; + conn->in_schedule = 0; + conn->sched_service_time = ~0UL; if( FD_UNLIKELY( conn->state == FD_QUIC_CONN_STATE_INVALID ) ) { /* connection shouldn't have been scheduled, @@ -3363,7 +3383,7 @@ fd_quic_service( fd_quic_t * quic ) { default: /* only schedule if not already scheduled */ - if( !conn->in_service ) { + if( !conn->in_schedule ) { fd_quic_schedule_conn( conn ); } } @@ -5311,8 +5331,11 @@ fd_quic_get_next_wakeup( fd_quic_t * quic ) { } ulong t = ~(ulong)0; - if( service_queue_cnt( state->service_queue ) ) { - t = state->service_queue[0].timeout; + fd_quic_rb_event_t * event = + rb_service_queue_minimum( state->rb_service_queue, + state->rb_service_queue_root ); + if( event ) { + t = event->timeout; } return t; diff --git a/src/waltz/quic/fd_quic_conn.h b/src/waltz/quic/fd_quic_conn.h index 83bef92d88..bf5afd8e15 100644 --- a/src/waltz/quic/fd_quic_conn.h +++ b/src/waltz/quic/fd_quic_conn.h @@ -90,8 +90,8 @@ struct fd_quic_conn { uint version; /* QUIC version of the connection */ ulong next_service_time; /* time service should be called next */ - ulong sched_service_time; /* time service is scheduled for, if in_service=1 */ - int in_service; /* whether the conn is in the service queue */ + ulong sched_service_time; /* time service is scheduled for, if conn in service_queue */ + int in_schedule; /* whether the conn is in the service schedule */ uchar called_conn_new; /* whether we need to call conn_final on teardown */ /* we can have multiple connection ids */ diff --git a/src/waltz/quic/fd_quic_private.h b/src/waltz/quic/fd_quic_private.h index 8a7caa1616..a0007cbac3 100644 --- a/src/waltz/quic/fd_quic_private.h +++ b/src/waltz/quic/fd_quic_private.h @@ -45,6 +45,22 @@ struct fd_quic_event { }; typedef struct fd_quic_event fd_quic_event_t; +struct fd_quic_rb_event { + // key is time of event and the connection id + // this makes the key uniq, suitable for the red black tree + ulong timeout; + ulong conn_idx; + + fd_quic_conn_t * conn; + + // required by redblack interface + ulong redblack_parent; + ulong redblack_left; + ulong redblack_right; + int redblack_color; +}; +typedef struct fd_quic_rb_event fd_quic_rb_event_t; + /* structure for a cummulative summation tree */ struct fd_quic_cs_tree { ulong cnt; @@ -85,6 +101,8 @@ struct __attribute__((aligned(16UL))) fd_quic_state_private { ulong free_conns; /* count of free connections */ fd_quic_conn_map_t * conn_map; /* map connection ids -> connection */ fd_quic_event_t * service_queue; /* priority queue of connections by service time */ + fd_quic_rb_event_t * rb_service_queue; /* priority queue of connections */ + fd_quic_rb_event_t * rb_service_queue_root; fd_quic_stream_pool_t * stream_pool; /* stream pool */ fd_quic_cs_tree_t * cs_tree; /* cummulative summation tree */ diff --git a/src/waltz/quic/tests/fd_quic_sandbox.c b/src/waltz/quic/tests/fd_quic_sandbox.c index 5f1123834e..e97930a604 100644 --- a/src/waltz/quic/tests/fd_quic_sandbox.c +++ b/src/waltz/quic/tests/fd_quic_sandbox.c @@ -313,7 +313,7 @@ fd_quic_sandbox_new_conn_established( fd_quic_sandbox_t * sandbox, conn->state = FD_QUIC_CONN_STATE_ACTIVE; conn->established = 1; - conn->in_service = 1; + conn->in_schedule = 1; /* Mock a completed handshake */ conn->handshake_complete = 1;