pack: pace CUs through block
ptaffet-jump committed Dec 5, 2024
1 parent df22a40 commit d63510f
Showing 4 changed files with 128 additions and 4 deletions.
43 changes: 40 additions & 3 deletions src/app/fdctl/run/tiles/fd_pack.c
@@ -6,6 +6,7 @@
#include "../../../../disco/shred/fd_shredder.h"
#include "../../../../disco/metrics/fd_metrics.h"
#include "../../../../ballet/pack/fd_pack.h"
#include "../../../../ballet/pack/fd_pack_pacing.h"

#include <linux/unistd.h>

@@ -137,6 +138,10 @@ typedef struct {
ulong slot_max_data;
int larger_shred_limits_per_block;

/* Cost limit (in cost units) for each block. Typically
FD_PACK_MAX_COST_PER_BLOCK or LARGER_MAX_COST_PER_BLOCK. */
ulong slot_max_cost;

/* If drain_banks is non-zero, then the pack tile must wait until all
banks are idle before scheduling any more microblocks. This is
primarily helpful in irregular leader transitions, e.g. while being
@@ -160,6 +165,14 @@ typedef struct {
long _slot_end_ns;
long slot_end_ns;

/* pacer and ticks_per_ns are used for pacing CUs through the slot,
i.e. deciding when to schedule a microblock given the number of CUs
that have been consumed so far. pacer is an opaque pacing object,
which is initialized when the pack tile is packing a slot.
ticks_per_ns is the cached value from tempo. */
fd_pack_pacing_t pacer[1];
double ticks_per_ns;

/* last_successful_insert stores the tickcount of the last
successful transaction insert. */
long last_successful_insert;
@@ -190,6 +203,7 @@ typedef struct {
int poll_cursor; /* in [0, bank_cnt), the next bank to poll */
int use_consumed_cus;
long skip_cnt;
long schedule_next; /* the tick value at which to schedule the next microblock for pacing purposes */
ulong * bank_current[ FD_PACK_PACK_MAX_OUT ];
ulong bank_expect[ FD_PACK_PACK_MAX_OUT ];
/* bank_ready_at[x] means don't check bank x until tickcount is at
@@ -346,6 +360,8 @@ after_credit( fd_pack_ctx_t * ctx,

long now = fd_tickcount();

if( FD_UNLIKELY( now<ctx->schedule_next ) ) return;

ulong bank_cnt = ctx->bank_cnt;

/* If we're using CU rebates, then we have one in for each bank in
@@ -484,8 +500,9 @@

if( FD_LIKELY( schedule_cnt ) ) {
any_scheduled = 1;
- ulong tsorig = (ulong)fd_frag_meta_ts_comp( now ); /* A bound on when we observed bank was idle */
- ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
+ long now2 = fd_tickcount();
+ ulong tsorig = (ulong)fd_frag_meta_ts_comp( now ); /* A bound on when we observed bank was idle */
+ ulong tspub = (ulong)fd_frag_meta_ts_comp( now2 );
ulong chunk = ctx->out_chunk;
ulong msg_sz = schedule_cnt*sizeof(fd_txn_p_t);
fd_microblock_bank_trailer_t * trailer = (fd_microblock_bank_trailer_t*)((uchar*)microblock_dst+msg_sz);
@@ -495,12 +512,13 @@ after_credit( fd_pack_ctx_t * ctx,
ulong sig = fd_disco_poh_sig( ctx->leader_slot, POH_PKT_TYPE_MICROBLOCK, (ulong)i );
fd_stem_publish( stem, 0UL, sig, chunk, msg_sz+sizeof(fd_microblock_bank_trailer_t), 0UL, tsorig, tspub );
ctx->bank_expect[ i ] = stem->seqs[0]-1UL;
- ctx->bank_ready_at[i] = now + (long)ctx->microblock_duration_ticks;
+ ctx->bank_ready_at[i] = now2 + (long)ctx->microblock_duration_ticks;
ctx->out_chunk = fd_dcache_compact_next( ctx->out_chunk, msg_sz+sizeof(fd_microblock_bank_trailer_t), ctx->out_chunk0, ctx->out_wmark );
ctx->slot_microblock_cnt++;

ctx->bank_idle_bitset = fd_ulong_pop_lsb( ctx->bank_idle_bitset );
ctx->skip_cnt = (long)schedule_cnt * fd_long_if( ctx->use_consumed_cus, (long)bank_cnt/2L, 1L );
ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now2 );
}
}

@@ -561,6 +579,9 @@ during_frag( fd_pack_ctx_t * ctx,
if( FD_UNLIKELY( chunk<ctx->in[ in_idx ].chunk0 || chunk>ctx->in[ in_idx ].wmark || sz!=sizeof(fd_became_leader_t) ) )
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->in[ in_idx ].chunk0, ctx->in[ in_idx ].wmark ));

long now_ticks = fd_tickcount();
long now_ns = fd_log_wallclock();

if( FD_UNLIKELY( ctx->leader_slot!=ULONG_MAX ) ) {
FD_LOG_WARNING(( "switching to slot %lu while packing for slot %lu. Draining bank tiles.", fd_disco_poh_sig_slot( sig ), ctx->leader_slot ));
ctx->drain_banks = 1;
@@ -579,6 +600,14 @@
/* Reserve some space in the block for ticks */
ctx->slot_max_data = (ctx->larger_shred_limits_per_block ? LARGER_MAX_DATA_PER_BLOCK : FD_PACK_MAX_DATA_PER_BLOCK)
- 48UL*(became_leader->ticks_per_slot+became_leader->total_skipped_ticks);
/* ticks_per_ns is probably relatively stable over 400ms, but not
over several hours, so we need to compute the slot duration in
nanoseconds first and then convert to ticks. This doesn't need
to be super accurate, but we don't want it to vary wildly. */
long end_ticks = now_ticks + (long)((double)fd_long_max( became_leader->slot_end_ns - now_ns, 1L )*ctx->ticks_per_ns);
/* We may still get overrun, but then we'll never use this and just
reinitialize it the next time when we actually become leader. */
fd_pack_pacing_init( ctx->pacer, now_ticks, end_ticks, ctx->slot_max_cost );

FD_LOG_INFO(( "pack_became_leader(slot=%lu,ends_at=%ld)", ctx->leader_slot, became_leader->slot_end_ns ));

@@ -704,6 +733,7 @@ after_frag( fd_pack_ctx_t * ctx,

ctx->slot_end_ns = ctx->_slot_end_ns;
fd_pack_set_block_limits( ctx->pack, ctx->slot_max_microblocks, ctx->slot_max_data );
ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now );
break;
}
case IN_KIND_BUNDLE: {
@@ -716,6 +746,7 @@

fd_pack_rebate_cus( ctx->pack, ctx->pending_rebate, ctx->pending_rebate_cnt );
ctx->pending_rebate_cnt = 0UL;
ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), now );
break;
}
case IN_KIND_RESOLV: {
@@ -756,6 +787,8 @@ unprivileged_init( fd_topo_t * topo,
.max_microblocks_per_block = (ulong)UINT_MAX, /* Limit not known yet */
}};

if( FD_UNLIKELY( tile->pack.max_pending_transactions >= USHORT_MAX ) ) FD_LOG_ERR(( "pack tile supports up to %lu pending transactions", USHORT_MAX-1UL ));

ulong pack_footprint = fd_pack_footprint( tile->pack.max_pending_transactions, tile->pack.bank_tile_count, limits );

FD_SCRATCH_ALLOC_INIT( l, scratch );
@@ -800,7 +833,11 @@ unprivileged_init( fd_topo_t * topo,
ctx->slot_max_microblocks = 0UL;
ctx->slot_max_data = 0UL;
ctx->larger_shred_limits_per_block = tile->pack.larger_shred_limits_per_block;
ctx->slot_max_cost = limits->max_cost_per_block;
ctx->drain_banks = 0;
ctx->approx_wallclock_ns = fd_log_wallclock();
ctx->rng = rng;
ctx->ticks_per_ns = fd_tempo_tick_per_ns( NULL );
ctx->last_successful_insert = 0L;
ctx->highest_observed_slot = 0UL;
ctx->microblock_duration_ticks = (ulong)(fd_tempo_tick_per_ns( NULL )*(double)MICROBLOCK_DURATION_NS + 0.5);
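Taken together, the tile-side changes reduce to three hooks. A condensed sketch of the flow (drawn from the diff above with unrelated tile logic elided; not additional code from the commit):

/* 1. On becoming leader (during_frag): convert the slot deadline from
      wallclock ns into tickcount space and start the pacer. */
long now_ticks = fd_tickcount();
long now_ns    = fd_log_wallclock();
long end_ticks = now_ticks + (long)((double)fd_long_max( became_leader->slot_end_ns - now_ns, 1L )*ctx->ticks_per_ns);
fd_pack_pacing_init( ctx->pacer, now_ticks, end_ticks, ctx->slot_max_cost );

/* 2. At the top of after_credit: do no scheduling work until the
      pacer's next wakeup time has arrived. */
if( FD_UNLIKELY( fd_tickcount()<ctx->schedule_next ) ) return;

/* 3. After publishing a microblock, applying rebates, or finishing
      leader-slot setup: ask the pacer when to wake up next, given the
      CUs consumed so far (net of rebates). */
ctx->schedule_next = fd_pack_pacing_next( ctx->pacer, fd_pack_current_block_cost( ctx->pack ), fd_tickcount() );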
3 changes: 2 additions & 1 deletion src/ballet/pack/fd_pack.c
@@ -1611,7 +1611,8 @@ fd_pack_schedule_next_microblock( fd_pack_t * pack,
return scheduled;
}

- ulong fd_pack_bank_tile_cnt( fd_pack_t const * pack ) { return pack->bank_tile_cnt; }
+ ulong fd_pack_bank_tile_cnt ( fd_pack_t const * pack ) { return pack->bank_tile_cnt; }
+ ulong fd_pack_current_block_cost( fd_pack_t const * pack ) { return pack->cumulative_block_cost; }


void
8 changes: 8 additions & 0 deletions src/ballet/pack/fd_pack.h
@@ -161,6 +161,14 @@ fd_pack_avail_txn_cnt( fd_pack_t const * pack ) {
return *((ulong const *)((uchar const *)pack + FD_PACK_PENDING_TXN_CNT_OFF));
}

/* fd_pack_current_block_cost returns the number of CUs that have been
scheduled in the current block, net of any rebates. It should be
between 0 and the specified value of max_cost_per_block, but it can
be slightly higher due to temporary cost model nonsense. Due to
rebates, this number may decrease as the block progresses. pack must
be a valid local join. */
FD_FN_PURE ulong fd_pack_current_block_cost( fd_pack_t const * pack );

/* fd_pack_bank_tile_cnt: returns the value of bank_tile_cnt provided in
pack when the pack object was initialized with fd_pack_new. pack
must be a valid local join. The result will be in [1,
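The rebate behavior documented above can be read as follows (a hypothetical usage sketch; the variable names and counts are illustrative, not from this commit):

ulong cost0 = fd_pack_current_block_cost( pack ); /* CUs scheduled so far            */
fd_pack_rebate_cus( pack, rebates, rebate_cnt );  /* banks report actual usage       */
ulong cost1 = fd_pack_current_block_cost( pack ); /* cost1<=cost0 if CUs were rebated */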
78 changes: 78 additions & 0 deletions src/ballet/pack/fd_pack_pacing.h
@@ -0,0 +1,78 @@
#ifndef HEADER_fd_src_ballet_pack_fd_pack_pacing_h
#define HEADER_fd_src_ballet_pack_fd_pack_pacing_h

/* One of the keys to packing well is properly pacing CU consumption.
Without pacing, pack will end up filling the block with non-ideal
transactions. Since at the current limits the banks can execute a
block's worth of CUs in a fraction of the block time, any lucrative
transactions that arrive towards the end of a block would otherwise
have to be delayed until the next block (or another leader's block
if it's the last in our rotation). */


struct fd_pack_pacing_private {
/* Start and end time of block in ticks */
long t_start;
long t_end;
/* Number of CUs in the block */
ulong max_cus;

ulong home_stretch_cutoff; /* in CUs, where the slope switches */
float raw_slope; /* in ticks per CU */
float offset; /* in ticks */
};

typedef struct fd_pack_pacing_private fd_pack_pacing_t;


/* fd_pack_pacing_init begins pacing for a slot which starts at t_start
and ends at t_end (both measured in fd_tickcount() space) and will
contain max_cus CUs. max_cus must be in (0, 2^32). t_end - t_start
should be about 400ms or less, but must be in (0, 2^32) as well. */
static inline void
fd_pack_pacing_init( fd_pack_pacing_t * pacer,
long t_start,
long t_end,
ulong max_cus ) {
/* The exact style of pacing needs to be the subject of quantitative
experimentation, so for now we're just doing something that passes
the gut check. We'll pace for 90% of the CUs through the first 75%
of the block time, and then the last 10% through the last 25% of the
block time. This gives us pretty good tolerance against transactions
taking longer to execute than we expect (the extreme of which being
transactions that fail to land). */

pacer->t_start = t_start;
pacer->t_end = t_end;
pacer->max_cus = max_cus;

pacer->raw_slope = (float)(t_end - t_start)/(float)max_cus;
pacer->offset = 1.5f * (float)(t_end - t_start); /* the math works out to be 1.5x */
pacer->home_stretch_cutoff = (max_cus*9UL + 4UL)/10UL;
}
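
/* Aside on the 1.5x above (an illustrative derivation, not part of
   the original comment): write T = t_end - t_start. The home-stretch
   segment in fd_pack_pacing_next below is
       t_start + consumed*(T/max_cus)*(0.25/0.1) - offset,
   and it must meet the first segment at consumed = 0.9*max_cus, which
   the first segment maps to t_start + 0.75*T. Thus
       2.25*T - offset = 0.75*T   =>   offset = 1.5*T,
   and at consumed = max_cus the schedule lands exactly on t_end. */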

/* fd_pack_pacing_next returns the time (in fd_tickcount() space) at
which the next attempt should be made to schedule transactions.
The returned value will typically be between t_start and t_end, but
may be slightly out of range due to rounding or if consumed_cus is
larger than the max cu value provided in fd_pack_pacing_init.
consumed_cus need not increase monotonically between calls.
now should be the time at which the consumed_cus was measured. It's
not used right now, but is provided to allow for more sophisticated
implementations in the future.
fd_pack_pacing_init must have been called prior to the first call of
fd_pack_pacing_next. */
static inline long
fd_pack_pacing_next( fd_pack_pacing_t * pacer,
ulong consumed_cus,
long now ) {
(void)now;
int non_home_stretch = consumed_cus < pacer->home_stretch_cutoff;
return pacer->t_start + (long)( (float)consumed_cus * pacer->raw_slope * fd_float_if( non_home_stretch, 0.75f/0.9f, 0.25f/0.1f )
- fd_float_if( non_home_stretch, 0.0f, pacer->offset ));
}

#endif /* HEADER_fd_src_ballet_pack_fd_pack_pacing_h */
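
As a sanity check of the pacing curve, a standalone sketch (assumes fd_util is available at the usual path; the 2 GHz tick rate and 48M CU block limit are illustrative values, not taken from this commit):

#include <stdio.h>
#include "../../util/fd_util.h" /* ulong, fd_float_if */
#include "fd_pack_pacing.h"

int
main( void ) {
  fd_pack_pacing_t pacer[1];
  /* 400 ms slot at an assumed 2 GHz tickcount: T = 800M ticks */
  fd_pack_pacing_init( pacer, 0L, 800000000L, 48000000UL );
  printf( "%ld\n", fd_pack_pacing_next( pacer,        0UL, 0L ) ); /*     0: slot start                    */
  printf( "%ld\n", fd_pack_pacing_next( pacer, 21600000UL, 0L ) ); /* ~300M: 45% of CUs -> 37.5% of slot   */
  printf( "%ld\n", fd_pack_pacing_next( pacer, 43200000UL, 0L ) ); /* ~600M: 90% of CUs -> 75% of slot     */
  printf( "%ld\n", fd_pack_pacing_next( pacer, 48000000UL, 0L ) ); /* ~800M: all CUs -> slot end           */
  return 0;
}

At the 90% cutoff both segments agree (0.9*max_cus maps to 75% of the slot), so the schedule is continuous; past it, the steeper slope spends the final 10% of CUs over the last quarter of the slot.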
