From 71eb2147285196435b865a631800ee299a67941e Mon Sep 17 00:00:00 2001 From: Changcheng Liu Date: Wed, 6 Apr 2022 15:41:39 +0800 Subject: [PATCH] UCP: develop path compatible function Signed-off-by: Changcheng Liu --- buildlib/pr/io_demo/io-demo.yml | 16 ++++++++++ src/ucp/wireup/wireup.c | 52 ++++++++++++++++++++++++--------- src/ucp/wireup/wireup.h | 5 ++-- src/ucp/wireup/wireup_cm.c | 20 ++----------- 4 files changed, 60 insertions(+), 33 deletions(-) diff --git a/buildlib/pr/io_demo/io-demo.yml b/buildlib/pr/io_demo/io-demo.yml index f72b44c3714..505e5825572 100644 --- a/buildlib/pr/io_demo/io-demo.yml +++ b/buildlib/pr/io_demo/io-demo.yml @@ -35,6 +35,22 @@ parameters: interface: $(roce_iface_cx6) tls: "rc_x" test_name: tag_umemh_cx6_rc + "server one path compatible on CX4": + args: "" + initial_delay: 9999 # No interference + duration: 120 + interface: $(roce_iface_cx4) + tls: "rc_x" + extra_run_args: "--env server UCX_IB_NUM_PATHS=1 --env client UCX_IB_NUM_PATHS=2" + test_name: tag_cx4_rc_server_one_path + "client one path compatible on CX4": + args: "" + initial_delay: 9999 # No interference + duration: 120 + interface: $(roce_iface_cx4) + tls: "rc_x" + extra_run_args: "--env server UCX_IB_NUM_PATHS=2 --env client UCX_IB_NUM_PATHS=1" + test_name: tag_cx4_rc_client_one_path jobs: - job: io_build diff --git a/src/ucp/wireup/wireup.c b/src/ucp/wireup/wireup.c index 38327e48c42..55c811b397b 100644 --- a/src/ucp/wireup/wireup.c +++ b/src/ucp/wireup/wireup.c @@ -295,13 +295,11 @@ ucp_wireup_find_remote_p2p_addr(ucp_ep_h ep, ucp_lane_index_t remote_lane, ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, - const ucp_lane_index_t *lanes2remote, - int *num_eps_connected_p) + const ucp_lane_index_t *lanes2remote) { ucp_lane_index_t lane, remote_lane; const uct_device_addr_t *dev_addr; const uct_ep_addr_t *ep_addr; - int num_eps_connected = 0; ucs_status_t status; ucs_trace("ep %p: connect local transports", ep); @@ -325,14 +323,11 @@ ucp_wireup_connect_local(ucp_ep_h ep, if (status != UCS_OK) { goto out; } - - num_eps_connected++; } status = UCS_OK; out: - *num_eps_connected_p = num_eps_connected; return status; } @@ -426,7 +421,6 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, uint64_t tl_bitmap = 0; int send_reply = 0; unsigned ep_init_flags = 0; - int num_eps_connected = 0; ucp_rsc_index_t lanes2remote[UCP_MAX_LANES]; unsigned addr_indices[UCP_MAX_LANES]; ucs_status_t status; @@ -516,8 +510,7 @@ ucp_wireup_process_request(ucp_worker_h worker, const ucp_wireup_msg_t *msg, /* Connect p2p addresses to remote endpoint */ if (!(ep->flags & UCP_EP_FLAG_LOCAL_CONNECTED)) { - status = ucp_wireup_connect_local(ep, remote_address, lanes2remote, - &num_eps_connected); + status = ucp_wireup_connect_local(ep, remote_address, lanes2remote); if (status != UCS_OK) { return; } @@ -584,7 +577,6 @@ ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg, const ucp_unpacked_address_t *remote_address) { uct_worker_cb_id_t cb_id = UCS_CALLBACKQ_ID_NULL; - int num_eps_connected = 0; ucs_status_t status; ucp_ep_h ep; int ack; @@ -611,7 +603,7 @@ ucp_wireup_process_reply(ucp_worker_h worker, const ucp_wireup_msg_t *msg, * **receiver** ep lane should be connected to a given ep address. So we * don't pass 'lanes2remote' mapping, and use local lanes directly. */ - status = ucp_wireup_connect_local(ep, remote_address, NULL, &num_eps_connected); + status = ucp_wireup_connect_local(ep, remote_address, NULL); if (status != UCS_OK) { return; } @@ -975,6 +967,31 @@ ucp_wireup_get_reachable_mds(ucp_worker_h worker, key->reachable_md_map = dst_md_map; } +static void +ucp_wireup_reclaim_unused_uct_ep(ucp_ep_h ep, ucp_ep_config_key_t *new_key, + ucs_queue_head_t *uct_ep_req_queue) +{ + ucp_ep_config_key_t *old_key = &ucp_ep_config(ep)->key; + ucp_lane_index_t lane; + + ucs_assert(new_key->num_lanes > 1); + if (old_key->num_lanes == 0) { + return; + } + + ucs_assertv(old_key->num_lanes >= new_key->num_lanes, + "inited lane : %d, selected lane : %d", + (uint32_t)(old_key->num_lanes), (uint32_t)(new_key->num_lanes)); + for (lane = new_key->num_lanes; lane < old_key->num_lanes; ++lane) { + uct_ep_pending_purge(ep->uct_eps[lane], ucp_wireup_pending_purge_cb, + uct_ep_req_queue); + uct_ep_destroy(ep->uct_eps[lane]); + ep->uct_eps[lane] = NULL; + } + + return; +} + ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, uint64_t local_tl_bitmap, const ucp_unpacked_address_t *remote_address, @@ -988,6 +1005,8 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, ucs_status_t status; char str[32]; ucp_wireup_ep_t *cm_wireup_ep; + ucs_queue_head_t uct_ep_req_queue; + ucp_request_t *req; ucs_assert(tl_bitmap != 0); @@ -1002,6 +1021,9 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, return status; } + ucs_queue_head_init(&uct_ep_req_queue); + ucp_wireup_reclaim_unused_uct_ep(ep, &key, &uct_ep_req_queue); + /* Get all reachable MDs from full remote address list */ key.dst_md_cmpts = ucs_alloca(sizeof(*key.dst_md_cmpts) * UCP_MAX_MDS); ucp_wireup_get_reachable_mds(worker, remote_address, &ucp_ep_config(ep)->key, @@ -1070,6 +1092,10 @@ ucs_status_t ucp_wireup_init_lanes(ucp_ep_h ep, unsigned ep_init_flags, ep->flags |= UCP_EP_FLAG_LOCAL_CONNECTED; } + ucs_queue_for_each_extract(req, &uct_ep_req_queue, send.uct.priv, 1) { + ucp_request_send(req, 0); + } + return UCS_OK; } @@ -1096,7 +1122,7 @@ ucs_status_t ucp_wireup_send_request(ucp_ep_h ep) return status; } -static void ucp_wireup_connect_remote_purge_cb(uct_pending_req_t *self, void *arg) +void ucp_wireup_pending_purge_cb(uct_pending_req_t *self, void *arg) { ucp_request_t *req = ucs_container_of(self, ucp_request_t, send.uct); ucs_queue_head_t *queue = arg; @@ -1169,7 +1195,7 @@ ucs_status_t ucp_wireup_connect_remote(ucp_ep_h ep, ucp_lane_index_t lane) * could not be progressed any more after switching to wireup proxy). */ ucs_queue_head_init(&tmp_q); - uct_ep_pending_purge(uct_ep, ucp_wireup_connect_remote_purge_cb, &tmp_q); + uct_ep_pending_purge(uct_ep, ucp_wireup_pending_purge_cb, &tmp_q); /* the wireup ep should use the existing [am_lane] as next_ep */ ucp_wireup_ep_set_next_ep(ep->uct_eps[lane], uct_ep); diff --git a/src/ucp/wireup/wireup.h b/src/ucp/wireup/wireup.h index e5072b5aa84..39ee5f42956 100644 --- a/src/ucp/wireup/wireup.h +++ b/src/ucp/wireup/wireup.h @@ -154,7 +154,8 @@ unsigned ucp_ep_init_flags(const ucp_worker_h worker, ucs_status_t ucp_wireup_connect_local(ucp_ep_h ep, const ucp_unpacked_address_t *remote_address, - const ucp_lane_index_t *lanes2remote, - int *num_eps_connected_p); + const ucp_lane_index_t *lanes2remote); + +void ucp_wireup_pending_purge_cb(uct_pending_req_t *self, void *arg); #endif diff --git a/src/ucp/wireup/wireup_cm.c b/src/ucp/wireup/wireup_cm.c index 73bbd7a75e7..95654771797 100644 --- a/src/ucp/wireup/wireup_cm.c +++ b/src/ucp/wireup/wireup_cm.c @@ -249,7 +249,6 @@ static unsigned ucp_cm_client_connect_progress(void *arg) ucp_worker_h worker = ucp_ep->worker; ucp_context_h context = worker->context; uct_ep_h uct_cm_ep = ucp_ep_get_cm_uct_ep(ucp_ep); - int num_eps_connected = 0; ucp_wireup_ep_t *wireup_ep; ucp_unpacked_address_t addr; uint64_t tl_bitmap; @@ -306,7 +305,7 @@ static unsigned ucp_cm_client_connect_progress(void *arg) goto out_unblock; } - status = ucp_wireup_connect_local(ucp_ep, &addr, NULL, &num_eps_connected); + status = ucp_wireup_connect_local(ucp_ep, &addr, NULL); if (status != UCS_OK) { goto out_unblock; } @@ -659,9 +658,6 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, { uint64_t tl_bitmap = ucp_context_dev_tl_bitmap(worker->context, conn_request->dev_name); - int num_remote_ep_addrs = 0; - int num_eps_connected = 0; - const ucp_address_entry_t *address; ucp_ep_h ep; ucs_status_t status; char client_addr_str[UCS_SOCKADDR_STRING_LEN]; @@ -675,10 +671,6 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, return UCS_ERR_UNREACHABLE; } - ucp_unpacked_address_for_each(address, remote_addr) { - num_remote_ep_addrs += address->num_ep_addrs; - } - /* Create and connect TL part */ status = ucp_ep_create_to_worker_addr(worker, tl_bitmap, remote_addr, ep_init_flags, @@ -687,20 +679,12 @@ ucp_ep_cm_server_create_connected(ucp_worker_h worker, unsigned ep_init_flags, return status; } - status = ucp_wireup_connect_local(ep, remote_addr, NULL, &num_eps_connected); + status = ucp_wireup_connect_local(ep, remote_addr, NULL); if (status != UCS_OK) { ucp_ep_destroy_internal(ep); return status; } - if (num_remote_ep_addrs > num_eps_connected) { - ucs_error("server received more remote endpoint addresses (%d) than it " - "can connect to (%d)", - num_remote_ep_addrs, num_eps_connected); - ucp_ep_destroy_internal(ep); - return UCS_ERR_SOME_CONNECTS_FAILED; - } - status = ucp_ep_cm_connect_server_lane(ep, conn_request); if (status != UCS_OK) { ucp_ep_destroy_internal(ep);