Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move 'first' field into strategy state #3317

Merged
merged 3 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion parsl/jobs/job_status_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self, executor: BlockProviderExecutor, dfk: Optional["parsl.dataflo
self._interval = executor.status_polling_interval
self._last_poll_time = 0.0
self._status = {} # type: Dict[str, JobStatus]
self.first = True

# Create a ZMQ channel to send poll status to monitoring
self.monitoring_enabled = False
Expand Down
16 changes: 10 additions & 6 deletions parsl/jobs/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ class ExecutorState(TypedDict):
If the executor is not idle, then None.
"""

first: bool
"""True if this executor has not yet had a strategy poll.
"""


class Strategy:
"""Scaling strategy.
Expand Down Expand Up @@ -144,17 +148,17 @@ def __init__(self, *, strategy: Optional[str], max_idletime: float) -> None:

def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
for executor in executors:
self.executors[executor.label] = {'idle_since': None}
self.executors[executor.label] = {'idle_since': None, 'first': True}

def _strategy_init_only(self, executor_facades: List[jsp.PolledExecutorFacade]) -> None:
"""Scale up to init_blocks at the start, then nothing more.
"""
for ef in executor_facades:
if ef.first:
executor = ef.executor
executor = ef.executor
if self.executors[executor.label]['first']:
logger.debug(f"strategy_init_only: scaling out {executor.provider.init_blocks} initial blocks for {executor.label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[executor.label]['first'] = False
else:
logger.debug("strategy_init_only: doing nothing")

Expand Down Expand Up @@ -190,11 +194,11 @@ def _general_strategy(self, executor_facades, *, strategy_type):
continue
logger.debug(f"Strategizing for executor {label}")

if ef.first:
if self.executors[label]['first']:
executor = ef.executor
logger.debug(f"Scaling out {executor.provider.init_blocks} initial blocks for {label}")
ef.scale_out(executor.provider.init_blocks)
ef.first = False
self.executors[label]['first'] = False

# Tasks that are either pending completion
active_tasks = executor.outstanding
Expand Down
Loading