From c2c386d463f71dcf7300afe7b432dbbd8d885d48 Mon Sep 17 00:00:00 2001 From: Colin Thomas <33940547+colinthomas-z80@users.noreply.github.com> Date: Wed, 21 Jun 2023 15:34:13 -0400 Subject: [PATCH] WQ: Further mainloop optimizations (#3380) * use list rotate on expire * store last waiting/retrieved tasks to quickly fetch them * should fix python * reduce overhead of get_stats and frequency of performance logging * refactor receive_one_task * fix logic * itable iterate --- work_queue/src/work_queue.c | 122 ++++++++++++++++++++++++------------ 1 file changed, 83 insertions(+), 39 deletions(-) diff --git a/work_queue/src/work_queue.c b/work_queue/src/work_queue.c index 40ccba200c..10dfc27f68 100644 --- a/work_queue/src/work_queue.c +++ b/work_queue/src/work_queue.c @@ -160,6 +160,8 @@ struct work_queue { struct itable *tasks; // taskid -> task struct itable *task_state_map; // taskid -> state struct list *ready_list; // ready to be sent to a worker + struct work_queue_task *last_waiting_task; + struct work_queue_task *last_retrieved_task; struct hash_table *worker_table; struct hash_table *worker_blocklist; @@ -486,7 +488,7 @@ static void log_queue_stats(struct work_queue *q, int force) struct work_queue_stats s; timestamp_t now = timestamp_get(); - if(!force && (now - q->time_last_log_stats < ONE_SECOND)) { + if(!force && (now - q->time_last_log_stats < (ONE_SECOND*5))) { return; } @@ -1911,6 +1913,7 @@ static void fetch_output_from_worker(struct work_queue *q, struct work_queue_wor // At this point, a task is completed. reap_task_from_worker(q, w, t, WORK_QUEUE_TASK_RETRIEVED); + q->last_retrieved_task = t; w->finished_tasks--; w->total_tasks_complete++; @@ -1994,31 +1997,26 @@ static int expire_waiting_tasks(struct work_queue *q) { struct work_queue_task *t; int expired = 0; - int count; + int tasks_considered = 0; double current_time = timestamp_get() / ONE_SECOND; - count = task_state_count(q, NULL, WORK_QUEUE_TASK_READY); - - while(count > 0) - { - count--; - - t = list_pop_head(q->ready_list); - - if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) - { + while( (t = list_rotate(q->ready_list)) ) { + if(tasks_considered > q->attempt_schedule_depth) { + return expired; + } + if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) { update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT); change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED); expired++; + list_pop_tail(q->ready_list); } else if(t->max_retries > 0 && t->try_count > t->max_retries) { update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES); change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED); expired++; - } else { - list_push_tail(q->ready_list, t); + list_pop_tail(q->ready_list); } + tasks_considered++; } - return expired; } @@ -2321,7 +2319,7 @@ static work_queue_result_code_t get_result(struct work_queue *q, struct work_que } change_task_state(q, t, WORK_QUEUE_TASK_WAITING_RETRIEVAL); - + q->last_waiting_task = t; return WQ_SUCCESS; } @@ -4663,33 +4661,45 @@ static void print_large_tasks_warning(struct work_queue *q) rmsummary_delete(largest_unfit_task); } +static void trim_factory( struct work_queue *q, struct work_queue_worker *w ) +{ + struct work_queue_factory_info *f; + f = hash_table_lookup(q->factory_table, w->factory_name); + if ( f && f->connected_workers > f->max_workers && + itable_size(w->current_tasks) < 1 ) { + debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname); + shut_down_worker(q, w); + } +} + static int receive_one_task( struct work_queue *q ) { struct work_queue_task *t; - struct work_queue_worker *w; uint64_t taskid; - itable_firstkey(q->tasks); - while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) { - if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) { - w = itable_lookup(q->worker_task_map, taskid); - fetch_output_from_worker(q, w, taskid); - // Shutdown worker if appropriate. - if ( w->factory_name ) { - struct work_queue_factory_info *f; - f = hash_table_lookup(q->factory_table, w->factory_name); - if ( f && f->connected_workers > f->max_workers && - itable_size(w->current_tasks) < 1 ) { - debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname); - shut_down_worker(q, w); - } + t = q->last_waiting_task; + + int found = 0; + if(!t) { + ITABLE_ITERATE(q->tasks, taskid, t) { + if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) { + found = 1; + break; } - return 1; } + if(!found) return 0; } - return 0; + w = itable_lookup(q->worker_task_map, t->taskid); + fetch_output_from_worker(q, w, t->taskid); + + if ( w->factory_name ) { + trim_factory(q, w); + } + + q->last_waiting_task = 0; + return 1; } //Sends keepalives to check if connected workers are responsive, and ask for updates If not, removes those workers. @@ -5853,6 +5863,8 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char * q->ready_list = list_create(); q->tasks = itable_create(0); + q->last_waiting_task = NULL; + q->last_retrieved_task = NULL; q->task_state_map = itable_create(0); @@ -5934,7 +5946,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char * q->task_ordering = WORK_QUEUE_TASK_ORDER_FIFO; // - log_queue_stats(q, 1); + log_queue_stats(q, 0); q->time_last_wait = timestamp_get(); @@ -6929,11 +6941,16 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo while( (stoptime == 0) || (time(0) < stoptime) ) { BEGIN_ACCUM_TIME(q, time_internal); + // task completed? if (t == NULL) { if(tag) { t = task_state_any_with_tag(q, WORK_QUEUE_TASK_RETRIEVED, tag); + } else if(q->last_retrieved_task) { + t = q->last_retrieved_task; + q->last_retrieved_task = NULL; + } else { t = task_state_any(q, WORK_QUEUE_TASK_RETRIEVED); } @@ -7065,7 +7082,7 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo // in this wait. if(events > 0) { BEGIN_ACCUM_TIME(q, time_internal); - int done = !task_state_any(q, WORK_QUEUE_TASK_RUNNING) && !task_state_any(q, WORK_QUEUE_TASK_READY) && !task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) && !(foreman_uplink); + int done = !(task_state_any(q, WORK_QUEUE_TASK_READY) || task_state_any(q, WORK_QUEUE_TASK_RUNNING) || task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) || (foreman_uplink)); END_ACCUM_TIME(q, time_internal); if(done) { @@ -7087,9 +7104,10 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo } if(events > 0) { - log_queue_stats(q, 1); + log_queue_stats(q, 0); } + q->time_last_wait = timestamp_get(); return t; @@ -7500,9 +7518,35 @@ void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s) // s->workers_able computed below. //info about tasks - s->tasks_waiting = task_state_count(q, NULL, WORK_QUEUE_TASK_READY); - s->tasks_with_results = task_state_count(q, NULL, WORK_QUEUE_TASK_WAITING_RETRIEVAL); - s->tasks_on_workers = task_state_count(q, NULL, WORK_QUEUE_TASK_RUNNING) + s->tasks_with_results; + struct work_queue_task *t; + uint64_t taskid; + + int ready_tasks = 0; + int waiting_tasks = 0; + int running_tasks = 0; + + itable_firstkey(q->tasks); + while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) { + int state = (long)itable_lookup(q->task_state_map, taskid); + switch (state) + { + case WORK_QUEUE_TASK_READY: + ready_tasks++; + break; + case WORK_QUEUE_TASK_WAITING_RETRIEVAL: + waiting_tasks++; + break; + case WORK_QUEUE_TASK_RUNNING: + running_tasks++; + break; + default: + break; + } + } + + s->tasks_waiting = ready_tasks; + s->tasks_with_results = waiting_tasks; + s->tasks_on_workers = running_tasks + s->tasks_with_results; { //accumulate tasks running, from workers: