From 473854b0c5c358e77c274ea8b86e87fb84015bab Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Tue, 24 Sep 2024 14:39:17 +0200 Subject: [PATCH 1/5] Retrieve exec_hosts from bjobs and log to azure Retrieve exec_hosts from bjobs and log to azure Update bjobs output with placeholder for exec_hosts in tests Add tests for bjobs_exec_host parsing --- src/ert/scheduler/lsf_driver.py | 27 ++++++++-- tests/ert/unit_tests/scheduler/bin/bjobs.py | 2 +- .../unit_tests/scheduler/test_lsf_driver.py | 53 +++++++++++++++---- 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 7c7e364a2dc..c64a3e2044a 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -109,14 +109,15 @@ class JobData: iens: int job_state: AnyJob submitted_timestamp: float + exec_hosts: str = "-" def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: data: Dict[str, JobState] = {} for line in bjobs_output.splitlines(): tokens = line.split(sep="^") - if len(tokens) == 2: - job_id, job_state = tokens + if len(tokens) == 3: + job_id, job_state, _ = tokens if job_state not in get_args(JobState): logger.error( f"Unknown state {job_state} obtained from " @@ -127,6 +128,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: return data +def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]: + data: Dict[str, str] = {} + for line in bjobs_output.splitlines(): + tokens = line.split(sep="^") + if len(tokens) == 3: + job_id, _, exec_hosts = tokens + if exec_hosts != "-": + data[job_id] = exec_hosts + return data + + def build_resource_requirement_string( exclude_hosts: Sequence[str], realization_memory: int, @@ -423,7 +435,7 @@ async def poll(self) -> None: str(self._bjobs_cmd), "-noheader", "-o", - "jobid stat delimiter='^'", + "jobid stat exec_host delimiter='^'", *current_jobids, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -440,6 +452,14 @@ async def poll(self) -> None: f"bjobs gave returncode {process.returncode} and error {stderr.decode()}" ) bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore"))) + bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) + + for jobid, exec_hosts in bjobs_exec_hosts.items(): + if self._jobs[jobid].exec_hosts == "-": + logger.info( + f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}" + ) + self._jobs[jobid].exec_hosts = exec_hosts job_ids_found_in_bjobs_output = set(bjobs_states.keys()) if ( @@ -491,7 +511,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None: logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed") exit_code = await self._get_exit_code(job_id) event = FinishedEvent(iens=iens, returncode=exit_code) - elif isinstance(new_state, FinishedJobSuccess): logger.info( f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded" diff --git a/tests/ert/unit_tests/scheduler/bin/bjobs.py b/tests/ert/unit_tests/scheduler/bin/bjobs.py index 883d97ae8cd..7d7a636df96 100644 --- a/tests/ert/unit_tests/scheduler/bin/bjobs.py +++ b/tests/ert/unit_tests/scheduler/bin/bjobs.py @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser: def bjobs_formatter(jobstats: List[Job]) -> str: - return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats]) + return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats]) def read(path: Path, default: Optional[str] = None) -> Optional[str]: diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index f10da7dfc29..489baa54e3d 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -36,6 +36,7 @@ filter_job_ids_on_submission_time, parse_bhist, parse_bjobs, + parse_bjobs_exec_hosts, ) from tests.ert.utils import poll, wait_until @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text): "bjobs_output, expected", [ pytest.param( - "1^RUN", + "1^RUN^-", {"1": "RUN"}, id="basic", ), - pytest.param("1^DONE", {"1": "DONE"}, id="done"), + pytest.param("1^DONE^-", {"1": "DONE"}, id="done"), pytest.param( - "1^DONE\n2^RUN", + "1^DONE^-\n2^RUN^-", {"1": "DONE", "2": "RUN"}, id="two_jobs", ), @@ -444,13 +445,43 @@ def test_parse_bjobs_happy_path(bjobs_output, expected): assert parse_bjobs(bjobs_output) == expected +@pytest.mark.parametrize( + "bjobs_output, expected", + [ + pytest.param( + "1^RUN^st-vgrid01", + {"1": "st-vgrid01"}, + id="one_host", + ), + pytest.param("1^DONE^-", {}, id="no_host"), + pytest.param( + "1^DONE^st-vgrid02\n2^RUN^-", + {"1": "st-vgrid02"}, + id="only_one_host_outputs", + ), + ], +) +def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected): + assert parse_bjobs_exec_hosts(bjobs_output) == expected + + +@given( + st.integers(min_value=1), + st.from_type(JobState), +) +def test_parse_bjobs(job_id, job_state): + assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state} + + @given( st.integers(min_value=1), - nonempty_string_without_whitespace(), st.from_type(JobState), + nonempty_string_without_whitespace(), ) -def test_parse_bjobs(job_id, username, job_state): - assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state} +def test_parse_bjobs_exec_host(job_id, job_state, exec_host): + assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == { + str(job_id): exec_host + } @given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates)) @@ -460,7 +491,7 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state): def test_parse_bjobs_invalid_state_is_logged(caplog): # (cannot combine caplog with hypothesis) - parse_bjobs("1^FOO") + parse_bjobs("1^FOO^-") assert "Unknown state FOO" in caplog.text @@ -468,7 +499,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): "bjobs_script, expectation", [ pytest.param( - "echo '1^DONE'; exit 0", + "echo '1^DONE^-'; exit 0", does_not_raise(), id="all-good", ), @@ -484,13 +515,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="empty_cluster_specific_id", ), pytest.param( - "echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255", + "echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255", # If we have some success and some failures, actual command returns 255 does_not_raise(), id="error_for_irrelevant_job_id", ), pytest.param( - "echo '2^DONE'", + "echo '2^DONE^-'", pytest.raises(asyncio.TimeoutError), id="wrong-job-id", ), @@ -500,7 +531,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="exit-1", ), pytest.param( - "echo '1^DONE'; exit 1", + "echo '1^DONE^-'; exit 1", # (this is not observed in reality) does_not_raise(), id="correct_output_but_exitcode_1", From 4a615101c47e38000ea9f9c4075d859b400dfd7f Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Tue, 1 Oct 2024 15:24:06 +0200 Subject: [PATCH 2/5] Changed logger message to assigned --- src/ert/scheduler/lsf_driver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index c64a3e2044a..ac4158b6f1e 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -457,7 +457,7 @@ async def poll(self) -> None: for jobid, exec_hosts in bjobs_exec_hosts.items(): if self._jobs[jobid].exec_hosts == "-": logger.info( - f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}" + f"Realization {self._jobs[jobid].iens} was assigned to host: {exec_hosts}" ) self._jobs[jobid].exec_hosts = exec_hosts From beb485470eaf8f9f5511ddcd4c741b1988834434 Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Tue, 1 Oct 2024 15:24:28 +0200 Subject: [PATCH 3/5] Replaced computer names --- tests/ert/unit_tests/scheduler/test_lsf_driver.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index 489baa54e3d..ff6617f4527 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -449,14 +449,14 @@ def test_parse_bjobs_happy_path(bjobs_output, expected): "bjobs_output, expected", [ pytest.param( - "1^RUN^st-vgrid01", - {"1": "st-vgrid01"}, + "1^RUN^abc-comp01", + {"1": "abc-comp01"}, id="one_host", ), pytest.param("1^DONE^-", {}, id="no_host"), pytest.param( - "1^DONE^st-vgrid02\n2^RUN^-", - {"1": "st-vgrid02"}, + "1^DONE^abc-comp02\n2^RUN^-", + {"1": "abc-comp02"}, id="only_one_host_outputs", ), ], From cc67aa914632fab91e4d73a26a1118b2105efd4c Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Wed, 2 Oct 2024 10:19:14 +0200 Subject: [PATCH 4/5] Verify logging occur only once --- src/ert/scheduler/lsf_driver.py | 19 +++++++++++-------- .../unit_tests/scheduler/test_lsf_driver.py | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index ac4158b6f1e..e2ae3333a2f 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -452,14 +452,9 @@ async def poll(self) -> None: f"bjobs gave returncode {process.returncode} and error {stderr.decode()}" ) bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore"))) - bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) - - for jobid, exec_hosts in bjobs_exec_hosts.items(): - if self._jobs[jobid].exec_hosts == "-": - logger.info( - f"Realization {self._jobs[jobid].iens} was assigned to host: {exec_hosts}" - ) - self._jobs[jobid].exec_hosts = exec_hosts + self.update_and_log_exec_hosts( + parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) + ) job_ids_found_in_bjobs_output = set(bjobs_states.keys()) if ( @@ -625,6 +620,14 @@ async def _poll_once_by_bhist( self._bhist_cache_timestamp = time.time() return _parse_jobs_dict(jobs) + def update_and_log_exec_hosts(self, bjobs_exec_hosts: Dict[str, str]) -> None: + for job_id, exec_hosts in bjobs_exec_hosts.items(): + if self._jobs[job_id].exec_hosts == "-": + logger.info( + f"Realization {self._jobs[job_id].iens} was assigned to host: {exec_hosts}" + ) + self._jobs[job_id].exec_hosts = exec_hosts + def _build_resource_requirement_arg(self, realization_memory: int) -> List[str]: resource_requirement_string = build_resource_requirement_string( self._exclude_hosts, diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index ff6617f4527..e970a51268b 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -1010,6 +1010,20 @@ def not_found_bjobs(monkeypatch, tmp_path): bjobs_path.chmod(bjobs_path.stat().st_mode | stat.S_IEXEC) +async def test_bjobs_exec_host_logs_only_once(tmp_path, job_name, caplog): + caplog.set_level(logging.INFO) + os.chdir(tmp_path) + driver = LsfDriver() + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + + job_id = next(iter(driver._jobs.keys())) + driver.update_and_log_exec_hosts({job_id: "COMP-01"}) + driver.update_and_log_exec_hosts({job_id: "COMP-02"}) + + await poll(driver, {0}) + assert caplog.text.count("was assigned to host:") == 1 + + async def test_lsf_stdout_file(tmp_path, job_name): os.chdir(tmp_path) driver = LsfDriver() From 38803a9a155bd5eda68eac9efeaff7cf5b2d9cde Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Wed, 2 Oct 2024 12:22:54 +0200 Subject: [PATCH 5/5] Fix reported issue by hypothesis --- src/ert/scheduler/lsf_driver.py | 3 +-- tests/ert/unit_tests/scheduler/test_lsf_driver.py | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index e2ae3333a2f..941e05a2c0f 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -134,8 +134,7 @@ def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]: tokens = line.split(sep="^") if len(tokens) == 3: job_id, _, exec_hosts = tokens - if exec_hosts != "-": - data[job_id] = exec_hosts + data[job_id] = exec_hosts return data diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index e970a51268b..30e6187a56f 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -453,11 +453,10 @@ def test_parse_bjobs_happy_path(bjobs_output, expected): {"1": "abc-comp01"}, id="one_host", ), - pytest.param("1^DONE^-", {}, id="no_host"), pytest.param( "1^DONE^abc-comp02\n2^RUN^-", - {"1": "abc-comp02"}, - id="only_one_host_outputs", + {"1": "abc-comp02", "2": "-"}, + id="two_hosts_output", ), ], )