diff --git a/src/ert/scheduler/lsf_driver.py b/src/ert/scheduler/lsf_driver.py index cd2f49a9ca6..c68f5caf661 100644 --- a/src/ert/scheduler/lsf_driver.py +++ b/src/ert/scheduler/lsf_driver.py @@ -108,14 +108,15 @@ class JobData: iens: int job_state: AnyJob submitted_timestamp: float + exec_hosts: str def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: data: Dict[str, JobState] = {} for line in bjobs_output.splitlines(): tokens = line.split(sep="^") - if len(tokens) == 2: - job_id, job_state = tokens + if len(tokens) == 3: + job_id, job_state, _ = tokens if job_state not in get_args(JobState): logger.error( f"Unknown state {job_state} obtained from " @@ -126,6 +127,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]: return data +def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]: + data: Dict[str, str] = {} + for line in bjobs_output.splitlines(): + tokens = line.split(sep="^") + if len(tokens) == 3: + job_id, _, exec_hosts = tokens + if exec_hosts != "-": + data[job_id] = exec_hosts + return data + + def build_resource_requirement_string( exclude_hosts: Sequence[str], realization_memory: int, @@ -360,6 +372,7 @@ async def submit( iens=iens, job_state=QueuedJob(job_state="PEND"), submitted_timestamp=time.time(), + exec_hosts="-", ) self._iens2jobid[iens] = job_id @@ -421,7 +434,7 @@ async def poll(self) -> None: str(self._bjobs_cmd), "-noheader", "-o", - "jobid stat delimiter='^'", + "jobid stat exec_host delimiter='^'", *current_jobids, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, @@ -438,6 +451,14 @@ async def poll(self) -> None: f"bjobs gave returncode {process.returncode} and error {stderr.decode()}" ) bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore"))) + bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore")) + + for jobid, exec_hosts in bjobs_exec_hosts.items(): + if self._jobs[jobid].exec_hosts == "-": + logger.info( + f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}" + ) + self._jobs[jobid].exec_hosts = exec_hosts job_ids_found_in_bjobs_output = set(bjobs_states.keys()) if ( @@ -489,7 +510,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None: logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed") exit_code = await self._get_exit_code(job_id) event = FinishedEvent(iens=iens, returncode=exit_code) - elif isinstance(new_state, FinishedJobSuccess): logger.info( f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"