Skip to content

Commit

Permalink
Merge branch 'Parsl:master' into Glossary
Browse files Browse the repository at this point in the history
  • Loading branch information
Kanegraffiti authored Aug 29, 2024
2 parents 7beda8a + 3f2bf18 commit c5a427c
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Launchers
parsl.launchers.SrunMPILauncher
parsl.launchers.GnuParallelLauncher
parsl.launchers.MpiExecLauncher
parsl.launchers.MpiRunLauncher
parsl.launchers.JsrunLauncher
parsl.launchers.WrappedLauncher

Expand Down
17 changes: 17 additions & 0 deletions docs/userguide/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,27 @@ Center's **Expanse** supercomputer. The example is designed to be executed on th
.. literalinclude:: ../../parsl/configs/expanse.py


Improv (Argonne LCRC)
---------------------

.. image:: https://www.lcrc.anl.gov/sites/default/files/styles/965_wide/public/2023-12/20231214_114057.jpg?itok=A-Rz5pP9

**Improv** is a PBS Pro based supercomputer at Argonne's Laboratory Computing Resource
Center (LCRC). The following snippet is an example configuration that uses `parsl.providers.PBSProProvider`
and `parsl.launchers.MpiRunLauncher` to run multinode jobs.

.. literalinclude:: ../../parsl/configs/improv.py


.. _configuring_nersc_cori:

Perlmutter (NERSC)
------------------

NERSC provides documentation on `how to use Parsl on Perlmutter <https://docs.nersc.gov/jobs/workflow/parsl/>`_.
Perlmutter is a Slurm-based HPC system and Parsl uses `parsl.providers.SlurmProvider` with `parsl.launchers.SrunLauncher`
to launch tasks onto this machine.


Frontera (TACC)
---------------
Expand Down Expand Up @@ -599,6 +614,8 @@ Polaris (ALCF)
:width: 75%

ALCF provides documentation on `how to use Parsl on Polaris <https://docs.alcf.anl.gov/polaris/workflows/parsl/>`_.
Polaris uses `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` to launch tasks onto the HPC system.



Stampede2 (TACC)
Expand Down
34 changes: 34 additions & 0 deletions parsl/configs/improv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiRunLauncher
from parsl.providers import PBSProProvider

