diff --git a/docs/reference.rst b/docs/reference.rst
index d8e18bd244..f2d89afaf8 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -93,6 +93,7 @@ Launchers
     parsl.launchers.SrunMPILauncher
     parsl.launchers.GnuParallelLauncher
     parsl.launchers.MpiExecLauncher
+    parsl.launchers.MpiRunLauncher
     parsl.launchers.JsrunLauncher
     parsl.launchers.WrappedLauncher

diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst
index f3fe5cc407..a57e815fe7 100644
--- a/docs/userguide/configuring.rst
+++ b/docs/userguide/configuring.rst
@@ -536,12 +536,27 @@ Center's **Expanse** supercomputer. The example is designed to be executed on th

 .. literalinclude:: ../../parsl/configs/expanse.py

+Improv (Argonne LCRC)
+---------------------
+
+.. image:: https://www.lcrc.anl.gov/sites/default/files/styles/965_wide/public/2023-12/20231214_114057.jpg?itok=A-Rz5pP9
+
+**Improv** is a PBS Pro-based supercomputer at Argonne's Laboratory Computing Resource
+Center (LCRC). The following snippet is an example configuration that uses `parsl.providers.PBSProProvider`
+and `parsl.launchers.MpiRunLauncher` to run multi-node jobs.
+
+.. literalinclude:: ../../parsl/configs/improv.py
+
+
 .. _configuring_nersc_cori:

 Perlmutter (NERSC)
 ------------------

 NERSC provides documentation on `how to use Parsl on Perlmutter `_.
+Perlmutter is a Slurm-based HPC system, and Parsl uses `parsl.providers.SlurmProvider` with `parsl.launchers.SrunLauncher`
+to launch tasks onto this machine.
+

 Frontera (TACC)
 ---------------
@@ -599,6 +614,8 @@ Polaris (ALCF)
    :width: 75%

 ALCF provides documentation on `how to use Parsl on Polaris `_.
+Polaris uses `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` to launch tasks onto the HPC system.
+

 Stampede2 (TACC)
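As a companion to the Perlmutter note above, which only names the provider/launcher pairing, the
following is a minimal sketch of what a `parsl.providers.SlurmProvider` plus
`parsl.launchers.SrunLauncher` configuration could look like. The account, partition, worker count,
node count and walltime values are placeholders invented for illustration rather than recommended
settings; NERSC's own documentation remains the authoritative reference. ::

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.launchers import SrunLauncher
    from parsl.providers import SlurmProvider

    # Hypothetical Perlmutter-style configuration: Slurm allocates the nodes,
    # and srun (via SrunLauncher) starts the workers on them.
    config = Config(
        executors=[
            HighThroughputExecutor(
                label="perlmutter_htex",
                max_workers_per_node=64,
                provider=SlurmProvider(
                    account="YOUR_ALLOCATION",   # placeholder allocation name
                    partition="YOUR_PARTITION",  # placeholder partition/queue
                    nodes_per_block=2,
                    walltime="00:30:00",
                    launcher=SrunLauncher(),
                ),
            ),
        ],
    )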
diff --git a/parsl/configs/improv.py b/parsl/configs/improv.py
new file mode 100644
index 0000000000..8a40282829
--- /dev/null
+++ b/parsl/configs/improv.py
@@ -0,0 +1,34 @@
+from parsl.config import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.launchers import MpiRunLauncher
+from parsl.providers import PBSProProvider
+
+config = Config(
+    executors=[
+        HighThroughputExecutor(
+            label="Improv_multinode",
+            max_workers_per_node=32,
+            provider=PBSProProvider(
+                account="YOUR_ALLOCATION_ON_IMPROV",
+                # PBS directives (header lines), for example:
+                # scheduler_options='#PBS -l mem=4gb',
+                scheduler_options='',
+
+                queue="compute",
+
+                # Command to be run before starting a worker:
+                # **WARNING** Improv requires an openmpi module to be
+                # loaded for the MpiRunLauncher. Add additional env
+                # load commands to this multiline string.
+                worker_init='''
+module load gcc/13.2.0;
+module load openmpi/5.0.3-gcc-13.2.0;
+''',
+                launcher=MpiRunLauncher(),
+
+                # number of compute nodes allocated for each block
+                nodes_per_block=2,
+                walltime='00:10:00'
+            ),
+        ),
+    ],
+)
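For orientation, a site configuration module like the one added above is normally loaded once
before any apps are defined and invoked. The driver below is a hypothetical usage sketch: the
`node_name` app is invented for illustration, and it assumes the script is run from an Improv login
node under a valid allocation. ::

    import parsl
    from parsl.configs.improv import config

    # Start the DataFlowKernel; the PBSProProvider submits blocks on demand.
    parsl.load(config)

    @parsl.python_app
    def node_name():
        import platform
        return platform.node()  # reports which compute node executed the task

    print(node_name().result())

    parsl.dfk().cleanup()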
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index c4097500f1..0e0ea9c892 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]:
         connected_blocks = self.connected_blocks()
         for job_id in job_status:
             job_info = job_status[job_id]
-            if job_info.terminal and job_id not in connected_blocks:
+            if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
+                logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
                 job_status[job_id].state = JobState.MISSING
                 if job_status[job_id].message is None:
                     job_status[job_id].message = (
diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py
index 34db2300f6..615f09de78 100644
--- a/parsl/executors/status_handling.py
+++ b/parsl/executors/status_handling.py
@@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
         if block_ids is not None:
             new_status = {}
             for block_id in block_ids:
-                new_status[block_id] = JobStatus(JobState.CANCELLED)
-                del self._status[block_id]
+                logger.debug("Marking block %s as SCALED_IN", block_id)
+                s = JobStatus(JobState.SCALED_IN)
+                new_status[block_id] = s
+                self._status[block_id] = s
+                self._simulated_status[block_id] = s
             self.send_monitoring_info(new_status)
         return block_ids
diff --git a/parsl/jobs/states.py b/parsl/jobs/states.py
index 7ba4aae94e..792a515bac 100644
--- a/parsl/jobs/states.py
+++ b/parsl/jobs/states.py
@@ -46,12 +46,17 @@ class JobState(IntEnum):
     bad worker environment or network connectivity issues.
     """

+    SCALED_IN = 9
+    """This job has been deliberately scaled in. Scaling code should not be concerned
+    that the job never ran (for example for error handling purposes).
+    """
+
     def __str__(self) -> str:
         return f"{self.__class__.__name__}.{self.name}"


 TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
-                   JobState.TIMEOUT, JobState.MISSING]
+                   JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]


 class JobStatus:
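Taken together, the three hunks above let a deliberately scaled-in block reach a terminal state
without ever being re-labelled as lost. A small sketch of the resulting semantics, using only names
that appear in this patch::

    from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus

    status = JobStatus(JobState.SCALED_IN)

    # Terminal: strategy and scale-in bookkeeping treat the block as finished ...
    assert status.state in TERMINAL_STATES

    # ... but the guard added to HighThroughputExecutor.status() explicitly skips
    # the MISSING rewrite for SCALED_IN, so the block is never reported as lost.
    assert status.state != JobState.MISSING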
diff --git a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py
index 159c20f58d..4168f41b79 100644
--- a/parsl/tests/test_htex/test_multiple_disconnected_blocks.py
+++ b/parsl/tests/test_htex/test_multiple_disconnected_blocks.py
@@ -21,16 +21,14 @@ def local_config():
                 poll_period=100,
                 max_workers_per_node=1,
                 provider=LocalProvider(
-                    worker_init="conda deactivate; export PATH=''; which python; exit 0",
-                    init_blocks=2,
-                    max_blocks=4,
-                    min_blocks=0,
+                    worker_init="exit 0",
+                    init_blocks=2
                 ),
             )
         ],
         run_dir="/tmp/test_htex",
         max_idletime=0.5,
-        strategy='htex_auto_scale',
+        strategy='none',
     )
diff --git a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
index eb7a25003b..ada972e747 100644
--- a/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
+++ b/parsl/tests/test_monitoring/test_htex_init_blocks_vs_monitoring.py
@@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy):
         (c, ) = result.first()
         assert c == 1, "There should be a single pending status"

-        result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds)
+        result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds)
         (c, ) = result.first()
         assert c == 1, "There should be a single cancelled status"
diff --git a/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py
new file mode 100644
index 0000000000..a56b53af10
--- /dev/null
+++ b/parsl/tests/test_scaling/test_regression_3568_scaledown_vs_MISSING.py
@@ -0,0 +1,85 @@
+import time
+
+import pytest
+
+import parsl
+from parsl.channels import LocalChannel
+from parsl.config import Config
+from parsl.executors import HighThroughputExecutor
+from parsl.launchers import WrappedLauncher
+from parsl.providers import LocalProvider
+
+
+def local_config():
+    # see the comments inside test_regression for reasoning about why each
+    # of these parameters is set the way it is.
+    return Config(
+        max_idletime=1,
+
+        strategy='htex_auto_scale',
+        strategy_period=1,
+
+        executors=[
+            HighThroughputExecutor(
+                label="htex_local",
+                encrypted=True,
+                provider=LocalProvider(
+                    init_blocks=1,
+                    min_blocks=0,
+                    max_blocks=1,
+                    launcher=WrappedLauncher(prepend="sleep inf ; "),
+                ),
+            )
+        ],
+    )
+
+
+@parsl.python_app
+def task():
+    return 7
+
+
+@pytest.mark.local
+def test_regression(try_assert):
+    # The above config means that we should start scaling out one initial
+    # block, but then scale it back in after a second or so if the executor
+    # is kept idle (which this test does using try_assert).
+
+    # Because of 'sleep inf' in the WrappedLauncher, the block will not ever
+    # register.
+
+    # The bug being tested is about mistreatment of blocks which are scaled in
+    # before they have a chance to register, and the above forces that to
+    # happen.
+
+    # After that scaling in has happened, we should see that we have one block
+    # and it should be in a terminal state. The below try_assert waits for
+    # that to become true.
+
+    # At that time, we should also see htex reporting no blocks registered - as
+    # mentioned above, that is a necessary part of the bug being tested here.
+
+    # Give 10 strategy periods for the above to happen: each step of scale up,
+    # and scale down due to idleness isn't guaranteed to happen in exactly one
+    # scaling step.
+
+    htex = parsl.dfk().executors['htex_local']
+
+    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
+               timeout_ms=10000)
+
+    assert htex.connected_blocks() == [], "No block should have connected to interchange"
+
+    # Now we can reconfigure the launcher to let subsequent blocks launch ok,
+    # and run a trivial task. That trivial task will scale up a new block and
+    # run the task successfully.
+
+    # Prior to issue #3568, the bug was that the scale in of the first
+    # block earlier in the test case would have incorrectly been treated as a
+    # failure, and then the block error handler would have treated that failure
+    # as a permanent htex failure, and so the task execution below would raise
+    # a BadStateException rather than attempt to run the task.
+
+    assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty"
+    htex.provider.launcher.prepend = ""
+    assert task().result() == 7
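One aside on the regression test above: `parsl.launchers.WrappedLauncher` works by prefixing the
single-node worker launch command with the configured `prepend` string, so prepending
"sleep inf ; " keeps the shell from ever reaching the worker command and the block never registers
with the interchange. A rough illustration, with a made-up command string standing in for the real
worker command::

    from parsl.launchers import WrappedLauncher

    launcher = WrappedLauncher(prepend="sleep inf ; ")

    # Launchers are callables that rewrite the per-node worker launch command;
    # the prepended 'sleep inf ;' blocks forever, so the worker never starts.
    cmd = launcher("echo would-start-worker-here", 1, 1)
    print(cmd)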