Skip to content

Commit

Permalink
Remove local completion for group construct
Browse files Browse the repository at this point in the history
It was a false optimization as we need global knowledge of
group existence/membership anyway.

Signed-off-by: Ralph Castain <[email protected]>
  • Loading branch information
rhc54 committed Dec 22, 2024
1 parent 19b0a63 commit 38bc4e5
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 179 deletions.
11 changes: 6 additions & 5 deletions src/mca/grpcomm/direct/grpcomm_direct_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid,
prte_pmix_grp_caddy_t *cd;

PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:group for \"%s\" with %lu procs",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), grpid, nprocs));
"%s grpcomm:direct:group %s for \"%s\" with %lu procs",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
PMIx_Group_operation_string(op), grpid, nprocs));

cd = PMIX_NEW(prte_pmix_grp_caddy_t);
cd->op = op;
Expand Down Expand Up @@ -442,8 +443,8 @@ void prte_grpcomm_direct_grp_recv(int status, pmix_proc_t *sender,

if (PRTE_PROC_IS_MASTER) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct group HNP reports complete",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
"%s grpcomm:direct group HNP reports complete for %s",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), coll->sig->groupID));

/* the allgather is complete - send the xcast */
if (PMIX_GROUP_CONSTRUCT == sig->op) {
Expand Down Expand Up @@ -702,7 +703,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender,
PMIX_ACQUIRE_OBJECT(cd);

pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s group request complete",
"%s group release recvd",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));

// unpack the signature
Expand Down
174 changes: 0 additions & 174 deletions src/prted/pmix/pmix_server_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,158 +72,12 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,

#else

static void relcb(void *cbdata)
{
prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata;
PMIX_RELEASE(cd);
}

static void opcbfunc(int status, void *cbdata)
{
prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata;
PRTE_HIDE_UNUSED_PARAMS(status);

PMIX_RELEASE(cd);
}

static void local_complete(int sd, short args, void *cbdata)
{
prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata;
prte_pmix_grp_caddy_t *cd2;
pmix_server_pset_t *pset;
pmix_data_array_t members = PMIX_DATA_ARRAY_STATIC_INIT;
pmix_proc_t *addmembers = NULL;
size_t nmembers = 0, naddmembers = 0;
pmix_proc_t *p;
void *ilist;
pmix_status_t rc;
size_t n;
pmix_data_array_t darray;
pmix_data_buffer_t dbuf;
pmix_byte_object_t bo;
PRTE_HIDE_UNUSED_PARAMS(sd, args);

if (PMIX_GROUP_CONSTRUCT == cd->op) {

PMIX_INFO_LIST_START(ilist);

for (n=0; n < cd->ndirs; n++) {
// check if they gave us any grp or endpt info
if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_PROC_DATA) ||
PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_INFO)) {
rc = PMIx_Info_list_add_value(ilist, cd->directives[n].key, &cd->directives[n].value);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
// check for add members - server lib would have aggregated them
} else if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_ADD_MEMBERS)) {
naddmembers = cd->directives[n].value.data.darray->size;
addmembers = (pmix_proc_t*)cd->directives[n].value.data.darray->array;
}
}

// construct the final group membership
nmembers = cd->nprocs + naddmembers;
PMIX_DATA_ARRAY_CONSTRUCT(&members, nmembers, PMIX_PROC);
p = (pmix_proc_t*)members.array;
memcpy(p, cd->procs, cd->nprocs * sizeof(pmix_proc_t));
if (0 < naddmembers) {
memcpy(&p[cd->nprocs], addmembers, naddmembers * sizeof(pmix_proc_t));
}
PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_MEMBERSHIP, &members, PMIX_DATA_ARRAY);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}

PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_ID, cd->grpid, PMIX_STRING);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}

/* add it to our list of known groups */
pset = PMIX_NEW(pmix_server_pset_t);
pset->name = strdup(cd->grpid);
pset->num_members = nmembers;
PMIX_PROC_CREATE(pset->members, nmembers);
memcpy(pset->members, p, nmembers * sizeof(pmix_proc_t));
pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);

