From 4886f5001d9f6fc9089f0640cb79030c5a2b7e50 Mon Sep 17 00:00:00 2001
From: slaren
Date: Tue, 18 Jun 2024 18:38:49 +0200
Subject: [PATCH] ggml : synchronize using openmp barriers

---
 ggml.c | 190 ++++++++++-----------------------------------------------
 1 file changed, 31 insertions(+), 159 deletions(-)

diff --git a/ggml.c b/ggml.c
index d5d33c2ba1029..ad6172c023d4c 100644
--- a/ggml.c
+++ b/ggml.c
@@ -1753,9 +1753,9 @@ struct ggml_compute_state_shared {
     int n_threads;
 
     // synchronization primitives
-    atomic_int n_active;  // num active threads
-    atomic_int node_n;    // active graph node
-    atomic_int node_task; // active graph node task phase
+    //atomic_int n_active;  // num active threads
+    //atomic_int node_n;    // active graph node
+    //atomic_int node_task; // active graph node task phase
 
     ggml_abort_callback abort_callback; // abort ggml_graph_compute when true
     void* abort_callback_data;
@@ -18972,184 +18972,60 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_
     return n_tasks;
 }
 
-static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_node_n = * node_n;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *node_n = atomic_load(&state->shared->node_n);
-        if (*node_n != last_node_n) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
-static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) {
-    // wait for other threads to finish
-    const int last_task_phase = *task_phase;
-
-    while (true) {
-        if (do_yield) {
-            sched_yield();
-        }
-
-        *task_phase = atomic_load(&state->shared->node_task);
-        if (*task_phase != last_task_phase) {
-            break;
-        }
-
-#if defined(__SSE3__)
-        // Tell the processor we're spinning. It's a processor hint for spinlocks.
-        _mm_pause();
-#endif
-    }
-}
-
 static thread_ret_t ggml_graph_compute_thread(void * data) {
     struct ggml_compute_state * state = (struct ggml_compute_state *) data;
 
     const struct ggml_cgraph * cgraph = state->shared->cgraph;
     const struct ggml_cplan * cplan = state->shared->cplan;
 
-    const int n_threads = state->shared->n_threads;
+    const int ith = state->ith;
+    const int n_threads = state->shared->n_threads;
 
-    set_numa_thread_affinity(state->ith);
+    set_numa_thread_affinity(ith);
 
-    int node_n = -1;
-    int task_phase = GGML_TASK_TYPE_FINALIZE;
+    struct ggml_compute_params params = {
+        /*.type  =*/ GGML_TASK_TYPE_INIT,
+        /*.ith   =*/ ith,
+        /*.nth   =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+    };
 
-    while (true) {
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
         if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-            state->shared->node_n += 1;
             state->ec = GGML_STATUS_ABORTED;
             return 0;
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            // all other threads are finished and spinning
-            // do finalize and init here so we don't have synchronize again
-            struct ggml_compute_params params = {
-                /*.type  =*/ GGML_TASK_TYPE_FINALIZE,
-                /*.ith   =*/ 0,
-                /*.nth   =*/ 0,
-                /*.wsize =*/ cplan->work_size,
-                /*.wdata =*/ cplan->work_data,
-            };
-
-            if (node_n != -1) {
-                /* FINALIZE */
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                if (GGML_OP_HAS_FINALIZE[node->op]) {
-                    params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-                    ggml_compute_forward(&params, node, state);
-                }
-                ggml_graph_compute_perf_stats_node(node, state->shared);
-            }
-
-            // distribute new work or execute it direct if 1T
-            while (++node_n < cgraph->n_nodes) {
-                GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes);
-                struct ggml_tensor * node = cgraph->nodes[node_n];
-                const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
-
-                state->shared->perf_node_start_cycles  = ggml_perf_cycles();
-                state->shared->perf_node_start_time_us = ggml_perf_time_us();
-
-                params.nth = n_tasks;
-
-                if (n_tasks == 1) {
-                    /* INIT */
-                    if (GGML_OP_HAS_INIT[node->op]) {
-                        params.type = GGML_TASK_TYPE_INIT;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1,
-                    // they do something more efficient than spinning (?)
-                    params.type = GGML_TASK_TYPE_COMPUTE;
-                    ggml_compute_forward(&params, node, state);
-
-                    if (GGML_OP_HAS_FINALIZE[node->op]) {
-                        params.type = GGML_TASK_TYPE_FINALIZE;
-                        ggml_compute_forward(&params, node, state);
-                    }
-
-                    ggml_graph_compute_perf_stats_node(node, state->shared);
-                } else {
-                    break;
-                }
-
-                if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
-                    break;
-                }
-            }
-
-            task_phase = GGML_TASK_TYPE_INIT;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_n,    node_n);
-            atomic_store(&state->shared->node_task, task_phase);
-        } else {
-            ggml_graph_compute_thread_sync_node(&node_n, state, false);
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
-        }
-
-        // check if we should stop
-        if (node_n >= cgraph->n_nodes) break;
-
-        /* INIT & COMPUTE */
         struct ggml_tensor * node = cgraph->nodes[node_n];
         const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads);
 
