
Commit

Merge branch 'Parsl:master' into slurm-tests
tylern4 authored Sep 4, 2024
2 parents 2c1561f + 3a256de commit fa9a3ea
Showing 28 changed files with 431 additions and 332 deletions.
5 changes: 4 additions & 1 deletion README.rst
@@ -1,6 +1,6 @@
Parsl - Parallel Scripting Library
==================================
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528|
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|

Parsl extends parallelism in Python beyond a single computer.

@@ -64,6 +64,9 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |NSF-1550475| image:: https://img.shields.io/badge/NSF-1550475-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550475
:alt: NSF award info
.. |CZI-EOSS| image:: https://chanzuckerberg.github.io/open-science/badges/CZI-EOSS.svg
:target: https://czi.co/EOSS
:alt: CZI's Essential Open Source Software for Science


Quickstart
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -93,6 +93,7 @@ Launchers
parsl.launchers.SrunMPILauncher
parsl.launchers.GnuParallelLauncher
parsl.launchers.MpiExecLauncher
parsl.launchers.MpiRunLauncher
parsl.launchers.JsrunLauncher
parsl.launchers.WrappedLauncher

17 changes: 17 additions & 0 deletions docs/userguide/configuring.rst
@@ -536,12 +536,27 @@ Center's **Expanse** supercomputer. The example is designed to be executed on th
.. literalinclude:: ../../parsl/configs/expanse.py


Improv (Argonne LCRC)
---------------------

.. image:: https://www.lcrc.anl.gov/sites/default/files/styles/965_wide/public/2023-12/20231214_114057.jpg?itok=A-Rz5pP9

**Improv** is a PBS Pro-based supercomputer at Argonne's Laboratory Computing Resource
Center (LCRC). The following snippet is an example configuration that uses `parsl.providers.PBSProProvider`
and `parsl.launchers.MpiRunLauncher` to run multi-node jobs.

.. literalinclude:: ../../parsl/configs/improv.py


.. _configuring_nersc_cori:

Perlmutter (NERSC)
------------------

NERSC provides documentation on `how to use Parsl on Perlmutter <https://docs.nersc.gov/jobs/workflow/parsl/>`_.
Perlmutter is a Slurm-based HPC system, and Parsl uses `parsl.providers.SlurmProvider` with `parsl.launchers.SrunLauncher`
to launch tasks onto this machine.
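
A minimal configuration sketch for such a setup, with placeholder values for the account and scheduler directive, might look like the following (NERSC's documentation linked above is authoritative):

.. code-block:: python

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.launchers import SrunLauncher
    from parsl.providers import SlurmProvider

    # Illustrative sketch only: the account and the #SBATCH directive are placeholders.
    config = Config(
        executors=[
            HighThroughputExecutor(
                label="Perlmutter_HTEX",
                provider=SlurmProvider(
                    account="YOUR_ALLOCATION",
                    scheduler_options="#SBATCH -C cpu",
                    nodes_per_block=1,
                    walltime="00:10:00",
                    launcher=SrunLauncher(),
                ),
            ),
        ],
    )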


Frontera (TACC)
---------------
@@ -599,6 +614,8 @@ Polaris (ALCF)
:width: 75%

ALCF provides documentation on `how to use Parsl on Polaris <https://docs.alcf.anl.gov/polaris/workflows/parsl/>`_.
Polaris uses `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` to launch tasks onto the HPC system.
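
The provider portion of such a configuration, sketched here with placeholder account and queue values, follows the same shape as the Improv example above but swaps in `parsl.launchers.MpiExecLauncher`:

.. code-block:: python

    from parsl.launchers import MpiExecLauncher
    from parsl.providers import PBSProProvider

    # Illustrative sketch only: account and queue are placeholders.
    provider = PBSProProvider(
        account="YOUR_ALLOCATION",
        queue="debug",
        nodes_per_block=1,
        walltime="00:10:00",
        launcher=MpiExecLauncher(),
    )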



Stampede2 (TACC)
34 changes: 34 additions & 0 deletions parsl/configs/improv.py
@@ -0,0 +1,34 @@
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiRunLauncher
from parsl.providers import PBSProProvider

config = Config(
    executors=[
        HighThroughputExecutor(
            label="Improv_multinode",
            max_workers_per_node=32,
            provider=PBSProProvider(
                account="YOUR_ALLOCATION_ON_IMPROV",
                # PBS directives (header lines), for example:
                # scheduler_options='#PBS -l mem=4gb',
                scheduler_options='',

                queue="compute",

                # Command to be run before starting a worker:
                # **WARNING** Improv requires an openmpi module to be
                # loaded for the MpiRunLauncher. Add additional env
                # load commands to this multiline string.
                worker_init='''
module load gcc/13.2.0;
module load openmpi/5.0.3-gcc-13.2.0; ''',
                launcher=MpiRunLauncher(),

                # number of compute nodes allocated for each block
                nodes_per_block=2,
                walltime='00:10:00'
            ),
        ),
    ],
)
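
A brief usage sketch (the `hello` app below is illustrative, not part of this file): the configuration above can be loaded with `parsl.load` before defining apps.

import parsl
from parsl import python_app

from parsl.configs.improv import config

parsl.load(config)

@python_app
def hello():
    return "Hello from Improv"

print(hello().result())

# Shut the DataFlowKernel down once all futures have resolved.
parsl.dfk().cleanup()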
63 changes: 19 additions & 44 deletions parsl/executors/high_throughput/executor.py
@@ -12,7 +12,6 @@

import typeguard

import parsl.launchers
from parsl import curvezmq
from parsl.addresses import get_all_addresses
from parsl.app.errors import RemoteExceptionWrapper
@@ -25,8 +24,7 @@
RandomManagerSelector,
)
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec,
InvalidResourceSpecification,
)
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus
@@ -201,9 +199,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
will check the available memory at startup and limit the number of workers such that
there's sufficient memory for each worker. Default: None
max_workers : int
Deprecated. Please use max_workers_per_node instead.
max_workers_per_node : int
Caps the number of workers launched per node. Default: None
@@ -224,17 +219,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
Parsl will create names as integers starting with 0.
default: empty list
enable_mpi_mode: bool
If enabled, MPI launch prefixes will be composed for the batch scheduler based on
the nodes available in each batch job and the resource_specification dict passed
from the app. This is an experimental feature, please refer to the following doc section
before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html
mpi_launcher: str
This field is only used if enable_mpi_mode is set. Select one from the
list of supported MPI launchers = ("srun", "aprun", "mpiexec").
default: "mpiexec"
"""

@typeguard.typechecked
@@ -252,7 +236,6 @@ def __init__(self,
worker_debug: bool = False,
cores_per_worker: float = 1.0,
mem_per_worker: Optional[float] = None,
max_workers: Optional[Union[int, float]] = None,
max_workers_per_node: Optional[Union[int, float]] = None,
cpu_affinity: str = 'none',
available_accelerators: Union[int, Sequence[str]] = (),
@@ -263,8 +246,6 @@ def __init__(self,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
manager_selector: ManagerSelector = RandomManagerSelector(),
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):
@@ -287,9 +268,7 @@ def __init__(self,
else:
self.all_addresses = ','.join(get_all_addresses())

if max_workers:
self._warn_deprecated("max_workers", "max_workers_per_node")
self.max_workers_per_node = max_workers_per_node or max_workers or float("inf")
self.max_workers_per_node = max_workers_per_node or float("inf")

mem_slots = self.max_workers_per_node
cpu_slots = self.max_workers_per_node
@@ -330,15 +309,6 @@ def __init__(self,
self.encrypted = encrypted
self.cert_dir = None

self.enable_mpi_mode = enable_mpi_mode
assert mpi_launcher in VALID_LAUNCHERS, \
f"mpi_launcher must be set to one of {VALID_LAUNCHERS}"
if self.enable_mpi_mode:
assert isinstance(self.provider.launcher, parsl.launchers.SimpleLauncher), \
"mpi_mode requires the provider to be configured to use a SimpleLauncher"

self.mpi_launcher = mpi_launcher

if not launch_cmd:
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd
@@ -348,6 +318,8 @@ def __init__(self,
self.interchange_launch_cmd = interchange_launch_cmd

radio_mode = "htex"
enable_mpi_mode: bool = False
mpi_launcher: str = "mpiexec"

def _warn_deprecated(self, old: str, new: str):
warnings.warn(
@@ -357,16 +329,6 @@ def _warn_deprecated(self, old: str, new: str):
stacklevel=2
)

@property
def max_workers(self):
self._warn_deprecated("max_workers", "max_workers_per_node")
return self.max_workers_per_node

@max_workers.setter
def max_workers(self, val: Union[int, float]):
self._warn_deprecated("max_workers", "max_workers_per_node")
self.max_workers_per_node = val

@property
def logdir(self):
return "{}/{}".format(self.run_dir, self.label)
@@ -377,6 +339,18 @@ def worker_logdir(self):
return "{}/{}".format(self.worker_logdir_root, self.label)
return self.logdir

def validate_resource_spec(self, resource_specification: dict):
    """HTEX does not support *any* resource_specification options and
    will raise InvalidResourceSpecification if any are passed to it."""
    if resource_specification:
        raise InvalidResourceSpecification(
            set(resource_specification.keys()),
            ("HTEX does not support the supplied resource_specifications. "
             "For MPI applications consider using the MPIExecutor. "
             "For specifications for core count/memory/walltime, consider using WorkQueueExecutor.")
        )
    return
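
A short illustrative check of this behaviour (the executor label and spec key below are made up, not part of this diff): any non-empty resource specification is rejected before a task is submitted.

from parsl.executors import HighThroughputExecutor
from parsl.executors.high_throughput.mpi_prefix_composer import (
    InvalidResourceSpecification,
)

htex = HighThroughputExecutor(label="htex_example")

# An empty specification passes the check silently.
htex.validate_resource_spec({})

# Any keys at all raise InvalidResourceSpecification.
try:
    htex.validate_resource_spec({"num_nodes": 2})
except InvalidResourceSpecification as err:
    print("rejected:", err)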

def initialize_scaling(self):
"""Compose the launch command and scale out the initial blocks.
"""
Expand Down Expand Up @@ -660,7 +634,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
Future
"""

validate_resource_spec(resource_specification, self.enable_mpi_mode)
self.validate_resource_spec(resource_specification)

if self.bad_state_is_set:
raise self.executor_exception
@@ -800,7 +774,8 @@ def status(self) -> Dict[str, JobStatus]:
connected_blocks = self.connected_blocks()
for job_id in job_status:
job_info = job_status[job_id]
if job_info.terminal and job_id not in connected_blocks:
if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
job_status[job_id].state = JobState.MISSING
if job_status[job_id].message is None:
job_status[job_id].message = (