Skip to content

Commit

Permalink
tpu: move txn parse into verify tile
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcgee-jump committed May 24, 2024
1 parent f669b94 commit fffac4b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 117 deletions.
80 changes: 53 additions & 27 deletions src/app/fdctl/run/tiles/fd_verify.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ during_frag( void * _ctx,

fd_verify_ctx_t * ctx = (fd_verify_ctx_t *)_ctx;

if( FD_UNLIKELY( chunk<ctx->in[in_idx].chunk0 || chunk>ctx->in[in_idx].wmark || sz > FD_TPU_DCACHE_MTU ) )
if( FD_UNLIKELY( chunk<ctx->in[in_idx].chunk0 || chunk>ctx->in[in_idx].wmark || sz>FD_TPU_MTU ) )
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[in_idx].chunk0, ctx->in[in_idx].wmark ));

uchar * src = (uchar *)fd_chunk_to_laddr( ctx->in[in_idx].mem, chunk );
Expand All @@ -86,46 +86,72 @@ after_frag( void * _ctx,
(void)in_idx;
(void)seq;
(void)opt_sig;
(void)opt_tsorig;
(void)mux;
(void)opt_chunk;

fd_verify_ctx_t * ctx = (fd_verify_ctx_t *)_ctx;

/* Sanity check that should never fail. We should have atleast
FD_TPU_DCACHE_MTU bytes available. */
if( FD_UNLIKELY( *opt_sz < sizeof(ushort) ) ) {
FD_LOG_ERR( ("invalid opt_sz(%lx)", *opt_sz ) );
}
/* At this point, the payload only contains the serialized txn.
Beyond end of txn, but within bounds of msg layout, add a trailer
describing the txn layout.
[ payload ] (payload_sz bytes)
[ pad: align to 2B ] (0-1 bytes)
[ fd_txn_t ] (? bytes)
[ payload_sz ] (2B) */

ulong payload_sz = *opt_sz;
ulong txnt_off = fd_ulong_align_up( payload_sz, 2UL );

/* Ensure sufficient space to store trailer */

long txnt_maxsz = (long)FD_TPU_DCACHE_MTU -
(long)txnt_off -
(long)sizeof(ushort);
if( FD_UNLIKELY( txnt_maxsz<(long)FD_TXN_MAX_SZ ) ) FD_LOG_ERR(( "got malformed txn (sz %lu) does not fit in dcache", payload_sz ));

uchar const * txn = fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
fd_txn_t * txn_t = (fd_txn_t *)((ulong)txn + txnt_off);

uchar * udp_payload = (uchar *)fd_chunk_to_laddr( ctx->out_mem, ctx->out_chunk );
ushort payload_sz = *(ushort*)(udp_payload + *opt_sz - sizeof(ushort));
/* Parse transaction */

/* Make sure payload_sz is valid */
if( FD_UNLIKELY( payload_sz > FD_TPU_DCACHE_MTU ) ) {
FD_LOG_ERR( ("invalid payload_sz(%x)", payload_sz) );
ulong txn_t_sz = fd_txn_parse( txn, payload_sz, txn_t, NULL );
if( FD_UNLIKELY( !txn_t_sz ) ) {
*opt_filter = 1; /* Invalid txn fails to parse. */
return;
}

/* txn contents are located in shared memory accessible to the dedup tile
and the contents are controlled by the quic tile. We must perform
validation */
fd_txn_t * txn = (fd_txn_t*) fd_ulong_align_up( (ulong)(udp_payload) + payload_sz, 2UL );
/* Write payload_sz */

/* fd_txn_parse always returns a multiple of 2 so this sz is
correctly aligned. */
ushort * payload_sz_p = (ushort *)( (ulong)txn_t + txn_t_sz );
*payload_sz_p = (ushort)payload_sz;

/* End of message */

ulong new_sz = ( (ulong)payload_sz_p + sizeof(ushort) ) - (ulong)txn;
if( FD_UNLIKELY( new_sz>FD_TPU_DCACHE_MTU ) ) {
FD_LOG_CRIT(( "memory corruption detected (txn_sz=%lu txn_t_sz=%lu)",
payload_sz, txn_t_sz ));
}

/* We need to access signatures and accounts, which are all before the recent_blockhash_off.
We assert that the payload_sz includes all signatures and account pubkeys we need. */
ushort recent_blockhash_off = txn->recent_blockhash_off;
if( FD_UNLIKELY( recent_blockhash_off >= payload_sz ) ) {
FD_LOG_ERR( ("txn is invalid: payload_sz = %x, recent_blockhash_off = %x", payload_sz, recent_blockhash_off ) );
ushort recent_blockhash_off = txn_t->recent_blockhash_off;
if( FD_UNLIKELY( recent_blockhash_off>=*opt_sz ) ) {
FD_LOG_ERR( ("txn is invalid: payload_sz = %lx, recent_blockhash_off = %x", *opt_sz, recent_blockhash_off ) );
}

int res = fd_txn_verify( ctx, udp_payload, payload_sz, txn, opt_sig );
if( FD_UNLIKELY( res != FD_TXN_VERIFY_SUCCESS ) ) {
*opt_filter = 1;
ulong txn_sig;
int res = fd_txn_verify( ctx, txn, (ushort)payload_sz, txn_t, &txn_sig );
if( FD_UNLIKELY( res!=FD_TXN_VERIFY_SUCCESS ) ) {
*opt_filter = 1; /* Signature verification failed. */
return;
}

*opt_filter = 0;
*opt_chunk = ctx->out_chunk;
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, *opt_sz, ctx->out_chunk0, ctx->out_wmark );
ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_mux_publish( mux, txn_sig, ctx->out_chunk, new_sz, 0UL, *opt_tsorig, tspub );
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, new_sz, ctx->out_chunk0, ctx->out_wmark );
}

static void
Expand Down Expand Up @@ -203,7 +229,7 @@ populate_allowed_fds( void * scratch,

fd_topo_run_tile_t fd_tile_verify = {
.name = "verify",
.mux_flags = FD_MUX_FLAG_COPY, /* must copy frags for tile isolation and security */
.mux_flags = FD_MUX_FLAG_COPY | FD_MUX_FLAG_MANUAL_PUBLISH,
.burst = 1UL,
.mux_ctx = mux_ctx,
.mux_before_frag = before_frag,
Expand Down
3 changes: 1 addition & 2 deletions src/disco/metrics/metrics.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ metric introduced.
<int value="0" name="Success" label="Success" />
<int value="1" name="ErrorOversize" label="Oversize message" />
<int value="2" name="ErrorSkip" label="Out-of-order data within QUIC stream" />
<int value="3" name="ErrorTransaction" label="Rejected transaction" />
<int value="4" name="ErrorState" label="Unexpected slot state" />
<int value="3" name="ErrorState" label="Unexpected slot state" />
</enum>

<enum name="QuicStreamType">
Expand Down
5 changes: 2 additions & 3 deletions src/disco/quic/fd_tpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
/* FD_TPU_REASM_MTU is the max tango frag sz sent by an fd_tpu_reasm_t.
FD_TPU_REASM_CHUNK_MTU*FD_CHUNK_SZ == FD_TPU_REASM_MTU */

#define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_DCACHE_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
#define FD_TPU_REASM_CHUNK_MTU (FD_ULONG_ALIGN_UP( FD_TPU_MTU, FD_CHUNK_SZ )>>FD_CHUNK_LG_SZ)
#define FD_TPU_REASM_MTU (FD_TPU_REASM_CHUNK_MTU<<FD_CHUNK_LG_SZ)

#define FD_TPU_REASM_ALIGN FD_CHUNK_ALIGN
Expand All @@ -36,8 +36,7 @@
#define FD_TPU_REASM_SUCCESS (0)
#define FD_TPU_REASM_ERR_SZ (1) /* oversz msg */
#define FD_TPU_REASM_ERR_SKIP (2) /* out-of-order data within QUIC stream */
#define FD_TPU_REASM_ERR_TXN (3) /* rejected transaction (invalid?) */
#define FD_TPU_REASM_ERR_STATE (4) /* unexpected slot state */
#define FD_TPU_REASM_ERR_STATE (3) /* unexpected slot state */

/* FD_TPU_REASM_STATE_{...} are reasm slot states */

Expand Down
62 changes: 0 additions & 62 deletions src/disco/quic/fd_tpu_reasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -166,62 +166,6 @@ fd_tpu_reasm_append( fd_tpu_reasm_t * reasm,
return FD_TPU_REASM_SUCCESS;
}

static fd_tpu_reasm_slot_t *
append_descriptor( fd_tpu_reasm_slot_t * slot,
uchar * data ) {

/* At this point, the payload only contains the serialized txn.
Beyond end of txn, but within bounds of msg layout, add a trailer
describing the txn layout.
[ payload ] (txn_sz bytes)
[ pad-align 2B ] (? bytes)
[ fd_txn_t ] (? bytes)
[ payload_sz ] (2B) */

ulong txn_sz = slot->sz;
ulong txnt_off = fd_ulong_align_up( txn_sz, 2UL );

/* Ensure sufficient space to store trailer */

long txnt_maxsz = (long)FD_TPU_DCACHE_MTU -
(long)txnt_off -
(long)sizeof(ushort);
if( FD_UNLIKELY( txnt_maxsz < (long)FD_TXN_MAX_SZ ) ) {
FD_LOG_WARNING(( "not enough chunks to fit txn (sz %lu)", txn_sz ));
return NULL;
}

uchar const * txn = data;
void * txn_t = data + txnt_off;

/* Parse transaction */

ulong txn_t_sz = fd_txn_parse( txn, txn_sz, txn_t, NULL );
if( FD_UNLIKELY( !txn_t_sz ) ) {
FD_LOG_DEBUG(( "fd_txn_parse(sz=%lu) failed", txn_sz ));
return NULL; /* invalid txn (punish QUIC client?) */
}

/* Write payload_sz */

ushort * payload_sz_p = (ushort *)( (ulong)txn_t + txn_t_sz );
/* TODO assert payload_sz is aligned by alignof(ushort)? */
*payload_sz_p = (ushort)txn_sz;

/* End of message */

ulong new_sz = ( (ulong)payload_sz_p + sizeof(ushort) ) - (ulong)data;
if( FD_UNLIKELY( new_sz>FD_TPU_DCACHE_MTU ) ) {
FD_LOG_CRIT(( "memory corruption detected (txn_sz=%lu txn_t_sz=%lu)",
txn_sz, txn_t_sz ));
return NULL;
}

slot->sz = (ushort)new_sz;
return slot;
}

int
fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
fd_tpu_reasm_slot_t * slot,
Expand All @@ -245,12 +189,6 @@ fd_tpu_reasm_publish( fd_tpu_reasm_t * reasm,
base, (void *)slot, (void *)reasm ));
}

/* Parse transaction and append descriptor */
if( FD_UNLIKELY( !append_descriptor( slot, data ) ) ) {
fd_tpu_reasm_cancel( reasm, slot );
return FD_TPU_REASM_ERR_TXN;
}

/* Acquire mcache line */
ulong depth = reasm->depth;
fd_frag_meta_t * meta = mcache + fd_mcache_line_idx( seq, depth );
Expand Down
24 changes: 1 addition & 23 deletions src/disco/quic/test_tpu_reasm.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,6 @@ main( int argc,
fd_tpu_reasm_reset( reasm );
verify_state( reasm, mcache );

FD_LOG_INFO(( "Test invalid txn" ));

do {
fd_tpu_reasm_slot_t * slot = fd_tpu_reasm_prepare( reasm, 0UL );
FD_TEST( slot->state == FD_TPU_REASM_STATE_BUSY );
slot->sz = 0UL; /* guaranteed to fail parse */
FD_TEST( fd_tpu_reasm_publish( reasm, slot, mcache, reasm, seq, 0UL )
== FD_TPU_REASM_ERR_TXN );
FD_TEST( slot->state == FD_TPU_REASM_STATE_FREE );
verify_state( reasm, mcache );

fd_frag_meta_t * mline = mcache + fd_mcache_line_idx( seq, depth );
FD_TEST( mline->seq != seq );

seq = fd_seq_inc( seq, 1UL );
} while(0);

FD_LOG_INFO(( "Test basic publishing" ));

do {
Expand All @@ -186,7 +169,7 @@ main( int argc,
fd_frag_meta_t * mline = mcache + line_idx;

FD_TEST( mline->seq == seq );
FD_TEST( mline->sz >= transaction4_sz+sizeof(fd_txn_t) );
FD_TEST( mline->sz == transaction4_sz );
FD_TEST( (ulong)(slot - slots) == pub_slots[ line_idx ] );

uchar const * data = fd_chunk_to_laddr_const( tpu_reasm_mem, mline->chunk );
Expand Down Expand Up @@ -231,11 +214,6 @@ main( int argc,
FD_TEST( slot->state == FD_TPU_REASM_STATE_FREE );
FD_TEST( reasm->tail == slot_idx );
check_free_diff( verify_state( reasm, mcache ), +1L );
} else if( roll<0x40000000U ) {
FD_TEST( fd_tpu_reasm_publish( reasm, slot, mcache, reasm, seq, fd_rng_ulong( rng ) )
== FD_TPU_REASM_ERR_TXN );
seq = fd_seq_inc( seq, 1UL );
check_free_diff( verify_state( reasm, mcache ), +1L );
} else {
FD_TEST( fd_tpu_reasm_append( reasm, slot, transaction4, transaction4_sz, 0UL )
== FD_TPU_REASM_SUCCESS );
Expand Down

0 comments on commit fffac4b

Please sign in to comment.