Skip to content

Commit

Permalink
Starting integration of gui tile with milestone 3 release
Browse files Browse the repository at this point in the history
  • Loading branch information
asiegel-jt committed Dec 9, 2024
1 parent e6f511d commit a805f5b
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 17 deletions.
76 changes: 73 additions & 3 deletions src/app/fdctl/run/tiles/fd_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "../../../../util/net/fd_ip4.h"
#include "../../../../util/net/fd_udp.h"
#include "../../../../util/net/fd_net_headers.h"
#include "../../../../disco/plugin/fd_plugin.h"

#include "generated/gossip_seccomp.h"

Expand All @@ -39,9 +40,10 @@
#define SIGN_OUT_IDX 4
#define VOTER_OUT_IDX 5
#define REPLAY_OUT_IDX 6
#define EQVOC_OUT_IDX 7
#define PLUGIN_OUT_IDX 7

#define CONTACT_INFO_PUBLISH_TIME_NS ((long)5e9)
#define PLUGIN_PUBLISH_TIME_NS ((long)60e9)

/* Scratch space is used for deserializing a gossip message.
TODO: update */
Expand All @@ -65,6 +67,7 @@ struct fd_gossip_tile_ctx {
fd_gossip_t * gossip;
fd_gossip_config_t gossip_config;
long last_shred_dest_push_time;
long last_plugin_push_time;

ulong gossip_seed;

Expand Down Expand Up @@ -161,6 +164,13 @@ struct fd_gossip_tile_ctx {
ulong net_out_wmark;
ulong net_out_chunk;

// Inputs to plugin/gui

fd_wksp_t * gossip_plugin_out_mem;
ulong gossip_plugin_out_chunk0;
ulong gossip_plugin_out_wmark;
ulong gossip_plugin_out_chunk;

uchar identity_private_key[32];
fd_pubkey_t identity_public_key;

Expand Down Expand Up @@ -523,6 +533,48 @@ after_frag( fd_gossip_tile_ctx_t * ctx,
fd_gossip_recv_packet( ctx->gossip, ctx->gossip_buffer + hdr_sz, ctx->gossip_buffer_sz - hdr_sz, &peer_addr );
}

static void
publish_peers_to_plugin( fd_gossip_tile_ctx_t * ctx,
fd_stem_context_t * stem ) {
static const ulong FIREDANCER_CLUSTER_NODE_CNT = 200*201 - 1;
uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->gossip_plugin_out_mem, ctx->gossip_plugin_out_chunk );

ulong i = 0;
for( fd_contact_info_table_iter_t iter = fd_contact_info_table_iter_init( ctx->contact_info_table );
!fd_contact_info_table_iter_done( ctx->contact_info_table, iter ) && i < FIREDANCER_CLUSTER_NODE_CNT;
iter = fd_contact_info_table_iter_next( ctx->contact_info_table, iter ), ++i ) {
fd_contact_info_elem_t const * ele = fd_contact_info_table_iter_ele_const( ctx->contact_info_table, iter );
fd_gossip_update_msg_t * msg = (fd_gossip_update_msg_t *)(dst + sizeof(ulong) + i*sizeof(fd_gossip_update_msg_t));
memcpy( msg->pubkey, ele->contact_info.id.key, sizeof(fd_pubkey_t) );
msg->wallclock = ele->contact_info.wallclock;
msg->shred_version = ele->contact_info.shred_version;
#define COPY_ADDR( _idx_, _srcname_ ) \
if( ele->contact_info._srcname_.discriminant == fd_gossip_socket_addr_enum_ip4 ) { \
msg->addrs[ _idx_ ].ip = ele->contact_info._srcname_.inner.ip4.addr; \
msg->addrs[ _idx_ ].port = ele->contact_info._srcname_.inner.ip4.port; \
}
COPY_ADDR(0, gossip);
COPY_ADDR(1, rpc);
COPY_ADDR(2, rpc_pubsub);
COPY_ADDR(3, repair);
// COPY_ADDR(4, serve_repair_socket_quic); FIX THESE CASES
// COPY_ADDR(5, tpu_socket_udp);
// COPY_ADDR(6, tpu_socket_quic);
// COPY_ADDR(7, tvu_socket_udp);
// COPY_ADDR(8, tvu_socket_quic);
// COPY_ADDR(9, tpu_forwards_socket_udp);
// COPY_ADDR(10, tpu_forwards_socket_quic);
COPY_ADDR(11, tpu_vote);
}

*(ulong *)dst = i;
ulong data_sz = i*sizeof(fd_gossip_update_msg_t);

ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_stem_publish( stem, PLUGIN_OUT_IDX, FD_PLUGIN_MSG_GOSSIP_UPDATE, ctx->gossip_plugin_out_chunk, data_sz, 0UL, 0UL, tspub );
ctx->gossip_plugin_out_chunk = fd_dcache_compact_next( ctx->gossip_plugin_out_chunk, data_sz, ctx->gossip_plugin_out_chunk0, ctx->gossip_plugin_out_wmark );
}

static void
after_credit( fd_gossip_tile_ctx_t * ctx,
fd_stem_context_t * stem,
Expand Down Expand Up @@ -645,6 +697,11 @@ after_credit( fd_gossip_tile_ctx_t * ctx,
}
}

if( ctx->gossip_plugin_out_mem && FD_UNLIKELY( ( now - ctx->last_plugin_push_time )>PLUGIN_PUBLISH_TIME_NS ) ) {
ctx->last_plugin_push_time = now;
publish_peers_to_plugin( ctx, stem );
}

if( FD_UNLIKELY(( ctx->restart_last_vote_msg_sz!=0 )&&( ctx->restart_last_vote_push_time+LAST_VOTED_FORK_PUBLISH_PERIOD_NS<now )) ) {
ctx->restart_last_vote_push_time = now;
fd_crds_data_t restart_last_vote_msg;
Expand Down Expand Up @@ -699,15 +756,15 @@ unprivileged_init( fd_topo_t * topo,
tile->in_cnt, topo->links[ tile->in_link_id[ 0 ] ].name, topo->links[ tile->in_link_id[ 1 ] ].name ));
}

