diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index 7c7e364a2dc..941e05a2c0f 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -109,14 +109,15 @@ class JobData: iens: int job_state: AnyJob submitted_timestamp: float + exec_hosts: str = "-" def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: data: Dict[str, JobState] = {} for line in bjobs_output.splitlines(): tokens = line.split(sep="^") - if len(tokens) == 2: - job_id, job_state = tokens + if len(tokens) == 3: + job_id, job_state, _ = tokens if job_state not in get_args(JobState): logger.error( f"Unknown state {job_state} obtained from " @@ -127,6 +128,16 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: return data +def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]: + data: Dict[str, str] = {} + for line in bjobs_output.splitlines(): + tokens = line.split(sep="^") + if len(tokens) == 3: + job_id, _, exec_hosts = tokens + data[job_id] = exec_hosts + return data + + def build_resource_requirement_string( exclude_hosts: Sequence[str], realization_memory: int, @@ -423,7 +434,7 @@ async def poll(self) -> None: str(self._bjobs_cmd), "-noheader", "-o", - "jobid stat delimiter='^'", + "jobid stat exec_host delimiter='^'", *current_jobids, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -440,6 +451,9 @@ async def poll(self) -> None: f"bjobs gave returncode {process.returncode} and error {stderr.decode()}" ) bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore"))) + self.update_and_log_exec_hosts( + parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) + ) job_ids_found_in_bjobs_output = set(bjobs_states.keys()) if ( @@ -491,7 +505,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None: logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed") exit_code = await self._get_exit_code(job_id) event = FinishedEvent(iens=iens, returncode=exit_code) - elif isinstance(new_state, FinishedJobSuccess): logger.info( f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded" @@ -606,6 +619,14 @@ async def _poll_once_by_bhist( self._bhist_cache_timestamp = time.time() return _parse_jobs_dict(jobs) + def update_and_log_exec_hosts(self, bjobs_exec_hosts: Dict[str, str]) -> None: + for job_id, exec_hosts in bjobs_exec_hosts.items(): + if self._jobs[job_id].exec_hosts == "-": + logger.info( + f"Realization {self._jobs[job_id].iens} was assigned to host: {exec_hosts}" + ) + self._jobs[job_id].exec_hosts = exec_hosts + def _build_resource_requirement_arg(self, realization_memory: int) -> List[str]: resource_requirement_string = build_resource_requirement_string( self._exclude_hosts, diff --git a/tests/ert/unit_tests/scheduler/bin/bjobs.py b/tests/ert/unit_tests/scheduler/bin/bjobs.py index 883d97ae8cd..7d7a636df96 100644 --- a/tests/ert/unit_tests/scheduler/bin/bjobs.py +++ b/tests/ert/unit_tests/scheduler/bin/bjobs.py @@ -27,7 +27,7 @@ def get_parser() -> argparse.ArgumentParser: def bjobs_formatter(jobstats: List[Job]) -> str: - return "".join([f"{job.job_id}^{job.job_state}\n" for job in jobstats]) + return "".join([f"{job.job_id}^{job.job_state}^-\n" for job in jobstats]) def read(path: Path, default: Optional[str] = None) -> Optional[str]: diff --git a/tests/ert/unit_tests/scheduler/test_lsf_driver.py b/tests/ert/unit_tests/scheduler/test_lsf_driver.py index f10da7dfc29..30e6187a56f 100644 --- a/tests/ert/unit_tests/scheduler/test_lsf_driver.py +++ b/tests/ert/unit_tests/scheduler/test_lsf_driver.py @@ -36,6 +36,7 @@ filter_job_ids_on_submission_time, parse_bhist, parse_bjobs, + parse_bjobs_exec_hosts, ) from tests.ert.utils import poll, wait_until @@ -428,13 +429,13 @@ def test_parse_bjobs_gives_empty_result_on_random_input(some_text): "bjobs_output, expected", [ pytest.param( - "1^RUN", + "1^RUN^-", {"1": "RUN"}, id="basic", ), - pytest.param("1^DONE", {"1": "DONE"}, id="done"), + pytest.param("1^DONE^-", {"1": "DONE"}, id="done"), pytest.param( - "1^DONE\n2^RUN", + "1^DONE^-\n2^RUN^-", {"1": "DONE", "2": "RUN"}, id="two_jobs", ), @@ -444,13 +445,42 @@ def test_parse_bjobs_happy_path(bjobs_output, expected): assert parse_bjobs(bjobs_output) == expected +@pytest.mark.parametrize( + "bjobs_output, expected", + [ + pytest.param( + "1^RUN^abc-comp01", + {"1": "abc-comp01"}, + id="one_host", + ), + pytest.param( + "1^DONE^abc-comp02\n2^RUN^-", + {"1": "abc-comp02", "2": "-"}, + id="two_hosts_output", + ), + ], +) +def test_parse_bjobs_exec_hosts_happy_path(bjobs_output, expected): + assert parse_bjobs_exec_hosts(bjobs_output) == expected + + @given( st.integers(min_value=1), - nonempty_string_without_whitespace(), st.from_type(JobState), ) -def test_parse_bjobs(job_id, username, job_state): - assert parse_bjobs(f"{job_id}^{job_state}") == {str(job_id): job_state} +def test_parse_bjobs(job_id, job_state): + assert parse_bjobs(f"{job_id}^{job_state}^-") == {str(job_id): job_state} + + +@given( + st.integers(min_value=1), + st.from_type(JobState), + nonempty_string_without_whitespace(), +) +def test_parse_bjobs_exec_host(job_id, job_state, exec_host): + assert parse_bjobs_exec_hosts(f"{job_id}^{job_state}^{exec_host}") == { + str(job_id): exec_host + } @given(nonempty_string_without_whitespace().filter(lambda x: x not in valid_jobstates)) @@ -460,7 +490,7 @@ def test_parse_bjobs_invalid_state_is_ignored(random_state): def test_parse_bjobs_invalid_state_is_logged(caplog): # (cannot combine caplog with hypothesis) - parse_bjobs("1^FOO") + parse_bjobs("1^FOO^-") assert "Unknown state FOO" in caplog.text @@ -468,7 +498,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): "bjobs_script, expectation", [ pytest.param( - "echo '1^DONE'; exit 0", + "echo '1^DONE^-'; exit 0", does_not_raise(), id="all-good", ), @@ -484,13 +514,13 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="empty_cluster_specific_id", ), pytest.param( - "echo '1^DONE'; echo 'Job <2> is not found' >&2 ; exit 255", + "echo '1^DONE^-'; echo 'Job <2> is not found' >&2 ; exit 255", # If we have some success and some failures, actual command returns 255 does_not_raise(), id="error_for_irrelevant_job_id", ), pytest.param( - "echo '2^DONE'", + "echo '2^DONE^-'", pytest.raises(asyncio.TimeoutError), id="wrong-job-id", ), @@ -500,7 +530,7 @@ def test_parse_bjobs_invalid_state_is_logged(caplog): id="exit-1", ), pytest.param( - "echo '1^DONE'; exit 1", + "echo '1^DONE^-'; exit 1", # (this is not observed in reality) does_not_raise(), id="correct_output_but_exitcode_1", @@ -979,6 +1009,20 @@ def not_found_bjobs(monkeypatch, tmp_path): bjobs_path.chmod(bjobs_path.stat().st_mode | stat.S_IEXEC) +async def test_bjobs_exec_host_logs_only_once(tmp_path, job_name, caplog): + caplog.set_level(logging.INFO) + os.chdir(tmp_path) + driver = LsfDriver() + await driver.submit(0, "sh", "-c", "sleep 1", name=job_name) + + job_id = next(iter(driver._jobs.keys())) + driver.update_and_log_exec_hosts({job_id: "COMP-01"}) + driver.update_and_log_exec_hosts({job_id: "COMP-02"}) + + await poll(driver, {0}) + assert caplog.text.count("was assigned to host:") == 1 + + async def test_lsf_stdout_file(tmp_path, job_name): os.chdir(tmp_path) driver = LsfDriver()