Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WQ: Further mainloop optimizations #3380

Merged
merged 7 commits into from
Jun 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 83 additions & 39 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ struct work_queue {
struct itable *tasks; // taskid -> task
struct itable *task_state_map; // taskid -> state
struct list *ready_list; // ready to be sent to a worker
struct work_queue_task *last_waiting_task;
struct work_queue_task *last_retrieved_task;

struct hash_table *worker_table;
struct hash_table *worker_blocklist;
Expand Down Expand Up @@ -486,7 +488,7 @@ static void log_queue_stats(struct work_queue *q, int force)
struct work_queue_stats s;

timestamp_t now = timestamp_get();
if(!force && (now - q->time_last_log_stats < ONE_SECOND)) {
if(!force && (now - q->time_last_log_stats < (ONE_SECOND*5))) {
return;
}

Expand Down Expand Up @@ -1911,6 +1913,7 @@ static void fetch_output_from_worker(struct work_queue *q, struct work_queue_wor

// At this point, a task is completed.
reap_task_from_worker(q, w, t, WORK_QUEUE_TASK_RETRIEVED);
q->last_retrieved_task = t;

w->finished_tasks--;
w->total_tasks_complete++;
Expand Down Expand Up @@ -1994,31 +1997,26 @@ static int expire_waiting_tasks(struct work_queue *q)
{
struct work_queue_task *t;
int expired = 0;
int count;

int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
count = task_state_count(q, NULL, WORK_QUEUE_TASK_READY);

while(count > 0)
{
count--;

t = list_pop_head(q->ready_list);

if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time)
{
while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered > q->attempt_schedule_depth) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
} else {
list_push_tail(q->ready_list, t);
list_pop_tail(q->ready_list);
}
tasks_considered++;
}

return expired;
}

Expand Down Expand Up @@ -2321,7 +2319,7 @@ static work_queue_result_code_t get_result(struct work_queue *q, struct work_que
}

change_task_state(q, t, WORK_QUEUE_TASK_WAITING_RETRIEVAL);

q->last_waiting_task = t;
return WQ_SUCCESS;
}

Expand Down Expand Up @@ -4663,33 +4661,45 @@ static void print_large_tasks_warning(struct work_queue *q)
rmsummary_delete(largest_unfit_task);
}

/*
 * Shut down worker w when the factory it belongs to has more connected
 * workers than its max_workers limit and w holds no entries in
 * current_tasks. Does nothing if w->factory_name has no entry in
 * q->factory_table.
 */
static void trim_factory( struct work_queue *q, struct work_queue_worker *w )
{
	struct work_queue_factory_info *factory = hash_table_lookup(q->factory_table, w->factory_name);

	// No factory record for this worker: nothing to trim.
	if ( !factory ) {
		return;
	}

	// Only an idle worker of an over-subscribed factory is released.
	if ( factory->connected_workers > factory->max_workers && itable_size(w->current_tasks) < 1 ) {
		debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname);
		shut_down_worker(q, w);
	}
}

static int receive_one_task( struct work_queue *q )
{
struct work_queue_task *t;

struct work_queue_worker *w;
uint64_t taskid;

itable_firstkey(q->tasks);
while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) {
if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) {
w = itable_lookup(q->worker_task_map, taskid);
fetch_output_from_worker(q, w, taskid);
// Shutdown worker if appropriate.
if ( w->factory_name ) {
struct work_queue_factory_info *f;
f = hash_table_lookup(q->factory_table, w->factory_name);
if ( f && f->connected_workers > f->max_workers &&
itable_size(w->current_tasks) < 1 ) {
debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname);
shut_down_worker(q, w);
}
t = q->last_waiting_task;

dthain marked this conversation as resolved.
Show resolved Hide resolved
int found = 0;
if(!t) {
ITABLE_ITERATE(q->tasks, taskid, t) {
if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) {
found = 1;
break;
colinthomas-z80 marked this conversation as resolved.
Show resolved Hide resolved
}
return 1;
}
if(!found) return 0;
}

return 0;
w = itable_lookup(q->worker_task_map, t->taskid);
fetch_output_from_worker(q, w, t->taskid);

if ( w->factory_name ) {
trim_factory(q, w);
}

q->last_waiting_task = 0;
return 1;
}

//Sends keepalives to check if connected workers are responsive, and asks for updates. If not, removes those workers.
Expand Down Expand Up @@ -5853,6 +5863,8 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *
q->ready_list = list_create();

q->tasks = itable_create(0);
q->last_waiting_task = NULL;
q->last_retrieved_task = NULL;

q->task_state_map = itable_create(0);

Expand Down Expand Up @@ -5934,7 +5946,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *
q->task_ordering = WORK_QUEUE_TASK_ORDER_FIFO;
//

log_queue_stats(q, 1);
log_queue_stats(q, 0);

q->time_last_wait = timestamp_get();

Expand Down Expand Up @@ -6929,11 +6941,16 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
while( (stoptime == 0) || (time(0) < stoptime) ) {

BEGIN_ACCUM_TIME(q, time_internal);

// task completed?
if (t == NULL)
{
if(tag) {
t = task_state_any_with_tag(q, WORK_QUEUE_TASK_RETRIEVED, tag);
} else if(q->last_retrieved_task) {
t = q->last_retrieved_task;
q->last_retrieved_task = NULL;

} else {
t = task_state_any(q, WORK_QUEUE_TASK_RETRIEVED);
}
Expand Down Expand Up @@ -7065,7 +7082,7 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
// in this wait.
if(events > 0) {
BEGIN_ACCUM_TIME(q, time_internal);
int done = !task_state_any(q, WORK_QUEUE_TASK_RUNNING) && !task_state_any(q, WORK_QUEUE_TASK_READY) && !task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) && !(foreman_uplink);
int done = !(task_state_any(q, WORK_QUEUE_TASK_READY) || task_state_any(q, WORK_QUEUE_TASK_RUNNING) || task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) || (foreman_uplink));
END_ACCUM_TIME(q, time_internal);

if(done) {
Expand All @@ -7087,9 +7104,10 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
}

if(events > 0) {
log_queue_stats(q, 1);
log_queue_stats(q, 0);
}


q->time_last_wait = timestamp_get();

return t;
Expand Down Expand Up @@ -7500,9 +7518,35 @@ void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s)
// s->workers_able computed below.

//info about tasks
s->tasks_waiting = task_state_count(q, NULL, WORK_QUEUE_TASK_READY);
s->tasks_with_results = task_state_count(q, NULL, WORK_QUEUE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = task_state_count(q, NULL, WORK_QUEUE_TASK_RUNNING) + s->tasks_with_results;
struct work_queue_task *t;
uint64_t taskid;

int ready_tasks = 0;
int waiting_tasks = 0;
int running_tasks = 0;

itable_firstkey(q->tasks);
while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) {
int state = (long)itable_lookup(q->task_state_map, taskid);
switch (state)
{
case WORK_QUEUE_TASK_READY:
ready_tasks++;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
waiting_tasks++;
break;
case WORK_QUEUE_TASK_RUNNING:
running_tasks++;
break;
default:
break;
}
}

s->tasks_waiting = ready_tasks;
s->tasks_with_results = waiting_tasks;
s->tasks_on_workers = running_tasks + s->tasks_with_results;

{
//accumulate tasks running, from workers:
Expand Down