server: health: fix race condition on slots data using tasks queue (ggerganov#5634)

* server: health: fix race condition on slots data using tasks queue (the queue round-trip pattern is sketched below)

* server: health:
    * include_slots only if slots_endpoint is enabled
    * fix compile warning: task.target_id not initialized
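The race being fixed: httplib runs each request handler on its own thread, and the `/health` and `/slots` handlers previously iterated `llama.slots` while the inference loop was mutating those slots concurrently. The fix routes the read through the existing tasks queue, so the snapshot is built on the single thread that owns the slots. Below is a minimal, self-contained sketch of that round-trip pattern; the names (`task_queue`, `post`, `wait_pop`) and the promise/future rendezvous are illustrative assumptions, not the server's actual `queue_tasks`/`queue_results` API:

```cpp
// Sketch: one worker thread owns the mutable state; other threads never
// read it directly, they post a task and block until the reply arrives.
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class task_queue {
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<std::function<void()>> tasks;
public:
    void post(std::function<void()> t) {
        {
            std::lock_guard<std::mutex> lk(mtx);
            tasks.push(std::move(t));
        }
        cv.notify_one();
    }
    std::function<void()> wait_pop() {
        std::unique_lock<std::mutex> lk(mtx);
        cv.wait(lk, [this] { return !tasks.empty(); });
        auto t = std::move(tasks.front());
        tasks.pop();
        return t;
    }
};

int main() {
    std::vector<int> slots{0, 0, 0}; // mutable state, owned by the worker
    task_queue queue;

    std::thread worker([&] {
        for (;;) {
            auto task = queue.wait_pop();
            if (!task) return; // empty task doubles as shutdown (sketch only)
            task();            // runs on the owning thread, so no data race
        }
    });

    // A "handler" thread asks for a snapshot instead of touching `slots`:
    std::promise<std::vector<int>> reply;
    auto fut = reply.get_future();
    queue.post([&] { reply.set_value(slots); }); // copy made by the owner
    std::vector<int> snapshot = fut.get();       // rendezvous, like recv()

    std::printf("got %zu slots\n", snapshot.size());
    queue.post({}); // shut the worker down
    worker.join();
    return 0;
}
```

The key property is that `slots` is only ever touched from the worker thread; handlers pay one queue round-trip instead of sharing the data under a lock.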
phymbert authored Feb 21, 2024
1 parent a00a35c commit 1ecea25
Showing 3 changed files with 84 additions and 43 deletions.
2 changes: 2 additions & 0 deletions examples/server/README.md
@@ -140,6 +140,8 @@ node index.js
- 200 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if no slots are currently available.
- 503 -> `{"status": "no slot available", "slots_idle": 0, "slots_processing": 32}` if the query parameter `fail_on_no_slot` is provided and no slots are currently available.

If the query parameter `include_slots` is passed, the `slots` field will contain internal slots data, unless `--slots-endpoint-disable` is set.
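For example, a response could then look like `{"status": "ok", "slots_idle": 31, "slots_processing": 1, "slots": [{"id": 0, "task_id": 12, "state": 1, "prompt": "...", "next_token": {...}}]}` (the per-slot fields mirror the slot data assembled in `server.cpp`; the values shown here are illustrative).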

- **POST** `/completion`: Given a `prompt`, it returns the predicted completion.

*Options:*
122 changes: 80 additions & 42 deletions examples/server/server.cpp
@@ -1394,6 +1394,46 @@ struct llama_server_context
          case TASK_TYPE_NEXT_RESPONSE: {
              // do nothing
          } break;
+         case TASK_TYPE_SLOTS_DATA: {
+             json slots_data = json::array();
+             int n_idle_slots = 0;
+             int n_processing_slots = 0;
+
+             for (llama_client_slot &slot: slots) {
+                 if (slot.available()) {
+                     n_idle_slots++;
+                 } else {
+                     n_processing_slots++;
+                 }
+                 json slot_data = get_formated_generation(slot);
+                 slot_data["id"] = slot.id;
+                 slot_data["task_id"] = slot.task_id;
+                 slot_data["state"] = slot.state;
+                 slot_data["prompt"] = slot.prompt;
+                 slot_data["next_token"] = {
+                     {"has_next_token", slot.has_next_token},
+                     {"n_remain", slot.n_remaining},
+                     {"num_tokens_predicted", slot.n_decoded},
+                     {"stopped_eos", slot.stopped_eos},
+                     {"stopped_word", slot.stopped_word},
+                     {"stopped_limit", slot.stopped_limit},
+                     {"stopping_word", slot.stopping_word},
+                 };
+                 slots_data.push_back(slot_data);
+             }
+             LOG_TEE("task %i - slots data: idle=%i processing=%i\n", task.id, n_idle_slots, n_processing_slots);
+             task_result res;
+             res.id = task.id;
+             res.multitask_id = task.multitask_id;
+             res.stop = true;
+             res.error = false;
+             res.result_json = {
+                 { "idle", n_idle_slots },
+                 { "processing", n_processing_slots },
+                 { "slots", slots_data }
+             };
+             queue_results.send(res);
+         } break;
      }
  }
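In the hunk above, the handler thread blocks in `llama.queue_results.recv(task.id)` until this `queue_results.send(res)` executes on the task-processing thread. As a rough mental model, an id-keyed result queue can be built like the sketch below (an assumed shape for illustration, not the actual llama.cpp implementation):

```cpp
// Illustrative sketch of an id-keyed result queue (assumed shape; the real
// llama.cpp queue_results code may differ). recv(id) blocks until send()
// publishes a result carrying that task id.
#include <condition_variable>
#include <deque>
#include <mutex>

struct result_entry {
    int id = -1;      // task id this result answers
    bool stop = true; // mirrors task_result.stop in the diff
    bool error = false;
};

class id_keyed_results {
    std::mutex mtx;
    std::condition_variable cv;
    std::deque<result_entry> results;
public:
    void send(result_entry res) {
        {
            std::lock_guard<std::mutex> lk(mtx);
            results.push_back(res);
        }
        cv.notify_all(); // several handlers may wait on different ids
    }

    result_entry recv(int task_id) {
        std::unique_lock<std::mutex> lk(mtx);
        for (;;) {
            for (auto it = results.begin(); it != results.end(); ++it) {
                if (it->id == task_id) {
                    result_entry res = *it;
                    results.erase(it);
                    return res; // found our result; hand it to the handler
                }
            }
            cv.wait(lk); // not there yet; sleep until the next send()
        }
    }
};
```

`notify_all` rather than `notify_one` is the natural choice here, since several handlers may be blocked on different task ids and each wake-up rescans for its own id; the real code's `add_waiting_task_id`/`remove_waiting_task_id` presumably let the queue drop results that no longer have a waiting consumer.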

@@ -2557,34 +2597,38 @@ int main(int argc, char **argv)
  server_state current_state = state.load();
  switch(current_state) {
      case SERVER_STATE_READY: {
-         int available_slots = 0;
-         int processing_slots = 0;
-         for (llama_client_slot &slot: llama.slots) {
-             if (slot.available()) {
-                 available_slots++;
-             } else {
-                 processing_slots++;
-             }
-         }
-         if (available_slots > 0) {
-             json health = {
-                 {"status", "ok"},
-                 {"slots_idle", available_slots},
-                 {"slots_processing", processing_slots}};
-             res.set_content(health.dump(), "application/json");
-             res.status = 200; // HTTP OK
-         } else {
-             json health = {
-                 {"status", "no slot available"},
-                 {"slots_idle", available_slots},
-                 {"slots_processing", processing_slots}};
-             res.set_content(health.dump(), "application/json");
-             if (req.has_param("fail_on_no_slot")) {
-                 res.status = 503; // HTTP Service Unavailable
-             } else {
-                 res.status = 200; // HTTP OK
-             }
-         }
+         // request slots data using task queue
+         task_server task;
+         task.id = llama.queue_tasks.get_new_id();
+         task.type = TASK_TYPE_SLOTS_DATA;
+         task.target_id = -1;
+
+         llama.queue_results.add_waiting_task_id(task.id);
+         llama.queue_tasks.post(task);
+
+         // get the result
+         task_result result = llama.queue_results.recv(task.id);
+         llama.queue_results.remove_waiting_task_id(task.id);
+
+         int n_idle_slots = result.result_json["idle"];
+         int n_processing_slots = result.result_json["processing"];
+
+         json health = {
+             {"status", "ok"},
+             {"slots_idle", n_idle_slots},
+             {"slots_processing", n_processing_slots}};
+         res.status = 200; // HTTP OK
+         if (sparams.slots_endpoint && req.has_param("include_slots")) {
+             health["slots"] = result.result_json["slots"];
+         }
+
+         if (n_idle_slots == 0) {
+             health["status"] = "no slot available";
+             if (req.has_param("fail_on_no_slot")) {
+                 res.status = 503; // HTTP Service Unavailable
+             }
+         }
+         res.set_content(health.dump(), "application/json");
          break;
      }
      case SERVER_STATE_LOADING_MODEL:
@@ -2600,26 +2644,20 @@ int main(int argc, char **argv)

  if (sparams.slots_endpoint) {
      svr.Get("/slots", [&](const httplib::Request&, httplib::Response& res) {
-         json slots;
-         for (llama_client_slot & slot : llama.slots) {
-             json slot_data = llama.get_formated_generation(slot);
-             slot_data["id"] = slot.id;
-             slot_data["task_id"] = slot.task_id;
-             slot_data["state"] = slot.state;
-             slot_data["prompt"] = slot.prompt;
-             slot_data["next_token"] = {
-                 {"has_next_token", slot.has_next_token},
-                 {"n_remain", slot.n_remaining},
-                 {"num_tokens_predicted", slot.n_decoded},
-                 {"stopped_eos", slot.stopped_eos},
-                 {"stopped_word", slot.stopped_word},
-                 {"stopped_limit", slot.stopped_limit},
-                 {"stopping_word", slot.stopping_word},
-             };
-
-             slots.push_back(slot_data);
-         }
-         res.set_content(slots.dump(), "application/json");
+         // request slots data using task queue
+         task_server task;
+         task.id = llama.queue_tasks.get_new_id();
+         task.type = TASK_TYPE_SLOTS_DATA;
+         task.target_id = -1;
+
+         llama.queue_results.add_waiting_task_id(task.id);
+         llama.queue_tasks.post(task);
+
+         // get the result
+         task_result result = llama.queue_results.recv(task.id);
+         llama.queue_results.remove_waiting_task_id(task.id);
+
+         res.set_content(result.result_json["slots"].dump(), "application/json");
+         res.status = 200; // HTTP OK
      });
  }
3 changes: 2 additions & 1 deletion examples/server/utils.hpp
@@ -49,7 +49,8 @@ enum server_state {
  enum task_type {
      TASK_TYPE_COMPLETION,
      TASK_TYPE_CANCEL,
-     TASK_TYPE_NEXT_RESPONSE
+     TASK_TYPE_NEXT_RESPONSE,
+     TASK_TYPE_SLOTS_DATA
  };

struct task_server {
