remove threading code in ne_layers
luoyu-intel committed Mar 6, 2024
1 parent d0e5e30 commit 1722f1c
Showing 2 changed files with 4 additions and 285 deletions.
3 changes: 2 additions & 1 deletion neural_speed/core/CMakeLists.txt
@@ -35,8 +35,9 @@ endif()

if(NOT WIN32)
target_link_libraries(ne_layers PUBLIC rt)
else()
target_link_options(ne_layers PUBLIC /STACK:5242880)
endif()
target_link_options(ne_layers PUBLIC /STACK:5242880)

if (NS_BUILD_TESTS)

286 changes: 2 additions & 284 deletions neural_speed/core/ne_layers.c
@@ -11131,171 +11131,11 @@ struct ne_cgraph ne_build_backward(struct ne_context* ctx, struct ne_cgraph* gf,
// I tried using spin locks, but not sure how to use them correctly - the things I tried were slower than busy loops
//

#ifdef __APPLE__

// #include <os/lock.h>
//
// typedef os_unfair_lock ne_lock_t;
//
// #define ne_lock_init(x) UNUSED(x)
// #define ne_lock_destroy(x) UNUSED(x)
// #define ne_lock_lock os_unfair_lock_lock
// #define ne_lock_unlock os_unfair_lock_unlock
//
// #define NE_LOCK_INITIALIZER OS_UNFAIR_LOCK_INIT

typedef int ne_lock_t;

#define ne_lock_init(x) UNUSED(x)
#define ne_lock_destroy(x) UNUSED(x)
#define ne_lock_lock(x) UNUSED(x)
#define ne_lock_unlock(x) UNUSED(x)

#define NE_LOCK_INITIALIZER 0

typedef pthread_t ne_thread_t;

#define ne_thread_create pthread_create
#define ne_thread_join pthread_join

#else

// typedef pthread_spinlock_t ne_lock_t;

// #define ne_lock_init(x) pthread_spin_init(x, PTHREAD_PROCESS_PRIVATE)
// #define ne_lock_destroy pthread_spin_destroy
// #define ne_lock_lock pthread_spin_lock
// #define ne_lock_unlock pthread_spin_unlock

typedef int ne_lock_t;

#define ne_lock_init(x) UNUSED(x)
#define ne_lock_destroy(x) UNUSED(x)
#if defined(__x86_64__) || (defined(_MSC_VER) && defined(_M_AMD64))
#define ne_lock_lock(x) _mm_pause()
#else
#define ne_lock_lock(x) UNUSED(x)
#endif
#define ne_lock_unlock(x) UNUSED(x)

#define NE_LOCK_INITIALIZER 0

typedef pthread_t ne_thread_t;

#define ne_thread_create pthread_create
#define ne_thread_join pthread_join

#endif

struct ne_compute_state_shared {
ne_lock_t spin;

int n_threads;

// synchronization primitives
atomic_int n_ready;
atomic_bool has_work;
atomic_bool stop; // stop all threads
};

struct ne_compute_state {
ne_thread_t thrd;

struct ne_compute_params params;
struct ne_tensor* node;

struct ne_compute_state_shared* shared;
};

static thread_ret_t ne_graph_compute_thread(void* data) {
struct ne_compute_state* state = (struct ne_compute_state*)data;

const int n_threads = state->shared->n_threads;

while (true) {
if (atomic_fetch_add(&state->shared->n_ready, 1) == n_threads - 1) {
atomic_store(&state->shared->has_work, false);
} else {
while (atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
return 0;
}
ne_lock_lock(&state->shared->spin);
ne_lock_unlock(&state->shared->spin);
}
}

atomic_fetch_sub(&state->shared->n_ready, 1);

// wait for work
while (!atomic_load(&state->shared->has_work)) {
if (atomic_load(&state->shared->stop)) {
return 0;
}
ne_lock_lock(&state->shared->spin);
ne_lock_unlock(&state->shared->spin);
}

// check if we should stop
if (atomic_load(&state->shared->stop)) {
break;
}

if (state->node) {
if (state->params.ith < state->params.nth) {
ne_compute_forward(&state->params, state->node);
}

state->node = NULL;
} else {
break;
}
}

return 0;
}

void ne_graph_compute(struct ne_context* ctx, struct ne_cgraph* cgraph) {
int n_threads = cgraph->n_threads;

struct ne_compute_state_shared state_shared = {
/*.spin =*/NE_LOCK_INITIALIZER,
/*.n_threads =*/n_threads,
/*.n_ready =*/0,
/*.has_work =*/false,
/*.stop =*/false,
};
struct ne_compute_state* workers = n_threads > 1 ? alloca(sizeof(struct ne_compute_state) * (n_threads - 1)) : NULL;
#ifndef _OPENMP
// create thread pool
if (n_threads > 1) {
ne_lock_init(&state_shared.spin);

atomic_store(&state_shared.has_work, true);

for (int j = 0; j < n_threads - 1; j++) {
workers[j] = (struct ne_compute_state){
.thrd = 0,
.params =
{
.type = NE_TASK_COMPUTE,
.ith = j + 1,
.nth = n_threads,
.wsize = cgraph->work ? ne_nbytes(cgraph->work) : 0,
.wdata = cgraph->work ? cgraph->work->data : NULL,
},
.node = NULL,
.shared = &state_shared,
};

int rc = ne_thread_create(&workers[j].thrd, NULL, ne_graph_compute_thread, &workers[j]);
NE_ASSERT(rc == 0);
UNUSED(rc);
}
}
#else
n_threads = bestla_set_threads(n_threads); // prevent from using two sockets
#endif
n_threads = bestla_set_threads(n_threads);
// initialize tasks + work buffer
{
size_t work_size = 0;
@@ -11598,7 +11438,6 @@ void ne_graph_compute(struct ne_context* ctx, struct ne_cgraph* cgraph) {
#if NE_DEBUG
bestla_timer(true);
#endif
#ifndef _OPENMP
// INIT
struct ne_compute_params params = {
/*.type =*/NE_TASK_INIT,
@@ -11608,128 +11447,7 @@ void ne_graph_compute(struct ne_context* ctx, struct ne_cgraph* cgraph) {
/*.wdata =*/cgraph->work ? cgraph->work->data : NULL,
};

ne_compute_forward(&params, node);

// COMPUTE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}

while (atomic_load(&state_shared.has_work)) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

// launch thread pool
for (int j = 0; j < n_threads - 1; j++) {
workers[j].params = (struct ne_compute_params){
.type = NE_TASK_COMPUTE,
.ith = j + 1,
.nth = node->n_tasks,
.wsize = cgraph->work ? ne_nbytes(cgraph->work) : 0,
.wdata = cgraph->work ? cgraph->work->data : NULL,
};
workers[j].node = node;
}

atomic_fetch_sub(&state_shared.n_ready, 1);

while (atomic_load(&state_shared.n_ready) > 0) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

atomic_store(&state_shared.has_work, true);
}

params.type = NE_TASK_COMPUTE;
ne_compute_forward(&params, node);

// wait for thread pool
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}

while (atomic_load(&state_shared.has_work)) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

atomic_fetch_sub(&state_shared.n_ready, 1);

while (atomic_load(&state_shared.n_ready) != 0) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}
}
// FINALIZE
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}

while (atomic_load(&state_shared.has_work)) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

// launch thread pool
for (int j = 0; j < n_threads - 1; j++) {
workers[j].params = (struct ne_compute_params){
.type = NE_TASK_FINALIZE,
.ith = j + 1,
.nth = node->n_tasks,
.wsize = cgraph->work ? ne_nbytes(cgraph->work) : 0,
.wdata = cgraph->work ? cgraph->work->data : NULL,
};
workers[j].node = node;
}

atomic_fetch_sub(&state_shared.n_ready, 1);

while (atomic_load(&state_shared.n_ready) > 0) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

atomic_store(&state_shared.has_work, true);
}

params.type = NE_TASK_FINALIZE;
ne_compute_forward(&params, node);

// wait for thread pool
if (node->n_tasks > 1) {
if (atomic_fetch_add(&state_shared.n_ready, 1) == n_threads - 1) {
atomic_store(&state_shared.has_work, false);
}

while (atomic_load(&state_shared.has_work)) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}

atomic_fetch_sub(&state_shared.n_ready, 1);

while (atomic_load(&state_shared.n_ready) != 0) {
ne_lock_lock(&state_shared.spin);
ne_lock_unlock(&state_shared.spin);
}
}
#else
// INIT
struct ne_compute_params params = {
/*.type =*/NE_TASK_INIT,
/*.ith =*/0,
/*.nth =*/node->n_tasks,
/*.wsize =*/cgraph->work ? ne_nbytes(cgraph->work) : 0,
/*.wdata =*/cgraph->work ? cgraph->work->data : NULL,
};

bestla_parallel_for(ne_compute_forward,&params,node);
#endif
bestla_parallel_for(ne_compute_forward, &params, node);
#if NE_DEBUG
printf("Node %d ", node->op);
bestla_timer(false);

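For context on what replaces the deleted pthread pool: after this change each task phase (NE_TASK_INIT, NE_TASK_COMPUTE, NE_TASK_FINALIZE) is dispatched through bestla_parallel_for, which hands every thread the same node together with its own ith/nth slice, the same contract the hand-rolled workers used when they called ne_compute_forward. The sketch below is only an illustration of that dispatch shape under OpenMP, not the bestla implementation; the name parallel_forward_sketch and the function-pointer typedef are hypothetical, and it assumes struct ne_compute_params and struct ne_tensor are in scope, as they are inside ne_layers.c.

/*
 * Illustrative sketch only (hypothetical, not the bestla API): one OpenMP
 * team runs the given phase, each thread with its own ith/nth pair, which is
 * the role the removed pthread pool played. The team size is assumed to have
 * been set earlier, e.g. via bestla_set_threads / omp_set_num_threads.
 */
#include <omp.h>

typedef void (*ne_forward_fn)(struct ne_compute_params*, struct ne_tensor*);

static void parallel_forward_sketch(ne_forward_fn fn, const struct ne_compute_params* base,
                                    struct ne_tensor* node) {
#pragma omp parallel
  {
    struct ne_compute_params p = *base;  /* per-thread copy of the shared params */
    p.ith = omp_get_thread_num();        /* this thread's slice index */
    p.nth = omp_get_num_threads();       /* total threads in the team */
    fn(&p, node);                        /* e.g. ne_compute_forward for this phase */
  }
}

The per-op work split still happens inside ne_compute_forward via the ith/nth pair, so this commit only moves thread management out of ne_layers.c.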