Skip to content

Commit

Permalink
net: clean up sigs + route based on src port
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed Mar 8, 2024
1 parent 7cf25d2 commit 6548ed1
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 77 deletions.
7 changes: 3 additions & 4 deletions src/app/fdctl/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,9 @@ topo_initialize( config_t * config ) {
tile->net.xdp_rx_queue_size = config->tiles.net.xdp_rx_queue_size;
tile->net.xdp_tx_queue_size = config->tiles.net.xdp_tx_queue_size;
tile->net.src_ip_addr = config->tiles.net.ip_addr;
tile->net.allow_ports[ 0 ] = config->tiles.quic.regular_transaction_listen_port;
tile->net.allow_ports[ 1 ] = config->tiles.quic.quic_transaction_listen_port;
tile->net.allow_ports[ 2 ] = config->tiles.shred.shred_listen_port;
tile->net.shred_listen_port = config->tiles.shred.shred_listen_port;
tile->net.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
tile->net.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;

} else if( FD_UNLIKELY( !strcmp( tile->name, "netmux" ) ) ) {

Expand All @@ -798,7 +798,6 @@ topo_initialize( config_t * config ) {
tile->quic.tx_buf_size = config->tiles.quic.tx_buf_size;
tile->quic.ip_addr = config->tiles.net.ip_addr;
tile->quic.quic_transaction_listen_port = config->tiles.quic.quic_transaction_listen_port;
tile->quic.legacy_transaction_listen_port = config->tiles.quic.regular_transaction_listen_port;
tile->quic.idle_timeout_millis = config->tiles.quic.idle_timeout_millis;
tile->quic.retry = config->tiles.quic.retry;
tile->quic.max_concurrent_streams_per_connection = config->tiles.quic.max_concurrent_streams_per_connection;
Expand Down
50 changes: 31 additions & 19 deletions src/app/fdctl/run/tiles/fd_net.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

#include <linux/unistd.h>

#define FD_NET_PORT_ALLOW_CNT (sizeof(((fd_topo_tile_t*)0)->net.allow_ports)/sizeof(((fd_topo_tile_t*)0)->net.allow_ports[ 0 ]))

typedef struct {
ulong xsk_aio_cnt;
fd_xsk_aio_t * xsk_aio[ 2 ];
Expand All @@ -28,7 +26,10 @@ typedef struct {

uint src_ip_addr;
uchar src_mac_addr[6];
ushort allow_ports[ FD_NET_PORT_ALLOW_CNT ];

ushort shred_listen_port;
ushort quic_transaction_listen_port;
ushort legacy_transaction_listen_port;

fd_wksp_t * in_mem;
ulong in_chunk0;
Expand Down Expand Up @@ -126,22 +127,29 @@ net_rx_aio_send( void * _ctx,
/* Ignore if UDP header is too short */
if( FD_UNLIKELY( udp+8U > packet_end ) ) continue;

/* Extract IP dest addr and UDP dest port */
/* Extract IP src addr and UDP src/dest port */
uint ip_srcaddr = *(uint *)( iphdr+12UL );
ushort udp_srcport = fd_ushort_bswap( *(ushort *)( udp+0UL ) );
ushort udp_dstport = fd_ushort_bswap( *(ushort *)( udp+2UL ) );

int allow_port = 0;
for( ulong i=0UL; i<FD_NET_PORT_ALLOW_CNT; i++ ) allow_port |= udp_dstport==ctx->allow_ports[ i ];
if( FD_UNLIKELY( !allow_port ) )
ushort proto;
if( FD_UNLIKELY( udp_dstport==ctx->shred_listen_port ) ) proto = DST_PROTO_SHRED;
else if( FD_UNLIKELY( udp_dstport==ctx->quic_transaction_listen_port) ) proto = DST_PROTO_TPU_QUIC;
else if( FD_UNLIKELY( udp_dstport==ctx->legacy_transaction_listen_port) ) proto = DST_PROTO_TPU_UDP;
else {
FD_LOG_ERR(( "Firedancer received a UDP packet on port %hu which was not expected. "
"Only ports %hu, %hu, and %hu should be configured to forward packets. Do "
"you need to reload the XDP program?",
udp_dstport, ctx->allow_ports[ 0 ], ctx->allow_ports[ 1 ], ctx->allow_ports[ 2 ] ));
udp_dstport,
ctx->shred_listen_port,
ctx->quic_transaction_listen_port,
ctx->legacy_transaction_listen_port ));
}

fd_memcpy( fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk ), packet, batch[i].buf_sz );

/* tile can decide how to partition based on src ip addr and port */
ulong sig = fd_disco_netmux_sig( ip_srcaddr, udp_dstport, 14UL+8UL+iplen, SRC_TILE_NET, 0 );
/* tile can decide how to partition based on src ip addr and src port */
ulong sig = fd_disco_netmux_sig( ip_srcaddr, udp_srcport, 0U, proto, 14UL+8UL+iplen );

ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_mux_publish( ctx->mux, sig, ctx->out_chunk, batch[i].buf_sz, 0, 0, tspub );
Expand Down Expand Up @@ -182,8 +190,8 @@ during_housekeeping( void * _ctx ) {
/* route_loopback returns non-zero if the IP address carried in the
   netmux signature sig is either 127.0.0.1 or equal to tile_ip_addr,
   and zero otherwise. */
FD_FN_PURE static int
route_loopback( uint tile_ip_addr,
ulong sig ) {
return fd_disco_netmux_sig_ip_addr( sig )==FD_IP4_ADDR(127,0,0,1) ||
fd_disco_netmux_sig_ip_addr( sig )==tile_ip_addr;
return fd_disco_netmux_sig_dst_ip( sig )==FD_IP4_ADDR(127,0,0,1) ||
fd_disco_netmux_sig_dst_ip( sig )==tile_ip_addr;
}

static void
Expand All @@ -196,21 +204,26 @@ before_frag( void * _ctx,

fd_net_ctx_t * ctx = (fd_net_ctx_t *)_ctx;

ushort src_tile = fd_disco_netmux_sig_src_tile( sig );
ulong proto = fd_disco_netmux_sig_proto( sig );
if( FD_UNLIKELY( proto!=DST_PROTO_OUTGOING ) ) {
*opt_filter = 1;
return;
}

/* Round robin by sequence number for now.  QUIC should be modified to
echo the net tile index back so we can transmit on the same queue.
Packets destined for 127.0.0.1 (or for this tile's own IP address)
must go out on net tile 0, which owns the loopback interface XSK,
which only has 1 queue. */

int handled_packet = 0;
if( FD_UNLIKELY( route_loopback( ctx->src_ip_addr, sig ) ) ) {
handled_packet = ctx->round_robin_id == 0;
} else {
handled_packet = (seq % ctx->round_robin_cnt) == ctx->round_robin_id;
}

if( FD_UNLIKELY( src_tile==SRC_TILE_NET || !handled_packet ) ) {
if( FD_UNLIKELY( !handled_packet ) ) {
*opt_filter = 1;
}
}
Expand Down Expand Up @@ -285,7 +298,7 @@ after_frag( void * _ctx,
ctx->lo_tx->send_func( ctx->xsk_aio[ 1 ], &aio_buf, 1, NULL, 1 );
} else {
/* extract dst ip */
uint dst_ip = fd_uint_bswap( fd_disco_netmux_sig_ip_addr( *opt_sig ) );
uint dst_ip = fd_uint_bswap( fd_disco_netmux_sig_dst_ip( *opt_sig ) );

uint next_hop = 0U;
uchar dst_mac[6] = {0};
Expand Down Expand Up @@ -437,10 +450,9 @@ unprivileged_init( fd_topo_t * topo,
ctx->src_ip_addr = tile->net.src_ip_addr;
memcpy( ctx->src_mac_addr, tile->net.src_mac_addr, 6UL );

for( ulong i=0UL; i<FD_NET_PORT_ALLOW_CNT; i++ ) {
if( FD_UNLIKELY( !tile->net.allow_ports[ i ] ) ) FD_LOG_ERR(( "net tile listen port %lu was 0", i ));
ctx->allow_ports[ i ] = tile->net.allow_ports[ i ];
}
ctx->shred_listen_port = tile->net.shred_listen_port;
ctx->quic_transaction_listen_port = tile->net.quic_transaction_listen_port;
ctx->legacy_transaction_listen_port = tile->net.legacy_transaction_listen_port;

/* Put a bound on chunks we read from the input, to make sure they
are within the data region of the workspace. */
Expand Down
34 changes: 8 additions & 26 deletions src/app/fdctl/run/tiles/fd_quic.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ typedef struct {
fd_quic_t * quic;
const fd_aio_t * quic_rx_aio;

ushort legacy_transaction_port; /* port for receiving non-QUIC (raw UDP) transactions on*/

fd_keyguard_client_t keyguard_client[1];

uchar buffer[ FD_NET_MTU ];
Expand Down Expand Up @@ -240,29 +238,17 @@ before_frag( void * _ctx,

fd_quic_ctx_t * ctx = (fd_quic_ctx_t *)_ctx;

ushort dst_port = fd_disco_netmux_sig_port( sig );
ulong src_ip_addr = fd_disco_netmux_sig_ip_addr( sig );
ushort src_tile = fd_disco_netmux_sig_src_tile( sig );

if( FD_UNLIKELY( src_tile!=SRC_TILE_NET ) ) {
ulong proto = fd_disco_netmux_sig_proto( sig );
if( FD_UNLIKELY( proto!=DST_PROTO_TPU_UDP && proto!=DST_PROTO_TPU_QUIC ) ) {
*opt_filter = 1;
return;
}

int handled_port = dst_port == ctx->legacy_transaction_port ||
dst_port == ctx->quic->config.net.listen_udp_port;

/* Ignore traffic e.g. for shred tile */
if( FD_UNLIKELY( !handled_port ) ) {
ulong hash = fd_disco_netmux_sig_hash( sig );
if( FD_UNLIKELY( (hash % ctx->round_robin_cnt) != ctx->round_robin_id ) ) {
*opt_filter = 1;
return;
}

int handled_ip_address = (src_ip_addr % ctx->round_robin_cnt) == ctx->round_robin_id;

if( FD_UNLIKELY( !handled_port || !handled_ip_address ) ) {
*opt_filter = 1;
}
}

static void
Expand Down Expand Up @@ -306,12 +292,12 @@ after_frag( void * _ctx,

fd_quic_ctx_t * ctx = (fd_quic_ctx_t *)_ctx;

ushort dst_port = fd_disco_netmux_sig_port( *opt_sig );
ulong proto = fd_disco_netmux_sig_proto( *opt_sig );

if( FD_LIKELY( dst_port==ctx->quic->config.net.listen_udp_port ) ) {
if( FD_LIKELY( proto==DST_PROTO_TPU_QUIC ) ) {
fd_aio_pkt_info_t pkt = { .buf = ctx->buffer, .buf_sz = (ushort)*opt_sz };
fd_aio_send( ctx->quic_rx_aio, &pkt, 1, NULL, 1 );
} else if( FD_LIKELY( dst_port==ctx->legacy_transaction_port ) ) {
} else if( FD_LIKELY( proto==DST_PROTO_TPU_UDP ) ) {
ulong network_hdr_sz = fd_disco_netmux_sig_hdr_sz( *opt_sig );
if( FD_UNLIKELY( *opt_sz<network_hdr_sz ) ) {
/* Transaction not valid if the packet isn't large enough for the network
Expand Down Expand Up @@ -496,7 +482,6 @@ quic_tx_aio_send( void * _ctx,

uint test_ethip = ( (uint)packet[12] << 16u ) | ( (uint)packet[13] << 8u ) | (uint)packet[23];
uint ip_dstaddr = 0;
ushort udp_dstport = 0;
if( FD_LIKELY( test_ethip==0x080011 ) ) {
/* IPv4 is variable-length, so lookup IHL to find start of UDP */
uint iplen = ( ( (uint)iphdr[0] ) & 0x0FU ) * 4U;
Expand All @@ -510,12 +495,11 @@ quic_tx_aio_send( void * _ctx,

/* Extract IP dest addr */
ip_dstaddr = *(uint *)( iphdr+16UL );
udp_dstport = fd_ushort_bswap( *(ushort *)( udp+2UL ) );
}

/* send packets are just round-robined by sequence number, so for now
just tag them as outgoing so they don't bounce back */
ulong sig = fd_disco_netmux_sig( ip_dstaddr, udp_dstport, FD_NETMUX_SIG_MIN_HDR_SZ, SRC_TILE_QUIC, 0 );
ulong sig = fd_disco_netmux_sig( 0U, 0U, ip_dstaddr, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );

ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_mcache_publish( ctx->net_out_mcache,
Expand Down Expand Up @@ -683,8 +667,6 @@ unprivileged_init( fd_topo_t * topo,
ctx->round_robin_cnt = fd_topo_tile_name_cnt( topo, tile->name );
ctx->round_robin_id = tile->kind_id;

ctx->legacy_transaction_port = tile->quic.legacy_transaction_listen_port;

ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
if( FD_UNLIKELY( scratch_top > (ulong)scratch + scratch_footprint( tile ) ) )
FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
Expand Down
11 changes: 4 additions & 7 deletions src/app/fdctl/run/tiles/fd_shred.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ typedef struct {

uint src_ip_addr;
uchar src_mac_addr[ 6 ];
ushort shred_listen_port;

/* shred34 and fec_sets are very related: fec_sets[i] has pointers
to the shreds in shred34[4*i + k] for k=0,1,2,3. */
Expand Down Expand Up @@ -262,17 +261,16 @@ finalize_new_cluster_contact_info( fd_shred_ctx_t * ctx ) {
}

static void
before_frag( void * _ctx,
before_frag( void * ctx,
ulong in_idx,
ulong seq,
ulong sig,
int * opt_filter ) {
(void)ctx;
(void)seq;

fd_shred_ctx_t * ctx = (fd_shred_ctx_t *)_ctx;

if( FD_LIKELY( in_idx==NET_IN_IDX ) ) {
*opt_filter = fd_disco_netmux_sig_port( sig )!=ctx->shred_listen_port;
*opt_filter = fd_disco_netmux_sig_proto( sig )!=DST_PROTO_SHRED;
} else if( FD_LIKELY( in_idx==POH_IN_IDX ) ) {
*opt_filter = fd_disco_poh_sig_pkt_type( sig )!=POH_PKT_TYPE_MICROBLOCK;
}
Expand Down Expand Up @@ -433,7 +431,7 @@ send_shred( fd_shred_ctx_t * ctx,
ulong pkt_sz = shred_sz + sizeof(eth_ip_udp_t);

ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
ulong sig = fd_disco_netmux_sig( dest->ip4, dest->port, FD_NETMUX_SIG_MIN_HDR_SZ, SRC_TILE_SHRED, (ushort)0 );
ulong sig = fd_disco_netmux_sig( 0U, 0U, dest->ip4, DST_PROTO_OUTGOING, FD_NETMUX_SIG_MIN_HDR_SZ );
fd_mcache_publish( ctx->net_out_mcache, ctx->net_out_depth, ctx->net_out_seq, sig, ctx->net_out_chunk,
pkt_sz, 0UL, tsorig, tspub );
ctx->net_out_seq = fd_seq_inc( ctx->net_out_seq, 1UL );
Expand Down Expand Up @@ -789,7 +787,6 @@ unprivileged_init( fd_topo_t * topo,

ctx->src_ip_addr = tile->shred.ip_addr;
fd_memcpy( ctx->src_mac_addr, tile->shred.src_mac_addr, 6UL );
ctx->shred_listen_port = tile->shred.shred_listen_port;

ctx->pending_batch.microblock_cnt = 0UL;
ctx->pending_batch.pos = 0UL;
Expand Down
41 changes: 22 additions & 19 deletions src/disco/fd_disco_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

#include "../util/wksp/fd_wksp_private.h"

#define SRC_TILE_NET (0UL)
#define SRC_TILE_QUIC (1UL)
#define SRC_TILE_SHRED (2UL)
/* Values for the proto field of a netmux signature (the proto argument
   of fd_disco_netmux_sig, read back with fd_disco_netmux_sig_proto).
   The field is 8 bits wide. */
#define DST_PROTO_OUTGOING (0UL)
#define DST_PROTO_TPU_UDP (1UL)
#define DST_PROTO_TPU_QUIC (2UL)
#define DST_PROTO_SHRED (3UL)

#define POH_PKT_TYPE_MICROBLOCK (0UL)
#define POH_PKT_TYPE_BECAME_LEADER (1UL)
Expand Down Expand Up @@ -40,25 +41,33 @@ FD_PROTOTYPES_BEGIN
/* fd_disco_netmux_sig packs its arguments into a single ulong
   signature value.  hdr_sz is the total size of network headers,
   including eth, ip, udp.  Ignored for outgoing packets. */
FD_FN_CONST static inline ulong
fd_disco_netmux_sig( ulong ip_addr,
ushort port,
ulong hdr_sz,
ushort src_tile,
ushort dst_idx ) {
fd_disco_netmux_sig( uint src_ip_addr,
ushort src_port,
uint dst_ip_addr,
ulong proto,
ulong hdr_sz ) {
/* The size of an Ethernet header is 14+4k bytes, where 0<=k<=3 (?) is
the number of vlan tags. The size of an IP header is 4j, where
5<=j<=15 is the size given in the header. The size of a UDP header
is 8B. Thus, the total sum of these is 42+4i, where i=k+j-5,
0<=i<=13. Since bits are at a premium here, we compress the header
size by just storing i. */
ulong hdr_sz_i = ((hdr_sz - 42UL)>>2)&0xFUL;
return (((ulong)ip_addr)<<32UL) | (((ulong)port)<<16UL) | ((src_tile&0xFUL)<<12UL) | ((hdr_sz_i&0xFUL)<<4UL) | (dst_idx&0xFUL);
/* Combine the source address and source port into one value.  The
   <<56UL below keeps only its low 8 bits (assuming ulong is 64 bits
   wide). */
ulong hash = fd_uint_hash( src_ip_addr ) + src_port;

/* Bit layout of the value returned below: hash in bits 56-63, proto in
   bits 48-55, hdr_sz_i in bits 44-47 and dst_ip_addr in bits 12-43.
   Bits 0-11 are left zero, so currently only 52 bits of the provided
   64 are used. */
return (hash<<56UL) | ((proto&0xFFUL)<<48UL) | ((hdr_sz_i&0xFUL)<<44UL) | (((ulong)dst_ip_addr)<<12UL);
}

FD_FN_CONST static inline uint fd_disco_netmux_sig_ip_addr ( ulong sig ) { return (uint)((sig>>32UL) & 0xFFFFFFFFUL); }
FD_FN_CONST static inline ushort fd_disco_netmux_sig_port ( ulong sig ) { return (sig>>16UL) & 0xFFFFUL; }
FD_FN_CONST static inline ushort fd_disco_netmux_sig_src_tile( ulong sig ) { return (sig>>12UL) & 0xFUL; }
FD_FN_CONST static inline ushort fd_disco_netmux_sig_dst_idx ( ulong sig ) { return (sig>> 0UL) & 0xFUL; }
/* fd_disco_netmux_sig_hash returns the 8-bit value stored in bits
   56-63 of sig. */
FD_FN_CONST static inline ulong
fd_disco_netmux_sig_hash( ulong sig ) {
  ulong hash_bits = sig>>56UL;
  return hash_bits & 0xFFUL;
}

/* fd_disco_netmux_sig_proto returns the 8-bit value stored in bits
   48-55 of sig. */
FD_FN_CONST static inline ulong
fd_disco_netmux_sig_proto( ulong sig ) {
  ulong proto_bits = sig>>48UL;
  return proto_bits & 0xFFUL;
}

/* fd_disco_netmux_sig_dst_ip returns the 32-bit value stored in bits
   12-43 of sig. */
FD_FN_CONST static inline uint
fd_disco_netmux_sig_dst_ip( ulong sig ) {
  ulong ip_bits = sig>>12UL;
  return (uint)( ip_bits & 0xFFFFFFFFUL );
}

/* fd_disco_netmux_sig_hdr_sz recovers the combined size of the
   Ethernet, IP, and UDP headers from the netmux signature field.  The
   signature stores a 4-bit value i in bits 44-47 and the size is
   42+4i.  The UDP payload of the packet held in the corresponding frag
   starts at the returned offset. */
FD_FN_CONST static inline ulong
fd_disco_netmux_sig_hdr_sz( ulong sig ) {
  ulong hdr_sz_i = (sig>>44UL) & 0xFUL;
  return 42UL + (hdr_sz_i<<2);
}

FD_FN_CONST static inline ulong
fd_disco_poh_sig( ulong slot,
Expand All @@ -77,12 +86,6 @@ FD_FN_CONST static inline ulong fd_disco_poh_sig_pkt_type( ulong sig ) { return
/* fd_disco_poh_sig_slot returns sig with its low 8 bits shifted out. */
FD_FN_CONST static inline ulong
fd_disco_poh_sig_slot( ulong sig ) {
  ulong upper_bits = sig >> 8;
  return upper_bits;
}

/* fd_disco_poh_sig_bank_tile returns the 7-bit value stored in bits
   1-7 of sig. */
FD_FN_CONST static inline ulong
fd_disco_poh_sig_bank_tile( ulong sig ) {
  ulong without_bit0 = sig >> 1;
  return without_bit0 & 0x7FUL;
}

/* fd_disco_netmux_sig_hdr_sz extracts the total size of the Ethernet,
IP, and UDP headers from the netmux signature field. The UDP payload
of the packet stored in the corresponding frag begins at the returned
offset. */
FD_FN_CONST static inline ulong fd_disco_netmux_sig_hdr_sz ( ulong sig ) { return 4UL*((sig>>4UL) & 0xFUL) + 42UL; }

FD_FN_PURE static inline ulong
fd_disco_compact_chunk0( void * wksp ) {
return (((struct fd_wksp_private *)wksp)->gaddr_lo) >> FD_CHUNK_LG_SZ;
Expand Down
6 changes: 4 additions & 2 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,10 @@ typedef struct {
ulong xdp_aio_depth;
uint src_ip_addr;
uchar src_mac_addr[6];
ushort allow_ports[ 3 ];

ushort shred_listen_port;
ushort quic_transaction_listen_port;
ushort legacy_transaction_listen_port;
} net;

struct {
Expand All @@ -141,7 +144,6 @@ typedef struct {
uint ip_addr;
uchar src_mac_addr[ 6 ];
ushort quic_transaction_listen_port;
ushort legacy_transaction_listen_port;
ulong idle_timeout_millis;
char identity_key_path[ PATH_MAX ];
int retry;
Expand Down

0 comments on commit 6548ed1

Please sign in to comment.