Show error when everest fm_step reaches MAX_RUNTIME #9522

Merged
merged 2 commits on Dec 19, 2024
15 changes: 9 additions & 6 deletions src/ert/run_models/everest_run_model.py
@@ -304,25 +304,27 @@ def _handle_errors(
         realization: str,
         fm_name: str,
         error_path: str,
+        fm_running_err: str,
     ) -> None:
         fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
         fm_logger = logging.getLogger("forward_models")
-        with open(error_path, encoding="utf-8") as errors:
-            error_str = errors.read()
-
+        if Path(error_path).is_file():
+            error_str = Path(error_path).read_text(encoding="utf-8") or fm_running_err
+        else:
+            error_str = fm_running_err
         error_hash = hash(error_str)
         err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format(
-            batch, realization, simulation, fm_name, "Error: {}\n {}"
+            batch, realization, simulation, fm_name, "\n Error: {} ID:{}"
         )

         if error_hash not in self._fm_errors:
             error_id = len(self._fm_errors)
-            fm_logger.error(err_msg.format(error_id, error_str))
+            fm_logger.error(err_msg.format(error_str, error_id))
             self._fm_errors.update({error_hash: {"error_id": error_id, "ids": [fm_id]}})
         elif fm_id not in self._fm_errors[error_hash]["ids"]:
             self._fm_errors[error_hash]["ids"].append(fm_id)
             error_id = self._fm_errors[error_hash]["error_id"]
-            fm_logger.error(err_msg.format(error_id, ""))
+            fm_logger.error(err_msg.format("Already reported as", error_id))

     def _delete_runpath(self, run_args: list[RunArg]) -> None:
         logging.getLogger(EVEREST).debug("Simulation callback called")

@@ -730,6 +732,7 @@ def _simulation_status(self, snapshot: EnsembleSnapshot) -> SimulationStatus:
                 realization=realization,
                 fm_name=fm_step.get("name", "Unknwon"),  # type: ignore
                 error_path=fm_step.get("stderr", ""),  # type: ignore
+                fm_running_err=fm_step.get("error", ""),  # type: ignore
             )
             jobs_progress.append(jobs)
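To make the logging change concrete, here is a minimal standalone sketch of the deduplication scheme used by `_handle_errors` (module-level state stands in for `self._fm_errors`; the helper name and the example error text are illustrative, not part of the ert API):

```python
import logging

logging.basicConfig(level=logging.ERROR)
fm_logger = logging.getLogger("forward_models")

_fm_errors: dict[int, dict] = {}  # hash of error text -> {"error_id", "ids"}


def handle_error(
    batch: int, realization: str, simulation: int, fm_name: str, error_str: str
) -> None:
    fm_id = f"b_{batch}_r_{realization}_s_{simulation}_{fm_name}"
    # The outer .format fills the run coordinates; the inner "{} {}" pair
    # survives as literal braces and is filled by the second .format below.
    err_msg = "Batch: {} Realization: {} Simulation: {} Job: {} Failed {}".format(
        batch, realization, simulation, fm_name, "\n Error: {} ID:{}"
    )
    error_hash = hash(error_str)
    if error_hash not in _fm_errors:
        # First occurrence of this error text: log it in full with a fresh ID.
        error_id = len(_fm_errors)
        fm_logger.error(err_msg.format(error_str, error_id))
        _fm_errors[error_hash] = {"error_id": error_id, "ids": [fm_id]}
    elif fm_id not in _fm_errors[error_hash]["ids"]:
        # Same error text from a different fm step: log only a back-reference.
        _fm_errors[error_hash]["ids"].append(fm_id)
        error_id = _fm_errors[error_hash]["error_id"]
        fm_logger.error(err_msg.format("Already reported as", error_id))


handle_error(0, "0", 0, "sleep", "The run is cancelled due to reaching MAX_RUNTIME")
handle_error(0, "1", 0, "sleep", "The run is cancelled due to reaching MAX_RUNTIME")
# The second call logs "... Failed \n Error: Already reported as ID:0"
```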
18 changes: 7 additions & 11 deletions src/everest/detached/jobs/everserver.py
@@ -398,17 +398,13 @@ def _failed_realizations_messages(shared_data):
     messages = [OPT_FAILURE_REALIZATIONS]
     failed = shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"]
     if failed > 0:
-        # Find the set of jobs that failed. To keep the order in which they
-        # are found in the queue, use a dict as sets are not ordered.
-        failed_jobs = dict.fromkeys(
-            job["name"]
-            for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]
-            for job in queue
-            if job["status"] == JOB_FAILURE
-        ).keys()
-        messages.append(
-            "{} job failures caused by: {}".format(failed, ", ".join(failed_jobs))
-        )
+        # Report each unique pair of failed job name and error
+        for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]:
+            for job in queue:
+                if job["status"] == JOB_FAILURE:
+                    err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
+                    if err_msg not in messages:
+                        messages.append(err_msg)
     return messages

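The effect of the new collection logic can be checked with a hand-made payload (the constants below are stand-ins for the real everest values; only the loop mirrors the diff):

```python
JOB_FAILURE = "Failed"
SIM_PROGRESS_ENDPOINT = "sim_progress"
OPT_FAILURE_REALIZATIONS = "Optimization failed: ..."


def failed_realizations_messages(shared_data):
    messages = [OPT_FAILURE_REALIZATIONS]
    if shared_data[SIM_PROGRESS_ENDPOINT]["status"]["failed"] > 0:
        # Report each unique pair of failed job name and error
        for queue in shared_data[SIM_PROGRESS_ENDPOINT]["progress"]:
            for job in queue:
                if job["status"] == JOB_FAILURE:
                    err_msg = f"{job['name']} Failed with: {job.get('error', '')}"
                    if err_msg not in messages:
                        messages.append(err_msg)
    return messages


shared_data = {
    SIM_PROGRESS_ENDPOINT: {
        "status": {"failed": 3},
        "progress": [
            [
                {"name": "job1", "status": JOB_FAILURE, "error": "disk full"},
                {"name": "job1", "status": JOB_FAILURE, "error": "disk full"},
            ],
            [{"name": "job2", "status": JOB_FAILURE, "error": "timeout"}],
        ],
    }
}
# Identical (name, error) pairs collapse into a single message:
print(failed_realizations_messages(shared_data))
# ['Optimization failed: ...', 'job1 Failed with: disk full', 'job2 Failed with: timeout']
```

Unlike the old version, which reported each failed job name only once, a job that fails with several distinct error texts now produces one message per distinct error.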
38 changes: 38 additions & 0 deletions tests/everest/conftest.py
@@ -4,14 +4,18 @@
 from collections.abc import Callable, Iterator
 from copy import deepcopy
 from pathlib import Path
+from textwrap import dedent
+from unittest.mock import MagicMock

 import pytest
+import yaml

 from ert.config import QueueSystem
 from ert.ensemble_evaluator import EvaluatorServerConfig
 from ert.run_models.everest_run_model import EverestRunModel
 from everest.config import EverestConfig
 from everest.config.control_config import ControlConfig
+from everest.detached.jobs import everserver
 from tests.everest.utils import relpath

@@ -189,3 +193,37 @@ def run_config(test_data_case: str):
         return copied_path, config_file, optimal_result_json

     return run_config
+
+
+@pytest.fixture
+def min_config():
+    yield yaml.safe_load(
+        dedent("""
+        model: {"realizations": [0]}
+        controls:
+          -
+            name: my_control
+            type: well_control
+            min: 0
+            max: 0.1
+            variables:
+              - { name: test, initial_guess: 0.1 }
+        objective_functions:
+          - {name: my_objective}
+        config_path: .
+        """)
+    )
+
+
+@pytest.fixture()
+def mock_server(monkeypatch):
+    monkeypatch.setattr(everserver, "_configure_loggers", MagicMock())
+    monkeypatch.setattr(everserver, "_generate_authentication", MagicMock())
+    monkeypatch.setattr(
+        everserver, "_generate_certificate", lambda *args: (None, None, None)
+    )
+    monkeypatch.setattr(everserver, "_find_open_port", lambda *args, **kwargs: 42)
+    monkeypatch.setattr(everserver, "_write_hostfile", MagicMock())
+    monkeypatch.setattr(everserver, "_everserver_thread", MagicMock())
+    monkeypatch.setattr(everserver, "export_to_csv", MagicMock())
+    monkeypatch.setattr(everserver, "export_with_progress", MagicMock())
20 changes: 0 additions & 20 deletions tests/everest/test_everlint.py
@@ -13,26 +13,6 @@
 from tests.everest.utils import relpath


-@pytest.fixture
-def min_config():
-    yield yaml.safe_load(
-        dedent("""
-        model: {"realizations": [0]}
-        controls:
-          -
-            name: my_control
-            type: well_control
-            min: 0
-            max: 0.1
-            variables:
-              - { name: test, initial_guess: 0.1 }
-        objective_functions:
-          - {name: my_objective}
-        config_path: .
-        """)
-    )
-
-
 @pytest.mark.parametrize(
     "required_key",
     (
103 changes: 38 additions & 65 deletions tests/everest/test_everserver.py
@@ -102,21 +102,6 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp):


 @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
-@patch("everest.detached.jobs.everserver._configure_loggers")
-@patch("everest.detached.jobs.everserver._generate_authentication")
-@patch(
-    "everest.detached.jobs.everserver._generate_certificate",
-    return_value=(None, None, None),
-)
-@patch(
-    "everest.detached.jobs.everserver._find_open_port",
-    return_value=42,
-)
-@patch(
-    "everest.detached.jobs.everserver._write_hostfile",
-    side_effect=partial(check_status, status=ServerStatus.starting),
-)
-@patch("everest.detached.jobs.everserver._everserver_thread")
 @patch(
     "ert.run_models.everest_run_model.EverestRunModel.run_experiment",
     autospec=True,
@@ -125,13 +110,8 @@ def test_everserver_status_failure(_1, copy_math_func_test_data_to_tmp):
         status=ServerStatus.running,
     ),
 )
-@patch(
-    "everest.detached.jobs.everserver.check_for_errors",
-    return_value=([], False),
-)
-@patch("everest.detached.jobs.everserver.export_to_csv")
 def test_everserver_status_running_complete(
-    _1, _2, _3, _4, _5, _6, _7, _8, _9, copy_math_func_test_data_to_tmp
+    _1, mock_server, copy_math_func_test_data_to_tmp
 ):
     config_file = "config_minimal.yml"
     config = EverestConfig.load_file(config_file)
@@ -145,18 +125,6 @@ def test_everserver_status_running_complete(


 @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
-@patch("everest.detached.jobs.everserver._configure_loggers")
-@patch("everest.detached.jobs.everserver._generate_authentication")
-@patch(
-    "everest.detached.jobs.everserver._generate_certificate",
-    return_value=(None, None, None),
-)
-@patch(
-    "everest.detached.jobs.everserver._find_open_port",
-    return_value=42,
-)
-@patch("everest.detached.jobs.everserver._write_hostfile")
-@patch("everest.detached.jobs.everserver._everserver_thread")
 @patch(
     "ert.run_models.everest_run_model.EverestRunModel.run_experiment",
     autospec=True,
@@ -170,18 +138,18 @@ def test_everserver_status_running_complete(
         set_shared_status,
         progress=[
             [
-                {"name": "job1", "status": JOB_FAILURE},
-                {"name": "job1", "status": JOB_FAILURE},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 1"},
+                {"name": "job1", "status": JOB_FAILURE, "error": "job 1 error 2"},
             ],
             [
-                {"name": "job2", "status": JOB_SUCCESS},
-                {"name": "job2", "status": JOB_FAILURE},
+                {"name": "job2", "status": JOB_SUCCESS, "error": ""},
+                {"name": "job2", "status": JOB_FAILURE, "error": "job 2 error 1"},
             ],
         ],
     ),
 )
 def test_everserver_status_failed_job(
-    _1, _2, _3, _4, _5, _6, _7, _8, copy_math_func_test_data_to_tmp
+    _1, _2, mock_server, copy_math_func_test_data_to_tmp
 ):
     config_file = "config_minimal.yml"
     config = EverestConfig.load_file(config_file)
@@ -193,22 +161,12 @@ def test_everserver_status_failed_job(
     # The server should fail and store a user-friendly message.
     assert status["status"] == ServerStatus.failed
     assert OPT_FAILURE_REALIZATIONS in status["message"]
-    assert "3 job failures caused by: job1, job2" in status["message"]
+    assert "job1 Failed with: job 1 error 1" in status["message"]
+    assert "job1 Failed with: job 1 error 2" in status["message"]
+    assert "job2 Failed with: job 2 error 1" in status["message"]


 @patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
-@patch("everest.detached.jobs.everserver._configure_loggers")
-@patch("everest.detached.jobs.everserver._generate_authentication")
-@patch(
-    "everest.detached.jobs.everserver._generate_certificate",
-    return_value=(None, None, None),
-)
-@patch(
-    "everest.detached.jobs.everserver._find_open_port",
-    return_value=42,
-)
-@patch("everest.detached.jobs.everserver._write_hostfile")
-@patch("everest.detached.jobs.everserver._everserver_thread")
 @patch(
     "ert.run_models.everest_run_model.EverestRunModel.run_experiment",
     autospec=True,
@@ -221,7 +179,7 @@ def test_everserver_status_failed_job(
     side_effect=partial(set_shared_status, progress=[]),
 )
 def test_everserver_status_exception(
-    _1, _2, _3, _4, _5, _6, _7, _8, copy_math_func_test_data_to_tmp
+    _1, _2, mock_server, copy_math_func_test_data_to_tmp
 ):
     config_file = "config_minimal.yml"
     config = EverestConfig.load_file(config_file)
@@ -237,24 +195,12 @@ def test_everserver_status_exception(


 @patch("sys.argv", ["name", "--config-file", "config_one_batch.yml"])
-@patch("everest.detached.jobs.everserver._configure_loggers")
-@patch("everest.detached.jobs.everserver._generate_authentication")
-@patch(
-    "everest.detached.jobs.everserver._generate_certificate",
-    return_value=(None, None, None),
-)
-@patch(
-    "everest.detached.jobs.everserver._find_open_port",
-    return_value=42,
-)
-@patch("everest.detached.jobs.everserver._write_hostfile")
-@patch("everest.detached.jobs.everserver._everserver_thread")
 @patch(
     "everest.detached.jobs.everserver._sim_monitor",
     side_effect=partial(set_shared_status, progress=[]),
 )
 def test_everserver_status_max_batch_num(
-    _1, _2, _3, _4, _5, _6, _7, copy_math_func_test_data_to_tmp
+    _1, mock_server, copy_math_func_test_data_to_tmp
 ):
     config_file = "config_one_batch.yml"
     config = EverestConfig.load_file(config_file)
@@ -271,3 +217,30 @@ def test_everserver_status_max_batch_num(
         filter_out_gradient=False, batches=None
     )
     assert {data.batch for data in snapshot.simulation_data} == {0}
+
+
+@patch("sys.argv", ["name", "--config-file", "config_minimal.yml"])
+def test_everserver_status_contains_max_runtime_failure(
+    mock_server, change_to_tmpdir, min_config
+):
+    config_file = "config_minimal.yml"
+
+    Path("SLEEP_job").write_text("EXECUTABLE sleep", encoding="utf-8")
+    min_config["simulator"] = {"max_runtime": 2}
+    min_config["forward_model"] = ["sleep 5"]
+    min_config["install_jobs"] = [{"name": "sleep", "source": "SLEEP_job"}]
+
+    config = EverestConfig(**min_config)
+    config.dump(config_file)
+
+    everserver.main()
+    status = everserver_status(
+        ServerConfig.get_everserver_status_path(config.output_dir)
+    )
+
+    assert status["status"] == ServerStatus.failed
+    print(status["message"])
+    assert (
+        "sleep Failed with: The run is cancelled due to reaching MAX_RUNTIME"
+        in status["message"]
+    )