Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vine: mainloop improvements #3387

Merged
merged 12 commits into from
Aug 14, 2023
98 changes: 64 additions & 34 deletions taskvine/src/manager/vine_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,31 +1166,28 @@ static int expire_waiting_tasks(struct vine_manager *q)
{
struct vine_task *t;
int expired = 0;
int count;


int tasks_considered = 0;
double current_time = timestamp_get() / ONE_SECOND;
count = task_state_count(q, NULL, VINE_TASK_READY);

while(count > 0)
while( (t = list_rotate(q->ready_list)) )
{
count--;

t = list_pop_head(q->ready_list);

if(tasks_considered > q->attempt_schedule_depth) {
return expired;
}
if(t->resources_requested->end > 0 && t->resources_requested->end <= current_time)
{
vine_task_set_result(t, VINE_RESULT_MAX_END_TIME);
list_remove(q->ready_list, t);
change_task_state(q, t, VINE_TASK_RETRIEVED);
expired++;
} else if(t->max_retries > 0 && t->try_count > t->max_retries) {
vine_task_set_result(t, VINE_RESULT_MAX_RETRIES);
list_remove(q->ready_list, t);
change_task_state(q, t, VINE_TASK_RETRIEVED);
expired++;
} else {
list_push_tail(q->ready_list, t);
}
}
tasks_considered++;
}

return expired;
}

Expand All @@ -1212,6 +1209,7 @@ static int enforce_waiting_fixed_locations(struct vine_manager *q)
t = list_pop_head(q->ready_list);
if(t->has_fixed_locations && !vine_schedule_check_fixed_location(q, t)) {
vine_task_set_result(t, VINE_RESULT_FIXED_LOCATION_MISSING);
list_remove(q->ready_list, t);
dthain marked this conversation as resolved.
Show resolved Hide resolved
change_task_state(q, t, VINE_TASK_RETRIEVED);
terminated++;
} else {
Expand Down Expand Up @@ -1523,7 +1521,8 @@ static vine_result_code_t get_result(struct vine_manager *q, struct vine_worker_
vine_task_set_result(t, VINE_RESULT_MAX_END_TIME);
}
}


list_remove(q->running_list, t);
change_task_state(q, t, VINE_TASK_WAITING_RETRIEVAL);

