Skip to content

Commit

Permalink
Retrieve exec_hosts from bjobs and log to azure
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas-el committed Oct 1, 2024
1 parent 484381b commit 1f5f5d5
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions src/ert/scheduler/lsf_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,15 @@ class JobData:
iens: int
job_state: AnyJob
submitted_timestamp: float
exec_hosts: str


def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
data: Dict[str, JobState] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 2:
job_id, job_state = tokens
if len(tokens) == 3:
job_id, job_state, _ = tokens
if job_state not in get_args(JobState):
logger.error(
f"Unknown state {job_state} obtained from "
Expand All @@ -126,6 +127,17 @@ def parse_bjobs(bjobs_output: str) -> Dict[str, JobState]:
return data


def parse_bjobs_exec_hosts(bjobs_output: str) -> Dict[str, str]:
data: Dict[str, str] = {}
for line in bjobs_output.splitlines():
tokens = line.split(sep="^")
if len(tokens) == 3:
job_id, _, exec_hosts = tokens
if exec_hosts != "-":
data[job_id] = exec_hosts
return data


def build_resource_requirement_string(
exclude_hosts: Sequence[str],
realization_memory: int,
Expand Down Expand Up @@ -360,6 +372,7 @@ async def submit(
iens=iens,
job_state=QueuedJob(job_state="PEND"),
submitted_timestamp=time.time(),
exec_hosts="-",
)
self._iens2jobid[iens] = job_id

Expand Down Expand Up @@ -421,7 +434,7 @@ async def poll(self) -> None:
str(self._bjobs_cmd),
"-noheader",
"-o",
"jobid stat delimiter='^'",
"jobid stat exec_host delimiter='^'",
*current_jobids,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand All @@ -438,6 +451,14 @@ async def poll(self) -> None:
f"bjobs gave returncode {process.returncode} and error {stderr.decode()}"
)
bjobs_states = _parse_jobs_dict(parse_bjobs(stdout.decode(errors="ignore")))
bjobs_exec_hosts = parse_bjobs_exec_hosts(stdout.decode(errors="ignore"))

for jobid, exec_hosts in bjobs_exec_hosts.items():
if self._jobs[jobid].exec_hosts == "-":
logger.info(
f"Realization {self._jobs[jobid].iens} was executed on host: {exec_hosts}"
)
self._jobs[jobid].exec_hosts = exec_hosts

job_ids_found_in_bjobs_output = set(bjobs_states.keys())
if (
Expand Down Expand Up @@ -489,7 +510,6 @@ async def _process_job_update(self, job_id: str, new_state: AnyJob) -> None:
logger.info(f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) failed")
exit_code = await self._get_exit_code(job_id)
event = FinishedEvent(iens=iens, returncode=exit_code)

elif isinstance(new_state, FinishedJobSuccess):
logger.info(
f"Realization {iens} (LSF-id: {self._iens2jobid[iens]}) succeeded"
Expand Down

0 comments on commit 1f5f5d5

Please sign in to comment.