diff --git a/src/mca/grpcomm/direct/grpcomm_direct.h b/src/mca/grpcomm/direct/grpcomm_direct.h index abf041533d..ed334e7309 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct.h +++ b/src/mca/grpcomm/direct/grpcomm_direct.h @@ -59,6 +59,7 @@ typedef struct { pmix_proc_t *members; // initially supplied procs size_t nmembers; size_t bootstrap; + bool follower; pmix_proc_t *addmembers; // procs supplied as add-members size_t naddmembers; } prte_grpcomm_direct_group_signature_t; @@ -106,10 +107,28 @@ typedef struct { size_t ndmns; /** my index in the dmns array */ unsigned long my_rank; - /* number of buckets expected */ - size_t nexpected; - /* number reported in */ - size_t nreported; + /* type of collective */ + bool bootstrap; + + /*** NON-BOOTSTRAP TRACKERS ***/ + size_t nexpected; // number of buckets expected + size_t nreported; // number reported in + + /*** BOOTSTRAP TRACKERS ***/ + // "leaders" are group members reporting as + // themselves for bootstrap - they know how + // many leaders there are (which is in the bootstrap + // parameter), but not who they are.
Bootstrap is + // complete when nleaders_reported == nleaders + // AND nfollowers_reported == nfollowers + size_t nleaders; // number of leaders expected + size_t nleaders_reported; // number reported in + // "followers" are add-member procs that report with NULL + // for the proc parameter - thereby indicating that + // they don't know the other procs in the group + size_t nfollowers; // number of add-member procs expected to participate + size_t nfollowers_reported; // number reported in + /* controls values */ bool assignID; int timeout; @@ -185,6 +204,67 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender, pmix_data_buffer_t *buffer, prte_rml_tag_t tag, void *cbdata); +static inline void print_signature(prte_grpcomm_direct_group_signature_t *sig) +{ + char **msg = NULL; + char *tmp; + size_t n; + + PMIx_Argv_append_nosize(&msg, "SIGNATURE:"); + pmix_asprintf(&tmp, "\tOP: %s", PMIx_Group_operation_string(sig->op)); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + + pmix_asprintf(&tmp, "\tGRPID: %s", sig->groupID); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + + pmix_asprintf(&tmp, "\tASSIGN CTXID: %s", sig->assignID ? "T" : "F"); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + + if (sig->assignID) { + pmix_asprintf(&tmp, "\tCTXID: %lu", sig->ctxid); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + } + + pmix_asprintf(&tmp, "\tNMEMBERS: %lu", sig->nmembers); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + if (0 < sig->nmembers) { + for (n=0; n < sig->nmembers; n++) { + pmix_asprintf(&tmp, "\t\t%s", PMIX_NAME_PRINT(&sig->members[n])); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + } + } + + pmix_asprintf(&tmp, "\tBOOTSTRAP: %lu", sig->bootstrap); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + + pmix_asprintf(&tmp, "\tFOLLOWER: %s", sig->follower ?
"T" : "F"); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + + pmix_asprintf(&tmp, "\tNADDMEMBERS: %lu", sig->naddmembers); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + if (0 < sig->naddmembers) { + for (n=0; n < sig->naddmembers; n++) { + pmix_asprintf(&tmp, "\t\t%s", PMIX_NAME_PRINT(&sig->addmembers[n])); + PMIx_Argv_append_nosize(&msg, tmp); + free(tmp); + } + } + + tmp = PMIx_Argv_join(msg, '\n'); + PMIx_Argv_free(msg); + pmix_output(0, "%s", tmp); + free(tmp); +} + END_C_DECLS #endif diff --git a/src/mca/grpcomm/direct/grpcomm_direct_component.c b/src/mca/grpcomm/direct/grpcomm_direct_component.c index 98dcef65e5..472296872f 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct_component.c +++ b/src/mca/grpcomm/direct/grpcomm_direct_component.c @@ -78,6 +78,7 @@ static void sgcon(prte_grpcomm_direct_group_signature_t *p)\ p->members = NULL; p->nmembers = 0; p->bootstrap = 0; + p->follower = false; p->addmembers = NULL; p->naddmembers = 0; } @@ -133,6 +134,10 @@ static void gccon(prte_grpcomm_group_t *p) p->ndmns = 0; p->nexpected = 0; p->nreported = 0; + p->nleaders = 0; + p->nleaders_reported = 0; + p->nfollowers = 0; + p->nfollowers_reported = 0; p->assignID = false; p->timeout = 0; p->memsize = 0; diff --git a/src/mca/grpcomm/direct/grpcomm_direct_group.c b/src/mca/grpcomm/direct/grpcomm_direct_group.c index 53f3def57b..52c6807fe5 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct_group.c +++ b/src/mca/grpcomm/direct/grpcomm_direct_group.c @@ -73,7 +73,7 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid, prte_pmix_grp_caddy_t *cd; PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct:group for %s with %lu procs", + "%s grpcomm:direct:group for \"%s\" with %lu procs", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), grpid, nprocs)); cd = PMIX_NEW(prte_pmix_grp_caddy_t); @@ -116,19 +116,12 @@ static void group(int sd, short args, void *cbdata) sig.nmembers = cd->nprocs; PMIX_PROC_CREATE(sig.members,
sig.nmembers); memcpy(sig.members, cd->procs, sig.nmembers * sizeof(pmix_proc_t)); + } else { + // if no procs were given, then this must be a follower + sig.follower = true; } sig.op = cd->op; - /* create a tracker for this operation */ - if (NULL == (coll = get_tracker(&sig, true))) { - PRTE_ERROR_LOG(PRTE_ERR_NOT_FOUND); - PMIX_RELEASE(cd); - PMIX_DESTRUCT(&sig); - return; - } - coll->cbfunc = cd->cbfunc; - coll->cbdata = cd->cbdata; - // setup to track endpts and grpinfo endpts = PMIx_Info_list_start(); grpinfo = PMIx_Info_list_start(); @@ -184,6 +177,16 @@ static void group(int sd, short args, void *cbdata) } } + /* create a tracker for this operation */ + if (NULL == (coll = get_tracker(&sig, true))) { + PRTE_ERROR_LOG(PRTE_ERR_NOT_FOUND); + PMIX_RELEASE(cd); + PMIX_DESTRUCT(&sig); + return; + } + coll->cbfunc = cd->cbfunc; + coll->cbdata = cd->cbdata; + // create the relay buffer PMIX_DATA_BUFFER_CREATE(relay); @@ -242,7 +245,7 @@ static void group(int sd, short args, void *cbdata) PMIx_Info_list_release(endpts); /* if this is a bootstrap operation, send it directly to the HNP */ - if (0 < sig.bootstrap || NULL == sig.members) { + if (coll->bootstrap) { PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, "%s grpcomm:direct:grp bootstrap sending %lu bytes to HNP", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), relay->bytes_used)); @@ -407,16 +410,35 @@ void prte_grpcomm_direct_grp_recv(int status, pmix_proc_t *sender, } } - /* increment nprocs reported for collective */ - coll->nreported++; + /* track procs reported for collective */ + if (0 < sig->bootstrap) { + // this came from a bootstrap leader + coll->nleaders_reported++; + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct group recv leader nrep %d of %d", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) coll->nleaders_reported, + (int) coll->nleaders)); + } else if (sig->follower) { + // came from a bootstrap follower + coll->nfollowers_reported++; +
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct group recv follower nrep %d of %d", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) coll->nfollowers_reported, + (int) coll->nfollowers)); + } else { + // group collective op + coll->nreported++; + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct group recv nexpected %d nrep %d", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) coll->nexpected, + (int) coll->nreported)); + } - PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct group recv nexpected %d nrep %d", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (int) coll->nexpected, - (int) coll->nreported)); /* see if everyone has reported */ - if (coll->nreported == coll->nexpected) { + if ((coll->bootstrap && (coll->nleaders_reported == coll->nleaders && + coll->nfollowers_reported == coll->nfollowers)) || + (!coll->bootstrap && coll->nreported == coll->nexpected)) { if (PRTE_PROC_IS_MASTER) { PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, @@ -458,7 +480,7 @@ void prte_grpcomm_direct_grp_recv(int status, pmix_proc_t *sender, for (m=0; m < coll->sig->naddmembers; m++) { found = false; PMIX_LIST_FOREACH(nm, &nmlist, prte_namelist_t) { - if (PMIX_CHECK_PROCID(&coll->sig->members[m], &nm->name)) { + if (PMIX_CHECK_PROCID(&coll->sig->addmembers[m], &nm->name)) { // if the new one is rank=WILDCARD, then ensure // we keep it as wildcard if (PMIX_RANK_WILDCARD == coll->sig->addmembers[m].rank) { @@ -633,14 +655,6 @@ static void relcb(void *cbdata) PMIX_RELEASE(cd); } -static void opcbfunc(int status, void *cbdata) -{ - prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata; - PRTE_HIDE_UNUSED_PARAMS(status); - - PMIX_RELEASE(cd); -} - static void lkopcbfunc(int status, void *cbdata) { prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata; @@ -855,70 +869,6 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender, //
regardless of prior error, we MUST notify any pending clients // so they don't hang - if (NULL == coll || NULL != sig->addmembers) { - // still need to generate invite event for procs - // that might be on nodes that were not involved - // in the original collective - - PMIX_INFO_LIST_START(ilist); - - // provide the group ID since the invitee won't have it - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_ID, sig->groupID, PMIX_STRING); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - - // set the range to be only procs that were added - darray.type = PMIX_PROC; - darray.array = sig->addmembers; - darray.size = sig->naddmembers; - // load the array - note: this copies the array! - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - - // mark that this event stays local and does not go up to the host - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_EVENT_STAYS_LOCAL, NULL, PMIX_BOOL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - - if (NULL != finalmembership) { - // pass back the final group membership - darray.type = PMIX_PROC; - darray.array = finalmembership; - darray.size = nfinal; - // load the array - note: this copies the array!
- PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - - // pass any assigned context ID - if (sig->ctxid_assigned) { - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - - // convert for passing in event - PMIX_INFO_LIST_CONVERT(rc, ilist, &darray); - if (PMIX_SUCCESS != rc && PMIX_ERR_EMPTY != rc) { - PMIX_ERROR_LOG(rc); - } - cd = PMIX_NEW(prte_pmix_grp_caddy_t); - cd->info = (pmix_info_t*)darray.array; - cd->ninfo = darray.size; - PMIX_INFO_LIST_RELEASE(ilist); - - // notify local procs - PMIx_Notify_event(PMIX_GROUP_INVITED, &prte_process_info.myproc, PMIX_RANGE_CUSTOM, - cd->info, cd->ninfo, opcbfunc, (void*)cd); - } - if (NULL != coll && NULL != coll->cbfunc) { // service the procs that are part of the collective @@ -972,7 +922,8 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender, } -static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t *sig, bool create) +static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t *sig, + bool create) { prte_grpcomm_group_t *coll; int rc; @@ -997,6 +948,68 @@ static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t * "%s grpcomm:direct:group:returning existing collective %s", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), sig->groupID)); + // if this is a bootstrap, then we have to track the number of leaders + if (0 < sig->bootstrap) { + if (0 < coll->nleaders) { + if (coll->nleaders != sig->bootstrap) { + // this is an error + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + return NULL; + } + } else { + // collective tracker could have been created by a follower, + // which means nleaders will not have been set + coll->nleaders = sig->bootstrap; + } + coll->bootstrap = true; + // add this proc to the list of members + PMIX_CONSTRUCT(&plist, pmix_list_t); + for (n=0; n < sig->nmembers;
n++) { + // see if we already have this proc + found = false; + for (nmb=0; nmb < coll->sig->nmembers; nmb++) { + if (PMIX_CHECK_PROCID(&sig->members[n], &coll->sig->members[nmb])) { + // yes, we do + found = true; + // check for wildcard as that needs to be retained + if (PMIX_RANK_WILDCARD == sig->members[n].rank) { + coll->sig->members[nmb].rank = PMIX_RANK_WILDCARD; + } + break; + } + } + if (!found) { + // cache the proc + nm = PMIX_NEW(prte_namelist_t); + memcpy(&nm->name, &sig->members[n], sizeof(pmix_proc_t)); + pmix_list_append(&plist, &nm->super); + } + } + // add any missing procs to the members + if (0 < pmix_list_get_size(&plist)) { + n = coll->sig->nmembers + pmix_list_get_size(&plist); + PMIX_PROC_CREATE(p, n); + if (NULL != coll->sig->members) { + memcpy(p, coll->sig->members, coll->sig->nmembers * sizeof(pmix_proc_t)); + } + n = coll->sig->nmembers; + PMIX_LIST_FOREACH(nm, &plist, prte_namelist_t) { + memcpy(&p[n], &nm->name, sizeof(pmix_proc_t)); + ++n; + } + PMIX_LIST_DESTRUCT(&plist); + if (NULL != coll->sig->members) { + PMIX_PROC_FREE(coll->sig->members, coll->sig->nmembers); + } + coll->sig->members = p; + coll->sig->nmembers = n; + } + + } else if (sig->follower) { + // just ensure the bootstrap flag is set + coll->bootstrap = true; + } + // if we are adding members, aggregate them if (0 < sig->naddmembers) { PMIX_CONSTRUCT(&plist, pmix_list_t); @@ -1039,6 +1052,7 @@ static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t * } coll->sig->addmembers = p; coll->sig->naddmembers = n; + coll->nfollowers = n; } } if (!coll->sig->assignID && sig->assignID) { @@ -1049,7 +1063,7 @@ static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t * } /* if we get here, then this is a new collective - so create - * the tracker for it */ + * the tracker for it unless directed otherwise */ if (!create) { PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, "%s grpcomm:base: not creating new coll", @@
-1057,6 +1071,7 @@ static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t * return NULL; } + coll = PMIX_NEW(prte_grpcomm_group_t); coll->sig = PMIX_NEW(prte_grpcomm_direct_group_signature_t); coll->sig->op = sig->op; @@ -1073,15 +1088,17 @@ static prte_grpcomm_group_t *get_tracker(prte_grpcomm_direct_group_signature_t * coll->sig->addmembers = (pmix_proc_t *) malloc(coll->sig->naddmembers * sizeof(pmix_proc_t)); memcpy(coll->sig->addmembers, sig->addmembers, coll->sig->naddmembers * sizeof(pmix_proc_t)); } - + coll->nfollowers = coll->sig->naddmembers; // need to know the bootstrap in case one is ongoing - coll->sig->bootstrap = sig->bootstrap; + coll->nleaders = sig->bootstrap; + if (0 < sig->bootstrap || sig->follower) { + coll->bootstrap = true; + } pmix_list_append(&prte_mca_grpcomm_direct_component.group_ops, &coll->super); /* if this is a bootstrap operation, then there is no "rollup" * collective - each daemon reports directly to the DVM controller */ - if (0 < coll->sig->bootstrap) { - coll->nexpected = coll->sig->bootstrap; + if (coll->bootstrap) { return coll; } @@ -1298,6 +1315,13 @@ static int pack_signature(pmix_data_buffer_t *bkt, return prte_pmix_convert_status(rc); } + // pack follower flag + rc = PMIx_Data_pack(NULL, bkt, &sig->follower, 1, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + // pack added membership, if given rc = PMIx_Data_pack(NULL, bkt, &sig->naddmembers, 1, PMIX_SIZE); if (PMIX_SUCCESS != rc) { @@ -1397,6 +1421,15 @@ static int unpack_signature(pmix_data_buffer_t *buffer, return prte_pmix_convert_status(rc); } + // unpack the follower flag + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->follower, &cnt, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + // unpack the added members cnt = 1; rc = PMIx_Data_unpack(NULL, buffer, &s->naddmembers, &cnt, PMIX_SIZE);