Skip to content

Commit

Permalink
Use the provided groupID as the grpcomm signature
Browse files Browse the repository at this point in the history
If the user provides a groupID (e.g., in a PMIx_Group op),
then use that as the signature of any involved grpcomm
operations instead of the proc array. This allows the same
collection of procs to participate in multiple PMIx_Group
operations at the same time since PMIx requires the user
provide a unique grpID for each operation.

Signed-off-by: Ralph Castain <[email protected]>
  • Loading branch information
rhc54 committed Feb 17, 2024
1 parent 31c948f commit 8822c96
Show file tree
Hide file tree
Showing 12 changed files with 224 additions and 127 deletions.
6 changes: 5 additions & 1 deletion src/mca/grpcomm/base/grpcomm_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Copyright (c) 2015-2019 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2021-2023 Nanook Consulting All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -126,11 +126,15 @@ PMIX_CLASS_INSTANCE(prte_grpcomm_base_active_t,

static void scon(prte_grpcomm_signature_t *p)
{
p->groupID = NULL;
p->signature = NULL;
p->sz = 0;
}
static void sdes(prte_grpcomm_signature_t *p)
{
if (NULL != p->groupID) {
free(p->groupID);
}
if (NULL != p->signature) {
free(p->signature);
}
Expand Down
70 changes: 47 additions & 23 deletions src/mca/grpcomm/base/grpcomm_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* Copyright (c) 2017-2018 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2021-2023 Nanook Consulting All rights reserved.
* Copyright (c) 2021-2024 Nanook Consulting All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -121,8 +121,13 @@ static void allgather_stub(int fd, short args, void *cbdata)
/* retrieve an existing tracker, create it if not
* already found. The allgather module is responsible
* for releasing it upon completion of the collective */
ret = pmix_hash_table_get_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->signature,
cd->sig->sz * sizeof(pmix_proc_t), (void **) &seq_number);
if (NULL != cd->sig->groupID) {
ret = pmix_hash_table_get_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->groupID,
strlen(cd->sig->groupID), (void **) &seq_number);
} else {
ret = pmix_hash_table_get_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->signature,
cd->sig->sz * sizeof(pmix_proc_t), (void **) &seq_number);
}
if (PMIX_ERR_NOT_FOUND == ret) {
seq_number = (uint32_t *) malloc(sizeof(uint32_t));
*seq_number = 0;
Expand All @@ -136,8 +141,13 @@ static void allgather_stub(int fd, short args, void *cbdata)
PMIX_RELEASE(cd);
return;
}
ret = pmix_hash_table_set_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->signature,
cd->sig->sz * sizeof(pmix_proc_t), (void *) seq_number);
if (NULL != cd->sig->groupID) {
ret = pmix_hash_table_set_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->groupID,
strlen(cd->sig->groupID), (void *) seq_number);
} else {
ret = pmix_hash_table_set_value_ptr(&prte_grpcomm_base.sig_table, (void *) cd->sig->signature,
cd->sig->sz * sizeof(pmix_proc_t), (void *) seq_number);
}
if (PMIX_SUCCESS != ret) {
PMIX_OUTPUT((prte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather cannot add new signature to hash table",
Expand Down Expand Up @@ -187,23 +197,37 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig
int rc;
size_t n;

/* search the existing tracker list to see if this already exists */
/* search the existing tracker list to see if this already exists - we
* default to using the groupID if one is given, otherwise we fallback
* to the array of participating procs */
PMIX_LIST_FOREACH(coll, &prte_grpcomm_base.ongoing, prte_grpcomm_coll_t) {
if (NULL == sig->signature) {
if (NULL == coll->sig->signature) {
if (NULL == sig->groupID && NULL == sig->signature) {
if (NULL == coll->sig->groupID && NULL == coll->sig->signature) {
/* only one collective can operate at a time
* across every process in the system */
return coll;
}
/* if only one is NULL, then we can't possibly match */
break;
}
if (sig->sz == coll->sig->sz &&
0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
return coll;
if (sig->sz == coll->sig->sz) {
if (NULL != sig->groupID) {
// must match groupID's
if (NULL != coll->sig->groupID && 0 == strcmp(sig->groupID, coll->sig->groupID)) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
return coll;
}
} else {
// must match proc signature
if (0 == memcmp(sig->signature, coll->sig->signature, sig->sz * sizeof(pmix_proc_t))) {
PMIX_OUTPUT_VERBOSE((1, prte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:returning existing collective",
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
return coll;
}
}
}
}
/* if we get here, then this is a new collective - so create
Expand All @@ -217,6 +241,10 @@ prte_grpcomm_coll_t *prte_grpcomm_base_get_tracker(prte_grpcomm_signature_t *sig
}
coll = PMIX_NEW(prte_grpcomm_coll_t);
coll->sig = PMIX_NEW(prte_grpcomm_signature_t);
if (NULL != sig->groupID) {
coll->sig->groupID = strdup(sig->groupID);
}
// we have to know the participating procs
coll->sig->sz = sig->sz;
coll->sig->signature = (pmix_proc_t *) malloc(coll->sig->sz * sizeof(pmix_proc_t));
memcpy(coll->sig->signature, sig->signature, coll->sig->sz * sizeof(pmix_proc_t));
Expand Down Expand Up @@ -387,23 +415,19 @@ static int pack_xcast(prte_grpcomm_signature_t *sig, pmix_data_buffer_t *buffer,
bool compressed;
pmix_byte_object_t bo;
size_t sz;
uint8_t flag;

/* setup an intermediate buffer */
PMIX_DATA_BUFFER_CONSTRUCT(&data);

/* pass along the signature */
rc = PMIx_Data_pack(NULL, &data, &sig->sz, 1, PMIX_SIZE);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
PMIX_DATA_BUFFER_DESTRUCT(&data);
return rc;
}
rc = PMIx_Data_pack(NULL, &data, sig->signature, sig->sz, PMIX_PROC);
if (PMIX_SUCCESS != rc) {
PMIX_ERROR_LOG(rc);
rc = prte_grpcomm_sig_pack(&data, sig);
if (PRTE_SUCCESS != rc) {
PRTE_ERROR_LOG(rc);
PMIX_DATA_BUFFER_DESTRUCT(&data);
return rc;
}

/* pass the final tag */
rc = PMIx_Data_pack(NULL, &data, &tag, 1, PRTE_RML_TAG);
if (PMIX_SUCCESS != rc) {
Expand Down
Loading

0 comments on commit 8822c96

Please sign in to comment.