-        struct ggml_compute_params params = {
-            /*.type  =*/ GGML_TASK_TYPE_INIT,
-            /*.ith   =*/ state->ith,
-            /*.nth   =*/ n_tasks,
-            /*.wsize =*/ cplan->work_size,
-            /*.wdata =*/ cplan->work_data,
-        };
+        params.nth = n_tasks;
 
-        if (state->ith < n_tasks) {
-            if (GGML_OP_HAS_INIT[node->op]) {
+        /* INIT */
+        if (GGML_OP_HAS_INIT[node->op]) {
+            if (ith < n_tasks) {
+                params.type = GGML_TASK_TYPE_INIT;
                 ggml_compute_forward(&params, node, state);
             }
+            #pragma omp barrier
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_COMPUTE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            // TODO: this sched_yield can have significant impact on the performance - either positive or negative
-            // depending on the workload and the operating system.
-            // since it is not clear what is the best approach, it should potentially become user-configurable
-            // ref: https://github.com/ggerganov/ggml/issues/291
-            // UPD: adding the do_yield flag seems to resolve the issue universally
-            const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT;
-            ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield);
-        }
-
-        if (state->ith < n_tasks) {
+        /* COMPUTE */
+        if (ith < n_tasks) {
             params.type = GGML_TASK_TYPE_COMPUTE;
             ggml_compute_forward(&params, node, state);
         }
 
-        if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) {
-            task_phase = GGML_TASK_TYPE_FINALIZE;
-            atomic_store(&state->shared->n_active,  n_threads);
-            atomic_store(&state->shared->node_task, task_phase);
-        }
-        else {
-            ggml_graph_compute_thread_sync_task(&task_phase, state, false);
+        #pragma omp barrier
+
+        /* FINALIZE */
+        if (GGML_OP_HAS_FINALIZE[node->op]) {
+            if (params.ith == 0) {
+                params.type = GGML_TASK_TYPE_FINALIZE;
+                ggml_compute_forward(&params, node, state);
+            }
+            #pragma omp barrier
        }
    }
@@ -19336,7 +19212,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state *
                 // update the number of threads from the actual number of threads that we got from OpenMP
                 n_threads = omp_get_num_threads();
                 workers[0].shared->n_threads = n_threads;
-                workers[0].shared->n_active  = n_threads;
             }
             ggml_graph_compute_thread(&workers[omp_get_thread_num()]);
         }
@@ -19399,9 +19274,6 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         /*.perf_node_start_cycles  =*/ 0,
         /*.perf_node_start_time_us =*/ 0,
         /*.n_threads               =*/ n_threads,
-        /*.n_active                =*/ n_threads,
-        /*.node_n                  =*/ -1,
-        /*.node_task               =*/ GGML_TASK_TYPE_FINALIZE,
         /*.abort_callback          =*/ NULL,
         /*.abort_callback_data     =*/ NULL,
         /*.current_chunk; =*/ 0,