# Example Parsl configuration for Argonne LCRC's Improv cluster: a single
# HighThroughputExecutor whose blocks are submitted through PBS Pro and
# whose workers are launched across nodes with mpirun.
config = Config(
    executors=[
        HighThroughputExecutor(
            label="Improv_multinode",
            # One worker per core on a 32-core allocation request.
            max_workers_per_node=32,
            provider=PBSProProvider(
                # Replace with a real LCRC allocation name before use.
                account="YOUR_ALLOCATION_ON_IMPROV",
                # PBS directives (header lines), for example:
                # scheduler_options='#PBS -l mem=4gb',
                scheduler_options='',

                queue="compute",

                # Command to be run before starting a worker:
                # **WARNING** Improv requires an openmpi module to be
                # loaded for the MpiRunLauncher. Add additional env
                # load commands to this multiline string.
                worker_init='''
module load gcc/13.2.0;
module load openmpi/5.0.3-gcc-13.2.0; ''',
                launcher=MpiRunLauncher(),

                # number of compute nodes allocated for each block
                nodes_per_block=2,
                walltime='00:10:00'
            ),
        ),
    ],
)
3 changes: 2 additions & 1 deletion parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,8 @@ def status(self) -> Dict[str, JobStatus]:
connected_blocks = self.connected_blocks()
for job_id in job_status:
job_info = job_status[job_id]
if job_info.terminal and job_id not in connected_blocks:
if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
job_status[job_id].state = JobState.MISSING
if job_status[job_id].message is None:
job_status[job_id].message = (
Expand Down
7 changes: 5 additions & 2 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
if block_ids is not None:
new_status = {}
for block_id in block_ids:
new_status[block_id] = JobStatus(JobState.CANCELLED)
del self._status[block_id]
logger.debug("Marking block %s as SCALED_IN", block_id)
s = JobStatus(JobState.SCALED_IN)
new_status[block_id] = s
self._status[block_id] = s
self._simulated_status[block_id] = s
self.send_monitoring_info(new_status)
return block_ids
7 changes: 6 additions & 1 deletion parsl/jobs/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ class JobState(IntEnum):
bad worker environment or network connectivity issues.
"""

SCALED_IN = 9
"""This job has been deliberately scaled in. Scaling code should not be concerned
that the job never ran (for example for error handling purposes).
"""

def __str__(self) -> str:
return f"{self.__class__.__name__}.{self.name}"


TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
JobState.TIMEOUT, JobState.MISSING]
JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]


class JobStatus:
Expand Down
8 changes: 3 additions & 5 deletions parsl/tests/test_htex/test_multiple_disconnected_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ def local_config():
poll_period=100,
max_workers_per_node=1,
provider=LocalProvider(
worker_init="conda deactivate; export PATH=''; which python; exit 0",
init_blocks=2,
max_blocks=4,
min_blocks=0,
worker_init="exit 0",
init_blocks=2
),
)
],
run_dir="/tmp/test_htex",
max_idletime=0.5,
strategy='htex_auto_scale',
strategy='none',
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ def test_row_counts(tmpd_cwd, strategy):
(c, ) = result.first()
assert c == 1, "There should be a single pending status"

result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'CANCELLED' AND run_id = :run_id"), binds)
result = connection.execute(text("SELECT COUNT(*) FROM block WHERE block_id = 0 AND status = 'SCALED_IN' AND run_id = :run_id"), binds)
(c, ) = result.first()
assert c == 1, "There should be a single cancelled status"
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import time

import pytest

import parsl
from parsl.channels import LocalChannel
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import WrappedLauncher
from parsl.providers import LocalProvider


def local_config():
    """Build the test configuration; test_regression explains each choice."""
    # The wrapped launcher prepends 'sleep inf ; ', so the one block this
    # provider starts can never reach the point of registering a worker.
    stalled_provider = LocalProvider(
        init_blocks=1,
        min_blocks=0,
        max_blocks=1,
        launcher=WrappedLauncher(prepend="sleep inf ; "),
    )

    executor = HighThroughputExecutor(
        label="htex_local",
        encrypted=True,
        provider=stalled_provider,
    )

    # Short idle timeout and a fast strategy period so the auto-scaler
    # scales the never-registering block back in within the test timeout.
    return Config(
        max_idletime=1,
        strategy='htex_auto_scale',
        strategy_period=1,
        executors=[executor],
    )


@parsl.python_app
def task():
    """Trivial app returning a known constant, used to prove that task
    execution still succeeds after the earlier block was scaled in."""
    return 7


@pytest.mark.local
def test_regression(try_assert):
    """Regression test for issue #3568: a block scaled in before it ever
    registers with the interchange must not be treated as a failed block."""
    # The above config means that we should start scaling out one initial
    # block, but then scale it back in after a second or so if the executor
    # is kept idle (which this test does using try_assert).

    # Because of 'sleep inf' in the WrappedLauncher, the block will not ever
    # register.

    # The bug being tested is about mistreatment of blocks which are scaled in
    # before they have a chance to register, and the above forces that to
    # happen.

    # After that scaling in has happened, we should see that we have one block
    # and it should be in a terminal state. The below try_assert waits for
    # that to become true.

    # At that time, we should also see htex reporting no blocks registered - as
    # mentioned above, that is a necessary part of the bug being tested here.

    # Give 10 strategy periods for the above to happen: each step of scale up,
    # and scale down due to idleness isn't guaranteed to happen in exactly one
    # scaling step.

    htex = parsl.dfk().executors['htex_local']

    try_assert(lambda: len(htex.status_facade) == 1 and htex.status_facade['0'].terminal,
               timeout_ms=10000)

    assert htex.connected_blocks() == [], "No block should have connected to interchange"

    # Now we can reconfigure the launcher to let subsequent blocks launch ok,
    # and run a trivial task. That trivial task will scale up a new block and
    # run the task successfully.

    # Prior to issue #3568, the bug was that the scale in of the first
    # block earlier in the test case would have incorrectly been treated as a
    # failure, and then the block error handler would have treated that failure
    # as a permanent htex failure, and so the task execution below would raise
    # a BadStateException rather than attempt to run the task.

    assert htex.provider.launcher.prepend != "", "Pre-req: prepend attribute should exist and be non-empty"
    htex.provider.launcher.prepend = ""
    assert task().result() == 7

0 comments on commit c5a427c

Please sign in to comment.