From d63510fd089a28510ee9c359b05b2f917d75c721 Mon Sep 17 00:00:00 2001
From: Philip Taffet <123486962+ptaffet-jump@users.noreply.github.com>
Date: Wed, 23 Oct 2024 10:24:45 -0500
Subject: [PATCH] pack: pace CUs through block

---
 src/app/fdctl/run/tiles/fd_pack.c | 43 +++++++++++++++--
 src/ballet/pack/fd_pack.c         |  3 +-
 src/ballet/pack/fd_pack.h         |  8 ++++
 src/ballet/pack/fd_pack_pacing.h  | 78 +++++++++++++++++++++++++++++++
 4 files changed, 128 insertions(+), 4 deletions(-)
 create mode 100644 src/ballet/pack/fd_pack_pacing.h

diff --git a/src/app/fdctl/run/tiles/fd_pack.c b/src/app/fdctl/run/tiles/fd_pack.c
index b4339d0923..ad0d21ff19 100644
--- a/src/app/fdctl/run/tiles/fd_pack.c
+++ b/src/app/fdctl/run/tiles/fd_pack.c
@@ -6,6 +6,7 @@
 #include "../../../../disco/shred/fd_shredder.h"
 #include "../../../../disco/metrics/fd_metrics.h"
 #include "../../../../ballet/pack/fd_pack.h"
+#include "../../../../ballet/pack/fd_pack_pacing.h"
 
 #include
 
@@ -137,6 +138,10 @@ typedef struct {
   ulong slot_max_data;
   int   larger_shred_limits_per_block;
 
+  /* Cost limit (in cost units) for each block.  Typically
+     FD_PACK_MAX_COST_PER_BLOCK or LARGER_MAX_COST_PER_BLOCK. */
+  ulong slot_max_cost;
+
   /* If drain_banks is non-zero, then the pack tile must wait until all
      banks are idle before scheduling any more microblocks.  This is
      primarily helpful in irregular leader transitions, e.g. while being
@@ -160,6 +165,14 @@ typedef struct {
   long _slot_end_ns;
   long slot_end_ns;
 
+  /* pacer and ticks_per_ns are used for pacing CUs through the slot,
+     i.e. deciding when to schedule a microblock given the number of
+     CUs that have been consumed so far.  pacer is an opaque pacing
+     object, which is initialized when the pack tile is packing a
+     slot.  ticks_per_ns is the cached value from tempo. */
+  fd_pack_pacing_t pacer[1];
+  double ticks_per_ns;
+
   /* last_successful_insert stores the tickcount of the last
      successful transaction insert. */
   long last_successful_insert;
@@ -190,6 +203,7 @@ typedef struct {
   int  poll_cursor; /* in [0, bank_cnt), the next bank to poll */
   int  use_consumed_cus;
   long skip_cnt;
+  long schedule_next; /* the tick value at which to schedule the next microblock, for pacing purposes */
   ulong * bank_current[ FD_PACK_PACK_MAX_OUT ];
   ulong bank_expect[ FD_PACK_PACK_MAX_OUT ];
   /* bank_ready_at[x] means don't check bank x until tickcount is at
@@ -346,6 +360,8 @@ after_credit( fd_pack_ctx_t * ctx,
 
   long now = fd_tickcount();
 
+  if( FD_UNLIKELY( now<ctx->schedule_next ) ) return;
+
   ulong bank_cnt = ctx->bank_cnt;
 
   /* If we're using CU rebates, then we have one in for each bank in
@@ -484,8 +500,9 @@ after_credit( fd_pack_ctx_t * ctx,
     if( FD_LIKELY( schedule_cnt ) ) {
       any_scheduled = 1;
 
-      ulong tsorig = (ulong)fd_frag_meta_ts_comp( now ); /* A bound on when we observed bank was idle */
-      ulong tspub  = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
+      long  now2   = fd_tickcount();
+      ulong tsorig = (ulong)fd_frag_meta_ts_comp( now  ); /* A bound on when we observed bank was idle */
+      ulong tspub  = (ulong)fd_frag_meta_ts_comp( now2 );
       ulong chunk  = ctx->out_chunk;
       ulong msg_sz = schedule_cnt*sizeof(fd_txn_p_t);
       fd_microblock_bank_trailer_t * trailer = (fd_microblock_bank_trailer_t*)((uchar*)microblock_dst+msg_sz);
@@ -495,12 +512,13 @@ after_credit( fd_pack_ctx_t * ctx,
       ulong sig = fd_disco_poh_sig( ctx->leader_slot, POH_PKT_TYPE_MICROBLOCK, (ulong)i );
       fd_stem_publish( stem, 0UL, sig, chunk, msg_sz+sizeof(fd_microblock_bank_trailer_t), 0UL, tsorig, tspub );
       ctx->bank_expect[ i ] = stem->seqs[0]-1UL;
-      ctx->bank_ready_at[i] = now + (long)ctx->microblock_duration_ticks;
+      ctx->bank_ready_at[i] = now2 + (long)ctx->microblock_duration_ticks;
       ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, msg_sz+sizeof(fd_microblock_bank_trailer_t), ctx->out_chunk0, ctx->out_wmark );
       ctx->slot_microblock_cnt++;
 
       ctx->bank_idle_bitset = fd_ulong_pop_lsb( ctx->bank_idle_bitset );
       ctx->skip_cnt = (long)schedule_cnt * fd_long_if( ctx->use_consumed_cus, (long)bank_cnt/2L, 1L );
+      ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now2 );
     }
   }
 
@@ -561,6 +579,9 @@ during_frag( fd_pack_ctx_t * ctx,
       if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=sizeof(fd_became_leader_t) ) )
         FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));
 
+      long now_ticks = fd_tickcount();
+      long now_ns    = fd_log_wallclock();
+
       if( FD_UNLIKELY( ctx->leader_slot!=ULONG_MAX ) ) {
         FD_LOG_WARNING(( "switching to slot %lu while packing for slot %lu. Draining bank tiles.", fd_disco_poh_sig_slot( sig ), ctx->leader_slot ));
         ctx->drain_banks = 1;
@@ -579,6 +600,14 @@ during_frag( fd_pack_ctx_t * ctx,
       /* Reserve some space in the block for ticks */
       ctx->slot_max_data = (ctx->larger_shred_limits_per_block ? LARGER_MAX_DATA_PER_BLOCK : FD_PACK_MAX_DATA_PER_BLOCK)
                                 - 48UL*(became_leader->ticks_per_slot+became_leader->total_skipped_ticks);
+      /* ticks_per_ns is probably relatively stable over 400ms, but not
+         over several hours, so we need to compute the slot duration in
+         nanoseconds first and then convert to ticks.  This doesn't need
+         to be super accurate, but we don't want it to vary wildly. */
+      long end_ticks = now_ticks + (long)((double)fd_long_max( became_leader->slot_end_ns - now_ns, 1L )*ctx->ticks_per_ns);
+      /* We may still get overrun, but then we'll never use this and
+         just reinitialize it the next time when we actually become
+         leader. */
+      fd_pack_pacing_init( ctx->pacer, now_ticks, end_ticks, ctx->slot_max_cost );
 
       FD_LOG_INFO(( "pack_became_leader(slot=%lu,ends_at=%ld)", ctx->leader_slot, became_leader->slot_end_ns ));
 
@@ -704,6 +733,7 @@ after_frag( fd_pack_ctx_t * ctx,
       ctx->slot_end_ns = ctx->_slot_end_ns;
       fd_pack_set_block_limits( ctx->pack, ctx->slot_max_microblocks, ctx->slot_max_data );
+      ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now );
 
       break;
     }
     case IN_KIND_BUNDLE: {
@@ -716,6 +746,7 @@ after_frag( fd_pack_ctx_t * ctx,
 
       fd_pack_rebate_cus( ctx->pack, ctx->pending_rebate, ctx->pending_rebate_cnt );
       ctx->pending_rebate_cnt = 0UL;
+      ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now );
 
       break;
     }
     case IN_KIND_RESOLV: {
@@ -756,6 +787,8 @@ unprivileged_init( fd_topo_t * topo,
     .max_microblocks_per_block = (ulong)UINT_MAX, /* Limit not known yet */
   }};
 
+  if( FD_UNLIKELY( tile->pack.max_pending_transactions >= USHORT_MAX ) ) FD_LOG_ERR(( "pack tile supports up to %lu pending transactions", USHORT_MAX-1UL ));
+
   ulong pack_footprint = fd_pack_footprint( tile->pack.max_pending_transactions, tile->pack.bank_tile_count, limits );
 
   FD_SCRATCH_ALLOC_INIT( l, scratch );
@@ -800,7 +833,11 @@ unprivileged_init( fd_topo_t * topo,
   ctx->slot_max_microblocks          = 0UL;
   ctx->slot_max_data                 = 0UL;
   ctx->larger_shred_limits_per_block = tile->pack.larger_shred_limits_per_block;
+  ctx->slot_max_cost                 = limits->max_cost_per_block;
+  ctx->drain_banks                   = 0;
+  ctx->approx_wallclock_ns           = fd_log_wallclock();
   ctx->rng                           = rng;
+  ctx->ticks_per_ns                  = fd_tempo_tick_per_ns( NULL );
   ctx->last_successful_insert        = 0L;
   ctx->highest_observed_slot         = 0UL;
   ctx->microblock_duration_ticks     = (ulong)(fd_tempo_tick_per_ns( NULL )*(double)MICROBLOCK_DURATION_NS + 0.5);
diff --git a/src/ballet/pack/fd_pack.c b/src/ballet/pack/fd_pack.c
index 9e021d3dd2..8b8520e0ef 100644
--- a/src/ballet/pack/fd_pack.c
+++ b/src/ballet/pack/fd_pack.c
@@ -1611,7 +1611,8 @@ fd_pack_schedule_next_microblock( fd_pack_t * pack,
   return scheduled;
 }
 
-ulong fd_pack_bank_tile_cnt( fd_pack_t const * pack ) { return pack->bank_tile_cnt; }
+ulong fd_pack_bank_tile_cnt     ( fd_pack_t const * pack ) { return pack->bank_tile_cnt;         }
+ulong fd_pack_current_block_cost( fd_pack_t const * pack ) { return pack->cumulative_block_cost; }
 
 void
diff --git a/src/ballet/pack/fd_pack.h b/src/ballet/pack/fd_pack.h
index 386866cf9e..49eb29149b 100644
--- a/src/ballet/pack/fd_pack.h
+++ b/src/ballet/pack/fd_pack.h
@@ -161,6 +161,14 @@ fd_pack_avail_txn_cnt( fd_pack_t const * pack ) {
   return *((ulong const *)((uchar const *)pack + FD_PACK_PENDING_TXN_CNT_OFF));
 }
 
+/* fd_pack_current_block_cost returns the number of CUs that have been
+   scheduled in the current block, net of any rebates.  It should be
+   between 0 and the specified value of max_cost_per_block, but it can
+   be slightly higher due to temporary cost model nonsense.  Due to
+   rebates, this number may decrease as the block progresses.  pack
+   must be a valid local join. */
+FD_FN_PURE ulong fd_pack_current_block_cost( fd_pack_t const * pack );
+
 /* fd_pack_bank_tile_cnt: returns the value of bank_tile_cnt provided
    in pack when the pack object was initialized with fd_pack_new.
    pack must be a valid local join.
    The result will be in [1,
diff --git a/src/ballet/pack/fd_pack_pacing.h b/src/ballet/pack/fd_pack_pacing.h
new file mode 100644
index 0000000000..c7ea43a2a2
--- /dev/null
+++ b/src/ballet/pack/fd_pack_pacing.h
@@ -0,0 +1,78 @@
+#ifndef HEADER_fd_src_ballet_pack_fd_pack_pacing_h
+#define HEADER_fd_src_ballet_pack_fd_pack_pacing_h
+
+/* One of the keys to packing well is properly pacing CU consumption.
+   Without pacing, pack will end up filling the block with non-ideal
+   transactions.  Since at the current limits, the banks can execute a
+   block worth of CUs in a fraction of the block time, without pacing,
+   any lucrative transactions that arrive towards the end of a block
+   will have to be delayed until the next block (or another leader's
+   block if it's the last in our rotation). */
+
+
+struct fd_pack_pacing_private {
+  /* Start and end time of block in ticks */
+  long t_start;
+  long t_end;
+  /* Number of CUs in the block */
+  ulong max_cus;
+
+  ulong home_stretch_cutoff; /* in CUs, where the slope switches */
+  float raw_slope;           /* in ticks per CU */
+  float offset;              /* in ticks */
+};
+
+typedef struct fd_pack_pacing_private fd_pack_pacing_t;
+
+
+/* fd_pack_pacing_init begins pacing for a slot which starts at t_start
+   and ends at t_end (both measured in fd_tickcount() space) and will
+   contain max_cus CUs.  max_cus in (0, 2^32).  t_end - t_start should
+   be about 400ms or less, but must be in (0, 2^32) as well. */
+static inline void
+fd_pack_pacing_init( fd_pack_pacing_t * pacer,
+                     long               t_start,
+                     long               t_end,
+                     ulong              max_cus ) {
+  /* The exact style of pacing needs to be the subject of quantitative
+     experimentation, so for now we're just doing something that passes
+     the gut check.  We'll pace for 90% of the CUs through the first
+     75% of the block time, and then the last 10% through the last 25%
+     of the block time.  This gives us pretty good tolerance against
+     transactions taking longer to execute than we expect (the extreme
+     of which being transactions that fail to land). */
+
+  pacer->t_start = t_start;
+  pacer->t_end   = t_end;
+  pacer->max_cus = max_cus;
+
+  pacer->raw_slope           = (float)(t_end - t_start)/(float)max_cus;
+  pacer->offset              = 1.5f * (float)(t_end - t_start); /* the math works out to be 1.5x */
+  pacer->home_stretch_cutoff = (max_cus*9UL + 4UL)/10UL;
+}
+
+/* fd_pack_pacing_next returns the time (in fd_tickcount() space) at
+   which the next attempt should be made to schedule transactions.
+
+   The returned value will typically be between t_start and t_end, but
+   may be slightly out of range due to rounding or if consumed_cus is
+   larger than the max_cus value provided in fd_pack_pacing_init.
+   consumed_cus need not increase monotonically between calls.
+
+   now should be the time at which the consumed_cus was measured.  It's
+   not used right now, but is provided to allow for more sophisticated
+   implementations in the future.
+
+   fd_pack_pacing_init must have been called prior to the first call of
+   fd_pack_pacing_next. */
+static inline long
+fd_pack_pacing_next( fd_pack_pacing_t * pacer,
+                     ulong              consumed_cus,
+                     long               now ) {
+  (void)now;
+  int non_home_stretch = consumed_cus < pacer->home_stretch_cutoff;
+  return pacer->t_start + (long)( (float)consumed_cus * pacer->raw_slope * fd_float_if( non_home_stretch, 0.75f/0.9f, 0.25f/0.1f )
+                                  - fd_float_if( non_home_stretch, 0.0f, pacer->offset ));
+}
+
+#endif /* HEADER_fd_src_ballet_pack_fd_pack_pacing_h */
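
A minimal standalone sketch of the pacing curve defined in fd_pack_pacing.h
above, restated with plain C types so it compiles without the fd_util
environment.  The 2.5 GHz tick rate, 400 ms slot length, and 48M CU block
budget below are assumed example values chosen for illustration, not numbers
taken from this patch.

/* pacing_sketch.c: illustrates the two-slope pacing schedule (90% of
   the CUs through the first 75% of the slot, the last 10% through the
   final 25%) using the same constants as fd_pack_pacing_init/next. */
#include <stdio.h>

int
main( void ) {
  long          t_start = 0L;                  /* slot start, in ticks         */
  long          t_end   = (long)(0.400*2.5e9); /* 400 ms at an assumed 2.5 GHz */
  unsigned long max_cus = 48000000UL;          /* assumed block CU budget      */

  /* Same derived constants as fd_pack_pacing_init */
  float         raw_slope = (float)(t_end - t_start)/(float)max_cus; /* ticks per CU       */
  float         offset    = 1.5f * (float)(t_end - t_start);         /* home-stretch shift */
  unsigned long cutoff    = (max_cus*9UL + 4UL)/10UL;                /* 90% of the CUs     */

  for( unsigned long pct=0UL; pct<=100UL; pct+=10UL ) {
    unsigned long consumed         = max_cus*pct/100UL;
    int           non_home_stretch = consumed < cutoff;
    /* Same piecewise-linear formula as fd_pack_pacing_next */
    float scale = non_home_stretch ? 0.75f/0.9f : 0.25f/0.1f;
    float shift = non_home_stretch ? 0.0f       : offset;
    long  next  = t_start + (long)( (float)consumed*raw_slope*scale - shift );
    printf( "%3lu%% of CUs consumed -> next schedule at %5.1f%% of the slot\n",
            pct, 100.0*(double)(next-t_start)/(double)(t_end-t_start) );
  }
  return 0;
}

Running it shows the knee in the schedule: 90% of the CU budget maps to 75% of
the slot, and the curve reaches the end of the slot exactly at the full budget.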