
Make deliberately scaled-in unstarted blocks not be failures #3594

Merged (14 commits) on Aug 26, 2024
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
@@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]:
connected_blocks = self.connected_blocks()
for job_id in job_status:
    job_info = job_status[job_id]
-   if job_info.terminal and job_id not in connected_blocks:
+   if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
+       logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
        job_status[job_id].state = JobState.MISSING
        if job_status[job_id].message is None:
            job_status[job_id].message = (
7 changes: 5 additions & 2 deletions parsl/executors/status_handling.py
@@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
if block_ids is not None:
    new_status = {}
    for block_id in block_ids:
-       new_status[block_id] = JobStatus(JobState.CANCELLED)
-       del self._status[block_id]
+       logger.debug("Marking block %s as SCALED_IN", block_id)
+       s = JobStatus(JobState.SCALED_IN)
+       new_status[block_id] = s
+       self._status[block_id] = s
+       self._simulated_status[block_id] = s
    self.send_monitoring_info(new_status)
return block_ids
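For illustration, a minimal sketch of the behavioural difference this gives callers (the executor variable is assumed to be an already-configured block-provider executor; this is not Parsl's real strategy code): a deliberately scaled-in block now stays in the facade's status table in a terminal SCALED_IN state, instead of being deleted and reported as CANCELLED or later rewritten to MISSING.

from parsl.jobs.states import JobState

# Sketch only: scale in one block deliberately and inspect how the
# status facade reports it afterwards.
scaled_in_blocks = executor.scale_in_facade(1)       # 'executor' is assumed to exist
for block_id in scaled_in_blocks:
    status = executor.status_facade[block_id]
    assert status.terminal                            # terminal, so bookkeeping completes
    assert status.state == JobState.SCALED_IN         # not CANCELLED, and never rewritten to MISSING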
7 changes: 6 additions & 1 deletion parsl/jobs/states.py
@@ -46,12 +46,17 @@ class JobState(IntEnum):
    bad worker environment or network connectivity issues.
    """

+   SCALED_IN = 9
+   """This job has been deliberately scaled in. Scaling code should not be concerned
+   that the job never ran (for example for error handling purposes).
+   """
+
    def __str__(self) -> str:
        return f"{self.__class__.__name__}.{self.name}"


TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
-                  JobState.TIMEOUT, JobState.MISSING]
+                  JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]


class JobStatus:
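As a reading aid for the new docstring (a sketch under assumptions: the helper below is hypothetical and is not Parsl's actual block error handler), error-handling code can keep treating SCALED_IN as a terminal state while not counting it as a failure:

from typing import Dict

from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus


def count_failed_blocks(status: Dict[str, JobStatus]) -> int:
    # Hypothetical helper: a terminal block counts towards failure handling
    # only if it did not complete normally and was not deliberately scaled in.
    return sum(1 for s in status.values()
               if s.state in TERMINAL_STATES
               and s.state not in (JobState.COMPLETED, JobState.SCALED_IN))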
8 changes: 3 additions & 5 deletions parsl/tests/test_htex/test_multiple_disconnected_blocks.py
@@ -21,16 +21,14 @@ def local_config():
            poll_period=100,
            max_workers_per_node=1,
            provider=LocalProvider(
-               worker_init="conda deactivate; export PATH=''; which python; exit 0",
-               init_blocks=2,
-               max_blocks=4,
-               min_blocks=0,
+               worker_init="exit 0",
+               init_blocks=2
            ),
        )
    ],
    run_dir="/tmp/test_htex",
    max_idletime=0.5,
-   strategy='htex_auto_scale',
+   strategy='none',
)


@@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy):
        (c, ) = result.first()
        assert c == 1, "There should be a single pending status"

-       result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds)
+       result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds)
        (c, ) = result.first()
        assert c == 1, "There should be a single cancelled status"
@@ -0,0 +1,78 @@
import time

import pytest

import parsl
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider


def local_config():
    return Config(
        max_idletime=1,
        strategy='htex_auto_scale',
        strategy_period=1,
        executors=[
            HighThroughputExecutor(
                label="htex_local",
                address="127.0.0.1",
                cores_per_worker=1,
                encrypted=True,
                provider=LocalProvider(
                    channel=LocalChannel(),
                    init_blocks=1,
                    min_blocks=0,
                    max_blocks=1,
                    # TODO: swap out the launcher during the test. make it use a much longer sleep (or infinite sleep)
                    # to give blocks that run but never complete themselves. then swap that for a launcher which
                    # launches the worker pool immediately.
                    launcher=WrappedLauncher(prepend="sleep inf ; "),
                ),
            )
        ],
    )


@parsl.python_app
def task():
    return 7


@pytest.mark.local
def test_regression(try_assert):
    # The config means that we should start with one block and very rapidly
    # scale it down to 0 blocks. Because of 'sleep inf' in the WrappedLauncher,
    # the block will never register. This lack of registration is part of the
    # bug being tested for regression.

    # After that scaling has happened, we should see that we have one block
    # and it should be in a terminal state. We should also see htex reporting
    # no blocks connected.

    # Give 10 strategy periods for the above to happen: each step of scale up,
    # and scale down due to idleness, isn't guaranteed to happen in exactly one
    # scaling step.

    htex = parsl.dfk().executors['htex_local']

    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
               timeout_ms=10000)
Comment on lines +68 to +69 (Collaborator):
Consider using the fail_msg argument to explain that this is a block. (Analogous to assert ..., "fail_msg")
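A sketch of how that suggestion could look (assuming the try_assert fixture accepts a fail_msg keyword, as the comment implies; the message wording is illustrative):

# Hypothetical rewrite of the try_assert call above, adding a failure
# message so a timeout explains what was being waited for.
try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
           fail_msg="Expected exactly one block, reported in a terminal state",
           timeout_ms=10000)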


    assert htex.connected_blocks() == [], "No block should have connected to interchange"

    # Now we can reconfigure the launcher to let subsequent blocks launch ok,
    # and run a trivial task. Running that task will scale up a new block,
    # and the task should complete successfully.

    # The bug reported in issue #3568 was that the scale-in of the first
    # block earlier in this test would have been treated as a failure; the
    # block error handler would then have treated that failure as a permanent
    # htex failure, and so the task execution below would raise a
    # BadStateException rather than attempt to run the task.

    assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty"
Comment (Collaborator):
Would use of .startswith() work? If so, it makes for a stronger assertion / good faith in the test. Perhaps:

_inf_sleep = "sleep inf ; "
...
    launcher=WrappedLauncher(prepend=_inf_sleep),
...
    assert htex.provider.launcher.prepend.startswith(_inf_sleep), "..."

    htex.provider.launcher.prepend = ""
    assert task().result() == 7
Comment on lines +84 to +85 (Collaborator):
Can you distill the comment above into the fail msg for this assert? Not an easy task, I know, but when this does fail, it makes it that much easier to dig in.
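One possible distillation (the message wording here is illustrative only, not part of the PR):

# Hypothetical fail message folding the earlier explanatory comment into the assert.
assert task().result() == 7, \
    "Task should run on a fresh block: before the #3568 fix, the earlier deliberate " \
    "scale-in was treated as a permanent failure and this would raise BadStateException"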
