Skip to content

Commit

Permalink
Move 'first' field into strategy state
Browse files Browse the repository at this point in the history
This is part of work to move JobStatusPoller facade state into other
classes, as part of job handling rearrangements in PR #3293

This should not change behaviour: each executor has a single
PolledExecutorFacade and a single strategy.ExecutorState, and this
PR moves the 'first' field from one to the other.

parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py tests that
init_blocks handling still fires properly - that's what is switched by this
'first' field.
  • Loading branch information
benclifford committed Apr 8, 2024
1 parent 7fba7d6 commit 0d5081e
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
1 change: 0 additions & 1 deletion parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._interval = executor.status_polling_interval
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]
self.first = True

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False
Expand Down
16 changes: 10 additions & 6 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class ExecutorState(TypedDict):
If the executor is not idle, then None.
"""

first: bool
"""Is this the first poll for this executor?
"""


class Strategy:
"""Scaling strategy.
Expand Down Expand Up @@ -144,17 +148,17 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:

def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
self.executors[executor.label] = {'idle_since': None}
self.executors[executor.label] = {'idle_since': None, 'first': True}

def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
"""Scale up to init_blocks at the start, then nothing more.
"""
for ef in executor_facades:
if ef.first:
executor = ef.executor
executor = ef.executor
if self.executors[executor.label]['first']:
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[executor.label]['first'] = False
else:
logger.debug("strategy_init_only: doing nothing")

Expand Down Expand Up @@ -190,11 +194,11 @@ def _general_strategy(self, executor_facades, *, strategy_type):
continue
logger.debug(f"Strategizing for executor {label}")

if ef.first:
if self.executors[label]['first']:
executor = ef.executor
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[label]['first'] = False

# Tasks that are either pending completion
active_tasks = executor.outstanding
Expand Down

0 comments on commit 0d5081e

Please sign in to comment.