vine: cleanup hungry feature (#3990)
* add committed resources and hungry_factor to hungry feature

Use committed resources as a fallback when waiting tasks do not declare
resources. Also apply hungry_factor where the value 2 was previously
hard-coded.

* add test

* format

* limit sample of waiting tasks to attempt_schedule_depth

* use number of workers as minimum

* correctly consider sampling of tasks
btovar authored Dec 2, 2024
1 parent c364db8 commit e885f92
Showing 3 changed files with 246 additions and 79 deletions.
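In rough terms, the reworked vine_hungry() estimate scales each worker resource total by the hungry factor, subtracts what is already committed, and divides by the average resources needed per task, using the average committed resources of running tasks whenever a waiting task does not declare its own. The sketch below restates that logic in Python for readability only; the helper name and dictionary layout are invented for illustration, only cores and memory are shown (the real code also considers disk and GPUs), and the authoritative version is the C code in vine_manager.c below.

from math import ceil

def hungry_estimate(stats, hungry_minimum, factor, waiting_tasks, sample_depth):
    # Available resources: scale totals by the hungry factor, subtract committed.
    avail_cores = factor * stats["total_cores"] - stats["committed_cores"]
    avail_memory = factor * stats["total_memory"] - stats["committed_memory"]

    # Average committed resources per running task; used as the fallback for
    # waiting tasks that declare nothing. Assume at least one core per task.
    running = max(stats["tasks_running"], 1)
    avg_cores = max(1, ceil(stats["committed_cores"] / running))
    avg_memory = ceil(stats["committed_memory"] / running)

    # The hungry floor grows with the number of connected workers.
    minimum = max(hungry_minimum, stats["workers_connected"] * factor)

    if not waiting_tasks:
        # No waiting tasks: size the request from the running-task averages.
        needed = ceil(avail_cores / avg_cores)
        if avg_memory > 0:
            needed = min(needed, ceil(avail_memory / avg_memory))
        return max(needed, minimum)

    # Sample at most sample_depth waiting tasks, filling undeclared values with
    # the committed averages, then divide availability by the sample average.
    sample = waiting_tasks[:sample_depth]
    cores = ceil(sum(t.get("cores") or avg_cores for t in sample) / len(sample))
    memory = ceil(sum(t.get("memory") or avg_memory for t in sample) / len(sample))

    needed = ceil(avail_cores / cores)
    if memory > 0:
        needed = min(needed, ceil(avail_memory / memory))
    return max(0, max(needed, minimum) - len(waiting_tasks))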
144 changes: 65 additions & 79 deletions taskvine/src/manager/vine_manager.c
@@ -5205,31 +5205,56 @@ struct vine_task *vine_manager_no_wait(struct vine_manager *q, const char *tag,
 //@return: approximate number of additional tasks if hungry, 0 otherwise
 int vine_hungry(struct vine_manager *q)
 {
-    // check if manager is initialized
-    // return false if not
-    if (q == NULL) {
+    if (!q) {
         return 0;
     }
 
     struct vine_stats qstats;
     vine_get_stats(q, &qstats);
 
-    // if number of ready tasks is the queue is less than the miniumum, then it is hungry
-    if (qstats.tasks_waiting < q->hungry_minimum) {
-        return q->hungry_minimum - qstats.tasks_waiting;
+    // set min tasks running to 1. if it was 0, then committed resource would be 0 anyway so average works out to 0.
+    int64_t tasks_running = MAX(qstats.tasks_running, 1);
+    int64_t tasks_waiting = qstats.tasks_waiting;
+
+    /* queue is hungry according to the number of workers available (assume each worker can run at least one task) */
+    int hungry_minimum = MAX(q->hungry_minimum, qstats.workers_connected * q->hungry_minimum_factor);
+
+    if (tasks_running < 1 && tasks_waiting < 1) {
+        return hungry_minimum;
     }
 
+    /* assume a task uses at least one core, otherwise if no resource is specified, the queue is infinitely hungry */
+    int64_t avg_commited_tasks_cores = MAX(1, DIV_INT_ROUND_UP(qstats.committed_cores, tasks_running));
+    int64_t avg_commited_tasks_memory = DIV_INT_ROUND_UP(qstats.committed_memory, tasks_running);
+    int64_t avg_commited_tasks_disk = DIV_INT_ROUND_UP(qstats.committed_disk, tasks_running);
+    int64_t avg_commited_tasks_gpus = DIV_INT_ROUND_UP(qstats.committed_gpus, tasks_running);
+
     // get total available resources consumption (cores, memory, disk, gpus) of all workers of this manager
-    // available = total (all) - committed (actual in use)
-    int64_t workers_total_avail_cores = 0;
-    int64_t workers_total_avail_memory = 0;
-    int64_t workers_total_avail_disk = 0;
-    int64_t workers_total_avail_gpus = 0;
-    // Find available resources (2 * total - committed)
-    workers_total_avail_cores = 2 * qstats.total_cores - qstats.committed_cores;
-    workers_total_avail_memory = 2 * qstats.total_memory - qstats.committed_memory;
-    workers_total_avail_gpus = 2 * qstats.total_gpus - qstats.committed_gpus;
-    workers_total_avail_disk = 2 * qstats.total_disk - qstats.committed_disk; // never overcommit disk
+    // available = factor*total (all) - committed (actual in use)
+    int64_t workers_total_avail_cores = q->hungry_minimum_factor * qstats.total_cores - qstats.committed_cores;
+    int64_t workers_total_avail_memory = q->hungry_minimum_factor * qstats.total_memory - qstats.committed_memory;
+    int64_t workers_total_avail_disk = q->hungry_minimum_factor * qstats.total_disk - qstats.committed_disk;
+    int64_t workers_total_avail_gpus = q->hungry_minimum_factor * qstats.total_gpus - qstats.committed_gpus;
+
+    int64_t tasks_needed = 0;
+    if (tasks_waiting < 1) {
+        tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_commited_tasks_cores);
+        if (avg_commited_tasks_memory > 0) {
+            tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_commited_tasks_memory));
+        }
+
+        if (avg_commited_tasks_disk > 0) {
+            tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_commited_tasks_disk));
+        }
+
+        if (avg_commited_tasks_gpus > 0) {
+            tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_commited_tasks_gpus));
+        }
+
+        return MAX(tasks_needed, hungry_minimum);
+    }
+
+    // from here on we can assume that tasks_waiting > 0.
 
     // get required resources (cores, memory, disk, gpus) of one (all?) waiting tasks
     // seems to iterate through all tasks counted in the queue.
@@ -5240,79 +5265,40 @@ int vine_hungry(struct vine_manager *q)
 
     int t_idx;
     struct vine_task *t;
-    int iter_count = 0;
-    int iter_depth = priority_queue_size(q->ready_tasks);
-
-    PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, iter_count, iter_depth)
+    int iter_depth = MIN(q->attempt_schedule_depth, tasks_waiting);
+    int sampled_tasks_waiting = 0;
+    PRIORITY_QUEUE_BASE_ITERATE(q->ready_tasks, t_idx, t, sampled_tasks_waiting, iter_depth)
     {
-        ready_task_cores += MAX(1, t->resources_requested->cores);
-        ready_task_memory += t->resources_requested->memory;
-        ready_task_disk += t->resources_requested->disk;
-        ready_task_gpus += t->resources_requested->gpus;
+        /* unset resources are marked with -1, so we added what we know about currently running tasks */
+        ready_task_cores += t->resources_requested->cores > 0 ? t->resources_requested->cores : avg_commited_tasks_cores;
+        ready_task_memory += t->resources_requested->memory > 0 ? t->resources_requested->memory : avg_commited_tasks_memory;
+        ready_task_disk += t->resources_requested->disk > 0 ? t->resources_requested->disk : avg_commited_tasks_disk;
+        ready_task_gpus += t->resources_requested->gpus > 0 ? t->resources_requested->gpus : avg_commited_tasks_gpus;
     }
 
-    int count = priority_queue_size(q->ready_tasks);
+    int64_t avg_ready_tasks_cores = DIV_INT_ROUND_UP(ready_task_cores, sampled_tasks_waiting);
+    int64_t avg_ready_tasks_memory = DIV_INT_ROUND_UP(ready_task_memory, sampled_tasks_waiting);
+    int64_t avg_ready_tasks_disk = DIV_INT_ROUND_UP(ready_task_disk, sampled_tasks_waiting);
+    int64_t avg_ready_tasks_gpus = DIV_INT_ROUND_UP(ready_task_gpus, sampled_tasks_waiting);
 
-    int64_t avg_additional_tasks_cores, avg_additional_tasks_memory, avg_additional_tasks_disk, avg_additional_tasks_gpus;
+    // since sampled_tasks_waiting > 0 and avg_commited_tasks_cores > 0, then ready_task_cores > 0 and avg_ready_tasks_cores > 0
+    tasks_needed = DIV_INT_ROUND_UP(workers_total_avail_cores, avg_ready_tasks_cores);
 
-    if (ready_task_cores > workers_total_avail_cores) {
-        return 0;
-    }
-    if (ready_task_memory > workers_total_avail_memory) {
-        return 0;
+    if (avg_ready_tasks_memory > 0) {
+        tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_memory, avg_ready_tasks_memory));
     }
-    if (ready_task_disk > workers_total_avail_disk) {
-        return 0;
+
+    if (avg_ready_tasks_disk > 0) {
+        tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_disk, avg_ready_tasks_disk));
     }
-    if (ready_task_gpus > workers_total_avail_gpus) {
-        return 0;
+
+    if (avg_ready_tasks_gpus > 0) {
+        tasks_needed = MIN(tasks_needed, DIV_INT_ROUND_UP(workers_total_avail_gpus, avg_ready_tasks_gpus));
     }
 
-    if (ready_task_cores < 0)
-        ready_task_cores = 0;
-    if (ready_task_memory < 0)
-        ready_task_memory = 0;
-    if (ready_task_disk < 0)
-        ready_task_disk = 0;
-    if (ready_task_gpus < 0)
-        ready_task_gpus = 0;
+    tasks_needed = MAX(0, MAX(tasks_needed, hungry_minimum) - tasks_waiting);
 
-    if (count != 0) { // each statement counts the available (2*total - committed) and further subtracts the ready/in-queue tasks and then finds how mabny more
-        if (ready_task_cores != 0) {
-            avg_additional_tasks_cores = (workers_total_avail_cores - ready_task_cores) / (ready_task_cores / count);
-        } else {
-            avg_additional_tasks_cores = workers_total_avail_cores;
-        }
-        if (ready_task_memory != 0) {
-            avg_additional_tasks_memory = (workers_total_avail_memory - ready_task_memory) / (ready_task_memory / count);
-        } else {
-            avg_additional_tasks_memory = workers_total_avail_cores;
-        }
-        if (ready_task_disk != 0) {
-            avg_additional_tasks_disk = (workers_total_avail_disk - ready_task_disk) / (ready_task_disk / count);
-        } else {
-            avg_additional_tasks_disk = workers_total_avail_cores;
-        }
-        if (ready_task_gpus != 0) {
-            avg_additional_tasks_gpus = (workers_total_avail_gpus - ready_task_gpus) / (ready_task_gpus / count);
-        } else {
-            avg_additional_tasks_gpus = workers_total_avail_cores;
-        }
-    } else {
-        return workers_total_avail_cores; // this returns number of cores if no tasks in queue and we have resources.
-    }
-    // find the limiting factor
-    int64_t min = avg_additional_tasks_cores;
-    if (avg_additional_tasks_memory < min)
-        min = avg_additional_tasks_memory;
-    if (avg_additional_tasks_disk < min)
-        min = avg_additional_tasks_disk;
-    if (avg_additional_tasks_gpus < min)
-        min = avg_additional_tasks_gpus;
-    if (min < 0)
-        min = 0; // if for some reason we have a negative, just make it 0
-
-    return min;
+    return tasks_needed;
 }
 
 int vine_workers_shutdown(struct vine_manager *q, int n)
38 changes: 38 additions & 0 deletions taskvine/test/TR_vine_hungry.sh
@@ -0,0 +1,38 @@
#!/bin/sh
set -ex

. ../../dttools/test/test_runner_common.sh

import_config_val CCTOOLS_PYTHON_TEST_EXEC
import_config_val CCTOOLS_PYTHON_TEST_DIR

export PATH=$(pwd)/../src/worker:$(pwd)/../../batch_job/src:$PATH
export PYTHONPATH=$(pwd)/../../test_support/python_modules/${CCTOOLS_PYTHON_TEST_DIR}:$PYTHONPATH

check_needed()
{
    [ -n "${CCTOOLS_PYTHON_TEST_EXEC}" ] || return 1
}

prepare()
{
    return 0
}

run()
{
    # exec the hungry test script so its exit status is reported directly.
    exec ${CCTOOLS_PYTHON_TEST_EXEC} vine_python_hungry.py
}

clean()
{
    rm -rf vine-run-info

    exit 0
}


dispatch "$@"

# vim: set noexpandtab tabstop=4:
143 changes: 143 additions & 0 deletions taskvine/test/vine_python_hungry.py
@@ -0,0 +1,143 @@
#! /usr/bin/env python

import ndcctools.taskvine as vine
import signal
import sys


def timeout(signum, frame):
    print("hungry test did not finish in time")
    sys.exit(1)


command = "sleep 120"


signal.signal(signal.SIGALRM, timeout)
signal.alarm(600)


m = vine.Manager(port=0)

m.tune("hungry-minimum", 11)
assert m.hungry() == 11

m.tune("hungry-minimum", 2)
assert m.hungry() == 2

t_1 = vine.Task(command)
i_1 = m.submit(t_1)

assert m.hungry() == 1

t_2 = vine.Task(command)
i_2 = m.submit(t_2)

assert m.hungry() == 0

worker_cores = 12
worker_memory = 1200
worker_disk = 1200

workers = vine.Factory("local", manager=m)
workers.max_workers = 1
workers.min_workers = 1
workers.cores = worker_cores
workers.disk = worker_disk
workers.memory = worker_memory

with workers:
    while m.stats.tasks_running < 1:
        m.wait(1)

    # hungry-minimum is 2, 2 tasks submitted, one waiting, thus hungry for 1 task
    assert m.hungry() == 1

    m.tune("hungry-minimum", 5)

    # hungry-minimum is 5, 2 tasks submitted, one waiting, thus hungry for 4 tasks
    assert m.hungry() == 4

    m.cancel_by_task_id(i_1)
    m.cancel_by_task_id(i_2)

    while m.stats.tasks_running > 0:
        m.wait(1)

    # hungry-minimum is 5, no tasks submitted, thus hungry for max of 5 tasks and 2 x worker_cores
    assert m.hungry() == 2 * worker_cores

    t_3 = vine.Task(command)
    t_3.set_cores(1)
    i_3 = m.submit(t_3)

    while m.stats.tasks_running < 1:
        m.wait(1)

    # hungry-minimum is 5, and a task with one core is running. max of 5 and 2 x worker_cores - cores in use
    assert m.hungry() == worker_cores * 2 - 1

    factor = 3
    m.tune("hungry-minimum-factor", factor)

    # as before, but the available estimate now uses a different hungry factor
    assert m.hungry() == worker_cores * factor - 1

    t_4 = vine.Task(command)
    t_4.set_cores(worker_cores - 1)
    i_4 = m.submit(t_4)

    while m.stats.tasks_running < 2:
        m.wait(1)

    # hungry-minimum is 5, and all cores are being used
    assert m.hungry() == 5

    m.cancel_by_task_id(i_3)
    m.cancel_by_task_id(i_4)

    while m.stats.tasks_running > 0:
        m.wait(1)

    mem_task = int(2 * worker_memory / worker_cores)
    t_5 = vine.Task(command)
    t_5.set_cores(1)
    t_5.set_memory(mem_task)
    i_5 = m.submit(t_5)

    while m.stats.tasks_running < 1:
        m.wait(1)

    # memory should be the limiting factor here
    assert m.hungry() == (factor * worker_memory - mem_task) / mem_task

    m.cancel_by_task_id(i_5)

    cores_t_6 = 1
    t_6 = vine.Task(command)
    t_6.set_cores(cores_t_6)
    t_6.set_memory(1)
    t_6.set_disk(1)
    i_6 = m.submit(t_6)

    cores_t_7 = 11
    t_7 = vine.Task(command)
    t_7.set_cores(cores_t_7)
    t_7.set_memory(1)
    t_7.set_disk(1)
    i_7 = m.submit(t_7)

    while m.stats.tasks_running < 2:
        m.wait(1)

    cores_t_8 = 2
    t_8 = vine.Task(command)
    t_8.set_cores(cores_t_8)
    i_8 = m.submit(t_8)

    factor = 10
    m.tune("hungry-minimum-factor", factor)

    # avg cores per waiting task should be the limiting factor
    # each additional task needs cores_t_8 cores; subtract one for the task already waiting
    assert m.hungry() == (factor * worker_cores - cores_t_6 - cores_t_7) / cores_t_8 - 1
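To spell out the arithmetic behind that last assertion: the factor-scaled core pool is factor * worker_cores = 10 * 12 = 120, the two running tasks t_6 and t_7 hold 1 + 11 = 12 committed cores, and each additional task shaped like t_8 needs 2 cores, so (120 - 12) / 2 = 54 such tasks would fit. Subtracting the one task already waiting leaves 53, which is the value m.hungry() should report.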
