Skip to content

Commit

Permalink
Add two fields to fd_topo_run_tile_t:
Browse files Browse the repository at this point in the history
  int for_tpool;
      when set this a call to run_tile_thread() will not spawn a thread so the tpool can handle the thread spawning later
  int   (*main                    )( void );
      When not NULL this function is called instead of fd_mux_tile().
  • Loading branch information
llamb-jump committed May 6, 2024
1 parent fb08b95 commit 9d75004
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/app/fddev/main1.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ extern fd_topo_run_tile_t fd_tile_bencho;
extern fd_topo_run_tile_t fd_tile_benchg;
extern fd_topo_run_tile_t fd_tile_benchs;

fd_topo_run_tile_t fd_tile_tvu_thread = { .name = "thread", .for_tpool = 1 };

fd_topo_run_tile_t * TILES[] = {
&fd_tile_net,
&fd_tile_netmux,
Expand Down
2 changes: 2 additions & 0 deletions src/disco/topo/fd_topo.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ typedef struct {
ulong mux_flags;
ulong burst;
ulong rlimit_file_cnt;
int for_tpool;
void * (*mux_ctx )( void * scratch );

fd_mux_during_housekeeping_fn * mux_during_housekeeping;
Expand All @@ -274,6 +275,7 @@ typedef struct {
ulong (*loose_footprint )( fd_topo_tile_t const * tile );
void (*privileged_init )( fd_topo_t * topo, fd_topo_tile_t * tile, void * scratch );
void (*unprivileged_init )( fd_topo_t * topo, fd_topo_tile_t * tile, void * scratch );
int (*main )( void );
} fd_topo_run_tile_t;

FD_PROTOTYPES_BEGIN
Expand Down
43 changes: 25 additions & 18 deletions src/disco/topo/fd_topo_run.c
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,27 @@ fd_topo_run_tile( fd_topo_t * topo,
if( FD_UNLIKELY( tile_run->lazy ) ) lazy = tile_run->lazy( tile_mem );

fd_rng_t rng[1];
fd_mux_tile( tile->cnc,
tile_run->mux_flags,
polled_in_cnt,
in_mcache,
in_fseq,
tile->out_link_id_primary == ULONG_MAX ? NULL : topo->links[ tile->out_link_id_primary ].mcache,
out_cnt_reliable,
out_fseq,
tile_run->burst,
0,
lazy,
fd_rng_join( fd_rng_new( rng, 0, 0UL ) ),
fd_alloca( FD_MUX_TILE_SCRATCH_ALIGN, FD_MUX_TILE_SCRATCH_FOOTPRINT( polled_in_cnt, out_cnt_reliable ) ),
ctx,
&callbacks );
FD_LOG_ERR(( "tile run loop returned" ));
int ret = 0;
if( FD_LIKELY( tile_run->main == NULL ) ) {
ret = fd_mux_tile( tile->cnc,
tile_run->mux_flags,
polled_in_cnt,
in_mcache,
in_fseq,
tile->out_link_id_primary == ULONG_MAX ? NULL : topo->links[ tile->out_link_id_primary ].mcache,
out_cnt_reliable,
out_fseq,
tile_run->burst,
0,
lazy,
fd_rng_join( fd_rng_new( rng, 0, 0UL ) ),
fd_alloca( FD_MUX_TILE_SCRATCH_ALIGN, FD_MUX_TILE_SCRATCH_FOOTPRINT( polled_in_cnt, out_cnt_reliable ) ),
ctx,
&callbacks );
} else {
ret = tile_run->main();
}
FD_LOG_ERR(( "tile run loop returned: %d", ret ));
}

typedef struct {
Expand Down Expand Up @@ -279,7 +284,7 @@ fd_topo_tile_stack_new( int optimize,
return stack;
}

static inline pthread_t
static inline void
run_tile_thread( fd_topo_t * topo,
fd_topo_tile_t * tile,
fd_topo_run_tile_t tile_run,
Expand All @@ -288,6 +293,9 @@ run_tile_thread( fd_topo_t * topo,
int * done_futex,
fd_cpuset_t const * floating_cpu_set,
int floating_priority ) {
/* tpool will assign a thread later */
if( FD_UNLIKELY( tile_run.for_tpool ) ) return;

/* TODO: Use a better CPU idx for the stack if tile is floating */
ulong stack_cpu_idx = 0UL;
if( FD_LIKELY( tile->cpu_idx<65535UL ) ) stack_cpu_idx = tile->cpu_idx;
Expand Down Expand Up @@ -333,7 +341,6 @@ run_tile_thread( fd_topo_t * topo,
if( FD_UNLIKELY( pthread_create( &pthread, attr, run_tile_thread_main, &args ) ) ) FD_LOG_ERR(( "pthread_create() failed (%i-%s)", errno, fd_io_strerror( errno ) ));

while( !FD_VOLATILE( args.copied ) ) FD_SPIN_PAUSE();
return pthread;
}

void
Expand Down

0 comments on commit 9d75004

Please sign in to comment.