Skip to content

Commit

Permalink
Detect CPU topology
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 630388690
  • Loading branch information
jan-wassenberg authored and copybara-github committed May 3, 2024
1 parent 6e10ea6 commit 45404a4
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 55 deletions.
278 changes: 259 additions & 19 deletions hwy/contrib/thread_pool/topology.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@

#include "hwy/contrib/thread_pool/topology.h"

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

#include <map>
#include <thread> // NOLINT
#include <vector>

#include "hwy/detect_compiler_arch.h" // HWY_OS_WIN

#if HWY_OS_WIN
Expand Down Expand Up @@ -46,10 +54,11 @@
#include <emscripten/threading.h>
#endif

#include <stddef.h>
#include <stdio.h>

#include <thread> // NOLINT
#if HWY_OS_LINUX
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#endif // HWY_OS_LINUX

#include "hwy/base.h"

Expand All @@ -64,26 +73,38 @@ HWY_DLLEXPORT bool HaveThreadingSupport() {
}

HWY_DLLEXPORT size_t TotalLogicalProcessors() {
size_t lp = 0;
#if HWY_ARCH_WASM
const int lp = emscripten_num_logical_cores();
HWY_ASSERT(lp < static_cast<int>(kMaxLogicalProcessors));
if (lp > 0) return static_cast<size_t>(lp);
const int num_cores = emscripten_num_logical_cores();
if (num_cores > 0) lp = static_cast<size_t>(num_cores);
#else
const unsigned lp = std::thread::hardware_concurrency();
HWY_ASSERT(lp < static_cast<unsigned>(kMaxLogicalProcessors));
if (lp != 0) return static_cast<size_t>(lp);
const unsigned concurrency = std::thread::hardware_concurrency();
if (concurrency != 0) lp = static_cast<size_t>(concurrency);
#endif

// WASM or C++ stdlib failed.
if (HWY_IS_DEBUG_BUILD) {
fprintf(
stderr,
"Unknown TotalLogicalProcessors. HWY_OS_: WIN=%d LINUX=%d APPLE=%d;\n"
"HWY_ARCH_: WASM=%d X86=%d PPC=%d ARM=%d RISCV=%d S390X=%d\n",
HWY_OS_WIN, HWY_OS_LINUX, HWY_OS_APPLE, HWY_ARCH_WASM, HWY_ARCH_X86,
HWY_ARCH_PPC, HWY_ARCH_ARM, HWY_ARCH_RISCV, HWY_ARCH_S390X);
// WASM or C++ stdlib failed to detect #CPUs.
if (lp == 0) {
if (HWY_IS_DEBUG_BUILD) {
fprintf(
stderr,
"Unknown TotalLogicalProcessors. HWY_OS_: WIN=%d LINUX=%d APPLE=%d;\n"
"HWY_ARCH_: WASM=%d X86=%d PPC=%d ARM=%d RISCV=%d S390X=%d\n",
HWY_OS_WIN, HWY_OS_LINUX, HWY_OS_APPLE, HWY_ARCH_WASM, HWY_ARCH_X86,
HWY_ARCH_PPC, HWY_ARCH_ARM, HWY_ARCH_RISCV, HWY_ARCH_S390X);
}
return 1;
}

// Warn that we are clamping.
if (lp > kMaxLogicalProcessors) {
if (HWY_IS_DEBUG_BUILD) {
fprintf(stderr, "OS reports %zu processors but clamping to %zu\n", lp,
kMaxLogicalProcessors);
}
lp = kMaxLogicalProcessors;
}
return 1;

return lp;
}

#ifdef __ANDROID__
Expand Down Expand Up @@ -169,4 +190,223 @@ HWY_DLLEXPORT bool SetThreadAffinity(const LogicalProcessorSet& lps) {
#endif
}

#if HWY_OS_LINUX

class File {
public:
explicit File(const char* path) {
for (;;) {
fd_ = open(path, O_RDONLY);
if (fd_ > 0) return; // success
if (errno == EINTR) continue; // signal: retry
if (errno == ENOENT) return; // not found, give up
if (HWY_IS_DEBUG_BUILD) {
fprintf(stderr, "Unexpected error opening %s: %d\n", path, errno);
}
return; // unknown error, give up
}
}

~File() {
if (fd_ > 0) {
for (;;) {
const int ret = close(fd_);
if (ret == 0) break; // success
if (errno == EINTR) continue; // signal: retry
if (HWY_IS_DEBUG_BUILD) {
fprintf(stderr, "Unexpected error closing file: %d\n", errno);
}
return; // unknown error, ignore
}
}
}

// Returns number of bytes read or 0 on failure.
size_t Read(char* buf200) const {
if (fd_ < 0) return 0;
size_t pos = 0;
for (;;) {
// read instead of pread, which might not work for sysfs.
const auto bytes_read = read(fd_, buf200 + pos, 200 - pos);
if (bytes_read == 0) { // EOF: done
buf200[pos++] = '\0';
return pos;
}
if (bytes_read == -1) {
if (errno == EINTR) continue; // signal: retry
if (HWY_IS_DEBUG_BUILD) {
fprintf(stderr, "Unexpected error reading file: %d\n", errno);
}
return 0;
}
pos += bytes_read;
HWY_ASSERT(pos <= 200);
}
}

private:
int fd_;
};

bool SimpleAtoi(const char* str, size_t len, size_t* out) {
size_t value = 0;
size_t digits_seen = 0;
// 9 digits cannot overflow even 32-bit size_t.
for (size_t i = 0; i < HWY_MIN(len, 9); ++i) {
if (str[i] < '0' || str[i] > '9') break;
value *= 10;
value += str[i] - '0';
++digits_seen;
}
if (digits_seen == 0) {
*out = 0;
return false;
}
*out = value;
return true;
}

bool ReadSysfs(const char* format, size_t lp, size_t max, size_t* out) {
char path[200];
const int bytes_written = snprintf(path, sizeof(path), format, lp);
HWY_ASSERT(0 < bytes_written && bytes_written < sizeof(path) - 1);

const File file(path);
char buf200[200];
const size_t pos = file.Read(buf200);
if (pos == 0) return false;

if (!SimpleAtoi(buf200, pos, out)) return false;
if (*out > max) {
if (HWY_IS_DEBUG_BUILD) {
fprintf(stderr, "Value for %s = %zu > %zu\n", path, *out, max);
}
return false;
}
return true;
}

// Given a set of opaque values, returns a map of each value to the number of
// values that precede it. Used to generate contiguous arrays in the same order.
std::map<size_t, size_t> RanksFromSet(const BitSet4096<>& set) {
HWY_ASSERT(set.Count() != 0);
std::map<size_t, size_t> ranks;
size_t num = 0;
set.Foreach([&ranks, &num](size_t opaque) {
const bool inserted = ranks.insert({opaque, num++}).second;
HWY_ASSERT(inserted);
});
HWY_ASSERT(num == set.Count());
HWY_ASSERT(num == ranks.size());
return ranks;
}

const char* kPackage =
"/sys/devices/system/cpu/cpu%zu/topology/physical_package_id";
const char* kCluster = "/sys/devices/system/cpu/cpu%zu/cache/index3/id";
const char* kCore = "/sys/devices/system/cpu/cpu%zu/topology/core_id";

// Returns 0 on error, or number of packages and initializes LP::package.
size_t DetectPackages(std::vector<Topology::LP>& lps) {
// Packages are typically an index, but could actually be an arbitrary value.
// We assume they do not exceed kMaxLogicalProcessors and thus fit in a set.
BitSet4096<> package_set;
for (size_t lp = 0; lp < lps.size(); ++lp) {
size_t package;
if (!ReadSysfs(kPackage, lp, kMaxLogicalProcessors, &package)) return 0;
package_set.Set(package);
// Storing a set of lps belonging to each package requires more space/time
// than storing the package per lp and remapping below.
lps[lp].package = static_cast<uint16_t>(package);
}
HWY_ASSERT(package_set.Count() != 0);

// Remap the per-lp package to their rank.
std::map<size_t, size_t> ranks = RanksFromSet(package_set);
for (size_t lp = 0; lp < lps.size(); ++lp) {
lps[lp].package = ranks[lps[lp].package];
}
return ranks.size();
}

// Stores the global cluster/core values separately for each package so we can
// return per-package arrays.
struct PerPackage {
// Use maximum possible set size because Arm values can exceed 1k.
BitSet4096<> cluster_set;
BitSet4096<> core_set;

std::map<size_t, size_t> smt_per_core;

size_t num_clusters;
size_t num_cores;
};

// Returns false, or fills per_package and initializes LP::cluster/core/smt.
bool DetectClusterAndCore(std::vector<Topology::LP>& lps,
std::vector<PerPackage>& per_package) {
for (size_t lp = 0; lp < lps.size(); ++lp) {
PerPackage& pp = per_package[lps[lp].package];
size_t cluster, core;
if (!ReadSysfs(kCluster, lp, kMaxLogicalProcessors, &cluster)) return false;
if (!ReadSysfs(kCore, lp, kMaxLogicalProcessors, &core)) return false;

// SMT ID is how many LP we have already seen assigned to the same core.
const size_t smt = pp.smt_per_core[core]++;
HWY_ASSERT(smt < 16);
lps[lp].smt = static_cast<uint8_t>(smt); // already contiguous

// Certainly core, and likely also cluster, are HW-dependent opaque values.
// We assume they do not exceed kMaxLogicalProcessors and thus fit in a set.
pp.cluster_set.Set(cluster);
pp.core_set.Set(core);
// Temporary storage for remapping as in DetectPackages.
lps[lp].cluster = static_cast<uint16_t>(cluster);
lps[lp].core = static_cast<uint16_t>(core);
}

for (size_t p = 0; p < per_package.size(); ++p) {
PerPackage& pp = per_package[p];
std::map<size_t, size_t> cluster_ranks = RanksFromSet(pp.cluster_set);
std::map<size_t, size_t> core_ranks = RanksFromSet(pp.core_set);
// Remap *this packages'* per-lp cluster/core to their ranks.
for (size_t lp = 0; lp < lps.size(); ++lp) {
if (lps[lp].package != p) continue;
lps[lp].cluster = cluster_ranks[lps[lp].cluster];
lps[lp].core = core_ranks[lps[lp].core];
}
pp.num_clusters = cluster_ranks.size();
pp.num_cores = core_ranks.size();
}

return true;
}

#endif // HWY_OS_LINUX

HWY_DLLEXPORT Topology::Topology() {
#if HWY_OS_LINUX
lps.resize(TotalLogicalProcessors());

const size_t num_packages = DetectPackages(lps);
if (num_packages == 0) return;
std::vector<PerPackage> per_package(num_packages);
if (!DetectClusterAndCore(lps, per_package)) return;

// Allocate per-package/cluster/core vectors. This indicates to callers that
// detection succeeded.
packages.resize(num_packages);
for (size_t p = 0; p < num_packages; ++p) {
packages[p].clusters.resize(per_package[p].num_clusters);
packages[p].cores.resize(per_package[p].num_cores);
}

// Populate the per-cluster/core sets of LP.
for (size_t lp = 0; lp < lps.size(); ++lp) {
packages[lps[lp].package].clusters[lps[lp].cluster].lps.Set(lp);
packages[lps[lp].package].cores[lps[lp].core].lps.Set(lp);
}
#endif
}

} // namespace hwy
Loading

0 comments on commit 45404a4

Please sign in to comment.