From 473854b0c5c358e77c274ea8b86e87fb84015bab Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Tue, 24 Sep 2024 14:39:17 +0200 Subject: [PATCH] Retrieve exec_hosts from bjobs and log to azure Retrieve exec_hosts from bjobs and log to azure Update bjobs output with placeholder for exec_hosts in tests Add tests for bjobs_exec_host parsing --- src/ert/scheduler/lsf_driver.py | 27 ++++++++-- tests/ert/unit_tests/scheduler/bin/bjobs.py | 2 +- .../unit_tests/scheduler/test_lsf_driver.py | 53 +++++++++++++++---- 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 7c7e364a2dc..c64a3e2044a 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -109,14 +109,15 @@ class JobData: iens: int job_state: AnyJob submitted_timestamp: float + exec_hosts: str = "-" def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: data: Dict[str, JobState] = {} for line in bjobs_output.splitlines(): tokens = line.split(sep="^") - if len(tokens) == 2: - job_id, job_state = tokens + if len(tokens) == 3: + job_id, job_state, _ = tokens if job_state not in get_args(JobState): logger.error( f"Unknown state {job_state} obtained from " @@ -127,6 +128,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: return data +def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]: + data: Dict[str, str] = {} + for line in bjobs_output.splitlines(): + tokens = line.split(sep="^") + if len(tokens) == 3: + job_id, _, exec_hosts = tokens + if exec_hosts != "-": + data[job_id] = exec_hosts + return data + + def build_resource_requirement_string( exclude_hosts: Sequence[str], realization_memory: int, @@ -423,7 +435,7 @@ async def poll(self) -> None: str(self._bjobs_cmd), "-noheader", "-o", - "jobid stat delimiter='^'", + "jobid stat exec_host delimiter='^'", *current_jobids, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -440,6 +452,14 @@ async def poll(self) -> None: f"bjobs gave returncode {process.returncode} and error {stderr.decode()}" ) bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore"))) + bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) + + for jobid, exec_hosts in bjobs_exec_hosts.items(): + if self._jobs[jobid].exec_hosts == "-": + logger.info( + f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}" + ) + self._jobs[jobid].exec_hosts = exec_hosts job_ids_found_in_bjobs_output = set(bjobs_states.keys()) if ( @@ -491,7 +511,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None: logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed") exit_code = await self._get_exit_code(job_id) event = FinishedEvent(iens=iens, returncode=exit_code) - elif isinstance(new_state, FinishedJobSuccess): logger.info( f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded" diff --git a/tests/ert/unit_tests/scheduler/bin/bjobs.py b/tests/ert/unit_tests/scheduler/bin/bjobs.py index 883d97ae8cd..7d7a636df96 100644 --- a/tests/ert/unit_tests/scheduler/bin/bjobs.py +++ b/tests/ert/unit_tests/scheduler/bin/bjobs.py @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser: def bjobs_formatter(jobstats: List[Job]) -> str: - return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats]) + return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats]) def read(path: Path, default: Optional[str] = None) -> Optional[str]: diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index f10da7dfc29..489baa54e3d 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -36,6 +36,7 @@ filter_job_ids_on_submission_time, parse_bhist, parse_bjobs, + parse_bjobs_exec_hosts, ) from tests.ert.utils import poll, wait_until @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text): "bjobs_output, expected", [ pytest.param( - "1^RUN", + "1^RUN^-", {"1": "RUN"}, id="basic", ), - pytest.param("1^DONE", {"1": "DONE"}, id="done"), + pytest.param("1^DONE^-", {"1": "DONE"}, id="done"), pytest.param( - "1^DONE\n2^RUN", + "1^DONE^-\n2^RUN^-", {"1": "DONE", "2": "RUN"}, id="two_jobs", ), @@ -444,13 +445,43 @@ def test_parse_bjobs_happy_path(bjobs_output, expected): assert parse_bjobs(bjobs_output) == expected +@pytest.mark.parametrize( + "bjobs_output, expected", + [ + pytest.param( + "1^RUN^st-vgrid01", + {"1": "st-vgrid01"}, + id="one_host", + ), + pytest.param("1^DONE^-", {}, id="no_host"), + pytest.param( + "1^DONE^st-vgrid02\n2^RUN^-", + {"1": "st-vgrid02"}, + id="only_one_host_outputs", + ), + ], +) +def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected): + assert parse_bjobs_exec_hosts(bjobs_output) == expected + + +@given( + st.integers(min_value=1), + st.from_type(JobState), +) +def test_parse_bjobs(job_id, job_state): + assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state} + + @given( st.integers(min_value=1), - nonempty_string_without_whitespace(), st.from_type(JobState), + nonempty_string_without_whitespace(), ) -def test_parse_bjobs(job_id, username, job_state): - assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state} +def test_parse_bjobs_exec_host(job_id, job_state, exec_host): + assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == { + str(job_id): exec_host + } @given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates)) @@ -460,7 +491,7 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state): def test_parse_bjobs_invalid_state_is_logged(caplog): # (cannot combine caplog with hypothesis) - parse_bjobs("1^FOO") + parse_bjobs("1^FOO^-") assert "Unknown state FOO" in caplog.text @@ -468,7 +499,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): "bjobs_script, expectation", [ pytest.param( - "echo '1^DONE'; exit 0", + "echo '1^DONE^-'; exit 0", does_not_raise(), id="all-good", ), @@ -484,13 +515,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="empty_cluster_specific_id", ), pytest.param( - "echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255", + "echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255", # If we have some success and some failures, actual command returns 255 does_not_raise(), id="error_for_irrelevant_job_id", ), pytest.param( - "echo '2^DONE'", + "echo '2^DONE^-'", pytest.raises(asyncio.TimeoutError), id="wrong-job-id", ), @@ -500,7 +531,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="exit-1", ), pytest.param( - "echo '1^DONE'; exit 1", + "echo '1^DONE^-'; exit 1", # (this is not observed in reality) does_not_raise(), id="correct_output_but_exitcode_1",