Show error when everest fm_step reaches MAX_RUNTIME
Also log the error in forward_models.log
DanSava committed Dec 19, 2024
1 parent d875982 commit dbffe84
Showing 5 changed files with 85 additions and 42 deletions.
15 changes: 9 additions & 6 deletions src/ert/run_models/everest_run_model.py
@@ -304,25 +304,27 @@ def _handle_errors(
         realization: str,
         fm_name: str,
         error_path: str,
+        fm_running_err: str,
     ) -> None:
         fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
         fm_logger = logging.getLogger("forward_models")
-        with open(error_path, encoding="utf-8") as errors:
-            error_str = errors.read()
-
+        if Path(error_path).is_file():
+            error_str = Path(error_path).read_text(encoding="utf-8") or fm_running_err
+        else:
+            error_str = fm_running_err
         error_hash = hash(error_str)
         err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format(
-            batch, realization, simulation, fm_name, "Error: {}\n {}"
+            batch, realization, simulation, fm_name, "\n Error: {} ID:{}"
         )

         if error_hash not in self._fm_errors:
             error_id = len(self._fm_errors)
-            fm_logger.error(err_msg.format(error_id, error_str))
+            fm_logger.error(err_msg.format(error_str, error_id))
             self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}})
         elif fm_id not in self._fm_errors[error_hash]["ids"]:
             self._fm_errors[error_hash]["ids"].append(fm_id)
             error_id = self._fm_errors[error_hash]["error_id"]
-            fm_logger.error(err_msg.format(error_id, ""))
+            fm_logger.error(err_msg.format("Already reported as", error_id))

     def _delete_runpath(self, run_args: list[RunArg]) -> None:
         logging.getLogger(EVEREST).debug("Simulation callback called")
@@ -730,6 +732,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus:
                         realization=realization,
                         fm_name=fm_step.get("name", "Unknwon"),  # type: ignore
                         error_path=fm_step.get("stderr", ""),  # type: ignore
+                        fm_running_err=fm_step.get("error", ""),  # type: ignore
                     )
             jobs_progress.append(jobs)

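The logging scheme in _handle_errors deduplicates forward-model errors by hashing the error text: the first occurrence is logged in full together with a numeric ID, and later identical errors only reference that ID. A minimal standalone sketch of the same pattern (fm_errors, log_fm_error and the logger setup are illustrative names, not the actual EverestRunModel API):

import logging

logging.basicConfig(level=logging.ERROR)
fm_logger = logging.getLogger("forward_models")

# error_hash -> {"error_id": int, "ids": [fm_id, ...]}, mirroring self._fm_errors
fm_errors: dict[int, dict] = {}


def log_fm_error(fm_id: str, error_str: str) -> None:
    """Log each distinct error text in full once; afterwards only reference its ID."""
    error_hash = hash(error_str)
    if error_hash not in fm_errors:
        error_id = len(fm_errors)
        fm_logger.error("Job %s failed\n Error: %s ID:%s", fm_id, error_str, error_id)
        fm_errors[error_hash] = {"error_id": error_id, "ids": [fm_id]}
    elif fm_id not in fm_errors[error_hash]["ids"]:
        fm_errors[error_hash]["ids"].append(fm_id)
        error_id = fm_errors[error_hash]["error_id"]
        fm_logger.error("Job %s failed\n Error: Already reported as ID:%s", fm_id, error_id)


log_fm_error("b_0_r_0_s_0_sleep", "The run is cancelled due to reaching MAX_RUNTIME")
log_fm_error("b_0_r_1_s_0_sleep", "The run is cancelled due to reaching MAX_RUNTIME")

Running the two calls above logs the MAX_RUNTIME message in full once and only a short "Already reported" reference the second time.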
18 changes: 7 additions & 11 deletions src/everest/detached/jobs/everserver.py
@@ -398,17 +398,13 @@ def _failed_realizations_messages(shared_data):
     messages = [OPT_FAILURE_REALIZATIONS]
     failed = shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"]
     if failed > 0:
-        # Find the set of jobs that failed. To keep the order in which they
-        # are found in the queue, use a dict as sets are not ordered.
-        failed_jobs = dict.fromkeys(
-            job["name"]
-            for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]
-            for job in queue
-            if job["status"] == JOB_FAILURE
-        ).keys()
-        messages.append(
-            "{} job failures caused by: {}".format(failed, ", ".join(failed_jobs))
-        )
+        # Report each unique pair of failed job name and error
+        for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]:
+            for job in queue:
+                if job["status"] == JOB_FAILURE:
+                    err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
+                    if err_msg not in messages:
+                        messages.append(err_msg)
     return messages


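With this change _failed_realizations_messages reports one line per unique job-name/error pair instead of a single aggregated count. A rough sketch of the same collection logic on a sample payload (the shared_data shape and constant values here are assumptions for illustration, not the real SIM_PROGRESS_ENDPOINT/JOB_FAILURE definitions):

JOB_FAILURE = "Failed"  # stand-in for the real constant

shared_data = {
    "progress": [
        [
            {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"},
            {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"},
        ],
        [
            {"name": "job2", "status": "Success", "error": ""},
            {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"},
        ],
    ]
}

messages: list[str] = []
for queue in shared_data["progress"]:
    for job in queue:
        if job["status"] == JOB_FAILURE:
            err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
            if err_msg not in messages:  # keep only unique name/error pairs
                messages.append(err_msg)

# messages == ["job1 Failed with: job 1 error 1",
#              "job1 Failed with: job 1 error 2",
#              "job2 Failed with: job 2 error 1"]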
22 changes: 22 additions & 0 deletions tests/everest/conftest.py
@@ -4,8 +4,10 @@
 from collections.abc import Callable, Iterator
 from copy import deepcopy
 from pathlib import Path
+from textwrap import dedent

 import pytest
+import yaml

 from ert.config import QueueSystem
 from ert.ensemble_evaluator import EvaluatorServerConfig
@@ -189,3 +191,23 @@ def run_config(test_data_case: str):
         return copied_path, config_file, optimal_result_json

     return run_config
+
+
+@pytest.fixture
+def min_config():
+    yield yaml.safe_load(
+        dedent("""
+    model: {"realizations": [0]}
+    controls:
+      -
+        name: my_control
+        type: well_control
+        min: 0
+        max: 0.1
+        variables:
+          - { name: test, initial_guess: 0.1 }
+    objective_functions:
+      - {name: my_objective}
+    config_path: .
+    """)
+    )
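For reference, yaml.safe_load on that document yields a plain dict, which the new test added below mutates before building an EverestConfig. A sketch of the parsed structure, assuming the YAML above:

parsed_min_config = {
    "model": {"realizations": [0]},
    "controls": [
        {
            "name": "my_control",
            "type": "well_control",
            "min": 0,
            "max": 0.1,
            "variables": [{"name": "test", "initial_guess": 0.1}],
        }
    ],
    "objective_functions": [{"name": "my_objective"}],
    "config_path": ".",
}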
20 changes: 0 additions & 20 deletions tests/everest/test_everlint.py
@@ -13,26 +13,6 @@
 from tests.everest.utils import relpath


-@pytest.fixture
-def min_config():
-    yield yaml.safe_load(
-        dedent("""
-    model: {"realizations": [0]}
-    controls:
-      -
-        name: my_control
-        type: well_control
-        min: 0
-        max: 0.1
-        variables:
-          - { name: test, initial_guess: 0.1 }
-    objective_functions:
-      - {name: my_objective}
-    config_path: .
-    """)
-    )
-
-
 @pytest.mark.parametrize(
     "required_key",
     (
52 changes: 47 additions & 5 deletions tests/everest/test_everserver.py
@@ -1,6 +1,7 @@
 import json
 import os
 import ssl
+import stat
 from functools import partial
 from pathlib import Path
 from unittest.mock import patch
@@ -170,12 +171,12 @@ def test_everserver_status_running_complete(
         set_shared_status,
         progress=[
             [
-                {"name": "job1", "status": JOB_FAILURE},
-                {"name": "job1", "status": JOB_FAILURE},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"},
             ],
             [
-                {"name": "job2", "status": JOB_SUCCESS},
-                {"name": "job2", "status": JOB_FAILURE},
+                {"name": "job2", "status": JOB_SUCCESS, "error": ""},
+                {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"},
             ],
         ],
     ),
@@ -193,7 +194,9 @@ def test_everserver_status_failed_job(
     # The server should fail and store a user-friendly message.
     assert status["status"] == ServerStatus.failed
     assert OPT_FAILURE_REALIZATIONS in status["message"]
-    assert "3 job failures caused by: job1, job2" in status["message"]
+    assert "job1 Failed with: job 1 error 1" in status["message"]
+    assert "job1 Failed with: job 1 error 2" in status["message"]
+    assert "job2 Failed with: job 2 error 1" in status["message"]


 @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
@@ -271,3 +274,42 @@ def test_everserver_status_max_batch_num(
         filter_out_gradient=False, batches=None
     )
     assert {data.batch for data in snapshot.simulation_data} == {0}
+
+
+@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
+@patch("everest.detached.jobs.everserver._configure_loggers")
+@patch("everest.detached.jobs.everserver._generate_authentication")
+@patch(
+    "everest.detached.jobs.everserver._generate_certificate",
+    return_value=(None, None, None),
+)
+@patch(
+    "everest.detached.jobs.everserver._find_open_port",
+    return_value=42,
+)
+@patch("everest.detached.jobs.everserver._write_hostfile")
+@patch("everest.detached.jobs.everserver._everserver_thread")
+def test_everserver_status_contains_max_runtime_failure(
+    _1, _2, _3, _4, _5, _6, change_to_tmpdir, min_config
+):
+    config_file = "config_minimal.yml"
+
+    Path("SLEEP_job").write_text("EXECUTABLE sleep", encoding="utf-8")
+    min_config["simulator"] = {"max_runtime": 2}
+    min_config["forward_model"] = ["sleep 5"]
+    min_config["install_jobs"] = [{"name": "sleep", "source": "SLEEP_job"}]
+
+    config = EverestConfig(**min_config)
+    config.dump(config_file)
+
+    everserver.main()
+    status = everserver_status(
+        ServerConfig.get_everserver_status_path(config.output_dir)
+    )
+
+    assert status["status"] == ServerStatus.failed
+    print(status["message"])
+    assert (
+        "sleep Failed with: The run is cancelled due to reaching MAX_RUNTIME"
+        in status["message"]
+    )