From 225a251211b046ca54f4d8445e94d98b4a22447f Mon Sep 17 00:00:00 2001 From: nbridge-jump <123656380+nbridge-jump@users.noreply.github.com> Date: Fri, 8 Dec 2023 18:48:26 +0000 Subject: [PATCH] QUIC - added state for tracking acked hs_data bytes QUIC - added reordering to test_quic_drops QUIC - fixed race in max_data handling QUIC - fixes to ACK handling and to packet number handling --- src/tango/quic/fd_quic.c | 173 +++++++++++++++---------- src/tango/quic/fd_quic_conn.h | 7 + src/tango/quic/fd_quic_pkt_meta.h | 64 +++++++-- src/tango/quic/tests/test_quic_drops.c | 150 +++++++++++++-------- 4 files changed, 260 insertions(+), 134 deletions(-) diff --git a/src/tango/quic/fd_quic.c b/src/tango/quic/fd_quic.c index beeb0b4d3d..74da7ad87b 100644 --- a/src/tango/quic/fd_quic.c +++ b/src/tango/quic/fd_quic.c @@ -1494,6 +1494,26 @@ fd_quic_handle_v1_initial( fd_quic_t * quic, return FD_QUIC_PARSE_FAIL; } + /* insert into connection map at orig_dst_conn_id */ + fd_quic_conn_entry_t * insert_entry = + fd_quic_conn_map_insert( state->conn_map, &orig_dst_conn_id ); + + /* if insert failed (should be impossible) fail, and do not remove connection + from free list */ + if( FD_UNLIKELY( insert_entry == NULL ) ) { + FD_LOG_WARNING(( "fd_quic_conn_create failed: failed to register new conn ID" )); + fd_quic_conn_close( conn, FD_QUIC_CONN_REASON_INTERNAL_ERROR ); + return FD_QUIC_PARSE_FAIL; + } + + /* set connection map insert_entry to new connection */ + insert_entry->conn = conn; + + /* keep orig_dst_conn_id */ + + memcpy( &conn->orig_dst_conn_id, &orig_dst_conn_id, sizeof( orig_dst_conn_id ) ); + + /* set the value for the caller */ *p_conn = conn; @@ -1581,14 +1601,6 @@ fd_quic_handle_v1_initial( fd_quic_t * quic, /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* TODO need min packet allowed AND expected packet number */ - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[pn_space] ) ) { - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* set packet number on the context */ pkt->pkt_number = pkt_number; } else { @@ -1627,14 +1639,6 @@ fd_quic_handle_v1_initial( fd_quic_t * quic, /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* TODO need min packet allowed AND expected packet number */ - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[pn_space] ) ) { - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* set packet number on the context */ pkt->pkt_number = pkt_number; @@ -1804,14 +1808,6 @@ fd_quic_handle_v1_handshake( /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* TODO need min packet allowed AND expected packet number */ - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[pn_space] ) ) { - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* set packet number on the context */ pkt->pkt_number = pkt_number; } else { @@ -1848,13 +1844,6 @@ fd_quic_handle_v1_handshake( /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[enc_level] ) ) { - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* set packet number on the context */ pkt->pkt_number = pkt_number; @@ -2095,14 +2084,6 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_ /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* TODO need min packet allowed AND expected packet number */ - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[pn_space] ) ) { - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* set packet number on the context */ pkt->pkt_number = pkt_number; @@ -2150,15 +2131,6 @@ fd_quic_handle_v1_one_rtt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_ /* reconstruct packet number */ fd_quic_reconstruct_pkt_num( &pkt_number, pkt_number_sz, conn->exp_pkt_number[pn_space] ); - /* packet number must be greater than the last processed - on a new connection, the minimum allowed is set to zero */ - if( FD_UNLIKELY( pkt_number < conn->exp_pkt_number[pn_space] ) ) { - FD_DEBUG( FD_LOG_DEBUG(( "packet number less than expected. Discarding" )) ); - - /* packet already processed or abandoned, simply discard */ - return tot_sz; /* return bytes to allow for more packets to be processed */ - } - /* since the packet number is greater than the highest last seen, do spin bit processing */ /* TODO by spec 1 in 16 connections should have this disabled */ @@ -2406,6 +2378,7 @@ fd_quic_ack_pkt( fd_quic_t * quic, fd_quic_conn_t * conn, fd_quic_pkt_t * pkt ) /* packet contains ack-eliciting frame */ if( ack_mandatory ) { pkt->ack_flag = 0; + /* initial and handshake packets get ack'ed immediately */ if( enc_level != fd_quic_enc_level_initial_id && enc_level != fd_quic_enc_level_handshake_id ) { ulong peer_max_ack_delay = fd_ulong_max( 1, conn->peer_max_ack_delay ); @@ -3634,7 +3607,8 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { fd_quic_pkt_hdr_t pkt_hdr; - fd_quic_pkt_meta_t * pkt_meta = NULL; + fd_quic_pkt_meta_t * pkt_meta = NULL; + ulong pkt_meta_var_idx = 0UL; if( conn->tx_ptr != conn->tx_buf ) { fd_quic_tx_buffered( quic, conn, 0 ); @@ -3976,9 +3950,11 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { ulong hs_offset = 0; /* offset within the current hs_data */ /* either include handshake data or stream data, but not both */ + ulong sent_offset = conn->hs_sent_bytes[enc_level]; + ulong ackd_offset = conn->hs_ackd_bytes[enc_level]; if( hs_data ) { /* offset within stream */ - ulong offset = conn->hs_sent_bytes[enc_level]; + ulong offset = fd_ulong_max( sent_offset, ackd_offset ); /* track pkt_meta values */ ulong offset_lo = offset; @@ -4044,8 +4020,8 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { offset_hi += cur_data_sz; /* move to next hs_data */ - offset += cur_data_sz; - data_sz += cur_data_sz; + offset += cur_data_sz; + data_sz += cur_data_sz; /* TODO load more hs_data into a crypto frame, if available currently tricky, because encode_crypto_frame copies payload */ @@ -4067,7 +4043,7 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { /* are we at application level of encryption? */ if( enc_level == fd_quic_enc_level_appdata_id ) { - if( conn->handshake_done_send ) { + if( conn->handshake_done_send /* && !conn->handshake_done_ackd TODO */ ) { /* send handshake done frame */ frame_sz = 1; pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_HS_DONE; @@ -4077,7 +4053,8 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { } if( conn->upd_pkt_number >= pkt_number ) { - if( conn->flags & FD_QUIC_CONN_FLAGS_MAX_DATA ) { + if( ( conn->flags & FD_QUIC_CONN_FLAGS_MAX_DATA ) && + ( conn->rx_max_data > conn->rx_max_data_ackd ) ) { /* send max_data frame */ frame.max_data.max_data = conn->rx_max_data; @@ -4094,6 +4071,16 @@ fd_quic_conn_tx( fd_quic_t * quic, fd_quic_conn_t * conn ) { pkt_meta->flags |= FD_QUIC_PKT_META_FLAGS_MAX_DATA; pkt_meta->expiry = fd_ulong_min( pkt_meta->expiry, now + 3u * conn->rtt ); + if( pkt_meta_var_idx < FD_QUIC_PKT_META_VAR_MAX ) { + pkt_meta->var[pkt_meta_var_idx].key = + (fd_quic_pkt_meta_key_t){ + .type = FD_QUIC_PKT_META_TYPE_OTHER, + .flags = FD_QUIC_CONN_FLAGS_MAX_DATA + }; + pkt_meta_var_idx++; + pkt_meta->var_sz = (uchar)pkt_meta_var_idx; /* TODO consolidate var_sz updates */ + } + conn->upd_pkt_number = pkt_number; } } @@ -5250,7 +5237,9 @@ fd_quic_conn_create( fd_quic_t * quic, /* crypto offset for first packet always starts at 0 */ fd_memset( conn->rx_crypto_offset, 0, sizeof( conn->rx_crypto_offset ) ); + /* TODO lots of fd_memset calls that should really be builtin memset */ fd_memset( conn->hs_sent_bytes, 0, sizeof( conn->hs_sent_bytes ) ); + fd_memset( conn->hs_ackd_bytes, 0, sizeof( conn->hs_ackd_bytes ) ); fd_memset( &conn->secrets, 0, sizeof( conn->secrets ) ); fd_memset( &conn->keys, 0, sizeof( conn->keys ) ); @@ -5476,6 +5465,11 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, if( enc_level < peer_enc_level ) { /* free pkt_meta */ + /* treat the original packet as-if it were ack'ed */ + fd_quic_reclaim_pkt_meta( conn, + pkt_meta, + enc_level ); + /* remove from list */ fd_quic_pkt_meta_remove( sent, prior, pkt_meta ); @@ -5495,12 +5489,10 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, if( flags & FD_QUIC_PKT_META_FLAGS_HS_DATA ) { /* find handshake data to retry */ /* reset offset to beginning of retried range if necessary */ - ulong offset = pkt_meta->range.offset_lo; + ulong offset = fd_ulong_max( conn->hs_ackd_bytes[enc_level], pkt_meta->range.offset_lo ); if( offset < conn->hs_sent_bytes[enc_level] ) { conn->hs_sent_bytes[enc_level] = offset; - /* TODO might need to have a member "hs_ackd_bytes" so we don't - try to resend bytes that were acked (and may have been discarded) */ - /* TODO do we need to set upd_pkt_number etc? */ + conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; } } if( flags & FD_QUIC_PKT_META_FLAGS_STREAM ) { @@ -5539,8 +5531,11 @@ fd_quic_pkt_meta_retry( fd_quic_t * quic, conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; } if( flags & FD_QUIC_PKT_META_FLAGS_MAX_DATA ) { - conn->flags |= FD_QUIC_CONN_FLAGS_MAX_DATA; - conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; + /* set max_data to be sent only if unacked */ + if( conn->rx_max_data_ackd < conn->rx_max_data ) { + conn->flags |= FD_QUIC_CONN_FLAGS_MAX_DATA; + conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; + } } if( flags & FD_QUIC_PKT_META_FLAGS_MAX_STREAM_DATA ) { /* we don't have the stream id for the max_stream_data stream @@ -5630,8 +5625,9 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn, fd_quic_pkt_meta_t * pkt_meta, uint enc_level ) { - uint flags = pkt_meta->flags; - ulong pkt_number = pkt_meta->pkt_number; + uint flags = pkt_meta->flags; + ulong pkt_number = pkt_meta->pkt_number; + fd_quic_range_t range = pkt_meta->range; if( FD_UNLIKELY( flags & FD_QUIC_PKT_META_FLAGS_KEY_UPDATE ) ) { /* what key phase was used for packet? */ @@ -5682,8 +5678,29 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn, } if( flags & FD_QUIC_PKT_META_FLAGS_HS_DATA ) { - /* actually, it is assumed the offsets are from the beginning of this - data... so we can't free any here */ + /* is this ack'ing the next consecutive bytes? + if so, we can increase the ack'd bytes + if not, we retransmit the bytes expected to be ack'd + we assume a gap means a dropped packet, and + this policy allows us to free up the pkt_meta here */ + ulong hs_ackd_bytes = conn->hs_ackd_bytes[enc_level]; + if( range.offset_lo <= hs_ackd_bytes ) { + hs_ackd_bytes = conn->hs_ackd_bytes[enc_level] + = fd_ulong_max( hs_ackd_bytes, range.offset_hi ); + + /* remove any unused hs_data */ + fd_quic_tls_hs_data_t * hs_data = NULL; + + hs_data = fd_quic_tls_get_hs_data( conn->tls_hs, (int)enc_level ); + while( hs_data && hs_data->offset + hs_data->data_sz <= hs_ackd_bytes ) { + fd_quic_tls_pop_hs_data( conn->tls_hs, (int)enc_level ); + hs_data = fd_quic_tls_get_hs_data( conn->tls_hs, (int)enc_level ); + } + } else { + conn->hs_sent_bytes[enc_level] = + fd_ulong_min( conn->hs_sent_bytes[enc_level], hs_ackd_bytes ); + conn->upd_pkt_number = FD_QUIC_PKT_NUM_PENDING; + } } if( flags & FD_QUIC_PKT_META_FLAGS_HS_DONE ) { @@ -5697,7 +5714,31 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn, } if( flags & FD_QUIC_PKT_META_FLAGS_MAX_DATA ) { - conn->flags &= ~FD_QUIC_CONN_FLAGS_MAX_DATA; + ulong max_data_ackd = 0UL; + for( ulong j = 0UL; j < pkt_meta->var_sz; ++j ) { + if( pkt_meta->var[j].key.type == FD_QUIC_PKT_META_TYPE_OTHER && + pkt_meta->var[j].key.flags == FD_QUIC_PKT_META_FLAGS_MAX_DATA ) { + max_data_ackd = pkt_meta->var[j].value; + } + } + + /* ack can only increase max_data_ackd */ + max_data_ackd = fd_ulong_max( max_data_ackd, conn->rx_max_data_ackd ); + + /* max_data_ackd > rx_max_data is a protocol violation */ + if( FD_UNLIKELY( max_data_ackd > conn->rx_max_data ) ) { + /* this is a protocol violation, so inform the peer */ + fd_quic_conn_error( conn, FD_QUIC_CONN_REASON_PROTOCOL_VIOLATION ); + return; + } + + /* clear flag only if acked value == current value */ + if( FD_LIKELY( max_data_ackd == conn->rx_max_data ) ) { + conn->flags &= ~FD_QUIC_CONN_FLAGS_MAX_DATA; + } + + /* set the ackd value */ + conn->rx_max_data_ackd = max_data_ackd; } if( flags & FD_QUIC_PKT_META_FLAGS_MAX_STREAMS_BIDIR ) { @@ -5727,8 +5768,6 @@ fd_quic_reclaim_pkt_meta( fd_quic_conn_t * conn, } if( flags & FD_QUIC_PKT_META_FLAGS_STREAM ) { - fd_quic_range_t range = pkt_meta->range; - /* find stream */ ulong stream_id = pkt_meta->stream_id; fd_quic_stream_t * stream = NULL; diff --git a/src/tango/quic/fd_quic_conn.h b/src/tango/quic/fd_quic_conn.h index 479c3e5cd9..5ea4532d43 100644 --- a/src/tango/quic/fd_quic_conn.h +++ b/src/tango/quic/fd_quic_conn.h @@ -17,6 +17,8 @@ #define FD_QUIC_CONN_STATE_ABORT 5 /* connection terminating due to error */ #define FD_QUIC_CONN_STATE_CLOSE_PENDING 6 /* connection is closing */ #define FD_QUIC_CONN_STATE_DEAD 7 /* connection about to be freed */ +#define FD_QUIC_CONN_STATE_CLOSING 8 /* waiting for a clean close (initiator) */ +#define FD_QUIC_CONN_STATE_DRAIN 9 /* waiting for a clean close (peer) */ enum { FD_QUIC_CONN_REASON_NO_ERROR = 0x00, /* No error */ @@ -116,6 +118,7 @@ struct fd_quic_conn { /* handshake members */ int handshake_complete; /* have we completed a successful handshake? */ int handshake_done_send; /* do we need to send handshake-done to peer? */ + int handshake_done_ackd; /* was handshake_done ack'ed? */ fd_quic_tls_hs_t * tls_hs; /* expected handshake data offset - one per encryption level @@ -131,6 +134,9 @@ struct fd_quic_conn { /* amount of handshake data already sent from head of queue */ ulong hs_sent_bytes[4]; + /* amount of handshake data ack'ed by peer counted from head of queue */ + ulong hs_ackd_bytes[4]; + /* secret members */ fd_quic_crypto_secrets_t secrets; fd_quic_crypto_keys_t keys[4][2]; /* a set of keys for each of the encoding levels, and for client/server */ @@ -242,6 +248,7 @@ struct fd_quic_conn { send to us */ ulong rx_tot_data; /* total of all bytes received across all streams and including implied bytes */ + ulong rx_max_data_ackd; /* max max_data acked by peer */ uint flags; # define FD_QUIC_CONN_FLAGS_MAX_DATA (1u<<0u) diff --git a/src/tango/quic/fd_quic_pkt_meta.h b/src/tango/quic/fd_quic_pkt_meta.h index 899fa34720..f5587c3f16 100644 --- a/src/tango/quic/fd_quic_pkt_meta.h +++ b/src/tango/quic/fd_quic_pkt_meta.h @@ -17,22 +17,60 @@ struct fd_quic_range { /* TODO convert to a union with various types of metadata overlaid */ +/* fd_quic_pkt_meta_var used for tracking max_data, max_stream_data and + * max_streams + * + * type: FD_QUIC_PKT_META_TYPE_STREAM_DATA + * FD_QUIC_PKT_META_TYPE_MAX_STREAM_DATA + * FD_QUIC_PKT_META_TYPE_OTHER + * flags: FD_QUIC_PKT_META_FLAGS_* + * value: max_data number of bytes + * max_stream_data number of bytes + * max_streams number of streams + */ +union fd_quic_pkt_meta_key { + union { + ulong stream_id:62; + struct { + ulong flags:62; + ulong type:2; +#define FD_QUIC_PKT_META_TYPE_OTHER 0 +#define FD_QUIC_PKT_META_TYPE_STREAM_DATA 1 +#define FD_QUIC_PKT_META_TYPE_MAX_STREAM_DATA 2 + }; + }; +}; +typedef union fd_quic_pkt_meta_key fd_quic_pkt_meta_key_t; + +struct fd_quic_pkt_meta_var { + fd_quic_pkt_meta_key_t key; + ulong value; +}; +typedef struct fd_quic_pkt_meta_var fd_quic_pkt_meta_var_t; + +/* the max number of pkt_meta_var entries in pkt_meta + this limits the number of max_data, max_stream_data and max_streams + allowed in a single quic packet */ +#define FD_QUIC_PKT_META_VAR_MAX 16 + /* fd_quic_pkt_meta tracks the metadata of data sent to the peer used when acks arrive to determine what is being acked specifically */ struct fd_quic_pkt_meta { /* stores metadata about what was sent in the identified packet */ - ulong pkt_number; /* the packet number */ - uchar enc_level; /* every packet is sent at a specific + ulong pkt_number; /* the packet number */ + uchar enc_level; /* every packet is sent at a specific enc_level */ - uchar pn_space; /* packet number space (must be consistent + uchar pn_space; /* packet number space (must be consistent with enc_level) */ - uchar status; + uchar status; # define FD_QUIC_PKT_META_STATUS_UNUSED ((uchar)0) # define FD_QUIC_PKT_META_STATUS_PENDING ((uchar)1) # define FD_QUIC_PKT_META_STATUS_SENT ((uchar)2) + uchar var_sz; /* number of populated entries in var */ + /* does/should the referenced packet contain: FD_QUIC_PKT_META_FLAGS_HS_DATA handshake data FD_QUIC_PKT_META_FLAGS_STREAM stream data @@ -47,7 +85,7 @@ struct fd_quic_pkt_meta { FD_QUIC_PKT_META_FLAGS_KEY_PHASE set only if key_phase was set in the short-header some of these flags are mutually exclusive */ - uint flags; /* flags */ + uint flags; /* flags */ # define FD_QUIC_PKT_META_FLAGS_HS_DATA (1u<<0u) # define FD_QUIC_PKT_META_FLAGS_STREAM (1u<<1u) # define FD_QUIC_PKT_META_FLAGS_HS_DONE (1u<<2u) @@ -59,16 +97,18 @@ struct fd_quic_pkt_meta { # define FD_QUIC_PKT_META_FLAGS_CLOSE (1u<<8u) # define FD_QUIC_PKT_META_FLAGS_KEY_UPDATE (1u<<9u) # define FD_QUIC_PKT_META_FLAGS_KEY_PHASE (1u<<10u) - fd_quic_range_t range; /* range of bytes referred to by this meta */ - /* stream data or crypto data */ - /* we currently do not put both in the same packet */ - ulong stream_id; /* if this contains stream data, - the stream id, else zero */ + fd_quic_range_t range; /* range of bytes referred to by this meta */ + /* stream data or crypto data */ + /* we currently do not put both in the same packet */ + ulong stream_id; /* if this contains stream data, + the stream id, else zero */ - ulong expiry; /* time pkt_meta expires... this is the time the + ulong expiry; /* time pkt_meta expires... this is the time the ack is expected by */ - fd_quic_pkt_meta_t * next; /* next in current list */ + fd_quic_pkt_meta_var_t var[FD_QUIC_PKT_META_VAR_MAX]; + + fd_quic_pkt_meta_t * next; /* next in current list */ }; diff --git a/src/tango/quic/tests/test_quic_drops.c b/src/tango/quic/tests/test_quic_drops.c index 9273d2036d..87d7f47fd2 100644 --- a/src/tango/quic/tests/test_quic_drops.c +++ b/src/tango/quic/tests/test_quic_drops.c @@ -8,7 +8,7 @@ #include /* number of streams to send/receive */ -#define NUM_STREAMS 100 +#define NUM_STREAMS 1000 /* done flags */ @@ -43,49 +43,10 @@ struct net_fibre_args { fd_fibre_pipe_t * input; fd_fibre_pipe_t * release; float thresh; + int dir; /* 0=client->server 1=server->client */ }; typedef struct net_fibre_args net_fibre_args_t; -void -net_fibre_main( void * vp_args ) { - /* get args */ - net_fibre_args_t * args = (net_fibre_args_t*)vp_args; - - /* input pipe, and destination aios */ - fd_fibre_pipe_t * input = args->input; - fd_fibre_pipe_t * release = args->release; - - float thresh = args->thresh; - - int rtn = 0; - ulong idx = 0; - - while( !(client_done && server_done) ) { - /* wait for data on pipe */ - rtn = fd_fibre_pipe_read( input, &idx, (long)60e9 ); - if( !rtn ) { - printf( "net_fibre_main: no data in 60s\n" ); - exit(1); - } - - /* we have a message */ - - /* drop? */ - if( rnd() < thresh ) { - /* free slot */ - rtn = fd_fibre_pipe_write( release, idx, (long)1e6 ); - if( rtn ) { - printf( "net_fibre_main: timeout trying to return idx\n" ); - exit(1); - } - - continue; - } - - /* could insert into a reorder buffer here */ - } -} - /* man-in-the-middle for testing drops */ @@ -93,8 +54,12 @@ struct mitm_ctx { fd_aio_t local; fd_aio_t const * dst; fd_aio_t const * pcap; - rng_t thresh; + rng_t thresh_drop; + rng_t thresh_reorder; int server; + + ulong reorder_sz; + uchar reorder_buf[2048]; }; typedef struct mitm_ctx mitm_ctx_t; @@ -104,6 +69,9 @@ mitm_tx( void * ctx, ulong batch_cnt, ulong * opt_batch_idx, int flush ) { + (void)flush; + (void)opt_batch_idx; + mitm_ctx_t * mitm_ctx = (mitm_ctx_t*)ctx; /* each time data transfers, the schedule might change @@ -111,19 +79,60 @@ mitm_tx( void * ctx, if( client_fibre && mitm_ctx->server ) fd_fibre_wake( client_fibre ); if( server_fibre && !mitm_ctx->server ) fd_fibre_wake( server_fibre ); - /* write to pcap even if dropping */ - if( mitm_ctx->pcap ) { - fd_aio_send( mitm_ctx->pcap, batch, batch_cnt, opt_batch_idx, 1 ); + /* write to pcap */ +#define PCAP( batch, batch_cnt ) \ + if( mitm_ctx->pcap ) { \ + fd_aio_send( mitm_ctx->pcap, (batch), (batch_cnt), NULL, 1 ); \ } - /* generate a random number and compare with threshold, and either pass thru or drop */ + /* go packet by packet */ + for( ulong j = 0UL; j < batch_cnt; ++j ) { + /* generate a random number and compare with threshold, and either pass thru or drop */ + + rng_t rnd_num = rnd(); - if( rnd() < mitm_ctx->thresh ) { - /* dropping behaves as-if the send was successful */ - return FD_AIO_SUCCESS; - } else { - return fd_aio_send( mitm_ctx->dst, batch, batch_cnt, opt_batch_idx, flush ); + if( rnd_num < mitm_ctx->thresh_drop ) { + /* dropping behaves as-if the send was successful */ + continue; + } + + if( rnd_num < mitm_ctx->thresh_reorder ) { + /* reorder */ + + /* logic: + if we already have a reordered buffer, delay it another packet + else store the current packet into the reorder buffer */ + if( mitm_ctx->reorder_sz > 0UL ) { + fd_aio_pkt_info_t lcl_batch[1] = { batch[j] }; + fd_aio_send( mitm_ctx->dst, lcl_batch, 1UL, NULL, 1 ); + PCAP(lcl_batch,1UL); + + /* clear buffer */ + mitm_ctx->reorder_sz = 0UL; + } else { + fd_memcpy( mitm_ctx->reorder_buf, batch[j].buf, batch[j].buf_sz ); + mitm_ctx->reorder_sz = batch[j].buf_sz; + } + continue; + } + + /* send new packet */ + fd_aio_pkt_info_t batch_0[1] = { batch[j] }; + fd_aio_send( mitm_ctx->dst, batch_0, 1UL, NULL, 1 ); + PCAP(batch_0,1UL); + + /* we aren't dropping or reordering, but we might have a prior reorder */ + if( mitm_ctx->reorder_sz > 0UL ) { + fd_aio_pkt_info_t batch_1[1] = {{ .buf = mitm_ctx->reorder_buf, .buf_sz = (ushort)mitm_ctx->reorder_sz }}; + fd_aio_send( mitm_ctx->dst, batch_1, 1UL, NULL, 1 ); + PCAP(batch_1,1UL); + + /* clear the sent buffer */ + mitm_ctx->reorder_sz = 0UL; + } } + + return FD_AIO_SUCCESS; } static void @@ -141,8 +150,9 @@ mitm_link( fd_quic_t * quic_a, fd_quic_t * quic_b, mitm_ctx_t * mitm, fd_aio_t c } static void -mitm_set_thresh( mitm_ctx_t * mitm_ctx, rng_t thresh ) { - mitm_ctx->thresh = thresh; +mitm_set_thresh( mitm_ctx_t * mitm_ctx, rng_t thresh_drop, rng_t thresh_reorder ) { + mitm_ctx->thresh_drop = thresh_drop; + mitm_ctx->thresh_reorder = thresh_reorder; } static void @@ -158,6 +168,7 @@ static void my_tls_keylog( void * quic_ctx, char const * line ) { (void)quic_ctx; + FD_LOG_WARNING(( "SECRET: %s", line )); fd_pcapng_fwrite_tls_key_log( (uchar const *)line, (uint)strlen( line ), pcap_server_to_client.pcapng ); } @@ -327,6 +338,25 @@ client_fibre_fn( void * vp_arg ) { if( !stream ) { if( conn->state == FD_QUIC_CONN_STATE_ACTIVE ) { FD_LOG_WARNING(( "Client unable to obtain a stream. now: %lu", (ulong)now )); + ulong live = next_wakeup + (ulong)1e9; + do { + next_wakeup = fd_quic_get_next_wakeup( quic ); + + if( next_wakeup > live ) { + live = next_wakeup + (ulong)next_wakeup; + FD_LOG_WARNING(( "Client waiting for a stream time: %lu", (ulong)now )); + } + + /* wake up at either next service or next send, whichever is sooner */ + fd_fibre_wait_until( (long)next_wakeup ); + + fd_quic_service( quic ); + + if( !conn ) break; + + stream = fd_quic_conn_new_stream( conn, FD_QUIC_TYPE_UNIDIR ); + } while( !stream ); + FD_LOG_WARNING(( "Client obtained a stream" )); } next_send = now + period_ns; /* ensure we make progress */ continue; @@ -343,6 +373,16 @@ client_fibre_fn( void * vp_arg ) { /* successful - stream will begin closing */ if( ++sent % 15 == 0 ) { + /* wait for last sends to complete */ + /* TODO add callback for this */ + ulong timeout = now + (ulong)3e6; + while( now < timeout ) { + fd_quic_service( quic ); + + /* allow server to process */ + fd_fibre_wait_until( (long)fd_quic_get_next_wakeup( quic ) ); + } + fd_quic_conn_close( conn, 0 ); sent = 0; @@ -497,8 +537,8 @@ main( int argc, char ** argv ) { mitm_link( client_quic, server_quic, &mitm_client_to_server, fd_aio_pcapng_get_aio( &pcap_client_to_server ) ); mitm_link( server_quic, client_quic, &mitm_server_to_client, fd_aio_pcapng_get_aio( &pcap_server_to_client ) ); - mitm_set_thresh( &mitm_client_to_server, 0.30f ); - mitm_set_thresh( &mitm_server_to_client, 0.30f ); + mitm_set_thresh( &mitm_client_to_server, 0.00f, 0.40f ); + mitm_set_thresh( &mitm_server_to_client, 0.00f, 0.40f ); mitm_set_server( &mitm_client_to_server, 0 ); mitm_set_server( &mitm_server_to_client, 1 );