Skip to content

Commit

Permalink
Properly invite members being added to a group
Browse files Browse the repository at this point in the history
If members are being added to a PMIx_Group_construct operation,
then those members need to be invited to join the group via
PMIx event. Properly track that via the grpcomm signature object,
including the info required to be passed to the invited members
so they have a complete picture of the group.

Signed-off-by: Ralph Castain <[email protected]>
  • Loading branch information
rhc54 committed Feb 20, 2024
1 parent 8822c96 commit cb87ece
Show file tree
Hide file tree
Showing 11 changed files with 593 additions and 319 deletions.
7 changes: 7 additions & 0 deletions src/mca/grpcomm/base/grpcomm_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,12 @@ PMIX_CLASS_INSTANCE(prte_grpcomm_base_active_t,
static void scon(prte_grpcomm_signature_t *p)
{
p->groupID = NULL;
p->ctxid = 0;
p->ctxid_assigned = false;
p->signature = NULL;
p->sz = 0;
p->addmembers = NULL;
p->nmembers = 0;
}
static void sdes(prte_grpcomm_signature_t *p)
{
Expand All @@ -138,6 +142,9 @@ static void sdes(prte_grpcomm_signature_t *p)
if (NULL != p->signature) {
free(p->signature);
}
if (NULL != p->addmembers) {
free(p->addmembers);
}
}
PMIX_CLASS_INSTANCE(prte_grpcomm_signature_t,
pmix_object_t,
Expand Down
54 changes: 36 additions & 18 deletions src/mca/grpcomm/base/grpcomm_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig
{
prte_grpcomm_coll_t *coll;
int rc;
size_t n;
pmix_proc_t *p;
size_t n, nmb;

/* search the existing tracker list to see if this already exists - we
* default to using the groupID if one is given, otherwise we fallback
Expand All @@ -210,23 +211,21 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig
/* if only one is NULL, then we can't possibly match */
break;
}
if (sig->sz == coll->sig->sz) {
if (NULL != sig->groupID) {
// must match groupID's
if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
return coll;
}
} else {
// must match proc signature
if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
return coll;
}
if (NULL != sig->groupID) {
// must match groupID's
if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
goto checkmembers;
}
} else if (sig->sz == coll->sig->sz) {
// must match proc signature
if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
goto checkmembers;
}
}
}
Expand Down Expand Up @@ -270,6 +269,25 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig
}
}

checkmembers:
// add any addmembers that were given
if (NULL != sig->addmembers) {
if (NULL == coll->sig->addmembers) {
PMIX_PROC_CREATE(coll->sig->addmembers, sig->nmembers);
coll->sig->nmembers = sig->nmembers;
memcpy(coll->sig->addmembers, sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t));
} else {
// aggregate them
nmb = coll->sig->nmembers + sig->nmembers;
PMIX_PROC_CREATE(p, nmb);
memcpy(p, coll->sig->addmembers, coll->sig->nmembers * sizeof(pmix_proc_t));
memcpy(&p[coll->sig->nmembers], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t));
PMIX_PROC_FREE(coll->sig->addmembers, coll->sig->nmembers);
coll->sig->addmembers = p;
coll->sig->nmembers = nmb;
}
}

return coll;
}

Expand Down
101 changes: 88 additions & 13 deletions src/mca/grpcomm/direct/grpcomm_direct.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "src/class/pmix_list.h"
#include "src/pmix/pmix-internal.h"

