diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py
index eff326d759..4d3584b53b 100644
--- a/parsl/dataflow/dflow.py
+++ b/parsl/dataflow/dflow.py
@@ -1216,19 +1216,19 @@ def cleanup(self) -> None:
 
         logger.info("Scaling in and shutting down executors")
 
-        for pi in self.job_status_poller._poll_items:
-            if not pi.executor.bad_state_is_set:
-                logger.info(f"Scaling in executor {pi.executor.label}")
+        for ef in self.job_status_poller._executor_facades:
+            if not ef.executor.bad_state_is_set:
+                logger.info(f"Scaling in executor {ef.executor.label}")
 
                 # this code needs to be at least as many blocks as need
                 # cancelling, but it is safe to be more, as the scaling
                 # code will cope with being asked to cancel more blocks
                 # than exist.
-                block_count = len(pi.status)
-                pi.scale_in(block_count)
+                block_count = len(ef.status)
+                ef.scale_in(block_count)
 
             else:  # and bad_state_is_set
-                logger.warning(f"Not scaling in executor {pi.executor.label} because it is in bad state")
+                logger.warning(f"Not scaling in executor {ef.executor.label} because it is in bad state")
 
         for executor in self.executors.values():
             logger.info(f"Shutting down executor {executor.label}")
diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py
index 0709a17d30..247aa7bc66 100644
--- a/parsl/jobs/job_status_poller.py
+++ b/parsl/jobs/job_status_poller.py
@@ -16,7 +16,7 @@
 logger = logging.getLogger(__name__)
 
 
-class PollItem:
+class PolledExecutorFacade:
     def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None):
         self._executor = executor
         self._dfk = dfk
@@ -110,7 +110,7 @@ class JobStatusPoller(Timer):
     def __init__(self, *, strategy: Optional[str], max_idletime: float,
                  strategy_period: Union[float, int],
                  dfk: Optional["parsl.dataflow.dflow.DataFlowKernel"] = None) -> None:
-        self._poll_items = []  # type: List[PollItem]
+        self._executor_facades = []  # type: List[PolledExecutorFacade]
         self.dfk = dfk
         self._strategy = Strategy(strategy=strategy,
                                   max_idletime=max_idletime)
@@ -118,21 +118,21 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float,
 
     def poll(self) -> None:
         self._update_state()
-        self._run_error_handlers(self._poll_items)
-        self._strategy.strategize(self._poll_items)
+        self._run_error_handlers(self._executor_facades)
+        self._strategy.strategize(self._executor_facades)
 
-    def _run_error_handlers(self, status: List[PollItem]) -> None:
+    def _run_error_handlers(self, status: List[PolledExecutorFacade]) -> None:
         for es in status:
             es.executor.handle_errors(es.status)
 
     def _update_state(self) -> None:
         now = time.time()
-        for item in self._poll_items:
+        for item in self._executor_facades:
             item.poll(now)
 
     def add_executors(self, executors: Sequence[BlockProviderExecutor]) -> None:
         for executor in executors:
             if executor.status_polling_interval > 0:
                 logger.debug("Adding executor {}".format(executor.label))
-                self._poll_items.append(PollItem(executor, self.dfk))
+                self._executor_facades.append(PolledExecutorFacade(executor, self.dfk))
         self._strategy.add_executors(executors)
diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py
index 02519bd456..61a2d43e97 100644
--- a/parsl/jobs/strategy.py
+++ b/parsl/jobs/strategy.py
@@ -146,22 +146,22 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
         for executor in executors:
             self.executors[executor.label] = {'idle_since': None}
 
-    def _strategy_init_only(self, status_list: List[jsp.PollItem]) -> None:
+    def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
         """Scale up to init_blocks at the start, then nothing more.
         """
-        for exec_status in status_list:
-            if exec_status.first:
-                executor = exec_status.executor
+        for ef in executor_facades:
+            if ef.first:
+                executor = ef.executor
                 logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
