Skip to content

Commit

Permalink
UCP/UCT/RCACHE: API to return rcache usage information
Browse files Browse the repository at this point in the history
  • Loading branch information
yosefe committed Jan 20, 2021
1 parent 87fd76e commit 7c395d6
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 54 deletions.
23 changes: 21 additions & 2 deletions src/ucp/api/ucp.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ enum ucp_mem_advise_params_field {
* present. It is used to enable backward compatibility support.
*/
enum ucp_context_attr_field {
UCP_ATTR_FIELD_REQUEST_SIZE = UCS_BIT(0), /**< UCP request size */
UCP_ATTR_FIELD_THREAD_MODE = UCS_BIT(1) /**< UCP context thread flag */
UCP_ATTR_FIELD_REQUEST_SIZE = UCS_BIT(0), /**< UCP request size */
UCP_ATTR_FIELD_THREAD_MODE = UCS_BIT(1), /**< UCP context thread flag */
UCP_ATTR_FIELD_NUM_PINNED_REGIONS = UCS_BIT(2), /**< Current pinned regions count */
UCP_ATTR_FIELD_NUM_PINNED_BYTES = UCS_BIT(3), /**< Current pinned regions total size */
UCP_ATTR_FIELD_NUM_PINNED_EVICTIONS = UCS_BIT(4) /**< Total pinned memory evictions */
};


Expand Down Expand Up @@ -841,6 +844,22 @@ typedef struct ucp_context_attr {
* see @ref ucs_thread_mode_t.
*/
ucs_thread_mode_t thread_mode;

/**
* Number of pinned regions in this context
*/
unsigned long num_pinned_regions;

/**
* Total size of pinned memory in this context
*/
size_t num_pinned_bytes;

/**
* How many pinned regions were evicted due to memory usage constraints
*/
unsigned long num_pinned_evictions;

} ucp_context_attr_t;


Expand Down
46 changes: 38 additions & 8 deletions src/ucp/core/ucp_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -1626,16 +1626,46 @@ void ucp_context_uct_atomic_iface_flags(ucp_context_h context,
}
}

#define UCP_SET_ATTR_FIELD(_attr, _field, _flag, _op_rvalue) \
if ((_attr)->field_mask & (_flag)) { \
(_attr)->_field _op_rvalue; \
}

