Skip to content

Commit

Permalink
Retrieve exec_hosts from bjobs and log to azure
Browse files Browse the repository at this point in the history
Retrieve exec_hosts from bjobs and log to azure
Update bjobs output with placeholder for exec_hosts in tests
Add tests for bjobs_exec_host parsing
  • Loading branch information
andreas-el committed Oct 1, 2024
1 parent 0825322 commit 473854b
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 16 deletions.
27 changes: 23 additions & 4 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,15 @@ class JobData:
iens: int
job_state: AnyJob
submitted_timestamp: float
exec_hosts: str = "-"


def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
data: Dict[str, JobState] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 2:
job_id, job_state = tokens
if len(tokens) == 3:
job_id, job_state, _ = tokens
if job_state not in get_args(JobState):
logger.error(
f"Unknown state {job_state} obtained from "
Expand All @@ -127,6 +128,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
return data


def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
data: Dict[str, str] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 3:
job_id, _, exec_hosts = tokens
if exec_hosts != "-":
data[job_id] = exec_hosts
return data


def build_resource_requirement_string(
exclude_hosts: Sequence[str],
realization_memory: int,
Expand Down Expand Up @@ -423,7 +435,7 @@ async def poll(self) -> None:
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
"jobid stat exec_host delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -440,6 +452,14 @@ async def poll(self) -> None:
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))

for jobid, exec_hosts in bjobs_exec_hosts.items():
if self._jobs[jobid].exec_hosts == "-":
logger.info(
f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}"
)
self._jobs[jobid].exec_hosts = exec_hosts

job_ids_found_in_bjobs_output = set(bjobs_states.keys())
if (
Expand Down Expand Up @@ -491,7 +511,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)

elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
Expand Down
2 changes: 1 addition & 1 deletion tests/ert/unit_tests/scheduler/bin/bjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser:


def bjobs_formatter(jobstats: List[Job]) -> str:
return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats])
return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats])


def read(path: Path, default: Optional[str] = None) -> Optional[str]:
Expand Down
53 changes: 42 additions & 11 deletions tests/ert/unit_tests/scheduler/test_lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
filter_job_ids_on_submission_time,
parse_bhist,
parse_bjobs,
parse_bjobs_exec_hosts,
)
from tests.ert.utils import poll, wait_until

Expand Down Expand Up @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text):
"bjobs_output, expected",
[
pytest.param(
"1^RUN",
"1^RUN^-",
{"1": "RUN"},
id="basic",
),
pytest.param("1^DONE", {"1": "DONE"}, id="done"),
pytest.param("1^DONE^-", {"1": "DONE"}, id="done"),
pytest.param(
"1^DONE\n2^RUN",
"1^DONE^-\n2^RUN^-",
{"1": "DONE", "2": "RUN"},
id="two_jobs",
),
Expand All @@ -444,13 +445,43 @@ def test_parse_bjobs_happy_path(bjobs_output, expected):
assert parse_bjobs(bjobs_output) == expected


@pytest.mark.parametrize(
"bjobs_output, expected",
[
pytest.param(
"1^RUN^st-vgrid01",
{"1": "st-vgrid01"},
id="one_host",
),
pytest.param("1^DONE^-", {}, id="no_host"),
pytest.param(
"1^DONE^st-vgrid02\n2^RUN^-",
{"1": "st-vgrid02"},
id="only_one_host_outputs",
),
],
)
def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected):
assert parse_bjobs_exec_hosts(bjobs_output) == expected


@given(
st.integers(min_value=1),
st.from_type(JobState),
)
def test_parse_bjobs(job_id, job_state):
assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state}


@given(
st.integers(min_value=1),
nonempty_string_without_whitespace(),
st.from_type(JobState),
nonempty_string_without_whitespace(),
)
def test_parse_bjobs(job_id, username, job_state):
assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state}
def test_parse_bjobs_exec_host(job_id, job_state, exec_host):
assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == {
str(job_id): exec_host
}


@given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates))
Expand All @@ -460,15 +491,15 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state):

def test_parse_bjobs_invalid_state_is_logged(caplog):
# (cannot combine caplog with hypothesis)
parse_bjobs("1^FOO")
parse_bjobs("1^FOO^-")
assert "Unknown state FOO" in caplog.text


@pytest.mark.parametrize(
"bjobs_script, expectation",
[
pytest.param(
"echo '1^DONE'; exit 0",
"echo '1^DONE^-'; exit 0",
does_not_raise(),
id="all-good",
),
Expand All @@ -484,13 +515,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="empty_cluster_specific_id",
),
pytest.param(
"echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255",
"echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255",
# If we have some success and some failures, actual command returns 255
does_not_raise(),
id="error_for_irrelevant_job_id",
),
pytest.param(
"echo '2^DONE'",
"echo '2^DONE^-'",
pytest.raises(asyncio.TimeoutError),
id="wrong-job-id",
),
Expand All @@ -500,7 +531,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog):
id="exit-1",
),
pytest.param(
"echo '1^DONE'; exit 1",
"echo '1^DONE^-'; exit 1",
# (this is not observed in reality)
does_not_raise(),
id="correct_output_but_exitcode_1",
Expand Down

0 comments on commit 473854b

Please sign in to comment.