Skip to content

Commit

Permalink
Vine: Explicit Process Type Field (#3454)
Browse files Browse the repository at this point in the history
* Add process type field.

* Add explicit vine_process_type_t to distinguish process types.

* - Use process type when taking process actions.
- Formatting and commentary.
- Keep private functions within the module.

* Remove unused cache argument from vine_process_execute_and_wait

* Removed unused source field.

* Default return value from function.
  • Loading branch information
dthain authored Aug 15, 2023
1 parent b6fe7fb commit ba7d029
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 32 deletions.
4 changes: 2 additions & 2 deletions taskvine/src/worker/vine_cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ This will be double-checked below.

static int do_mini_task( struct vine_cache *c, struct vine_cache_file *f, char **error_message )
{
if(vine_process_execute_and_wait(f->process,c)) {
if(vine_process_execute_and_wait(f->process)) {
*error_message = 0;
return 1;
} else {
Expand Down Expand Up @@ -498,7 +498,7 @@ vine_cache_status_t vine_cache_ensure( struct vine_cache *c, const char *cachena
debug(D_VINE,"forking transfer process to create %s", cachename);

if(f->type == VINE_CACHE_MINI_TASK){
struct vine_process *p = vine_process_create(f->mini_task, 1);
struct vine_process *p = vine_process_create(f->mini_task,VINE_PROCESS_TYPE_MINI_TASK);
if(!vine_sandbox_stagein(p,c)) {
debug(D_VINE, "Can't stage input files for task %d.", p->task->task_id);
p->task = 0;
Expand Down
70 changes: 51 additions & 19 deletions taskvine/src/worker/vine_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,46 @@ See the file COPYING for details.
#include <sys/stat.h>
#include <sys/types.h>

extern char * workspace;

static int vine_process_wait_for_library_startup( struct vine_process *p, time_t stoptime );
static char *vine_process_invoke_function( struct vine_process *library_process, const char *function_name, const char *function_input, const char *sandbox_path );


/*
Create the task sandbox directory.
Create temporary directories inside as well.
Give the letter code used for the process sandbox dir.
*/

extern char * workspace;
static const char * vine_process_sandbox_code( vine_process_type_t type )
{
switch(type) {
case VINE_PROCESS_TYPE_STANDARD: return "task";
case VINE_PROCESS_TYPE_MINI_TASK: return "mini";
case VINE_PROCESS_TYPE_LIBRARY: return "libr";
case VINE_PROCESS_TYPE_FUNCTION: return "func";
case VINE_PROCESS_TYPE_TRANSFER: return "tran";
}

/* Odd return here is used to silence compiler while still retaining typedef check above. */
return "task";
}

/*
Create a vine_process and all of the information necessary for invocation.
However, do not allocate substantial resources at this point.
*/

struct vine_process *vine_process_create( struct vine_task *task, int mini_task )
struct vine_process *vine_process_create( struct vine_task *task, vine_process_type_t type )
{
struct vine_process *p = malloc(sizeof(*p));
memset(p, 0, sizeof(*p));

const char *type = mini_task ? "m" : "t";
const char *dirtype = vine_process_sandbox_code(p->type);

p->task = task;

p->cache_dir = string_format("%s/cache",workspace);
p->sandbox = string_format("%s/%s.%d", workspace,type,p->task->task_id);
p->sandbox = string_format("%s/%s.%d", workspace,dirtype,p->task->task_id);
p->tmpdir = string_format("%s/.taskvine.tmp",p->sandbox);
p->output_file_name = string_format("%s/.taskvine.stdout",p->sandbox);

Expand Down Expand Up @@ -111,7 +128,8 @@ void vine_process_delete(struct vine_process *p)
free(p);
}

static void clear_environment() {
static void clear_environment()
{
/* Clear variables that we really want the user to set explicitly.
* Ideally, we would start with a clean environment, but certain variables,
* such as HOME are seldom set explicitly, and some executables rely on them.
Expand Down Expand Up @@ -148,13 +166,15 @@ static void export_environment( struct vine_process *p )
}
}

static void set_integer_env_var( struct vine_process *p, const char *name, int64_t value) {
static void set_integer_env_var( struct vine_process *p, const char *name, int64_t value)
{
char *value_str = string_format("%" PRId64, value);
vine_task_set_env_var(p->task, name, value_str);
free(value_str);
}

static void set_resources_vars(struct vine_process *p) {
static void set_resources_vars(struct vine_process *p)
{
if(p->task->resources_requested->cores > 0) {
set_integer_env_var(p, "CORES", p->task->resources_requested->cores);
set_integer_env_var(p, "OMP_NUM_THREADS", p->task->resources_requested->cores);
Expand All @@ -176,7 +196,8 @@ static void set_resources_vars(struct vine_process *p) {
}
}

static char * load_input_file(struct vine_task *t) {
static char * load_input_file(struct vine_task *t)
{
FILE *fp = fopen("infile", "r");
if(!fp) {
fatal("coprocess could not open file 'infile' for reading: %s", strerror(errno));
Expand Down Expand Up @@ -215,7 +236,7 @@ void vine_process_set_exit_status( struct vine_process *p, int status )
Execute a task synchronously and return true on success.
*/

int vine_process_execute_and_wait( struct vine_process *p, struct vine_cache *cache)
int vine_process_execute_and_wait( struct vine_process *p )
{

pid_t pid = vine_process_execute(p);
Expand All @@ -232,6 +253,11 @@ int vine_process_execute_and_wait( struct vine_process *p, struct vine_cache *ca
return 1;
}

/*
Start a process executing and if successful, return its Unix pid.
On failure, return negative value.
*/

pid_t vine_process_execute(struct vine_process *p )
{
/* Flush pending stdio buffers prior to forking process, to avoid stale output in child. */
Expand All @@ -244,7 +270,7 @@ pid_t vine_process_execute(struct vine_process *p )
int output_fd = -1;
int error_fd = -1;

if(p->task->provides_library) {
if(p->type==VINE_PROCESS_TYPE_LIBRARY) {
/* If starting a library, create the pipes for parent-child communication. */

if(pipe(pipe_in)<0) fatal("couldn't create library pipes: %s\n",strerror(errno));
Expand Down Expand Up @@ -284,7 +310,7 @@ pid_t vine_process_execute(struct vine_process *p )
debug(D_VINE, "started process %d: %s", p->pid, p->task->command_line);

/* If we just started a library, then retain links to communicate with it. */
if(p->task->provides_library) {
if(p->type==VINE_PROCESS_TYPE_LIBRARY) {

debug(D_VINE, "waiting for library startup message from pid %d\n", p->pid );

Expand Down Expand Up @@ -338,7 +364,8 @@ pid_t vine_process_execute(struct vine_process *p )

/* In the special case of a function-call-task, just load data, communicate with the library, and exit. */

if(p->task->needs_library) {
if(p->type==VINE_PROCESS_TYPE_FUNCTION) {

change_process_title("vine_worker [function]");

// load data from input file
Expand All @@ -353,7 +380,7 @@ pid_t vine_process_execute(struct vine_process *p )
_exit(0);
}

/* Otherwise for a normal task or a library, set up file desciptors and execute the command. */
/* Otherwise for other process types, set up file desciptors and execute the command. */

int result = dup2(input_fd, STDIN_FILENO);
if(result<0) fatal("could not dup input to stdin: %s", strerror(errno));
Expand All @@ -369,7 +396,7 @@ pid_t vine_process_execute(struct vine_process *p )
close(error_fd);

/* For a library task, close the unused sides of the pipes. */
if(p->task->provides_library) {
if(p->type==VINE_PROCESS_TYPE_LIBRARY) {
close(pipe_in[1]);
close(pipe_out[0]);
}
Expand Down Expand Up @@ -397,7 +424,7 @@ back the library startup message with JSON containing the name of
the library, which should match the task's provides_library label.
*/

int vine_process_wait_for_library_startup( struct vine_process *p, time_t stoptime )
static int vine_process_wait_for_library_startup( struct vine_process *p, time_t stoptime )
{
char buffer[VINE_LINE_MAX];
int length = 0;
Expand Down Expand Up @@ -427,7 +454,7 @@ Invoke a function against a library by sending the invocation message,
and then reading back the result from the necessary pipe.
*/

char *vine_process_invoke_function( struct vine_process *library_process, const char *function_name, const char *function_input, const char *sandbox_path )
static char *vine_process_invoke_function( struct vine_process *library_process, const char *function_name, const char *function_input, const char *sandbox_path )
{
/* Set a five minute timeout. XXX This should be changeable. */
time_t stoptime = time(0)+300;
Expand All @@ -454,6 +481,10 @@ char *vine_process_invoke_function( struct vine_process *library_process, const
}
}

/*
Kill a running process and reap the final process state.
*/

void vine_process_kill(struct vine_process *p)
{
//make sure a few seconds have passed since child process was created to avoid sending a signal
Expand Down Expand Up @@ -512,7 +543,8 @@ void vine_process_compute_disk_needed( struct vine_process *p )

}

int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement) {
int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement)
{
/* we can't have pointers to struct members, thus we create temp variables here */

struct path_disk_size_info *state = p->disk_measurement_state;
Expand Down
24 changes: 14 additions & 10 deletions taskvine/src/worker/vine_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ See the file COPYING for details.

#include "vine_manager.h"
#include "vine_task.h"
#include "vine_cache.h"

#include "timestamp.h"
#include "path_disk_size_info.h"
Expand All @@ -18,12 +17,21 @@ See the file COPYING for details.
#include <sys/types.h>
#include <sys/resource.h>

typedef enum {
VINE_PROCESS_TYPE_STANDARD, // standard task with command line
VINE_PROCESS_TYPE_LIBRARY, // task providing serverless library
VINE_PROCESS_TYPE_FUNCTION, // task invoking serverless library
VINE_PROCESS_TYPE_MINI_TASK, // internal task used to create file
VINE_PROCESS_TYPE_TRANSFER, // internal task used to transfer file
} vine_process_type_t;

/*
vine_process is a running instance of a vine_task.
This object is private to the vine_worker.
*/

struct vine_process {
vine_process_type_t type;
pid_t pid;
vine_result_t result; // Any of VINE_RESULT_*
int exit_code; // Exit code, or signal number to task process.
Expand All @@ -37,7 +45,7 @@ struct vine_process {
char *tmpdir; // TMPDIR per task, expected to be a subdir of sandbox.
char *output_file_name;

/* The details of the task to execute. */
/* If a normal task, the details of the task to execute. */
struct vine_task *task;

/* If a function-call task, this is the specific library process to invoke. */
Expand All @@ -58,19 +66,15 @@ struct vine_process {
struct path_disk_size_info *disk_measurement_state;
};

struct vine_process * vine_process_create( struct vine_task *task, int mini_task );
struct vine_process * vine_process_create( struct vine_task *task, vine_process_type_t type );
pid_t vine_process_execute( struct vine_process *p );
void vine_process_set_exit_status( struct vine_process *p, int status );
void vine_process_kill( struct vine_process *p );
void vine_process_delete( struct vine_process *p );
void vine_process_compute_disk_needed( struct vine_process *p );

int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement);
char *vine_process_get_library_name(struct vine_process *p);
int vine_process_execute_and_wait( struct vine_process *p );

int vine_process_execute_and_wait( struct vine_process *p, struct vine_cache *cache);

int vine_process_wait_for_library_startup( struct vine_process *p, time_t stoptime );
char *vine_process_invoke_function( struct vine_process *library_process, const char *function_name, const char *function_input, const char *sandbox_path );
void vine_process_compute_disk_needed( struct vine_process *p );
int vine_process_measure_disk(struct vine_process *p, int max_time_on_measurement);

#endif
12 changes: 11 additions & 1 deletion taskvine/src/worker/vine_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,17 @@ static int do_task( struct link *manager, int task_id, time_t stoptime )

last_task_received = task->task_id;

struct vine_process *p = vine_process_create(task, 0);
vine_process_type_t type = VINE_PROCESS_TYPE_STANDARD;

if(task->needs_library) {
type = VINE_PROCESS_TYPE_FUNCTION;
} else if(task->provides_library) {
type = VINE_PROCESS_TYPE_LIBRARY;
} else {
type = VINE_PROCESS_TYPE_STANDARD;
}

struct vine_process *p = vine_process_create(task,type);
if(!p) return 0;

itable_insert(procs_table,task_id,p);
Expand Down

0 comments on commit ba7d029

Please sign in to comment.