diff --git a/libjob_queue/CMakeLists.txt b/libjob_queue/CMakeLists.txt index 832dea5e42..e1359658cb 100644 --- a/libjob_queue/CMakeLists.txt +++ b/libjob_queue/CMakeLists.txt @@ -14,6 +14,7 @@ endif () add_library(job_queue src/ext_job.c src/ext_joblist.c src/forward_model.c + src/job_status.c src/job_list.c src/job_node.c src/job_queue.c diff --git a/libjob_queue/include/ert/job_queue/ext_job.h b/libjob_queue/include/ert/job_queue/ext_job.h index 620657f763..9056f016c4 100644 --- a/libjob_queue/include/ert/job_queue/ext_job.h +++ b/libjob_queue/include/ert/job_queue/ext_job.h @@ -1,19 +1,19 @@ /* - Copyright (C) 2011 Statoil ASA, Norway. - - The file 'ext_job.h' is part of ERT - Ensemble based Reservoir Tool. - - ERT is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - ERT is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. - - See the GNU General Public License at - for more details. + Copyright (C) 2011 Statoil ASA, Norway. + + The file 'ext_job.h' is part of ERT - Ensemble based Reservoir Tool. + + ERT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + ERT is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. + + See the GNU General Public License at + for more details. 
*/ #ifndef ERT_EXT_JOB_H @@ -89,7 +89,7 @@ void ext_job_add_environment(ext_job_type *ext_job , const ch void ext_job_clear_environment( ext_job_type * ext_job ); hash_type * ext_job_get_environment( ext_job_type * ext_job ); int ext_job_set_private_args_from_string( ext_job_type * ext_job , const char * arg_string ); -const char * ext_job_get_private_args_as_string( ext_job_type * ext_job ); +const char * ext_job_get_private_args_as_string( ext_job_type * ext_job ); const char * ext_job_get_license_path(const ext_job_type*); //const char * ext_job_get_arglist_as_string( ext_job_type * ext_job ); //void ext_job_set_arglist_from_string( ext_job_type * ext_job , const char * argv_string ); diff --git a/libjob_queue/include/ert/job_queue/job_status.h b/libjob_queue/include/ert/job_queue/job_status.h new file mode 100644 index 0000000000..50139284ad --- /dev/null +++ b/libjob_queue/include/ert/job_queue/job_status.h @@ -0,0 +1,140 @@ +/* + Copyright (C) 2018 Statoil ASA, Norway. + + The file 'job_status.h' is part of ERT - Ensemble based Reservoir Tool. + + ERT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + ERT is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. + + See the GNU General Public License at + for more details. 
+ */ + +#ifndef JOB_STATUS_H +#define JOB_STATUS_H +#ifdef __cplusplus +extern "C" { +#endif + +#include +/* + +---------------------------------+ + | | ++---------------------------------+ | JOB_QUEUE_WAITING <----------------------------------+ +| | | <--------------+ | +| JOB_QUEUE_NOT_ACTIVE | +---------------+-----------------+ | | +| | | | | ++---------------------------------+ | | | + | | | + +---------------v-----------------+ | | + | | | | ++---------------------------------+ | JOB_QUEUE_SUBMITTED | | | +| | | | | | +|JOB_QUEUE_STATUS_FAILURE | +-------+--------------------+----+ | | +| | | | | | ++---------------------------------+ | | +----------------+----------------+ | + +-------------------------v-------+ | | | | + | | | | JOB_QUEUE_DO_KILL_NODE_FAILURE | | + | JOB_QUEUE_PENDING | | | | | + | | | +---------------------^-----------+ | + +-------------------------+-------+ | | | + | | | | + | +---------------v-----------------+ | | + | | | | | + +----> JOB_QUEUE_RUNNING +------+ | + +----------------------------------------------------------+ | | + | +---+-------------------+---------+ | + | | | | + | | | | + +--------------v------------------+ +---------------------------------+ | +---------v-----------------------+ | + | | | | | | | | + | JOB_QUEUE_DO_KILL | | JOB_QUEUE_DONE +<---+ +---> JOB_QUEUE_EXIT | | + | | | | | | | | + +--------------+------------------+ +----------------+----------------+ | +-----------------+---------------+ | + | | | | | + | | | | | + | | | | | + | +----------------v----------------+ | +-----------------v---------------+ | + | | | | | | | + | |JOB_QUEUE_RUNNING_DONE_CALLBACK +----------+ | JOB_QUEUE_RUNNING_EXIT_CALLBACK +-------+ + | | | | | + | +----------------+----------------+ +----------------+----------------+ + | | | + | | | + | | | + +--------------v------------------+ +----------------v----------------+ +----------------v----------------+ + | | | | | | + | JOB_QUEUE_IS_KILLED | | JOB_QUEUE_SUCCESS | | 
JOB_QUEUE_FAILED | + | | | | | | + +---------------------------------+ +---------------------------------+ +---------------------------------+ + +*/ + + + +/* + NB: the status count algorithm has a HARD assumption that these + values are on the 2^N form - without holes in the series. +*/ + +typedef enum { + JOB_QUEUE_NOT_ACTIVE = 1, /* This value is used in external query routines - for jobs which are (currently) not active. */ + JOB_QUEUE_WAITING = 2, /* A node which is waiting in the internal queue. */ + JOB_QUEUE_SUBMITTED = 4, /* Internal status: It has been submitted - the next status update will (should) place it as pending or running. */ + JOB_QUEUE_PENDING = 8, /* A node which is pending - a status returned by the external system. E.g. LSF */ + JOB_QUEUE_RUNNING = 16, /* The job is running */ + JOB_QUEUE_DONE = 32, /* The job is done - but we have not yet checked if the target file is produced */ + JOB_QUEUE_EXIT = 64, /* The job has exited - check attempts to determine if we retry or go to complete_fail */ + JOB_QUEUE_IS_KILLED = 128, /* The job has been killed, following a JOB_QUEUE_DO_KILL*/ + JOB_QUEUE_DO_KILL = 256, /* The job should be killed, either due to user request, or automated measures - the job can NOT be restarted. 
*/ + JOB_QUEUE_SUCCESS = 512, + JOB_QUEUE_RUNNING_DONE_CALLBACK = 1024, + JOB_QUEUE_RUNNING_EXIT_CALLBACK = 2048, + JOB_QUEUE_STATUS_FAILURE = 4096, + JOB_QUEUE_FAILED = 8192, + JOB_QUEUE_DO_KILL_NODE_FAILURE = 16384 + } job_status_type; + +#define JOB_QUEUE_RUNNING_CALLBACK (JOB_QUEUE_RUNNING_DONE_CALLBACK + JOB_QUEUE_RUNNING_EXIT_CALLBACK) + +#define JOB_QUEUE_STATUS_ALL (JOB_QUEUE_NOT_ACTIVE + JOB_QUEUE_WAITING + JOB_QUEUE_SUBMITTED + JOB_QUEUE_PENDING + JOB_QUEUE_RUNNING + JOB_QUEUE_DONE + \ + JOB_QUEUE_EXIT + JOB_QUEUE_IS_KILLED + JOB_QUEUE_DO_KILL + JOB_QUEUE_SUCCESS + JOB_QUEUE_RUNNING_CALLBACK + \ + JOB_QUEUE_STATUS_FAILURE + JOB_QUEUE_FAILED + JOB_QUEUE_DO_KILL_NODE_FAILURE) + +#define JOB_QUEUE_MAX_STATE 15 + + /* + All jobs which are in the status set defined by + JOB_QUEUE_CAN_RESTART can be restarted based on external + user-input. It is OK to try to restart a job which is not in this + state - basically nothing should happen. + */ +#define JOB_QUEUE_CAN_RESTART (JOB_QUEUE_FAILED + JOB_QUEUE_IS_KILLED + JOB_QUEUE_SUCCESS) + + + /* + These are the jobs which can be killed. It is OK to try to kill a + job which is not in this state, the only thing happening is that the + function job_queue_kill_simulation() will return false. 
+ */ +#define JOB_QUEUE_CAN_KILL (JOB_QUEUE_WAITING + JOB_QUEUE_RUNNING + JOB_QUEUE_PENDING + JOB_QUEUE_SUBMITTED + JOB_QUEUE_DO_KILL + JOB_QUEUE_DO_KILL_NODE_FAILURE) + +#define JOB_QUEUE_WAITING_STATUS (JOB_QUEUE_WAITING + JOB_QUEUE_PENDING) + +#define JOB_QUEUE_CAN_UPDATE_STATUS (JOB_QUEUE_RUNNING + JOB_QUEUE_PENDING + JOB_QUEUE_SUBMITTED) + +#define JOB_QUEUE_COMPLETE_STATUS (JOB_QUEUE_IS_KILLED + JOB_QUEUE_SUCCESS + JOB_QUEUE_FAILED) + + +const char * job_status_get_name(job_status_type status); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/libjob_queue/include/ert/job_queue/queue_driver.h b/libjob_queue/include/ert/job_queue/queue_driver.h index 730032eba6..0628c155dc 100644 --- a/libjob_queue/include/ert/job_queue/queue_driver.h +++ b/libjob_queue/include/ert/job_queue/queue_driver.h @@ -23,6 +23,7 @@ extern "C" { #endif #include +#include typedef enum { NULL_DRIVER = 0, @@ -46,114 +47,6 @@ extern "C" { */ #define MAX_RUNNING "MAX_RUNNING" -/* - +---------------------------------+ - | | -+---------------------------------+ | JOB_QUEUE_WAITING <----------------------------------+ -| | | <--------------+ | -| JOB_QUEUE_NOT_ACTIVE | +---------------+-----------------+ | | -| | | | | -+---------------------------------+ | | | - | | | - +---------------v-----------------+ | | - | | | | -+---------------------------------+ | JOB_QUEUE_SUBMITTED | | | -| | | | | | -|JOB_QUEUE_STATUS_FAILURE | +-------+--------------------+----+ | | -| | | | | | -+---------------------------------+ | | +----------------+----------------+ | - +-------------------------v-------+ | | | | - | | | | JOB_QUEUE_DO_KILL_NODE_FAILURE | | - | JOB_QUEUE_PENDING | | | | | - | | | +---------------------^-----------+ | - +-------------------------+-------+ | | | - | | | | - | +---------------v-----------------+ | | - | | | | | - +----> JOB_QUEUE_RUNNING +------+ | - +----------------------------------------------------------+ | | - | +---+-------------------+---------+ | - | | | | - | | | 
| - +--------------v------------------+ +---------------------------------+ | +---------v-----------------------+ | - | | | | | | | | - | JOB_QUEUE_DO_KILL | | JOB_QUEUE_DONE +<---+ +---> JOB_QUEUE_EXIT | | - | | | | | | | | - +--------------+------------------+ +----------------+----------------+ | +-----------------+---------------+ | - | | | | | - | | | | | - | | | | | - | +----------------v----------------+ | +-----------------v---------------+ | - | | | | | | | - | |JOB_QUEUE_RUNNING_DONE_CALLBACK +----------+ | JOB_QUEUE_RUNNING_EXIT_CALLBACK +-------+ - | | | | | - | +----------------+----------------+ +----------------+----------------+ - | | | - | | | - | | | - +--------------v------------------+ +----------------v----------------+ +----------------v----------------+ - | | | | | | - | JOB_QUEUE_IS_KILLED | | JOB_QUEUE_SUCCESS | | JOB_QUEUE_FAILED | - | | | | | | - +---------------------------------+ +---------------------------------+ +---------------------------------+ - -*/ - - - -/* - NB: the status count algorithm has a HARD assumption that these - values are on the 2^N form - without holes in the series. -*/ - -typedef enum { - JOB_QUEUE_NOT_ACTIVE = 1, /* This value is used in external query routines - for jobs which are (currently) not active. */ - JOB_QUEUE_WAITING = 2, /* A node which is waiting in the internal queue. */ - JOB_QUEUE_SUBMITTED = 4, /* Internal status: It has has been submitted - the next status update will (should) place it as pending or running. */ - JOB_QUEUE_PENDING = 8, /* A node which is pending - a status returned by the external system. 
I.e LSF */ - JOB_QUEUE_RUNNING = 16, /* The job is running */ - JOB_QUEUE_DONE = 32, /* The job is done - but we have not yet checked if the target file is produced */ - JOB_QUEUE_EXIT = 64, /* The job has exited - check attempts to determine if we retry or go to complete_fail */ - JOB_QUEUE_IS_KILLED = 128, /* The job has been killed, following a JOB_QUEUE_DO_KILL*/ - JOB_QUEUE_DO_KILL = 256, /* The the job should be killed, either due to user request, or automated measures - the job can NOT be restarted. */ - JOB_QUEUE_SUCCESS = 512, - JOB_QUEUE_RUNNING_DONE_CALLBACK = 1024, - JOB_QUEUE_RUNNING_EXIT_CALLBACK = 2048, - JOB_QUEUE_STATUS_FAILURE = 4096, - JOB_QUEUE_FAILED = 8192, - JOB_QUEUE_DO_KILL_NODE_FAILURE = 16384 - } job_status_type; - -#define JOB_QUEUE_RUNNING_CALLBACK (JOB_QUEUE_RUNNING_DONE_CALLBACK + JOB_QUEUE_RUNNING_EXIT_CALLBACK) - -#define JOB_QUEUE_STATUS_ALL (JOB_QUEUE_NOT_ACTIVE + JOB_QUEUE_WAITING + JOB_QUEUE_SUBMITTED + JOB_QUEUE_PENDING + JOB_QUEUE_RUNNING + JOB_QUEUE_DONE + \ - JOB_QUEUE_EXIT + JOB_QUEUE_IS_KILLED + JOB_QUEUE_DO_KILL + JOB_QUEUE_SUCCESS + JOB_QUEUE_RUNNING_CALLBACK + \ - JOB_QUEUE_STATUS_FAILURE + JOB_QUEUE_FAILED + JOB_QUEUE_DO_KILL_NODE_FAILURE) - -#define JOB_QUEUE_MAX_STATE 15 - - /* - All jobs which are in the status set defined by - JOB_QUEUE_CAN_RESTART can be restarted based on external - user-input. It is OK to try to restart a job which is not in this - state - basically nothing should happen. - */ -#define JOB_QUEUE_CAN_RESTART (JOB_QUEUE_FAILED + JOB_QUEUE_IS_KILLED + JOB_QUEUE_SUCCESS) - - - /* - These are the jobs which can be killed. It is OK to try to kill a - job which is not in this state, the only thing happening is that the - function job_queue_kill_simulation() wil return false. 
- */ -#define JOB_QUEUE_CAN_KILL (JOB_QUEUE_WAITING + JOB_QUEUE_RUNNING + JOB_QUEUE_PENDING + JOB_QUEUE_SUBMITTED + JOB_QUEUE_DO_KILL + JOB_QUEUE_DO_KILL_NODE_FAILURE) - -#define JOB_QUEUE_WAITING_STATUS (JOB_QUEUE_WAITING + JOB_QUEUE_PENDING) - -#define JOB_QUEUE_CAN_UPDATE_STATUS (JOB_QUEUE_RUNNING + JOB_QUEUE_PENDING + JOB_QUEUE_SUBMITTED) - -#define JOB_QUEUE_COMPLETE_STATUS (JOB_QUEUE_IS_KILLED + JOB_QUEUE_SUCCESS + JOB_QUEUE_FAILED) - typedef struct queue_driver_struct queue_driver_type; diff --git a/libjob_queue/src/ext_job.c b/libjob_queue/src/ext_job.c index 33eac309b8..4200325f40 100644 --- a/libjob_queue/src/ext_job.c +++ b/libjob_queue/src/ext_job.c @@ -338,7 +338,6 @@ static void ext_job_init_license_control(ext_job_type * ext_job) { if (ext_job->license_path == NULL) { ext_job->license_path = util_alloc_sprintf("%s%c%s" , ext_job->license_root_path , UTIL_PATH_SEP_CHAR , ext_job->name ); util_make_path( ext_job->license_path ); - printf("License for %s in %s \n",ext_job->name , ext_job->license_path); } } diff --git a/libjob_queue/src/forward_model.c b/libjob_queue/src/forward_model.c index 572ad4b35c..619f7815c1 100644 --- a/libjob_queue/src/forward_model.c +++ b/libjob_queue/src/forward_model.c @@ -30,6 +30,7 @@ #include #include #include +#include #include @@ -166,7 +167,7 @@ void forward_model_parse_job_deprecated_args(forward_model_type * forward_model, free( arg_string ); } } - + free(job_name); } diff --git a/libjob_queue/src/job_queue_status.c b/libjob_queue/src/job_queue_status.c index 9c3b14bee0..79c1d260b9 100644 --- a/libjob_queue/src/job_queue_status.c +++ b/libjob_queue/src/job_queue_status.c @@ -41,9 +41,9 @@ static int STATUS_INDEX(const job_queue_status_type * status_count, job_status_t return index; index++; - if (index == JOB_QUEUE_MAX_STATE) + if (index == JOB_QUEUE_MAX_STATE) util_abort("%s: failed to get index from status:%d \n",__func__ , status); - + } return 0; } @@ -59,8 +59,8 @@ job_queue_status_type * 
job_queue_status_alloc() { pthread_rwlock_init( &status->rw_lock , NULL); job_queue_status_clear( status ); - - + + status->status_index[0] = JOB_QUEUE_NOT_ACTIVE; // Initial, allocated job state, job not added - controlled by job_queue status->status_index[1] = JOB_QUEUE_WAITING; // The job is ready to be started - controlled by job_queue @@ -78,10 +78,10 @@ job_queue_status_type * job_queue_status_alloc() { status->status_index[13] = JOB_QUEUE_FAILED; // Job has failed, no more retries, FINAL STATE status->status_index[14] = JOB_QUEUE_DO_KILL_NODE_FAILURE; // Job has failed, node should be blacklisted - - - - + + + + return status; } diff --git a/libjob_queue/src/job_status.c b/libjob_queue/src/job_status.c new file mode 100644 index 0000000000..951df505fb --- /dev/null +++ b/libjob_queue/src/job_status.c @@ -0,0 +1,74 @@ +/* + Copyright (C) 2018 Statoil ASA, Norway. + + The file 'job_status.c' is part of ERT - Ensemble based Reservoir Tool. + + ERT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + ERT is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. + + See the GNU General Public License at + for more details. 
+*/ + + +#include + +/* Returns the string name of one job_status_type value; a status matching none of the cases falls through to util_abort(). */ +const char * job_status_get_name(job_status_type status) { + switch(status) { + case JOB_QUEUE_NOT_ACTIVE: + return "JOB_QUEUE_NOT_ACTIVE"; + break; + case JOB_QUEUE_WAITING: + return "JOB_QUEUE_WAITING"; + break; + case JOB_QUEUE_SUBMITTED: + return "JOB_QUEUE_SUBMITTED"; + break; + case JOB_QUEUE_PENDING: + return "JOB_QUEUE_PENDING"; + break; + case JOB_QUEUE_RUNNING: + return "JOB_QUEUE_RUNNING"; + break; + case JOB_QUEUE_DONE: + return "JOB_QUEUE_DONE"; + break; + case JOB_QUEUE_EXIT: + return "JOB_QUEUE_EXIT"; + break; + case JOB_QUEUE_IS_KILLED: + return "JOB_QUEUE_IS_KILLED"; + break; + case JOB_QUEUE_DO_KILL: + return "JOB_QUEUE_DO_KILL"; + break; + case JOB_QUEUE_SUCCESS: + return "JOB_QUEUE_SUCCESS"; + break; + case JOB_QUEUE_RUNNING_DONE_CALLBACK: + return "JOB_QUEUE_RUNNING_DONE_CALLBACK"; + break; + case JOB_QUEUE_RUNNING_EXIT_CALLBACK: + return "JOB_QUEUE_RUNNING_EXIT_CALLBACK"; + break; + case JOB_QUEUE_STATUS_FAILURE: + return "JOB_QUEUE_STATUS_FAILURE"; + break; + case JOB_QUEUE_FAILED: + return "JOB_QUEUE_FAILED"; + break; + case JOB_QUEUE_DO_KILL_NODE_FAILURE: + return "JOB_QUEUE_DO_KILL_NODE_FAILURE"; + break; + } + /* Reached only when status matched none of the cases above. */ + util_abort("%s: internal error", __func__); + return NULL; +} diff --git a/python/python/res/enkf/run_arg.py b/python/python/res/enkf/run_arg.py index ea105070bf..e060a51c32 100644 --- a/python/python/res/enkf/run_arg.py +++ b/python/python/res/enkf/run_arg.py @@ -27,6 +27,7 @@ class RunArg(BaseCClass): _get_run_id = EnkfPrototype("char* run_arg_get_run_id(run_arg)") _get_geo_id = EnkfPrototype("int run_arg_get_geo_id(run_arg)") _set_geo_id = EnkfPrototype("void run_arg_set_geo_id(run_arg, int)") + _get_runpath = EnkfPrototype("char* run_arg_get_runpath(run_arg)") def __init__(self): raise NotImplementedError("Cannot instantiat RunArg directly!") @@ -65,3 +66,7 @@ def geo_id(self): @geo_id.setter def geo_id(self, value): self._set_geo_id(value) + + @property + def runpath(self): + return self._get_runpath() 
diff --git a/python/python/res/job_queue/CMakeLists.txt b/python/python/res/job_queue/CMakeLists.txt index b3e2738132..d99541e2ea 100644 --- a/python/python/res/job_queue/CMakeLists.txt +++ b/python/python/res/job_queue/CMakeLists.txt @@ -7,6 +7,7 @@ set(PYTHON_SOURCES ext_joblist.py external_ert_script.py forward_model.py + forward_model_status.py function_ert_script.py job.py job_status_type_enum.py diff --git a/python/python/res/job_queue/__init__.py b/python/python/res/job_queue/__init__.py index 02998a06cc..8ec4a08ee2 100644 --- a/python/python/res/job_queue/__init__.py +++ b/python/python/res/job_queue/__init__.py @@ -1,18 +1,18 @@ -# Copyright (C) 2011 Statoil ASA, Norway. -# -# The file '__init__.py' is part of ERT - Ensemble based Reservoir Tool. -# -# ERT is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# ERT is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. -# -# See the GNU General Public License at -# for more details. +# Copyright (C) 2011 Statoil ASA, Norway. +# +# The file '__init__.py' is part of ERT - Ensemble based Reservoir Tool. +# +# ERT is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ERT is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. +# +# See the GNU General Public License at +# for more details. """ The job_queue package contains modules and classes for running external commands. 
@@ -78,7 +78,7 @@ class QueuePrototype(Prototype): def __init__(self, prototype, bind=True): super(QueuePrototype, self).__init__(QueuePrototype.lib, prototype, bind=bind) - + #from .job_status_type_enum import JobStatusType from .job_status_type_enum import JobStatusType from .job import Job @@ -89,6 +89,7 @@ def __init__(self, prototype, bind=True): from .ext_joblist import ExtJoblist from .environment_varlist import EnvironmentVarlist from .forward_model import ForwardModel +from .forward_model_status import ForwardModelJobStatus, ForwardModelStatus from .ert_script import ErtScript from .ert_plugin import ErtPlugin, CancelPluginException diff --git a/python/python/res/job_queue/forward_model.py b/python/python/res/job_queue/forward_model.py index f06e488e31..f1ce64dec8 100644 --- a/python/python/res/job_queue/forward_model.py +++ b/python/python/res/job_queue/forward_model.py @@ -1,17 +1,17 @@ -# Copyright (C) 2012 Statoil ASA, Norway. -# -# The file 'forward_model.py' is part of ERT - Ensemble based Reservoir Tool. -# -# ERT is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# ERT is distributed in the hope that it will be useful, but WITHOUT ANY -# WARRANTY; without even the implied warranty of MERCHANTABILITY or -# FITNESS FOR A PARTICULAR PURPOSE. -# -# See the GNU General Public License at +# Copyright (C) 2012 Statoil ASA, Norway. +# +# The file 'forward_model.py' is part of ERT - Ensemble based Reservoir Tool. +# +# ERT is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. 
+# +# ERT is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. +# +# See the GNU General Public License at # for more details. from cwrap import BaseCClass from res.job_queue import ExtJob, QueuePrototype, ExtJoblist @@ -39,6 +39,9 @@ def __init__(self, ext_joblist): else: raise ValueError('Failed to construct forward model from provided ext_joblist %s' % ext_joblist) + def __len__(self): + return self._get_length() + def joblist(self): """ @rtype: StringList """ return self._alloc_joblist( ) @@ -64,4 +67,4 @@ def __repr__(self): return self._create_repr('joblist=%s' % self.joblist()) def get_size(self): - return self._get_length() + return len(self) diff --git a/python/python/res/job_queue/forward_model_status.py b/python/python/res/job_queue/forward_model_status.py new file mode 100644 index 0000000000..2f76112761 --- /dev/null +++ b/python/python/res/job_queue/forward_model_status.py @@ -0,0 +1,141 @@ +# Copyright (C) 2018 Statoil ASA, Norway. +# +# The file 'forward_model_status.py' is part of ERT - Ensemble based Reservoir Tool. +# +# ERT is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# ERT is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or +# FITNESS FOR A PARTICULAR PURPOSE. +# +# See the GNU General Public License at +# for more details. 
+import os.path +import json +import datetime +import time +import sys + +def _serialize_date(dt): + if dt is None: + return None + + return time.mktime(dt.timetuple()) # mktime() interprets the naive datetime as local time + + +def _deserialize_date(serial_dt): + if serial_dt is None: + return None + + time_struct = time.localtime(serial_dt) # local time, so this inverts the time.mktime() call in _serialize_date() + return datetime.datetime(*time_struct[0:6]) + + +class ForwardModelJobStatus(object): + + def __init__(self, name, start_time = None, end_time = None, status = "Waiting", error=None): + self.start_time = start_time + self.end_time = end_time + self.name = name + self.status = status + self.error = error + + + @classmethod + def load(cls,data): + start_time = _deserialize_date(data["start_time"]) + end_time = _deserialize_date(data["end_time"]) + name = data["name"] + status = data["status"] + error = data["error"] + + return cls(name, + start_time=start_time, + end_time=end_time, + status=status, + error=error) + + + def __str__(self): + return "name:{} start_time:{} end_time:{} status:{} error:{} ".format(self.name, self.start_time, self.end_time, self.status, self.error) + + + def dump_data(self): + return {"name" : self.name, + "status" : self.status, + "error" : self.error, + "start_time" : _serialize_date(self.start_time), + "end_time" : _serialize_date(self.end_time)} + +class ForwardModelStatus(object): + STATUS_FILE = "status.json" + + + def __init__(self, run_id, start_time = None, end_time = None): + self.run_id = run_id + self.start_time = start_time + self.end_time = end_time + self._jobs = [] + + @classmethod + def try_load(cls, status_file): + with open(status_file) as fp: # close the handle even when json.load() raises; load() retries this call + data = json.load(fp) + + start_time = _deserialize_date(data["start_time"]) + end_time = _deserialize_date(data["end_time"]) + status = cls(data["run_id"], + start_time=start_time, + end_time=end_time) + + for job in data["jobs"]: + status.add_job(ForwardModelJobStatus.load(job)) + + return status + + + @classmethod + def load(cls, path): + num_retry = 10 + sleep_time = 0.10 + attempt = 0 + status_file = 
os.path.join(path, cls.STATUS_FILE) + while attempt < num_retry: + try: + status = cls.try_load(status_file) + return status + except Exception: # file missing or half-written: retry, but let KeyboardInterrupt/SystemExit propagate + time.sleep(sleep_time) + attempt += 1 + + return None + + + @property + def jobs(self): + return self._jobs + + + def add_job(self, job): + self._jobs.append(job) + + + def dump(self, filename = None): + if filename is None: + status_file = self.STATUS_FILE + else: + status_file = filename + + data = {"run_id" : self.run_id, + "start_time" : _serialize_date(self.start_time), + "end_time" : _serialize_date(self.end_time)} + jobs = [] + for job in self.jobs: + jobs.append( job.dump_data() ) + + data["jobs"] = jobs + with open(status_file, "w") as fp: + json.dump(data, fp) diff --git a/python/python/res/job_queue/job_manager.py b/python/python/res/job_queue/job_manager.py index df0e05be41..e68b5a7e1b 100644 --- a/python/python/res/job_queue/job_manager.py +++ b/python/python/res/job_queue/job_manager.py @@ -29,6 +29,7 @@ import imp from ecl import EclVersion from res import ResVersion +from res.job_queue import ForwardModelStatus, ForwardModelJobStatus from sys import version as sys_version def redirect(file, fd, open_mode): @@ -139,7 +140,9 @@ def __init__(self, module_file="jobs.py", json_file="jobs.json", error_url=None, self.global_environment = None self.global_update_path = None if json_file is not None and os.path.isfile(json_file): + self.job_status = ForwardModelStatus("????") self._loadJson(json_file) + self.job_status.run_id = self.simulation_id else: raise IOError("'jobs.json' not found.") @@ -178,6 +181,9 @@ def __init__(self, module_file="jobs.py", json_file="jobs.json", error_url=None, self.information = logged_fields + def dump_status(self): + self.job_status.dump(ForwardModelStatus.STATUS_FILE) + def set_environment(self): if self.global_environment: data = self.global_environment @@ -221,6 +227,13 @@ def _loadJson(self, json_file_name): self._ensureCompatibleJobList() self._buildJobMap() + for job in self.job_list: + 
self.job_status.add_job( ForwardModelJobStatus(job.get("name"))) + + # "Monkey-patching" the job object by attaching a status object. + status_list = self.job_status.jobs + for i in range(len(self.job_list)): + self.job_list[i]["status"] = status_list[i] # To ensure compatibility with old versions. def _ensureCompatibleJobList(self): @@ -470,10 +483,28 @@ def addLogLine(self, job): def runJob(self, job): assert_file_executable(job.get('executable')) self.addLogLine(job) + + status = job["status"] + status.start_time = dt.now() + status.status = "Running" + self.job_status.dump() + pid = os.fork() exit_status, err_msg = 0, '' if pid == 0: - self.execJob(job) + # This code block should exec into the actual executable we are + # running, and execution should not come back here. However - if + # the code fails with an exception before actually reaching the + # exec() call we suddenly have two Python processes running the + # current code; one waiting for the exit status and one unrolling + # an exception. The latter will incorrectly "steal" the + # finalization of with statements. So - in the case of an exception + # before the exec() call we call the hard exit: os._exit(1). + try: + self.execJob(job) + except Exception as e: + sys.stderr.write("Failed to exec:%s error:%s\n" % (job["name"], str(e))) + os._exit(1) else: _, exit_status = os.waitpid(pid, 0) # The exit_status returned from os.waitpid encodes @@ -481,10 +512,19 @@ def runJob(self, job): # and in case the job was killed by a signal - the # number of that signal. 
exit_status = os.WEXITSTATUS(exit_status) + + status.end_time = dt.now() + if exit_status != 0: err_msg = "Executable: %s failed with exit code: %s" % (job.get('executable'), exit_status) + status.status = "Failure" + status.error = err_msg + else: + status.status = "Success" + + self.job_status.dump() return exit_status, err_msg diff --git a/python/python/res/job_queue/job_status_type_enum.py b/python/python/res/job_queue/job_status_type_enum.py index b7cd877556..4b49885ae0 100644 --- a/python/python/res/job_queue/job_status_type_enum.py +++ b/python/python/res/job_queue/job_status_type_enum.py @@ -35,6 +35,9 @@ class JobStatusType(BaseCEnum): JOB_QUEUE_FAILED = None JOB_QUEUE_DO_KILL_NODE_FAILURE = None + @classmethod + def from_string(cls, string): + pass JobStatusType.addEnum("JOB_QUEUE_NOT_ACTIVE", 1) diff --git a/python/python/res/server/simulation_context.py b/python/python/res/server/simulation_context.py index c794427eab..c712d01202 100644 --- a/python/python/res/server/simulation_context.py +++ b/python/python/res/server/simulation_context.py @@ -1,6 +1,7 @@ +import os.path from ecl.util.util import ArgPack, BoolVector -from res.job_queue import JobQueueManager +from res.job_queue import JobQueueManager, ForwardModelStatus from res.util import CThreadPool from res.enkf import ENKF_LIB from res.enkf.ert_run_context import ErtRunContext @@ -121,3 +122,49 @@ def get_run_context(self): def stop(self): self._queue_manager.stop_queue( ) + + + def job_progress(self, iens): + """Will return a detailed progress of the job. + + The progress report is obtained by reading a file from the filesystem, + that file is typically created by another process running on another + machine, and reading might fail due to NFS issues, simultanoues write + and so on. If loading valid json fails the function will sleep 0.10 + seconds and retry - eventually giving up and returning None. Also for + jobs which have not yet started the method will return None. 
+ + When the method succeeds in reading the progress file from the file + system the return value will be an object with properties like this:| + + progress.start_time + progress.end_time + progress.run_id + progress.jobs =[ (job1.name, job1.start_time, job1.end_time, job1.status, job1.error_msg), + (job2.name, job2.start_time, job2.end_time, job2.status, job2.error_msg), + .... + (jobN.name, jobN.start_time, jobN.end_time, jobN.status, jobN.error_msg) ] + + """ + if not iens in self._run_args: + raise KeyError("No such simulation: %s" % iens) + + run_arg = self._run_args[iens] + queue_index = run_arg.getQueueIndex() + if self._queue_manager.isJobWaiting(queue_index): + return None + + return ForwardModelStatus.load(run_arg.runpath) + + + + + def job_status(self, iens): + """Will query the queue system for the status of the job. + """ + if not iens in self._run_args: + raise KeyError("No such simulation: %s" % iens) + + run_arg = self._run_args[iens] + queue_index = run_arg.getQueueIndex() + return self._queue_manager.getJobStatus(queue_index) diff --git a/python/tests/res/job_queue/test_forward_model_formatted_print.py b/python/tests/res/job_queue/test_forward_model_formatted_print.py index d7d937a634..af650cdc89 100644 --- a/python/tests/res/job_queue/test_forward_model_formatted_print.py +++ b/python/tests/res/job_queue/test_forward_model_formatted_print.py @@ -7,6 +7,7 @@ from res.util.substitution_list import SubstitutionList from res.job_queue.environment_varlist import EnvironmentVarlist from res.job_queue.forward_model import ForwardModel +from res.job_queue.forward_model_status import ForwardModelStatus from res.job_queue.ext_job import ExtJob from res.job_queue.ext_joblist import ExtJoblist @@ -527,3 +528,26 @@ def test_various_null_fields(self): joblist[0][key] = None self.run_all() joblist[0][key] = back_up + + + def test_status_file(self): + with TestAreaContext("status_json"): + forward_model = self.set_up_forward_model() + run_id = "test_no_jobs_id" 
+ umask = 4 + global_args = SubstitutionList() + varlist = EnvironmentVarlist() + forward_model.formatted_fprintf( + run_id, + os.getcwd(), + "data_root", + global_args, + umask, + varlist) + + s = '{"start_time": null, "jobs": [{"status": "Success", "start_time": 1519653419.0, "end_time": 1519653419.0, "name": "SQUARE_PARAMS", "error": null}], "end_time": null, "run_id": ""}' + + with open("status.json", "w") as f: + f.write(s) + + status = ForwardModelStatus.try_load("status.json") diff --git a/python/tests/res/job_queue/test_job_queue.py b/python/tests/res/job_queue/test_job_queue.py index ab81693f73..052a649bc1 100644 --- a/python/tests/res/job_queue/test_job_queue.py +++ b/python/tests/res/job_queue/test_job_queue.py @@ -5,7 +5,7 @@ class JobQueueTest(ResTest): def testStatusEnum(self): - source_path = "libjob_queue/include/ert/job_queue/queue_driver.h" + source_path = "libjob_queue/include/ert/job_queue/job_status.h" self.assertEnumIsFullyDefined(JobStatusType, "job_status_type", source_path) diff --git a/python/tests/res/simulator/test_batch_sim.py b/python/tests/res/simulator/test_batch_sim.py index a6bb638f0a..4fd129b8ab 100644 --- a/python/tests/res/simulator/test_batch_sim.py +++ b/python/tests/res/simulator/test_batch_sim.py @@ -174,11 +174,24 @@ def test_batch_simulation(self): with self.assertRaises(RuntimeError): ctx.results() + # Ask for status of simulation we do not have. + with self.assertRaises(KeyError): + ctx.job_status(1973) + + with self.assertRaises(KeyError): + ctx.job_progress(1987) + # Carry out simulations.. 
while ctx.running(): status = ctx.status time.sleep(1) sys.stderr.write("status: %s\n" % str(status)) + for job_index in range(len(case_data)): + status = ctx.job_status(job_index) + progress = ctx.job_progress(job_index) + if progress: + for job in progress.jobs: + sys.stderr.write(" %s \n" % str(job)) # Fetch and validate results results = ctx.results() diff --git a/test-data/local/batch_sim/jobs/square_params.py b/test-data/local/batch_sim/jobs/square_params.py index a552ab951f..f6afea1414 100644 --- a/test-data/local/batch_sim/jobs/square_params.py +++ b/test-data/local/batch_sim/jobs/square_params.py @@ -1,6 +1,8 @@ #!/usr/bin/env python import sys import json +import random +import time keys = ["W1", "W2", "W3"] @@ -12,6 +14,7 @@ def copy_file(input_file, output_file): sq = data[key] * data[key] f.write("%g\n" % sq) + time.sleep(random.randint(0,4)) if __name__ == "__main__": copy_file("WELL_ORDER.json", "ORDER_0")