Skip to content

Commit

Permalink
Move scale in at exit code into close method of job status poller (#3311
Browse files Browse the repository at this point in the history
)

Changed behaviour: this will now happen slightly before it used to, but the
differences are only changed place in the call stack, and log messages will
appear in a different order now - so the actual scaling-in behaviour should
be unchanged.
  • Loading branch information
benclifford authored Apr 5, 2024
1 parent 6a3834f commit 7fba7d6
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 15 deletions.
16 changes: 1 addition & 15 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,21 +1218,7 @@ def cleanup(self) -> None:
self.job_status_poller.close()
logger.info("Terminated job status poller")

logger.info("Scaling in and shutting down executors")

for ef in self.job_status_poller._executor_facades:
if not ef.executor.bad_state_is_set:
logger.info(f"Scaling in executor {ef.executor.label}")

# this code needs to be at least as many blocks as need
# cancelling, but it is safe to be more, as the scaling
# code will cope with being asked to cancel more blocks
# than exist.
block_count = len(ef.status)
ef.scale_in(block_count)

else: # and bad_state_is_set
logger.warning(f"Not scaling in executor {ef.executor.label} because it is in bad state")
logger.info("Shutting down executors")

for executor in self.executors.values():
logger.info(f"Shutting down executor {executor.label}")
Expand Down
16 changes: 16 additions & 0 deletions parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,19 @@ def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
logger.debug("Adding executor {}".format(executor.label))
self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
self._strategy.add_executors(executors)

def close(self):
super().close()
for ef in self._executor_facades:
if not ef.executor.bad_state_is_set:
logger.info(f"Scaling in executor {ef.executor.label}")

# this code needs to be at least as many blocks as need
# cancelling, but it is safe to be more, as the scaling
# code will cope with being asked to cancel more blocks
# than exist.
block_count = len(ef.status)
ef.scale_in(block_count)

else: # and bad_state_is_set
logger.warning(f"Not scaling in executor {ef.executor.label} because it is in bad state")

0 comments on commit 7fba7d6

Please sign in to comment.