Skip to content

Commit

Permalink
Move 'first' field into strategy state (#3317)
Browse files Browse the repository at this point in the history
This is part of work to move JobStatusPoller facade state into other classes, as part of job handling rearrangements in PR #3293

This should not change behaviour: each executor has a single PolledExecutorFacade and a single strategy.ExecutorState, and this PR moves the 'first' field from one to the other.

parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py tests that init_blocks handling still fires properly - that's what is switched by this 'first' field.
  • Loading branch information
benclifford authored Apr 8, 2024
1 parent 5072633 commit ad69440
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
1 change: 0 additions & 1 deletion parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._interval = executor.status_polling_interval
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]
self.first = True

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False
Expand Down
16 changes: 10 additions & 6 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class ExecutorState(TypedDict):
If the executor is not idle, then None.
"""

first: bool
"""True if this executor has not yet had a strategy poll.
"""


class Strategy:
"""Scaling strategy.
Expand Down Expand Up @@ -144,17 +148,17 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:

def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
self.executors[executor.label] = {'idle_since': None}
self.executors[executor.label] = {'idle_since': None, 'first': True}

def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
"""Scale up to init_blocks at the start, then nothing more.
"""
for ef in executor_facades:
if ef.first:
executor = ef.executor
executor = ef.executor
if self.executors[executor.label]['first']:
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[executor.label]['first'] = False
else:
logger.debug("strategy_init_only: doing nothing")

Expand Down Expand Up @@ -190,11 +194,11 @@ def _general_strategy(self, executor_facades, *, strategy_type):
continue
logger.debug(f"Strategizing for executor {label}")

if ef.first:
if self.executors[label]['first']:
executor = ef.executor
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[label]['first'] = False

# Tasks that are either pending completion
active_tasks = executor.outstanding
Expand Down

0 comments on commit ad69440

Please sign in to comment.