Skip to content

Commit

Permalink
vine: mainloop improvements (#3387)
Browse files Browse the repository at this point in the history
* reduce logging frequency and overhead

* merge with prune fix

* compiler

* clean up lists

* more likely short circuit

* rebase

* corrections

* compiler warnings

* add running list

* delete list

* remove before changing state

* add running itable, fix issues
  • Loading branch information
colinthomas-z80 authored Aug 14, 2023
1 parent 6d74e3f commit 0dc8e30
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 35 deletions.
98 changes: 64 additions & 34 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,31 +1166,28 @@ static int expire_waiting_tasks(struct vine_manager *q)
{
struct vine_task *t;
int expired = 0;
int count;


int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
count = task_state_count(q, NULL, VINE_TASK_READY);

while(count > 0)
while( (t = list_rotate(q->ready_list)) )
{
count--;

t = list_pop_head(q->ready_list);

if(tasks_considered > q->attempt_schedule_depth) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time)
{
vine_task_set_result(t, VINE_RESULT_MAX_END_TIME);
list_remove(q->ready_list, t);
change_task_state(q, t, VINE_TASK_RETRIEVED);
expired++;
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
vine_task_set_result(t, VINE_RESULT_MAX_RETRIES);
list_remove(q->ready_list, t);
change_task_state(q, t, VINE_TASK_RETRIEVED);
expired++;
} else {
list_push_tail(q->ready_list, t);
}
}
tasks_considered++;
}

return expired;
}

Expand Down Expand Up @@ -1523,7 +1520,8 @@ static vine_result_code_t get_result(struct vine_manager *q, struct vine_worker_
vine_task_set_result(t, VINE_RESULT_MAX_END_TIME);
}
}


itable_remove(q->running_table, t->task_id);
change_task_state(q, t, VINE_TASK_WAITING_RETRIEVAL);

return VINE_SUCCESS;
Expand Down Expand Up @@ -2609,6 +2607,19 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf

t->worker = 0;

switch(t->state)
{
case VINE_TASK_RUNNING:
itable_remove(q->running_table, t->task_id);
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_remove(q->waiting_retrieval_list, t);
break;
default:
assert(t->state > VINE_TASK_READY);
break;
}

change_task_state(q, t, new_state);

count_worker_resources(q, w);
Expand Down Expand Up @@ -2829,7 +2840,7 @@ static int send_one_task( struct vine_manager *q )


// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
list_pop_tail(q->ready_list);
commit_task_to_worker(q,w,t);
return 1;
}
Expand Down Expand Up @@ -2918,20 +2929,16 @@ and mark it as done.
static int receive_one_task( struct vine_manager *q )
{
struct vine_task *t;
uint64_t task_id;

ITABLE_ITERATE(q->tasks,task_id,t) {
if( t->state==VINE_TASK_WAITING_RETRIEVAL ) {
struct vine_worker_info *w = t->worker;

/* Attempt to fetch from this worker. */
if(fetch_output_from_worker(q, w, task_id)) {
/* If we got one, then we are done. */
prune_worker(q, w);
return 1;
} else {
/* But if not, the worker pointer is no longer valid. */
}
if((t = list_peek_head(q->waiting_retrieval_list))) {
struct vine_worker_info *w = t->worker;
/* Attempt to fetch from this worker. */
if(fetch_output_from_worker(q, w, t->task_id)) {
/* If we got one, then we are done. */
prune_worker(q, w);
return 1;
} else {
/* But if not, the worker pointer is no longer valid. */
}
}

Expand Down Expand Up @@ -3261,6 +3268,9 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->next_task_id = 1;

q->ready_list = list_create();
q->running_table = itable_create(0);
q->waiting_retrieval_list = list_create();
q->retrieved_list = list_create();

q->tasks = itable_create(0);
q->libraries = hash_table_create(0, 0);
Expand Down Expand Up @@ -3333,6 +3343,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS;
q->worker_source_max_transfers = VINE_WORKER_SOURCE_MAX_TRANSFERS;
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->resource_submit_multiplier = 1.0;

Expand Down Expand Up @@ -3559,6 +3570,9 @@ void vine_delete(struct vine_manager *q)
hash_table_delete(q->categories);

list_delete(q->ready_list);
itable_delete(q->running_table);
list_delete(q->waiting_retrieval_list);
list_delete(q->retrieved_list);
hash_table_delete(q->libraries);
hash_table_delete(q->workers_with_available_results);

Expand Down Expand Up @@ -3746,6 +3760,15 @@ static vine_task_state_t change_task_state( struct vine_manager *q, struct vine_
vine_task_set_result(t, VINE_RESULT_UNKNOWN);
push_task_to_ready_list(q, t);
break;
case VINE_TASK_RUNNING:
itable_insert(q->running_table, t->task_id, t);
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_push_head(q->waiting_retrieval_list, t);
break;
case VINE_TASK_RETRIEVED:
list_push_head(q->retrieved_list, t);
break;
case VINE_TASK_DONE:
case VINE_TASK_CANCELED:
/* Task was cloned when entered into our own table, so delete a reference on removal. */
Expand Down Expand Up @@ -4233,8 +4256,8 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
if(temp->state==VINE_TASK_RETRIEVED) {
t = temp;
}
} else {
t = task_state_any(q, VINE_TASK_RETRIEVED);
} else if((t = list_peek_head(q->retrieved_list))) {
list_pop_head(q->retrieved_list);
}

if(t) {
Expand Down Expand Up @@ -4404,7 +4427,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
if(events > 0) {
if(task_state_any(q, VINE_TASK_RETRIEVED) && t == NULL) continue;
BEGIN_ACCUM_TIME(q, time_internal);
int done = !task_state_any(q, VINE_TASK_RUNNING) && !task_state_any(q, VINE_TASK_READY) && !task_state_any(q, VINE_TASK_WAITING_RETRIEVAL);
int done = !list_size(q->ready_list) && !list_size(q->waiting_retrieval_list) && !itable_size(q->running_table);
END_ACCUM_TIME(q, time_internal);

if(done) {
Expand Down Expand Up @@ -4740,6 +4763,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if(!strcmp(name, "worker-source-max-transfers")){
q->worker_source_max_transfers = MAX(1, (int)value);

} else if(!strcmp(name, "perf-log-interval")){
q->perf_log_interval = MAX(1, (int)value);

} else if(!strcmp(name, "monitor-interval")) {
/* 0 means use monitor's default */
q->monitor_interval = MAX(0, (int)value);
Expand Down Expand Up @@ -4787,10 +4813,14 @@ void vine_get_stats(struct vine_manager *q, struct vine_stats *s)
// s->workers_able computed below.

//info about tasks
s->tasks_waiting = task_state_count(q, NULL, VINE_TASK_READY);
s->tasks_with_results = task_state_count(q, NULL, VINE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = task_state_count(q, NULL, VINE_TASK_RUNNING) + s->tasks_with_results;
int ready_tasks = list_size(q->ready_list);
int waiting_tasks = list_size(q->waiting_retrieval_list);
int running_tasks = itable_size(q->running_table);

s->tasks_waiting = ready_tasks;
s->tasks_with_results = waiting_tasks;
s->tasks_on_workers = running_tasks + s->tasks_with_results;

{
//accumulate tasks running, from workers:
char *key;
Expand Down
4 changes: 4 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct vine_manager {

struct itable *tasks; /* Maps task_id -> vine_task of all tasks in any state. */
struct list *ready_list; /* List of vine_task that are waiting to execute. */
struct itable *running_table; /* Table of vine_task that are running at workers. */
struct list *waiting_retrieval_list; /* List of vine_task that are waiting to be retrieved. */
struct list *retrieved_list; /* List of vine_task that have been retrieved. */
struct list *task_info_list; /* List of last N vine_task_infos for computing capacity. */
struct hash_table *categories; /* Maps category_name -> struct category */
struct hash_table *libraries; /* Maps library name -> vine_task of library with that name. */
Expand Down Expand Up @@ -148,6 +151,7 @@ struct vine_manager {
FILE *perf_logfile; /* Performance logfile for tracking metrics by time. */
FILE *txn_logfile; /* Transaction logfile for recording every event of interest. */
FILE *graph_logfile; /* Graph logfile for visualizing application structure. */
int perf_log_interval; /* Minimum interval for performance log entries in seconds. */

/* Resource monitoring configuration. */

Expand Down
3 changes: 2 additions & 1 deletion taskvine/src/manager/vine_perf_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ See the file COPYING for details.

#include "vine_manager.h"

#include "vine_perf_log.h"
#include "timestamp.h"
#include "buffer.h"
#include "debug.h"
Expand Down Expand Up @@ -52,7 +53,7 @@ void vine_perf_log_write_update( struct vine_manager *q, int force )
struct vine_stats s;

timestamp_t now = timestamp_get();
if(!force && (now - q->time_last_log_stats < ONE_SECOND)) {
if(!force && ((now - q->time_last_log_stats) < (long unsigned int)(ONE_SECOND*q->perf_log_interval))) {
return;
}

Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_perf_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ This module is private to the manager and should not be invoked by the end user.

#include "vine_manager.h"

#define VINE_PERF_LOG_INTERVAL 5

void vine_perf_log_write_header( struct vine_manager *q );
void vine_perf_log_write_update( struct vine_manager *q, int force );

Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/tools/vine_profile_dispatch
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# Plot the time spent matching tasks to workers, using the information recorded in the performance log.

import sys
import matplotlib.pyplot as plt

Expand Down

0 comments on commit 0dc8e30

Please sign in to comment.