if( FD_UNLIKELY( tile->out_cnt != 7UL ||
if( FD_UNLIKELY( tile->out_cnt < 7UL ||
strcmp( topo->links[ tile->out_link_id[ NET_OUT_IDX ] ].name, "gossip_net" ) ||
strcmp( topo->links[ tile->out_link_id[ SHRED_OUT_IDX ] ].name, "crds_shred" ) ||
strcmp( topo->links[ tile->out_link_id[ REPAIR_OUT_IDX ] ].name, "gossip_repai" ) ||
strcmp( topo->links[ tile->out_link_id[ DEDUP_OUT_IDX ] ].name, "gossip_dedup" ) ||
strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "gossip_sign" ) ||
strcmp( topo->links[ tile->out_link_id[ VOTER_OUT_IDX ] ].name, "gossip_voter" ) ||
strcmp( topo->links[ tile->out_link_id[ REPLAY_OUT_IDX ] ].name, "gossip_repla" ) ) ) {
FD_LOG_ERR(( "gossip tile has none or unexpected output links %lu %s %s",
FD_LOG_ERR(( "gossip tile has missing output links %lu %s %s",
tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name ));
}

Expand Down Expand Up @@ -888,6 +945,19 @@ unprivileged_init( fd_topo_t * topo,
ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
ctx->replay_out_chunk = ctx->replay_out_chunk0;

if( FD_LIKELY( tile->gossip.plugins_enabled ) ) {
if( FD_UNLIKELY( tile->out_cnt < 8UL ||
strcmp( topo->links[ tile->out_link_id[ PLUGIN_OUT_IDX ] ].name, "gossip_plugi" ) ) ) {
FD_LOG_ERR(( "gossip tile has missing output links %lu", tile->out_cnt ));
}

fd_topo_link_t * gossip_plugin_out = &topo->links[ tile->out_link_id[ PLUGIN_OUT_IDX ] ];
ctx->gossip_plugin_out_mem = topo->workspaces[ topo->objs[ gossip_plugin_out->dcache_obj_id ].wksp_id ].wksp;
ctx->gossip_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->gossip_plugin_out_mem, gossip_plugin_out->dcache );
ctx->gossip_plugin_out_wmark = fd_dcache_compact_wmark ( ctx->gossip_plugin_out_mem, gossip_plugin_out->dcache, gossip_plugin_out->mtu );
ctx->gossip_plugin_out_chunk = ctx->gossip_plugin_out_chunk0;
}

ulong scratch_top = FD_SCRATCH_ALLOC_FINI( l, 1UL );
if( FD_UNLIKELY( scratch_top>( (ulong)scratch + scratch_footprint( tile ) ) ) )
FD_LOG_ERR(( "scratch overflow %lu %lu %lu", scratch_top - (ulong)scratch - scratch_footprint( tile ), scratch_top, (ulong)scratch + scratch_footprint( tile ) ));
Expand Down
2 changes: 1 addition & 1 deletion src/app/fdctl/run/tiles/fd_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ after_frag( fd_plugin_ctx_t * ctx,
switch( in_idx ) {
/* replay_plugin */
case 0UL: {
FD_TEST( sig==FD_PLUGIN_MSG_SLOT_ROOTED || sig==FD_PLUGIN_MSG_SLOT_OPTIMISTICALLY_CONFIRMED || sig==FD_PLUGIN_MSG_SLOT_COMPLETED || sig==FD_PLUGIN_MSG_SLOT_RESET );
FD_TEST( sig==FD_PLUGIN_MSG_SLOT_ROOTED || sig==FD_PLUGIN_MSG_SLOT_OPTIMISTICALLY_CONFIRMED || sig==FD_PLUGIN_MSG_SLOT_COMPLETED || sig==FD_PLUGIN_MSG_SLOT_RESET || sig==FD_PLUGIN_MSG_START_PROGRESS );
break;
}
/* gossip_plugin */
Expand Down
131 changes: 118 additions & 13 deletions src/app/fdctl/run/tiles/fd_replay.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "../../../../choreo/fd_choreo.h"
#include "../../../../disco/store/fd_epoch_forks.h"
#include "../../../../funk/fd_funk_filemap.h"
#include "../../../../disco/plugin/fd_plugin.h"

#include <arpa/inet.h>
#include <errno.h>
Expand All @@ -45,9 +46,6 @@
#include <sys/types.h>
#include <unistd.h>


// #define STOP_SLOT 280859632

/* An estimate of the max number of transactions in a block. If there are more
transactions, they must be split into multiple sets. */
#define MAX_TXNS_PER_REPLAY ( ( FD_SHRED_MAX_PER_SLOT * FD_SHRED_MAX_SZ) / FD_TXN_MIN_SERIALIZED_SZ )
Expand All @@ -63,6 +61,7 @@
#define GOSSIP_OUT_IDX (3UL)
#define STORE_OUT_IDX (4UL)
#define POH_OUT_IDX (5UL)
#define REPLAY_PLUG_OUT_IDX (6UL)

/* Scratch space estimates.
TODO: Update constants and add explanation
Expand Down Expand Up @@ -165,6 +164,13 @@ struct fd_replay_tile_ctx {
ulong stake_weights_out_wmark;
ulong stake_weights_out_chunk;

// Inputs to plugin/gui

fd_wksp_t * replay_plugin_out_mem;
ulong replay_plugin_out_chunk0;
ulong replay_plugin_out_wmark;
ulong replay_plugin_out_chunk;

char const * blockstore_checkpt;
int tx_metadata_storage;
char const * funk_checkpt;
Expand Down Expand Up @@ -650,8 +656,22 @@ publish_account_notifications( fd_replay_tile_ctx_t * ctx,
FD_LOG_DEBUG(("TIMING: notify_account_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));
}

static void
replay_plugin_publish( fd_replay_tile_ctx_t * ctx,
fd_stem_context_t * stem,
ulong sig,
uchar const * data,
ulong data_sz ) {
uchar * dst = (uchar *)fd_chunk_to_laddr( ctx->replay_plugin_out_mem, ctx->replay_plugin_out_chunk );
fd_memcpy( dst, data, data_sz );
ulong tspub = (ulong)fd_frag_meta_ts_comp( fd_tickcount() );
fd_stem_publish( stem, REPLAY_PLUG_OUT_IDX, sig, ctx->replay_plugin_out_chunk, data_sz, 0UL, 0UL, tspub );
ctx->replay_plugin_out_chunk = fd_dcache_compact_next( ctx->replay_plugin_out_chunk, data_sz, ctx->replay_plugin_out_chunk0, ctx->replay_plugin_out_wmark );
}

static void
publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
fd_stem_context_t * stem,
fd_fork_t * fork,
fd_block_map_t const * block_map_entry,
ulong curr_slot ) {
Expand Down Expand Up @@ -686,6 +706,21 @@ publish_slot_notifications( fd_replay_tile_ctx_t * ctx,
#undef NOTIFY_END
notify_time_ns += fd_log_wallclock();
FD_LOG_DEBUG(("TIMING: notify_slot_time - slot: %lu, elapsed: %6.6f ms", curr_slot, (double)notify_time_ns * 1e-6));

if( ctx->replay_plugin_out_mem ) {
fd_replay_complete_msg_t msg2 = {
.slot = curr_slot,
.total_txn_count = fork->slot_ctx.slot_bank.transaction_count - fork->slot_ctx.parent_transaction_count,
.nonvote_txn_count = 0,
.failed_txn_count = 0,
.nonvote_failed_txn_count = 0,
.compute_units = 0,
.transaction_fee = fork->slot_ctx.slot_bank.collected_execution_fees,
.priority_fee = fork->slot_ctx.slot_bank.collected_priority_fees,
.parent_slot = ctx->parent_slot,
};
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_COMPLETED, (uchar const *)&msg2, sizeof(msg2) );
}
}

static void
Expand Down Expand Up @@ -1021,7 +1056,7 @@ after_frag( fd_replay_tile_ctx_t * ctx,
}

// Notify for updated slot info
publish_slot_notifications( ctx, fork, block_map_entry, curr_slot );
publish_slot_notifications( ctx, stem, fork, block_map_entry, curr_slot );

fd_blockstore_start_write( ctx->blockstore );

Expand Down Expand Up @@ -1060,6 +1095,35 @@ after_frag( fd_replay_tile_ctx_t * ctx,
reset_fork = new_reset_fork;
}

/* Update the gui */

if( ctx->replay_plugin_out_mem ) {
/* FIXME. We need a more efficient way to compute the ancestor chain. */
uchar msg[4098*8] __attribute__( ( aligned( 8U ) ) );
fd_memset( msg, 0, sizeof(msg) );
fd_blockstore_start_read( ctx->blockstore );
ulong s = reset_fork->slot_ctx.slot_bank.slot;
*(ulong*)(msg + 16U) = s;
ulong i = 0;
do {
block_map_entry = fd_blockstore_block_map_query( ctx->blockstore, s );
if( block_map_entry == NULL ) {
break;
}
s = block_map_entry->parent_slot;
if( s < ctx->blockstore->smr ) {
break;
}
*(ulong*)(msg + 24U + i*8U) = s;
if( ++i == 4095U ) {
break;
}
} while( 1 );
*(ulong*)(msg + 8U) = i;
fd_blockstore_end_read( ctx->blockstore );
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_SLOT_RESET, msg, sizeof(msg) );
}

fd_microblock_trailer_t * microblock_trailer = (fd_microblock_trailer_t *)(txns + txn_cnt);
memcpy( microblock_trailer->hash, reset_fork->slot_ctx.slot_bank.block_hash_queue.last_hash->uc, sizeof(fd_hash_t) );
if( ctx->poh_init_done == 1 ) {
Expand Down Expand Up @@ -1262,9 +1326,21 @@ tpool_boot( fd_topo_t * topo, ulong total_thread_count ) {
}

static void
read_snapshot( void * _ctx, char const * snapshotfile, char const * incremental ) {
read_snapshot( void * _ctx,
fd_stem_context_t * stem,
char const * snapshotfile,
char const * incremental ) {
fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;

if( ctx->replay_plugin_out_mem ) {
// ValidatorStartProgress::DownloadingSnapshot
uchar msg[56];
fd_memset( msg, 0, sizeof(msg) );
msg[0] = 2;
msg[1] = 1;
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
}

/* Pass the slot_ctx to snapshot_load or recover_banks */

const char * snapshot = snapshotfile;
Expand All @@ -1277,10 +1353,27 @@ read_snapshot( void * _ctx, char const * snapshotfile, char const * incremental

/* Load incremental */

if( ctx->replay_plugin_out_mem ) {
// ValidatorStartProgress::DownloadingSnapshot
uchar msg[56];
fd_memset( msg, 0, sizeof(msg) );
msg[0] = 2;
msg[1] = 0;
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
}

if( strlen( incremental ) > 0 ) {
fd_snapshot_load( incremental, ctx->slot_ctx, ctx->tpool, false, false, FD_SNAPSHOT_TYPE_INCREMENTAL );
}

if( ctx->replay_plugin_out_mem ) {
// ValidatorStartProgress::DownloadedFullSnapshot
uchar msg[56];
fd_memset( msg, 0, sizeof(msg) );
msg[0] = 3;
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
}

fd_runtime_update_leaders( ctx->slot_ctx, ctx->slot_ctx->slot_bank.slot );
FD_LOG_NOTICE(( "starting fd_bpf_scan_and_create_bpf_program_cache_entry..." ));
fd_funk_start_write( ctx->slot_ctx->acc_mgr->funk );
Expand Down Expand Up @@ -1391,7 +1484,7 @@ init_snapshot( fd_replay_tile_ctx_t * ctx,
FD_SCRATCH_SCOPE_BEGIN {
uchar is_snapshot = strlen( ctx->snapshot ) > 0;
if( is_snapshot ) {
read_snapshot( ctx, ctx->snapshot, ctx->incremental );
read_snapshot( ctx, stem, ctx->snapshot, ctx->incremental );
}

fd_runtime_read_genesis( ctx->slot_ctx, ctx->genesis, is_snapshot, ctx->capture_ctx, ctx->tpool );
Expand Down Expand Up @@ -1474,6 +1567,14 @@ after_credit( fd_replay_tile_ctx_t * ctx,
} FD_SCRATCH_SCOPE_END;
}
}

if( ctx->replay_plugin_out_mem ) {
// ValidatorStartProgress::Running
uchar msg[56];
fd_memset( msg, 0, sizeof(msg) );
msg[0] = 11;
replay_plugin_publish( ctx, stem, FD_PLUGIN_MSG_START_PROGRESS, msg, sizeof(msg) );
}
}

if( FD_UNLIKELY( ctx->in_wen_restart ) ) {
Expand All @@ -1493,7 +1594,6 @@ after_credit( fd_replay_tile_ctx_t * ctx,
}
}


static void
during_housekeeping( void * _ctx ) {
fd_replay_tile_ctx_t * ctx = (fd_replay_tile_ctx_t *)_ctx;
Expand Down Expand Up @@ -1847,12 +1947,6 @@ unprivileged_init( fd_topo_t * topo,
poh_out->chunk = poh_out->chunk0;
}

// ulong busy_obj_id = fd_pod_queryf_ulong( topo->props, ULONG_MAX, "first_turbine" );
// FD_TEST( busy_obj_id != ULONG_MAX );
// ctx->first_turbine = fd_fseq_join( fd_topo_obj_laddr( topo, busy_obj_id ) );
// if( FD_UNLIKELY( !ctx->first_turbine ) )
// FD_LOG_ERR( ( "replay tile %lu has no busy flag", tile->kind_id ) );

ctx->poh_init_done = 0U;
ctx->snapshot_init_done = 0;

Expand Down Expand Up @@ -1949,6 +2043,17 @@ unprivileged_init( fd_topo_t * topo,
ctx->stake_weights_out_wmark = fd_dcache_compact_wmark ( ctx->stake_weights_out_mem, stake_weights_out->dcache, stake_weights_out->mtu );
ctx->stake_weights_out_chunk = ctx->stake_weights_out_chunk0;

if( FD_LIKELY( tile->replay.plugins_enabled ) ) {
fd_topo_link_t const * replay_plugin_out = &topo->links[ tile->out_link_id[ REPLAY_PLUG_OUT_IDX] ];
if( strcmp( replay_plugin_out->name, "replay_plugi" ) ) {
FD_LOG_ERR(("output link confusion for output %lu", REPLAY_PLUG_OUT_IDX));
}
ctx->replay_plugin_out_mem = topo->workspaces[ topo->objs[ replay_plugin_out->dcache_obj_id ].wksp_id ].wksp;
ctx->replay_plugin_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_plugin_out_mem, replay_plugin_out->dcache );
ctx->replay_plugin_out_wmark = fd_dcache_compact_wmark ( ctx->replay_plugin_out_mem, replay_plugin_out->dcache, replay_plugin_out->mtu );
ctx->replay_plugin_out_chunk = ctx->replay_plugin_out_chunk0;
}

if( strnlen( tile->replay.slots_replayed, sizeof(tile->replay.slots_replayed) )>0UL ) {
ctx->slots_replayed_file = fopen( tile->replay.slots_replayed, "w" );
FD_TEST( ctx->slots_replayed_file );
Expand Down
Loading

0 comments on commit a805f5b

Please sign in to comment.