ggml : synchronize using openmp barriers
slaren committed Jun 18, 2024
1 parent 91c188d commit 4886f50
Showing 1 changed file with 31 additions and 159 deletions.
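In short, the change drops ggml's hand-rolled thread synchronization (atomic counters polled in spin loops with sched_yield() and _mm_pause()) and instead keeps the worker threads in lockstep with OpenMP barriers. A minimal standalone sketch of the pattern the new code adopts, not taken from the commit (the thread count, node count, and printf payloads are illustrative):

// barrier_sketch.c - build with: cc -fopenmp barrier_sketch.c
#include <omp.h>
#include <stdio.h>

int main(void) {
    const int n_nodes = 3; // stand-in for cgraph->n_nodes

    #pragma omp parallel num_threads(4)
    {
        const int ith = omp_get_thread_num();

        for (int node_n = 0; node_n < n_nodes; node_n++) {
            // INIT phase: per-node preparation
            printf("node %d: INIT on thread %d\n", node_n, ith);
            #pragma omp barrier // INIT must finish before COMPUTE starts

            // COMPUTE phase: the parallel work itself
            printf("node %d: COMPUTE on thread %d\n", node_n, ith);
            #pragma omp barrier // COMPUTE must finish before FINALIZE

            // FINALIZE phase: one thread wraps up the node
            if (ith == 0) {
                printf("node %d: FINALIZE\n", node_n);
            }
            #pragma omp barrier // stay in lockstep for the next node
        }
    }
    return 0;
}

Compared with the removed spin loops, this leaves the wait policy (spin, yield, block) to the OpenMP runtime instead of hand-tuning it per workload.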
ggml.c: 190 changes (31 additions & 159 deletions)
@@ -1753,9 +1753,9 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active;  // num active threads
-    atomic_int node_n;    // active graph node
-    atomic_int node_task; // active graph node task phase
+    //atomic_int n_active;  // num active threads
+    //atomic_int node_n;    // active graph node
+    //atomic_int node_task; // active graph node task phase
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void * abort_callback_data;
@@ -18972,184 +18972,60 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan  * cplan  = state->shared->cplan;
 
-    const int   n_threads   = state->shared->n_threads;
+    const int ith       = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n     = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type  =*/ GGML_TASK_TYPE_INIT,
+        /*.ith   =*/ ith,
+        /*.nth   =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type  =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith   =*/ 0,
-                /*.nth   =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles  = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_n,    node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n,     state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_TYPE_INIT,
-            /*.ith   =*/ state->ith,
-            /*.nth   =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            #pragma omp barrier
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            //       depending on the workload and the operating system.
-            //       since it is not clear what is the best approach, it should potentially become user-configurable
-            //       ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD:  adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
+        #pragma omp barrier
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            #pragma omp barrier
+        }
     }
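One detail of the rewritten loop worth noting: the barriers sit outside the if (ith < n_tasks) guards. An OpenMP barrier must be reached by every thread of the team, so threads that have no task for the current node still participate in the synchronization; placing the barrier inside the guard would be undefined behavior under the OpenMP specification. A standalone sketch of that shape (the work_on() helper, node count, and task count are hypothetical):

// guard_sketch.c - build with: cc -fopenmp guard_sketch.c
#include <omp.h>
#include <stdio.h>

// hypothetical stand-in for ggml_compute_forward on one node
static void work_on(int node_n, int ith) {
    printf("node %d handled by thread %d\n", node_n, ith);
}

int main(void) {
    #pragma omp parallel num_threads(4)
    {
        const int ith = omp_get_thread_num();

        for (int node_n = 0; node_n < 3; node_n++) {
            const int n_tasks = 2; // some ops use fewer threads than the team has

            if (ith < n_tasks) {
                work_on(node_n, ith); // only n_tasks threads do the work...
            }
            #pragma omp barrier       // ...but every thread reaches the barrier
        }
    }
    return 0;
}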

@@ -19336,7 +19212,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
             // update the number of threads from the actual number of threads that we got from OpenMP
             n_threads = omp_get_num_threads();
             workers[0].shared->n_threads = n_threads;
-            workers[0].shared->n_active  = n_threads;
         }
         ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
     }
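The hunk above keeps the step that re-reads the team size inside the parallel region. OpenMP may grant fewer threads than num_threads requests (thread limits, dynamic adjustment, nesting), so the count the workers rely on has to come from omp_get_num_threads(). A standalone sketch, using a single construct for the one-time update (the surrounding construct in ggml.c is outside this hunk's context):

// team_size_sketch.c - build with: cc -fopenmp team_size_sketch.c
#include <omp.h>
#include <stdio.h>

int main(void) {
    int n_threads = 8; // the requested team size

    #pragma omp parallel num_threads(n_threads)
    {
        #pragma omp single
        {
            // the size OpenMP actually granted; may be smaller than requested
            n_threads = omp_get_num_threads();
            printf("requested 8, got %d\n", n_threads);
        }
        // the implicit barrier at the end of `single` ensures every thread
        // sees the updated n_threads before continuing
    }
    return 0;
}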
@@ -19399,9 +19274,6 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles  =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads               =*/ n_threads,
-        /*.n_active                =*/ n_threads,
-        /*.node_n                  =*/ -1,
-        /*.node_task               =*/ GGML_TASK_TYPE_FINALIZE,
         /*.abort_callback          =*/ NULL,
         /*.abort_callback_data     =*/ NULL,
         /*.current_chunk; =*/ 0,
