diff --git a/src/mca/grpcomm/direct/grpcomm_direct_group.c b/src/mca/grpcomm/direct/grpcomm_direct_group.c index 52c6807fe5..0dac5ff902 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct_group.c +++ b/src/mca/grpcomm/direct/grpcomm_direct_group.c @@ -73,8 +73,9 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid, prte_pmix_grp_caddy_t *cd; PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct:group for \"%s\" with %lu procs", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), grpid, nprocs)); + "%s grpcomm:direct:group %s for \"%s\" with %lu procs", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), + PMIx_Group_operation_string(op), grpid, nprocs)); cd = PMIX_NEW(prte_pmix_grp_caddy_t); cd->op = op; @@ -442,8 +443,8 @@ void prte_grpcomm_direct_grp_recv(int status, pmix_proc_t *sender, if (PRTE_PROC_IS_MASTER) { PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct group HNP reports complete", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + "%s grpcomm:direct group HNP reports complete for %s", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), coll->sig->groupID)); /* the allgather is complete - send the xcast */ if (PMIX_GROUP_CONSTRUCT == sig->op) { @@ -702,7 +703,7 @@ void prte_grpcomm_direct_grp_release(int status, pmix_proc_t *sender, PMIX_ACQUIRE_OBJECT(cd); pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request complete", + "%s group release recvd", PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); // unpack the signature diff --git a/src/prted/pmix/pmix_server_group.c b/src/prted/pmix/pmix_server_group.c index b833e9f92b..3536805d95 100644 --- a/src/prted/pmix/pmix_server_group.c +++ b/src/prted/pmix/pmix_server_group.c @@ -72,158 +72,12 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, #else -static void relcb(void *cbdata) -{ - prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata; - PMIX_RELEASE(cd); -} - -static void opcbfunc(int status, void *cbdata) -{ - prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata; - PRTE_HIDE_UNUSED_PARAMS(status); - - PMIX_RELEASE(cd); -} - -static void local_complete(int sd, short args, void *cbdata) -{ - prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata; - prte_pmix_grp_caddy_t *cd2; - pmix_server_pset_t *pset; - pmix_data_array_t members = PMIX_DATA_ARRAY_STATIC_INIT; - pmix_proc_t *addmembers = NULL; - size_t nmembers = 0, naddmembers = 0; - pmix_proc_t *p; - void *ilist; - pmix_status_t rc; - size_t n; - pmix_data_array_t darray; - pmix_data_buffer_t dbuf; - pmix_byte_object_t bo; - PRTE_HIDE_UNUSED_PARAMS(sd, args); - - if (PMIX_GROUP_CONSTRUCT == cd->op) { - - PMIX_INFO_LIST_START(ilist); - - for (n=0; n < cd->ndirs; n++) { - // check if they gave us any grp or endpt info - if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_PROC_DATA) || - PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_INFO)) { - rc = PMIx_Info_list_add_value(ilist, cd->directives[n].key, &cd->directives[n].value); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - // check for add members - server lib would have aggregated them - } else if (PMIX_CHECK_KEY(&cd->directives[n], PMIX_GROUP_ADD_MEMBERS)) { - naddmembers = cd->directives[n].value.data.darray->size; - addmembers = (pmix_proc_t*)cd->directives[n].value.data.darray->array; - } - } - - // construct the final group membership - nmembers = cd->nprocs + naddmembers; - PMIX_DATA_ARRAY_CONSTRUCT(&members, nmembers, PMIX_PROC); - p = (pmix_proc_t*)members.array; - memcpy(p, cd->procs, cd->nprocs * sizeof(pmix_proc_t)); - if (0 < naddmembers) { - memcpy(&p[cd->nprocs], addmembers, naddmembers * sizeof(pmix_proc_t)); - } - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_MEMBERSHIP, &members, PMIX_DATA_ARRAY); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - - PMIX_INFO_LIST_ADD(rc, ilist, PMIX_GROUP_ID, cd->grpid, PMIX_STRING); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(cd->grpid); - pset->num_members = nmembers; - PMIX_PROC_CREATE(pset->members, nmembers); - memcpy(pset->members, p, nmembers * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - - // convert the info list - PMIX_INFO_LIST_CONVERT(rc, ilist, &darray); - cd->info = (pmix_info_t*)darray.array; - cd->ninfo = darray.size; - PMIX_INFO_LIST_RELEASE(ilist); - - // generate events for any add members as they are waiting for notification - if (NULL != addmembers) { - - cd2 = PMIX_NEW(prte_pmix_grp_caddy_t); - cd2->ninfo = cd->ninfo + 3; - PMIX_INFO_CREATE(cd2->info, cd2->ninfo); - // carry over the info we created - for (n=0; n < cd->ninfo; n++) { - rc = PMIx_Info_xfer(&cd2->info[n], &cd->info[n]); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - - // set the range to be only procs that were added - darray.type = PMIX_PROC; - darray.array = addmembers; - darray.size = naddmembers; - // load the array - note: this copies the array! - PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY); - ++n; - - // mark that this event stays local and does not go up to the host - PMIX_INFO_LOAD(&cd2->info[n], PMIX_EVENT_STAYS_LOCAL, NULL, PMIX_BOOL); - ++n; - - // add the job-level info - PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); - rc = PMIx_server_collect_job_info(p, nmembers, &dbuf); - if (PMIX_SUCCESS == rc) { - PMIx_Data_buffer_unload(&dbuf, &bo.bytes, &bo.size); - PMIX_INFO_LOAD(&cd2->info[n], PMIX_GROUP_JOB_INFO, &bo, PMIX_BYTE_OBJECT); - PMIX_BYTE_OBJECT_DESTRUCT(&bo); - } - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - - // notify local procs - PMIx_Notify_event(PMIX_GROUP_INVITED, &prte_process_info.myproc, - PMIX_RANGE_CUSTOM, - cd2->info, cd2->ninfo, opcbfunc, cd2); - } - - // return this to the PMIx server - cd->cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd->cbdata, relcb, cd); - - } else { - /* find this group ID on our list of groups and remove it */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, cd->grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - - // return their callback - cd->cbfunc(PMIX_SUCCESS, NULL, 0, cd->cbdata, relcb, cd); - } -} - pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, const pmix_proc_t procs[], size_t nprocs, const pmix_info_t directives[], size_t ndirs, pmix_info_cbfunc_t cbfunc, void *cbdata) { - prte_pmix_grp_caddy_t *cd; int rc; - size_t i; - bool force_local = false; pmix_output_verbose(2, prte_pmix_server_globals.output, "%s Group request %s recvd with %lu directives", @@ -235,34 +89,6 @@ pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, return PMIX_ERR_BAD_PARAM; } - /* check the directives */ - for (i = 0; i < ndirs; i++) { - /* see if this is local only */ - if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) { - force_local = PMIX_INFO_TRUE(&directives[i]); - break; - } - } - - /* if they insist on forcing local completion of the operation, then - * we are done */ - if (force_local) { - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request - purely local", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - cd = PMIX_NEW(prte_pmix_grp_caddy_t); - cd->op = op; - cd->grpid = strdup(grpid); - cd->procs = procs; - cd->nprocs = nprocs; - cd->directives = directives; - cd->ndirs = ndirs; - cd->cbfunc = cbfunc; - cd->cbdata = cbdata; - PRTE_PMIX_THREADSHIFT(cd, prte_event_base, local_complete); - return PMIX_SUCCESS; - } - rc = prte_grpcomm.group(op, grpid, procs, nprocs, directives, ndirs, cbfunc, cbdata); if (PRTE_SUCCESS != rc) {