Fix submit fail leading to ensemble failures #9032

Merged
1 change: 1 addition & 0 deletions .github/workflows/test_ert_with_slurm.yml
@@ -70,6 +70,7 @@ jobs:
set -e
export _ERT_TESTS_ALTERNATIVE_QUEUE=AlternativeQ
pytest tests/unit_tests/scheduler --slurm
pytest tests/ui_tests/cli/test_missing_runpath.py --slurm

- name: Test poly-example on slurm
run: |
47 changes: 45 additions & 2 deletions ci/testkomodo.sh
@@ -40,23 +40,66 @@ start_tests () {

pushd ${CI_TEST_ROOT}/tests

# Run all ert tests except tests evaluating memory consumption and tests requiring a window manager (GUI tests)
pytest --eclipse-simulator -n logical --show-capture=stderr -v --max-worker-restart 0 \
-m "not limit_memory and not requires_window_manager" --benchmark-disable --dist loadgroup
return_code_ert_main_tests=$?

# Run all ert tests requiring a window manager (GUI tests) except tests evaluating memory consumption
pytest --eclipse-simulator -v --mpl \
-m "not limit_memory and requires_window_manager" --benchmark-disable
return_code_ert_gui_tests=$?

# Restricting the number of threads utilized by numpy to control memory consumption, as some tests evaluate memory usage and additional threads increase it.
export OMP_NUM_THREADS=1

# Run ert tests that evaluate memory consumption
pytest -n 2 --durations=0 -m "limit_memory" --memray
return_code_ert_memory_consumption_tests=$?

unset OMP_NUM_THREADS

basetemp=$(mktemp -d -p $_ERT_TESTS_SHARED_TMP)
pytest --timeout=3600 -v --$_ERT_TESTS_QUEUE_SYSTEM --basetemp="$basetemp" unit_tests/scheduler
# Run ert scheduler tests on the actual cluster (defined by $_ERT_TESTS_QUEUE_SYSTEM)
basetemp=$(mktemp -d -p "$_ERT_TESTS_SHARED_TMP")
pytest --timeout=3600 -v --"$_ERT_TESTS_QUEUE_SYSTEM" --basetemp="$basetemp" unit_tests/scheduler
return_code_ert_sheduler_tests=$?
rm -rf "$basetemp" || true

popd

run_ert_with_opm
return_code_opm_integration_test=$?

run_everest_tests
return_code_everest_tests=$?
set -e


return_code_combined_tests=0
# Error if one or more return codes are nonzero
if [ "$return_code_ert_main_tests" -ne 0 ]; then
echo "One or more ERT tests failed."
return_code_combined_tests=1
fi
if [ "$return_code_ert_gui_tests" -ne 0 ]; then
echo "One or more ERT GUI tests failed."
return_code_combined_tests=1
fi
if [ "$return_code_ert_memory_consumption_tests" -ne 0 ]; then
echo "One or more ERT memory consumption tests failed."
return_code_combined_tests=1
fi
if [ "$return_code_ert_sheduler_tests" -ne 0 ]; then
echo "One or more ERT scheduler tests failed."
return_code_combined_tests=1
fi
if [ "$return_code_opm_integration_test" -ne 0 ]; then
echo "The ERT OPM integration test failed."
return_code_combined_tests=1
fi
if [ "$return_code_everest_tests" -ne 0 ]; then
echo "One or more Everest tests failed."
return_code_combined_tests=1
fi
return $return_code_combined_tests
}
4 changes: 3 additions & 1 deletion src/ert/gui/simulation/run_dialog.py
@@ -193,6 +193,7 @@ def __init__(
self._run_model = run_model
self._event_queue = event_queue
self._notifier = notifier
self.fail_msg_box: Optional[ErtMessageBox] = None

self._minimum_width = 1200
self._minimum_height = 800
@@ -407,7 +408,8 @@ def _on_simulation_done(self, failed: bool, msg: str) -> None:
if failed:
self.update_total_progress(1.0, "Failed")
self.fail_msg_box = ErtMessageBox("ERT experiment failed!", msg, self)
self.fail_msg_box.exec_()
self.fail_msg_box.setModal(True)
self.fail_msg_box.show()
else:
self.update_total_progress(1.0, "Experiment completed.")

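Aside (not part of this PR's diff): swapping exec_() for setModal(True) plus show() keeps the failure dialog modal without spinning a nested event loop, so the run dialog keeps processing events while the box is open. A minimal, self-contained Qt sketch of the difference; PySide6 is used purely for illustration and ert may bind Qt differently:

import sys
from PySide6.QtWidgets import QApplication, QMessageBox, QPushButton, QWidget

app = QApplication(sys.argv)
window = QWidget()
button = QPushButton("Fail", window)

def on_fail() -> None:
    box = QMessageBox(
        QMessageBox.Icon.Critical, "ERT experiment failed!", "details", parent=window
    )
    box.setModal(True)  # blocks input to the application windows only
    box.show()          # returns immediately; the main event loop keeps running
    # box.exec() would instead block here in a nested event loop until the box closes.
    # Parenting to 'window' (and, in the PR, storing self.fail_msg_box) keeps the
    # box alive after this function returns.

button.clicked.connect(on_fail)
window.show()
sys.exit(app.exec())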
4 changes: 4 additions & 0 deletions src/ert/scheduler/driver.py
@@ -13,6 +13,10 @@
"""Bash and other shells add an offset of 128 to the signal value when a process exited due to a signal"""


class FailedSubmit(RuntimeError):
pass


class Driver(ABC):
"""Adapter for the HPC cluster."""

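Aside (not part of this PR's diff): the contract introduced here is that drivers raise FailedSubmit, rather than a bare RuntimeError, when the cluster rejects a submission, so Job._submit_and_run_once below can handle it explicitly. A trimmed-down illustration of that contract; the function and its arguments are hypothetical:

from ert.scheduler.driver import FailedSubmit

async def example_submit(iens: int, success: bool, message: str) -> str:
    # 'success' and 'message' stand in for the outcome of the real bsub/qsub/sbatch
    # call; everything here is illustrative only.
    if not success:
        raise FailedSubmit(f"Realization {iens} could not be submitted: {message}")
    return message  # the parsed job id on success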
35 changes: 24 additions & 11 deletions src/ert/scheduler/job.py
@@ -18,7 +18,7 @@
from ert.load_status import LoadStatus
from ert.storage.realization_storage_state import RealizationStorageState

from .driver import Driver
from .driver import Driver, FailedSubmit

if TYPE_CHECKING:
from ert.ensemble_evaluator import Realization
@@ -63,13 +63,20 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self.state = JobState.WAITING
self.started = asyncio.Event()
self.returncode: asyncio.Future[int] = asyncio.Future()
self._aborted = False
self._scheduler: Scheduler = scheduler
self._callback_status_msg: str = ""
self._requested_max_submit: Optional[int] = None
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None

def unschedule(self, msg: str) -> None:
self.state = JobState.ABORTED
self.real.run_arg.ensemble_storage.set_failure(
self.real.run_arg.iens,
RealizationStorageState.LOAD_FAILURE,
f"Job not scheduled due to {msg}",
)

@property
def iens(self) -> int:
return self.real.iens
@@ -95,15 +102,21 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
if self._scheduler.submit_sleep_state:
await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
await self._send(JobState.SUBMITTING)
await self.driver.submit(
self.real.iens,
self.real.job_script,
self.real.run_arg.runpath,
num_cpu=self.real.num_cpu,
realization_memory=self.real.realization_memory,
name=self.real.run_arg.job_name,
runpath=Path(self.real.run_arg.runpath),
)
try:
await self.driver.submit(
self.real.iens,
self.real.job_script,
self.real.run_arg.runpath,
num_cpu=self.real.num_cpu,
realization_memory=self.real.realization_memory,
name=self.real.run_arg.job_name,
runpath=Path(self.real.run_arg.runpath),
)
except FailedSubmit as err:
await self._send(JobState.FAILED)
logger.error(f"Failed to submit: {err}")
self.returncode.cancel()
return

await self._send(JobState.PENDING)
await self.started.wait()
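Aside (not part of this PR's diff): cancelling self.returncode is what prevents the rest of the scheduler from waiting forever on a realization whose submission failed. A small self-contained illustration, plain asyncio with no ert code, of why cancelling the future unblocks its awaiter:

import asyncio

async def main() -> None:
    returncode: asyncio.Future[int] = asyncio.get_running_loop().create_future()

    async def waiter() -> None:
        try:
            await returncode
        except asyncio.CancelledError:
            print("submission failed; stopped waiting for a return code")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start awaiting the future
    returncode.cancel()     # what Job does after catching FailedSubmit
    await task

asyncio.run(main())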
34 changes: 20 additions & 14 deletions src/ert/scheduler/lsf_driver.py
@@ -8,10 +8,10 @@
import shlex
import shutil
import stat
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
Dict,
Iterable,
@@ -27,7 +27,7 @@
get_args,
)

from .driver import SIGNAL_OFFSET, Driver
from .driver import SIGNAL_OFFSET, Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

_POLL_PERIOD = 2.0 # seconds
@@ -298,16 +298,22 @@ async def submit(
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
script_path: Optional[Path] = None
with tempfile.NamedTemporaryFile(
dir=runpath,
prefix=".lsf_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
try:
with NamedTemporaryFile(
dir=runpath,
prefix=".lsf_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
except OSError as err:
error_message = f"Could not create submit script: {err}"
self._job_error_message_by_iens[iens] = error_message
raise FailedSubmit(error_message) from err

assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)

@@ -345,11 +351,11 @@ async def submit(
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

match = re.search("Job <([0-9]+)> is submitted to .*queue", process_message)
if match is None:
raise RuntimeError(
raise FailedSubmit(
f"Could not understand '{process_message}' from bsub"
)
job_id = match[1]
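Aside (not part of this PR's diff): the new try/except around NamedTemporaryFile covers the case where the runpath is missing or unwritable, which raises OSError. A self-contained sketch of the pattern; the local FailedSubmit class stands in for ert.scheduler.driver.FailedSubmit and the paths are illustrative:

from pathlib import Path
from tempfile import NamedTemporaryFile

class FailedSubmit(RuntimeError):  # stand-in for ert.scheduler.driver.FailedSubmit
    pass

def write_submit_script(runpath: Path, script: str) -> Path:
    try:
        with NamedTemporaryFile(
            dir=runpath, prefix=".lsf_submit_", suffix=".sh",
            mode="w", encoding="utf-8", delete=False,
        ) as handle:
            handle.write(script)
            return Path(handle.name)
    except OSError as err:
        raise FailedSubmit(f"Could not create submit script: {err}") from err

# A nonexistent runpath now surfaces as FailedSubmit instead of an unhandled OSError:
try:
    write_submit_script(Path("/nonexistent/runpath"), "#!/bin/sh\necho hello\n")
except FailedSubmit as err:
    print(err)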
4 changes: 2 additions & 2 deletions src/ert/scheduler/openpbs_driver.py
@@ -23,7 +23,7 @@
get_type_hints,
)

from .driver import Driver
from .driver import Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

logger = logging.getLogger(__name__)
@@ -274,7 +274,7 @@ async def submit(
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

job_id_ = process_message
logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}")
52 changes: 40 additions & 12 deletions src/ert/scheduler/scheduler.py
@@ -24,7 +24,7 @@
from pydantic.dataclasses import dataclass

from _ert.async_utils import get_running_loop
from _ert.events import Event, ForwardModelStepChecksum, Id
from _ert.events import Event, ForwardModelStepChecksum, Id, event_from_dict
from ert.constant_filenames import CERT_FILE

from .driver import Driver
@@ -276,10 +276,26 @@ async def execute(
# does internalization at a time
forward_model_ok_lock = asyncio.Lock()
for iens, job in self._jobs.items():
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, forward_model_ok_lock, self._max_submit),
name=f"job-{iens}_task",
)
if job.state != JobState.ABORTED:
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, forward_model_ok_lock, self._max_submit),
name=f"job-{iens}_task",
)
else:
failure = job.real.run_arg.ensemble_storage.get_failure(iens)
await self._events.put(
event_from_dict(
{
"ensemble": self._ens_id,
"event_type": Id.REALIZATION_FAILURE,
"queue_event_type": JobState.FAILED,
"callback_status_message": failure.message
if failure
else None,
"real": str(iens),
}
)
)
logger.info("All tasks started")
self._running.set()
try:
@@ -317,8 +333,14 @@ async def _process_event_queue(self) -> None:

def _update_jobs_json(self, iens: int, runpath: str) -> None:
cert_path = f"{runpath}/{CERT_FILE}"
if self._ee_cert is not None:
Path(cert_path).write_text(self._ee_cert, encoding="utf-8")
try:
if self._ee_cert is not None:
Path(cert_path).write_text(self._ee_cert, encoding="utf-8")
except OSError as err:
error_msg = f"Could not write ensemble certificate: {err}"
self._jobs[iens].unschedule(error_msg)
logger.error(error_msg)
return
jobs = _JobsJson(
experiment_id=None,
ens_id=self._ens_id,
@@ -328,8 +350,14 @@ def _update_jobs_json(self, iens: int, runpath: str) -> None:
ee_cert_path=cert_path if self._ee_cert is not None else None,
)
jobs_path = os.path.join(runpath, "jobs.json")
with open(jobs_path, "rb") as fp:
data = orjson.loads(fp.read())
with open(jobs_path, "wb") as fp:
data.update(asdict(jobs))
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
try:
with open(jobs_path, "rb") as fp:
data = orjson.loads(fp.read())
with open(jobs_path, "wb") as fp:
data.update(asdict(jobs))
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
except OSError as err:
error_msg = f"Could not update jobs.json: {err}"
self._jobs[iens].unschedule(error_msg)
logger.error(error_msg)
return
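Aside (not part of this PR's diff): both new try/except blocks route filesystem errors into Job.unschedule, so only the affected realization is marked as failed; execute() then skips that job's task and emits a REALIZATION_FAILURE event, which appears to be the ensemble-wide failure mode the PR title refers to. A self-contained sketch of the jobs.json read-modify-write that is now guarded; orjson as used above, paths illustrative:

from pathlib import Path

import orjson

def update_jobs_json(jobs_path: Path, extra: dict) -> None:
    data = orjson.loads(jobs_path.read_bytes())
    data.update(extra)
    jobs_path.write_bytes(orjson.dumps(data, option=orjson.OPT_INDENT_2))

try:
    update_jobs_json(Path("/missing/runpath/jobs.json"), {"ens_id": "demo"})
except OSError as err:
    # In the scheduler this is where self._jobs[iens].unschedule(error_msg) is called.
    print(f"Could not update jobs.json: {err}")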
33 changes: 19 additions & 14 deletions src/ert/scheduler/slurm_driver.py
@@ -5,18 +5,18 @@
import logging
import shlex
import stat
import tempfile
import time
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
Iterator,
Optional,
Tuple,
)

from .driver import SIGNAL_OFFSET, Driver
from .driver import SIGNAL_OFFSET, Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

SLURM_FAILED_EXIT_CODE_FETCH = SIGNAL_OFFSET + 66
@@ -184,16 +184,21 @@ async def submit(
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
script_path: Optional[Path] = None
with tempfile.NamedTemporaryFile(
dir=runpath,
prefix=".slurm_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
try:
with NamedTemporaryFile(
dir=runpath,
prefix=".slurm_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
except OSError as err:
error_message = f"Could not create submit script: {err}"
self._job_error_message_by_iens[iens] = error_message
raise FailedSubmit(error_message) from err
assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)
sbatch_with_args = [*self._submit_cmd(name, runpath, num_cpu), str(script_path)]
@@ -214,10 +219,10 @@
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

if not process_message:
raise RuntimeError("sbatch returned empty jobid")
raise FailedSubmit("sbatch returned empty jobid")
job_id = process_message
logger.info(f"Realization {iens} accepted by SLURM, got id {job_id}")
