diff --git a/src/mca/grpcomm/base/grpcomm_base_frame.c b/src/mca/grpcomm/base/grpcomm_base_frame.c index 6afc6cd9fd..de89eaee71 100644 --- a/src/mca/grpcomm/base/grpcomm_base_frame.c +++ b/src/mca/grpcomm/base/grpcomm_base_frame.c @@ -127,8 +127,12 @@ PMIX_CLASS_INSTANCE(prte_grpcomm_base_active_t, static void scon(prte_grpcomm_signature_t *p) { p->groupID = NULL; + p->ctxid = 0; + p->ctxid_assigned = false; p->signature = NULL; p->sz = 0; + p->addmembers = NULL; + p->nmembers = 0; } static void sdes(prte_grpcomm_signature_t *p) { @@ -138,6 +142,9 @@ static void sdes(prte_grpcomm_signature_t *p) if (NULL != p->signature) { free(p->signature); } + if (NULL != p->addmembers) { + free(p->addmembers); + } } PMIX_CLASS_INSTANCE(prte_grpcomm_signature_t, pmix_object_t, diff --git a/src/mca/grpcomm/base/grpcomm_base_stubs.c b/src/mca/grpcomm/base/grpcomm_base_stubs.c index 986cf1144e..2d8ed64481 100644 --- a/src/mca/grpcomm/base/grpcomm_base_stubs.c +++ b/src/mca/grpcomm/base/grpcomm_base_stubs.c @@ -195,7 +195,8 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig { prte_grpcomm_coll_t *coll; int rc; - size_t n; + pmix_proc_t *p; + size_t n, nmb; /* search the existing tracker list to see if this already exists - we * default to using the groupID if one is given, otherwise we fallback @@ -210,23 +211,21 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig /* if only one is NULL, then we can't possibly match */ break; } - if (sig->sz == coll->sig->sz) { - if (NULL != sig->groupID) { - // must match groupID's - if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) { - PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:base:returning existing collective", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); - return coll; - } - } else { - // must match proc signature - if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) { - 
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:base:returning existing collective", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); - return coll; - } + if (NULL != sig->groupID) { + // must match groupID's + if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) { + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:base:returning existing collective", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + goto checkmembers; + } + } else if (sig->sz == coll->sig->sz) { + // must match proc signature + if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) { + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:base:returning existing collective", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + goto checkmembers; } } } @@ -270,6 +269,25 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig } } +checkmembers: + // add any addmembers that were given + if (NULL != sig->addmembers) { + if (NULL == coll->sig->addmembers) { + PMIX_PROC_CREATE(coll->sig->addmembers, sig->nmembers); + coll->sig->nmembers = sig->nmembers; + memcpy(coll->sig->addmembers, sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t)); + } else { + // aggregate them + nmb = coll->sig->nmembers + sig->nmembers; + PMIX_PROC_CREATE(p, nmb); + memcpy(p, coll->sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t)); + memcpy(&p[coll->sig->nmembers], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t)); + PMIX_PROC_FREE(coll->sig->addmembers, coll->sig->nmembers); + coll->sig->addmembers = p; + coll->sig->nmembers = nmb; + } + } + return coll; } diff --git a/src/mca/grpcomm/direct/grpcomm_direct.c b/src/mca/grpcomm/direct/grpcomm_direct.c index 07904d5b20..dac1e5d69f 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct.c +++ b/src/mca/grpcomm/direct/grpcomm_direct.c @@ -25,6 +25,7 @@ #include "src/class/pmix_list.h" #include 
"src/pmix/pmix-internal.h" +#include "src/prted/pmix/pmix_server_internal.h" #include "src/mca/errmgr/errmgr.h" #include "src/rml/rml.h" #include "src/mca/state/state.h" @@ -163,6 +164,7 @@ static void allgather_recv(int status, pmix_proc_t *sender, int rc, timeout; size_t n, ninfo, memsize, m; bool assignID = false; + bool found; pmix_proc_t *addmembers = NULL; size_t num_members = 0; prte_namelist_t *nm; @@ -183,8 +185,8 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* unpack the signature */ rc = prte_grpcomm_sig_unpack(buffer, &sig); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); + if (PRTE_SUCCESS != rc) { + PRTE_ERROR_LOG(rc); } /* check for the tracker and create it if not found */ @@ -271,12 +273,25 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* update the info with the collected value */ info[n].value.type = PMIX_BOOL; info[n].value.data.flag = coll->assignID; - } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ADD_MEMBERS)) { - addmembers = (pmix_proc_t*)info[n].value.data.darray->array; - num_members = info[n].value.data.darray->size; - for (m=0; m < num_members; m++) { + } + } + + // check for any added members + if (NULL != sig->addmembers) { + // add them to the global collective + for (m=0; m < sig->nmembers; m++) { + // check to see if we already have this member + found = false; + PMIX_LIST_FOREACH(nm, &coll->addmembers, prte_namelist_t) { + if (PMIX_CHECK_PROCID(&nm->name, &sig->addmembers[m])) { + // already have it + found = true; + break; + } + } + if (!found) { nm = PMIX_NEW(prte_namelist_t); - PMIX_XFER_PROCID(&nm->name, &addmembers[m]); + PMIX_XFER_PROCID(&nm->name, &sig->addmembers[m]); pmix_list_append(&coll->addmembers, &nm->super); } } @@ -305,8 +320,16 @@ static void allgather_recv(int status, pmix_proc_t *sender, PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); /* the allgather is complete - send the xcast */ PMIX_DATA_BUFFER_CREATE(reply); + + /* if we were asked to provide a context id, do so */ + if 
(assignID) { + coll->sig->ctxid = prte_grpcomm_base.context_id; + --prte_grpcomm_base.context_id; + coll->sig->ctxid_assigned = true; + } + /* pack the signature */ - rc = prte_grpcomm_sig_pack(reply, sig); + rc = prte_grpcomm_sig_pack(reply, coll->sig); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); PMIX_DATA_BUFFER_RELEASE(reply); @@ -324,12 +347,9 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* add some values to the payload in the bucket */ PMIX_DATA_BUFFER_CONSTRUCT(&ctrlbuf); - /* if we were asked to provide a context id, do so */ + /* if we assigned a context id, include it in the bucket as well */ if (assignID) { - size_t sz; - sz = prte_grpcomm_base.context_id; - --prte_grpcomm_base.context_id; - PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &sz, PMIX_SIZE); + PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &coll->sig->ctxid, PMIX_SIZE); rc = PMIx_Data_pack(NULL, &ctrlbuf, &infostat, 1, PMIX_INFO); PMIX_INFO_DESTRUCT(&infostat); if (PMIX_SUCCESS != rc) { @@ -383,6 +403,7 @@ static void allgather_recv(int status, pmix_proc_t *sender, PMIX_RELEASE(sig); return; } + /* send the release via xcast */ (void) prte_grpcomm.xcast(sig, PRTE_RML_TAG_COLL_RELEASE, reply); } else { @@ -668,6 +689,14 @@ static void xcast_recv(int status, pmix_proc_t *sender, PMIX_DATA_BUFFER_DESTRUCT(&datbuf); } +static void opcbfunc(pmix_status_t status, void *cbdata) +{ + prte_pmix_server_op_caddy_t *cd = (prte_pmix_server_op_caddy_t*)cbdata; + + PMIX_INFO_FREE(cd->info, cd->ninfo); + PMIX_RELEASE(cd); +} + static void barrier_release(int status, pmix_proc_t *sender, pmix_data_buffer_t *buffer, prte_rml_tag_t tag, void *cbdata) @@ -676,6 +705,9 @@ static void barrier_release(int status, pmix_proc_t *sender, int rc, ret; prte_grpcomm_signature_t *sig = NULL; prte_grpcomm_coll_t *coll; + pmix_data_array_t darray; + prte_pmix_server_op_caddy_t *cd; + void *values; PRTE_HIDE_UNUSED_PARAMS(status, sender, tag, cbdata); PMIX_OUTPUT_VERBOSE((5, 
prte_grpcomm_base_framework.framework_output, @@ -698,6 +730,49 @@ static void barrier_release(int status, pmix_proc_t *sender, return; } + // if there are added members, notify any that are local to us + if (NULL != sig->addmembers) { + // we have to handle these even if we did not participate in + // the collective as an added member might be on a non-participating + // node. Start by providing the overall membership + cd = PMIX_NEW(prte_pmix_server_op_caddy_t); + PMIX_INFO_LIST_START(values); + + /* protect against infinite loops by marking that this notification was + * passed down to the server by me */ + PMIX_INFO_LIST_ADD(rc, values, "prte.notify.donotloop", NULL, PMIX_BOOL); + // add the context ID, if one was given + if (sig->ctxid_assigned) { + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE); + } + // load the membership + cd->nprocs = sig->sz + sig->nmembers; + PMIX_PROC_CREATE(cd->procs, cd->nprocs); + memcpy(cd->procs, sig->signature, sig->sz * sizeof(pmix_proc_t)); + memcpy(&cd->procs[sig->sz], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t)); + darray.type = PMIX_PROC; + darray.array = cd->procs; + darray.size = cd->nprocs; + // load the array - note: this copies the array! + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); + PMIX_PROC_FREE(cd->procs, cd->nprocs); + // provide the group ID + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_ID, sig->groupID, PMIX_STRING); + // set the range to be only procs that were added + darray.array = sig->addmembers; + darray.size = sig->nmembers; + // load the array - note: this copies the array! 
+ PMIX_INFO_LIST_ADD(rc, values, PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY); + // convert the list to an array + PMIX_INFO_LIST_CONVERT(rc, values, &darray); + cd->info = (pmix_info_t*)darray.array; + cd->ninfo = darray.size; + PMIX_INFO_LIST_RELEASE(values); + // notify local procs + PMIx_Notify_event(PMIX_GROUP_INVITED, PRTE_PROC_MY_NAME, PMIX_RANGE_CUSTOM, + cd->info, cd->ninfo, opcbfunc, cd); + } + /* check for the tracker - it is not an error if not * found as that just means we wre not involved * in the collective */ diff --git a/src/mca/grpcomm/grpcomm.h b/src/mca/grpcomm/grpcomm.h index a423533a70..7c34500b25 100644 --- a/src/mca/grpcomm/grpcomm.h +++ b/src/mca/grpcomm/grpcomm.h @@ -64,8 +64,12 @@ typedef int (*prte_grpcomm_rbcast_cb_t)(pmix_data_buffer_t *buffer); typedef struct { pmix_object_t super; char *groupID; + size_t ctxid; + bool ctxid_assigned; pmix_proc_t *signature; size_t sz; + pmix_proc_t *addmembers; + size_t nmembers; } prte_grpcomm_signature_t; PRTE_EXPORT PMIX_CLASS_DECLARATION(prte_grpcomm_signature_t); diff --git a/src/mca/state/dvm/state_dvm.c b/src/mca/state/dvm/state_dvm.c index 1b3da8d84d..c522fe8e4f 100644 --- a/src/mca/state/dvm/state_dvm.c +++ b/src/mca/state/dvm/state_dvm.c @@ -320,6 +320,7 @@ static void vm_ready(int fd, short args, void *cbdata) } /* goes to all daemons */ + PMIX_CONSTRUCT(&sig, prte_grpcomm_signature_t); PMIX_PROC_CREATE(sig.signature, 1); PMIX_LOAD_PROCID(&sig.signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD); sig.sz = 1; diff --git a/src/prted/pmix/Makefile.am b/src/prted/pmix/Makefile.am index b3edd789f1..a046447a01 100644 --- a/src/prted/pmix/Makefile.am +++ b/src/prted/pmix/Makefile.am @@ -22,4 +22,5 @@ libprrte_la_SOURCES += \ prted/pmix/pmix_server_gen.c \ prted/pmix/pmix_server_queries.c \ prted/pmix/pmix_server_allocate.c \ - prted/pmix/pmix_server_session.c + prted/pmix/pmix_server_session.c \ + prted/pmix/pmix_server_group.c diff --git a/src/prted/pmix/pmix_server_gen.c 
b/src/prted/pmix/pmix_server_gen.c index a8e7a97d80..7e0f60f19c 100644 --- a/src/prted/pmix/pmix_server_gen.c +++ b/src/prted/pmix/pmix_server_gen.c @@ -1004,279 +1004,6 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p return PMIX_ERR_NOT_SUPPORTED; } -static void relcb(void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); -} -static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - int32_t cnt; - int rc = PRTE_SUCCESS; - pmix_status_t ret; - bool assignedID = false; - bool procsadded = false; - size_t cid; - pmix_proc_t *procs, *members; - size_t n, num_members; - pmix_data_array_t darray; - pmix_info_t info; - pmix_data_buffer_t dbuf; - pmix_byte_object_t bo; - int32_t byused; - pmix_server_pset_t *pset; - - PMIX_ACQUIRE_OBJECT(cd); - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request complete", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - if (PRTE_SUCCESS != status) { - rc = status; - goto complete; - } - - /* if this was a destruct operation, then there is nothing - * further we need do */ - if (PMIX_GROUP_DESTRUCT == cd->op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, cd->grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - rc = status; - goto complete; - } - - /* check for any directives */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, buf, &bo, &cnt, PMIX_BYTE_OBJECT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); - PMIX_DATA_BUFFER_LOAD(&dbuf, bo.bytes, bo.size); - - cd->ninfo = 2; - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, 
PMIX_INFO); - while (PMIX_SUCCESS == rc) { - if (PMIX_CHECK_KEY(&info, PMIX_GROUP_CONTEXT_ID)) { - PMIX_VALUE_GET_NUMBER(rc, &info.value, cid, size_t); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - cd->ninfo = 0; - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - goto complete; - } - assignedID = true; - cd->ninfo++; - } else if (PMIX_CHECK_KEY(&info, PMIX_GROUP_ADD_MEMBERS)) { - members = (pmix_proc_t*)info.value.data.darray->array; - num_members = info.value.data.darray->size; - PMIX_PROC_CREATE(procs, cd->nprocs + num_members); - for (n=0; n < cd->nprocs; n++) { - PMIX_XFER_PROCID(&procs[n], &cd->procs[n]); - } - for (n=0; n < num_members; n++) { - PMIX_XFER_PROCID(&procs[n+cd->nprocs], &members[n]); - } - PMIX_PROC_FREE(cd->procs, cd->nprocs); - cd->procs = procs; - cd->nprocs += num_members; - procsadded = true; - } - /* cleanup */ - PMIX_INFO_DESTRUCT(&info); - /* get the next object */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); - } - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - /* the unpacking loop will have ended when the unpack either - * went past the end of the buffer */ - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - rc = PMIX_SUCCESS; - - if (PMIX_GROUP_CONSTRUCT == cd->op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(cd->grpid); - pset->num_members = cd->nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } - - /* if anything is left in the buffer, then it is - * modex data that needs to be stored */ - PMIX_BYTE_OBJECT_CONSTRUCT(&bo); - byused = buf->bytes_used - (buf->unpack_ptr - buf->base_ptr); - if (0 < byused) { - bo.bytes = buf->unpack_ptr; - bo.size = byused; - } - if (NULL != bo.bytes && 0 < bo.size) { - cd->ninfo++; - } - - PMIX_INFO_CREATE(cd->info, cd->ninfo); - n = 
0; - // pass back the final group membership - darray.type = PMIX_PROC; - darray.array = cd->procs; - darray.size = cd->nprocs; - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); - PMIX_PROC_FREE(cd->procs, cd->nprocs); - ++n; - if (assignedID) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_CONTEXT_ID, &cid, PMIX_SIZE); - ++n; - } - if (NULL != bo.bytes && 0 < bo.size) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT); - } - -complete: - ret = prte_pmix_convert_rc(rc); - /* return to the local procs in the collective */ - if (NULL != cd->infocbfunc) { - cd->infocbfunc(ret, cd->info, cd->ninfo, cd->cbdata, relcb, cd); - } else { - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); - } -} - -pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, - const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t directives[], size_t ndirs, - pmix_info_cbfunc_t cbfunc, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd; - int rc; - size_t i; - bool assignID = false; - pmix_server_pset_t *pset; - bool fence = false; - bool force_local = false; - pmix_byte_object_t *bo = NULL; - struct timeval tv = {0, 0}; - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request recvd", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - /* they are required to pass us an id */ - if (NULL == grpid) { - return PMIX_ERR_BAD_PARAM; - } - - /* check the directives */ - for (i = 0; i < ndirs; i++) { - /* see if they want a context id assigned */ - if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ASSIGN_CONTEXT_ID)) { - assignID = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_EMBED_BARRIER)) { - fence = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ENDPT_DATA)) { - bo = (pmix_byte_object_t *) &directives[i].value.data.bo; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_TIMEOUT)) { - tv.tv_sec = 
directives[i].value.data.uint32; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) { - force_local = PMIX_INFO_TRUE(&directives[i]); - } - } - - /* if they don't want us to do a fence and they don't want a - * context id assigned, or they insist on forcing local - * completion of the operation, then we are done */ - if ((!fence && !assignID) || force_local) { - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request - purely local", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - if (PMIX_GROUP_CONSTRUCT == op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(grpid); - pset->num_members = nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } else if (PMIX_GROUP_DESTRUCT == op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - } - return PMIX_OPERATION_SUCCEEDED; - } - - cd = PMIX_NEW(prte_pmix_mdx_caddy_t); - cd->op = op; - cd->grpid = strdup(grpid); - /* have to copy the procs in case we add members */ - PMIX_PROC_CREATE(cd->procs, nprocs); - memcpy(cd->procs, procs, nprocs * sizeof(pmix_proc_t)); - cd->nprocs = nprocs; - cd->grpcbfunc = group_release; - cd->infocbfunc = cbfunc; - cd->cbdata = cbdata; - - /* compute the signature of this collective */ - cd->sig = PMIX_NEW(prte_grpcomm_signature_t); - cd->sig->groupID = strdup(grpid); - if (NULL != procs) { - cd->sig->sz = nprocs; - cd->sig->signature = (pmix_proc_t *) malloc(cd->sig->sz * sizeof(pmix_proc_t)); - memcpy(cd->sig->signature, procs, cd->sig->sz * sizeof(pmix_proc_t)); - } - /* setup the ctrls blob - this will include any "add_members" 
directive */ - rc = prte_pack_ctrl_options(&cd->ctrls, directives, ndirs); - if (PMIX_SUCCESS != rc) { - PMIX_RELEASE(cd); - return rc; - } - PMIX_DATA_BUFFER_CREATE(cd->buf); - /* if they provided us with a data blob, send it along */ - if (NULL != bo) { - /* We don't own the byte_object and so we have to - * copy it here */ - rc = PMIx_Data_embed(cd->buf, bo); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - /* pass it to the global collective algorithm */ - if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) { - PRTE_ERROR_LOG(rc); - PMIX_RELEASE(cd); - return PMIX_ERROR; - } - return PMIX_SUCCESS; -} - pmix_status_t pmix_server_iof_pull_fn(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t directives[], size_t ndirs, pmix_iof_channel_t channels, pmix_op_cbfunc_t cbfunc,
diff --git a/src/prted/pmix/pmix_server_group.c b/src/prted/pmix/pmix_server_group.c
new file mode 100644
index 0000000000..4dd6936878
--- /dev/null
+++ b/src/prted/pmix/pmix_server_group.c
@@ -0,0 +1,415 @@
+/*
+ * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2011 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
+ *                         All rights reserved.
+ * Copyright (c) 2009-2020 Cisco Systems, Inc.  All rights reserved
+ * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
+ * Copyright (c) 2013-2020 Intel, Inc.  All rights reserved.
+ * Copyright (c) 2014-2017 Mellanox Technologies, Inc.
+ *                         All rights reserved.
+ * Copyright (c) 2014-2019 Research Organization for Information Science
+ *                         and Technology (RIST).  All rights reserved.
+ * Copyright (c) 2020      IBM Corporation.  All rights reserved.
+ * Copyright (c) 2021-2024 Nanook Consulting  All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ *
+ */
+
+#include "prte_config.h"
+
+#ifdef HAVE_UNISTD_H
+#    include <unistd.h>
+#endif
+
+#include "src/hwloc/hwloc-internal.h"
+#include "src/pmix/pmix-internal.h"
+#include "src/util/pmix_argv.h"
+#include "src/util/pmix_output.h"
+
+#include "src/mca/errmgr/errmgr.h"
+#include "src/mca/grpcomm/base/base.h"
+#include "src/mca/iof/base/base.h"
+#include "src/mca/iof/iof.h"
+#include "src/mca/plm/base/plm_private.h"
+#include "src/mca/plm/plm.h"
+#include "src/mca/plm/base/plm_private.h"
+#include "src/mca/rmaps/rmaps_types.h"
+#include "src/rml/rml.h"
+#include "src/mca/schizo/schizo.h"
+#include "src/mca/state/state.h"
+#include "src/runtime/prte_globals.h"
+#include "src/runtime/prte_locks.h"
+#include "src/threads/pmix_threads.h"
+#include "src/util/name_fns.h"
+#include "src/util/pmix_show_help.h"
+
+#include "src/prted/pmix/pmix_server_internal.h"
+
+/* Release callback passed to the requestor's info callback: frees the
+ * info array attached to the caddy (if any) and releases the caddy */
+static void relcb(void *cbdata)
+{
+    prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata;
+
+    if (NULL != cd->info) {
+        PMIX_INFO_FREE(cd->info, cd->ninfo);
+    }
+    PMIX_RELEASE(cd);
+}
+
+/* Completion callback for a group operation - installed as the caddy's
+ * grpcbfunc by pmix_server_group_fn. On success, it updates our list of
+ * known groups, unpacks any returned directives (assigned context ID,
+ * added members), and passes the final membership, the context ID, and
+ * any remaining endpoint data to the requestor's info callback */
+static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata)
+{
+    prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata;
+    int32_t cnt;
+    int rc = PRTE_SUCCESS;
+    pmix_status_t ret;
+    bool assignedID = false;
+    bool procsadded = false;
+    size_t cid;
+    pmix_proc_t *procs, *members;
+    size_t n, num_members;
+    pmix_data_array_t darray;
+    pmix_info_t info;
+    pmix_data_buffer_t dbuf;
+    pmix_byte_object_t bo;
+    int32_t byused;
+    pmix_server_pset_t *pset;
+
+    PMIX_ACQUIRE_OBJECT(cd);
+
+    pmix_output_verbose(2, prte_pmix_server_globals.output,
+                        "%s group request complete",
+                        PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));
+
+    if (PRTE_SUCCESS != status) {
+        rc = status;
+        goto complete;
+    }
+
+    /* if this was a destruct operation, then there is nothing
+     * further we need do */
+    if (PMIX_GROUP_DESTRUCT == cd->op) {
+        /* find this group ID on our list of groups */
+        PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
+        {
+            if (0 == strcmp(pset->name, cd->grpid)) {
+                pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
+                PMIX_RELEASE(pset);
+                break;
+            }
+        }
+        rc = status;
+        goto complete;
+    }
+
+    /* check for any directives */
+    cnt = 1;
+    rc = PMIx_Data_unpack(NULL, buf, &bo, &cnt, PMIX_BYTE_OBJECT);
+    if (PMIX_SUCCESS != rc) {
+        PMIX_ERROR_LOG(rc);
+        goto complete;
+    }
+    PMIX_DATA_BUFFER_CONSTRUCT(&dbuf);
+    PMIX_DATA_BUFFER_LOAD(&dbuf, bo.bytes, bo.size);
+
+    cd->ninfo = 2;
+    cnt = 1;
+    rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO);
+    while (PMIX_SUCCESS == rc) {
+        if (PMIX_CHECK_KEY(&info, PMIX_GROUP_CONTEXT_ID)) {
+            PMIX_VALUE_GET_NUMBER(rc, &info.value, cid, size_t);
+            if (PMIX_SUCCESS != rc) {
+                PMIX_ERROR_LOG(rc);
+                cd->ninfo = 0;
+                PMIX_DATA_BUFFER_DESTRUCT(&dbuf);
+                goto complete;
+            }
+            assignedID = true;
+            cd->ninfo++;
+        } else if (PMIX_CHECK_KEY(&info, PMIX_GROUP_ADD_MEMBERS)) {
+            members = (pmix_proc_t*)info.value.data.darray->array;
+            num_members = info.value.data.darray->size;
+            PMIX_PROC_CREATE(procs, cd->nprocs + num_members);
+            for (n=0; n < cd->nprocs; n++) {
+                PMIX_XFER_PROCID(&procs[n], &cd->procs[n]);
+            }
+            for (n=0; n < num_members; n++) {
+                PMIX_XFER_PROCID(&procs[n+cd->nprocs], &members[n]);
+            }
+            PMIX_PROC_FREE(cd->procs, cd->nprocs);
+            cd->procs = procs;
+            cd->nprocs += num_members;
+            procsadded = true;
+        }
+        /* cleanup */
+        PMIX_INFO_DESTRUCT(&info);
+        /* get the next object */
+        cnt = 1;
+        rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO);
+    }
+    PMIX_DATA_BUFFER_DESTRUCT(&dbuf);
+
+    /* the unpacking loop will have ended when the unpack either
+     * went past the end of the buffer */
+    if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
+        PMIX_ERROR_LOG(rc);
+        goto complete;
+    }
+    rc = PMIX_SUCCESS;
+
+    if (PMIX_GROUP_CONSTRUCT == cd->op) {
+        /* add it to our list of known groups */
+        pset = PMIX_NEW(pmix_server_pset_t);
+        pset->name = strdup(cd->grpid);
+        pset->num_members = cd->nprocs;
+        PMIX_PROC_CREATE(pset->members, pset->num_members);
+        memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t));
+        pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);
+    }
+
+    /* if anything is left in the buffer, then it is
+     * modex data that needs to be stored */
+    PMIX_BYTE_OBJECT_CONSTRUCT(&bo);
+    byused = buf->bytes_used - (buf->unpack_ptr - buf->base_ptr);
+    if (0 < byused) {
+        bo.bytes = buf->unpack_ptr;
+        bo.size = byused;
+    }
+    if (NULL != bo.bytes && 0 < bo.size) {
+        cd->ninfo++;
+    }
+
+    PMIX_INFO_CREATE(cd->info, cd->ninfo);
+    n = 0;
+    // pass back the final group membership
+    darray.type = PMIX_PROC;
+    darray.array = cd->procs;
+    darray.size = cd->nprocs;
+    PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY);
+    PMIX_PROC_FREE(cd->procs, cd->nprocs);
+    ++n;
+    if (assignedID) {
+        PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_CONTEXT_ID, &cid, PMIX_SIZE);
+        ++n;
+    }
+    if (NULL != bo.bytes && 0 < bo.size) {
+        PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT);
+    }
+
+complete:
+    ret = prte_pmix_convert_rc(rc);
+    /* return to the local procs in the collective */
+    if (NULL != cd->infocbfunc) {
+        cd->infocbfunc(ret, cd->info, cd->ninfo, cd->cbdata, relcb, cd);
+    } else {
+        if (NULL != cd->info) {
+            PMIX_INFO_FREE(cd->info, cd->ninfo);
+        }
+        PMIX_RELEASE(cd);
+    }
+}
+
+/* Server upcall for PMIx group construct/destruct requests.
+ *
+ * op         - group operation (the code acts on PMIX_GROUP_CONSTRUCT
+ * grpid      - ID of the group - required   and PMIX_GROUP_DESTRUCT)
+ * procs      - array of procs participating in the operation
+ * nprocs     - number of elements in procs
+ * directives - array of directives from the requestor. We look for the
+ *              assign-context-ID, embed-barrier, endpoint-data, timeout,
+ *              local-only, and add-members keys
+ * ndirs      - number of elements in directives
+ * cbfunc     - info callback stored in the caddy for group_release
+ * cbdata     - opaque pointer to pass back in cbfunc
+ *
+ * Returns PMIX_ERR_BAD_PARAM if no group ID was given,
+ * PMIX_OPERATION_SUCCEEDED if the request was completed locally (cbfunc
+ * is not called), PMIX_SUCCESS if the request was handed to the global
+ * collective, the packing error if the ctrls blob could not be built,
+ * or PMIX_ERROR if the allgather could not be started.
+ */
+pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid,
+                                   const pmix_proc_t procs[], size_t nprocs,
+                                   const pmix_info_t directives[], size_t ndirs,
+                                   pmix_info_cbfunc_t cbfunc, void *cbdata)
+{
+    prte_pmix_mdx_caddy_t *cd;
+    int rc;
+    size_t i;
+    bool assignID = false;
+    pmix_server_pset_t *pset;
+    bool fence = false;
+    bool force_local = false;
+    pmix_proc_t *members = NULL;
+    pmix_proc_t *mbrs, *p;
+    size_t num_members = 0;
+    size_t nmembers;
+    bool copied = false;
+    pmix_byte_object_t *bo = NULL;
+    struct timeval tv = {0, 0};
+
+    pmix_output_verbose(2, prte_pmix_server_globals.output,
+                        "%s Group request recvd with %lu directives",
+                        PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), (unsigned long) ndirs);
+
+    /* they are required to pass us an id */
+    if (NULL == grpid) {
+        return PMIX_ERR_BAD_PARAM;
+    }
+
+    /* check the directives */
+    for (i = 0; i < ndirs; i++) {
+        /* see if they want a context id assigned */
+        if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ASSIGN_CONTEXT_ID)) {
+            assignID = PMIX_INFO_TRUE(&directives[i]);
+        } else if (PMIX_CHECK_KEY(&directives[i], PMIX_EMBED_BARRIER)) {
+            fence = PMIX_INFO_TRUE(&directives[i]);
+        } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ENDPT_DATA)) {
+            bo = (pmix_byte_object_t *) &directives[i].value.data.bo;
+        } else if (PMIX_CHECK_KEY(&directives[i], PMIX_TIMEOUT)) {
+            // NOTE: the timeout is recorded but not used within this function
+            tv.tv_sec = directives[i].value.data.uint32;
+        } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) {
+            force_local = PMIX_INFO_TRUE(&directives[i]);
+        } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ADD_MEMBERS)) {
+            // there can be more than one entry here as this is the aggregate
+            // of info keys from local procs that called group_construct
+            if (NULL == members) {
+                // first entry - just point at the array in the directive,
+                // which we do NOT own and therefore must never free
+                members = (pmix_proc_t*)directives[i].value.data.darray->array;
+                num_members = directives[i].value.data.darray->size;
+            } else {
+                // need to aggregate these
+                mbrs = (pmix_proc_t*)directives[i].value.data.darray->array;
+                nmembers = directives[i].value.data.darray->size;
+                // create a new array big enough to hold both sets
+                PMIX_PROC_CREATE(p, num_members + nmembers);
+                // xfer data across
+                memcpy(p, members, num_members * sizeof(pmix_proc_t));
+                memcpy(&p[num_members], mbrs, nmembers * sizeof(pmix_proc_t));
+                // release the old array, but only if it is one we allocated
+                // during an earlier aggregation pass
+                if (copied) {
+                    PMIX_PROC_FREE(members, num_members);
+                }
+                // complete the xfer - we now own the aggregated array
+                members = p;
+                num_members = num_members + nmembers;
+                copied = true;
+            }
+        }
+    }
+
+    /* if they don't want us to do a fence and they don't want a
+     * context id assigned and they aren't adding members, or they
+     * insist on forcing local completion of the operation, then
+     * we are done */
+    if ((!fence && !assignID && NULL == members) || force_local) {
+        pmix_output_verbose(2, prte_pmix_server_globals.output,
+                            "%s group request - purely local",
+                            PRTE_NAME_PRINT(PRTE_PROC_MY_NAME));
+        if (PMIX_GROUP_CONSTRUCT == op) {
+            /* add it to our list of known groups */
+            pset = PMIX_NEW(pmix_server_pset_t);
+            pset->name = strdup(grpid);
+            pset->num_members = nprocs;
+            if (NULL != members) {
+                pset->num_members += num_members;
+            }
+            PMIX_PROC_CREATE(pset->members, pset->num_members);
+            memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t));
+            if (NULL != members) {
+                memcpy(&pset->members[nprocs], members, num_members * sizeof(pmix_proc_t));
+            }
+            pmix_list_append(&prte_pmix_server_globals.groups, &pset->super);
+        } else if (PMIX_GROUP_DESTRUCT == op) {
+            /* find this group ID on our list of groups */
+            PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t)
+            {
+                if (0 == strcmp(pset->name, grpid)) {
+                    pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super);
+                    PMIX_RELEASE(pset);
+                    break;
+                }
+            }
+        }
+        // nothing else will take the aggregated array, so don't leak it
+        if (copied) {
+            PMIX_PROC_FREE(members, num_members);
+        }
+        return PMIX_OPERATION_SUCCEEDED;
+    }
+
+    cd = PMIX_NEW(prte_pmix_mdx_caddy_t);
+    cd->op = op;
+    cd->grpid = strdup(grpid);
+    /* have to copy the procs in case we add members */
+    PMIX_PROC_CREATE(cd->procs, nprocs);
+    memcpy(cd->procs, procs, nprocs * sizeof(pmix_proc_t));
+    cd->nprocs = nprocs;
+    cd->grpcbfunc = group_release;
+    cd->infocbfunc = cbfunc;
+    cd->cbdata = cbdata;
+
+    /* compute the signature of this collective */
+    cd->sig = PMIX_NEW(prte_grpcomm_signature_t);
+    cd->sig->groupID = strdup(grpid);
+    if (NULL != procs) {
+        cd->sig->sz = nprocs;
+        cd->sig->signature = (pmix_proc_t *) malloc(cd->sig->sz * sizeof(pmix_proc_t));
+        memcpy(cd->sig->signature, procs, cd->sig->sz * sizeof(pmix_proc_t));
+    }
+    if (NULL != members) {
+        cd->sig->nmembers = num_members;
+        if (copied) {
+            // we allocated the array, so simply hand it to the signature
+            cd->sig->addmembers = members;
+        } else {
+            // the array lives in the directive, so the signature needs a copy
+            cd->sig->addmembers = (pmix_proc_t *) malloc(num_members * sizeof(pmix_proc_t));
+            memcpy(cd->sig->addmembers, members, num_members * sizeof(pmix_proc_t));
+        }
+    }
+    /* setup the ctrls blob - this will include any "add_members" directive */
+    rc = prte_pack_ctrl_options(&cd->ctrls, directives, ndirs);
+    if (PMIX_SUCCESS != rc) {
+        PMIX_RELEASE(cd);
+        return rc;
+    }
+    PMIX_DATA_BUFFER_CREATE(cd->buf);
+    /* if they provided us with a data blob, send it along */
+    if (NULL != bo) {
+        /* We don't own the byte_object and so we have to
+         * copy it here */
+        rc = PMIx_Data_embed(cd->buf, bo);
+        if (PMIX_SUCCESS != rc) {
+            PMIX_ERROR_LOG(rc);
+        }
+    }
+    /* pass it to the global collective algorithm */
+    if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) {
+        PRTE_ERROR_LOG(rc);
+        PMIX_RELEASE(cd);
+        return PMIX_ERROR;
+    }
+    return PMIX_SUCCESS;
+}
diff --git a/src/rml/rml_types.h b/src/rml/rml_types.h index 91dafb4cd2..a22975e9d1 100644 --- a/src/rml/rml_types.h +++ b/src/rml/rml_types.h @@ -191,41 +191,41 @@ typedef void (*prte_rml_buffer_callback_fn_t)(int status, pmix_proc_t *peer, #define PRTE_RML_TAG_CLOSE_CHANNEL_ACCEPT 58 /* error notifications */ -#define PRTE_RML_TAG_NOTIFICATION 59 +#define PRTE_RML_TAG_NOTIFICATION 59 /* stacktrace for debug */ -#define PRTE_RML_TAG_STACK_TRACE 60 +#define PRTE_RML_TAG_STACK_TRACE 60 /* memory profile */ -#define PRTE_RML_TAG_MEMPROFILE 61 +#define PRTE_RML_TAG_MEMPROFILE 61 /* topology report */ -#define PRTE_RML_TAG_TOPOLOGY_REPORT 62 +#define PRTE_RML_TAG_TOPOLOGY_REPORT 62 /* warmup connection - simply establishes the connection */ -#define PRTE_RML_TAG_WARMUP_CONNECTION 63 +#define PRTE_RML_TAG_WARMUP_CONNECTION 63 /* node regex report */ -#define PRTE_RML_TAG_NODE_REGEX_REPORT 64 +#define PRTE_RML_TAG_NODE_REGEX_REPORT 64 /* pmix log requests */ -#define PRTE_RML_TAG_LOGGING 65 +#define
PRTE_RML_TAG_LOGGING 65 /* error propagate */ -#define PRTE_RML_TAG_RBCAST 66 +#define PRTE_RML_TAG_RBCAST 66 /* heartbeat request */ -#define PRTE_RML_TAG_HEARTBEAT_REQUEST 70 +#define PRTE_RML_TAG_HEARTBEAT_REQUEST 70 /* error propagate */ -#define PRTE_RML_TAG_PROPAGATE 71 +#define PRTE_RML_TAG_PROPAGATE 71 /* scheduler requests */ -#define PRTE_RML_TAG_SCHED 72 -#define PRTE_RML_TAG_SCHED_RESP 73 +#define PRTE_RML_TAG_SCHED 72 +#define PRTE_RML_TAG_SCHED_RESP 73 -#define PRTE_RML_TAG_MAX 100 +#define PRTE_RML_TAG_MAX 100 #define PRTE_RML_TAG_NTOH(t) ntohl(t) #define PRTE_RML_TAG_HTON(t) htonl(t) diff --git a/src/runtime/data_type_support/prte_dt_packing_fns.c b/src/runtime/data_type_support/prte_dt_packing_fns.c index 6e04a4399a..44f72fda4c 100644 --- a/src/runtime/data_type_support/prte_dt_packing_fns.c +++ b/src/runtime/data_type_support/prte_dt_packing_fns.c @@ -622,6 +622,34 @@ int prte_grpcomm_sig_pack(pmix_data_buffer_t *bkt, return prte_pmix_convert_status(rc); } + // pack the context ID, if one was given + rc = PMIx_Data_pack(NULL, bkt, &sig->ctxid_assigned, 1, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + if (sig->ctxid_assigned) { + rc = PMIx_Data_pack(NULL, bkt, &sig->ctxid, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + } + + // pack added members, if given + rc = PMIx_Data_pack(NULL, bkt, &sig->nmembers, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + if (0 < sig->nmembers) { + rc = PMIx_Data_pack(NULL, bkt, sig->addmembers, sig->nmembers, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + } + if (NULL != sig->groupID) { // add the groupID if one is given rc = PMIx_Data_pack(NULL, bkt, &sig->groupID, 1, PMIX_STRING); diff --git a/src/runtime/data_type_support/prte_dt_unpacking_fns.c 
b/src/runtime/data_type_support/prte_dt_unpacking_fns.c index a8d750a5c1..fcbf167f6c 100644 --- a/src/runtime/data_type_support/prte_dt_unpacking_fns.c +++ b/src/runtime/data_type_support/prte_dt_unpacking_fns.c @@ -733,6 +733,43 @@ int prte_grpcomm_sig_unpack(pmix_data_buffer_t *buffer, return prte_pmix_convert_status(rc); } + // unpack the context ID, if one was assigned + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->ctxid_assigned, &cnt, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + if (s->ctxid_assigned) { + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->ctxid, &cnt, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + } + + // unpack the added members + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->nmembers, &cnt, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + if (0 < s->nmembers) { + PMIX_PROC_CREATE(s->addmembers, s->nmembers); + cnt = s->nmembers; + rc = PMIx_Data_unpack(NULL, buffer, s->addmembers, &cnt, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + } + // try and unpack the groupID - error is okay, just means // one wasn't provided save = buffer->unpack_ptr; @@ -741,7 +778,6 @@ int prte_grpcomm_sig_unpack(pmix_data_buffer_t *buffer, // ignore the return code if (PMIX_SUCCESS != rc) { buffer->unpack_ptr = save; // reset location for next unpack - rc = PMIX_SUCCESS; } *sig = s;