diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index 3eef3ef0521..9acba798776 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -886,8 +886,8 @@
+
-
diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include
index 980f3430644..db5e44df1f0 100644
--- a/prov/efa/Makefile.include
+++ b/prov/efa/Makefile.include
@@ -49,8 +49,8 @@ _efa_files = \
prov/efa/src/efa_cntr.c \
prov/efa/src/efa_msg.c \
prov/efa/src/efa_rma.c \
+ prov/efa/src/efa_cq.c \
prov/efa/src/dgram/efa_dgram_ep.c \
- prov/efa/src/dgram/efa_dgram_cq.c \
prov/efa/src/rdm/efa_rdm_peer.c \
prov/efa/src/rdm/efa_rdm_cq.c \
prov/efa/src/rdm/efa_rdm_ep_utils.c \
@@ -95,7 +95,6 @@ _efa_headers = \
prov/efa/src/efa_env.h \
prov/efa/src/fi_ext_efa.h \
prov/efa/src/dgram/efa_dgram_ep.h \
- prov/efa/src/dgram/efa_dgram_cq.h \
prov/efa/src/rdm/efa_rdm_peer.h \
prov/efa/src/rdm/efa_rdm_cq.h \
prov/efa/src/rdm/efa_rdm_ep.h \
diff --git a/prov/efa/src/dgram/efa_dgram_cq.c b/prov/efa/src/dgram/efa_dgram_cq.c
deleted file mode 100644
index d046549bd66..00000000000
--- a/prov/efa/src/dgram/efa_dgram_cq.c
+++ /dev/null
@@ -1,339 +0,0 @@
-/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
-/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */
-/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
-
-#include
-#include
-#include "config.h"
-#include
-#include "dgram/efa_dgram_ep.h"
-#include "efa.h"
-#include "efa_cq.h"
-#include "efa_av.h"
-#include "efa_dgram_cq.h"
-#include
-
-struct efa_wc {
- struct ibv_wc ibv_wc;
- /* Source address */
- uint16_t efa_ah;
-};
-
-struct efa_wce {
- struct slist_entry entry;
- struct efa_wc wc;
-};
-
-#define EFA_WCE_CNT 1024
-
-static inline uint64_t efa_dgram_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) {
- switch (opcode) {
- case IBV_WC_SEND:
- return FI_SEND | FI_MSG;
- case IBV_WC_RECV:
- return FI_RECV | FI_MSG;
- default:
- assert(0);
- return 0;
- }
-}
-
-static inline uint32_t efa_dgram_cq_api_version(struct efa_dgram_cq *cq) {
- return cq->domain->fabric->util_fabric.fabric_fid.api_version;
-}
-
-ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry,
- uint64_t flags)
-{
- struct efa_dgram_cq *cq;
- uint32_t api_version;
-
- cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid);
-
- ofi_spin_lock(&cq->lock);
-
- if (!cq->ibv_cq_ex->status)
- goto err;
-
- api_version = efa_dgram_cq_api_version(cq);
-
- entry->op_context = (void *)(uintptr_t)cq->ibv_cq_ex->wr_id;
- entry->flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(cq->ibv_cq_ex));
- entry->err = FI_EIO;
- entry->prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq_ex);
- EFA_WARN(FI_LOG_CQ, "Work completion status: %s\n", efa_strerror(entry->prov_errno));
-
- ofi_spin_unlock(&cq->lock);
-
- /* We currently don't have err_data to give back to the user. */
- if (FI_VERSION_GE(api_version, FI_VERSION(1, 5)))
- entry->err_data_size = 0;
-
- return sizeof(*entry);
-err:
- ofi_spin_unlock(&cq->lock);
- return -FI_EAGAIN;
-}
-
-static void efa_dgram_cq_read_context_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_entry *entry = buf;
-
- entry[i].op_context = (void *)ibv_cqx->wr_id;
-}
-
-static void efa_dgram_cq_read_msg_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_msg_entry *entry = buf;
-
- entry[i].op_context = (void *)(uintptr_t)ibv_cqx->wr_id;
- entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
- entry[i].len = ibv_wc_read_byte_len(ibv_cqx);
-}
-
-static void efa_dgram_cq_read_data_entry(struct ibv_cq_ex *ibv_cqx, int i, void *buf)
-{
- struct fi_cq_data_entry *entry = buf;
-
- entry[i].op_context = (void *)ibv_cqx->wr_id;
- entry[i].flags = efa_dgram_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
- entry[i].data = 0;
- entry[i].len = ibv_wc_read_byte_len(ibv_cqx);
-}
-
-/**
- * @brief Convert an error code from CQ poll API, e.g. `ibv_start_poll`, `ibv_end_poll`.
- * The returned error code must be 0 (success) or negative (error).
- * As a special case, if input error code is ENOENT (there was no item on CQ), we should return -FI_EAGAIN.
- * @param[in] err Return value from `ibv_start_poll` or `ibv_end_poll`
- * @returns Converted error code
- */
-static inline ssize_t efa_dgram_cq_ibv_poll_error_to_fi_error(ssize_t err) {
- if (err == ENOENT) {
- return -FI_EAGAIN;
- }
-
- if (err > 0) {
- return -err;
- }
-
- return err;
-}
-
-ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
- fi_addr_t *src_addr)
-{
- bool should_end_poll = false;
- struct efa_dgram_cq *cq;
- struct efa_av *av;
- ssize_t err = 0;
- size_t num_cqe = 0; /* Count of read entries */
- uint32_t qp_num, src_qp, slid;
-
- /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
- * EFA expects .comp_mask = 0, or otherwise returns EINVAL.
- */
- struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
-
- cq = container_of(cq_fid, struct efa_dgram_cq, util_cq.cq_fid);
-
- ofi_spin_lock(&cq->lock);
-
- /* Call ibv_start_poll only once regardless of count == 0 */
- err = ibv_start_poll(cq->ibv_cq_ex, &poll_cq_attr);
- should_end_poll = !err;
-
- while (!err && num_cqe < count) {
- if (cq->ibv_cq_ex->status) {
- err = -FI_EAVAIL;
- break;
- }
-
- if (src_addr) {
- qp_num = ibv_wc_read_qp_num(cq->ibv_cq_ex);
- src_qp = ibv_wc_read_src_qp(cq->ibv_cq_ex);
- slid = ibv_wc_read_slid(cq->ibv_cq_ex);
- av = cq->domain->qp_table[qp_num & cq->domain->qp_table_sz_m1]->base_ep->av;
-
- src_addr[num_cqe] = efa_av_reverse_lookup_dgram(av, slid, src_qp);
- }
-
- cq->read_entry(cq->ibv_cq_ex, num_cqe, buf);
- num_cqe++;
-
- err = ibv_next_poll(cq->ibv_cq_ex);
- }
-
- err = efa_dgram_cq_ibv_poll_error_to_fi_error(err);
-
- if (should_end_poll)
- ibv_end_poll(cq->ibv_cq_ex);
-
- ofi_spin_unlock(&cq->lock);
-
- return num_cqe ? num_cqe : err;
-}
-
-static const char *efa_dgram_cq_strerror(struct fid_cq *cq_fid,
- int prov_errno,
- const void *err_data,
- char *buf, size_t len)
-{
- return err_data
- ? (const char *) err_data
- : efa_strerror(prov_errno);
-}
-
-static struct fi_ops_cq efa_dgram_cq_ops = {
- .size = sizeof(struct fi_ops_cq),
- .read = ofi_cq_read,
- .readfrom = ofi_cq_readfrom,
- .readerr = ofi_cq_readerr,
- .sread = fi_no_cq_sread,
- .sreadfrom = fi_no_cq_sreadfrom,
- .signal = fi_no_cq_signal,
- .strerror = efa_dgram_cq_strerror
-};
-
-static int efa_dgram_cq_control(fid_t fid, int command, void *arg)
-{
- int ret = 0;
-
- switch (command) {
- default:
- ret = -FI_ENOSYS;
- break;
- }
-
- return ret;
-}
-
-static int efa_dgram_cq_close(fid_t fid)
-{
- struct efa_dgram_cq *cq;
- int ret;
-
- cq = container_of(fid, struct efa_dgram_cq, util_cq.cq_fid.fid);
-
- ofi_bufpool_destroy(cq->wce_pool);
-
- ofi_spin_destroy(&cq->lock);
-
- ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex));
- if (ret)
- return ret;
-
- ret = ofi_cq_cleanup(&cq->util_cq);
- if (ret)
- return ret;
-
- free(cq);
-
- return 0;
-}
-
-static struct fi_ops efa_dgram_cq_fi_ops = {
- .size = sizeof(struct fi_ops),
- .close = efa_dgram_cq_close,
- .bind = fi_no_bind,
- .control = efa_dgram_cq_control,
- .ops_open = fi_no_ops_open,
-};
-
-/**
- * @brief Create and set cq->ibv_cq_ex
- *
- * @param[in] cq Pointer to the efa_dgram_cq. cq->ibv_cq_ex must be NULL.
- * @param[in] attr Pointer to fi_cq_attr.
- * @param[out] Return code = 0 if successful, or negative otherwise.
- */
-static inline int efa_dgram_cq_set_ibv_cq_ex(struct efa_dgram_cq *cq, struct fi_cq_attr *attr)
-{
- enum ibv_cq_ex_type ibv_cq_ex_type;
-
- if (cq->ibv_cq_ex) {
- EFA_WARN(FI_LOG_CQ, "CQ already has attached ibv_cq_ex\n");
- return -FI_EALREADY;
- }
-
- return efa_cq_ibv_cq_ex_open(attr, cq->domain->device->ibv_ctx,
- &cq->ibv_cq_ex, &ibv_cq_ex_type);
-}
-
-int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
- struct fid_cq **cq_fid, void *context)
-{
- struct efa_dgram_cq *cq;
- int err;
-
- if (attr->wait_obj != FI_WAIT_NONE)
- return -FI_ENOSYS;
-
- cq = calloc(1, sizeof(*cq));
- if (!cq)
- return -FI_ENOMEM;
-
- err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq,
- &ofi_cq_progress, context);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n");
- goto err_free_cq;
- }
-
- cq->domain = container_of(domain_fid, struct efa_domain,
- util_domain.domain_fid);
-
- err = efa_dgram_cq_set_ibv_cq_ex(cq, attr);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ\n");
- err = -FI_EINVAL;
- goto err_free_util_cq;
- }
-
- err = ofi_bufpool_create(&cq->wce_pool, sizeof(struct efa_wce), 16, 0,
- EFA_WCE_CNT, 0);
- if (err) {
- EFA_WARN(FI_LOG_CQ, "Failed to create wce_pool\n");
- goto err_destroy_cq;
- }
-
- switch (attr->format) {
- case FI_CQ_FORMAT_UNSPEC:
- case FI_CQ_FORMAT_CONTEXT:
- cq->read_entry = efa_dgram_cq_read_context_entry;
- cq->entry_size = sizeof(struct fi_cq_entry);
- break;
- case FI_CQ_FORMAT_MSG:
- cq->read_entry = efa_dgram_cq_read_msg_entry;
- cq->entry_size = sizeof(struct fi_cq_msg_entry);
- break;
- case FI_CQ_FORMAT_DATA:
- cq->read_entry = efa_dgram_cq_read_data_entry;
- cq->entry_size = sizeof(struct fi_cq_data_entry);
- break;
- case FI_CQ_FORMAT_TAGGED:
- default:
- err = -FI_ENOSYS;
- goto err_destroy_pool;
- }
-
- ofi_spin_init(&cq->lock);
-
- *cq_fid = &cq->util_cq.cq_fid;
- (*cq_fid)->fid.fclass = FI_CLASS_CQ;
- (*cq_fid)->fid.context = context;
- (*cq_fid)->fid.ops = &efa_dgram_cq_fi_ops;
- (*cq_fid)->ops = &efa_dgram_cq_ops;
-
- return 0;
-
-err_destroy_pool:
- ofi_bufpool_destroy(cq->wce_pool);
-err_destroy_cq:
- ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq_ex));
-err_free_util_cq:
- ofi_cq_cleanup(&cq->util_cq);
-err_free_cq:
- free(cq);
- return err;
-}
diff --git a/prov/efa/src/dgram/efa_dgram_cq.h b/prov/efa/src/dgram/efa_dgram_cq.h
deleted file mode 100644
index fbb986d3f72..00000000000
--- a/prov/efa/src/dgram/efa_dgram_cq.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
-/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
-
-#ifndef EFA_DGRAM_CQ_H
-#define EFA_DGRAM_CQ_H
-
-typedef void (*efa_dgram_cq_read_entry)(struct ibv_cq_ex *ibv_cqx, int index, void *buf);
-
-struct efa_dgram_cq {
- struct util_cq util_cq;
- struct efa_domain *domain;
- size_t entry_size;
- efa_dgram_cq_read_entry read_entry;
- ofi_spin_t lock;
- struct ofi_bufpool *wce_pool;
- uint32_t flags; /* User defined capability mask */
-
- struct ibv_cq_ex *ibv_cq_ex;
-};
-
-int efa_dgram_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
- struct fid_cq **cq_fid, void *context);
-
-ssize_t efa_dgram_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count, fi_addr_t *src_addr);
-
-ssize_t efa_dgram_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *entry, uint64_t flags);
-
-#endif
\ No newline at end of file
diff --git a/prov/efa/src/dgram/efa_dgram_ep.c b/prov/efa/src/dgram/efa_dgram_ep.c
index 635d5e7a9b6..3119b8bee72 100644
--- a/prov/efa/src/dgram/efa_dgram_ep.c
+++ b/prov/efa/src/dgram/efa_dgram_ep.c
@@ -4,12 +4,11 @@
#include "config.h"
#include "efa_dgram_ep.h"
-#include "efa_dgram_cq.h"
#include "efa.h"
#include "efa_av.h"
+#include "efa_cq.h"
#include
-#define efa_dgram_cq_PROGRESS_ENTRIES 500
static int efa_dgram_ep_getopt(fid_t fid, int level, int optname,
void *optval, size_t *optlen)
@@ -71,8 +70,9 @@ static int efa_dgram_ep_close(fid_t fid)
static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
{
struct efa_dgram_ep *ep;
- struct efa_dgram_cq *cq;
+ struct efa_cq *cq;
struct efa_av *av;
+ struct efa_domain *efa_domain;
struct util_eq *eq;
struct util_cntr *cntr;
int ret;
@@ -94,24 +94,15 @@ static int efa_dgram_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
if (!(flags & (FI_RECV | FI_TRANSMIT)))
return -FI_EBADFLAGS;
- cq = container_of(bfid, struct efa_dgram_cq, util_cq.cq_fid);
- if (ep->base_ep.domain != cq->domain)
+ cq = container_of(bfid, struct efa_cq, util_cq.cq_fid);
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);
+ if (ep->base_ep.domain != efa_domain)
return -FI_EINVAL;
ret = ofi_ep_bind_cq(&ep->base_ep.util_ep, &cq->util_cq, flags);
if (ret)
return ret;
- if (flags & FI_RECV) {
- if (ep->rcq)
- return -EINVAL;
- ep->rcq = cq;
- }
- if (flags & FI_TRANSMIT) {
- if (ep->scq)
- return -EINVAL;
- ep->scq = cq;
- }
break;
case FI_CLASS_AV:
av = container_of(bfid, struct efa_av, util_av.av_fid.fid);
@@ -186,46 +177,47 @@ static int efa_dgram_ep_setflags(struct fid_ep *ep_fid, uint64_t flags)
static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
{
struct ibv_qp_init_attr_ex attr_ex = { 0 };
- struct ibv_pd *ibv_pd;
struct efa_dgram_ep *ep;
+ struct efa_cq *scq, *rcq;
int err;
ep = container_of(ep_fid, struct efa_dgram_ep, base_ep.util_ep.ep_fid);
- if (!ep->scq && !ep->rcq) {
+ scq = ep->base_ep.util_ep.tx_cq ? container_of(ep->base_ep.util_ep.tx_cq, struct efa_cq, util_cq) : NULL;
+ rcq = ep->base_ep.util_ep.rx_cq ? container_of(ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq) : NULL;
+
+ if (!scq && !rcq) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send or receive completion queue\n");
return -FI_ENOCQ;
}
- if (!ep->scq && ofi_send_allowed(ep->base_ep.info->caps)) {
+ if (!scq && ofi_needs_tx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a send completion queue when it has transmit capabilities enabled (FI_SEND).\n");
return -FI_ENOCQ;
}
- if (!ep->rcq && ofi_recv_allowed(ep->base_ep.info->caps)) {
+ if (!rcq && ofi_needs_rx(ep->base_ep.info->caps)) {
EFA_WARN(FI_LOG_EP_CTRL,
"Endpoint is not bound to a receive completion queue when it has receive capabilities enabled. (FI_RECV)\n");
return -FI_ENOCQ;
}
- if (ep->scq) {
+ if (scq) {
attr_ex.cap.max_send_wr = ep->base_ep.info->tx_attr->size;
attr_ex.cap.max_send_sge = ep->base_ep.info->tx_attr->iov_limit;
- attr_ex.send_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
- ibv_pd = ep->scq->domain->ibv_pd;
+ attr_ex.send_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex);
} else {
- attr_ex.send_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
- ibv_pd = ep->rcq->domain->ibv_pd;
+ attr_ex.send_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex);
}
- if (ep->rcq) {
+ if (rcq) {
attr_ex.cap.max_recv_wr = ep->base_ep.info->rx_attr->size;
attr_ex.cap.max_recv_sge = ep->base_ep.info->rx_attr->iov_limit;
- attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->rcq->ibv_cq_ex);
+ attr_ex.recv_cq = ibv_cq_ex_to_cq(rcq->ibv_cq.ibv_cq_ex);
} else {
- attr_ex.recv_cq = ibv_cq_ex_to_cq(ep->scq->ibv_cq_ex);
+ attr_ex.recv_cq = ibv_cq_ex_to_cq(scq->ibv_cq.ibv_cq_ex);
}
attr_ex.cap.max_inline_data =
@@ -234,7 +226,7 @@ static int efa_dgram_ep_enable(struct fid_ep *ep_fid)
assert(EFA_EP_TYPE_IS_DGRAM(ep->base_ep.domain->info));
attr_ex.qp_type = IBV_QPT_UD;
attr_ex.comp_mask = IBV_QP_INIT_ATTR_PD;
- attr_ex.pd = ibv_pd;
+ attr_ex.pd = container_of(ep->base_ep.util_ep.domain, struct efa_domain, util_domain)->ibv_pd;
attr_ex.qp_context = ep;
attr_ex.sq_sig_all = 1;
@@ -277,89 +269,19 @@ static struct fi_ops efa_dgram_ep_ops = {
.ops_open = fi_no_ops_open,
};
-static void efa_dgram_ep_progress_internal(struct efa_dgram_ep *ep, struct efa_dgram_cq *efa_dgram_cq)
+/**
+ * @brief progress engine for the EFA dgram endpoint
+ *
+ * This function now a no-op.
+ *
+ * @param[in] util_ep The endpoint FID to progress
+ */
+static
+void efa_ep_progress_no_op(struct util_ep *util_ep)
{
- struct util_cq *cq;
- struct fi_cq_tagged_entry cq_entry[efa_dgram_cq_PROGRESS_ENTRIES] = {0};
- struct fi_cq_tagged_entry *temp_cq_entry;
- struct fi_cq_err_entry cq_err_entry = {0};
- fi_addr_t src_addr[efa_dgram_cq_PROGRESS_ENTRIES];
- uint64_t flags;
- int i;
- ssize_t ret, err;
-
- cq = &efa_dgram_cq->util_cq;
- flags = ep->base_ep.util_ep.caps;
-
- VALGRIND_MAKE_MEM_DEFINED(&cq_entry, sizeof(cq_entry));
-
- ret = efa_dgram_cq_readfrom(&cq->cq_fid, cq_entry, efa_dgram_cq_PROGRESS_ENTRIES,
- (flags & FI_SOURCE) ? src_addr : NULL);
- if (ret == -FI_EAGAIN)
- return;
-
- if (OFI_UNLIKELY(ret < 0)) {
- if (OFI_UNLIKELY(ret != -FI_EAVAIL)) {
- EFA_WARN(FI_LOG_CQ, "no error available errno: %ld\n", ret);
- efa_base_ep_write_eq_error(&ep->base_ep, -ret, FI_EFA_ERR_DGRAM_CQ_READ);
- return;
- }
-
- err = efa_dgram_cq_readerr(&cq->cq_fid, &cq_err_entry, flags);
- if (OFI_UNLIKELY(err < 0)) {
- EFA_WARN(FI_LOG_CQ, "unable to read error entry errno: %ld\n", err);
- efa_base_ep_write_eq_error(&ep->base_ep, FI_EIO, cq_err_entry.prov_errno);
- return;
- }
-
- ofi_cq_write_error(cq, &cq_err_entry);
- return;
- }
-
- temp_cq_entry = (struct fi_cq_tagged_entry *)cq_entry;
- for (i = 0; i < ret; i++) {
- (flags & FI_SOURCE) ?
- ofi_cq_write_src(cq, temp_cq_entry->op_context,
- temp_cq_entry->flags,
- temp_cq_entry->len,
- temp_cq_entry->buf,
- temp_cq_entry->data,
- temp_cq_entry->tag,
- src_addr[i]) :
- ofi_cq_write(cq, temp_cq_entry->op_context,
- temp_cq_entry->flags,
- temp_cq_entry->len,
- temp_cq_entry->buf,
- temp_cq_entry->data,
- temp_cq_entry->tag);
-
- temp_cq_entry = (struct fi_cq_tagged_entry *)
- ((uint8_t *)temp_cq_entry + efa_dgram_cq->entry_size);
- }
return;
}
-void efa_dgram_ep_progress(struct util_ep *ep)
-{
- struct efa_dgram_ep *efa_dgram_ep;
- struct efa_dgram_cq *rcq;
- struct efa_dgram_cq *scq;
-
- efa_dgram_ep = container_of(ep, struct efa_dgram_ep, base_ep.util_ep);
- rcq = efa_dgram_ep->rcq;
- scq = efa_dgram_ep->scq;
-
- ofi_genlock_lock(&ep->lock);
-
- if (rcq)
- efa_dgram_ep_progress_internal(efa_dgram_ep, rcq);
-
- if (scq && scq != rcq)
- efa_dgram_ep_progress_internal(efa_dgram_ep, scq);
-
- ofi_genlock_unlock(&ep->lock);
-}
-
static struct fi_ops_atomic efa_dgram_ep_atomic_ops = {
.size = sizeof(struct fi_ops_atomic),
.write = fi_no_atomic_write,
@@ -433,7 +355,7 @@ int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
if (!ep)
return -FI_ENOMEM;
- ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_dgram_ep_progress, context);
+ ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context);
if (ret)
goto err_ep_destroy;
diff --git a/prov/efa/src/dgram/efa_dgram_ep.h b/prov/efa/src/dgram/efa_dgram_ep.h
index b01db81f57e..18ab0dc8703 100644
--- a/prov/efa/src/dgram/efa_dgram_ep.h
+++ b/prov/efa/src/dgram/efa_dgram_ep.h
@@ -8,9 +8,6 @@
struct efa_dgram_ep {
struct efa_base_ep base_ep;
-
- struct efa_dgram_cq *rcq;
- struct efa_dgram_cq *scq;
};
int efa_dgram_ep_open(struct fid_domain *domain_fid, struct fi_info *info,
diff --git a/prov/efa/src/efa_av.c b/prov/efa/src/efa_av.c
index 5ee81de7ebd..4b1d2f70442 100644
--- a/prov/efa/src/efa_av.c
+++ b/prov/efa/src/efa_av.c
@@ -53,7 +53,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
struct util_av_entry *util_av_entry;
struct efa_av_entry *efa_av_entry;
- if (OFI_UNLIKELY(fi_addr == FI_ADDR_UNSPEC))
+ if (OFI_UNLIKELY(fi_addr == FI_ADDR_UNSPEC || fi_addr == FI_ADDR_NOTAVAIL))
return NULL;
if (av->type == FI_AV_MAP) {
@@ -70,7 +70,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
}
/**
- * @brief find fi_addr for dgram endpoint
+ * @brief find fi_addr for efa endpoint
*
* @param[in] av address vector
* @param[in] ahn address handle number
@@ -78,7 +78,7 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr)
* @return On success, return fi_addr to the peer who send the packet
* If no such peer exist, return FI_ADDR_NOTAVAIL
*/
-fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn)
+fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn)
{
struct efa_cur_reverse_av *cur_entry;
struct efa_cur_reverse_av_key cur_key;
diff --git a/prov/efa/src/efa_av.h b/prov/efa/src/efa_av.h
index b1624398be0..acf7e58e320 100644
--- a/prov/efa/src/efa_av.h
+++ b/prov/efa/src/efa_av.h
@@ -86,6 +86,6 @@ struct efa_conn *efa_av_addr_to_conn(struct efa_av *av, fi_addr_t fi_addr);
fi_addr_t efa_av_reverse_lookup_rdm(struct efa_av *av, uint16_t ahn, uint16_t qpn, struct efa_rdm_pke *pkt_entry);
-fi_addr_t efa_av_reverse_lookup_dgram(struct efa_av *av, uint16_t ahn, uint16_t qpn);
+fi_addr_t efa_av_reverse_lookup(struct efa_av *av, uint16_t ahn, uint16_t qpn);
#endif
\ No newline at end of file
diff --git a/prov/efa/src/efa_cntr.c b/prov/efa/src/efa_cntr.c
index fa1f548c525..b914a499305 100644
--- a/prov/efa/src/efa_cntr.c
+++ b/prov/efa/src/efa_cntr.c
@@ -178,6 +178,24 @@ static void efa_rdm_cntr_progress(struct util_cntr *cntr)
ofi_genlock_unlock(&cntr->ep_list_lock);
}
+static void efa_cntr_progress(struct util_cntr *cntr)
+{
+ struct util_ep *ep;
+ struct fid_list_entry *fid_entry;
+ struct dlist_entry *item;
+
+ ofi_genlock_lock(&cntr->ep_list_lock);
+ dlist_foreach(&cntr->ep_list, item) {
+ fid_entry = container_of(item, struct fid_list_entry, entry);
+ ep = container_of(fid_entry->fid, struct util_ep, ep_fid.fid);
+ if (ep->tx_cq)
+ efa_cq_progress(ep->tx_cq);
+ if (ep->rx_cq)
+ efa_cq_progress(ep->rx_cq);
+ }
+ ofi_genlock_unlock(&cntr->ep_list_lock);
+}
+
int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
struct fid_cntr **cntr_fid, void *context)
{
@@ -199,7 +217,7 @@ int efa_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,
cntr_progress_func = efa_domain->info->ep_attr->type == FI_EP_RDM
? efa_rdm_cntr_progress
- : ofi_cntr_progress;
+ : efa_cntr_progress;
ret = ofi_cntr_init(&efa_prov, domain, attr, &cntr->util_cntr,
cntr_progress_func, context);
diff --git a/prov/efa/src/efa_cq.c b/prov/efa/src/efa_cq.c
new file mode 100644
index 00000000000..a5b737d89ac
--- /dev/null
+++ b/prov/efa/src/efa_cq.c
@@ -0,0 +1,470 @@
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+/* SPDX-FileCopyrightText: Copyright (c) 2013-2015 Intel Corporation, Inc. All rights reserved. */
+/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+
+#include
+#include
+#include "config.h"
+#include
+#include "dgram/efa_dgram_ep.h"
+#include "efa.h"
+#include "efa_av.h"
+#include "efa_cntr.h"
+#include "efa_cq.h"
+#include
+
+
+static inline uint64_t efa_cq_opcode_to_fi_flags(enum ibv_wc_opcode opcode) {
+ switch (opcode) {
+ case IBV_WC_SEND:
+ return FI_SEND | FI_MSG;
+ case IBV_WC_RECV:
+ return FI_RECV | FI_MSG;
+ case IBV_WC_RDMA_WRITE:
+ return FI_RMA | FI_WRITE;
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ return FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE;
+ case IBV_WC_RDMA_READ:
+ return FI_RMA | FI_READ;
+ default:
+ assert(0);
+ return 0;
+ }
+}
+
+static void efa_cq_construct_cq_entry(struct ibv_cq_ex *ibv_cqx,
+ struct fi_cq_tagged_entry *entry)
+{
+ entry->op_context = (void *)ibv_cqx->wr_id;
+ entry->flags = efa_cq_opcode_to_fi_flags(ibv_wc_read_opcode(ibv_cqx));
+ entry->len = ibv_wc_read_byte_len(ibv_cqx);
+ entry->buf = NULL;
+ entry->data = 0;
+ entry->tag = 0;
+
+ if (ibv_wc_read_wc_flags(ibv_cqx) & IBV_WC_WITH_IMM) {
+ entry->flags |= FI_REMOTE_CQ_DATA;
+ entry->data = ibv_wc_read_imm_data(ibv_cqx);
+ }
+}
+
+/**
+ * @brief handle the situation that a TX/RX operation encountered error
+ *
+ * This function does the following to handle error:
+ *
+ * 1. write an error cq entry for the operation, if writing
+ * CQ error entry failed, it will write eq entry.
+ *
+ * 2. increase error counter.
+ *
+ * 3. print warning message with self and peer's raw address
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] err positive libfabric error code
+ * @param[in] prov_errno positive EFA provider specific error code
+ * @param[in] is_tx if the error is for TX or RX operation
+ */
+static void efa_cq_handle_error(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex, int err,
+ int prov_errno, bool is_tx)
+{
+ struct fi_cq_err_entry err_entry;
+ fi_addr_t addr;
+ char err_msg[EFA_ERROR_MSG_BUFFER_LENGTH] = {0};
+ int write_cq_err;
+
+ memset(&err_entry, 0, sizeof(err_entry));
+ efa_cq_construct_cq_entry(ibv_cq_ex, (struct fi_cq_tagged_entry *) &err_entry);
+ err_entry.err = err;
+ err_entry.prov_errno = prov_errno;
+
+ if (is_tx)
+ // TODO: get correct peer addr for TX operation
+ addr = FI_ADDR_NOTAVAIL;
+ else
+ addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+
+ if (OFI_UNLIKELY(efa_write_error_msg(base_ep, addr, prov_errno,
+ err_msg,
+ &err_entry.err_data_size))) {
+ err_entry.err_data_size = 0;
+ } else {
+ err_entry.err_data = err_msg;
+ }
+
+ EFA_WARN(FI_LOG_CQ, "err: %d, message: %s (%d)\n",
+ err_entry.err,
+ err_entry.err_data
+ ? (const char *) err_entry.err_data
+ : efa_strerror(err_entry.prov_errno),
+ err_entry.prov_errno);
+
+ efa_show_help(err_entry.prov_errno);
+
+ efa_cntr_report_error(&base_ep->util_ep, err_entry.flags);
+ write_cq_err = ofi_cq_write_error(is_tx ? base_ep->util_ep.tx_cq :
+ base_ep->util_ep.rx_cq,
+ &err_entry);
+ if (write_cq_err) {
+ EFA_WARN(
+ FI_LOG_CQ,
+ "Error writing error cq entry when handling %s error\n",
+ is_tx ? "TX" : "RX");
+ efa_base_ep_write_eq_error(base_ep, err, prov_errno);
+ }
+}
+
+/**
+ * @brief handle the event that a TX request has been completed
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] cq_entry fi_cq_tagged_entry
+ */
+static void efa_cq_handle_tx_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex,
+ struct fi_cq_tagged_entry *cq_entry)
+{
+ struct util_cq *tx_cq = base_ep->util_ep.tx_cq;
+ int ret = 0;
+
+ /* NULL wr_id means no FI_COMPLETION flag */
+ if (!ibv_cq_ex->wr_id)
+ return;
+
+ /* TX completions should not send peer address to util_cq */
+ if (base_ep->util_ep.caps & FI_SOURCE)
+ ret = ofi_cq_write_src(tx_cq, cq_entry->op_context,
+ cq_entry->flags, cq_entry->len,
+ cq_entry->buf, cq_entry->data,
+ cq_entry->tag, FI_ADDR_NOTAVAIL);
+ else
+ ret = ofi_cq_write(tx_cq, cq_entry->op_context, cq_entry->flags,
+ cq_entry->len, cq_entry->buf, cq_entry->data,
+ cq_entry->tag);
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ, "Unable to write send completion: %s\n",
+ fi_strerror(-ret));
+ efa_cq_handle_error(base_ep, ibv_cq_ex, -ret,
+ FI_EFA_ERR_WRITE_SEND_COMP, true);
+ }
+}
+
+/**
+ * @brief handle the event that a RX request has been completed
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ * @param[in] cq_entry fi_cq_tagged_entry
+ */
+static void efa_cq_handle_rx_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex,
+ struct fi_cq_tagged_entry *cq_entry)
+{
+ struct util_cq *rx_cq = base_ep->util_ep.rx_cq;
+ fi_addr_t src_addr;
+ int ret = 0;
+
+ /* NULL wr_id means no FI_COMPLETION flag */
+ if (!ibv_cq_ex->wr_id)
+ return;
+
+ if (base_ep->util_ep.caps & FI_SOURCE) {
+ src_addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+ ret = ofi_cq_write_src(rx_cq, cq_entry->op_context,
+ cq_entry->flags, cq_entry->len,
+ cq_entry->buf, cq_entry->data,
+ cq_entry->tag, src_addr);
+ } else {
+ ret = ofi_cq_write(rx_cq, cq_entry->op_context, cq_entry->flags,
+ cq_entry->len, cq_entry->buf, cq_entry->data,
+ cq_entry->tag);
+ }
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ, "Unable to write recv completion: %s\n",
+ fi_strerror(-ret));
+ efa_cq_handle_error(base_ep, ibv_cq_ex, -ret,
+ FI_EFA_ERR_WRITE_RECV_COMP, false);
+ }
+}
+
+/**
+ * @brief handle rdma-core CQ completion resulted from IBV_WRITE_WITH_IMM
+ *
+ * This function handles hardware-assisted RDMA writes with immediate data at
+ * remote endpoint. These do not have a packet context, nor do they have a
+ * connid available.
+ *
+ * @param[in] base_ep efa_base_ep
+ * @param[in] ibv_cq_ex extended ibv cq
+ */
+static void
+efa_cq_proc_ibv_recv_rdma_with_imm_completion(struct efa_base_ep *base_ep,
+ struct ibv_cq_ex *ibv_cq_ex)
+{
+ struct util_cq *rx_cq = base_ep->util_ep.rx_cq;
+ int ret;
+ fi_addr_t src_addr;
+ uint32_t imm_data = ibv_wc_read_imm_data(ibv_cq_ex);
+ uint32_t len = ibv_wc_read_byte_len(ibv_cq_ex);
+ uint64_t flags = FI_REMOTE_CQ_DATA | FI_RMA | FI_REMOTE_WRITE;
+
+ if (base_ep->util_ep.caps & FI_SOURCE) {
+ src_addr = efa_av_reverse_lookup(base_ep->av,
+ ibv_wc_read_slid(ibv_cq_ex),
+ ibv_wc_read_src_qp(ibv_cq_ex));
+ ret = ofi_cq_write_src(rx_cq, NULL, flags, len, NULL, imm_data,
+ 0, src_addr);
+ } else {
+ ret = ofi_cq_write(rx_cq, NULL, flags, len, NULL, imm_data, 0);
+ }
+
+ if (OFI_UNLIKELY(ret)) {
+ EFA_WARN(FI_LOG_CQ,
+ "Unable to write a cq entry for remote for RECV_RDMA "
+ "operation: %s\n",
+ fi_strerror(-ret));
+ efa_base_ep_write_eq_error(base_ep, -ret,
+ FI_EFA_ERR_WRITE_RECV_COMP);
+ }
+}
+
+/**
+ * @brief poll rdma-core cq and process the cq entry
+ *
+ * @param[in] cqe_to_process Max number of cq entry to poll and process.
+ * A negative number means to poll until cq empty.
+ * @param[in] util_cq util_cq
+ */
+void efa_cq_poll_ibv_cq(ssize_t cqe_to_process, struct util_cq *util_cq)
+{
+ bool should_end_poll = false;
+ struct efa_base_ep *base_ep;
+ struct efa_cq *cq;
+ struct efa_domain *efa_domain;
+ struct fi_cq_tagged_entry cq_entry = {0};
+ struct fi_cq_err_entry err_entry;
+ ssize_t err = 0;
+ size_t num_cqe = 0; /* Count of read entries */
+ int prov_errno, opcode;
+
+ /* Initialize an empty ibv_poll_cq_attr struct for ibv_start_poll.
+ * EFA expects .comp_mask = 0, or otherwise returns EINVAL.
+ */
+ struct ibv_poll_cq_attr poll_cq_attr = {.comp_mask = 0};
+
+ cq = container_of(util_cq, struct efa_cq, util_cq);
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain, util_domain);
+
+ /* Call ibv_start_poll only once */
+ err = ibv_start_poll(cq->ibv_cq.ibv_cq_ex, &poll_cq_attr);
+ should_end_poll = !err;
+
+ while (!err) {
+ base_ep = efa_domain->qp_table[ibv_wc_read_qp_num(cq->ibv_cq.ibv_cq_ex) & efa_domain->qp_table_sz_m1]->base_ep;
+ opcode = ibv_wc_read_opcode(cq->ibv_cq.ibv_cq_ex);
+ if (cq->ibv_cq.ibv_cq_ex->status) {
+ prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
+ switch (opcode) {
+ case IBV_WC_SEND: /* fall through */
+ case IBV_WC_RDMA_WRITE: /* fall through */
+ case IBV_WC_RDMA_READ:
+ efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex,
+ to_fi_errno(prov_errno),
+ prov_errno, true);
+ break;
+ case IBV_WC_RECV: /* fall through */
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ if (efa_cq_wc_is_unsolicited(cq->ibv_cq.ibv_cq_ex)) {
+ EFA_WARN(FI_LOG_CQ,
+ "Receive error %s (%d) for "
+ "unsolicited write recv",
+ efa_strerror(prov_errno),
+ prov_errno);
+ efa_base_ep_write_eq_error(
+ base_ep,
+ to_fi_errno(prov_errno),
+ prov_errno);
+ break;
+ }
+ efa_cq_handle_error(base_ep, cq->ibv_cq.ibv_cq_ex,
+ to_fi_errno(prov_errno),
+ prov_errno, false);
+ break;
+ default:
+ EFA_WARN(FI_LOG_EP_CTRL, "Unhandled op code %d\n", opcode);
+ assert(0 && "Unhandled op code");
+ }
+ break;
+ }
+
+ efa_cq_construct_cq_entry(cq->ibv_cq.ibv_cq_ex, &cq_entry);
+
+ switch (opcode) {
+ case IBV_WC_SEND: /* fall through */
+ case IBV_WC_RDMA_WRITE: /* fall through */
+ case IBV_WC_RDMA_READ:
+ efa_cq_handle_tx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry);
+ efa_cntr_report_tx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ case IBV_WC_RECV:
+ efa_cq_handle_rx_completion(base_ep, cq->ibv_cq.ibv_cq_ex, &cq_entry);
+ efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ case IBV_WC_RECV_RDMA_WITH_IMM:
+ efa_cq_proc_ibv_recv_rdma_with_imm_completion(
+ base_ep, cq->ibv_cq.ibv_cq_ex);
+ efa_cntr_report_rx_completion(&base_ep->util_ep, cq_entry.flags);
+ break;
+ default:
+ EFA_WARN(FI_LOG_EP_CTRL,
+ "Unhandled cq type\n");
+ assert(0 && "Unhandled cq type");
+ }
+
+ num_cqe++;
+ if (num_cqe == cqe_to_process) {
+ break;
+ }
+
+ err = ibv_next_poll(cq->ibv_cq.ibv_cq_ex);
+ }
+
+ if (err && err != ENOENT) {
+ err = err > 0 ? err : -err;
+ prov_errno = ibv_wc_read_vendor_err(cq->ibv_cq.ibv_cq_ex);
+ EFA_WARN(FI_LOG_CQ,
+ "Unexpected error when polling ibv cq, err: %s (%zd) "
+ "prov_errno: %s (%d)\n",
+ fi_strerror(err), err, efa_strerror(prov_errno),
+ prov_errno);
+ efa_show_help(prov_errno);
+ err_entry = (struct fi_cq_err_entry) {
+ .err = err,
+ .prov_errno = prov_errno,
+ .op_context = NULL,
+ };
+ ofi_cq_write_error(&cq->util_cq, &err_entry);
+ }
+
+ if (should_end_poll)
+ ibv_end_poll(cq->ibv_cq.ibv_cq_ex);
+}
+
+static const char *efa_cq_strerror(struct fid_cq *cq_fid,
+ int prov_errno,
+ const void *err_data,
+ char *buf, size_t len)
+{
+ return err_data
+ ? (const char *) err_data
+ : efa_strerror(prov_errno);
+}
+
+static struct fi_ops_cq efa_cq_ops = {
+ .size = sizeof(struct fi_ops_cq),
+ .read = ofi_cq_read,
+ .readfrom = ofi_cq_readfrom,
+ .readerr = ofi_cq_readerr,
+ .sread = fi_no_cq_sread,
+ .sreadfrom = fi_no_cq_sreadfrom,
+ .signal = fi_no_cq_signal,
+ .strerror = efa_cq_strerror
+};
+
+void efa_cq_progress(struct util_cq *cq)
+{
+ efa_cq_poll_ibv_cq(efa_env.efa_cq_read_size, cq);
+}
+
+static int efa_cq_close(fid_t fid)
+{
+ struct efa_cq *cq;
+ int ret;
+
+ cq = container_of(fid, struct efa_cq, util_cq.cq_fid.fid);
+
+ if (cq->ibv_cq.ibv_cq_ex) {
+ ret = -ibv_destroy_cq(ibv_cq_ex_to_cq(cq->ibv_cq.ibv_cq_ex));
+ if (ret) {
+ EFA_WARN(FI_LOG_CQ, "Unable to close ibv cq: %s\n",
+ fi_strerror(-ret));
+ return ret;
+ }
+ cq->ibv_cq.ibv_cq_ex = NULL;
+ }
+
+ ret = ofi_cq_cleanup(&cq->util_cq);
+ if (ret)
+ return ret;
+
+ free(cq);
+
+ return 0;
+}
+
+static struct fi_ops efa_cq_fi_ops = {
+ .size = sizeof(struct fi_ops),
+ .close = efa_cq_close,
+ .bind = fi_no_bind,
+ .control = fi_no_control,
+ .ops_open = fi_no_ops_open,
+};
+
+
+int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
+ struct fid_cq **cq_fid, void *context)
+{
+ struct efa_cq *cq;
+ struct efa_domain *efa_domain;
+ int err, retv;
+
+ if (attr->wait_obj != FI_WAIT_NONE)
+ return -FI_ENOSYS;
+
+ cq = calloc(1, sizeof(*cq));
+ if (!cq)
+ return -FI_ENOMEM;
+
+ err = ofi_cq_init(&efa_prov, domain_fid, attr, &cq->util_cq,
+ &efa_cq_progress, context);
+ if (err) {
+ EFA_WARN(FI_LOG_CQ, "Unable to create UTIL_CQ\n");
+ goto err_free_cq;
+ }
+
+ efa_domain = container_of(cq->util_cq.domain, struct efa_domain,
+ util_domain);
+ err = efa_cq_ibv_cq_ex_open(attr, efa_domain->device->ibv_ctx,
+ &cq->ibv_cq.ibv_cq_ex,
+ &cq->ibv_cq.ibv_cq_ex_type);
+ if (err) {
+ EFA_WARN(FI_LOG_CQ, "Unable to create extended CQ: %s\n", fi_strerror(err));
+ goto err_free_util_cq;
+ }
+
+ *cq_fid = &cq->util_cq.cq_fid;
+ (*cq_fid)->fid.fclass = FI_CLASS_CQ;
+ (*cq_fid)->fid.context = context;
+ (*cq_fid)->fid.ops = &efa_cq_fi_ops;
+ (*cq_fid)->ops = &efa_cq_ops;
+
+ return 0;
+
+err_free_util_cq:
+ retv = ofi_cq_cleanup(&cq->util_cq);
+ if (retv)
+ EFA_WARN(FI_LOG_CQ, "Unable to close util cq: %s\n",
+ fi_strerror(-retv));
+err_free_cq:
+ free(cq);
+ return err;
+}
diff --git a/prov/efa/src/efa_cq.h b/prov/efa/src/efa_cq.h
index 26366d5094c..8d328d8e7fd 100644
--- a/prov/efa/src/efa_cq.h
+++ b/prov/efa/src/efa_cq.h
@@ -18,6 +18,11 @@ struct efa_ibv_cq_poll_list_entry {
struct efa_ibv_cq *cq;
};
+struct efa_cq {
+ struct util_cq util_cq;
+ struct efa_ibv_cq ibv_cq;
+};
+
/*
* Control header with completion data. CQ data length is static.
*/
@@ -177,6 +182,11 @@ static inline int efa_cq_ibv_cq_ex_open(struct fi_cq_attr *attr,
}
#endif
+int efa_cq_open(struct fid_domain *domain_fid, struct fi_cq_attr *attr,
+ struct fid_cq **cq_fid, void *context);
+
+void efa_cq_progress(struct util_cq *cq);
+
#if HAVE_CAPS_UNSOLICITED_WRITE_RECV
/**
* @brief Check whether a completion consumes recv buffer
@@ -200,3 +210,62 @@ bool efa_cq_wc_is_unsolicited(struct ibv_cq_ex *ibv_cq_ex)
}
#endif
+
+/**
+ * @brief Write the error message and return its byte length
+ * @param[in] ep EFA base endpoint
+ * @param[in] addr Remote peer fi_addr_t
+ * @param[in] prov_errno EFA provider error code (must be positive)
+ * @param[out] err_msg Pointer to the address of error message written by
+ * this function
+ * @param[out] buflen Pointer to the returned error data size
+ * @return A status code. 0 if the error data was written successfully,
+ * otherwise a negative FI error code.
+ */
+static inline int efa_write_error_msg(struct efa_base_ep *ep, fi_addr_t addr,
+ int prov_errno, char *err_msg,
+ size_t *buflen)
+{
+ char ep_addr_str[OFI_ADDRSTRLEN] = {0}, peer_addr_str[OFI_ADDRSTRLEN] = {0};
+ char peer_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
+ char local_host_id_str[EFA_HOST_ID_STRING_LENGTH + 1] = {0};
+ const char *base_msg = efa_strerror(prov_errno);
+ size_t len = 0;
+ uint64_t local_host_id;
+
+ *buflen = 0;
+
+ len = sizeof(ep_addr_str);
+ efa_base_ep_raw_addr_str(ep, ep_addr_str, &len);
+ len = sizeof(peer_addr_str);
+ efa_base_ep_get_peer_raw_addr_str(ep, addr, peer_addr_str, &len);
+
+ local_host_id = efa_get_host_id(efa_env.host_id_file);
+ if (!local_host_id ||
+ EFA_HOST_ID_STRING_LENGTH != snprintf(local_host_id_str,
+ EFA_HOST_ID_STRING_LENGTH + 1,
+ "i-%017lx", local_host_id)) {
+ strcpy(local_host_id_str, "N/A");
+ }
+
+ /* efa-raw cannot get peer host id without a handshake */
+ strcpy(peer_host_id_str, "N/A");
+
+ int ret = snprintf(err_msg, EFA_ERROR_MSG_BUFFER_LENGTH,
+ "%s My EFA addr: %s My host id: %s Peer EFA addr: "
+ "%s Peer host id: %s",
+ base_msg, ep_addr_str, local_host_id_str,
+ peer_addr_str, peer_host_id_str);
+
+ if (ret < 0 || ret > EFA_ERROR_MSG_BUFFER_LENGTH - 1) {
+ return -FI_EINVAL;
+ }
+
+ if (strlen(err_msg) >= EFA_ERROR_MSG_BUFFER_LENGTH) {
+ return -FI_ENOBUFS;
+ }
+
+ *buflen = EFA_ERROR_MSG_BUFFER_LENGTH;
+
+ return 0;
+}
diff --git a/prov/efa/src/efa_domain.c b/prov/efa/src/efa_domain.c
index e6cab857af3..e64f1fda4c0 100644
--- a/prov/efa/src/efa_domain.c
+++ b/prov/efa/src/efa_domain.c
@@ -12,7 +12,6 @@
#include "rdm/efa_rdm_cq.h"
#include "rdm/efa_rdm_atomic.h"
#include "dgram/efa_dgram_ep.h"
-#include "dgram/efa_dgram_cq.h"
struct dlist_entry g_efa_domain_list;
@@ -33,7 +32,7 @@ static struct fi_ops efa_ops_domain_fid = {
static struct fi_ops_domain efa_ops_domain_dgram = {
.size = sizeof(struct fi_ops_domain),
.av_open = efa_av_open,
- .cq_open = efa_dgram_cq_open,
+ .cq_open = efa_cq_open,
.endpoint = efa_dgram_ep_open,
.scalable_ep = fi_no_scalable_ep,
.cntr_open = efa_cntr_open,
diff --git a/prov/efa/src/efa_msg.c b/prov/efa/src/efa_msg.c
index 7920afbf531..2fc4ad6c195 100644
--- a/prov/efa/src/efa_msg.c
+++ b/prov/efa/src/efa_msg.c
@@ -97,9 +97,9 @@ static inline ssize_t efa_post_recv(struct efa_base_ep *base_ep, const struct fi
}
wr = &base_ep->efa_recv_wr_vec[wr_index].wr;
- wr->wr_id = (uintptr_t)msg->context;
wr->num_sge = msg->iov_count;
wr->sg_list = base_ep->efa_recv_wr_vec[wr_index].sge;
+ wr->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
for (i = 0; i < msg->iov_count; i++) {
addr = (uintptr_t)msg->msg_iov[i].iov_base;
@@ -214,7 +214,8 @@ static inline ssize_t efa_post_send(struct efa_base_ep *base_ep, const struct fi
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
+
if (flags & FI_REMOTE_CQ_DATA) {
ibv_wr_send_imm(qp->ibv_qp_ex, msg->data);
} else {
diff --git a/prov/efa/src/efa_rma.c b/prov/efa/src/efa_rma.c
index a7bad7d3877..bbec8c78451 100644
--- a/prov/efa/src/efa_rma.c
+++ b/prov/efa/src/efa_rma.c
@@ -87,7 +87,7 @@ static inline ssize_t efa_rma_post_read(struct efa_base_ep *base_ep,
ibv_wr_start(qp->ibv_qp_ex);
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
/* ep->domain->info->tx_attr->rma_iov_limit is set to 1 */
ibv_wr_rdma_read(qp->ibv_qp_ex, msg->rma_iov[0].key, msg->rma_iov[0].addr);
@@ -216,7 +216,7 @@ static inline ssize_t efa_rma_post_write(struct efa_base_ep *base_ep,
ibv_wr_start(qp->ibv_qp_ex);
base_ep->is_wr_started = true;
}
- qp->ibv_qp_ex->wr_id = (uintptr_t)msg->context;
+ qp->ibv_qp_ex->wr_id = (uintptr_t) ((flags & FI_COMPLETION) ? msg->context : NULL);
if (flags & FI_REMOTE_CQ_DATA) {
ibv_wr_rdma_write_imm(qp->ibv_qp_ex, msg->rma_iov[0].key,
diff --git a/prov/efa/src/rdm/efa_rdm_cq.h b/prov/efa/src/rdm/efa_rdm_cq.h
index 932c57109d7..a56d62dac40 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.h
+++ b/prov/efa/src/rdm/efa_rdm_cq.h
@@ -9,8 +9,8 @@
struct efa_rdm_cq {
struct util_cq util_cq;
- struct fid_cq *shm_cq;
struct efa_ibv_cq ibv_cq;
+ struct fid_cq *shm_cq;
struct dlist_entry ibv_cq_poll_list;
bool need_to_scan_ep_list;
};
diff --git a/prov/efa/test/efa_unit_test_cq.c b/prov/efa/test/efa_unit_test_cq.c
index 29a06fc1579..df415f7cd9a 100644
--- a/prov/efa/test/efa_unit_test_cq.c
+++ b/prov/efa/test/efa_unit_test_cq.c
@@ -3,8 +3,8 @@
#include "efa_unit_tests.h"
#include "dgram/efa_dgram_ep.h"
-#include "dgram/efa_dgram_cq.h"
#include "rdm/efa_rdm_cq.h"
+#include "efa_av.h"
/**
* @brief implementation of test cases for fi_cq_read() works with empty device CQ for given endpoint type
@@ -27,7 +27,7 @@ void test_impl_cq_read_empty_cq(struct efa_resource *resource, enum fi_ep_type e
struct efa_dgram_ep *efa_dgram_ep;
efa_dgram_ep = container_of(resource->ep, struct efa_dgram_ep, base_ep.util_ep.ep_fid);
- ibv_cqx = efa_dgram_ep->rcq->ibv_cq_ex;
+ ibv_cqx = container_of(efa_dgram_ep->base_ep.util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
} else {
struct efa_rdm_ep *efa_rdm_ep;
@@ -811,3 +811,216 @@ void test_ibv_cq_ex_read_ignore_removed_peer()
skip();
}
#endif
+
+static void test_efa_cq_read(struct efa_resource *resource, fi_addr_t *addr,
+ int ibv_wc_opcode, int status, int vendor_error)
+{
+ int ret;
+ size_t raw_addr_len = sizeof(struct efa_ep_addr);
+ struct efa_ep_addr raw_addr;
+ struct ibv_cq_ex *ibv_cqx;
+ struct ibv_qp_ex *ibv_qpx;
+ struct efa_base_ep *base_ep;
+
+ efa_unit_test_resource_construct(resource, FI_EP_DGRAM);
+
+ base_ep = container_of(resource->ep, struct efa_base_ep, util_ep.ep_fid);
+ ibv_qpx = base_ep->qp->ibv_qp_ex;
+
+ ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
+ assert_int_equal(ret, 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ ret = fi_av_insert(resource->av, &raw_addr, 1, addr, 0 /* flags */, NULL /* context */);
+ assert_int_equal(ret, 1);
+
+ ibv_qpx->wr_start = &efa_mock_ibv_wr_start_no_op;
+ /* this mock will save the send work request (wr) in a global list */
+ ibv_qpx->wr_send = &efa_mock_ibv_wr_send_save_wr;
+ ibv_qpx->wr_set_sge_list = &efa_mock_ibv_wr_set_sge_list_no_op;
+ ibv_qpx->wr_set_ud_addr = &efa_mock_ibv_wr_set_ud_addr_no_op;
+ ibv_qpx->wr_complete = &efa_mock_ibv_wr_complete_no_op;
+
+ base_ep->qp->ibv_qp->context->ops.post_recv = &efa_mock_ibv_post_recv;
+ will_return_maybe(efa_mock_ibv_post_recv, 0);
+
+ if (ibv_wc_opcode == IBV_WC_RECV) {
+ ibv_cqx = container_of(base_ep->util_ep.rx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
+ ibv_cqx->start_poll = &efa_mock_ibv_start_poll_return_mock;
+ ibv_cqx->wr_id = (uintptr_t)12345;
+ will_return(efa_mock_ibv_start_poll_return_mock, 0);
+ ibv_cqx->status = status;
+ } else {
+ ibv_cqx = container_of(base_ep->util_ep.tx_cq, struct efa_cq, util_cq)->ibv_cq.ibv_cq_ex;
+ /* this mock will set ibv_cq_ex->wr_id to the wr_id of the head of global send_wr,
+ * and set ibv_cq_ex->status to mock value */
+ ibv_cqx->start_poll = &efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status;
+ will_return(efa_mock_ibv_start_poll_use_saved_send_wr_with_mock_status, status);
+ }
+
+ ibv_cqx->next_poll = &efa_mock_ibv_next_poll_return_mock;
+ ibv_cqx->end_poll = &efa_mock_ibv_end_poll_check_mock;
+ ibv_cqx->read_opcode = &efa_mock_ibv_read_opcode_return_mock;
+ ibv_cqx->read_vendor_err = &efa_mock_ibv_read_vendor_err_return_mock;
+ ibv_cqx->read_qp_num = &efa_mock_ibv_read_qp_num_return_mock;
+ will_return_maybe(efa_mock_ibv_end_poll_check_mock, NULL);
+ will_return_maybe(efa_mock_ibv_next_poll_return_mock, 0);
+ will_return_maybe(efa_mock_ibv_read_opcode_return_mock, ibv_wc_opcode);
+ will_return_maybe(efa_mock_ibv_read_qp_num_return_mock, base_ep->qp->qp_num);
+ will_return_maybe(efa_mock_ibv_read_vendor_err_return_mock, vendor_error);
+#if HAVE_EFADV_CQ_EX
+ ibv_cqx->read_byte_len = &efa_mock_ibv_read_byte_len_return_mock;
+ ibv_cqx->read_slid = &efa_mock_ibv_read_slid_return_mock;
+ ibv_cqx->read_src_qp = &efa_mock_ibv_read_src_qp_return_mock;
+ ibv_cqx->read_wc_flags = &efa_mock_ibv_read_wc_flags_return_mock;
+ will_return_maybe(efa_mock_ibv_read_byte_len_return_mock, 4096);
+ will_return_maybe(efa_mock_ibv_read_slid_return_mock, efa_av_addr_to_conn(base_ep->av, *addr)->ah->ahn);
+ will_return_maybe(efa_mock_ibv_read_src_qp_return_mock, raw_addr.qpn);
+ will_return_maybe(efa_mock_ibv_read_wc_flags_return_mock, 0);
+#endif
+}
+
+/**
+ * @brief test EFA CQ's fi_cq_read() works properly when rdma-core return
+ * success status for send operation.
+ */
+void test_efa_cq_read_send_success(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_SUCCESS, 0);
+ efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
+
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, (void *) 12345);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ /* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr(), which pulled one send_wr from g_ibv_submitted_wr_id_vec */
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ assert_int_equal(ret, 1);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief test EFA CQ's fi_cq_read() works properly when rdma-core return
+ * success status for recv operation.
+ */
+void test_efa_cq_read_recv_success(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff recv_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_SUCCESS, 0);
+ efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
+
+ ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
+ fi_mr_desc(recv_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ assert_int_equal(ret, 1);
+
+ efa_unit_test_buff_destruct(&recv_buff);
+}
+
+static void efa_cq_check_cq_err_entry(struct efa_resource *resource, int vendor_error) {
+ struct fi_cq_err_entry cq_err_entry = {0};
+ const char *strerror;
+ int ret;
+
+ /* Allocate memory to read CQ error */
+ cq_err_entry.err_data_size = EFA_ERROR_MSG_BUFFER_LENGTH;
+ cq_err_entry.err_data = malloc(cq_err_entry.err_data_size);
+ assert_non_null(cq_err_entry.err_data);
+
+ ret = fi_cq_readerr(resource->cq, &cq_err_entry, 0);
+ assert_true(cq_err_entry.err_data_size > 0);
+ strerror = fi_cq_strerror(resource->cq, cq_err_entry.prov_errno,
+ cq_err_entry.err_data, NULL, 0);
+
+ assert_int_equal(ret, 1);
+ assert_int_not_equal(cq_err_entry.err, FI_SUCCESS);
+ assert_int_equal(cq_err_entry.prov_errno, vendor_error);
+ assert_true(strlen(strerror) > 0);
+}
+
+/**
+ * @brief test EFA CQ's fi_cq_read()/fi_cq_readerr() works properly when rdma-core return bad status for send.
+ *
+ * When the send operation failed, fi_cq_read() should return -FI_EAVAIL, which means error available.
+ * then the user should call fi_cq_readerr() to get an error CQ entry that contains the error code.
+ *
+ * @param[in] state struct efa_resource that is managed by the framework
+ */
+void test_efa_cq_read_send_failure(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_SEND, IBV_WC_GENERAL_ERR,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+ efa_unit_test_buff_construct(&send_buff, resource, 4096 /* buff_size */);
+
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, (void *) 12345);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ /* fi_cq_read() called efa_mock_ibv_start_poll_use_saved_send_wr(), which pulled one send_wr from g_ibv_submitted_wr_id_vec */
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 0);
+ assert_int_equal(ret, -FI_EAVAIL);
+
+ efa_cq_check_cq_err_entry(resource,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief test EFA CQ's fi_cq_read()/fi_cq_readerr() works properly when rdma-core return bad status for recv.
+ *
+ * When the recv operation failed, fi_cq_read() should return -FI_EAVAIL, which means error available.
+ * then the user should call fi_cq_readerr() to get an error CQ entry that contains the error code.
+ *
+ * @param[in] state struct efa_resource that is managed by the framework
+ */
+void test_efa_cq_read_recv_failure(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff recv_buff;
+ struct fi_cq_data_entry cq_entry;
+ fi_addr_t addr;
+ int ret;
+
+ test_efa_cq_read(resource, &addr, IBV_WC_RECV, IBV_WC_GENERAL_ERR,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+ efa_unit_test_buff_construct(&recv_buff, resource, 4096 /* buff_size */);
+
+ ret = fi_recv(resource->ep, recv_buff.buff, recv_buff.size,
+ fi_mr_desc(recv_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ assert_int_equal(ret, -FI_EAVAIL);
+
+ efa_cq_check_cq_err_entry(resource,
+ EFA_IO_COMP_STATUS_LOCAL_ERROR_UNRESP_REMOTE);
+
+ efa_unit_test_buff_destruct(&recv_buff);
+}
diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c
index 017f4e65ded..3e3ba43ef04 100644
--- a/prov/efa/test/efa_unit_tests.c
+++ b/prov/efa/test/efa_unit_tests.c
@@ -229,6 +229,10 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rma_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rma_inject_write, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rma_inject_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_send_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_send_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_cq_read_recv_failure, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
};
cmocka_set_message_output(CM_OUTPUT_XML);
diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h
index 4a796e5385f..86bef64edab 100644
--- a/prov/efa/test/efa_unit_tests.h
+++ b/prov/efa/test/efa_unit_tests.h
@@ -251,6 +251,10 @@ void test_efa_rma_writemsg();
void test_efa_rma_writedata();
void test_efa_rma_inject_write();
void test_efa_rma_inject_writedata();
+void test_efa_cq_read_send_success();
+void test_efa_cq_read_recv_success();
+void test_efa_cq_read_send_failure();
+void test_efa_cq_read_recv_failure();
static inline
int efa_unit_test_get_dlist_length(struct dlist_entry *head)