From b3bb31e8ef825e4dcbf6ffc8e697a6eb8d5225b8 Mon Sep 17 00:00:00 2001 From: DanSava Date: Thu, 12 Dec 2024 13:44:06 +0200 Subject: [PATCH] Show error when everest fm_step reaches MAX_RUNTIME Also log the error in forward_models.log --- src/ert/run_models/everest_run_model.py | 15 +++--- src/everest/detached/jobs/everserver.py | 18 +++---- tests/everest/test_everserver.py | 63 +++++++++++++++++++++++-- 3 files changed, 74 insertions(+), 22 deletions(-) diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py index 8d7f9fdef12..4877c7d1c80 100644 --- a/src/ert/run_models/everest_run_model.py +++ b/src/ert/run_models/everest_run_model.py @@ -304,25 +304,27 @@ def _handle_errors( realization: str, fm_name: str, error_path: str, + fm_running_err: str, ) -> None: fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}" fm_logger = logging.getLogger("forward_models") - with open(error_path, encoding="utf-8") as errors: - error_str = errors.read() - + if Path(error_path).is_file(): + error_str = Path(error_path).read_text(encoding="utf-8") or fm_running_err + else: + error_str = fm_running_err error_hash = hash(error_str) err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format( - batch, realization, simulation, fm_name, "Error: {}\n {}" + batch, realization, simulation, fm_name, "\n Error: {} ID:{}" ) if error_hash not in self._fm_errors: error_id = len(self._fm_errors) - fm_logger.error(err_msg.format(error_id, error_str)) + fm_logger.error(err_msg.format(error_str, error_id)) self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}}) elif fm_id not in self._fm_errors[error_hash]["ids"]: self._fm_errors[error_hash]["ids"].append(fm_id) error_id = self._fm_errors[error_hash]["error_id"] - fm_logger.error(err_msg.format(error_id, "")) + fm_logger.error(err_msg.format("Already reported as", error_id)) def _delete_runpath(self, run_args: list[RunArg]) -> None: 
logging.getLogger(EVEREST).debug("Simulation callback called") @@ -730,6 +732,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus: realization=realization, fm_name=fm_step.get("name", "Unknwon"), # type: ignore error_path=fm_step.get("stderr", ""), # type: ignore + fm_running_err=fm_step.get("error", ""), # type: ignore ) jobs_progress.append(jobs) diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py index 936ccedf236..3c36a71fd71 100755 --- a/src/everest/detached/jobs/everserver.py +++ b/src/everest/detached/jobs/everserver.py @@ -398,17 +398,13 @@ def _failed_realizations_messages(shared_data): messages = [OPT_FAILURE_REALIZATIONS] failed = shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"] if failed > 0: - # Find the set of jobs that failed. To keep the order in which they - # are found in the queue, use a dict as sets are not ordered. - failed_jobs = dict.fromkeys( - job["name"] - for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"] - for job in queue - if job["status"] == JOB_FAILURE - ).keys() - messages.append( - "{} job failures caused by: {}".format(failed, ", ".join(failed_jobs)) - ) + # Report each unique pair of failed job name and error + for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]: + for job in queue: + if job["status"] == JOB_FAILURE: + err_msg = f"{job['name']} Failed with: {job.get('error', '')}" + if err_msg not in messages: + messages.append(err_msg) return messages diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py index 3c501440eec..fca084023cb 100644 --- a/tests/everest/test_everserver.py +++ b/tests/everest/test_everserver.py @@ -52,6 +52,14 @@ def set_shared_status(*args, progress, shared_data): } +def _add_snippet(file_path, snippet, position): + with open(file_path, encoding="utf-8") as file: + lines = file.readlines() + lines.insert(position - 1, snippet + "\n") + with open(file_path, "w", encoding="utf-8") as 
file: + file.writelines(lines) + + def test_certificate_generation(copy_math_func_test_data_to_tmp): config = EverestConfig.load_file("config_minimal.yml") cert, key, pw = everserver._generate_certificate( @@ -170,12 +178,12 @@ def test_everserver_status_running_complete( set_shared_status, progress=[ [ - {"name": "job1", "status": JOB_FAILURE}, - {"name": "job1", "status": JOB_FAILURE}, + {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"}, + {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"}, ], [ - {"name": "job2", "status": JOB_SUCCESS}, - {"name": "job2", "status": JOB_FAILURE}, + {"name": "job2", "status": JOB_SUCCESS, "error": ""}, + {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"}, ], ], ), @@ -193,7 +201,9 @@ def test_everserver_status_failed_job( # The server should fail and store a user-friendly message. assert status["status"] == ServerStatus.failed assert OPT_FAILURE_REALIZATIONS in status["message"] - assert "3 job failures caused by: job1, job2" in status["message"] + assert "job1 Failed with: job 1 error 1" in status["message"] + assert "job1 Failed with: job 1 error 2" in status["message"] + assert "job2 Failed with: job 2 error 1" in status["message"] @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) @@ -271,3 +281,46 @@ def test_everserver_status_max_batch_num( filter_out_gradient=False, batches=None ) assert {data.batch for data in snapshot.simulation_data} == {0} + + +@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"]) +@patch("everest.detached.jobs.everserver._configure_loggers") +@patch("everest.detached.jobs.everserver._generate_authentication") +@patch( + "everest.detached.jobs.everserver._generate_certificate", + return_value=(None, None, None), +) +@patch( + "everest.detached.jobs.everserver._find_open_port", + return_value=42, +) +@patch("everest.detached.jobs.everserver._write_hostfile") +@patch("everest.detached.jobs.everserver._everserver_thread") +def 
test_everserver_status_contains_max_runtime_failure( _1, _2, _3, _4, _5, _6, copy_math_func_test_data_to_tmp ): config_file = "config_minimal.yml" # Add sleep to distance3 job _add_snippet( file_path="jobs/distance3.py", snippet=" import time\n time.sleep(5)\n", position=21, ) # Add 2 second max runtime _add_snippet( file_path="config_minimal.yml", snippet="\nsimulator:\n max_runtime: 2\n", position=1, ) config = EverestConfig.load_file(config_file) everserver.main() status = everserver_status( ServerConfig.get_everserver_status_path(config.output_dir) ) assert status["status"] == ServerStatus.failed print(status["message"]) assert ( "distance3 Failed with: The run is cancelled due to reaching MAX_RUNTIME" in status["message"] )