diff --git a/parsl/jobs/job_status_poller.py b/parsl/jobs/job_status_poller.py index 5a7f71ab32..7f57ce1d6d 100644 --- a/parsl/jobs/job_status_poller.py +++ b/parsl/jobs/job_status_poller.py @@ -23,7 +23,6 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo self._interval = executor.status_polling_interval self._last_poll_time = 0.0 self._status = {} # type: Dict[str, JobStatus] - self.first = True # Create a ZMQ channel to send poll status to monitoring self.monitoring_enabled = False diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py index 61a2d43e97..d8f3155c11 100644 --- a/parsl/jobs/strategy.py +++ b/parsl/jobs/strategy.py @@ -26,6 +26,10 @@ class ExecutorState(TypedDict): If the executor is not idle, then None. """ + first: bool + """True if this executor has not yet had a strategy poll. + """ + class Strategy: """Scaling strategy. @@ -144,17 +148,17 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None: def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: - self.executors[executor.label] = {'idle_since': None} + self.executors[executor.label] = {'idle_since': None, 'first': True} def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None: """Scale up to init_blocks at the start, then nothing more. """ for ef in executor_facades: - if ef.first: - executor = ef.executor + executor = ef.executor + if self.executors[executor.label]['first']: logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}") ef.scale_out(executor.provider.init_blocks) - ef.first = False + self.executors[executor.label]['first'] = False else: logger.debug("strategy_init_only: doing nothing") @@ -190,11 +194,11 @@ def _general_strategy(self, executor_facades, *, strategy_type): continue logger.debug(f"Strategizing for executor {label}") - if ef.first: + if self.executors[label]['first']: executor = ef.executor logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}") ef.scale_out(executor.provider.init_blocks) - ef.first = False + self.executors[label]['first'] = False # Tasks that are either pending completion active_tasks = executor.outstanding