From 1ecea255ebb70750b52688393f37a63606b90e3f Mon Sep 17 00:00:00 2001 From: Pierrick Hymbert Date: Wed, 21 Feb 2024 15:47:48 +0100 Subject: [PATCH] server: health: fix race condition on slots data using tasks queue (#5634) * server: health: fix race condition on slots data using tasks queue * server: health: * include_slots only if slots_endpoint * fix compile warning task.target_id not initialized. --- examples/server/README.md | 2 + examples/server/server.cpp | 122 ++++++++++++++++++++++++------------- examples/server/utils.hpp | 3 +- 3 files changed, 84 insertions(+), 43 deletions(-) diff --git a/examples/server/README.md b/examples/server/README.md index f6b9c74021f30..6d9f96cd4ba64 100644 --- a/examples/server/README.md +++ b/examples/server/README.md @@ -140,6 +140,8 @@ node index.js - 200 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if no slot are currently available. - 503 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if the query parameter `fail_on_no_slot` is provided and no slot are currently available. + If the query parameter `include_slots` is passed, `slots` field will contain internal slots data except if `--slots-endpoint-disable` is set. + - **POST** `/completion`: Given a `prompt`, it returns the predicted completion. *Options:* diff --git a/examples/server/server.cpp b/examples/server/server.cpp index eb01729fa7a8a..1c44795128fc1 100644 --- a/examples/server/server.cpp +++ b/examples/server/server.cpp @@ -1394,6 +1394,46 @@ struct llama_server_context case TASK_TYPE_NEXT_RESPONSE: { // do nothing } break; + case TASK_TYPE_SLOTS_DATA: { + json slots_data = json::array(); + int n_idle_slots = 0; + int n_processing_slots = 0; + + for (llama_client_slot &slot: slots) { + if (slot.available()) { + n_idle_slots++; + } else { + n_processing_slots++; + } + json slot_data = get_formated_generation(slot); + slot_data["id"] = slot.id; + slot_data["task_id"] = slot.task_id; + slot_data["state"] = slot.state; + slot_data["prompt"] = slot.prompt; + slot_data["next_token"] = { + {"has_next_token", slot.has_next_token}, + {"n_remain", slot.n_remaining}, + {"num_tokens_predicted", slot.n_decoded}, + {"stopped_eos", slot.stopped_eos}, + {"stopped_word", slot.stopped_word}, + {"stopped_limit", slot.stopped_limit}, + {"stopping_word", slot.stopping_word}, + }; + slots_data.push_back(slot_data); + } + LOG_TEE("task %i - slots data: idle=%i processing=%i\n", task.id, n_idle_slots, n_processing_slots); + task_result res; + res.id = task.id; + res.multitask_id = task.multitask_id; + res.stop = true; + res.error = false; + res.result_json = { + { "idle", n_idle_slots }, + { "processing", n_processing_slots }, + { "slots", slots_data } + }; + queue_results.send(res); + } break; } } @@ -2557,34 +2597,38 @@ int main(int argc, char **argv) server_state current_state = state.load(); switch(current_state) { case SERVER_STATE_READY: { - int available_slots = 0; - int processing_slots = 0; - for (llama_client_slot &slot: llama.slots) { - if (slot.available()) { - available_slots++; - } else { - processing_slots++; - } + // request slots data using task queue + task_server task; + task.id = llama.queue_tasks.get_new_id(); + task.type = TASK_TYPE_SLOTS_DATA; + task.target_id = -1; + + llama.queue_results.add_waiting_task_id(task.id); + llama.queue_tasks.post(task); + + // get the result + task_result result = llama.queue_results.recv(task.id); + llama.queue_results.remove_waiting_task_id(task.id); + + int n_idle_slots = result.result_json["idle"]; + int n_processing_slots = result.result_json["processing"]; + + json health = { + {"status", "ok"}, + {"slots_idle", n_idle_slots}, + {"slots_processing", n_processing_slots}}; + res.status = 200; // HTTP OK + if (sparams.slots_endpoint && req.has_param("include_slots")) { + health["slots"] = result.result_json["slots"]; } - if (available_slots > 0) { - json health = { - {"status", "ok"}, - {"slots_idle", available_slots}, - {"slots_processing", processing_slots}}; - res.set_content(health.dump(), "application/json"); - res.status = 200; // HTTP OK - } else { - json health = { - {"status", "no slot available"}, - {"slots_idle", available_slots}, - {"slots_processing", processing_slots}}; - res.set_content(health.dump(), "application/json"); + + if (n_idle_slots == 0) { + health["status"] = "no slot available"; if (req.has_param("fail_on_no_slot")) { res.status = 503; // HTTP Service Unavailable - } else { - res.status = 200; // HTTP OK } } + res.set_content(health.dump(), "application/json"); break; } case SERVER_STATE_LOADING_MODEL: @@ -2600,26 +2644,20 @@ int main(int argc, char **argv) if (sparams.slots_endpoint) { svr.Get("/slots", [&](const httplib::Request&, httplib::Response& res) { - json slots; - for (llama_client_slot & slot : llama.slots) { - json slot_data = llama.get_formated_generation(slot); - slot_data["id"] = slot.id; - slot_data["task_id"] = slot.task_id; - slot_data["state"] = slot.state; - slot_data["prompt"] = slot.prompt; - slot_data["next_token"] = { - {"has_next_token", slot.has_next_token}, - {"n_remain", slot.n_remaining}, - {"num_tokens_predicted", slot.n_decoded}, - {"stopped_eos", slot.stopped_eos}, - {"stopped_word", slot.stopped_word}, - {"stopped_limit", slot.stopped_limit}, - {"stopping_word", slot.stopping_word}, - }; + // request slots data using task queue + task_server task; + task.id = llama.queue_tasks.get_new_id(); + task.type = TASK_TYPE_SLOTS_DATA; + task.target_id = -1; - slots.push_back(slot_data); - } - res.set_content(slots.dump(), "application/json"); + llama.queue_results.add_waiting_task_id(task.id); + llama.queue_tasks.post(task); + + // get the result + task_result result = llama.queue_results.recv(task.id); + llama.queue_results.remove_waiting_task_id(task.id); + + res.set_content(result.result_json["slots"].dump(), "application/json"); res.status = 200; // HTTP OK }); } diff --git a/examples/server/utils.hpp b/examples/server/utils.hpp index e954fb0ef0413..88545eb6931d0 100644 --- a/examples/server/utils.hpp +++ b/examples/server/utils.hpp @@ -49,7 +49,8 @@ enum server_state { enum task_type { TASK_TYPE_COMPLETION, TASK_TYPE_CANCEL, - TASK_TYPE_NEXT_RESPONSE + TASK_TYPE_NEXT_RESPONSE, + TASK_TYPE_SLOTS_DATA }; struct task_server {