From cb87ece4ba14658d5089db3552c2a96b1e00310e Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Mon, 19 Feb 2024 16:22:47 -0700 Subject: [PATCH] Properly invite members being added to a group If members are being added to a PMIx_Group_construct operation, then those members need to be invited to join the group via PMIx event. Properly track that via the grpcomm signature object, including the info required to be passed to the invited members so they have a complete picture of the group. Signed-off-by: Ralph Castain --- src/mca/grpcomm/base/grpcomm_base_frame.c | 7 + src/mca/grpcomm/base/grpcomm_base_stubs.c | 54 ++- src/mca/grpcomm/direct/grpcomm_direct.c | 101 ++++- src/mca/grpcomm/grpcomm.h | 4 + src/mca/state/dvm/state_dvm.c | 1 + src/prted/pmix/Makefile.am | 3 +- src/prted/pmix/pmix_server_gen.c | 273 ------------- src/prted/pmix/pmix_server_group.c | 377 ++++++++++++++++++ src/rml/rml_types.h | 26 +- .../data_type_support/prte_dt_packing_fns.c | 28 ++ .../data_type_support/prte_dt_unpacking_fns.c | 38 +- 11 files changed, 593 insertions(+), 319 deletions(-) create mode 100644 src/prted/pmix/pmix_server_group.c diff --git a/src/mca/grpcomm/base/grpcomm_base_frame.c b/src/mca/grpcomm/base/grpcomm_base_frame.c index 6afc6cd9fd..de89eaee71 100644 --- a/src/mca/grpcomm/base/grpcomm_base_frame.c +++ b/src/mca/grpcomm/base/grpcomm_base_frame.c @@ -127,8 +127,12 @@ PMIX_CLASS_INSTANCE(prte_grpcomm_base_active_t, static void scon(prte_grpcomm_signature_t *p) { p->groupID = NULL; + p->ctxid = 0; + p->ctxid_assigned = false; p->signature = NULL; p->sz = 0; + p->addmembers = NULL; + p->nmembers = 0; } static void sdes(prte_grpcomm_signature_t *p) { @@ -138,6 +142,9 @@ static void sdes(prte_grpcomm_signature_t *p) if (NULL != p->signature) { free(p->signature); } + if (NULL != p->addmembers) { + free(p->addmembers); + } } PMIX_CLASS_INSTANCE(prte_grpcomm_signature_t, pmix_object_t, diff --git a/src/mca/grpcomm/base/grpcomm_base_stubs.c b/src/mca/grpcomm/base/grpcomm_base_stubs.c index 986cf1144e..2d8ed64481 100644 --- a/src/mca/grpcomm/base/grpcomm_base_stubs.c +++ b/src/mca/grpcomm/base/grpcomm_base_stubs.c @@ -195,7 +195,8 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig { prte_grpcomm_coll_t *coll; int rc; - size_t n; + pmix_proc_t *p; + size_t n, nmb; /* search the existing tracker list to see if this already exists - we * default to using the groupID if one is given, otherwise we fallback @@ -210,23 +211,21 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig /* if only one is NULL, then we can't possibly match */ break; } - if (sig->sz == coll->sig->sz) { - if (NULL != sig->groupID) { - // must match groupID's - if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) { - PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:base:returning existing collective", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); - return coll; - } - } else { - // must match proc signature - if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) { - PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, - "%s grpcomm:base:returning existing collective", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); - return coll; - } + if (NULL != sig->groupID) { + // must match groupID's + if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) { + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:base:returning existing collective", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + goto checkmembers; + } + } else if (sig->sz == coll->sig->sz) { + // must match proc signature + if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) { + PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output, + "%s grpcomm:base:returning existing collective", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); + goto checkmembers; } } } @@ -270,6 +269,25 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig } } +checkmembers: + // add any addmembers that were given + if (NULL != sig->addmembers) { + if (NULL == coll->sig->addmembers) { + PMIX_PROC_CREATE(coll->sig->addmembers, sig->nmembers); + coll->sig->nmembers = sig->nmembers; + memcpy(coll->sig->addmembers, sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t)); + } else { + // aggregate them + nmb = coll->sig->nmembers + sig->nmembers; + PMIX_PROC_CREATE(p, nmb); + memcpy(p, coll->sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t)); + memcpy(&p[coll->sig->nmembers], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t)); + PMIX_PROC_FREE(coll->sig->addmembers, coll->sig->nmembers); + coll->sig->addmembers = p; + coll->sig->nmembers = nmb; + } + } + return coll; } diff --git a/src/mca/grpcomm/direct/grpcomm_direct.c b/src/mca/grpcomm/direct/grpcomm_direct.c index 07904d5b20..dac1e5d69f 100644 --- a/src/mca/grpcomm/direct/grpcomm_direct.c +++ b/src/mca/grpcomm/direct/grpcomm_direct.c @@ -25,6 +25,7 @@ #include "src/class/pmix_list.h" #include "src/pmix/pmix-internal.h" +#include "src/prted/pmix/pmix_server_internal.h" #include "src/mca/errmgr/errmgr.h" #include "src/rml/rml.h" #include "src/mca/state/state.h" @@ -163,6 +164,7 @@ static void allgather_recv(int status, pmix_proc_t *sender, int rc, timeout; size_t n, ninfo, memsize, m; bool assignID = false; + bool found; pmix_proc_t *addmembers = NULL; size_t num_members = 0; prte_namelist_t *nm; @@ -183,8 +185,8 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* unpack the signature */ rc = prte_grpcomm_sig_unpack(buffer, &sig); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); + if (PRTE_SUCCESS != rc) { + PRTE_ERROR_LOG(rc); } /* check for the tracker and create it if not found */ @@ -271,12 +273,25 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* update the info with the collected value */ info[n].value.type = PMIX_BOOL; info[n].value.data.flag = coll->assignID; - } else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ADD_MEMBERS)) { - addmembers = (pmix_proc_t*)info[n].value.data.darray->array; - num_members = info[n].value.data.darray->size; - for (m=0; m < num_members; m++) { + } + } + + // check for any added members + if (NULL != sig->addmembers) { + // add them to the global collective + for (m=0; m < sig->nmembers; m++) { + // check to see if we already have this member + found = false; + PMIX_LIST_FOREACH(nm, &coll->addmembers, prte_namelist_t) { + if (PMIX_CHECK_PROCID(&nm->name, &sig->addmembers[m])) { + // already have it + found = true; + break; + } + } + if (!found) { nm = PMIX_NEW(prte_namelist_t); - PMIX_XFER_PROCID(&nm->name, &addmembers[m]); + PMIX_XFER_PROCID(&nm->name, &sig->addmembers[m]); pmix_list_append(&coll->addmembers, &nm->super); } } @@ -305,8 +320,16 @@ static void allgather_recv(int status, pmix_proc_t *sender, PRTE_NAME_PRINT(PRTE_PROC_MY_NAME))); /* the allgather is complete - send the xcast */ PMIX_DATA_BUFFER_CREATE(reply); + + /* if we were asked to provide a context id, do so */ + if (assignID) { + coll->sig->ctxid = prte_grpcomm_base.context_id; + --prte_grpcomm_base.context_id; + coll->sig->ctxid_assigned = true; + } + /* pack the signature */ - rc = prte_grpcomm_sig_pack(reply, sig); + rc = prte_grpcomm_sig_pack(reply, coll->sig); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); PMIX_DATA_BUFFER_RELEASE(reply); @@ -324,12 +347,9 @@ static void allgather_recv(int status, pmix_proc_t *sender, /* add some values to the payload in the bucket */ PMIX_DATA_BUFFER_CONSTRUCT(&ctrlbuf); - /* if we were asked to provide a context id, do so */ + /* if we assigned a context id, include it in the bucket as well */ if (assignID) { - size_t sz; - sz = prte_grpcomm_base.context_id; - --prte_grpcomm_base.context_id; - PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &sz, PMIX_SIZE); + PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &coll->sig->ctxid, PMIX_SIZE); rc = PMIx_Data_pack(NULL, &ctrlbuf, &infostat, 1, PMIX_INFO); PMIX_INFO_DESTRUCT(&infostat); if (PMIX_SUCCESS != rc) { @@ -383,6 +403,7 @@ static void allgather_recv(int status, pmix_proc_t *sender, PMIX_RELEASE(sig); return; } + /* send the release via xcast */ (void) prte_grpcomm.xcast(sig, PRTE_RML_TAG_COLL_RELEASE, reply); } else { @@ -668,6 +689,14 @@ static void xcast_recv(int status, pmix_proc_t *sender, PMIX_DATA_BUFFER_DESTRUCT(&datbuf); } +static void opcbfunc(pmix_status_t status, void *cbdata) +{ + prte_pmix_server_op_caddy_t *cd = (prte_pmix_server_op_caddy_t*)cbdata; + + PMIX_INFO_FREE(cd->info, cd->ninfo); + PMIX_RELEASE(cd); +} + static void barrier_release(int status, pmix_proc_t *sender, pmix_data_buffer_t *buffer, prte_rml_tag_t tag, void *cbdata) @@ -676,6 +705,9 @@ static void barrier_release(int status, pmix_proc_t *sender, int rc, ret; prte_grpcomm_signature_t *sig = NULL; prte_grpcomm_coll_t *coll; + pmix_data_array_t darray; + prte_pmix_server_op_caddy_t *cd; + void *values; PRTE_HIDE_UNUSED_PARAMS(status, sender, tag, cbdata); PMIX_OUTPUT_VERBOSE((5, prte_grpcomm_base_framework.framework_output, @@ -698,6 +730,49 @@ static void barrier_release(int status, pmix_proc_t *sender, return; } + // if there are added members, notify any that are local to us + if (NULL != sig->addmembers) { + // we have to handle these even if we did not participate in + // the collective as an added member might be on a non-participating + // node. Start by providing the overall membership + cd = PMIX_NEW(prte_pmix_server_op_caddy_t); + PMIX_INFO_LIST_START(values); + + /* protect against infinite loops by marking that this notification was + * passed down to the server by me */ + PMIX_INFO_LIST_ADD(rc, values, "prte.notify.donotloop", NULL, PMIX_BOOL); + // add the context ID, if one was given + if (sig->ctxid_assigned) { + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE); + } + // load the membership + cd->nprocs = sig->sz + sig->nmembers; + PMIX_PROC_CREATE(cd->procs, cd->nprocs); + memcpy(cd->procs, sig->signature, sig->sz * sizeof(pmix_proc_t)); + memcpy(&cd->procs[sig->sz], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t)); + darray.type = PMIX_PROC; + darray.array = cd->procs; + darray.size = cd->nprocs; + // load the array - note: this copies the array! + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); + PMIX_PROC_FREE(cd->procs, cd->nprocs); + // provide the group ID + PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_ID, sig->groupID, PMIX_STRING); + // set the range to be only procs that were added + darray.array = sig->addmembers; + darray.size = sig->nmembers; + // load the array - note: this copies the array! + PMIX_INFO_LIST_ADD(rc, values, PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY); + // convert the list to an array + PMIX_INFO_LIST_CONVERT(rc, values, &darray); + cd->info = (pmix_info_t*)darray.array; + cd->ninfo = darray.size; + PMIX_INFO_LIST_RELEASE(values); + // notify local procs + PMIx_Notify_event(PMIX_GROUP_INVITED, PRTE_PROC_MY_NAME, PMIX_RANGE_CUSTOM, + cd->info, cd->ninfo, opcbfunc, cd); + } + /* check for the tracker - it is not an error if not * found as that just means we wre not involved * in the collective */ diff --git a/src/mca/grpcomm/grpcomm.h b/src/mca/grpcomm/grpcomm.h index a423533a70..7c34500b25 100644 --- a/src/mca/grpcomm/grpcomm.h +++ b/src/mca/grpcomm/grpcomm.h @@ -64,8 +64,12 @@ typedef int (*prte_grpcomm_rbcast_cb_t)(pmix_data_buffer_t *buffer); typedef struct { pmix_object_t super; char *groupID; + size_t ctxid; + bool ctxid_assigned; pmix_proc_t *signature; size_t sz; + pmix_proc_t *addmembers; + size_t nmembers; } prte_grpcomm_signature_t; PRTE_EXPORT PMIX_CLASS_DECLARATION(prte_grpcomm_signature_t); diff --git a/src/mca/state/dvm/state_dvm.c b/src/mca/state/dvm/state_dvm.c index 1b3da8d84d..c522fe8e4f 100644 --- a/src/mca/state/dvm/state_dvm.c +++ b/src/mca/state/dvm/state_dvm.c @@ -320,6 +320,7 @@ static void vm_ready(int fd, short args, void *cbdata) } /* goes to all daemons */ + PMIX_CONSTRUCT(&sig, prte_grpcomm_signature_t); PMIX_PROC_CREATE(sig.signature, 1); PMIX_LOAD_PROCID(&sig.signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD); sig.sz = 1; diff --git a/src/prted/pmix/Makefile.am b/src/prted/pmix/Makefile.am index b3edd789f1..a046447a01 100644 --- a/src/prted/pmix/Makefile.am +++ b/src/prted/pmix/Makefile.am @@ -22,4 +22,5 @@ libprrte_la_SOURCES += \ prted/pmix/pmix_server_gen.c \ prted/pmix/pmix_server_queries.c \ prted/pmix/pmix_server_allocate.c \ - prted/pmix/pmix_server_session.c + prted/pmix/pmix_server_session.c \ + prted/pmix/pmix_server_group.c diff --git a/src/prted/pmix/pmix_server_gen.c b/src/prted/pmix/pmix_server_gen.c index a8e7a97d80..7e0f60f19c 100644 --- a/src/prted/pmix/pmix_server_gen.c +++ b/src/prted/pmix/pmix_server_gen.c @@ -1004,279 +1004,6 @@ pmix_status_t pmix_server_job_ctrl_fn(const pmix_proc_t *requestor, const pmix_p return PMIX_ERR_NOT_SUPPORTED; } -static void relcb(void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); -} -static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; - int32_t cnt; - int rc = PRTE_SUCCESS; - pmix_status_t ret; - bool assignedID = false; - bool procsadded = false; - size_t cid; - pmix_proc_t *procs, *members; - size_t n, num_members; - pmix_data_array_t darray; - pmix_info_t info; - pmix_data_buffer_t dbuf; - pmix_byte_object_t bo; - int32_t byused; - pmix_server_pset_t *pset; - - PMIX_ACQUIRE_OBJECT(cd); - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request complete", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - if (PRTE_SUCCESS != status) { - rc = status; - goto complete; - } - - /* if this was a destruct operation, then there is nothing - * further we need do */ - if (PMIX_GROUP_DESTRUCT == cd->op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, cd->grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - rc = status; - goto complete; - } - - /* check for any directives */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, buf, &bo, &cnt, PMIX_BYTE_OBJECT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); - PMIX_DATA_BUFFER_LOAD(&dbuf, bo.bytes, bo.size); - - cd->ninfo = 2; - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); - while (PMIX_SUCCESS == rc) { - if (PMIX_CHECK_KEY(&info, PMIX_GROUP_CONTEXT_ID)) { - PMIX_VALUE_GET_NUMBER(rc, &info.value, cid, size_t); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - cd->ninfo = 0; - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - goto complete; - } - assignedID = true; - cd->ninfo++; - } else if (PMIX_CHECK_KEY(&info, PMIX_GROUP_ADD_MEMBERS)) { - members = (pmix_proc_t*)info.value.data.darray->array; - num_members = info.value.data.darray->size; - PMIX_PROC_CREATE(procs, cd->nprocs + num_members); - for (n=0; n < cd->nprocs; n++) { - PMIX_XFER_PROCID(&procs[n], &cd->procs[n]); - } - for (n=0; n < num_members; n++) { - PMIX_XFER_PROCID(&procs[n+cd->nprocs], &members[n]); - } - PMIX_PROC_FREE(cd->procs, cd->nprocs); - cd->procs = procs; - cd->nprocs += num_members; - procsadded = true; - } - /* cleanup */ - PMIX_INFO_DESTRUCT(&info); - /* get the next object */ - cnt = 1; - rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); - } - PMIX_DATA_BUFFER_DESTRUCT(&dbuf); - /* the unpacking loop will have ended when the unpack either - * went past the end of the buffer */ - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - PMIX_ERROR_LOG(rc); - goto complete; - } - rc = PMIX_SUCCESS; - - if (PMIX_GROUP_CONSTRUCT == cd->op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(cd->grpid); - pset->num_members = cd->nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } - - /* if anything is left in the buffer, then it is - * modex data that needs to be stored */ - PMIX_BYTE_OBJECT_CONSTRUCT(&bo); - byused = buf->bytes_used - (buf->unpack_ptr - buf->base_ptr); - if (0 < byused) { - bo.bytes = buf->unpack_ptr; - bo.size = byused; - } - if (NULL != bo.bytes && 0 < bo.size) { - cd->ninfo++; - } - - PMIX_INFO_CREATE(cd->info, cd->ninfo); - n = 0; - // pass back the final group membership - darray.type = PMIX_PROC; - darray.array = cd->procs; - darray.size = cd->nprocs; - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); - PMIX_PROC_FREE(cd->procs, cd->nprocs); - ++n; - if (assignedID) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_CONTEXT_ID, &cid, PMIX_SIZE); - ++n; - } - if (NULL != bo.bytes && 0 < bo.size) { - PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT); - } - -complete: - ret = prte_pmix_convert_rc(rc); - /* return to the local procs in the collective */ - if (NULL != cd->infocbfunc) { - cd->infocbfunc(ret, cd->info, cd->ninfo, cd->cbdata, relcb, cd); - } else { - if (NULL != cd->info) { - PMIX_INFO_FREE(cd->info, cd->ninfo); - } - PMIX_RELEASE(cd); - } -} - -pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, - const pmix_proc_t procs[], size_t nprocs, - const pmix_info_t directives[], size_t ndirs, - pmix_info_cbfunc_t cbfunc, void *cbdata) -{ - prte_pmix_mdx_caddy_t *cd; - int rc; - size_t i; - bool assignID = false; - pmix_server_pset_t *pset; - bool fence = false; - bool force_local = false; - pmix_byte_object_t *bo = NULL; - struct timeval tv = {0, 0}; - - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request recvd", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - - /* they are required to pass us an id */ - if (NULL == grpid) { - return PMIX_ERR_BAD_PARAM; - } - - /* check the directives */ - for (i = 0; i < ndirs; i++) { - /* see if they want a context id assigned */ - if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ASSIGN_CONTEXT_ID)) { - assignID = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_EMBED_BARRIER)) { - fence = PMIX_INFO_TRUE(&directives[i]); - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ENDPT_DATA)) { - bo = (pmix_byte_object_t *) &directives[i].value.data.bo; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_TIMEOUT)) { - tv.tv_sec = directives[i].value.data.uint32; - } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) { - force_local = PMIX_INFO_TRUE(&directives[i]); - } - } - - /* if they don't want us to do a fence and they don't want a - * context id assigned, or they insist on forcing local - * completion of the operation, then we are done */ - if ((!fence && !assignID) || force_local) { - pmix_output_verbose(2, prte_pmix_server_globals.output, - "%s group request - purely local", - PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); - if (PMIX_GROUP_CONSTRUCT == op) { - /* add it to our list of known groups */ - pset = PMIX_NEW(pmix_server_pset_t); - pset->name = strdup(grpid); - pset->num_members = nprocs; - PMIX_PROC_CREATE(pset->members, pset->num_members); - memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t)); - pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); - } else if (PMIX_GROUP_DESTRUCT == op) { - /* find this group ID on our list of groups */ - PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) - { - if (0 == strcmp(pset->name, grpid)) { - pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); - PMIX_RELEASE(pset); - break; - } - } - } - return PMIX_OPERATION_SUCCEEDED; - } - - cd = PMIX_NEW(prte_pmix_mdx_caddy_t); - cd->op = op; - cd->grpid = strdup(grpid); - /* have to copy the procs in case we add members */ - PMIX_PROC_CREATE(cd->procs, nprocs); - memcpy(cd->procs, procs, nprocs * sizeof(pmix_proc_t)); - cd->nprocs = nprocs; - cd->grpcbfunc = group_release; - cd->infocbfunc = cbfunc; - cd->cbdata = cbdata; - - /* compute the signature of this collective */ - cd->sig = PMIX_NEW(prte_grpcomm_signature_t); - cd->sig->groupID = strdup(grpid); - if (NULL != procs) { - cd->sig->sz = nprocs; - cd->sig->signature = (pmix_proc_t *) malloc(cd->sig->sz * sizeof(pmix_proc_t)); - memcpy(cd->sig->signature, procs, cd->sig->sz * sizeof(pmix_proc_t)); - } - /* setup the ctrls blob - this will include any "add_members" directive */ - rc = prte_pack_ctrl_options(&cd->ctrls, directives, ndirs); - if (PMIX_SUCCESS != rc) { - PMIX_RELEASE(cd); - return rc; - } - PMIX_DATA_BUFFER_CREATE(cd->buf); - /* if they provided us with a data blob, send it along */ - if (NULL != bo) { - /* We don't own the byte_object and so we have to - * copy it here */ - rc = PMIx_Data_embed(cd->buf, bo); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - } - } - /* pass it to the global collective algorithm */ - if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) { - PRTE_ERROR_LOG(rc); - PMIX_RELEASE(cd); - return PMIX_ERROR; - } - return PMIX_SUCCESS; -} - pmix_status_t pmix_server_iof_pull_fn(const pmix_proc_t procs[], size_t nprocs, const pmix_info_t directives[], size_t ndirs, pmix_iof_channel_t channels, pmix_op_cbfunc_t cbfunc, diff --git a/src/prted/pmix/pmix_server_group.c b/src/prted/pmix/pmix_server_group.c new file mode 100644 index 0000000000..4dd6936878 --- /dev/null +++ b/src/prted/pmix/pmix_server_group.c @@ -0,0 +1,377 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2020 Cisco Systems, Inc. All rights reserved + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2020 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2017 Mellanox Technologies, Inc. + * All rights reserved. + * Copyright (c) 2014-2019 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2020 IBM Corporation. All rights reserved. + * Copyright (c) 2021-2024 Nanook Consulting All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "prte_config.h" + +#ifdef HAVE_UNISTD_H +# include +#endif + +#include "src/hwloc/hwloc-internal.h" +#include "src/pmix/pmix-internal.h" +#include "src/util/pmix_argv.h" +#include "src/util/pmix_output.h" + +#include "src/mca/errmgr/errmgr.h" +#include "src/mca/grpcomm/base/base.h" +#include "src/mca/iof/base/base.h" +#include "src/mca/iof/iof.h" +#include "src/mca/plm/base/plm_private.h" +#include "src/mca/plm/plm.h" +#include "src/mca/plm/base/plm_private.h" +#include "src/mca/rmaps/rmaps_types.h" +#include "src/rml/rml.h" +#include "src/mca/schizo/schizo.h" +#include "src/mca/state/state.h" +#include "src/runtime/prte_globals.h" +#include "src/runtime/prte_locks.h" +#include "src/threads/pmix_threads.h" +#include "src/util/name_fns.h" +#include "src/util/pmix_show_help.h" + +#include "src/prted/pmix/pmix_server_internal.h" + +static void relcb(void *cbdata) +{ + prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; + + if (NULL != cd->info) { + PMIX_INFO_FREE(cd->info, cd->ninfo); + } + PMIX_RELEASE(cd); +} +static void group_release(int status, pmix_data_buffer_t *buf, void *cbdata) +{ + prte_pmix_mdx_caddy_t *cd = (prte_pmix_mdx_caddy_t *) cbdata; + int32_t cnt; + int rc = PRTE_SUCCESS; + pmix_status_t ret; + bool assignedID = false; + bool procsadded = false; + size_t cid; + pmix_proc_t *procs, *members; + size_t n, num_members; + pmix_data_array_t darray; + pmix_info_t info; + pmix_data_buffer_t dbuf; + pmix_byte_object_t bo; + int32_t byused; + pmix_server_pset_t *pset; + + PMIX_ACQUIRE_OBJECT(cd); + + pmix_output_verbose(2, prte_pmix_server_globals.output, + "%s group request complete", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); + + if (PRTE_SUCCESS != status) { + rc = status; + goto complete; + } + + /* if this was a destruct operation, then there is nothing + * further we need do */ + if (PMIX_GROUP_DESTRUCT == cd->op) { + /* find this group ID on our list of groups */ + PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) + { + if (0 == strcmp(pset->name, cd->grpid)) { + pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); + PMIX_RELEASE(pset); + break; + } + } + rc = status; + goto complete; + } + + /* check for any directives */ + cnt = 1; + rc = PMIx_Data_unpack(NULL, buf, &bo, &cnt, PMIX_BYTE_OBJECT); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + PMIX_DATA_BUFFER_CONSTRUCT(&dbuf); + PMIX_DATA_BUFFER_LOAD(&dbuf, bo.bytes, bo.size); + + cd->ninfo = 2; + cnt = 1; + rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); + while (PMIX_SUCCESS == rc) { + if (PMIX_CHECK_KEY(&info, PMIX_GROUP_CONTEXT_ID)) { + PMIX_VALUE_GET_NUMBER(rc, &info.value, cid, size_t); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + cd->ninfo = 0; + PMIX_DATA_BUFFER_DESTRUCT(&dbuf); + goto complete; + } + assignedID = true; + cd->ninfo++; + } else if (PMIX_CHECK_KEY(&info, PMIX_GROUP_ADD_MEMBERS)) { + members = (pmix_proc_t*)info.value.data.darray->array; + num_members = info.value.data.darray->size; + PMIX_PROC_CREATE(procs, cd->nprocs + num_members); + for (n=0; n < cd->nprocs; n++) { + PMIX_XFER_PROCID(&procs[n], &cd->procs[n]); + } + for (n=0; n < num_members; n++) { + PMIX_XFER_PROCID(&procs[n+cd->nprocs], &members[n]); + } + PMIX_PROC_FREE(cd->procs, cd->nprocs); + cd->procs = procs; + cd->nprocs += num_members; + procsadded = true; + } + /* cleanup */ + PMIX_INFO_DESTRUCT(&info); + /* get the next object */ + cnt = 1; + rc = PMIx_Data_unpack(NULL, &dbuf, &info, &cnt, PMIX_INFO); + } + PMIX_DATA_BUFFER_DESTRUCT(&dbuf); + + /* the unpacking loop will have ended when the unpack either + * went past the end of the buffer */ + if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + rc = PMIX_SUCCESS; + + if (PMIX_GROUP_CONSTRUCT == cd->op) { + /* add it to our list of known groups */ + pset = PMIX_NEW(pmix_server_pset_t); + pset->name = strdup(cd->grpid); + pset->num_members = cd->nprocs; + PMIX_PROC_CREATE(pset->members, pset->num_members); + memcpy(pset->members, cd->procs, cd->nprocs * sizeof(pmix_proc_t)); + pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); + } + + /* if anything is left in the buffer, then it is + * modex data that needs to be stored */ + PMIX_BYTE_OBJECT_CONSTRUCT(&bo); + byused = buf->bytes_used - (buf->unpack_ptr - buf->base_ptr); + if (0 < byused) { + bo.bytes = buf->unpack_ptr; + bo.size = byused; + } + if (NULL != bo.bytes && 0 < bo.size) { + cd->ninfo++; + } + + PMIX_INFO_CREATE(cd->info, cd->ninfo); + n = 0; + // pass back the final group membership + darray.type = PMIX_PROC; + darray.array = cd->procs; + darray.size = cd->nprocs; + PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY); + PMIX_PROC_FREE(cd->procs, cd->nprocs); + ++n; + if (assignedID) { + PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_CONTEXT_ID, &cid, PMIX_SIZE); + ++n; + } + if (NULL != bo.bytes && 0 < bo.size) { + PMIX_INFO_LOAD(&cd->info[n], PMIX_GROUP_ENDPT_DATA, &bo, PMIX_BYTE_OBJECT); + } + +complete: + ret = prte_pmix_convert_rc(rc); + /* return to the local procs in the collective */ + if (NULL != cd->infocbfunc) { + cd->infocbfunc(ret, cd->info, cd->ninfo, cd->cbdata, relcb, cd); + } else { + if (NULL != cd->info) { + PMIX_INFO_FREE(cd->info, cd->ninfo); + } + PMIX_RELEASE(cd); + } +} + +pmix_status_t pmix_server_group_fn(pmix_group_operation_t op, char *grpid, + const pmix_proc_t procs[], size_t nprocs, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + prte_pmix_mdx_caddy_t *cd; + int rc; + size_t i; + bool assignID = false; + pmix_server_pset_t *pset; + bool fence = false; + bool force_local = false; + pmix_proc_t *members = NULL; + pmix_proc_t *mbrs, *p; + size_t num_members = 0; + size_t nmembers; + bool copied = false; + pmix_byte_object_t *bo = NULL; + struct timeval tv = {0, 0}; + + pmix_output_verbose(2, prte_pmix_server_globals.output, + "%s Group request recvd with %u directives", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), ndirs); + + /* they are required to pass us an id */ + if (NULL == grpid) { + return PMIX_ERR_BAD_PARAM; + } + + /* check the directives */ + for (i = 0; i < ndirs; i++) { + /* see if they want a context id assigned */ + if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ASSIGN_CONTEXT_ID)) { + assignID = PMIX_INFO_TRUE(&directives[i]); + } else if (PMIX_CHECK_KEY(&directives[i], PMIX_EMBED_BARRIER)) { + fence = PMIX_INFO_TRUE(&directives[i]); + } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ENDPT_DATA)) { + bo = (pmix_byte_object_t *) &directives[i].value.data.bo; + } else if (PMIX_CHECK_KEY(&directives[i], PMIX_TIMEOUT)) { + tv.tv_sec = directives[i].value.data.uint32; + } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_LOCAL_ONLY)) { + force_local = PMIX_INFO_TRUE(&directives[i]); + } else if (PMIX_CHECK_KEY(&directives[i], PMIX_GROUP_ADD_MEMBERS)) { + // there can be more than one entry here as this is the aggregate + // of info keys from local procs that called group_construct + if (NULL == members) { + members = (pmix_proc_t*)directives[i].value.data.darray->array; + num_members = directives[i].value.data.darray->size; + } else { + copied = true; + // need to aggregate these + mbrs = (pmix_proc_t*)directives[i].value.data.darray->array; + nmembers = directives[i].value.data.darray->size; + // create a new array + PMIX_PROC_CREATE(p, nmembers * num_members); + // xfer data across + memcpy(p, members, num_members * sizeof(pmix_proc_t)); + memcpy(&p[num_members], mbrs, nmembers * sizeof(pmix_proc_t)); + // release the old array + PMIX_PROC_FREE(members, num_members); + // complete the xfer + members = p; + num_members = num_members + nmembers; + } + } + } + + /* if they don't want us to do a fence and they don't want a + * context id assigned and they aren't adding members, or they + * insist on forcing local completion of the operation, then + * we are done */ + if ((!fence && !assignID && NULL == members) || force_local) { + pmix_output_verbose(2, prte_pmix_server_globals.output, + "%s group request - purely local", + PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)); + if (PMIX_GROUP_CONSTRUCT == op) { + /* add it to our list of known groups */ + pset = PMIX_NEW(pmix_server_pset_t); + pset->name = strdup(grpid); + if (NULL != members) {} + pset->num_members = nprocs; + if (NULL != members) { + pset->num_members += num_members; + } + PMIX_PROC_CREATE(pset->members, pset->num_members); + memcpy(pset->members, procs, nprocs * sizeof(pmix_proc_t)); + if (NULL != members) { + memcpy(&pset->members[nprocs], members, num_members * sizeof(pmix_proc_t)); + } + pmix_list_append(&prte_pmix_server_globals.groups, &pset->super); + } else if (PMIX_GROUP_DESTRUCT == op) { + /* find this group ID on our list of groups */ + PMIX_LIST_FOREACH(pset, &prte_pmix_server_globals.groups, pmix_server_pset_t) + { + if (0 == strcmp(pset->name, grpid)) { + pmix_list_remove_item(&prte_pmix_server_globals.groups, &pset->super); + PMIX_RELEASE(pset); + break; + } + } + } + return PMIX_OPERATION_SUCCEEDED; + } + + cd = PMIX_NEW(prte_pmix_mdx_caddy_t); + cd->op = op; + cd->grpid = strdup(grpid); + /* have to copy the procs in case we add members */ + PMIX_PROC_CREATE(cd->procs, nprocs); + memcpy(cd->procs, procs, nprocs * sizeof(pmix_proc_t)); + cd->nprocs = nprocs; + cd->grpcbfunc = group_release; + cd->infocbfunc = cbfunc; + cd->cbdata = cbdata; + + /* compute the signature of this collective */ + cd->sig = PMIX_NEW(prte_grpcomm_signature_t); + cd->sig->groupID = strdup(grpid); + if (NULL != procs) { + cd->sig->sz = nprocs; + cd->sig->signature = (pmix_proc_t *) malloc(cd->sig->sz * sizeof(pmix_proc_t)); + memcpy(cd->sig->signature, procs, cd->sig->sz * sizeof(pmix_proc_t)); + } + if (NULL != members) { + cd->sig->nmembers = num_members; + if (copied) { + cd->sig->addmembers = members; + } else { + cd->sig->addmembers = (pmix_proc_t *) malloc(num_members * sizeof(pmix_proc_t)); + memcpy(cd->sig->addmembers, members, num_members * sizeof(pmix_proc_t)); + } + } + /* setup the ctrls blob - this will include any "add_members" directive */ + rc = prte_pack_ctrl_options(&cd->ctrls, directives, ndirs); + if (PMIX_SUCCESS != rc) { + PMIX_RELEASE(cd); + return rc; + } + PMIX_DATA_BUFFER_CREATE(cd->buf); + /* if they provided us with a data blob, send it along */ + if (NULL != bo) { + /* We don't own the byte_object and so we have to + * copy it here */ + rc = PMIx_Data_embed(cd->buf, bo); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } + } + /* pass it to the global collective algorithm */ + if (PRTE_SUCCESS != (rc = prte_grpcomm.allgather(cd))) { + PRTE_ERROR_LOG(rc); + PMIX_RELEASE(cd); + return PMIX_ERROR; + } + return PMIX_SUCCESS; +} diff --git a/src/rml/rml_types.h b/src/rml/rml_types.h index 91dafb4cd2..a22975e9d1 100644 --- a/src/rml/rml_types.h +++ b/src/rml/rml_types.h @@ -191,41 +191,41 @@ typedef void (*prte_rml_buffer_callback_fn_t)(int status, pmix_proc_t *peer, #define PRTE_RML_TAG_CLOSE_CHANNEL_ACCEPT 58 /* error notifications */ -#define PRTE_RML_TAG_NOTIFICATION 59 +#define PRTE_RML_TAG_NOTIFICATION 59 /* stacktrace for debug */ -#define PRTE_RML_TAG_STACK_TRACE 60 +#define PRTE_RML_TAG_STACK_TRACE 60 /* memory profile */ -#define PRTE_RML_TAG_MEMPROFILE 61 +#define PRTE_RML_TAG_MEMPROFILE 61 /* topology report */ -#define PRTE_RML_TAG_TOPOLOGY_REPORT 62 +#define PRTE_RML_TAG_TOPOLOGY_REPORT 62 /* warmup connection - simply establishes the connection */ -#define PRTE_RML_TAG_WARMUP_CONNECTION 63 +#define PRTE_RML_TAG_WARMUP_CONNECTION 63 /* node regex report */ -#define PRTE_RML_TAG_NODE_REGEX_REPORT 64 +#define PRTE_RML_TAG_NODE_REGEX_REPORT 64 /* pmix log requests */ -#define PRTE_RML_TAG_LOGGING 65 +#define PRTE_RML_TAG_LOGGING 65 /* error propagate */ -#define PRTE_RML_TAG_RBCAST 66 +#define PRTE_RML_TAG_RBCAST 66 /* heartbeat request */ -#define PRTE_RML_TAG_HEARTBEAT_REQUEST 70 +#define PRTE_RML_TAG_HEARTBEAT_REQUEST 70 /* error propagate */ -#define PRTE_RML_TAG_PROPAGATE 71 +#define PRTE_RML_TAG_PROPAGATE 71 /* scheduler requests */ -#define PRTE_RML_TAG_SCHED 72 -#define PRTE_RML_TAG_SCHED_RESP 73 +#define PRTE_RML_TAG_SCHED 72 +#define PRTE_RML_TAG_SCHED_RESP 73 -#define PRTE_RML_TAG_MAX 100 +#define PRTE_RML_TAG_MAX 100 #define PRTE_RML_TAG_NTOH(t) ntohl(t) #define PRTE_RML_TAG_HTON(t) htonl(t) diff --git a/src/runtime/data_type_support/prte_dt_packing_fns.c b/src/runtime/data_type_support/prte_dt_packing_fns.c index 6e04a4399a..44f72fda4c 100644 --- a/src/runtime/data_type_support/prte_dt_packing_fns.c +++ b/src/runtime/data_type_support/prte_dt_packing_fns.c @@ -622,6 +622,34 @@ int prte_grpcomm_sig_pack(pmix_data_buffer_t *bkt, return prte_pmix_convert_status(rc); } + // pack the context ID, if one was given + rc = PMIx_Data_pack(NULL, bkt, &sig->ctxid_assigned, 1, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + if (sig->ctxid_assigned) { + rc = PMIx_Data_pack(NULL, bkt, &sig->ctxid, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + } + + // pack added members, if given + rc = PMIx_Data_pack(NULL, bkt, &sig->nmembers, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + if (0 < sig->nmembers) { + rc = PMIx_Data_pack(NULL, bkt, sig->addmembers, sig->nmembers, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + return prte_pmix_convert_status(rc); + } + } + if (NULL != sig->groupID) { // add the groupID if one is given rc = PMIx_Data_pack(NULL, bkt, &sig->groupID, 1, PMIX_STRING); diff --git a/src/runtime/data_type_support/prte_dt_unpacking_fns.c b/src/runtime/data_type_support/prte_dt_unpacking_fns.c index a8d750a5c1..fcbf167f6c 100644 --- a/src/runtime/data_type_support/prte_dt_unpacking_fns.c +++ b/src/runtime/data_type_support/prte_dt_unpacking_fns.c @@ -733,6 +733,43 @@ int prte_grpcomm_sig_unpack(pmix_data_buffer_t *buffer, return prte_pmix_convert_status(rc); } + // unpack the context ID, if one was assigned + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->ctxid_assigned, &cnt, PMIX_BOOL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + if (s->ctxid_assigned) { + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->ctxid, &cnt, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + } + + // unpack the added members + cnt = 1; + rc = PMIx_Data_unpack(NULL, buffer, &s->nmembers, &cnt, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + if (0 < s->nmembers) { + PMIX_PROC_CREATE(s->addmembers, s->nmembers); + cnt = s->nmembers; + rc = PMIx_Data_unpack(NULL, buffer, s->addmembers, &cnt, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(s); + return prte_pmix_convert_status(rc); + } + } + // try and unpack the groupID - error is okay, just means // one wasn't provided save = buffer->unpack_ptr; @@ -741,7 +778,6 @@ int prte_grpcomm_sig_unpack(pmix_data_buffer_t *buffer, // ignore the return code if (PMIX_SUCCESS != rc) { buffer->unpack_ptr = save; // reset location for next unpack - rc = PMIX_SUCCESS; } *sig = s;