diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index da1b44e035..3549cc4f71 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -1166,31 +1166,28 @@ static int expire_waiting_tasks(struct vine_manager *q) { struct vine_task *t; int expired = 0; - int count; - + + int tasks_considered = 0; double current_time = timestamp_get() / ONE_SECOND; - count = task_state_count(q, NULL, VINE_TASK_READY); - - while(count > 0) + while( (t = list_rotate(q->ready_list)) ) { - count--; - - t = list_pop_head(q->ready_list); - + if(tasks_considered > q->attempt_schedule_depth) { + return expired; + } if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time) { vine_task_set_result(t, VINE_RESULT_MAX_END_TIME); + list_remove(q->ready_list, t); change_task_state(q, t, VINE_TASK_RETRIEVED); expired++; } else if(t->max_retries > 0 && t->try_count > t->max_retries) { vine_task_set_result(t, VINE_RESULT_MAX_RETRIES); + list_remove(q->ready_list, t); change_task_state(q, t, VINE_TASK_RETRIEVED); expired++; - } else { - list_push_tail(q->ready_list, t); - } + } + tasks_considered++; } - return expired; } @@ -1523,7 +1520,8 @@ static vine_result_code_t get_result(struct vine_manager *q, struct vine_worker_ vine_task_set_result(t, VINE_RESULT_MAX_END_TIME); } } - + + itable_remove(q->running_table, t->task_id); change_task_state(q, t, VINE_TASK_WAITING_RETRIEVAL); return VINE_SUCCESS; @@ -2609,6 +2607,19 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf t->worker = 0; + switch(t->state) + { + case VINE_TASK_RUNNING: + itable_remove(q->running_table, t->task_id); + break; + case VINE_TASK_WAITING_RETRIEVAL: + list_remove(q->waiting_retrieval_list, t); + break; + default: + assert(t->state > VINE_TASK_READY); + break; + } + change_task_state(q, t, new_state); count_worker_resources(q, w); @@ -2829,7 +2840,7 @@ static int send_one_task( struct vine_manager *q ) // Otherwise, remove it from the ready list and start it: - list_pop_tail(q->ready_list); + list_pop_tail(q->ready_list); commit_task_to_worker(q,w,t); return 1; } @@ -2918,20 +2929,16 @@ and mark it as done. static int receive_one_task( struct vine_manager *q ) { struct vine_task *t; - uint64_t task_id; - ITABLE_ITERATE(q->tasks,task_id,t) { - if( t->state==VINE_TASK_WAITING_RETRIEVAL ) { - struct vine_worker_info *w = t->worker; - - /* Attempt to fetch from this worker. */ - if(fetch_output_from_worker(q, w, task_id)) { - /* If we got one, then we are done. */ - prune_worker(q, w); - return 1; - } else { - /* But if not, the worker pointer is no longer valid. */ - } + if((t = list_peek_head(q->waiting_retrieval_list))) { + struct vine_worker_info *w = t->worker; + /* Attempt to fetch from this worker. */ + if(fetch_output_from_worker(q, w, t->task_id)) { + /* If we got one, then we are done. */ + prune_worker(q, w); + return 1; + } else { + /* But if not, the worker pointer is no longer valid. */ } } @@ -3261,6 +3268,9 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->next_task_id = 1; q->ready_list = list_create(); + q->running_table = itable_create(0); + q->waiting_retrieval_list = list_create(); + q->retrieved_list = list_create(); q->tasks = itable_create(0); q->libraries = hash_table_create(0, 0); @@ -3333,6 +3343,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert q->file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS; q->worker_source_max_transfers = VINE_WORKER_SOURCE_MAX_TRANSFERS; + q->perf_log_interval = VINE_PERF_LOG_INTERVAL; q->resource_submit_multiplier = 1.0; @@ -3559,6 +3570,9 @@ void vine_delete(struct vine_manager *q) hash_table_delete(q->categories); list_delete(q->ready_list); + itable_delete(q->running_table); + list_delete(q->waiting_retrieval_list); + list_delete(q->retrieved_list); hash_table_delete(q->libraries); hash_table_delete(q->workers_with_available_results); @@ -3746,6 +3760,15 @@ static vine_task_state_t change_task_state( struct vine_manager *q, struct vine_ vine_task_set_result(t, VINE_RESULT_UNKNOWN); push_task_to_ready_list(q, t); break; + case VINE_TASK_RUNNING: + itable_insert(q->running_table, t->task_id, t); + break; + case VINE_TASK_WAITING_RETRIEVAL: + list_push_head(q->waiting_retrieval_list, t); + break; + case VINE_TASK_RETRIEVED: + list_push_head(q->retrieved_list, t); + break; case VINE_TASK_DONE: case VINE_TASK_CANCELED: /* Task was cloned when entered into our own table, so delete a reference on removal. */ @@ -4233,8 +4256,8 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, if(temp->state==VINE_TASK_RETRIEVED) { t = temp; } - } else { - t = task_state_any(q, VINE_TASK_RETRIEVED); + } else if((t = list_peek_head(q->retrieved_list))) { + list_pop_head(q->retrieved_list); } if(t) { @@ -4404,7 +4427,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout, if(events > 0) { if(task_state_any(q, VINE_TASK_RETRIEVED) && t == NULL) continue; BEGIN_ACCUM_TIME(q, time_internal); - int done = !task_state_any(q, VINE_TASK_RUNNING) && !task_state_any(q, VINE_TASK_READY) && !task_state_any(q, VINE_TASK_WAITING_RETRIEVAL); + int done = !list_size(q->ready_list) && !list_size(q->waiting_retrieval_list) && !itable_size(q->running_table); END_ACCUM_TIME(q, time_internal); if(done) { @@ -4740,6 +4763,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value) } else if(!strcmp(name, "worker-source-max-transfers")){ q->worker_source_max_transfers = MAX(1, (int)value); + } else if(!strcmp(name, "perf-log-interval")){ + q->perf_log_interval = MAX(1, (int)value); + } else if(!strcmp(name, "monitor-interval")) { /* 0 means use monitor's default */ q->monitor_interval = MAX(0, (int)value); @@ -4787,10 +4813,14 @@ void vine_get_stats(struct vine_manager *q, struct vine_stats *s) // s->workers_able computed below. //info about tasks - s->tasks_waiting = task_state_count(q, NULL, VINE_TASK_READY); - s->tasks_with_results = task_state_count(q, NULL, VINE_TASK_WAITING_RETRIEVAL); - s->tasks_on_workers = task_state_count(q, NULL, VINE_TASK_RUNNING) + s->tasks_with_results; + int ready_tasks = list_size(q->ready_list); + int waiting_tasks = list_size(q->waiting_retrieval_list); + int running_tasks = itable_size(q->running_table); + s->tasks_waiting = ready_tasks; + s->tasks_with_results = waiting_tasks; + s->tasks_on_workers = running_tasks + s->tasks_with_results; + { //accumulate tasks running, from workers: char *key; diff --git a/taskvine/src/manager/vine_manager.h b/taskvine/src/manager/vine_manager.h index c406eae8be..ac5c23dbd3 100644 --- a/taskvine/src/manager/vine_manager.h +++ b/taskvine/src/manager/vine_manager.h @@ -100,6 +100,9 @@ struct vine_manager { struct itable *tasks; /* Maps task_id -> vine_task of all tasks in any state. */ struct list *ready_list; /* List of vine_task that are waiting to execute. */ + struct itable *running_table; /* Table of vine_task that are running at workers. */ + struct list *waiting_retrieval_list; /* List of vine_task that are waiting to be retrieved. */ + struct list *retrieved_list; /* List of vine_task that have been retrieved. */ struct list *task_info_list; /* List of last N vine_task_infos for computing capacity. */ struct hash_table *categories; /* Maps category_name -> struct category */ struct hash_table *libraries; /* Maps library name -> vine_task of library with that name. */ @@ -148,6 +151,7 @@ struct vine_manager { FILE *perf_logfile; /* Performance logfile for tracking metrics by time. */ FILE *txn_logfile; /* Transaction logfile for recording every event of interest. */ FILE *graph_logfile; /* Graph logfile for visualizing application structure. */ + int perf_log_interval; /* Minimum interval for performance log entries in seconds. */ /* Resource monitoring configuration. */ diff --git a/taskvine/src/manager/vine_perf_log.c b/taskvine/src/manager/vine_perf_log.c index a5bb215573..e63c014408 100644 --- a/taskvine/src/manager/vine_perf_log.c +++ b/taskvine/src/manager/vine_perf_log.c @@ -6,6 +6,7 @@ See the file COPYING for details. #include "vine_manager.h" +#include "vine_perf_log.h" #include "timestamp.h" #include "buffer.h" #include "debug.h" @@ -52,7 +53,7 @@ void vine_perf_log_write_update( struct vine_manager *q, int force ) struct vine_stats s; timestamp_t now = timestamp_get(); - if(!force && (now - q->time_last_log_stats < ONE_SECOND)) { + if(!force && ((now - q->time_last_log_stats) < (long unsigned int)(ONE_SECOND*q->perf_log_interval))) { return; } diff --git a/taskvine/src/manager/vine_perf_log.h b/taskvine/src/manager/vine_perf_log.h index 29dd88f216..6437d77100 100644 --- a/taskvine/src/manager/vine_perf_log.h +++ b/taskvine/src/manager/vine_perf_log.h @@ -15,6 +15,8 @@ This module is private to the manager and should not be invoked by the end user. #include "vine_manager.h" +#define VINE_PERF_LOG_INTERVAL 5 + void vine_perf_log_write_header( struct vine_manager *q ); void vine_perf_log_write_update( struct vine_manager *q, int force ); diff --git a/taskvine/src/tools/vine_profile_dispatch b/taskvine/src/tools/vine_profile_dispatch index 23bce70814..56464f75f8 100755 --- a/taskvine/src/tools/vine_profile_dispatch +++ b/taskvine/src/tools/vine_profile_dispatch @@ -4,6 +4,8 @@ # This software is distributed under the GNU General Public License. # See the file COPYING for details. +# Plot the time spent matching tasks to workers through the information in the performance log + import sys import matplotlib.pyplot as plt