// convert the info list
PMIX_INFO_LIST_CONVERT(rc, ilist, &darray);
cd->info = (pmix_info_t*)darray.array;
cd->ninfo = darray.size;
PMIX_INFO_LIST_RELEASE(ilist);

// generate events for any add members as they are waiting for notification
if (NULL != addmembers) {

cd2 = PMIX_NEW(prte_pmix_grp_caddy_t);
cd2->ninfo = cd->ninfo + 3;
PMIX_INFO_CREATE(cd2->info, cd2->ninfo);
// carry over the info we created
for (n=0; n < cd->ninfo; n++) {
rc = PMIx_Info_xfer(&cd2->info[n], &cd->info[n]);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
}
}

// set the range to be only procs that were added
darray.type = PMIX_PROC;
darray.array = addmembers;
darray.size = naddmembers;
// load the array - note: this copies the array!
PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY);
++n;

// mark that this event stays local and does not go up to the host
PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_STAYS_LOCAL, NULL, PMIX_BOOL);
++n;

// add the job-level info
PMIX_DATA_BUFFER_CONSTRUCT(&dbuf);
rc = PMIx_server_collect_job_info(p, nmembers, &dbuf);
if (PMIX_SUCCESS == rc) {
PMIx_Data_buffer_unload(&dbuf, &bo.bytes, &bo.size);
PMIX_INFO_LOAD(&cd2->info[n], PMIX_GROUP_JOB_INFO, &bo, PMIX_BYTE_OBJECT);
PMIX_BYTE_OBJECT_DESTRUCT(&bo);
}
PMIX_DATA_BUFFER_DESTRUCT(&dbuf);

// notify local procs
PMIx_Notify_event(PMIX_GROUP_INVITED, &prte_process_info.myproc,
PMIX_RANGE_CUSTOM,
cd2->info, cd2->ninfo, opcbfunc, cd2);
}

// return this to the PMIx server
cd->cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd->cbdata, relcb, cd);

} else {
/* find this group ID on our list of groups and remove it */
PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
{
if (0 == strcmp(pset->name, cd->grpid)) {
pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
PMIX_RELEASE(pset);
break;
}
}

// return their callback
cd->cbfunc(PMIX_SUCCESS, NULL, 0, cd->cbdata, relcb, cd);
}
}

pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
const pmix_proc_t procs[], size_t nprocs,
const pmix_info_t directives[], size_t ndirs,
pmix_info_cbfunc_t cbfunc, void *cbdata)
{
prte_pmix_grp_caddy_t *cd;
int rc;
size_t i;
bool force_local = false;

pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s Group request %s recvd with %lu directives",
Expand All @@ -235,34 +89,6 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
return PMIX_ERR_BAD_PARAM;
}

/* check the directives */
for (i = 0; i < ndirs; i++) {
/* see if this is local only */
if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) {
force_local = PMIX_INFO_TRUE(&directives[i]);
break;
}
}

/* if they insist on forcing local completion of the operation, then
* we are done */
if (force_local) {
pmix_output_verbose(2, prte_pmix_server_globals.output,
"%s group request - purely local",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));
cd = PMIX_NEW(prte_pmix_grp_caddy_t);
cd->op = op;
cd->grpid = strdup(grpid);
cd->procs = procs;
cd->nprocs = nprocs;
cd->directives = directives;
cd->ndirs = ndirs;
cd->cbfunc = cbfunc;
cd->cbdata = cbdata;
PRTE_PMIX_THREADSHIFT(cd, prte_event_base, local_complete);
return PMIX_SUCCESS;
}

rc = prte_grpcomm.group(op, grpid, procs, nprocs,
directives, ndirs, cbfunc, cbdata);
if (PRTE_SUCCESS != rc) {
Expand Down

0 comments on commit 38bc4e5

Please sign in to comment.