#include "src/prted/pmix/pmix_server_internal.h"
#include "src/mca/errmgr/errmgr.h"
#include "src/rml/rml.h"
#include "src/mca/state/state.h"
Expand Down Expand Up @@ -163,6 +164,7 @@ static void allgather_recv(int status, pmix_proc_t *sender,
int rc, timeout;
size_t n, ninfo, memsize, m;
bool assignID = false;
bool found;
pmix_proc_t *addmembers = NULL;
size_t num_members = 0;
prte_namelist_t *nm;
Expand All @@ -183,8 +185,8 @@ static void allgather_recv(int status, pmix_proc_t *sender,

/* unpack the signature */
rc = prte_grpcomm_sig_unpack(buffer, &sig);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
if (PRTE_SUCCESS != rc) {
PRTE_ERROR_LOG(rc);
}

/* check for the tracker and create it if not found */
Expand Down Expand Up @@ -271,12 +273,25 @@ static void allgather_recv(int status, pmix_proc_t *sender,
/* update the info with the collected value */
info[n].value.type = PMIX_BOOL;
info[n].value.data.flag = coll->assignID;
} else if (PMIX_CHECK_KEY(&info[n], PMIX_GROUP_ADD_MEMBERS)) {
addmembers = (pmix_proc_t*)info[n].value.data.darray->array;
num_members = info[n].value.data.darray->size;
for (m=0; m < num_members; m++) {
}
}

// check for any added members
if (NULL != sig->addmembers) {
// add them to the global collective
for (m=0; m < sig->nmembers; m++) {
// check to see if we already have this member
found = false;
PMIX_LIST_FOREACH(nm, &coll->addmembers, prte_namelist_t) {
if (PMIX_CHECK_PROCID(&nm->name, &sig->addmembers[m])) {
// already have it
found = true;
break;
}
}
if (!found) {
nm = PMIX_NEW(prte_namelist_t);
PMIX_XFER_PROCID(&nm->name, &addmembers[m]);
PMIX_XFER_PROCID(&nm->name, &sig->addmembers[m]);
pmix_list_append(&coll->addmembers, &nm->super);
}
}
Expand Down Expand Up @@ -305,8 +320,16 @@ static void allgather_recv(int status, pmix_proc_t *sender,
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
/* the allgather is complete - send the xcast */
PMIX_DATA_BUFFER_CREATE(reply);

/* if we were asked to provide a context id, do so */
if (assignID) {
coll->sig->ctxid = prte_grpcomm_base.context_id;
--prte_grpcomm_base.context_id;
coll->sig->ctxid_assigned = true;
}

/* pack the signature */
rc = prte_grpcomm_sig_pack(reply, sig);
rc = prte_grpcomm_sig_pack(reply, coll->sig);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_RELEASE(reply);
Expand All @@ -324,12 +347,9 @@ static void allgather_recv(int status, pmix_proc_t *sender,
/* add some values to the payload in the bucket */
PMIX_DATA_BUFFER_CONSTRUCT(&ctrlbuf);

/* if we were asked to provide a context id, do so */
/* if we assigned a context id, include it in the bucket as well */
if (assignID) {
size_t sz;
sz = prte_grpcomm_base.context_id;
--prte_grpcomm_base.context_id;
PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &sz, PMIX_SIZE);
PMIX_INFO_LOAD(&infostat, PMIX_GROUP_CONTEXT_ID, &coll->sig->ctxid, PMIX_SIZE);
rc = PMIx_Data_pack(NULL, &ctrlbuf, &infostat, 1, PMIX_INFO);
PMIX_INFO_DESTRUCT(&infostat);
if (PMIX_SUCCESS != rc) {
Expand Down Expand Up @@ -383,6 +403,7 @@ static void allgather_recv(int status, pmix_proc_t *sender,
PMIX_RELEASE(sig);
return;
}

/* send the release via xcast */
(void) prte_grpcomm.xcast(sig, PRTE_RML_TAG_COLL_RELEASE, reply);
} else {
Expand Down Expand Up @@ -668,6 +689,14 @@ static void xcast_recv(int status, pmix_proc_t *sender,
PMIX_DATA_BUFFER_DESTRUCT(&datbuf);
}

static void opcbfunc(pmix_status_t status, void *cbdata)
{
prte_pmix_server_op_caddy_t *cd = (prte_pmix_server_op_caddy_t*)cbdata;

PMIX_INFO_FREE(cd->info, cd->ninfo);
PMIX_RELEASE(cd);
}

static void barrier_release(int status, pmix_proc_t *sender,
pmix_data_buffer_t *buffer,
prte_rml_tag_t tag, void *cbdata)
Expand All @@ -676,6 +705,9 @@ static void barrier_release(int status, pmix_proc_t *sender,
int rc, ret;
prte_grpcomm_signature_t *sig = NULL;
prte_grpcomm_coll_t *coll;
pmix_data_array_t darray;
prte_pmix_server_op_caddy_t *cd;
void *values;
PRTE_HIDE_UNUSED_PARAMS(status, sender, tag, cbdata);

PMIX_OUTPUT_VERBOSE((5, prte_grpcomm_base_framework.framework_output,
Expand All @@ -698,6 +730,49 @@ static void barrier_release(int status, pmix_proc_t *sender,
return;
}

// if there are added members, notify any that are local to us
if (NULL != sig->addmembers) {
// we have to handle these even if we did not participate in
// the collective as an added member might be on a non-participating
// node. Start by providing the overall membership
cd = PMIX_NEW(prte_pmix_server_op_caddy_t);
PMIX_INFO_LIST_START(values);

/* protect against infinite loops by marking that this notification was
* passed down to the server by me */
PMIX_INFO_LIST_ADD(rc, values, "prte.notify.donotloop", NULL, PMIX_BOOL);
// add the context ID, if one was given
if (sig->ctxid_assigned) {
PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_CONTEXT_ID, &sig->ctxid, PMIX_SIZE);
}
// load the membership
cd->nprocs = sig->sz + sig->nmembers;
PMIX_PROC_CREATE(cd->procs, cd->nprocs);
memcpy(cd->procs, sig->signature, sig->sz * sizeof(pmix_proc_t));
memcpy(&cd->procs[sig->sz], sig->addmembers, sig->nmembers * sizeof(pmix_proc_t));
darray.type = PMIX_PROC;
darray.array = cd->procs;
darray.size = cd->nprocs;
// load the array - note: this copies the array!
PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_MEMBERSHIP, &darray, PMIX_DATA_ARRAY);
PMIX_PROC_FREE(cd->procs, cd->nprocs);
// provide the group ID
PMIX_INFO_LIST_ADD(rc, values, PMIX_GROUP_ID, sig->groupID, PMIX_STRING);
// set the range to be only procs that were added
darray.array = sig->addmembers;
darray.size = sig->nmembers;
// load the array - note: this copies the array!
PMIX_INFO_LIST_ADD(rc, values, PMIX_EVENT_CUSTOM_RANGE, &darray, PMIX_DATA_ARRAY);
// convert the list to an array
PMIX_INFO_LIST_CONVERT(rc, values, &darray);
cd->info = (pmix_info_t*)darray.array;
cd->ninfo = darray.size;
PMIX_INFO_LIST_RELEASE(values);
// notify local procs
PMIx_Notify_event(PMIX_GROUP_INVITED, PRTE_PROC_MY_NAME, PMIX_RANGE_CUSTOM,
cd->info, cd->ninfo, opcbfunc, cd);
}

/* check for the tracker - it is not an error if not
* found as that just means we wre not involved
* in the collective */
Expand Down
4 changes: 4 additions & 0 deletions src/mca/grpcomm/grpcomm.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ typedef int (*prte_grpcomm_rbcast_cb_t)(pmix_data_buffer_t *buffer);
typedef struct {
pmix_object_t super;
char *groupID;
size_t ctxid;
bool ctxid_assigned;
pmix_proc_t *signature;
size_t sz;
pmix_proc_t *addmembers;
size_t nmembers;
} prte_grpcomm_signature_t;
PRTE_EXPORT PMIX_CLASS_DECLARATION(prte_grpcomm_signature_t);

Expand Down
1 change: 1 addition & 0 deletions src/mca/state/dvm/state_dvm.c
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ static void vm_ready(int fd, short args, void *cbdata)
}

/* goes to all daemons */
PMIX_CONSTRUCT(&sig, prte_grpcomm_signature_t);
PMIX_PROC_CREATE(sig.signature, 1);
PMIX_LOAD_PROCID(&sig.signature[0], PRTE_PROC_MY_NAME->nspace, PMIX_RANK_WILDCARD);
sig.sz = 1;
Expand Down
3 changes: 2 additions & 1 deletion src/prted/pmix/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ libprrte_la_SOURCES += \
prted/pmix/pmix_server_gen.c \
prted/pmix/pmix_server_queries.c \
prted/pmix/pmix_server_allocate.c \
prted/pmix/pmix_server_session.c
prted/pmix/pmix_server_session.c \
prted/pmix/pmix_server_group.c
Loading

0 comments on commit cb87ece

Please sign in to comment.