From dbffe8421bd4c63452e6e02c4dc564671c4d67c0 Mon Sep 17 00:00:00 2001
From: DanSava
Date: Thu, 12 Dec 2024 13:44:06 +0200
Subject: [PATCH] Show error when everest fm_step reaches MAX_RUNTIME

Also log the error in forward_models.log
---
 src/ert/run_models/everest_run_model.py | 15 ++++---
 src/everest/detached/jobs/everserver.py | 18 ++++-----
 tests/everest/conftest.py               | 22 +++++++++++
 tests/everest/test_everlint.py          | 20 ----------
 tests/everest/test_everserver.py        | 52 ++++++++++++++++++++++---
 5 files changed, 85 insertions(+), 42 deletions(-)

diff --git a/src/ert/run_models/everest_run_model.py b/src/ert/run_models/everest_run_model.py
index 8d7f9fdef12..4877c7d1c80 100644
--- a/src/ert/run_models/everest_run_model.py
+++ b/src/ert/run_models/everest_run_model.py
@@ -304,25 +304,27 @@ def _handle_errors(
         realization: str,
         fm_name: str,
         error_path: str,
+        fm_running_err: str,
     ) -> None:
         fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
         fm_logger = logging.getLogger("forward_models")
-        with open(error_path, encoding="utf-8") as errors:
-            error_str = errors.read()
-
+        if Path(error_path).is_file():
+            error_str = Path(error_path).read_text(encoding="utf-8") or fm_running_err
+        else:
+            error_str = fm_running_err
         error_hash = hash(error_str)
         err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format(
-            batch, realization, simulation, fm_name, "Error: {}\n {}"
+            batch, realization, simulation, fm_name, "\n Error: {} ID:{}"
         )
 
         if error_hash not in self._fm_errors:
             error_id = len(self._fm_errors)
-            fm_logger.error(err_msg.format(error_id, error_str))
+            fm_logger.error(err_msg.format(error_str, error_id))
             self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}})
         elif fm_id not in self._fm_errors[error_hash]["ids"]:
             self._fm_errors[error_hash]["ids"].append(fm_id)
             error_id = self._fm_errors[error_hash]["error_id"]
-            fm_logger.error(err_msg.format(error_id, ""))
+            fm_logger.error(err_msg.format("Already reported as", error_id))
 
     def _delete_runpath(self, run_args: list[RunArg]) -> None:
         logging.getLogger(EVEREST).debug("Simulation callback called")
@@ -730,6 +732,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus:
                         realization=realization,
                         fm_name=fm_step.get("name", "Unknwon"),  # type: ignore
                         error_path=fm_step.get("stderr", ""),  # type: ignore
+                        fm_running_err=fm_step.get("error", ""),  # type: ignore
                     )
 
             jobs_progress.append(jobs)
diff --git a/src/everest/detached/jobs/everserver.py b/src/everest/detached/jobs/everserver.py
index 936ccedf236..3c36a71fd71 100755
--- a/src/everest/detached/jobs/everserver.py
+++ b/src/everest/detached/jobs/everserver.py
@@ -398,17 +398,13 @@ def _failed_realizations_messages(shared_data):
     messages = [OPT_FAILURE_REALIZATIONS]
     failed = shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"]
     if failed > 0:
-        # Find the set of jobs that failed. To keep the order in which they
-        # are found in the queue, use a dict as sets are not ordered.
-        failed_jobs = dict.fromkeys(
-            job["name"]
-            for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]
-            for job in queue
-            if job["status"] == JOB_FAILURE
-        ).keys()
-        messages.append(
-            "{} job failures caused by: {}".format(failed, ", ".join(failed_jobs))
-        )
+        # Report each unique pair of failed job name and error
+        for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]:
+            for job in queue:
+                if job["status"] == JOB_FAILURE:
+                    err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
+                    if err_msg not in messages:
+                        messages.append(err_msg)
     return messages
 
 
diff --git a/tests/everest/conftest.py b/tests/everest/conftest.py
index 9db0abf0aba..86e5aeed991 100644
--- a/tests/everest/conftest.py
+++ b/tests/everest/conftest.py
@@ -4,8 +4,10 @@
 from collections.abc import Callable, Iterator
 from copy import deepcopy
 from pathlib import Path
+from textwrap import dedent
 
 import pytest
+import yaml
 
 from ert.config import QueueSystem
 from ert.ensemble_evaluator import EvaluatorServerConfig
@@ -189,3 +191,23 @@ def run_config(test_data_case: str):
         return copied_path, config_file, optimal_result_json
 
     return run_config
+
+
+@pytest.fixture
+def min_config():
+    yield yaml.safe_load(
+        dedent("""
+    model: {"realizations": [0]}
+    controls:
+      -
+        name: my_control
+        type: well_control
+        min: 0
+        max: 0.1
+        variables:
+          - { name: test, initial_guess: 0.1 }
+    objective_functions:
+      - {name: my_objective}
+    config_path: .
+    """)
+    )
diff --git a/tests/everest/test_everlint.py b/tests/everest/test_everlint.py
index b8aa0768bb4..894e8715630 100644
--- a/tests/everest/test_everlint.py
+++ b/tests/everest/test_everlint.py
@@ -13,26 +13,6 @@
 from tests.everest.utils import relpath
 
 
-@pytest.fixture
-def min_config():
-    yield yaml.safe_load(
-        dedent("""
-    model: {"realizations": [0]}
-    controls:
-      -
-        name: my_control
-        type: well_control
-        min: 0
-        max: 0.1
-        variables:
-          - { name: test, initial_guess: 0.1 }
-    objective_functions:
-      - {name: my_objective}
-    config_path: .
-    """)
-    )
-
-
 @pytest.mark.parametrize(
     "required_key",
     (
diff --git a/tests/everest/test_everserver.py b/tests/everest/test_everserver.py
index 3c501440eec..e2b14d9d994 100644
--- a/tests/everest/test_everserver.py
+++ b/tests/everest/test_everserver.py
@@ -1,6 +1,7 @@
 import json
 import os
 import ssl
+import stat
 from functools import partial
 from pathlib import Path
 from unittest.mock import patch
@@ -170,12 +171,12 @@ def test_everserver_status_running_complete(
         set_shared_status,
         progress=[
             [
-                {"name": "job1", "status": JOB_FAILURE},
-                {"name": "job1", "status": JOB_FAILURE},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"},
             ],
             [
-                {"name": "job2", "status": JOB_SUCCESS},
-                {"name": "job2", "status": JOB_FAILURE},
+                {"name": "job2", "status": JOB_SUCCESS, "error": ""},
+                {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"},
             ],
         ],
     ),
@@ -193,7 +194,9 @@ def test_everserver_status_failed_job(
     # The server should fail and store a user-friendly message.
     assert status["status"] == ServerStatus.failed
     assert OPT_FAILURE_REALIZATIONS in status["message"]
-    assert "3 job failures caused by: job1, job2" in status["message"]
+    assert "job1 Failed with: job 1 error 1" in status["message"]
+    assert "job1 Failed with: job 1 error 2" in status["message"]
+    assert "job2 Failed with: job 2 error 1" in status["message"]
 
 
 @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
@@ -271,3 +274,42 @@ def test_everserver_status_max_batch_num(
         filter_out_gradient=False, batches=None
     )
     assert {data.batch for data in snapshot.simulation_data} == {0}
+
+
+@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
+@patch("everest.detached.jobs.everserver._configure_loggers")
+@patch("everest.detached.jobs.everserver._generate_authentication")
+@patch(
+    "everest.detached.jobs.everserver._generate_certificate",
+    return_value=(None, None, None),
+)
+@patch(
+    "everest.detached.jobs.everserver._find_open_port",
+    return_value=42,
+)
+@patch("everest.detached.jobs.everserver._write_hostfile")
+@patch("everest.detached.jobs.everserver._everserver_thread")
+def test_everserver_status_contains_max_runtime_failure(
+    _1, _2, _3, _4, _5, _6, change_to_tmpdir, min_config
+):
+    config_file = "config_minimal.yml"
+
+    Path("SLEEP_job").write_text("EXECUTABLE sleep", encoding="utf-8")
+    min_config["simulator"] = {"max_runtime": 2}
+    min_config["forward_model"] = ["sleep 5"]
+    min_config["install_jobs"] = [{"name": "sleep", "source": "SLEEP_job"}]
+
+    config = EverestConfig(**min_config)
+    config.dump(config_file)
+
+    everserver.main()
+    status = everserver_status(
+        ServerConfig.get_everserver_status_path(config.output_dir)
+    )
+
+    assert status["status"] == ServerStatus.failed
+    print(status["message"])
+    assert (
+        "sleep Failed with: The run is cancelled due to reaching MAX_RUNTIME"
+        in status["message"]
+    )
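
The least obvious part of the change is the deduplication in _handle_errors: each distinct error text is hashed, written to forward_models.log in full only once together with a numeric ID, and later occurrences from other forward-model steps only reference that ID. Below is a minimal standalone sketch of that pattern; log_fm_error and the module-level fm_errors dict are illustrative stand-ins for the patched method and self._fm_errors, not the real API.

    import logging

    fm_logger = logging.getLogger("forward_models")
    # Illustrative stand-in for self._fm_errors: hash(error text) -> {"error_id": ..., "ids": [...]}.
    fm_errors: dict[int, dict] = {}

    def log_fm_error(batch, realization, simulation, fm_name, error_str):
        # One identifier per forward-model step occurrence, mirroring fm_id in the patch.
        fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
        prefix = (
            f"Batch: {batch} Realization: {realization} "
            f"Simulation: {simulation} Job: {fm_name} Failed"
        )
        error_hash = hash(error_str)
        if error_hash not in fm_errors:
            # First time this exact error text is seen: log it in full with a fresh ID.
            error_id = len(fm_errors)
            fm_logger.error(f"{prefix} \n Error: {error_str} ID:{error_id}")
            fm_errors[error_hash] = {"error_id": error_id, "ids": [fm_id]}
        elif fm_id not in fm_errors[error_hash]["ids"]:
            # Same error text from another step: point back at the ID instead of repeating it.
            fm_errors[error_hash]["ids"].append(fm_id)
            error_id = fm_errors[error_hash]["error_id"]
            fm_logger.error(f"{prefix} \n Error: Already reported as ID:{error_id}")

Calling the sketch twice with the same error text from two different steps yields one full log entry and one "Already reported as" reference, which mirrors how the patched method handles, for example, a MAX_RUNTIME error that hits every realization in a batch.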