Skip to content

Commit

Permalink
WQ: Further mainloop optimizations (#3380)
Browse files Browse the repository at this point in the history
* use list rotate on expire

* store last waiting/retrieved tasks to quickly fetch them

* should fix python

* reduce overhead of get_stats and frequency of performance logging

* refactor receive_one_task

* fix logic

* itable iterate
  • Loading branch information
colinthomas-z80 authored Jun 21, 2023
1 parent 7315aad commit c2c386d
Showing 1 changed file with 83 additions and 39 deletions.
122 changes: 83 additions & 39 deletions work_queue/src/work_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ struct work_queue {
struct itable *tasks; // taskid -> task
struct itable *task_state_map; // taskid -> state
struct list *ready_list; // ready to be sent to a worker
struct work_queue_task *last_waiting_task;
struct work_queue_task *last_retrieved_task;

struct hash_table *worker_table;
struct hash_table *worker_blocklist;
Expand Down Expand Up @@ -486,7 +488,7 @@ static void log_queue_stats(struct work_queue *q, int force)
struct work_queue_stats s;

timestamp_t now = timestamp_get();
if(!force && (now - q->time_last_log_stats < ONE_SECOND)) {
if(!force && (now - q->time_last_log_stats < (ONE_SECOND*5))) {
return;
}

Expand Down Expand Up @@ -1911,6 +1913,7 @@ static void fetch_output_from_worker(struct work_queue *q, struct work_queue_wor

// At this point, a task is completed.
reap_task_from_worker(q, w, t, WORK_QUEUE_TASK_RETRIEVED);
q->last_retrieved_task = t;

w->finished_tasks--;
w->total_tasks_complete++;
Expand Down Expand Up @@ -1994,31 +1997,26 @@ static int expire_waiting_tasks(struct work_queue *q)
{
struct work_queue_task *t;
int expired = 0;
int count;

int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
count = task_state_count(q, NULL, WORK_QUEUE_TASK_READY);

while(count > 0)
{
count--;

t = list_pop_head(q->ready_list);

if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time)
{
while( (t = list_rotate(q->ready_list)) ) {
if(tasks_considered > q->attempt_schedule_depth) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) {
update_task_result(t, WORK_QUEUE_RESULT_TASK_TIMEOUT);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
list_pop_tail(q->ready_list);
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
update_task_result(t, WORK_QUEUE_RESULT_MAX_RETRIES);
change_task_state(q, t, WORK_QUEUE_TASK_RETRIEVED);
expired++;
} else {
list_push_tail(q->ready_list, t);
list_pop_tail(q->ready_list);
}
tasks_considered++;
}

return expired;
}

Expand Down Expand Up @@ -2321,7 +2319,7 @@ static work_queue_result_code_t get_result(struct work_queue *q, struct work_que
}

change_task_state(q, t, WORK_QUEUE_TASK_WAITING_RETRIEVAL);

q->last_waiting_task = t;
return WQ_SUCCESS;
}

Expand Down Expand Up @@ -4663,33 +4661,45 @@ static void print_large_tasks_warning(struct work_queue *q)
rmsummary_delete(largest_unfit_task);
}

static void trim_factory( struct work_queue *q, struct work_queue_worker *w )
{
struct work_queue_factory_info *f;
f = hash_table_lookup(q->factory_table, w->factory_name);
if ( f && f->connected_workers > f->max_workers &&
itable_size(w->current_tasks) < 1 ) {
debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname);
shut_down_worker(q, w);
}
}

static int receive_one_task( struct work_queue *q )
{
struct work_queue_task *t;

struct work_queue_worker *w;
uint64_t taskid;

itable_firstkey(q->tasks);
while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) {
if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) {
w = itable_lookup(q->worker_task_map, taskid);
fetch_output_from_worker(q, w, taskid);
// Shutdown worker if appropriate.
if ( w->factory_name ) {
struct work_queue_factory_info *f;
f = hash_table_lookup(q->factory_table, w->factory_name);
if ( f && f->connected_workers > f->max_workers &&
itable_size(w->current_tasks) < 1 ) {
debug(D_WQ, "Final task received from worker %s, shutting down.", w->hostname);
shut_down_worker(q, w);
}
t = q->last_waiting_task;

int found = 0;
if(!t) {
ITABLE_ITERATE(q->tasks, taskid, t) {
if( task_state_is(q, taskid, WORK_QUEUE_TASK_WAITING_RETRIEVAL) ) {
found = 1;
break;
}
return 1;
}
if(!found) return 0;
}

return 0;
w = itable_lookup(q->worker_task_map, t->taskid);
fetch_output_from_worker(q, w, t->taskid);

if ( w->factory_name ) {
trim_factory(q, w);
}

q->last_waiting_task = 0;
return 1;
}

//Sends keepalives to check if connected workers are responsive, and ask for updates If not, removes those workers.
Expand Down Expand Up @@ -5853,6 +5863,8 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *
q->ready_list = list_create();

q->tasks = itable_create(0);
q->last_waiting_task = NULL;
q->last_retrieved_task = NULL;

q->task_state_map = itable_create(0);

Expand Down Expand Up @@ -5934,7 +5946,7 @@ struct work_queue *work_queue_ssl_create(int port, const char *key, const char *
q->task_ordering = WORK_QUEUE_TASK_ORDER_FIFO;
//

log_queue_stats(q, 1);
log_queue_stats(q, 0);

q->time_last_wait = timestamp_get();

Expand Down Expand Up @@ -6929,11 +6941,16 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
while( (stoptime == 0) || (time(0) < stoptime) ) {

BEGIN_ACCUM_TIME(q, time_internal);

// task completed?
if (t == NULL)
{
if(tag) {
t = task_state_any_with_tag(q, WORK_QUEUE_TASK_RETRIEVED, tag);
} else if(q->last_retrieved_task) {
t = q->last_retrieved_task;
q->last_retrieved_task = NULL;

} else {
t = task_state_any(q, WORK_QUEUE_TASK_RETRIEVED);
}
Expand Down Expand Up @@ -7065,7 +7082,7 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
// in this wait.
if(events > 0) {
BEGIN_ACCUM_TIME(q, time_internal);
int done = !task_state_any(q, WORK_QUEUE_TASK_RUNNING) && !task_state_any(q, WORK_QUEUE_TASK_READY) && !task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) && !(foreman_uplink);
int done = !(task_state_any(q, WORK_QUEUE_TASK_READY) || task_state_any(q, WORK_QUEUE_TASK_RUNNING) || task_state_any(q, WORK_QUEUE_TASK_WAITING_RETRIEVAL) || (foreman_uplink));
END_ACCUM_TIME(q, time_internal);

if(done) {
Expand All @@ -7087,9 +7104,10 @@ struct work_queue_task *work_queue_wait_internal(struct work_queue *q, int timeo
}

if(events > 0) {
log_queue_stats(q, 1);
log_queue_stats(q, 0);
}


q->time_last_wait = timestamp_get();

return t;
Expand Down Expand Up @@ -7500,9 +7518,35 @@ void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s)
// s->workers_able computed below.

//info about tasks
s->tasks_waiting = task_state_count(q, NULL, WORK_QUEUE_TASK_READY);
s->tasks_with_results = task_state_count(q, NULL, WORK_QUEUE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = task_state_count(q, NULL, WORK_QUEUE_TASK_RUNNING) + s->tasks_with_results;
struct work_queue_task *t;
uint64_t taskid;

int ready_tasks = 0;
int waiting_tasks = 0;
int running_tasks = 0;

itable_firstkey(q->tasks);
while( itable_nextkey(q->tasks, &taskid, (void **) &t) ) {
int state = (long)itable_lookup(q->task_state_map, taskid);
switch (state)
{
case WORK_QUEUE_TASK_READY:
ready_tasks++;
break;
case WORK_QUEUE_TASK_WAITING_RETRIEVAL:
waiting_tasks++;
break;
case WORK_QUEUE_TASK_RUNNING:
running_tasks++;
break;
default:
break;
}
}

s->tasks_waiting = ready_tasks;
s->tasks_with_results = waiting_tasks;
s->tasks_on_workers = running_tasks + s->tasks_with_results;

{
//accumulate tasks running, from workers:
Expand Down

0 comments on commit c2c386d

Please sign in to comment.