From 67dd637a9d547e50b13dcf317122769179c13ffa Mon Sep 17 00:00:00 2001
From: Jiakun Yan
Date: Mon, 12 Jun 2023 22:52:09 -0500
Subject: [PATCH] lci parcelport: upgrade to LCI v1.7.6; add more configuration options

The major changes include:
- Necessary changes to the CMake files to accommodate the split of the
  original LCI library into two libraries (LCI and LCT).
- A new performance-counter and logging infrastructure based on LCT
  (currently only applied to the LCI parcelport).
  - Controlled by the CMake variables HPX_WITH_PARCELPORT_LCI_PCOUNTER and
    HPX_WITH_PARCELPORT_LCI_LOG. Both default to OFF.
- New LCI parcelport configuration options:
  - reg_mem: whether to explicitly register memory buffers for long messages
    (value 1) or just let LCI register them on the fly (value 0). The default
    is 1.
  - ndevices: how many LCI devices (low-level network contexts) to use.
    The default is 2.
  - ncomps: how many completion managers to use. The default is 1.
---
 CMakeLists.txt | 20 +-
 cmake/HPX_SetupLCI.cmake | 7 +-
 .../include/hpx/lci_base/lci_environment.hpp | 53 +++-
 libs/core/lci_base/src/lci_environment.cpp | 152 +++++++----
 .../completion_manager_queue.hpp | 5 +-
 .../include/hpx/parcelport_lci/config.hpp | 6 +
 .../include/hpx/parcelport_lci/header.hpp | 30 ++-
 .../hpx/parcelport_lci/parcelport_lci.hpp | 41 ++-
 .../parcelport_lci/putva/receiver_putva.hpp | 27 +-
 .../putva/sender_connection_putva.hpp | 2 +
 .../parcelport_lci/sender_connection_base.hpp | 3 +
 .../sendrecv/receiver_connection_sendrecv.hpp | 5 +
 .../sendrecv/receiver_sendrecv.hpp | 19 +-
 .../sendrecv/sender_connection_sendrecv.hpp | 5 +
 libs/full/parcelport_lci/src/config.cpp | 25 ++
 .../parcelport_lci/src/parcelport_lci.cpp | 249 +++++++++++-------
 .../src/putva/sender_connection_putva.cpp | 57 +++-
 libs/full/parcelport_lci/src/sender_base.cpp | 15 +-
 .../src/sender_connection_base.cpp | 63 +++--
 .../sendrecv/receiver_connection_sendrecv.cpp | 63 +++--
 .../src/sendrecv/receiver_sendrecv.cpp | 52 ++--
 .../sendrecv/sender_connection_sendrecv.cpp | 73 +++--
 22 files changed, 695 insertions(+), 277 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 93b9bdccad28..50a0aef9976e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1147,10 +1147,28 @@ if(HPX_WITH_NETWORKING)
     ADVANCED
   )
   hpx_option(
-    HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.5"
+    HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.6"
     CATEGORY "Build Targets"
     ADVANCED
   )
+  hpx_option(
+    HPX_WITH_PARCELPORT_LCI_LOG STRING
+    "Enable the LCI-parcelport-specific logger" OFF
+    CATEGORY "Parcelport"
+    ADVANCED
+  )
+  if(HPX_WITH_PARCELPORT_LCI_LOG)
+    hpx_add_config_define(HPX_HAVE_PARCELPORT_LCI_LOG)
+  endif()
+  hpx_option(
+    HPX_WITH_PARCELPORT_LCI_PCOUNTER STRING
+    "Enable the LCI-parcelport-specific performance counter" OFF
+    CATEGORY "Parcelport"
+    ADVANCED
+  )
+  if(HPX_WITH_PARCELPORT_LCI_PCOUNTER)
+    hpx_add_config_define(HPX_HAVE_PARCELPORT_LCI_PCOUNTER)
+  endif()
   hpx_option(
     HPX_WITH_PARCELPORT_TCP BOOL "Enable the TCP based parcelport."
ON diff --git a/cmake/HPX_SetupLCI.cmake b/cmake/HPX_SetupLCI.cmake index d34cc6e08ec0..649bdb828deb 100644 --- a/cmake/HPX_SetupLCI.cmake +++ b/cmake/HPX_SetupLCI.cmake @@ -87,7 +87,7 @@ macro(hpx_setup_lci) endif() install( - TARGETS LCI lci-ucx + TARGETS LCI LCT lci-ucx EXPORT HPXLCITarget COMPONENT core LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} @@ -95,7 +95,8 @@ macro(hpx_setup_lci) ) install( - DIRECTORY ${lci_SOURCE_DIR}/src/api/ ${lci_BINARY_DIR}/src/api/ + DIRECTORY ${lci_SOURCE_DIR}/lci/api/ ${lci_BINARY_DIR}/lci/api/ + ${lci_SOURCE_DIR}/lct/api/ ${lci_BINARY_DIR}/lct/api/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} COMPONENT core FILES_MATCHING @@ -103,7 +104,7 @@ macro(hpx_setup_lci) ) export( - TARGETS LCI lci-ucx + TARGETS LCI LCT lci-ucx NAMESPACE LCI:: FILE "${CMAKE_CURRENT_BINARY_DIR}/lib/cmake/${HPX_PACKAGE_NAME}/HPXLCITarget.cmake" ) diff --git a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp index 7997f78885ee..112935fb7542 100644 --- a/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp +++ b/libs/core/lci_base/include/hpx/lci_base/lci_environment.hpp @@ -42,22 +42,55 @@ namespace hpx { namespace util { static std::string get_processor_name(); - // Configurations: - // Log level + // log enum class log_level_t { none, profile, - debug + debug, }; static log_level_t log_level; - // Output filename of log - static FILE* log_outfile; - static void log(log_level_t level, const char* format, ...); - - private: - static bool enabled_; - }; +#ifdef HPX_HAVE_PARCELPORT_LCI_LOG + static LCT_log_ctx_t log_ctx; +#endif + static void log( + log_level_t level, const char* tag, const char* format, ...); + // performance counter +// clang-format off +#define HPX_LCI_PCOUNTER_NONE_FOR_EACH(_macro) + +#define HPX_LCI_PCOUNTER_TREND_FOR_EACH(_macro) \ + _macro(send_conn_start) \ + _macro(send_conn_end) \ + _macro(recv_conn_start) \ + _macro(recv_conn_end) + +#define HPX_LCI_PCOUNTER_TIMER_FOR_EACH(_macro) \ + _macro(send_conn_timer) \ + _macro(recv_conn_timer) \ + _macro(async_write_timer) \ + _macro(send_timer) \ + _macro(handle_parcels) \ + _macro(poll_comp) \ + _macro(useful_bg_work) + // clang-format on + +#define HPX_LCI_PCOUNTER_HANDLE_DECL(name) static LCT_pcounter_handle_t name; + + HPX_LCI_PCOUNTER_NONE_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DECL) + HPX_LCI_PCOUNTER_TREND_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DECL) + HPX_LCI_PCOUNTER_TIMER_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DECL) + + static LCT_pcounter_ctx_t pcounter_ctx; + static int64_t pcounter_now(); + static int64_t pcounter_since(int64_t then); + static void pcounter_add(LCT_pcounter_handle_t handle, int64_t val); + static void pcounter_start(LCT_pcounter_handle_t handle); + static void pcounter_end(LCT_pcounter_handle_t handle); + + private: + static bool enabled_; + }; }} // namespace hpx::util #include diff --git a/libs/core/lci_base/src/lci_environment.cpp b/libs/core/lci_base/src/lci_environment.cpp index 1ed9e55ba1c8..f3e51a29eb30 100644 --- a/libs/core/lci_base/src/lci_environment.cpp +++ b/libs/core/lci_base/src/lci_environment.cpp @@ -91,60 +91,19 @@ namespace hpx { namespace util { defined(HPX_HAVE_MODULE_LCI_BASE) namespace hpx { namespace util { - bool lci_environment::enabled_ = false; - lci_environment::log_level_t lci_environment::log_level = log_level_t::none; - FILE* lci_environment::log_outfile = nullptr; + lci_environment::log_level_t lci_environment::log_level; +#ifdef HPX_HAVE_PARCELPORT_LCI_LOG + LCT_log_ctx_t 
lci_environment::log_ctx; +#endif + LCT_pcounter_ctx_t lci_environment::pcounter_ctx; - /////////////////////////////////////////////////////////////////////////// - void lci_environment::init_config(util::runtime_configuration& rtcfg) - { - // The default value here does not matter here - std::string log_level_str = get_entry_as( - rtcfg, "hpx.parcel.lci.log_level", "" /* Does not matter*/); - if (log_level_str == "none") - log_level = log_level_t::none; - else if (log_level_str == "profile") - log_level = log_level_t::profile; - else if (log_level_str == "debug") - log_level = log_level_t::debug; - else - throw std::runtime_error("Unknown log level " + log_level_str); - std::string log_filename = get_entry_as( - rtcfg, "hpx.parcel.lci.log_outfile", "" /* Does not matter*/); - if (log_filename == "stderr") - log_outfile = stderr; - else if (log_filename == "stdout") - log_outfile = stdout; - else - { - const int filename_max = 256; - char filename[filename_max]; - char* p0_old = log_filename.data(); - char* p0_new = strchr(log_filename.data(), '%'); - char* p1 = filename; - while (p0_new) - { - long nbytes = p0_new - p0_old; - HPX_ASSERT(p1 + nbytes < filename + filename_max); - memcpy(p1, p0_old, nbytes); - p1 += nbytes; - nbytes = - snprintf(p1, filename + filename_max - p1, "%d", LCI_RANK); - p1 += nbytes; - p0_old = p0_new + 1; - p0_new = strchr(p0_old, '%'); - } - strncat(p1, p0_old, filename + filename_max - p1 - 1); - log_outfile = fopen(filename, "w+"); - if (log_outfile == nullptr) - { - throw std::runtime_error( - "Cannot open the logfile " + std::string(filename)); - } - } - } +#define HPX_LCI_PCOUNTER_HANDLE_DEF(name) \ + LCT_pcounter_handle_t lci_environment::name; + HPX_LCI_PCOUNTER_NONE_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) + HPX_LCI_PCOUNTER_TREND_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) + HPX_LCI_PCOUNTER_TIMER_FOR_EACH(HPX_LCI_PCOUNTER_HANDLE_DEF) /////////////////////////////////////////////////////////////////////////// void lci_environment::init( int*, char***, util::runtime_configuration& rtcfg) @@ -186,7 +145,34 @@ namespace hpx { namespace util { rtcfg.add_entry("hpx.parcel.bootstrap", "lci"); rtcfg.add_entry("hpx.parcel.lci.rank", std::to_string(this_rank)); - init_config(rtcfg); + LCT_init(); + // initialize the log context +#ifdef HPX_HAVE_PARCELPORT_LCI_LOG + const char* const log_levels[] = {"none", "profile", "debug"}; + log_ctx = LCT_log_ctx_alloc(log_levels, + sizeof(log_levels) / sizeof(log_levels[0]), 0, "hpx_lci", + getenv("HPX_LCI_LOG_OUTFILE"), getenv("HPX_LCI_LOG_LEVEL"), + getenv("HPX_LCI_LOG_WHITELIST"), getenv("HPX_LCI_LOG_BLACKLIST")); + log_level = static_cast(LCT_log_get_level(log_ctx)); +#else + log_level = log_level_t::none; +#endif +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + // initialize the performance counters + pcounter_ctx = LCT_pcounter_ctx_alloc("hpx-lci"); + +#define HPX_LCI_PCOUNTER_NONE_REGISTER(name) \ + name = LCT_pcounter_register(pcounter_ctx, #name, LCT_PCOUNTER_NONE); + HPX_LCI_PCOUNTER_NONE_FOR_EACH(HPX_LCI_PCOUNTER_NONE_REGISTER) + +#define HPX_LCI_PCOUNTER_TREND_REGISTER(name) \ + name = LCT_pcounter_register(pcounter_ctx, #name, LCT_PCOUNTER_TREND); + HPX_LCI_PCOUNTER_TREND_FOR_EACH(HPX_LCI_PCOUNTER_TREND_REGISTER) + +#define HPX_LCI_PCOUNTER_TIMER_REGISTER(name) \ + name = LCT_pcounter_register(pcounter_ctx, #name, LCT_PCOUNTER_TIMER); + HPX_LCI_PCOUNTER_TIMER_FOR_EACH(HPX_LCI_PCOUNTER_TIMER_REGISTER) +#endif enabled_ = true; } @@ -200,7 +186,13 @@ namespace hpx { namespace util { if (enabled()) { enabled_ = false; - // for 
some reasons, this code block can be entered twice when HPX exits +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + LCT_pcounter_ctx_free(&pcounter_ctx); +#endif +#ifdef HPX_HAVE_PARCELPORT_LCI_LOG + LCT_log_ctx_free(&log_ctx); +#endif + LCT_fina(); int lci_init = 0; LCI_initialized(&lci_init); if (lci_init) @@ -240,17 +232,63 @@ namespace hpx { namespace util { return res; } - void lci_environment::log( - lci_environment::log_level_t level, const char* format, ...) + void lci_environment::log([[maybe_unused]] log_level_t level, + [[maybe_unused]] const char* tag, [[maybe_unused]] const char* format, + ...) { +#ifdef HPX_HAVE_PARCELPORT_LCI_LOG + if (level > log_level) + return; va_list args; va_start(args, format); - if (level <= log_level) - vfprintf(log_outfile, format, args); + LCT_Logv(log_ctx, static_cast(level), tag, format, args); va_end(args); +#endif + } + + int64_t lci_environment::pcounter_now() + { +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + return static_cast(LCT_now()); +#endif + return 0; + } + + int64_t lci_environment::pcounter_since([[maybe_unused]] int64_t then) + { +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + return static_cast(LCT_now()) - then; +#endif + return 0; + } + + void lci_environment::pcounter_add( + [[maybe_unused]] LCT_pcounter_handle_t handle, + [[maybe_unused]] int64_t val) + { +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + LCT_pcounter_add(pcounter_ctx, handle, val); +#endif + } + + void lci_environment::pcounter_start( + [[maybe_unused]] LCT_pcounter_handle_t handle) + { +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + LCT_pcounter_start(pcounter_ctx, handle); +#endif } + + void lci_environment::pcounter_end( + [[maybe_unused]] LCT_pcounter_handle_t handle) + { +#ifdef HPX_HAVE_PARCELPORT_LCI_PCOUNTER + LCT_pcounter_end(pcounter_ctx, handle); +#endif + } + }} // namespace hpx::util #endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp index d013779110cf..af6404ed0bbc 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp @@ -17,7 +17,10 @@ namespace hpx::parcelset::policies::lci { { completion_manager_queue() { - LCI_queue_create(LCI_UR_DEVICE, &queue); + // LCI_queue_create(LCI_UR_DEVICE, &queue); + // Hack for now + LCI_queue_createx(LCI_UR_DEVICE, + LCI_SERVER_NUM_PKTS * (size_t) config_t::ndevices, &queue); } ~completion_manager_queue() diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp index 376a78b70019..027e908b4d3d 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp @@ -45,6 +45,12 @@ namespace hpx::parcelset::policies::lci { // How many pre-posted receives for new messages // (can only be applied to `sendrecv` protocol). 
static int prepost_recv_num; + // Whether to register the buffer in HPX (or rely on LCI to register it) + static bool reg_mem; + // How many devices to use + static int ndevices; + // How many completion managers to use + static int ncomps; static void init_config(util::runtime_configuration const& rtcfg); }; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index 2ebb7f18ea76..d9ad22b1f534 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -28,23 +28,25 @@ namespace hpx::parcelset::policies::lci { { // siguature for assert_valid pos_signature = 0, + // device idx + pos_device_idx = 1 * sizeof(value_type), // tag - pos_tag = 1 * sizeof(value_type), + pos_tag = 2 * sizeof(value_type), // non-zero-copy chunk size - pos_numbytes_nonzero_copy = 2 * sizeof(value_type), + pos_numbytes_nonzero_copy = 3 * sizeof(value_type), // transmission chunk size - pos_numbytes_tchunk = 3 * sizeof(value_type), + pos_numbytes_tchunk = 4 * sizeof(value_type), // how many bytes in total (including zero-copy and non-zero-copy chunks) - pos_numbytes = 4 * sizeof(value_type), + pos_numbytes = 5 * sizeof(value_type), // zero-copy chunk number - pos_numchunks_zero_copy = 5 * sizeof(value_type), + pos_numchunks_zero_copy = 6 * sizeof(value_type), // non-zero-copy chunk number - pos_numchunks_nonzero_copy = 6 * sizeof(value_type), + pos_numchunks_nonzero_copy = 7 * sizeof(value_type), // whether piggyback data - pos_piggy_back_flag_data = 7 * sizeof(value_type), + pos_piggy_back_flag_data = 8 * sizeof(value_type), // whether piggyback transmission chunk - pos_piggy_back_flag_tchunk = 7 * sizeof(value_type) + 1, - pos_piggy_back_address = 7 * sizeof(value_type) + 2 + pos_piggy_back_flag_tchunk = 8 * sizeof(value_type) + 1, + pos_piggy_back_address = 8 * sizeof(value_type) + 2 }; template @@ -133,6 +135,16 @@ namespace hpx::parcelset::policies::lci { return get(); } + void set_device_idx(int device_idx) noexcept + { + set(static_cast(device_idx)); + } + + value_type get_device_idx() const noexcept + { + return get(); + } + void set_tag(LCI_tag_t tag) noexcept { set(static_cast(tag)); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp index 42d9b17b4cb3..94d975437ce7 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp @@ -120,27 +120,39 @@ namespace hpx::parcelset { // whether the parcelport has been initialized // (starting to execute the background works) std::atomic is_initialized = false; - // whether the parcelport is sending early parcels - std::atomic is_sending_early_parcel = false; // LCI objects - LCI_device_t device; - LCI_endpoint_t endpoint_new; - LCI_endpoint_t endpoint_followup; + struct completion_manager_t; + struct device_t + { + // These are all pointers to the real data structure allocated + // by LCI. They would not be modified once initialized. + // So we should not have false sharing here. 
+ int idx; + LCI_device_t device; + LCI_endpoint_t endpoint_new; + LCI_endpoint_t endpoint_followup; + completion_manager_t* completion_manager_p; + }; + std::vector devices; // Parcelport objects static std::atomic prg_thread_flag; std::unique_ptr prg_thread_p; - std::shared_ptr send_completion_manager; - std::shared_ptr - recv_new_completion_manager; - std::shared_ptr - recv_followup_completion_manager; + struct completion_manager_t + { + std::shared_ptr send; + std::shared_ptr recv_new; + std::shared_ptr recv_followup; + }; + std::vector completion_managers; - bool do_progress(); + bool do_progress_local(); + device_t& get_tls_device(); private: - static void progress_thread_fn(LCI_device_t device); + static void progress_thread_fn( + const std::vector& devices); void setup(util::runtime_configuration const& rtcfg); void cleanup(); @@ -250,7 +262,10 @@ namespace hpx::traits { "protocol = putsendrecv\n" "comp_type = queue\n" "progress_type = rp\n" - "prepost_recv_num = 1\n"; + "prepost_recv_num = 1\n" + "reg_mem = 1\n" + "ndevices = 2\n" + "ncomps = 1\n"; } }; } // namespace hpx::traits diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp index 845eb301f0b3..09fbc87f52fd 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp @@ -37,17 +37,28 @@ namespace hpx::parcelset::policies::lci { bool background_work() noexcept { - // We first try to accept a new connection + bool did_some_work = false; + + auto poll_comp_start = util::lci_environment::pcounter_now(); + auto completion_manager_p = + pp_->get_tls_device().completion_manager_p; request_wrapper_t request; - request.request = pp_->recv_new_completion_manager->poll(); + request.request = completion_manager_p->recv_new->poll(); + util::lci_environment::pcounter_add( + util::lci_environment::poll_comp, + util::lci_environment::pcounter_since(poll_comp_start)); if (request.request.flag == LCI_OK) { + auto useful_bg_start = util::lci_environment::pcounter_now(); HPX_ASSERT(request.request.flag == LCI_OK); process_request(request.request); - return true; + util::lci_environment::pcounter_add( + util::lci_environment::useful_bg_work, + util::lci_environment::pcounter_since(useful_bg_start)); + did_some_work = true; } - return false; + return did_some_work; } private: @@ -83,6 +94,8 @@ namespace hpx::parcelset::policies::lci { hpx::chrono::high_resolution_timer timer_; parcelset::data_point& data = buffer.data_point_; #endif + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_start, 1); // decode header header header_ = header((char*) address); header_.assert_valid(); @@ -97,6 +110,8 @@ namespace hpx::parcelset::policies::lci { int num_non_zero_copy_chunks = header_.num_non_zero_copy_chunks(); buffer.num_chunks_.first = num_zero_copy_chunks; buffer.num_chunks_.second = num_non_zero_copy_chunks; + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_end, 1); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) data.bytes_ = static_cast(header_.numbytes()); data.time_ = timer_.elapsed_nanoseconds() - data.time_; @@ -111,6 +126,8 @@ namespace hpx::parcelset::policies::lci { parcelset::data_point& data = buffer.data_point_; data.time_ = timer_.elapsed_nanoseconds(); #endif + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_start, 1); // decode header header header_ 
= header((char*) iovec.piggy_back.address); header_.assert_valid(); @@ -169,6 +186,8 @@ namespace hpx::parcelset::policies::lci { } } HPX_ASSERT(i == iovec.count); + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_end, 1); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) data.bytes_ = static_cast(header_.numbytes()); data.time_ = timer_.elapsed_nanoseconds(); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp index 12af797f2c20..fd60ac298443 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/sender_connection_putva.hpp @@ -54,6 +54,8 @@ namespace hpx::parcelset::policies::lci { LCI_iovec_t iovec; std::shared_ptr* sharedPtr_p; // for LCI_putva + // for profiling + LCT_time_t conn_start_time; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp index 34dbb44f968f..4a5a5ede9842 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp @@ -10,6 +10,7 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) +#include #include #include @@ -52,6 +53,7 @@ namespace hpx::parcelset::policies::lci { : dst_rank(dst) , pp_((lci::parcelport*) pp) , there_(parcelset::locality(locality(dst_rank))) + , device_p(nullptr) { } @@ -84,6 +86,7 @@ namespace hpx::parcelset::policies::lci { postprocess_handler_type postprocess_handler_; parcelport* pp_; parcelset::locality there_; + parcelport::device_t* device_p; #if defined(HPX_HAVE_PARCELPORT_COUNTERS) parcelset::data_point data_point_; #endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp index f4daedd2f785..c2194c7bfe68 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp @@ -69,7 +69,12 @@ namespace hpx::parcelset::policies::lci { std::vector parcels_; std::vector> chunk_buffers_; parcelport* pp_; + parcelport::device_t* device_p; std::shared_ptr* sharedPtr_p; + // temporary data + LCI_segment_t segment_used; + // for profiling + LCT_time_t conn_start_time; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp index 47dda2c04c47..83a69995460d 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_sendrecv.hpp @@ -42,14 +42,19 @@ namespace hpx::parcelset::policies::lci { { if (config_t::protocol == config_t::protocol_t::sendrecv) { - for (int i = 0; i < config_t::prepost_recv_num; ++i) + for (std::size_t i = 0; i < pp_->devices.size(); ++i) { - LCI_comp_t completion = - pp_->recv_new_completion_manager->alloc_completion(); - LCI_recvmn(pp_->endpoint_new, LCI_RANK_ANY, 0, completion, - nullptr); - 
pp_->recv_new_completion_manager->enqueue_completion( - completion); + auto& device = pp->devices[i]; + for (int j = 0; j < config_t::prepost_recv_num; ++j) + { + LCI_comp_t completion = + device.completion_manager_p->recv_new + ->alloc_completion(); + LCI_recvmn(device.endpoint_new, LCI_RANK_ANY, 0, + completion, reinterpret_cast(i)); + device.completion_manager_p->recv_new + ->enqueue_completion(completion); + } } } } diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp index 866f64995301..4fa4c4d4ca8e 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp @@ -63,6 +63,11 @@ namespace hpx::parcelset::policies::lci { LCI_tag_t tag; LCI_tag_t original_tag; std::shared_ptr* sharedPtr_p; + // temporary data + LCI_comp_t completion; + LCI_segment_t segment_to_use, segment_used; + // for profiling + LCT_time_t conn_start_time; static std::atomic next_tag; }; diff --git a/libs/full/parcelport_lci/src/config.cpp b/libs/full/parcelport_lci/src/config.cpp index ace852670da3..6d527cf14f23 100644 --- a/libs/full/parcelport_lci/src/config.cpp +++ b/libs/full/parcelport_lci/src/config.cpp @@ -25,6 +25,9 @@ namespace hpx::parcelset::policies::lci { config_t::progress_type_t config_t::progress_type; int config_t::progress_thread_num; int config_t::prepost_recv_num; + bool config_t::reg_mem; + int config_t::ndevices; + int config_t::ncomps; void config_t::init_config(util::runtime_configuration const& rtcfg) { @@ -99,6 +102,9 @@ namespace hpx::parcelset::policies::lci { util::get_entry_as(rtcfg, "hpx.parcel.lci.prg_thread_num", -1); prepost_recv_num = util::get_entry_as(rtcfg, "hpx.parcel.lci.prepost_recv_num", 1); + reg_mem = util::get_entry_as(rtcfg, "hpx.parcel.lci.reg_mem", 1); + ndevices = util::get_entry_as(rtcfg, "hpx.parcel.lci.ndevices", 1); + ncomps = util::get_entry_as(rtcfg, "hpx.parcel.lci.ncomps", 1); if (!enable_send_immediate && enable_lci_backlog_queue) { @@ -113,6 +119,25 @@ namespace hpx::parcelset::policies::lci { progress_type = progress_type_t::pthread; fprintf(stderr, "WARNING: set progress_type to pthread!\n"); } +#ifndef LCI_ENABLE_MULTITHREAD_PROGRESS + if (progress_type == progress_type_t::worker || + progress_thread_num > ndevices) + { + fprintf(stderr, + "WARNING: Thread-safe LCI_progress is needed " + "but not enabled during compilation!\n"); + } +#endif + if (ncomps > ndevices) + { + int old_ncomps = ncomps; + ncomps = ndevices; + fprintf(stderr, + "WARNING: the number of completion managers (%d) " + "cannot exceed the number of devices (%d). " + "ncomps is adjusted accordingly (%d).", + old_ncomps, ndevices, ncomps); + } } } // namespace hpx::parcelset::policies::lci #endif diff --git a/libs/full/parcelport_lci/src/parcelport_lci.cpp b/libs/full/parcelport_lci/src/parcelport_lci.cpp index 8d96e8b61551..c6d47958f928 100644 --- a/libs/full/parcelport_lci/src/parcelport_lci.cpp +++ b/libs/full/parcelport_lci/src/parcelport_lci.cpp @@ -35,8 +35,10 @@ #include #include #include +#include namespace hpx::parcelset::policies::lci { + parcelset::locality parcelport::here() { return parcelset::locality(locality(util::lci_environment::enabled() ? 
@@ -143,52 +145,51 @@ namespace hpx::parcelset::policies::lci { void parcelport::send_early_parcel( hpx::parcelset::locality const& dest, parcel p) { - is_sending_early_parcel = true; base_type::send_early_parcel(dest, HPX_MOVE(p)); - is_sending_early_parcel = false; } bool parcelport::do_background_work( std::size_t num_thread, parcelport_background_mode mode) { - static thread_local int do_lci_progress = -1; - if (do_lci_progress == -1) + static thread_local bool devices_to_progress_initialized = false; + static thread_local std::vector devices_to_progress; + if (!devices_to_progress_initialized) { - do_lci_progress = 0; + devices_to_progress_initialized = true; if (config_t::progress_type == config_t::progress_type_t::rp && hpx::threads::get_self_id() != hpx::threads::invalid_thread_id) { if (hpx::this_thread::get_pool() == &hpx::resource::get_thread_pool("lci-progress-pool")) - do_lci_progress = 1; - } - } - - bool has_work = false; - // magic number - const int max_idle_loop_count = 1000; - if (do_lci_progress == 1) - { - int idle_loop_count = 0; - while (idle_loop_count < max_idle_loop_count) - { - while (util::lci_environment::do_progress(device)) { - has_work = true; - idle_loop_count = 0; + std::size_t prg_thread_id = + hpx::get_local_worker_thread_num(); + double rate = (double) config_t::ndevices / + config_t::progress_thread_num; + for (int i = prg_thread_id * rate; + i < (prg_thread_id + 1) * rate; ++i) + { + devices_to_progress.push_back(&devices[i]); + } } - ++idle_loop_count; } } - else if (do_lci_progress == 2) + + bool has_work = false; + if (!devices_to_progress.empty()) { + // magic number + const int max_idle_loop_count = 1000; int idle_loop_count = 0; while (idle_loop_count < max_idle_loop_count) { - while (util::lci_environment::do_progress(device)) + for (auto device_p : devices_to_progress) { - has_work = true; - idle_loop_count = 0; + if (util::lci_environment::do_progress(device_p->device)) + { + has_work = true; + idle_loop_count = 0; + } } ++idle_loop_count; } @@ -213,11 +214,13 @@ namespace hpx::parcelset::policies::lci { if (config_t::progress_type == config_t::progress_type_t::worker || config_t::progress_type == config_t::progress_type_t::pthread_worker) - do_progress(); + do_progress_local(); if (config_t::enable_lci_backlog_queue) // try to send pending messages - has_work = backlog_queue::background_work( - send_completion_manager.get(), num_thread) || + has_work = + backlog_queue::background_work( + get_tls_device().completion_manager_p->send.get(), + num_thread) || has_work; } if (mode & parcelport_background_mode_receive) @@ -226,7 +229,7 @@ namespace hpx::parcelset::policies::lci { if (config_t::progress_type == config_t::progress_type_t::worker || config_t::progress_type == config_t::progress_type_t::pthread_worker) - do_progress(); + do_progress_local(); } return has_work; } @@ -258,7 +261,7 @@ namespace hpx::parcelset::policies::lci { if (config_t::progress_type == config_t::progress_type_t::worker || config_t::progress_type == config_t::progress_type_t::pthread_worker) - while (do_progress()) + while (do_progress_local()) continue; if (has_work) { @@ -275,11 +278,14 @@ namespace hpx::parcelset::policies::lci { } std::atomic parcelport::prg_thread_flag = false; - void parcelport::progress_thread_fn(LCI_device_t device) + void parcelport::progress_thread_fn(const std::vector& devices) { while (prg_thread_flag) { - util::lci_environment::do_progress(device); + for (auto& device : devices) + { + util::lci_environment::do_progress(device.device); + } 
} } @@ -300,77 +306,98 @@ namespace hpx::parcelset::policies::lci { void parcelport::setup(util::runtime_configuration const& rtcfg) { HPX_UNUSED(rtcfg); - // Create device - device = LCI_UR_DEVICE; - // Create completion objects - if (config_t::protocol == config_t::protocol_t::sendrecv && - config_t::completion_type == LCI_COMPLETION_SYNC) + // Create completion managers + completion_managers.resize(config_t::ncomps); + for (auto& completion_manager : completion_managers) { - if (config_t::prepost_recv_num == 1) + if (config_t::protocol == config_t::protocol_t::sendrecv && + config_t::completion_type == LCI_COMPLETION_SYNC) { - recv_new_completion_manager = - std::make_shared(); + if (config_t::prepost_recv_num == 1 && + config_t::ndevices == config_t::ncomps) + { + completion_manager.recv_new = + std::make_shared(); + } + else + { + completion_manager.recv_new = + std::make_shared(); + } } else { - recv_new_completion_manager = + completion_manager.recv_new = + std::make_shared(); + } + switch (config_t::completion_type) + { + case LCI_COMPLETION_QUEUE: + completion_manager.send = + std::make_shared(); + completion_manager.recv_followup = + std::make_shared(); + break; + case LCI_COMPLETION_SYNC: + completion_manager.send = std::make_shared(); + completion_manager.recv_followup = + std::make_shared(); + break; + default: + throw std::runtime_error("Unknown completion type!"); } } - else - { - recv_new_completion_manager = - std::make_shared(); - } - switch (config_t::completion_type) - { - case LCI_COMPLETION_QUEUE: - send_completion_manager = - std::make_shared(); - recv_followup_completion_manager = - std::make_shared(); - break; - case LCI_COMPLETION_SYNC: - send_completion_manager = - std::make_shared(); - recv_followup_completion_manager = - std::make_shared(); - break; - default: - throw std::runtime_error("Unknown completion type!"); - } - // Create endpoints - LCI_plist_t plist_; - LCI_plist_create(&plist_); - LCI_plist_set_comp_type( - plist_, LCI_PORT_COMMAND, config_t::completion_type); - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, config_t::completion_type); - LCI_endpoint_init(&endpoint_followup, device, plist_); - LCI_plist_set_default_comp( - plist_, recv_new_completion_manager->get_completion_object()); - if (config_t::protocol == config_t::protocol_t::sendrecv && - config_t::completion_type == LCI_COMPLETION_SYNC) - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); - else + // Create device + devices.resize(config_t::ndevices); + for (int i = 0; i < config_t::ndevices; ++i) { + auto& device = devices[i]; + // Create the LCI device + device.idx = i; + if (i == 0) + { + device.device = LCI_UR_DEVICE; + } + else + { + LCI_device_init(&device.device); + } + int comp_idx = i * config_t::ncomps / config_t::ndevices; + device.completion_manager_p = &completion_managers[comp_idx]; + // Create the LCI endpoint + LCI_plist_t plist_; + LCI_plist_create(&plist_); LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); + plist_, LCI_PORT_COMMAND, config_t::completion_type); + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, config_t::completion_type); + LCI_endpoint_init(&device.endpoint_followup, device.device, plist_); + LCI_plist_set_default_comp(plist_, + device.completion_manager_p->recv_new->get_completion_object()); + if (config_t::protocol == config_t::protocol_t::sendrecv && + config_t::completion_type == LCI_COMPLETION_SYNC) + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); + else + { + 
LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); + } + if (config_t::protocol == config_t::protocol_t::sendrecv) + LCI_plist_set_match_type(plist_, LCI_MATCH_TAG); + LCI_endpoint_init(&device.endpoint_new, device.device, plist_); + LCI_plist_free(&plist_); } - if (config_t::protocol == config_t::protocol_t::sendrecv) - LCI_plist_set_match_type(plist_, LCI_MATCH_TAG); - LCI_endpoint_init(&endpoint_new, device, plist_); - LCI_plist_free(&plist_); // Create progress threads HPX_ASSERT(prg_thread_flag == false); HPX_ASSERT(prg_thread_p == nullptr); prg_thread_flag = true; prg_thread_p = - std::make_unique(progress_thread_fn, device); + std::make_unique(progress_thread_fn, devices); // Create the sender and receiver switch (config_t::protocol) @@ -392,9 +419,16 @@ namespace hpx::parcelset::policies::lci { void parcelport::cleanup() { join_prg_thread_if_running(); - // free ep, rcq - LCI_endpoint_free(&endpoint_followup); - LCI_endpoint_free(&endpoint_new); + // Free devices + for (auto& device : devices) + { + LCI_endpoint_free(&device.endpoint_followup); + LCI_endpoint_free(&device.endpoint_new); + if (device.device != LCI_UR_DEVICE) + { + LCI_device_free(&device.device); + } + } } void parcelport::join_prg_thread_if_running() @@ -410,12 +444,49 @@ namespace hpx::parcelset::policies::lci { } } - bool parcelport::do_progress() + bool parcelport::do_progress_local() { bool ret = false; - ret = util::lci_environment::do_progress(device) || ret; + auto device = get_tls_device(); + ret = util::lci_environment::do_progress(device.device) || ret; return ret; } + + parcelport::device_t& parcelport::get_tls_device() + { + static thread_local std::size_t tls_device_idx = -1; + + if (HPX_UNLIKELY(!is_initialized || + hpx::threads::get_self_id() == hpx::threads::invalid_thread_id)) + { + static thread_local unsigned int tls_rand_seed = rand(); + util::lci_environment::log( + util::lci_environment::log_level_t::debug, "device", + "Rank %d unusual phase\n", LCI_RANK); + return devices[rand_r(&tls_rand_seed) % devices.size()]; + } + if (tls_device_idx == std::size_t(-1)) + { + // initialize TLS device + // hpx::threads::topology& topo = hpx::threads::create_topology(); + auto& rp = hpx::resource::get_partitioner(); + + std::size_t num_thread = + hpx::get_worker_thread_num(); // current worker + std::size_t total_thread_num = rp.get_num_threads(); + HPX_ASSERT(num_thread < total_thread_num); + std::size_t nthreads_per_device = + (total_thread_num + config_t::ndevices - 1) / + config_t::ndevices; + + tls_device_idx = num_thread / nthreads_per_device; + util::lci_environment::log( + util::lci_environment::log_level_t::debug, "device", + "Rank %d thread %lu/%lu gets device %lu\n", LCI_RANK, + num_thread, total_thread_num, tls_device_idx); + } + return devices[tls_device_idx]; + } } // namespace hpx::parcelset::policies::lci HPX_REGISTER_PARCELPORT(hpx::parcelset::policies::lci::parcelport, lci) diff --git a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp index 1b13cac06738..6142e0355abe 100644 --- a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp +++ b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp @@ -49,7 +49,7 @@ namespace hpx::parcelset::policies::lci { data_point_ = buffer_.data_point_; data_point_.time_ = hpx::chrono::high_resolution_clock::now(); #endif - + conn_start_time = util::lci_environment::pcounter_now(); HPX_ASSERT(!handler_); HPX_ASSERT(!postprocess_handler_); 
HPX_ASSERT(!buffer_.data_.empty()); @@ -62,7 +62,7 @@ namespace hpx::parcelset::policies::lci { int num_zero_copy_chunks = static_cast(buffer_.num_chunks_.first); if (is_eager) { - while (LCI_mbuffer_alloc(pp_->device, &mbuffer) != LCI_OK) + while (LCI_mbuffer_alloc(device_p->device, &mbuffer) != LCI_OK) continue; HPX_ASSERT(mbuffer.length == (size_t) LCI_MEDIUM_SIZE); header_ = header(buffer_, (char*) mbuffer.address, mbuffer.length); @@ -97,7 +97,16 @@ namespace hpx::parcelset::policies::lci { // data (non-zero-copy chunks) iovec.lbuffers[i].address = buffer_.data_.data(); iovec.lbuffers[i].length = buffer_.data_.size(); - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + if (config_t::reg_mem) + { + LCI_memory_register(device_p->device, + iovec.lbuffers[i].address, iovec.lbuffers[i].length, + &iovec.lbuffers[i].segment); + } + else + { + iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + } ++i; } if (num_zero_copy_chunks != 0) @@ -112,7 +121,16 @@ namespace hpx::parcelset::policies::lci { sizeof(parcel_buffer_type::transmission_chunk_type)); iovec.lbuffers[i].address = tchunks.data(); iovec.lbuffers[i].length = tchunks_length; - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + if (config_t::reg_mem) + { + LCI_memory_register(device_p->device, + iovec.lbuffers[i].address, iovec.lbuffers[i].length, + &iovec.lbuffers[i].segment); + } + else + { + iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + } ++i; } // zero-copy chunks @@ -126,7 +144,17 @@ namespace hpx::parcelset::policies::lci { iovec.lbuffers[i].address = const_cast(c.data_.cpos_); iovec.lbuffers[i].length = c.size_; - iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + if (config_t::reg_mem) + { + LCI_memory_register(device_p->device, + iovec.lbuffers[i].address, + iovec.lbuffers[i].length, + &iovec.lbuffers[i].segment); + } + else + { + iovec.lbuffers[i].segment = LCI_SEGMENT_ALL; + } ++i; } } @@ -172,7 +200,7 @@ namespace hpx::parcelset::policies::lci { int ret; if (is_eager) { - ret = LCI_putmna(pp_->endpoint_new, mbuffer, dst_rank, 0, + ret = LCI_putmna(device_p->endpoint_new, mbuffer, dst_rank, 0, LCI_DEFAULT_COMP_REMOTE); if (ret == LCI_OK) { @@ -184,15 +212,15 @@ namespace hpx::parcelset::policies::lci { { void* buffer_to_free = iovec.piggy_back.address; LCI_comp_t completion = - pp_->send_completion_manager->alloc_completion(); + device_p->completion_manager_p->send->alloc_completion(); // In order to keep the send_connection object from being // deallocated. We have to allocate a shared_ptr in the heap // and pass a pointer to shared_ptr to LCI. // We will get this pointer back via the send completion queue // after this send completes. state.store(connection_state::locked, std::memory_order_relaxed); - ret = LCI_putva(pp_->endpoint_new, iovec, completion, dst_rank, 0, - LCI_DEFAULT_COMP_REMOTE, sharedPtr_p); + ret = LCI_putva(device_p->endpoint_new, iovec, completion, dst_rank, + 0, LCI_DEFAULT_COMP_REMOTE, sharedPtr_p); // After this point, if ret == OK, this object can be shared by // two threads (the sending thread and the thread polling the // completion queue). Care must be taken to avoid data race. 
@@ -215,6 +243,13 @@ namespace hpx::parcelset::policies::lci { if (!is_eager) { HPX_ASSERT(iovec.count > 0); + for (int i = 0; i < iovec.count; ++i) + { + if (iovec.lbuffers[i].segment != LCI_SEGMENT_ALL) + { + LCI_memory_deregister(&iovec.lbuffers[i].segment); + } + } free(iovec.lbuffers); } error_code ec; @@ -235,6 +270,10 @@ namespace hpx::parcelset::policies::lci { hpx::chrono::high_resolution_clock::now() - data_point_.time_; pp_->add_sent_data(data_point_); #endif + util::lci_environment::pcounter_add( + util::lci_environment::send_conn_timer, + util::lci_environment::pcounter_since(conn_start_time)); + if (postprocess_handler_) { // Return this connection to the connection cache. diff --git a/libs/full/parcelport_lci/src/sender_base.cpp b/libs/full/parcelport_lci/src/sender_base.cpp index eb490eb3bd4f..5eef8fb8c4c9 100644 --- a/libs/full/parcelport_lci/src/sender_base.cpp +++ b/libs/full/parcelport_lci/src/sender_base.cpp @@ -30,12 +30,15 @@ namespace hpx::parcelset::policies::lci { bool sender_base::background_work(size_t /* num_thread */) noexcept { bool did_some_work = false; - // try to accept a new connection - LCI_request_t request = pp_->send_completion_manager->poll(); - // LCI_queue_pop(util::lci_environment::get_scq(), &request); + auto poll_comp_start = util::lci_environment::pcounter_now(); + auto completion_manager_p = pp_->get_tls_device().completion_manager_p; + LCI_request_t request = completion_manager_p->send->poll(); + util::lci_environment::pcounter_add(util::lci_environment::poll_comp, + util::lci_environment::pcounter_since(poll_comp_start)); if (request.flag == LCI_OK) { + auto useful_bg_start = util::lci_environment::pcounter_now(); did_some_work = true; auto* sharedPtr_p = (connection_ptr*) request.user_context; sender_connection_base::return_t ret = (*sharedPtr_p)->send(); @@ -47,9 +50,11 @@ namespace hpx::parcelset::policies::lci { else if (ret.status == sender_connection_base::return_status_t::wait) { - pp_->send_completion_manager->enqueue_completion( - ret.completion); + completion_manager_p->send->enqueue_completion(ret.completion); } + util::lci_environment::pcounter_add( + util::lci_environment::useful_bg_work, + util::lci_environment::pcounter_since(useful_bg_start)); } return did_some_work; diff --git a/libs/full/parcelport_lci/src/sender_connection_base.cpp b/libs/full/parcelport_lci/src/sender_connection_base.cpp index f24a39b9a075..db78212232ee 100644 --- a/libs/full/parcelport_lci/src/sender_connection_base.cpp +++ b/libs/full/parcelport_lci/src/sender_connection_base.cpp @@ -30,6 +30,9 @@ namespace hpx::parcelset::policies::lci { sender_connection_base::handler_type&& handler, sender_connection_base::postprocess_handler_type&& parcel_postprocess) { + LCT_time_t async_write_start_time = + util::lci_environment::pcounter_now(); + device_p = &pp_->get_tls_device(); load(HPX_FORWARD(handler_type, handler), HPX_FORWARD(postprocess_handler_type, parcel_postprocess)); return_t ret = send(); @@ -39,53 +42,73 @@ namespace hpx::parcelset::policies::lci { } else if (ret.status == return_status_t::wait) { - pp_->send_completion_manager->enqueue_completion(ret.completion); + device_p->completion_manager_p->send->enqueue_completion( + ret.completion); } + util::lci_environment::pcounter_add( + util::lci_environment::async_write_timer, + util::lci_environment::pcounter_since(async_write_start_time)); } sender_connection_base::return_t sender_connection_base::send() { + auto start_time = util::lci_environment::pcounter_now(); + return_t ret; + const int 
retry_max_spin = 32; if (!config_t::enable_lci_backlog_queue || - HPX_UNLIKELY(pp_->is_sending_early_parcel)) + HPX_UNLIKELY(!pp_->is_initialized)) { // If we are sending early parcels, we should not expect the // thread make progress on the backlog queue - return_t ret; + int retry_count = 0; do { ret = send_nb(); - if (ret.status == return_status_t::retry && - (config_t::progress_type == + if (ret.status == return_status_t::retry) + { + ++retry_count; + if (retry_count > retry_max_spin) + { + retry_count = 0; + while (pp_->background_work( + -1, parcelport_background_mode_all)) + continue; + hpx::this_thread::yield(); + } + if (config_t::progress_type == config_t::progress_type_t::worker || config_t::progress_type == - config_t::progress_type_t::pthread_worker)) - while (pp_->do_progress()) - continue; + config_t::progress_type_t::pthread_worker) + while (pp_->do_progress_local()) + continue; + } } while (ret.status == return_status_t::retry); - return ret; } else { if (!backlog_queue::empty(dst_rank)) { backlog_queue::push(shared_from_this()); - return {return_status_t::retry, nullptr}; - } - return_t ret = send_nb(); - if (ret.status == return_status_t::retry) - { - backlog_queue::push(shared_from_this()); - return ret; + ret = {return_status_t::retry, nullptr}; } else { - return ret; + ret = send_nb(); + if (ret.status == return_status_t::retry) + { + backlog_queue::push(shared_from_this()); + } } } + util::lci_environment::pcounter_add(util::lci_environment::send_timer, + util::lci_environment::pcounter_since(start_time)); + return ret; } void sender_connection_base::profile_start_hook(const header& header_) { + util::lci_environment::pcounter_add( + util::lci_environment::send_conn_start, 1); if (util::lci_environment::log_level < util::lci_environment::log_level_t::profile) return; @@ -109,14 +132,16 @@ namespace hpx::parcelset::policies::lci { consumed += snprintf(buf + consumed, sizeof(buf) - consumed, "]\n"); HPX_ASSERT(sizeof(buf) > consumed); util::lci_environment::log( - util::lci_environment::log_level_t::profile, "%s", buf); + util::lci_environment::log_level_t::profile, "send", "%s", buf); } void sender_connection_base::profile_end_hook() { util::lci_environment::log(util::lci_environment::log_level_t::profile, - "%d:%lf:send_connection(%p) end\n", LCI_RANK, + "send", "%d:%lf:send_connection(%p) end\n", LCI_RANK, hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this); + util::lci_environment::pcounter_add( + util::lci_environment::send_conn_end, 1); } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp index 72fd3b5f2345..9dd683db0df1 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp @@ -32,6 +32,9 @@ namespace hpx::parcelset::policies::lci { void receiver_connection_sendrecv::load(char* header_buffer) { + conn_start_time = util::lci_environment::pcounter_now(); + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_start, 1); header header_ = header(header_buffer); header_.assert_valid(); #if defined(HPX_HAVE_PARCELPORT_COUNTERS) @@ -39,6 +42,7 @@ namespace hpx::parcelset::policies::lci { static_cast(header_.numbytes()); timer_.restart(); #endif + device_p = &pp_->devices[header_.get_device_idx()]; tag = header_.get_tag(); // decode data 
buffer.data_.allocate(header_.numbytes_nonzero_copy()); @@ -91,12 +95,14 @@ namespace hpx::parcelset::policies::lci { shared_from_this()); } // set state - state.store(connection_state::initialized, std::memory_order_release); recv_chunks_idx = 0; recv_zero_copy_chunks_idx = 0; original_tag = tag; + segment_used = LCI_SEGMENT_ALL; + state.store(connection_state::initialized, std::memory_order_release); util::lci_environment::log(util::lci_environment::log_level_t::debug, - "recv connection (%d, %d, %d) start!\n", dst_rank, LCI_RANK, tag); + "recv", "recv connection (%d, %d, %d) start!\n", dst_rank, LCI_RANK, + tag); } receiver_connection_sendrecv::return_t @@ -131,37 +137,51 @@ namespace hpx::parcelset::policies::lci { void* address, int length) { LCI_comp_t completion = - pp_->recv_followup_completion_manager->alloc_completion(); + device_p->completion_manager_p->recv_followup->alloc_completion(); if (length <= LCI_MEDIUM_SIZE) { LCI_mbuffer_t mbuffer; mbuffer.address = address; mbuffer.length = length; - LCI_error_t ret = LCI_recvm(pp_->endpoint_followup, mbuffer, + LCI_error_t ret = LCI_recvm(device_p->endpoint_followup, mbuffer, dst_rank, tag, completion, sharedPtr_p); HPX_ASSERT(ret == LCI_OK); HPX_UNUSED(ret); util::lci_environment::log( - util::lci_environment::log_level_t::debug, - "recvm (%d, %d, %d) tag %d size %d\n", dst_rank, LCI_RANK, - original_tag, tag, length); + util::lci_environment::log_level_t::debug, "recv", + "recvm (%d, %d, %d) device %d tag %d size %d\n", dst_rank, + LCI_RANK, original_tag, device_p->idx, tag, length); tag = (tag + 1) % LCI_MAX_TAG; } else { LCI_lbuffer_t lbuffer; - lbuffer.segment = LCI_SEGMENT_ALL; lbuffer.address = address; lbuffer.length = length; - LCI_error_t ret = LCI_recvl(pp_->endpoint_followup, lbuffer, + if (config_t::reg_mem) + { + LCI_memory_register(device_p->device, lbuffer.address, + lbuffer.length, &lbuffer.segment); + } + else + { + lbuffer.segment = LCI_SEGMENT_ALL; + } + LCI_error_t ret = LCI_recvl(device_p->endpoint_followup, lbuffer, dst_rank, tag, completion, sharedPtr_p); HPX_ASSERT(ret == LCI_OK); HPX_UNUSED(ret); util::lci_environment::log( - util::lci_environment::log_level_t::debug, - "recvl (%d, %d, %d) tag %d size %d\n", dst_rank, LCI_RANK, - original_tag, tag, length); + util::lci_environment::log_level_t::debug, "recv", + "recvl (%d, %d, %d) device %d tag %d size %d\n", dst_rank, + LCI_RANK, original_tag, device_p->idx, tag, length); tag = (tag + 1) % LCI_MAX_TAG; + if (segment_used != LCI_SEGMENT_ALL) + { + LCI_memory_deregister(&segment_used); + segment_used = LCI_SEGMENT_ALL; + } + segment_used = lbuffer.segment; } return completion; } @@ -334,13 +354,23 @@ namespace hpx::parcelset::policies::lci { void receiver_connection_sendrecv::done() { + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_end, 1); util::lci_environment::log(util::lci_environment::log_level_t::debug, - "recv connection (%d, %d, %d, %d) done!\n", dst_rank, LCI_RANK, - original_tag, tag - original_tag + 1); + "recv", "recv connection (%d, %d, %d, %d) done!\n", dst_rank, + LCI_RANK, original_tag, tag - original_tag + 1); + if (segment_used != LCI_SEGMENT_ALL) + { + LCI_memory_deregister(&segment_used); + segment_used = LCI_SEGMENT_ALL; + } #if defined(HPX_HAVE_PARCELPORT_COUNTERS) buffer.data_point_.time_ = timer_.elapsed_nanoseconds(); #endif - + util::lci_environment::pcounter_add( + util::lci_environment::recv_conn_timer, + util::lci_environment::pcounter_since(conn_start_time)); + auto handle_parcels_start_time = 
util::lci_environment::pcounter_now(); if (parcels_.empty()) { // decode and handle received data @@ -356,6 +386,9 @@ namespace hpx::parcelset::policies::lci { pp_->allow_zero_copy_receive_optimizations()); handle_received_parcels(HPX_MOVE(parcels_)); } + util::lci_environment::pcounter_add( + util::lci_environment::handle_parcels, + util::lci_environment::pcounter_since(handle_parcels_start_time)); buffer.data_.free(); parcels_.clear(); } diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp index 6f9cfbd06b8c..beab2fae62eb 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_sendrecv.cpp @@ -17,6 +17,7 @@ #include #include +#include #include namespace hpx::parcelset::policies::lci { @@ -38,22 +39,31 @@ namespace hpx::parcelset::policies::lci { bool receiver_sendrecv::accept_new() noexcept { + bool did_some_work = false; + + auto poll_comp_start = util::lci_environment::pcounter_now(); + auto completion_manager_p = pp_->get_tls_device().completion_manager_p; request_wrapper_t request; - request.request = pp_->recv_new_completion_manager->poll(); + request.request = completion_manager_p->recv_new->poll(); + util::lci_environment::pcounter_add(util::lci_environment::poll_comp, + util::lci_environment::pcounter_since(poll_comp_start)); if (request.request.flag == LCI_OK) { + auto useful_bg_start = util::lci_environment::pcounter_now(); if (config_t::protocol == config_t::protocol_t::sendrecv) { + std::size_t device_idx = + (std::size_t) request.request.user_context; + auto& device = pp_->devices[device_idx]; LCI_comp_t completion = - pp_->recv_new_completion_manager->alloc_completion(); - LCI_recvmn( - pp_->endpoint_new, LCI_RANK_ANY, 0, completion, nullptr); - pp_->recv_new_completion_manager->enqueue_completion( - completion); + completion_manager_p->recv_new->alloc_completion(); + LCI_recvmn(device.endpoint_new, LCI_RANK_ANY, 0, completion, + reinterpret_cast(device_idx)); + completion_manager_p->recv_new->enqueue_completion(completion); } util::lci_environment::log( - util::lci_environment::log_level_t::debug, + util::lci_environment::log_level_t::debug, "recv", "accept_new (%d, %d, %d) length %lu\n", request.request.rank, LCI_RANK, request.request.tag, request.request.data.mbuffer.length); @@ -67,22 +77,31 @@ namespace hpx::parcelset::policies::lci { } else { - pp_->recv_followup_completion_manager->enqueue_completion( + completion_manager_p->recv_followup->enqueue_completion( ret.completion); } - return true; + util::lci_environment::pcounter_add( + util::lci_environment::useful_bg_work, + util::lci_environment::pcounter_since(useful_bg_start)); + did_some_work = true; } - return false; + return did_some_work; } bool receiver_sendrecv::followup() noexcept { + bool did_some_work = false; // We don't use a request_wrapper here because all the receive buffers // should be managed by the connections - LCI_request_t request = pp_->recv_followup_completion_manager->poll(); + auto poll_comp_start = util::lci_environment::pcounter_now(); + auto completion_manager_p = pp_->get_tls_device().completion_manager_p; + LCI_request_t request = completion_manager_p->recv_followup->poll(); + util::lci_environment::pcounter_add(util::lci_environment::poll_comp, + util::lci_environment::pcounter_since(poll_comp_start)); if (request.flag == LCI_OK) { + auto useful_bg_start = util::lci_environment::pcounter_now(); HPX_ASSERT(request.user_context); auto* 
sharedPtr_p = (connection_ptr*) request.user_context; size_t length; @@ -91,7 +110,7 @@ namespace hpx::parcelset::policies::lci { else length = request.data.lbuffer.length; util::lci_environment::log( - util::lci_environment::log_level_t::debug, + util::lci_environment::log_level_t::debug, "recv", "followup (%d, %d, %d) length %lu\n", request.rank, LCI_RANK, request.tag, length); receiver_connection_sendrecv::return_t ret = @@ -103,12 +122,15 @@ namespace hpx::parcelset::policies::lci { } else { - pp_->recv_followup_completion_manager->enqueue_completion( + completion_manager_p->recv_followup->enqueue_completion( ret.completion); } - return true; + util::lci_environment::pcounter_add( + util::lci_environment::useful_bg_work, + util::lci_environment::pcounter_since(useful_bg_start)); + did_some_work = true; } - return false; + return did_some_work; } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp index dd421c9028b1..8614ad5e7cb6 100644 --- a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -36,7 +36,7 @@ namespace hpx::parcelset::policies::lci { #if defined(HPX_HAVE_PARCELPORT_COUNTERS) timer_.restart(); #endif - + conn_start_time = util::lci_environment::pcounter_now(); HPX_ASSERT(!handler_); HPX_ASSERT(!postprocess_handler_); HPX_ASSERT(!buffer_.data_.empty()); @@ -44,7 +44,7 @@ namespace hpx::parcelset::policies::lci { postprocess_handler_ = HPX_MOVE(parcel_postprocess); // build header - while (LCI_mbuffer_alloc(pp_->device, &header_buffer) != LCI_OK) + while (LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK) continue; HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE); header_ = header( @@ -81,15 +81,20 @@ namespace hpx::parcelset::policies::lci { } if ((int) tag <= LCI_MAX_TAG && (int) tag + num_send > LCI_MAX_TAG) util::lci_environment::log( - util::lci_environment::log_level_t::debug, + util::lci_environment::log_level_t::debug, "tag", "Rank %d Wrap around!\n", LCI_RANK); + header_.set_device_idx(device_p->idx); header_.set_tag(tag); send_chunks_idx = 0; + completion = nullptr; + segment_to_use = LCI_SEGMENT_ALL; + segment_used = LCI_SEGMENT_ALL; // set state profile_start_hook(header_); state.store(connection_state::initialized, std::memory_order_release); original_tag = tag; util::lci_environment::log(util::lci_environment::log_level_t::debug, + "send", "send connection (%d, %d, %d, %d) tchunks " "%d data %d chunks %d start!\n", LCI_RANK, dst_rank, original_tag, num_send, @@ -138,18 +143,19 @@ namespace hpx::parcelset::policies::lci { LCI_error_t ret; if (config_t::protocol == config_t::protocol_t::putsendrecv) { - ret = LCI_putmna(pp_->endpoint_new, header_buffer, dst_rank, 0, + ret = LCI_putmna(device_p->endpoint_new, header_buffer, dst_rank, 0, LCI_DEFAULT_COMP_REMOTE); } else { HPX_ASSERT(config_t::protocol == config_t::protocol_t::sendrecv); - ret = LCI_sendmn(pp_->endpoint_new, header_buffer, dst_rank, 0); + ret = + LCI_sendmn(device_p->endpoint_new, header_buffer, dst_rank, 0); } if (ret == LCI_OK) { util::lci_environment::log( - util::lci_environment::log_level_t::debug, + util::lci_environment::log_level_t::debug, "send", "%s (%d, %d, %d) length %lu\n", config_t::protocol == config_t::protocol_t::putsendrecv ? 
"LCI_putmna" : @@ -174,13 +180,13 @@ namespace hpx::parcelset::policies::lci { buffer.address = address; buffer.length = length; LCI_error_t ret = - LCI_sendm(pp_->endpoint_followup, buffer, dst_rank, tag); + LCI_sendm(device_p->endpoint_followup, buffer, dst_rank, tag); if (ret == LCI_OK) { util::lci_environment::log( - util::lci_environment::log_level_t::debug, - "sendm (%d, %d, %d) tag %d size %d\n", LCI_RANK, dst_rank, - original_tag, tag, length); + util::lci_environment::log_level_t::debug, "send", + "sendm (%d, %d, %d) device %d tag %d size %d\n", LCI_RANK, + dst_rank, original_tag, device_p->idx, tag, length); tag = (tag + 1) % LCI_MAX_TAG; return {return_status_t::done, nullptr}; } @@ -192,22 +198,39 @@ namespace hpx::parcelset::policies::lci { } else { + if (config_t::reg_mem && segment_to_use == LCI_SEGMENT_ALL) + { + LCI_memory_register( + device_p->device, address, length, &segment_to_use); + } + if (completion == nullptr) + { + completion = + device_p->completion_manager_p->send->alloc_completion(); + } LCI_lbuffer_t buffer; - buffer.segment = LCI_SEGMENT_ALL; + buffer.segment = segment_to_use; buffer.address = address; buffer.length = length; - LCI_comp_t completion = - pp_->send_completion_manager->alloc_completion(); - LCI_error_t ret = LCI_sendl(pp_->endpoint_followup, buffer, + LCI_error_t ret = LCI_sendl(device_p->endpoint_followup, buffer, dst_rank, tag, completion, sharedPtr_p); if (ret == LCI_OK) { util::lci_environment::log( - util::lci_environment::log_level_t::debug, - "sendl (%d, %d, %d) tag %d size %d\n", LCI_RANK, dst_rank, - original_tag, tag, length); + util::lci_environment::log_level_t::debug, "send", + "sendl (%d, %d, %d) device %d, tag %d size %d\n", LCI_RANK, + dst_rank, original_tag, device_p->idx, tag, length); tag = (tag + 1) % LCI_MAX_TAG; - return {return_status_t::wait, completion}; + if (segment_used != LCI_SEGMENT_ALL) + { + LCI_memory_deregister(&segment_used); + segment_used = LCI_SEGMENT_ALL; + } + segment_used = segment_to_use; + segment_to_use = LCI_SEGMENT_ALL; + auto ret_comp = completion; + completion = nullptr; + return {return_status_t::wait, ret_comp}; } else { @@ -331,8 +354,8 @@ namespace hpx::parcelset::policies::lci { void sender_connection_sendrecv::done() { util::lci_environment::log(util::lci_environment::log_level_t::debug, - "send connection (%d, %d, %d, %d) done!\n", LCI_RANK, dst_rank, - original_tag, tag - original_tag + 1); + "send", "send connection (%d, %d, %d, %d) done!\n", LCI_RANK, + dst_rank, original_tag, tag - original_tag + 1); profile_end_hook(); error_code ec; handler_(ec); @@ -341,7 +364,17 @@ namespace hpx::parcelset::policies::lci { data_point_.time_ = timer_.elapsed_nanoseconds(); pp_->add_sent_data(data_point_); #endif + if (segment_used != LCI_SEGMENT_ALL) + { + LCI_memory_deregister(&segment_used); + segment_used = LCI_SEGMENT_ALL; + } + HPX_ASSERT(completion == nullptr); + HPX_ASSERT(segment_to_use == LCI_SEGMENT_ALL); buffer_.clear(); + util::lci_environment::pcounter_add( + util::lci_environment::send_conn_timer, + util::lci_environment::pcounter_since(conn_start_time)); if (postprocess_handler_) {