return VINE_SUCCESS;
Expand Down Expand Up @@ -2609,6 +2608,18 @@ static void reap_task_from_worker(struct vine_manager *q, struct vine_worker_inf

t->worker = 0;

switch(t->state)
{
case VINE_TASK_RUNNING:
list_remove(q->running_list, t);
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_remove(q->waiting_retrieval_list, t);
break;
default:
dthain marked this conversation as resolved.
Show resolved Hide resolved
break;
}

change_task_state(q, t, new_state);

count_worker_resources(q, w);
Expand Down Expand Up @@ -2829,7 +2840,7 @@ static int send_one_task( struct vine_manager *q )


// Otherwise, remove it from the ready list and start it:
list_pop_tail(q->ready_list);
list_pop_tail(q->ready_list);
commit_task_to_worker(q,w,t);
return 1;
}
Expand Down Expand Up @@ -2918,20 +2929,16 @@ and mark it as done.
static int receive_one_task( struct vine_manager *q )
{
struct vine_task *t;
uint64_t task_id;

ITABLE_ITERATE(q->tasks,task_id,t) {
if( t->state==VINE_TASK_WAITING_RETRIEVAL ) {
struct vine_worker_info *w = t->worker;

/* Attempt to fetch from this worker. */
if(fetch_output_from_worker(q, w, task_id)) {
/* If we got one, then we are done. */
prune_worker(q, w);
return 1;
} else {
/* But if not, the worker pointer is no longer valid. */
}
if((t = list_peek_head(q->waiting_retrieval_list))) {
struct vine_worker_info *w = t->worker;
/* Attempt to fetch from this worker. */
if(fetch_output_from_worker(q, w, t->task_id)) {
/* If we got one, then we are done. */
prune_worker(q, w);
return 1;
} else {
/* But if not, the worker pointer is no longer valid. */
}
}

Expand Down Expand Up @@ -3261,6 +3268,9 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert
q->next_task_id = 1;

q->ready_list = list_create();
q->running_list = list_create();
dthain marked this conversation as resolved.
Show resolved Hide resolved
q->waiting_retrieval_list = list_create();
q->retrieved_list = list_create();

q->tasks = itable_create(0);
q->libraries = hash_table_create(0, 0);
Expand Down Expand Up @@ -3333,6 +3343,7 @@ struct vine_manager *vine_ssl_create(int port, const char *key, const char *cert

q->file_source_max_transfers = VINE_FILE_SOURCE_MAX_TRANSFERS;
q->worker_source_max_transfers = VINE_WORKER_SOURCE_MAX_TRANSFERS;
q->perf_log_interval = VINE_PERF_LOG_INTERVAL;

q->resource_submit_multiplier = 1.0;

Expand Down Expand Up @@ -3559,6 +3570,9 @@ void vine_delete(struct vine_manager *q)
hash_table_delete(q->categories);

list_delete(q->ready_list);
list_delete(q->running_list);
list_delete(q->waiting_retrieval_list);
list_delete(q->retrieved_list);
hash_table_delete(q->libraries);
hash_table_delete(q->workers_with_available_results);

Expand Down Expand Up @@ -3746,6 +3760,15 @@ static vine_task_state_t change_task_state( struct vine_manager *q, struct vine_
vine_task_set_result(t, VINE_RESULT_UNKNOWN);
dthain marked this conversation as resolved.
Show resolved Hide resolved
push_task_to_ready_list(q, t);
break;
case VINE_TASK_RUNNING:
list_push_head(q->running_list, t);
break;
case VINE_TASK_WAITING_RETRIEVAL:
list_push_head(q->waiting_retrieval_list, t);
break;
case VINE_TASK_RETRIEVED:
list_push_head(q->retrieved_list, t);
break;
case VINE_TASK_DONE:
case VINE_TASK_CANCELED:
/* Task was cloned when entered into our own table, so delete a reference on removal. */
Expand Down Expand Up @@ -4233,8 +4256,8 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
if(temp->state==VINE_TASK_RETRIEVED) {
t = temp;
}
} else {
t = task_state_any(q, VINE_TASK_RETRIEVED);
} else if((t = list_peek_head(q->retrieved_list))) {
dthain marked this conversation as resolved.
Show resolved Hide resolved
list_pop_head(q->retrieved_list);
}

if(t) {
Expand Down Expand Up @@ -4404,7 +4427,7 @@ static struct vine_task *vine_wait_internal(struct vine_manager *q, int timeout,
if(events > 0) {
if(task_state_any(q, VINE_TASK_RETRIEVED) && t == NULL) continue;
BEGIN_ACCUM_TIME(q, time_internal);
int done = !task_state_any(q, VINE_TASK_RUNNING) && !task_state_any(q, VINE_TASK_READY) && !task_state_any(q, VINE_TASK_WAITING_RETRIEVAL);
int done = !list_size(q->ready_list) && !list_size(q->waiting_retrieval_list) && !list_size(q->running_list);
END_ACCUM_TIME(q, time_internal);

if(done) {
Expand Down Expand Up @@ -4740,6 +4763,9 @@ int vine_tune(struct vine_manager *q, const char *name, double value)
} else if(!strcmp(name, "worker-source-max-transfers")){
q->worker_source_max_transfers = MAX(1, (int)value);

} else if(!strcmp(name, "perf-log-interval")){
q->perf_log_interval = MAX(1, (int)value);

} else if(!strcmp(name, "monitor-interval")) {
/* 0 means use monitor's default */
q->monitor_interval = MAX(0, (int)value);
Expand Down Expand Up @@ -4787,10 +4813,14 @@ void vine_get_stats(struct vine_manager *q, struct vine_stats *s)
// s->workers_able computed below.

//info about tasks
s->tasks_waiting = task_state_count(q, NULL, VINE_TASK_READY);
s->tasks_with_results = task_state_count(q, NULL, VINE_TASK_WAITING_RETRIEVAL);
s->tasks_on_workers = task_state_count(q, NULL, VINE_TASK_RUNNING) + s->tasks_with_results;
int ready_tasks = list_size(q->ready_list);
int waiting_tasks = list_size(q->waiting_retrieval_list);
int running_tasks = list_size(q->running_list);

s->tasks_waiting = ready_tasks;
s->tasks_with_results = waiting_tasks;
s->tasks_on_workers = running_tasks + s->tasks_with_results;

{
//accumulate tasks running, from workers:
char *key;
Expand Down
4 changes: 4 additions & 0 deletions taskvine/src/manager/vine_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct vine_manager {

struct itable *tasks; /* Maps task_id -> vine_task of all tasks in any state. */
struct list *ready_list; /* List of vine_task that are waiting to execute. */
struct list *running_list; /* List of vine_task that are running at workers. */
struct list *waiting_retrieval_list; /* List of vine_task that are waiting to be retrieved. */
struct list *retrieved_list; /* List of vine_task that have been retrieved. */
struct list *task_info_list; /* List of last N vine_task_infos for computing capacity. */
struct hash_table *categories; /* Maps category_name -> struct category */
struct hash_table *libraries; /* Maps library name -> vine_task of library with that name. */
Expand Down Expand Up @@ -148,6 +151,7 @@ struct vine_manager {
FILE *perf_logfile; /* Performance logfile for tracking metrics by time. */
FILE *txn_logfile; /* Transaction logfile for recording every event of interest. */
FILE *graph_logfile; /* Graph logfile for visualizing application structure. */
int perf_log_interval; /* Minimum interval for performance log entries in seconds. */

/* Resource monitoring configuration. */

Expand Down
3 changes: 2 additions & 1 deletion taskvine/src/manager/vine_perf_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ See the file COPYING for details.

#include "vine_manager.h"

#include "vine_perf_log.h"
#include "timestamp.h"
#include "buffer.h"
#include "debug.h"
Expand Down Expand Up @@ -52,7 +53,7 @@ void vine_perf_log_write_update( struct vine_manager *q, int force )
struct vine_stats s;

timestamp_t now = timestamp_get();
if(!force && (now - q->time_last_log_stats < ONE_SECOND)) {
if(!force && ((now - q->time_last_log_stats) < (long unsigned int)(ONE_SECOND*q->perf_log_interval))) {
return;
}

Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/manager/vine_perf_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ This module is private to the manager and should not be invoked by the end user.

#include "vine_manager.h"

#define VINE_PERF_LOG_INTERVAL 5

void vine_perf_log_write_header( struct vine_manager *q );
void vine_perf_log_write_update( struct vine_manager *q, int force );

Expand Down
2 changes: 2 additions & 0 deletions taskvine/src/tools/vine_profile_dispatch
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# This software is distributed under the GNU General Public License.
# See the file COPYING for details.

# Plot the time spent matching tasks to workers through the information in the performance log

import sys
import matplotlib.pyplot as plt

Expand Down