
Make deliberately scaled-in unstarted blocks not be failures #3594

Merged 14 commits on Aug 26, 2024
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
@@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]:
 connected_blocks = self.connected_blocks()
 for job_id in job_status:
     job_info = job_status[job_id]
-    if job_info.terminal and job_id not in connected_blocks:
+    if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
+        logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
         job_status[job_id].state = JobState.MISSING
         if job_status[job_id].message is None:
             job_status[job_id].message = (
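As an aside for this hunk, a minimal hedged sketch of the behaviour it produces (the block ids, states, and the standalone loop below are invented for illustration and are not part of the PR): a terminal block that never connected to the interchange is still rewritten to MISSING, unless its terminal state is a deliberate SCALED_IN.

from parsl.jobs.states import JobState, JobStatus

job_status = {
    '0': JobStatus(JobState.SCALED_IN),  # deliberately scaled in before registering
    '1': JobStatus(JobState.FAILED),     # went away without ever connecting
}
connected_blocks = []  # neither block registered with the interchange

for job_id, job_info in job_status.items():
    if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
        job_info.state = JobState.MISSING

# Block '0' keeps SCALED_IN; block '1' has been rewritten to MISSING.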
7 changes: 5 additions & 2 deletions parsl/executors/status_handling.py
@@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
 if block_ids is not None:
     new_status = {}
     for block_id in block_ids:
-        new_status[block_id] = JobStatus(JobState.CANCELLED)
-        del self._status[block_id]
+        logger.debug("Marking block %s as SCALED_IN", block_id)
+        s = JobStatus(JobState.SCALED_IN)
+        new_status[block_id] = s
+        self._status[block_id] = s
+        self._simulated_status[block_id] = s
     self.send_monitoring_info(new_status)
 return block_ids
7 changes: 6 additions & 1 deletion parsl/jobs/states.py
@@ -46,12 +46,17 @@ class JobState(IntEnum):
     bad worker environment or network connectivity issues.
     """
 
+    SCALED_IN = 9
+    """This job has been deliberately scaled in. Scaling code should not be concerned
+    that the job never ran (for example for error handling purposes).
+    """
+
     def __str__(self) -> str:
         return f"{self.__class__.__name__}.{self.name}"
 
 
 TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
-                   JobState.TIMEOUT, JobState.MISSING]
+                   JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]
 
 
 class JobStatus:
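Since SCALED_IN is now a terminal state, code that reasons about failures needs to single it out. A small hedged sketch of that pattern — count_failed_blocks is a hypothetical helper written for this illustration, not a function that exists in parsl:

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

def count_failed_blocks(status):
    # Terminal blocks that were deliberately scaled in should not count as failures.
    return sum(1 for js in status.values()
               if js.state in TERMINAL_STATES and js.state != JobState.SCALED_IN)

status = {'0': JobStatus(JobState.SCALED_IN), '1': JobStatus(JobState.FAILED)}
assert count_failed_blocks(status) == 1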
8 changes: 3 additions & 5 deletions parsl/tests/test_htex/test_multiple_disconnected_blocks.py
@@ -21,16 +21,14 @@ def local_config():
                 poll_period=100,
                 max_workers_per_node=1,
                 provider=LocalProvider(
-                    worker_init="conda deactivate; export PATH=''; which python; exit 0",
-                    init_blocks=2,
-                    max_blocks=4,
-                    min_blocks=0,
+                    worker_init="exit 0",
+                    init_blocks=2
                 ),
             )
         ],
         run_dir="/tmp/test_htex",
         max_idletime=0.5,
-        strategy='htex_auto_scale',
+        strategy='none',
     )


@@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy):
     (c, ) = result.first()
     assert c == 1, "There should be a single pending status"
 
-    result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds)
+    result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds)
     (c, ) = result.first()
     assert c == 1, "There should be a single cancelled status"
@@ -0,0 +1,85 @@
import time

import pytest

import parsl
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider


def local_config():
    # see the comments inside test_regression for reasoning about why each
    # of these parameters is set the way it is.
    return Config(
        max_idletime=1,

        strategy='htex_auto_scale',
        strategy_period=1,

        executors=[
            HighThroughputExecutor(
                label="htex_local",
                encrypted=True,
                provider=LocalProvider(
                    init_blocks=1,
                    min_blocks=0,
                    max_blocks=1,
                    launcher=WrappedLauncher(prepend="sleep inf ; "),
                ),
            )
        ],
    )


@parsl.python_app
def task():
    return 7


@pytest.mark.local
def test_regression(try_assert):
    # The above config means that we should start scaling out one initial
    # block, but then scale it back in after a second or so if the executor
    # is kept idle (which this test does using try_assert).

    # Because of 'sleep inf' in the WrappedLauncher, the block will not ever
    # register.

    # The bug being tested is about mistreatment of blocks which are scaled in
    # before they have a chance to register, and the above forces that to
    # happen.

    # After that scaling in has happened, we should see that we have one block
    # and it should be in a terminal state. The below try_assert waits for
    # that to become true.

    # At that time, we should also see htex reporting no blocks registered - as
    # mentioned above, that is a necessary part of the bug being tested here.

    # Allow 10 strategy periods for the above to happen: neither the scale-up
    # step nor the idle-timeout scale-in step is guaranteed to happen in
    # exactly one scaling step.

    htex = parsl.dfk().executors['htex_local']

    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
               timeout_ms=10000)
Comment on lines +68 to +69 (Collaborator):

Consider using the fail_msg argument to explain that this is a block. (Analogous to assert ..., "fail_msg")
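A hedged sketch of what that suggestion could look like, assuming the try_assert fixture accepts a fail_msg keyword as the comment implies (the parameter name comes from the comment, not from checking the fixture's signature):

    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
               fail_msg="htex should report exactly one block, in a terminal state",
               timeout_ms=10000)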


    assert htex.connected_blocks() == [], "No block should have connected to interchange"

    # Now we can reconfigure the launcher to let subsequent blocks launch ok,
    # and run a trivial task. That trivial task will scale up a new block and
    # run the task successfully.

    # Prior to issue #3568, the bug was that the scale in of the first
    # block earlier in the test case would have incorrectly been treated as a
    # failure, and then the block error handler would have treated that failure
    # as a permanent htex failure, and so the task execution below would raise
    # a BadStateException rather than attempt to run the task.

    assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty"
Collaborator:

Would use of .startswith() work? If so, it makes for a stronger assertion / good faith in the test. Perhaps:

_inf_sleep = "sleep inf ; "
...
    launcher=WrappedLauncher(prepend=_inf_sleep),
...
    assert htex.provider.launcher.prepend.startswith(_inf_sleep), "..."

    htex.provider.launcher.prepend = ""
    assert task().result() == 7
Comment on lines +84 to +85 (Collaborator):

Can you distill the comment above into the fail msg for this assert? Not an easy task, I know, but when this does fail, it makes it that much easier to dig in.
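One hedged way that could read, with illustrative wording that is not part of the PR:

    htex.provider.launcher.prepend = ""
    assert task().result() == 7, (
        "Task should run on a freshly scaled-out block; before issue #3568 the deliberately "
        "scaled-in first block was misreported as a failure and this call would raise a "
        "BadStateException instead of running the task"
    )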
