Implement thread affinity #114

Open
wants to merge 13 commits into master
16 changes: 16 additions & 0 deletions misc/testProgs/Makefile
@@ -0,0 +1,16 @@
# Common deps
DEPS=Makefile

default: test_affinity

test_affinity: $(DEPS) test-affinity.cpp
g++ -O3 -g -o test_affinity test-affinity.cpp -pthread

run_tests: default
./test_affinity

clean:
rm -f *.o *.so *.a test_*

.PHONY: clean default run_tests

133 changes: 133 additions & 0 deletions misc/testProgs/test-affinity.cpp
@@ -0,0 +1,133 @@
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <sys/syscall.h>
#include <sys/types.h>

static inline uint32_t get_cpuid() {
return sched_getcpu();
}

// Each thread executes a different number of instructions based on its thread id.
// Check zsim.out for the instruction counts on the cores the threads are pinned to.
static uint64_t dummy_compute(uint64_t amount) {
uint64_t ret = 0;
const uint64_t amplify = 1uL << 23;
for (uint64_t i = 0; i < amount * amplify; i++) ret += i;
return ret;
}

struct thread_arg_t {
pid_t* pids;
pthread_barrier_t* bar;
int tid;
uint64_t ret;
};

static void* thread_function(void* th_args) {
thread_arg_t* args = (thread_arg_t*)th_args;
int tid = args->tid;
pid_t* pids = args->pids;
pthread_barrier_t* bar = args->bar;

printf("Thread %d: start on core %u\n", tid, get_cpuid());

pids[tid] = syscall(SYS_gettid);

pthread_barrier_wait(bar);

// syscall affinity API.
cpu_set_t set;
CPU_ZERO(&set);
CPU_SET(tid + 4, &set);
sched_setaffinity(0, sizeof(cpu_set_t), &set);

CPU_ZERO(&set);
sched_getaffinity(0, sizeof(cpu_set_t), &set);
for (int i = 0; i < (int)sizeof(cpu_set_t)*8; i++) {
if (CPU_ISSET(i, &set)) printf("Thread %d: could run on core %d\n", tid, i);
}
printf("Thread %d: actually running on core %u\n", tid, get_cpuid());

args->ret = dummy_compute(tid);

if (pthread_barrier_wait(bar) == PTHREAD_BARRIER_SERIAL_THREAD) {
printf("Round 1 done.\n");
}


// Pthread affinity API.
// Use dynamically sized cpu set.
cpu_set_t* pset = CPU_ALLOC(2048);
size_t size = CPU_ALLOC_SIZE(2048);
CPU_ZERO_S(size, pset);
CPU_SET_S(tid + 8, size, pset);
pthread_setaffinity_np(pthread_self(), size, pset);

CPU_ZERO_S(size, pset);
pthread_getaffinity_np(pthread_self(), size, pset);
for (int i = 0; i < (int)size*8; i++) {
if (CPU_ISSET_S(i, size, pset)) printf("Thread %d: could run on core %d\n", tid, i);
}
printf("Thread %d: actually running on core %u\n", tid, get_cpuid());
CPU_FREE(pset);

args->ret = dummy_compute(tid);

if (pthread_barrier_wait(bar) == PTHREAD_BARRIER_SERIAL_THREAD) {
printf("Round 2 done.\n");
}


// Set affinity for others.
CPU_ZERO(&set);
CPU_SET(tid + 12, &set);
sched_setaffinity(pids[(tid+2)%4], sizeof(cpu_set_t), &set);

// Wait on barrier to ensure affinity has been set.
pthread_barrier_wait(bar);

CPU_ZERO(&set);
sched_getaffinity(pids[(tid+2)%4], sizeof(cpu_set_t), &set);
for (int i = 0; i < (int)sizeof(cpu_set_t)*8; i++) {
if (CPU_ISSET(i, &set)) printf("Thread %d: could run on core %d\n", (tid+2)%4, i);
}
printf("Thread %d: actually running on core %u\n", tid, get_cpuid());

args->ret = dummy_compute(tid);

if (pthread_barrier_wait(bar) == PTHREAD_BARRIER_SERIAL_THREAD) {
printf("Round 3 done.\n");
}

return NULL;
}

int main() {
printf("zsim sched_get/setaffinity test\n");
printf("sizeof(cpu_set_t) == %lu\n", sizeof(cpu_set_t));

pthread_t threads[4];
pthread_barrier_t barrier;
pthread_barrier_init(&barrier, NULL, 4);
pid_t pids[4];
thread_arg_t thread_args[4];
for (uint32_t tid = 0; tid < 4; tid++) {
thread_args[tid].pids = pids;
thread_args[tid].bar = &barrier;
thread_args[tid].tid = tid;
pthread_create(&threads[tid], NULL, thread_function, &thread_args[tid]);
}
for (uint32_t tid = 0; tid < 4; tid++) {
pthread_join(threads[tid], NULL);
}
pthread_barrier_destroy(&barrier);

printf("zsim sched_get/setaffinity test done\n");

return 0;
}
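For reference, the test exercises two distinct glibc cpu-set interfaces: the fixed-size `cpu_set_t` macros in round 1 and the dynamically sized `*_S` variants in round 2. A standalone sketch of the difference (reference material only, not part of this PR):

```cpp
#include <sched.h>
#include <stdio.h>

int main() {
    // Fixed-size API: cpu_set_t is a 1024-bit mask (128 bytes on glibc).
    cpu_set_t fixed;
    CPU_ZERO(&fixed);
    CPU_SET(2, &fixed);
    printf("fixed: %d cpu(s) set, sizeof(cpu_set_t) = %zu\n",
           CPU_COUNT(&fixed), sizeof(cpu_set_t));

    // Dynamic API: sized for any cpu count (2048 here, as in the test);
    // every *_S macro takes the byte size returned by CPU_ALLOC_SIZE.
    cpu_set_t* dyn = CPU_ALLOC(2048);
    size_t size = CPU_ALLOC_SIZE(2048);
    CPU_ZERO_S(size, dyn);
    CPU_SET_S(2, size, dyn);
    printf("dynamic: %d cpu(s) set, size = %zu bytes\n",
           CPU_COUNT_S(size, dyn), size);
    CPU_FREE(dyn);
    return 0;
}
```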


45 changes: 40 additions & 5 deletions src/cpuenum.h
@@ -29,6 +29,7 @@
/* Small routines for core enumeration */

#include "process_tree.h"
#include "scheduler.h"
#include "zsim.h"

inline uint32_t cpuenumNumCpus(uint32_t pid) {
@@ -43,19 +44,53 @@ inline uint32_t cpuenumNumCpus(uint32_t pid) {
}
}

// Returns the per-thread cpu mask (from scheduler), taking care of per-process cpuenum.
inline std::vector<bool> cpuenumMask(uint32_t pid, uint32_t tid) {
std::vector<bool> res;
const g_vector<bool>& processMask = zinfo->procArray[pid]->getMask();
const g_vector<bool>& schedMask = zinfo->sched->getMask(pid, tid);
if (zinfo->perProcessCpuEnum) {
res.resize(cpuenumNumCpus(pid));
for (uint32_t i = 0; i < res.size(); i++) res[i] = true;
uint32_t perProcCid = 0;
for (uint32_t i = 0; i < schedMask.size(); i++) {
if (processMask[i]) {
res[perProcCid] = schedMask[i];
perProcCid++;
}
}
assert(perProcCid == cpuenumNumCpus(pid));
} else {
res.resize(schedMask.size());
for (uint32_t i = 0; i < res.size(); i++) res[i] = schedMask[i];
}
return res;
}

// Update the per-thread cpu mask, taking care of per-process cpuenum.
// Consistent with cpuenumMask().
inline void cpuenumUpdateMask(uint32_t pid, uint32_t tid, const std::vector<bool>& mask) {
const g_vector<bool>& processMask = zinfo->procArray[pid]->getMask();
g_vector<bool> schedMask(zinfo->numCores, false);
if (zinfo->perProcessCpuEnum) {
// Given mask is per-process enumerated.
uint32_t perProcCid = 0;
for (uint32_t i = 0; i < schedMask.size() && perProcCid < mask.size(); i++) {
if (processMask[i]) {
schedMask[i] = mask[perProcCid];
perProcCid++;
}
}
} else {
for (uint32_t i = 0; i < schedMask.size() && i < mask.size(); i++) {
if (mask[i]) {
assert_msg(processMask[i], "Thread mask must be within the process mask.");
schedMask[i] = true;
}
}
}
zinfo->sched->updateMask(pid, tid, schedMask);
}

// Returns the cpu that this cid is scheduled on, taking care of per-process cpuenum
// Can be called when app is fast-forwarding (cid == -1), it will return the first cpu
// that can run a thread from the specified pid
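Both functions above remap indices the same way: with `perProcessCpuEnum`, position i of the per-process mask corresponds to the i-th set bit of the process mask. A minimal standalone sketch of that mapping (hypothetical helper name; the diff inlines this logic via `perProcCid`):

```cpp
#include <cassert>
#include <vector>

// Maps the i-th set bit of processMask to its global core id,
// i.e., per-process cpu index -> global core id.
static int localToGlobal(const std::vector<bool>& processMask, int local) {
    int seen = 0;
    for (int i = 0; i < (int)processMask.size(); i++)
        if (processMask[i] && seen++ == local) return i;
    return -1;  // local index out of range
}

int main() {
    // Process may run on global cores 1, 2, and 4; its local cpus are 0, 1, 2.
    std::vector<bool> processMask = {false, true, true, false, true};
    assert(localToGlobal(processMask, 0) == 1);
    assert(localToGlobal(processMask, 1) == 2);
    assert(localToGlobal(processMask, 2) == 4);
    return 0;
}
```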
20 changes: 18 additions & 2 deletions src/scheduler.cpp
@@ -153,12 +153,19 @@ void Scheduler::watchdogThreadFunc() {

uint64_t pc = fl->pc;
do {
// NOTE(gaomy): avoid a race with join() when a thread is not actually blocked but has just been waiting too long (e.g., on a heavily loaded host).
// Because we finish the fake leave and release the lock before doing the actual leave, the join could happen in between, before we have done the leave.
th->flWord = 1;
finishFakeLeave(th);

futex_unlock(&schedLock);
leave(pid, tid, cid);
futex_lock(&schedLock);

th->flWord = 0;
syscall(SYS_futex, &th->flWord, FUTEX_WAKE, 1, nullptr, nullptr, 0);

// also do real leave for other threads blocked at the same pc ...
fl = fakeLeaves.front();
if (fl == nullptr || getPid(th->gid) != pid || fl->pc != pc)
@@ -281,8 +288,9 @@ void Scheduler::syscallLeave(uint32_t pid, uint32_t tid, uint32_t cid, uint64_t
assert_msg(pid < blockingSyscalls.size(), "%d >= %ld?", pid, blockingSyscalls.size());

bool blacklisted = blockingSyscalls[pid].find(pc) != blockingSyscalls[pid].end();
if (blacklisted || th->markedForSleep || !th->mask[cid]) {
// (mgao): the thread mask may have been updated by a SYS_sched_setaffinity syscall, in which case the thread must do a true leave.
DEBUG_FL("%s @ 0x%lx calling leave(), reason: %s", GetSyscallName(syscallNumber), pc, blacklisted? "blacklist" : (th->markedForSleep ? "sleep" : "affinity"));
futex_unlock(&schedLock);
leave(pid, tid, cid);
} else {
@@ -397,6 +405,14 @@ void Scheduler::finishFakeLeave(ThreadInfo* th) {
}

void Scheduler::waitUntilQueued(ThreadInfo* th) {
if (th->linuxPid == syscall(SYS_getpid) && th->linuxTid == syscall(SYS_gettid)) {
// We are waiting for ourselves; return immediately.
// This can happen when a thread marked for sleep gets into leave() and
// calls bar.leave(), which triggers the end of the phase and invokes
// callback(), which wakes up all sleeping threads whose deadlines are
// met, including the thread itself.
return;
}
uint64_t startNs = getNs();
uint32_t sleepUs = 1;
while(!IsSleepingInFutex(th->linuxPid, th->linuxTid, (uintptr_t)&schedLock)) {
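The flWord added above implements a small futex handshake: the watchdog publishes "fake-to-true-leave in progress" (flWord = 1) before dropping schedLock, then clears the word and wakes waiters once the true leave is done, while join() (see scheduler.h below) waits on the word instead of racing. A self-contained sketch of this wait/wake pattern (hypothetical names, outside zsim; compile with -pthread):

```cpp
#include <linux/futex.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/syscall.h>
#include <unistd.h>

static volatile uint32_t word = 0;  // plays the role of th->flWord

static void* joiner(void*) {
    // Wait while the transition is in progress (cf. the loop in Scheduler::join()).
    while (word == 1) {
        long res = syscall(SYS_futex, &word, FUTEX_WAIT, 1, nullptr, nullptr, 0);
        if (res == 0 || word != 1) break;
    }
    printf("joiner: transition done, safe to proceed\n");
    return nullptr;
}

int main() {
    word = 1;  // publish: transition in progress (watchdog side)
    pthread_t t;
    pthread_create(&t, nullptr, joiner, nullptr);
    usleep(10 * 1000);  // stand-in for the actual leave() work
    word = 0;           // transition finished
    syscall(SYS_futex, &word, FUTEX_WAKE, 1, nullptr, nullptr, 0);
    pthread_join(t, nullptr);
    return 0;
}
```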
65 changes: 63 additions & 2 deletions src/scheduler.h
@@ -107,6 +107,7 @@ class Scheduler : public GlobAlloc, public Callee {
g_vector<bool> mask;

FakeLeaveInfo* fakeLeave; // for accurate join-leaves, see below
volatile uint32_t flWord; // non-zero while transitioning from a fake leave to a true leave

FutexJoinInfo futexJoin;

@@ -124,6 +125,7 @@
for (auto b : mask) if (b) count++;
if (count == 0) panic("Empty mask on gid %d!", gid);
fakeLeave = nullptr;
flWord = 0;
futexJoin.action = FJA_NONE;
}
};
@@ -294,6 +296,14 @@
uint32_t cid = th->cid;
futex_unlock(&schedLock);
return cid;
} else if (th->flWord) {
// We are finishing a fake leave and transitioning to a true leave. Wait until that is done.
futex_unlock(&schedLock);
while (true) {
int futex_res = syscall(SYS_futex, &th->flWord, FUTEX_WAIT, 1, nullptr, nullptr, 0);
if (futex_res == 0 || th->flWord != 1) break;
}
futex_lock(&schedLock);
}

assert(!th->markedForSleep);
@@ -372,6 +382,10 @@
schedule(inTh, ctx);
zinfo->cores[ctx->cid]->join(); //inTh does not do a sched->join, so we need to notify the core since we just called leave() on it
wakeup(inTh, false /*no join, we did not leave*/);
} else if (!th->mask[th->cid]) {
// Our updated mask no longer allows the current core: release the context and block until rescheduled.
deschedule(th, ctx, BLOCKED);
freeList.push_back(ctx);
bar.leave(cid); //may trigger end of phase
} else { //lazily transition to OUT, where we retain our context
th->state = OUT;
outQueue.push_back(th);
@@ -551,6 +565,50 @@

uint32_t getScheduledPid(uint32_t cid) const { return (contexts[cid].state == USED)? getPid(contexts[cid].curThread->gid) : (uint32_t)-1; }

const g_vector<bool> getMask(uint32_t pid, uint32_t tid) {
g_vector<bool> mask;
futex_lock(&schedLock);
uint32_t gid = getGid(pid, tid);
if (gidMap.find(gid) == gidMap.end()) {
futex_unlock(&schedLock);
warn("Scheduler::getMask(): can't find thread info pid=%d, tid=%d", pid, tid);
mask.resize(zinfo->numCores, true);
return mask;
}
ThreadInfo* th = gidMap[gid];
mask = th->mask;
futex_unlock(&schedLock);
return mask;
}

void updateMask(uint32_t pid, uint32_t tid, const g_vector<bool>& mask) {
futex_lock(&schedLock);
uint32_t gid = getGid(pid, tid);
if (gidMap.find(gid) == gidMap.end()) {
futex_unlock(&schedLock);
warn("Scheduler::updateMask(): can't find thread info pid=%d, tid=%d", pid, tid);
return;
}
ThreadInfo* th = gidMap[gid];
//info("Scheduler::updateMask(): update thread mask pid=%d, tid=%d", pid, tid);
assert(mask.size() == zinfo->numCores);
uint32_t count = 0;
for (auto b : mask) if (b) count++;
if (count == 0) panic("Empty mask on gid %d!", gid);
th->mask = mask;
futex_unlock(&schedLock);
// Do leave and join outside to clear and set cid in zsim.cpp
}

uint32_t getTidFromLinuxTid(uint32_t linuxTid) {
for (const auto& kv : gidMap) {
if (kv.second->linuxTid == linuxTid) {
return getTid(kv.first);
}
}
return (uint32_t)-1; // not found
}

private:
void schedule(ThreadInfo* th, ContextInfo* ctx) {
assert(th->state == STARTED || th->state == BLOCKED || th->state == QUEUED);
@@ -595,13 +653,16 @@
if (futex_res == 0 || th->futexWord != 1) break;
}
//info("%d out of sched wait, got cid = %d, needsJoin = %d", th->gid, th->cid, th->needsJoin);
// NOTE(mgao): After wakeup, the waker assumes the wakee will wait on schedLock to join. See waitUntilQueued().
// So we must take the lock regardless of needsJoin.
futex_lock(&schedLock);
if (th->needsJoin) {
//assert(th->needsJoin); //re-check after the lock
zinfo->cores[th->cid]->join();
bar.join(th->cid, &schedLock);
//info("%d join done", th->gid);
}
futex_unlock(&schedLock);
}

void wakeup(ThreadInfo* th, bool needsJoin) {
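Not shown in this diff is the zsim.cpp side, which intercepts SYS_sched_getaffinity/setaffinity, converts between the application's cpu_set_t buffer and the std::vector<bool> masks used by cpuenumMask()/cpuenumUpdateMask(), and, per the comment in updateMask(), forces a leave/join so the thread reschedules onto an allowed core. A sketch of just the buffer conversion, with hypothetical helper names:

```cpp
#include <sched.h>
#include <stddef.h>
#include <vector>

// Application cpu_set_t buffer -> per-core bool mask (hypothetical helper).
static std::vector<bool> cpuSetToMask(cpu_set_t* set, size_t setSize, size_t numCores) {
    std::vector<bool> mask(numCores, false);
    for (size_t i = 0; i < numCores && i < 8 * setSize; i++)
        mask[i] = CPU_ISSET_S(i, setSize, set);
    return mask;
}

// Per-core bool mask -> application cpu_set_t buffer (hypothetical helper).
static void maskToCpuSet(const std::vector<bool>& mask, cpu_set_t* set, size_t setSize) {
    CPU_ZERO_S(setSize, set);
    for (size_t i = 0; i < mask.size() && i < 8 * setSize; i++)
        if (mask[i]) CPU_SET_S(i, setSize, set);
}

int main() {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(3, &set);
    std::vector<bool> m = cpuSetToMask(&set, sizeof(set), 8);
    return m[3] ? 0 : 1;  // expect core 3 set
}
```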