Skip to content

Commit

Permalink
Implement thread pinning for threads with high-frequency communication
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterTh committed Nov 18, 2024
1 parent 9344534 commit 0c1ae63
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 118 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ if(CMAKE_GENERATOR STREQUAL "Ninja")
endif()

set(SOURCES
src/affinity.cc
src/config.cc
src/command_graph_generator.cc
src/dry_run_executor.cc
Expand Down
4 changes: 2 additions & 2 deletions ci/run-benchmarks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ if [[ $# -ne 0 ]]; then
fi

# we use taskset to pin the process to a specific set of cores and their HTs to reduce benchmark result variance
# this set is chosen to all be located on the second die of the Threadripper 2920X CPU in our current CI benchmark system "gpuc2"
bash /root/capture-backtrace.sh taskset -c 6-11,18-23 test/benchmarks \
# this set is chosen to all be located well on the Threadripper 2920X CPU in our current CI benchmark system "gpuc2"
bash /root/capture-backtrace.sh taskset -c 4-11,16-23 test/benchmarks \
--reporter celerity-benchmark-md::out=gpuc2_bench.md \
--reporter celerity-benchmark-csv::out=gpuc2_bench.csv
84 changes: 76 additions & 8 deletions include/affinity.h
Original file line number Diff line number Diff line change
@@ -1,15 +1,83 @@
#pragma once

#include <cstdint>
#include <string_view>
#include <vector>

namespace celerity {
namespace detail {
// The goal of this thread pinning mechanism, when enabled, is to ensure that threads which benefit from fast communication
// are pinned to cores that are close to each other in terms of cache hierarchy.
// It currently accomplishes this by pinning threads to cores in a round-robin fashion according to their order in the `thread_type` enum.
//
// In terms of interface design, the goal is to provide a very simple entry point (`pin_this_thread`), that is safe to use from any thread at any time,
// and does not require polluting any other modules with state related to thread pinning. The `thread_pinner` RAII class offers the only way to manage the
// lifetime of the pinning mechanism, and prevents misuse. The implementation safely removes pinning from any thread it previously pinned on teardown.
//
// TODO: A future extension would be to respect NUMA for threads performing memory operations, but this requires in-depth knowledge of the system's topology.
namespace celerity::detail::thread_pinning {

uint32_t affinity_cores_available();
constexpr uint32_t thread_type_step = 10000;

/* a priori we need 3 threads, plus 1 for parallel-task workers and at least one more for host-task.
This depends on the application invoking celerity. */
constexpr static uint64_t min_cores_needed = 5;
// The threads Celerity interacts with ("user") and creates (everything else), identified for the purpose of pinning.
// Note: this is not an enum class to make interactions such as specifying `first_backend_worker+i` easier
enum thread_type : uint32_t {
user = 0 * thread_type_step,
scheduler = 1 * thread_type_step,
executor = 2 * thread_type_step,
first_backend_worker = 3 * thread_type_step,
first_host_queue = 4 * thread_type_step,
};
std::string thread_type_to_string(const thread_type t_type);

} // namespace detail
} // namespace celerity
// User-level configuration of the thread pinning mechanism (set by the user via environment variables)
struct environment_configuration {
bool enabled = true; // we want thread pinning to be enabled by default
uint32_t starting_from_core = 1;
std::vector<uint32_t> hardcoded_core_ids;
};

// Parses and validates the environment variable string, returning the corresponding configuration
environment_configuration parse_validate_env(const std::string_view str);

// Configures the pinning mechanism
// For now, only "standard" threads are pinned
// these are threads that benefit from rapid communication between each other,
// i.e. scheduler -> executor -> backend workers
// Extensible for future use where some threads might benefit from NUMA-aware per-GPU pinning
struct runtime_configuration {
// Whether or not to perform pinning
bool enabled = false;

// Number of devices that will need corresponding threads
uint32_t num_devices = 1;

// Number of processes running in legacy mode
uint32_t num_legacy_processes = 1;
// Process index of current process running in legacy mode
uint32_t legacy_process_index = 0;

// The core to start pinning "standard" threads to
uint32_t standard_core_start_id = 1;

// If set, this list of core ids will be used for pinning instead of the default round-robin scheme
// The list must contain exactly as many elements as there are standard threads
std::vector<uint32_t> hardcoded_core_ids = {}; // NOLINT(readability-redundant-member-init) -- to allow partial designated init elsewhere
};

// An RAII class for managing thread pinning
// Only one instance of this class may be active at a time (this is enforced by the implementation)
// Threads pinned by this class will be unpinned when the instance is destroyed
class thread_pinner {
public:
thread_pinner(const runtime_configuration& cfg);
~thread_pinner();
thread_pinner(const thread_pinner&) = delete;
thread_pinner& operator=(const thread_pinner&) = delete;
thread_pinner(thread_pinner&&) = default;
thread_pinner& operator=(thread_pinner&&) = default;
};

// Pins the invoking thread of type `t_type` according to the current configuration
// This is a no-op if the thread pinning machinery is not currently initialized (by a `thread_pinner` instance)
void pin_this_thread(const thread_type t_type);

} // namespace celerity::detail::thread_pinning
5 changes: 4 additions & 1 deletion include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstddef>
#include <optional>

#include "affinity.h"
#include "log.h"

namespace celerity {
Expand All @@ -26,6 +27,7 @@ namespace detail {

bool should_enable_device_profiling() const { return m_enable_device_profiling.value_or(m_tracy_mode == tracy_mode::full); }
bool should_use_backend_worker_threads() const { return m_enable_backend_worker_threads; }
const thread_pinning::environment_configuration& get_thread_pinning_config() const& { return m_thread_pinning_config; }
bool is_dry_run() const { return m_dry_run_nodes > 0; }
bool should_print_graphs() const { return m_should_print_graphs; }
bool should_record() const {
Expand All @@ -41,7 +43,8 @@ namespace detail {
log_level m_log_lvl;
std::optional<bool> m_enable_device_profiling;
bool m_enable_backend_worker_threads = true;
size_t m_dry_run_nodes = 0;
thread_pinning::environment_configuration m_thread_pinning_config;
int m_dry_run_nodes = 0;
bool m_should_print_graphs = false;
std::optional<int> m_horizon_step;
std::optional<int> m_horizon_max_parallelism;
Expand Down
6 changes: 6 additions & 0 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ namespace detail {
class reducer;
struct host_object_instance;

namespace thread_pinning {
class thread_pinner;
}

class runtime final : private task_manager::delegate, private scheduler::delegate, private executor::delegate {
friend struct runtime_testspy;

Expand Down Expand Up @@ -117,6 +121,8 @@ namespace detail {
std::unique_ptr<detail::command_recorder> m_command_recorder; // accessed only by scheduler thread (until shutdown)
std::unique_ptr<detail::instruction_recorder> m_instruction_recorder; // accessed only by scheduler thread (until shutdown)

std::unique_ptr<detail::thread_pinning::thread_pinner> m_thread_pinner; // thread safe, manages lifetime of thread pinning machinery

runtime(int* argc, char** argv[], const devices_or_selector& user_devices_or_selector);

/// Panic when not called from m_application_thread (see that variable for more info on the matter). Since there are thread-safe and non thread-safe
Expand Down
75 changes: 75 additions & 0 deletions src/affinity.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

// non-platform-specific code for thread pinning

#include "affinity.h"
#include "log.h"

#include <cstdint>
#include <ranges>
#include <string_view>
#include <vector>

#include <libenvpp/env.hpp>

namespace celerity::detail::thread_pinning {

std::string thread_type_to_string(const thread_type t_type) {
switch(t_type) {
case thread_type::user: return "user";
case thread_type::scheduler: return "scheduler";
case thread_type::executor: return "executor";
default: break;
}
if(t_type >= thread_type::first_backend_worker && t_type < thread_type::first_host_queue) {
return fmt::format("backend_worker_{}", t_type - thread_type::first_backend_worker);
}
if(t_type >= thread_type::first_host_queue) { return fmt::format("host_queue_{}", t_type - thread_type::first_host_queue); }
return "unknown";
}

environment_configuration parse_validate_env(const std::string_view str) {
using namespace std::string_view_literals;
constexpr const char* error_msg =
"Cannot parse CELERITY_THREAD_PINNING setting, needs to be either 'auto', 'from:#', comma-separated core list, or bool: {}";

if(str.empty()) return {};

// "auto" case
constexpr uint32_t auto_start_from_core = 1;
if(str == "auto") { return {true, auto_start_from_core, {}}; }

// "from:" case
constexpr auto from_prefix = "from:"sv;
if(str.starts_with(from_prefix)) {
try {
const auto from = env::default_parser<uint32_t>{}(std::string(str.substr(from_prefix.size())));
return {true, from, {}};
} catch(const env::parser_error& e) {
CELERITY_ERROR(error_msg, e.what());
return {};
}
}

// core list case
if(str.find(',') != std::string::npos) {
std::vector<uint32_t> core_ids;
for(auto cs : std::views::split(str, ","sv)) {
try {
core_ids.push_back(env::default_parser<uint32_t>{}(std::string(cs.begin(), cs.end())));
} catch(const env::parser_error& e) {
CELERITY_ERROR(error_msg, e.what());
return {};
}
}
return {true, 0, core_ids};
}

// if all else fails, assume we have a boolean
try {
return {env::default_parser<bool>{}(str), auto_start_from_core, {}};
} catch(const env::parser_error& e) { CELERITY_ERROR(error_msg, e.what()); }
return {};
}

} // namespace celerity::detail::thread_pinning
4 changes: 2 additions & 2 deletions src/backend/sycl_backend.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "backend/sycl_backend.h"

#include "affinity.h"
#include "closure_hydrator.h"
#include "dense_map.h"
#include "nd_memory.h"
Expand Down Expand Up @@ -168,8 +169,8 @@ sycl_backend::sycl_backend(const std::vector<sycl::device>& devices, const confi
m_impl->devices[did].worker_thread.emplace(fmt::format("cy-be-worker-{}", did.value), m_impl->config.profiling);
// no need to wait for the event -> will happen before the first task is submitted
(void)m_impl->devices[did].worker_thread->submit([did] {
thread_pinning::pin_this_thread(thread_pinning::thread_type(thread_pinning::thread_type::first_backend_worker + did.value));
closure_hydrator::make_available();
// TODO: Set BE worker thread affinity
});
}
}
Expand All @@ -194,7 +195,6 @@ void sycl_backend::init() {
for(device_id did = 0; did < m_impl->system.devices.size(); ++did) {
(void)m_impl->get_device_queue(did, 0 /* lane */);
}
// TODO: Set executor thread affinity
}

void* sycl_backend::debug_alloc(const size_t size) {
Expand Down
5 changes: 4 additions & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ namespace detail {
"LOG_LEVEL", {log_level::trace, log_level::debug, log_level::info, log_level::warn, log_level::err, log_level::critical, log_level::off});
const auto env_profile_kernel = pref.register_variable<bool>("PROFILE_KERNEL", parse_validate_profile_kernel);
const auto env_backend_worker_threads = pref.register_variable<bool>("BACKEND_WORKER_THREADS");
const auto env_thread_pinning = pref.register_variable<thread_pinning::environment_configuration>("THREAD_PINNING", thread_pinning::parse_validate_env);
const auto env_print_graphs = pref.register_variable<bool>("PRINT_GRAPHS");
const auto env_dry_run_nodes = pref.register_variable<size_t>("DRY_RUN_NODES", parse_validate_dry_run_nodes);
constexpr int horizon_max = 1024 * 64;
Expand Down Expand Up @@ -121,8 +122,10 @@ namespace detail {
}
#endif // CELERITY_WORKAROUND(SIMSYCL)

m_thread_pinning_config = parsed_and_validated_envs.get_or(env_thread_pinning, {});

const auto has_dry_run_nodes = parsed_and_validated_envs.get(env_dry_run_nodes);
if(has_dry_run_nodes) { m_dry_run_nodes = *has_dry_run_nodes; }
if(has_dry_run_nodes) { m_dry_run_nodes = static_cast<int>(*has_dry_run_nodes); }

m_should_print_graphs = parsed_and_validated_envs.get_or(env_print_graphs, false);

Expand Down
4 changes: 4 additions & 0 deletions src/live_executor.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "live_executor.h"
#include "affinity.h"
#include "backend/backend.h"
#include "closure_hydrator.h"
#include "communicator.h"
Expand Down Expand Up @@ -950,6 +951,9 @@ void live_executor::submit(std::vector<const instruction*> instructions, std::ve

void live_executor::thread_main(std::unique_ptr<backend> backend, executor::delegate* const dlg, const policy_set& policy) {
CELERITY_DETAIL_TRACY_SET_THREAD_NAME_AND_ORDER("cy-executor", tracy_detail::thread_order::executor);

thread_pinning::pin_this_thread(thread_pinning::thread_type::executor);

try {
live_executor_detail::executor_impl(std::move(backend), m_root_comm.get(), m_submission_queue, dlg, policy).run();
}
Expand Down
Loading

0 comments on commit 0c1ae63

Please sign in to comment.