ucs_status_t ucp_context_query(ucp_context_h context, ucp_context_attr_t *attr)
{
if (attr->field_mask & UCP_ATTR_FIELD_REQUEST_SIZE) {
attr->request_size = sizeof(ucp_request_t);
}
if (attr->field_mask & UCP_ATTR_FIELD_THREAD_MODE) {
if (UCP_THREAD_IS_REQUIRED(&context->mt_lock)) {
attr->thread_mode = UCS_THREAD_MODE_MULTI;
} else {
attr->thread_mode = UCS_THREAD_MODE_SINGLE;
ucp_md_index_t md_index;
uct_md_attr_t md_attr;

UCP_SET_ATTR_FIELD(attr, request_size, UCP_ATTR_FIELD_REQUEST_SIZE,
= sizeof(ucp_request_t));

UCP_SET_ATTR_FIELD(attr, thread_mode, UCP_ATTR_FIELD_THREAD_MODE,
= UCP_THREAD_IS_REQUIRED(&context->mt_lock) ?
UCS_THREAD_MODE_MULTI :
UCS_THREAD_MODE_SINGLE);

if (attr->field_mask &
(UCP_ATTR_FIELD_NUM_PINNED_REGIONS | UCP_ATTR_FIELD_NUM_PINNED_BYTES |
UCP_ATTR_FIELD_NUM_PINNED_EVICTIONS)) {

UCP_SET_ATTR_FIELD(attr, num_pinned_regions,
UCP_ATTR_FIELD_NUM_PINNED_REGIONS, = 0);
UCP_SET_ATTR_FIELD(attr, num_pinned_bytes,
UCP_ATTR_FIELD_NUM_PINNED_BYTES, = 0);
UCP_SET_ATTR_FIELD(attr, num_pinned_evictions,
UCP_ATTR_FIELD_NUM_PINNED_EVICTIONS, = 0);

for (md_index = 0; md_index < context->num_mds; ++md_index) {
uct_md_query(context->tl_mds[md_index].md, &md_attr);
UCP_SET_ATTR_FIELD(attr, num_pinned_regions,
UCP_ATTR_FIELD_NUM_PINNED_REGIONS,
+= md_attr.rcache_attr.num_regions);
UCP_SET_ATTR_FIELD(attr, num_pinned_bytes,
UCP_ATTR_FIELD_NUM_PINNED_BYTES,
+= md_attr.rcache_attr.total_size);
UCP_SET_ATTR_FIELD(attr, num_pinned_evictions,
UCP_ATTR_FIELD_NUM_PINNED_EVICTIONS,
+= md_attr.rcache_attr.num_evictions);
}
}

Expand Down
15 changes: 12 additions & 3 deletions src/ucs/memory/rcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@ static void ucs_rcache_lru_evict(ucs_rcache_t *rcache)
ucs_spin_unlock(&rcache->lru.lock);

if (num_evicted > 0) {
rcache->num_evictions += num_evicted;
ucs_debug("evicted %d regions, skipped %d regions, usage: %lu (%lu)",
num_evicted, num_skipped, rcache->num_regions,
rcache->params.max_regions);
Expand Down Expand Up @@ -746,6 +747,13 @@ void ucs_rcache_check_inv_queue_slow(ucs_rcache_t *rcache)
pthread_rwlock_unlock(&rcache->lock);
}

void ucs_rcache_query(ucs_rcache_t *rcache, ucs_rcache_attr_t *rcache_attr)
{
rcache_attr->num_regions = rcache->num_regions;
rcache_attr->total_size = rcache->total_size;
rcache_attr->num_evictions = rcache->num_evictions;
}

static UCS_CLASS_INIT_FUNC(ucs_rcache_t, const ucs_rcache_params_t *params,
const char *name, ucs_stats_node_t *stats_parent)
{
Expand Down Expand Up @@ -807,9 +815,10 @@ static UCS_CLASS_INIT_FUNC(ucs_rcache_t, const ucs_rcache_params_t *params,
}

ucs_queue_head_init(&self->inv_q);
self->lru.count = 0;
self->num_regions = 0;
self->total_size = 0;
self->lru.count = 0;
self->num_regions = 0;
self->total_size = 0;
self->num_evictions = 0;
ucs_list_head_init(&self->lru.list);
ucs_spinlock_init(&self->lru.lock, 0);

Expand Down
20 changes: 20 additions & 0 deletions src/ucs/memory/rcache.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,18 @@ struct ucs_rcache_region {
};


typedef struct {
/* Number of regions in registration cache */
unsigned long num_regions;

/* Total size of all regions in the cache */
size_t total_size;

/* How many regions were evicted during the cache lifetime */
unsigned long num_evictions;
} ucs_rcache_attr_t;


/**
* Create a memory registration cache.
*
Expand Down Expand Up @@ -193,4 +205,12 @@ void ucs_rcache_region_hold(ucs_rcache_t *rcache, ucs_rcache_region_t *region);
void ucs_rcache_region_put(ucs_rcache_t *rcache, ucs_rcache_region_t *region);


/**
* Query registration cache information.
*
* @param [in] rcache Memory registration cache to query.
* @param [out] rcache_attr Filled with rcache attributes.
*/
void ucs_rcache_query(ucs_rcache_t *rcache, ucs_rcache_attr_t *rcache_attr);

#endif
1 change: 1 addition & 0 deletions src/ucs/memory/rcache_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct ucs_rcache {
which does not generate memory events */
unsigned long num_regions;/**< Total number of managed regions */
size_t total_size; /**< Total size of registered memory */
unsigned long num_evictions; /**< Total number of evictions */

struct {
ucs_spinlock_t lock; /**< Lock for this structure */
Expand Down
9 changes: 1 addition & 8 deletions src/ucs/sys/sys.c
Original file line number Diff line number Diff line change
Expand Up @@ -1169,14 +1169,7 @@ int ucs_sys_getaffinity(ucs_sys_cpuset_t *cpuset)

void ucs_sys_cpuset_copy(ucs_cpu_set_t *dst, const ucs_sys_cpuset_t *src)
{
int c;

UCS_CPU_ZERO(dst);
for (c = 0; c < UCS_CPU_SETSIZE; ++c) {
if (CPU_ISSET(c, src)) {
UCS_CPU_SET(c, dst);
}
}
memcpy(dst, src, sizeof(*dst));
}

ucs_sys_ns_t ucs_sys_get_ns(ucs_sys_namespace_type_t ns)
Expand Down
2 changes: 2 additions & 0 deletions src/uct/api/uct.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <ucs/async/async_fwd.h>
#include <ucs/datastruct/callbackq.h>
#include <ucs/memory/memory_type.h>
#include <ucs/memory/rcache.h>
#include <ucs/type/status.h>
#include <ucs/type/thread_mode.h>
#include <ucs/type/cpu_set.h>
Expand Down Expand Up @@ -1251,6 +1252,7 @@ struct uct_md_attr {
char component_name[UCT_COMPONENT_NAME_MAX]; /**< Component name */
size_t rkey_packed_size; /**< Size of buffer needed for packed rkey */
ucs_cpu_set_t local_cpus; /**< Mask of CPUs near the resource */
ucs_rcache_attr_t rcache_attr; /**< Registration cache information */
};


Expand Down
2 changes: 2 additions & 0 deletions src/uct/base/uct_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ ucs_status_t uct_md_query(uct_md_h md, uct_md_attr_t *md_attr)
{
ucs_status_t status;

memset(&md_attr->rcache_attr, 0, sizeof(md_attr->rcache_attr));

status = md->ops->query(md, md_attr);
if (status != UCS_OK) {
return status;
Expand Down
82 changes: 55 additions & 27 deletions src/uct/ib/base/ib_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -257,57 +257,85 @@ typedef struct {
struct ibv_mr **mr;
} uct_ib_md_mem_reg_thread_t;

static void uct_ib_check_gpudirect_driver(uct_ib_md_t *md, uct_md_attr_t *md_attr,
const char *file,
ucs_memory_type_t mem_type)
static int
uct_ib_check_gpudirect_driver(ucs_memory_type_t mem_type)
{
if (!access(file, F_OK)) {
md_attr->cap.reg_mem_types |= UCS_BIT(mem_type);
static const char *driver_file[UCS_MEMORY_TYPE_LAST] = {
[UCS_MEMORY_TYPE_CUDA] = "/sys/kernel/mm/memory_peers/nv_mem/version",
[UCS_MEMORY_TYPE_ROCM] = "/dev/kfd"
};
int enabled;

enabled = !access(driver_file[mem_type], F_OK);
ucs_debug("%s GPUDirect RDMA is %s", ucs_memory_type_names[mem_type],
enabled ? "enabled" : "disabled");
return enabled;
}

static ucs_status_t uct_ib_md_reg_mem_types()
{
static uint64_t cached_reg_mem_types = 0;
uint64_t reg_mem_types;

/*
* Cache GPU-direct drivers state, to avoid checking it in every call to
* uct_md_query()
*/
if (cached_reg_mem_types == 0) {
reg_mem_types = UCS_MEMORY_TYPES_CPU_ACCESSIBLE;

if (uct_ib_check_gpudirect_driver(UCS_MEMORY_TYPE_CUDA)) {
reg_mem_types |= UCS_BIT(UCS_MEMORY_TYPE_CUDA);
};

if (uct_ib_check_gpudirect_driver(UCS_MEMORY_TYPE_ROCM)) {
reg_mem_types |= UCS_BIT(UCS_MEMORY_TYPE_ROCM);
}

cached_reg_mem_types = reg_mem_types;
}

ucs_debug("%s: %s GPUDirect RDMA is %s",
uct_ib_device_name(&md->dev), ucs_memory_type_names[mem_type],
md_attr->cap.reg_mem_types & UCS_BIT(mem_type) ?
"enabled" : "disabled");
return cached_reg_mem_types;
}

static ucs_status_t uct_ib_md_query(uct_md_h uct_md, uct_md_attr_t *md_attr)
{
uct_ib_md_t *md = ucs_derived_of(uct_md, uct_ib_md_t);
uint64_t reg_mem_types;

md_attr->cap.max_alloc = ULONG_MAX; /* TODO query device */
md_attr->cap.max_reg = ULONG_MAX; /* TODO query device */
md_attr->cap.flags = UCT_MD_FLAG_REG |
UCT_MD_FLAG_NEED_MEMH |
UCT_MD_FLAG_NEED_RKEY |
UCT_MD_FLAG_ADVISE;
md_attr->cap.reg_mem_types = UCS_MEMORY_TYPES_CPU_ACCESSIBLE;
md_attr->cap.access_mem_type = UCS_MEMORY_TYPE_HOST;
md_attr->cap.detect_mem_types = 0;

if (md->config.enable_gpudirect_rdma != UCS_NO) {
/* check if GDR driver is loaded */
uct_ib_check_gpudirect_driver(md, md_attr,
"/sys/kernel/mm/memory_peers/nv_mem/version",
UCS_MEMORY_TYPE_CUDA);

/* check if ROCM KFD driver is loaded */
uct_ib_check_gpudirect_driver(md, md_attr, "/dev/kfd",
UCS_MEMORY_TYPE_ROCM);

if (!(md_attr->cap.reg_mem_types & ~UCS_BIT(UCS_MEMORY_TYPE_HOST)) &&
(md->config.enable_gpudirect_rdma == UCS_YES)) {
ucs_error("%s: Couldn't enable GPUDirect RDMA. Please make sure"
" nv_peer_mem or amdgpu plugin installed correctly.",
uct_ib_device_name(&md->dev));
return UCS_ERR_UNSUPPORTED;
}
reg_mem_types = uct_ib_md_reg_mem_types();
if (md->config.enable_gpudirect_rdma == UCS_NO) {
/* Disable GPU-direct mem types */
md_attr->cap.reg_mem_types = reg_mem_types &
UCS_MEMORY_TYPES_CPU_ACCESSIBLE;
} else if ((md->config.enable_gpudirect_rdma == UCS_YES) &&
!(md_attr->cap.reg_mem_types & ~UCS_BIT(UCS_MEMORY_TYPE_HOST))) {
/* GPU-direct required, but only host memory is supported */
ucs_error("%s: Couldn't enable GPUDirect RDMA. Please make sure"
" nv_peer_mem or amdgpu plugin installed correctly.",
uct_ib_device_name(&md->dev));
return UCS_ERR_UNSUPPORTED;
} else {
md_attr->cap.reg_mem_types = reg_mem_types;
}

md_attr->rkey_packed_size = UCT_IB_MD_PACKED_RKEY_SIZE;
md_attr->reg_cost = md->reg_cost;
ucs_sys_cpuset_copy(&md_attr->local_cpus, &md->dev.local_cpus);

if (md->rcache != NULL) {
ucs_rcache_query(md->rcache, &md_attr->rcache_attr);
}

return UCS_OK;
}

Expand Down
18 changes: 12 additions & 6 deletions test/apps/iodemo/io_demo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -748,12 +748,18 @@ class DemoServer : public P2pDemoCommon {
}

void report_state() {
LOG << "read:" << _curr_state.read_count -
_prev_state.read_count << " ops, "
<< "write:" << _curr_state.write_count -
_prev_state.write_count << " ops, "
memory_pin_stats_t pin_stats;
memory_pin_stats(&pin_stats);

LOG << "read:" << _curr_state.read_count - _prev_state.read_count
<< " ops, "
<< "write:" << _curr_state.write_count - _prev_state.write_count
<< " ops, "
<< "active connections:" << _curr_state.active_conns
<< ", buffers:" << _data_buffers_pool.allocated();
<< ", buffers:" << _data_buffers_pool.allocated()
<< ", pin bytes:" << pin_stats.bytes
<< " regions:" << pin_stats.regions
<< " evict:" << pin_stats.evictions;
save_prev_state();
}

Expand Down Expand Up @@ -839,7 +845,7 @@ class DemoClient : public P2pDemoCommon {
};

DemoClient(const options_t& test_opts) :
P2pDemoCommon(test_opts),
P2pDemoCommon(test_opts),
_num_active_servers_to_use(0),
_prev_connect_time(0), _num_sent(0), _num_completed(0),
_status(OK), _start_time(get_time()),
Expand Down
19 changes: 19 additions & 0 deletions test/apps/iodemo/ucx_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,25 @@ void UcxContext::progress()
progress_failed_connections();
}

void UcxContext::memory_pin_stats(memory_pin_stats_t *stats)
{
ucp_context_attr_t ctx_attr;

ctx_attr.field_mask = UCP_ATTR_FIELD_NUM_PINNED_REGIONS |
UCP_ATTR_FIELD_NUM_PINNED_EVICTIONS |
UCP_ATTR_FIELD_NUM_PINNED_BYTES;
ucs_status_t status = ucp_context_query(_context, &ctx_attr);
if (status == UCS_OK) {
stats->regions = ctx_attr.num_pinned_regions;
stats->bytes = ctx_attr.num_pinned_bytes;
stats->evictions = ctx_attr.num_pinned_evictions;
} else {
stats->regions = 0;
stats->bytes = 0;
stats->evictions = 0;
}
}

uint32_t UcxContext::get_next_conn_id()
{
static uint32_t conn_id = 1;
Expand Down
Loading

0 comments on commit 7c395d6

Please sign in to comment.