-                exec_status.scale_out(executor.provider.init_blocks)
-                exec_status.first = False
+                ef.scale_out(executor.provider.init_blocks)
+                ef.first = False
             else:
                 logger.debug("strategy_init_only: doing nothing")
 
-    def _strategy_simple(self, status_list: List[jsp.PollItem]) -> None:
-        self._general_strategy(status_list, strategy_type='simple')
+    def _strategy_simple(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
+        self._general_strategy(executor_facades, strategy_type='simple')
 
-    def _strategy_htex_auto_scale(self, status_list: List[jsp.PollItem]) -> None:
+    def _strategy_htex_auto_scale(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
         """HTEX specific auto scaling strategy
 
         This strategy works only for HTEX. This strategy will scale out by
@@ -176,30 +176,30 @@ def _strategy_htex_auto_scale(self, status_list: List[jsp.PollItem]) -> None:
         expected to scale in effectively only when # of workers, or tasks
         executing per block is close to 1.
         """
-        self._general_strategy(status_list, strategy_type='htex')
+        self._general_strategy(executor_facades, strategy_type='htex')
 
     @wrap_with_logs
-    def _general_strategy(self, status_list, *, strategy_type):
-        logger.debug(f"general strategy starting with strategy_type {strategy_type} for {len(status_list)} executors")
+    def _general_strategy(self, executor_facades, *, strategy_type):
+        logger.debug(f"general strategy starting with strategy_type {strategy_type} for {len(executor_facades)} executors")
 
-        for exec_status in status_list:
-            executor = exec_status.executor
+        for ef in executor_facades:
+            executor = ef.executor
             label = executor.label
             if not isinstance(executor, BlockProviderExecutor):
                 logger.debug(f"Not strategizing for executor {label} because scaling not enabled")
                 continue
             logger.debug(f"Strategizing for executor {label}")
 
-            if exec_status.first:
-                executor = exec_status.executor
+            if ef.first:
+                executor = ef.executor
                 logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
-                exec_status.scale_out(executor.provider.init_blocks)
-                exec_status.first = False
+                ef.scale_out(executor.provider.init_blocks)
+                ef.first = False
 
             # Tasks that are either pending completion
             active_tasks = executor.outstanding
 
-            status = exec_status.status
+            status = ef.status
 
             # FIXME we need to handle case where provider does not define these
             # FIXME probably more of this logic should be moved to the provider
@@ -255,7 +255,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                         # We have resources idle for the max duration,
                         # we have to scale_in now.
                         logger.debug(f"Idle time has reached {self.max_idletime}s for executor {label}; scaling in")
-                        exec_status.scale_in(active_blocks - min_blocks)
+                        ef.scale_in(active_blocks - min_blocks)
 
                     else:
                         logger.debug(
@@ -278,7 +278,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                     excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                     excess_blocks = min(excess_blocks, max_blocks - active_blocks)
                     logger.debug(f"Requesting {excess_blocks} more blocks")
-                    exec_status.scale_out(excess_blocks)
+                    ef.scale_out(excess_blocks)
 
             elif active_slots == 0 and active_tasks > 0:
                 logger.debug("Strategy case 4a: No active slots but some active tasks - could scale out by a single block")
@@ -287,7 +287,7 @@ def _general_strategy(self, status_list, *, strategy_type):
 
                 if active_blocks < max_blocks:
                     logger.debug("Requesting single block")
-                    exec_status.scale_out(1)
+                    ef.scale_out(1)
                 else:
                     logger.debug("Not requesting single block, because at maxblocks already")
 
@@ -303,7 +303,7 @@ def _general_strategy(self, status_list, *, strategy_type):
                     excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
                     excess_blocks = min(excess_blocks, active_blocks - min_blocks)
                     logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
-                    exec_status.scale_in(excess_blocks, max_idletime=self.max_idletime)
+                    ef.scale_in(excess_blocks, max_idletime=self.max_idletime)
                 else:
                     logger.error("This strategy does not support scaling in except for HighThroughputExecutor - taking no action")
             else: