Skip to content

Commit

Permalink
Handle inaccessible runpath gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
eivindjahren committed Oct 24, 2024
1 parent f57a5b7 commit c85e16f
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 76 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test_ert_with_slurm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ jobs:
set -e
export _ERT_TESTS_ALTERNATIVE_QUEUE=AlternativeQ
pytest tests/unit_tests/scheduler --slurm
pytest tests/ui_tests/cli/test_missing_runpath.py --slurm
- name: Test poly-example on slurm
run: |
Expand Down
4 changes: 3 additions & 1 deletion src/ert/gui/simulation/run_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ def __init__(
self._run_model = run_model
self._event_queue = event_queue
self._notifier = notifier
self.fail_msg_box: Optional[ErtMessageBox] = None

self._minimum_width = 1200
self._minimum_height = 800
Expand Down Expand Up @@ -407,7 +408,8 @@ def _on_simulation_done(self, failed: bool, msg: str) -> None:
if failed:
self.update_total_progress(1.0, "Failed")
self.fail_msg_box = ErtMessageBox("ERT experiment failed!", msg, self)
self.fail_msg_box.exec_()
self.fail_msg_box.setModal(True)
self.fail_msg_box.show()
else:
self.update_total_progress(1.0, "Experiment completed.")

Expand Down
4 changes: 4 additions & 0 deletions src/ert/scheduler/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
"""Bash and other shells add an offset of 128 to the signal value when a process exited due to a signal"""


class FailedSubmit(RuntimeError):
    """Raised when a driver fails to submit a job to the queue system.

    Subclasses ``RuntimeError`` so that handlers written for the
    ``RuntimeError`` previously raised on submit failures keep working,
    while callers that want to treat a failed submit as a recoverable,
    per-realization failure can catch this type specifically.
    """


class Driver(ABC):
"""Adapter for the HPC cluster."""

Expand Down
35 changes: 24 additions & 11 deletions src/ert/scheduler/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from ert.load_status import LoadStatus
from ert.storage.realization_storage_state import RealizationStorageState

from .driver import Driver
from .driver import Driver, FailedSubmit

if TYPE_CHECKING:
from ert.ensemble_evaluator import Realization
Expand Down Expand Up @@ -63,13 +63,20 @@ def __init__(self, scheduler: Scheduler, real: Realization) -> None:
self.state = JobState.WAITING
self.started = asyncio.Event()
self.returncode: asyncio.Future[int] = asyncio.Future()
self._aborted = False
self._scheduler: Scheduler = scheduler
self._callback_status_msg: str = ""
self._requested_max_submit: Optional[int] = None
self._start_time: Optional[float] = None
self._end_time: Optional[float] = None

def unschedule(self, msg: str) -> None:
    """Mark this job as aborted and record why it was never scheduled.

    Sets the job state to ``JobState.ABORTED`` and stores a
    ``LOAD_FAILURE`` entry for this realization in its ensemble storage,
    so the reason is available to whoever later inspects the failure.

    Args:
        msg: Human-readable cause; stored as
            ``"Job not scheduled due to {msg}"``.
    """
    self.state = JobState.ABORTED
    run_arg = self.real.run_arg
    run_arg.ensemble_storage.set_failure(
        run_arg.iens,
        RealizationStorageState.LOAD_FAILURE,
        f"Job not scheduled due to {msg}",
    )

@property
def iens(self) -> int:
return self.real.iens
Expand All @@ -95,15 +102,21 @@ async def _submit_and_run_once(self, sem: asyncio.BoundedSemaphore) -> None:
if self._scheduler.submit_sleep_state:
await self._scheduler.submit_sleep_state.sleep_until_we_can_submit()
await self._send(JobState.SUBMITTING)
await self.driver.submit(
self.real.iens,
self.real.job_script,
self.real.run_arg.runpath,
num_cpu=self.real.num_cpu,
realization_memory=self.real.realization_memory,
name=self.real.run_arg.job_name,
runpath=Path(self.real.run_arg.runpath),
)
try:
await self.driver.submit(
self.real.iens,
self.real.job_script,
self.real.run_arg.runpath,
num_cpu=self.real.num_cpu,
realization_memory=self.real.realization_memory,
name=self.real.run_arg.job_name,
runpath=Path(self.real.run_arg.runpath),
)
except FailedSubmit as err:
await self._send(JobState.FAILED)
logger.error(f"Failed to submit: {err}")
self.returncode.cancel()
return

await self._send(JobState.PENDING)
await self.started.wait()
Expand Down
34 changes: 20 additions & 14 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import shlex
import shutil
import stat
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
Dict,
Iterable,
Expand All @@ -27,7 +27,7 @@
get_args,
)

from .driver import SIGNAL_OFFSET, Driver
from .driver import SIGNAL_OFFSET, Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

_POLL_PERIOD = 2.0 # seconds
Expand Down Expand Up @@ -298,16 +298,22 @@ async def submit(
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
script_path: Optional[Path] = None
with tempfile.NamedTemporaryFile(
dir=runpath,
prefix=".lsf_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
try:
with NamedTemporaryFile(
dir=runpath,
prefix=".lsf_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
except OSError as err:
error_message = f"Could not create submit script: {err}"
self._job_error_message_by_iens[iens] = error_message
raise FailedSubmit(error_message) from err

assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)

Expand Down Expand Up @@ -345,11 +351,11 @@ async def submit(
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

match = re.search("Job <([0-9]+)> is submitted to .*queue", process_message)
if match is None:
raise RuntimeError(
raise FailedSubmit(
f"Could not understand '{process_message}' from bsub"
)
job_id = match[1]
Expand Down
4 changes: 2 additions & 2 deletions src/ert/scheduler/openpbs_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
get_type_hints,
)

from .driver import Driver
from .driver import Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -274,7 +274,7 @@ async def submit(
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

job_id_ = process_message
logger.debug(f"Realization {iens} accepted by PBS, got id {job_id_}")
Expand Down
50 changes: 38 additions & 12 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pydantic.dataclasses import dataclass

from _ert.async_utils import get_running_loop
from _ert.events import Event, ForwardModelStepChecksum, Id
from _ert.events import Event, ForwardModelStepChecksum, Id, event_from_dict
from ert.constant_filenames import CERT_FILE

from .driver import Driver
Expand Down Expand Up @@ -276,10 +276,24 @@ async def execute(
# does internalization at a time
forward_model_ok_lock = asyncio.Lock()
for iens, job in self._jobs.items():
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, forward_model_ok_lock, self._max_submit),
name=f"job-{iens}_task",
)
if job.state != JobState.ABORTED:
self._job_tasks[iens] = asyncio.create_task(
job.run(sem, forward_model_ok_lock, self._max_submit),
name=f"job-{iens}_task",
)
else:
failure = job.real.run_arg.ensemble_storage.get_failure(iens)
await self._events.put(
event_from_dict(
{
"ensemble": self._ens_id,
"event_type": Id.REALIZATION_FAILURE,
"queue_event_type": JobState.FAILED,
"message": failure.message if failure else None,
"real": str(iens),
}
)
)
logger.info("All tasks started")
self._running.set()
try:
Expand Down Expand Up @@ -317,8 +331,14 @@ async def _process_event_queue(self) -> None:

def _update_jobs_json(self, iens: int, runpath: str) -> None:
cert_path = f"{runpath}/{CERT_FILE}"
if self._ee_cert is not None:
Path(cert_path).write_text(self._ee_cert, encoding="utf-8")
try:
if self._ee_cert is not None:
Path(cert_path).write_text(self._ee_cert, encoding="utf-8")
except OSError as err:
error_msg = f"Could not write ensemble certificate: {err}"
self._jobs[iens].unschedule(error_msg)
logger.error(error_msg)
return
jobs = _JobsJson(
experiment_id=None,
ens_id=self._ens_id,
Expand All @@ -328,8 +348,14 @@ def _update_jobs_json(self, iens: int, runpath: str) -> None:
ee_cert_path=cert_path if self._ee_cert is not None else None,
)
jobs_path = os.path.join(runpath, "jobs.json")
with open(jobs_path, "rb") as fp:
data = orjson.loads(fp.read())
with open(jobs_path, "wb") as fp:
data.update(asdict(jobs))
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
try:
with open(jobs_path, "rb") as fp:
data = orjson.loads(fp.read())
with open(jobs_path, "wb") as fp:
data.update(asdict(jobs))
fp.write(orjson.dumps(data, option=orjson.OPT_INDENT_2))
except OSError as err:
error_msg = f"Could not update jobs.json: {err}"
self._jobs[iens].unschedule(error_msg)
logger.error(error_msg)
return
33 changes: 19 additions & 14 deletions src/ert/scheduler/slurm_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
import logging
import shlex
import stat
import tempfile
import time
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import (
Iterator,
Optional,
Tuple,
)

from .driver import SIGNAL_OFFSET, Driver
from .driver import SIGNAL_OFFSET, Driver, FailedSubmit
from .event import Event, FinishedEvent, StartedEvent

SLURM_FAILED_EXIT_CODE_FETCH = SIGNAL_OFFSET + 66
Expand Down Expand Up @@ -184,16 +184,21 @@ async def submit(
f"exec -a {shlex.quote(executable)} {executable} {shlex.join(args)}\n"
)
script_path: Optional[Path] = None
with tempfile.NamedTemporaryFile(
dir=runpath,
prefix=".slurm_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
try:
with NamedTemporaryFile(
dir=runpath,
prefix=".slurm_submit_",
suffix=".sh",
mode="w",
encoding="utf-8",
delete=False,
) as script_handle:
script_handle.write(script)
script_path = Path(script_handle.name)
except OSError as err:
error_message = f"Could not create submit script: {err}"
self._job_error_message_by_iens[iens] = error_message
raise FailedSubmit(error_message) from err
assert script_path is not None
script_path.chmod(script_path.stat().st_mode | stat.S_IEXEC)
sbatch_with_args = [*self._submit_cmd(name, runpath, num_cpu), str(script_path)]
Expand All @@ -214,10 +219,10 @@ async def submit(
)
if not process_success:
self._job_error_message_by_iens[iens] = process_message
raise RuntimeError(process_message)
raise FailedSubmit(process_message)

if not process_message:
raise RuntimeError("sbatch returned empty jobid")
raise FailedSubmit("sbatch returned empty jobid")
job_id = process_message
logger.info(f"Realization {iens} accepted by SLURM, got id {job_id}")

Expand Down
Loading

0 comments on commit c85e16f

Please sign in to comment.