From 9c77ec1d74874ee22bdef8f110e8e8d41389abf2 Mon Sep 17 00:00:00 2001 From: slaren Date: Wed, 19 Jun 2024 15:04:15 +0200 Subject: [PATCH] ggml : synchronize threads using barriers (#7993) --- .github/workflows/server.yml | 14 +++ ggml.c | 217 +++++++++++------------------------ 2 files changed, 81 insertions(+), 150 deletions(-) diff --git a/.github/workflows/server.yml b/.github/workflows/server.yml index 1fee9ac281943..6155e94156e42 100644 --- a/.github/workflows/server.yml +++ b/.github/workflows/server.yml @@ -87,8 +87,22 @@ jobs: exit 1 fi + - name: Build (no OpenMP) + id: cmake_build_no_openmp + if: ${{ matrix.sanitizer == 'THREAD' }} + run: | + cmake -B build \ + -DLLAMA_NATIVE=OFF \ + -DLLAMA_BUILD_SERVER=ON \ + -DLLAMA_CURL=ON \ + -DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \ + -DLLAMA_SANITIZE_${{ matrix.sanitizer }}=ON \ + -DLLAMA_OPENMP=OFF ; + cmake --build build --config ${{ matrix.build_type }} -j $(nproc) --target llama-server + - name: Build id: cmake_build + if: ${{ matrix.sanitizer != 'THREAD' }} run: | cmake -B build \ -DLLAMA_NATIVE=OFF \ diff --git a/ggml.c b/ggml.c index d5d33c2ba1029..778ca3fdf1f8f 100644 --- a/ggml.c +++ b/ggml.c @@ -1753,9 +1753,8 @@ struct ggml_compute_state_shared { int n_threads; // synchronization primitives - atomic_int n_active; // num active threads - atomic_int node_n; // active graph node - atomic_int node_task; // active graph node task phase + atomic_int n_barrier; + atomic_int n_barrier_passed; ggml_abort_callback abort_callback; // abort ggml_graph_compute when true void* abort_callback_data; @@ -18972,47 +18971,49 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads, int n_cur_ return n_tasks; } -static void ggml_graph_compute_thread_sync_node(int * node_n, struct ggml_compute_state * state, const bool do_yield) { - // wait for other threads to finish - const int last_node_n = * node_n; - - while (true) { - if (do_yield) { - sched_yield(); - } - - *node_n = atomic_load(&state->shared->node_n); - if (*node_n != last_node_n) { - break; - } - -#if defined(__SSE3__) - // Tell the processor we're spinning. It's a processor hint for spinlocks. - _mm_pause(); -#endif +#ifdef GGML_USE_OPENMP +static void ggml_barrier(struct ggml_compute_state * state) { + if (state->shared->n_threads == 1) { + return; } + + #pragma omp barrier } +#else +static void ggml_barrier(struct ggml_compute_state * state) { + if (state->shared->n_threads == 1) { + return; + } -static void ggml_graph_compute_thread_sync_task(int * task_phase, struct ggml_compute_state * state, const bool do_yield) { - // wait for other threads to finish - const int last_task_phase = *task_phase; + atomic_int * n_barrier = &state->shared->n_barrier; + atomic_int * n_barrier_passed = &state->shared->n_barrier_passed; - while (true) { - if (do_yield) { - sched_yield(); - } + int n_threads = state->shared->n_threads; + int passed_old = atomic_load(n_barrier_passed); - *task_phase = atomic_load(&state->shared->node_task); - if (*task_phase != last_task_phase) { - break; + if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) { + // last thread + atomic_store(n_barrier, 0); + atomic_fetch_add(n_barrier_passed, 1); + } else { + // wait for other threads + //while (atomic_load(n_barrier_passed) == passed_old) { + //} + const int n_spin_before_sleep = 100000; + while (true) { + for (int i = 0; i < n_spin_before_sleep; i++) { + if (atomic_load(n_barrier_passed) != passed_old) { + return; + } + #if defined(__SSE3__) + _mm_pause(); + #endif + } + sched_yield(); } - -#if defined(__SSE3__) - // Tell the processor we're spinning. It's a processor hint for spinlocks. - _mm_pause(); -#endif } } +#endif static thread_ret_t ggml_graph_compute_thread(void * data) { struct ggml_compute_state * state = (struct ggml_compute_state *) data; @@ -19020,136 +19021,54 @@ static thread_ret_t ggml_graph_compute_thread(void * data) { const struct ggml_cgraph * cgraph = state->shared->cgraph; const struct ggml_cplan * cplan = state->shared->cplan; - const int n_threads = state->shared->n_threads; + const int ith = state->ith; + const int n_threads = state->shared->n_threads; - set_numa_thread_affinity(state->ith); + set_numa_thread_affinity(ith); - int node_n = -1; - int task_phase = GGML_TASK_TYPE_FINALIZE; + struct ggml_compute_params params = { + /*.type =*/ GGML_TASK_TYPE_INIT, + /*.ith =*/ ith, + /*.nth =*/ state->shared->n_threads, + /*.wsize =*/ cplan->work_size, + /*.wdata =*/ cplan->work_data, + }; - while (true) { + for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) { if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { - state->shared->node_n += 1; state->ec = GGML_STATUS_ABORTED; return 0; } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { - // all other threads are finished and spinning - // do finalize and init here so we don't have synchronize again - struct ggml_compute_params params = { - /*.type =*/ GGML_TASK_TYPE_FINALIZE, - /*.ith =*/ 0, - /*.nth =*/ 0, - /*.wsize =*/ cplan->work_size, - /*.wdata =*/ cplan->work_data, - }; - - if (node_n != -1) { - /* FINALIZE */ - struct ggml_tensor * node = cgraph->nodes[node_n]; - if (GGML_OP_HAS_FINALIZE[node->op]) { - params.nth = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); - ggml_compute_forward(¶ms, node, state); - } - ggml_graph_compute_perf_stats_node(node, state->shared); - } - - // distribute new work or execute it direct if 1T - while (++node_n < cgraph->n_nodes) { - GGML_PRINT_DEBUG_5("%s: %d/%d\n", __func__, node_n, cgraph->n_nodes); - struct ggml_tensor * node = cgraph->nodes[node_n]; - const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); - - state->shared->perf_node_start_cycles = ggml_perf_cycles(); - state->shared->perf_node_start_time_us = ggml_perf_time_us(); - - params.nth = n_tasks; - - if (n_tasks == 1) { - /* INIT */ - if (GGML_OP_HAS_INIT[node->op]) { - params.type = GGML_TASK_TYPE_INIT; - ggml_compute_forward(¶ms, node, state); - } - - // TODO: maybe push node_n to the atomic but if other threads see n_tasks is 1, - // they do something more efficient than spinning (?) - params.type = GGML_TASK_TYPE_COMPUTE; - ggml_compute_forward(¶ms, node, state); - - if (GGML_OP_HAS_FINALIZE[node->op]) { - params.type = GGML_TASK_TYPE_FINALIZE; - ggml_compute_forward(¶ms, node, state); - } - - ggml_graph_compute_perf_stats_node(node, state->shared); - } else { - break; - } - - if (cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) { - break; - } - } - - task_phase = GGML_TASK_TYPE_INIT; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_n, node_n); - atomic_store(&state->shared->node_task, task_phase); - } else { - ggml_graph_compute_thread_sync_node(&node_n, state, false); - ggml_graph_compute_thread_sync_task(&task_phase, state, false); - } - - // check if we should stop - if (node_n >= cgraph->n_nodes) break; - - /* INIT & COMPUTE */ struct ggml_tensor * node = cgraph->nodes[node_n]; const int n_tasks = ggml_get_n_tasks(node, n_threads, state->shared->n_threads); - struct ggml_compute_params params = { - /*.type =*/ GGML_TASK_TYPE_INIT, - /*.ith =*/ state->ith, - /*.nth =*/ n_tasks, - /*.wsize =*/ cplan->work_size, - /*.wdata =*/ cplan->work_data, - }; + params.nth = n_tasks; - if (state->ith < n_tasks) { - if (GGML_OP_HAS_INIT[node->op]) { + /* INIT */ + if (GGML_OP_HAS_INIT[node->op]) { + if (ith < n_tasks) { + params.type = GGML_TASK_TYPE_INIT; ggml_compute_forward(¶ms, node, state); } + ggml_barrier(state); } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { - task_phase = GGML_TASK_TYPE_COMPUTE; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_task, task_phase); - } - else { - // TODO: this sched_yield can have significant impact on the performance - either positive or negative - // depending on the workload and the operating system. - // since it is not clear what is the best approach, it should potentially become user-configurable - // ref: https://github.com/ggerganov/ggml/issues/291 - // UPD: adding the do_yield flag seems to resolve the issue universally - const bool do_yield = node_n < 0 || cgraph->nodes[node_n]->op == GGML_OP_MUL_MAT; - ggml_graph_compute_thread_sync_task(&task_phase, state, do_yield); - } - - if (state->ith < n_tasks) { + /* COMPUTE */ + if (ith < n_tasks) { params.type = GGML_TASK_TYPE_COMPUTE; ggml_compute_forward(¶ms, node, state); } - if (atomic_fetch_sub(&state->shared->n_active, 1) == 1) { - task_phase = GGML_TASK_TYPE_FINALIZE; - atomic_store(&state->shared->n_active, n_threads); - atomic_store(&state->shared->node_task, task_phase); - } - else { - ggml_graph_compute_thread_sync_task(&task_phase, state, false); + ggml_barrier(state); + + /* FINALIZE */ + if (GGML_OP_HAS_FINALIZE[node->op]) { + if (params.ith == 0) { + params.type = GGML_TASK_TYPE_FINALIZE; + ggml_compute_forward(¶ms, node, state); + } + ggml_barrier(state); } } @@ -19336,7 +19255,6 @@ static enum ggml_status ggml_graph_compute_parallel(struct ggml_compute_state * // update the number of threads from the actual number of threads that we got from OpenMP n_threads = omp_get_num_threads(); workers[0].shared->n_threads = n_threads; - workers[0].shared->n_active = n_threads; } ggml_graph_compute_thread(&workers[omp_get_thread_num()]); } @@ -19399,9 +19317,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl /*.perf_node_start_cycles =*/ 0, /*.perf_node_start_time_us =*/ 0, /*.n_threads =*/ n_threads, - /*.n_active =*/ n_threads, - /*.node_n =*/ -1, - /*.node_task =*/ GGML_TASK_TYPE_FINALIZE, + /*.n_barrier =*/ 0, + /*.n_barrier_passed =*/ 0, /*.abort_callback =*/ NULL, /*.abort_callback_data =*/ NULL, /*.current_chunk; =*/ 0,