diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst
index 5d487d10dd..588ddf1e94 100644
--- a/docs/userguide/mpi_apps.rst
+++ b/docs/userguide/mpi_apps.rst
@@ -3,33 +3,67 @@ MPI Apps

.. note::
-    Parsl support for MPI Apps is being re-engineered.
-    We describe the best practices with today's Parsl.
-    Join our Slack if you want to steer Parsl's future.
+    Parsl's support for MPI Apps described below is pending release.
+    Please use the ``mpi_experimental_3`` branch to use the functionality
+    described in this document. To install directly from github:
+
+    >> pip install git+https://github.com/Parsl/parsl.git@mpi_experimental_3

MPI applications run multiple copies of a program that complete a single task by
coordinating using messages passed within or across nodes.
Starting MPI application requires invoking a "launcher" code (e.g., ``mpiexec``) from one node
with options that define how the copies of a program should be distributed to others.
-The need to call a launcher from a specific node requires a special configuration of Parsl `Apps `_
-and `Execution `_ environments to run Apps which use MPI.
+Parsl simplifies this by composing the "launcher" command from the resources specified at the time
+each app is invoked.
+
+The broad strokes of a complete solution involve the following components:
+
+1. Configure the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` with:
+   ``enable_mpi_mode=True``
+2. Specify an MPI launcher from one of the supported launchers ("aprun", "srun", "mpiexec") for the
+   :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` with: ``mpi_launcher="srun"``
+3. Specify the provider that matches your cluster (e.g., use ``SlurmProvider`` for Slurm clusters)
+4. Set the non-MPI launcher to :class:`~parsl.launchers.SingleNodeLauncher`
+5. Specify resources required by the application via ``resource_specification`` as shown below:
+
+
+.. code-block:: python
+
+    # Define HighThroughputExecutor(enable_mpi_mode=True, mpi_launcher="mpiexec", ...)
-HTEx and MPI Tasks
+    @bash_app
+    def lammps_mpi_application(infile: File, parsl_resource_specification: Dict):
+        # PARSL_MPI_PREFIX will resolve to `mpiexec -n 4 -ppn 2 -hosts NODE001,NODE002`
+        return f"$PARSL_MPI_PREFIX lmp_mpi -in {infile.filepath}"
+
+    # Resources in terms of nodes and how ranks are to be distributed are set on a per app
+    # basis via the resource_spec dictionary.
+    resource_spec = {
+        "num_nodes": 2,
+        "ranks_per_node": 2,
+        "num_ranks": 4,
+    }
+    future = lammps_mpi_application(File('in.file'), parsl_resource_specification=resource_spec)
+
+
+HTEX and MPI Tasks
------------------

-The :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` (HTEx) is the default execution provider
-available through Parsl.
-Parsl Apps which invoke MPI code require a dedicated HTEx configured such that every Parsl app
-will have access to any of the nodes available within a block,
-and Apps that will invoke the MPI launcher with the correct settings.
+The :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` (HTEX) is the
+default executor available through Parsl.
+Parsl Apps which invoke MPI code require MPI specific configuration such that:
+
+1. All workers are started on the lead-node (mom-node in case of Crays)
+2. Resource requirements of Apps are propagated to workers, which provision the required number of nodes from within the batch job (a quick check is sketched below).
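The placement behaviour described above can be verified with a trivial app. This is a minimal sketch and is not part of this patch; it assumes a configuration with ``enable_mpi_mode=True`` has been loaded, and uses ``PARSL_WORKER_RANK``, the per-worker rank environment variable set by HTEX. Every worker should report the hostname of the lead node.

.. code-block:: python

    from parsl import python_app

    @python_app
    def where_am_i():
        # Report the node this worker runs on and its worker rank.
        # With enable_mpi_mode=True, all workers in a block should
        # report the same (lead) node.
        import os
        import platform
        return platform.node(), os.environ.get("PARSL_WORKER_RANK")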
+
Configuring the Provider
++++++++++++++++++++++++

-Parsl must be configured to deploy workers on exactly one node per block.
-This part is simple.
-Instead of defining `a launcher `_ which will
-place an executor on each node in the block, simply use the :class:`~parsl.launchers.SimpleLauncher`.
+Parsl must be configured to deploy workers on exactly one node per block. This part is
+simple. Instead of defining a launcher which will place an executor on each node in the
+block, simply use the :class:`~parsl.launchers.SingleNodeLauncher`.
+The MPI launcher that the application will use is specified via ``HighThroughputExecutor(mpi_launcher="LAUNCHER")``.

It is also necessary to specify the desired number of blocks for the executor.
Parsl cannot determine the number of blocks needed to run a set of MPI Tasks,
@@ -40,116 +74,277 @@ to the desired number of blocks.

Configuring the Executor
++++++++++++++++++++++++

-Configure the executor to launch a number of workers equal to the number of MPI tasks per block.
-First set the ``max_workers`` to the number of MPI Apps per block.
-then set ``cores_per_worker=1e-6`` to prevent HTEx from reducing the number of workers
-if you request more workers than cores.
-
-If you plan to only launch one App per block, you are done!
+Here are the steps for configuring the executor:

-If not, the executor may need to first partition nodes into distinct groups for each MPI task.
-Partitioning is necessary if your MPI launcher does not automatically ensure MPI task gets exclusive nodes.
-Resources vary in how the list of available nodes is provided,
-but they typically are found as a "hostfile" referenced in an environment variable (e.g., ``PBS_NODEFILE``).
-We recommend splitting this hostfile for the host block into hostfiles for each worker
-by adding code like the following to your ``worker_init``:
+1. Set ``HighThroughputExecutor(enable_mpi_mode=True)``
+2. Set ``HighThroughputExecutor(mpi_launcher="LAUNCHER")`` to one of ("srun", "aprun", "mpiexec")
+3. Set the ``max_workers`` to the number of MPI Apps you expect to run per scheduler job (block).
+4. Set ``cores_per_worker=1e-6`` to prevent HTEX from reducing the number of workers if you request more workers than cores.

-.. code-block:: bash
-
-    NODES_PER_TASK=2
-    mkdir -p /tmp/hostfiles/
-    split --lines=$NODES_PER_TASK -d --suffix-length=2 $PBS_NODEFILE /tmp/hostfiles/hostfile.

Example Configuration
~~~~~~~~~~~~~~~~~~~~~

-An example for an executor which runs MPI tasks on ALCF's Polaris supercomputer (HPE Apollo, PBSPro resource manager)
-is below.
+Here's an example configuration which runs MPI tasks on ALCF's Polaris supercomputer:
+
+.. code-block:: python

-    nodes_per_task = 2
-    tasks_per_block = 16
+    import parsl
+    from typing import Dict
+    from parsl.config import Config
+
+    # PBSPro is the right provider for Polaris:
+    from parsl.providers import PBSProProvider
+    # The high throughput executor is for scaling to HPC systems:
+    from parsl.executors import HighThroughputExecutor
+    # address_by_interface is needed for the HighThroughputExecutor:
+    from parsl.addresses import address_by_interface
+    # For checkpointing:
+    from parsl.utils import get_all_checkpoints
+
+    # Adjust your user-specific options here:
+    # run_dir="/lus/grand/projects/yourproject/yourrundir/"
+
+    user_opts = {
+        "worker_init": "module load conda; conda activate parsl_mpi_py310",
+        "scheduler_options": "#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter",
+        "account": SET_YOUR_ALCF_ALLOCATION_HERE,
+        "queue": "debug-scaling",
+        "walltime": "1:00:00",
+        "nodes_per_block": 8,
+        "available_accelerators": 4,  # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU
+        "cores_per_worker": 8,  # this will set the number of cpu hardware threads per worker.
+    }
+
    config = Config(
-        executors=[
-            HighThroughputExecutor(
-                label='mpiapps',
-                address=address_by_hostname(),
-                max_workers=tasks_per_block,
-                cores_per_worker=1e-6,  # Prevents
-                provider=PBSProProvider(
-                    account="ACCT",
-                    worker_init=f"""
-                    # Prepare the computational environment
-                    module swap PrgEnv-nvhpc PrgEnv-gnu
-                    module load conda
-                    module list
-                    conda activate /lus/grand/projects/path/to/env
-                    cd $PBS_O_WORKDIR
-
-                    # Print the environment details for debugging
-                    hostname
-                    pwd
-                    which python
-
-                    # Prepare the host files
-                    mkdir -p /tmp/hostfiles/
-                    split --lines={nodes_per_task} -d --suffix-length=2 $PBS_NODEFILE /tmp/hostfiles/hostfile.""",
-                    walltime="6:00:00",
-                    launcher=SimpleLauncher(),  # Launches only a single executor per block
-                    select_options="ngpus=4",
-                    nodes_per_block=nodes_per_task * tasks_per_block,
-                    min_blocks=0,
-                    max_blocks=1,
-                    cpus_per_node=64,
+        executors=[
+            HighThroughputExecutor(
+                label="htex",
+                enable_mpi_mode=True,
+                mpi_launcher="mpiexec",
+                cores_per_worker=user_opts["cores_per_worker"],
+                address=address_by_interface("bond0"),
+                provider=PBSProProvider(
+                    account=user_opts["account"],
+                    queue=user_opts["queue"],
+                    # PBS directives (header lines): for array jobs pass '-J' option
+                    scheduler_options=user_opts["scheduler_options"],
+                    # Command to be run before starting a worker, such as:
+                    worker_init=user_opts["worker_init"],
+                    # number of compute nodes allocated for each block
+                    nodes_per_block=user_opts["nodes_per_block"],
+                    init_blocks=1,
+                    min_blocks=0,
+                    max_blocks=1,  # Can increase more to have more parallel jobs
+                    walltime=user_opts["walltime"]
+                ),
            ),
-        ),
-    ]
-)
+        ],
+    )

Writing MPI-Compatible Apps
++++++++++++++++++++++++++++

-The `App `_ can be either a Python or Bash App which invokes the MPI application.
-
-In the easiest case (i.e., single MPI task per block), write the MPI launcher options in the string returned by
-the bash app or as part of a subprocess call from a Python app.
+In MPI mode, the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` can execute
+both Python and Bash Apps which invoke the MPI application.
+However, it is important to note that Python Apps that use ``mpi4py`` directly are not supported.
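For example, a Python App can shell out to an MPI executable using the composed launch prefix. This is a minimal, hypothetical sketch that is not part of this patch: ``./my_mpi_binary`` is a placeholder for your application, and ``PARSL_MPI_PREFIX`` is the environment variable described below.

.. code-block:: python

    from typing import Dict
    from parsl import python_app

    @python_app
    def run_mpi_binary(parsl_resource_specification: Dict = {}):
        import os
        import subprocess
        # $PARSL_MPI_PREFIX is composed by the worker from the resource
        # specification, e.g. "mpiexec -n 4 -ppn 2 -hosts NODE001,NODE002"
        cmd = f"{os.environ['PARSL_MPI_PREFIX']} ./my_mpi_binary"
        return subprocess.run(cmd, shell=True, check=True).returncode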
+For multi-node MPI applications, especially when running multiple applications within a single batch job,
+it is important to specify the resource requirements for the app so that the Parsl worker can provision
+the appropriate resources before the application starts. For example, your Parsl script might contain a molecular
+dynamics application that requires 8 ranks over 1 node for certain inputs and 32 ranks over 4 nodes for others,
+depending on the size of the molecules being simulated. By specifying resources via ``resource_specification``,
+Parsl workers will provision the requested resources and then compose MPI launch command prefixes
+(e.g., ``mpiexec -n <num_ranks> -ppn <ranks_per_node> -hosts <node1,node2>``). These launch command prefixes are
+shared with the app via environment variables.

.. code-block:: python

    @bash_app
-    def echo_hello(n: int, stderr='std.err', stdout='std.out'):
-        return f'mpiexec -n {n} --ppn 1 hostname'
+    def echo_hello(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict = {}):
+        return '$PARSL_MPI_PREFIX hostname'
+
+    # The following app will echo the hostname from several MPI ranks
+    # Alternatively, you could also use the resource_specification to compose a launch
+    # command using env vars set by Parsl from the resource_specification like this:
+    @bash_app
+    def echo_hostname(n: int, stderr='std.err', stdout='std.out', parsl_resource_specification: Dict = {}):
+        import os
+        total_ranks = os.environ["PARSL_NUM_RANKS"]
+        return f'aprun -N $PARSL_RANKS_PER_NODE -n {total_ranks} /bin/hostname'
+

-Complications arise when running more than one MPI task per block,
-and the MPI launcher does not automatically spread jobs across nodes.
-In this case, use the ``PARSL_WORKER_RANK`` environment variable
-set by HTEx to select the correct hostfile:
+All valid key-value pairs set in the resource_specification are exported to the application via env vars;
+for example, ``parsl_resource_specification = {'ranks_per_node': 4}`` will set the env var ``PARSL_RANKS_PER_NODE``.
+However, the following options are **required** for MPI applications:

.. code-block:: python

-    @bash_app
-    def echo_hello(n: int, stderr='std.err', stdout='std.out'):
-        return (f'mpiexec -n {n} --ppn 1 '
-                '--hostfile /tmp/hostfiles/local_hostfile.`printf %02d $PARSL_WORKER_RANK` '
-                'hostname')
+    resource_specification = {
+        'num_nodes': <int>,        # Number of nodes required for the application instance
+        'ranks_per_node': <int>,   # Number of ranks / application elements to be launched per node
+        'num_ranks': <int>,        # Number of ranks in total
+    }

-.. note::
+    # The above are made available in the worker env vars:
+    # echo $PARSL_NUM_NODES, $PARSL_RANKS_PER_NODE, $PARSL_NUM_RANKS
+
+When the above are supplied, the following launch command prefixes are set:
+
+.. code-block::
+
+    PARSL_MPIEXEC_PREFIX: mpiexec launch command which works for a large number of batch systems especially PBS systems
+    PARSL_SRUN_PREFIX: srun launch command for Slurm based clusters
+    PARSL_APRUN_PREFIX: aprun launch command prefix for some Cray machines
+    PARSL_MPI_PREFIX: Parsl sets the MPI prefix to match the mpi_launcher specified to ``HighThroughputExecutor``
+    PARSL_MPI_NODELIST: List of assigned nodes separated by commas (e.g., NODE1,NODE2)
+    PARSL_WORKER_POOL_ID: Alphanumeric string identifier for the worker pool
+    PARSL_WORKER_BLOCK_ID: Batch job ID that the worker belongs to
+
+
+Example Application: CosmicTagger
++++++++++++++++++++++++++++++++++
+
+CosmicTagger implements models and training utilities to train convolutional networks to
+separate cosmic pixels, background pixels, and neutrino pixels in a neutrino dataset.
+There are several variations. A detailed description of the code can be found in:
+
+`Cosmic Background Removal with Deep Neural Networks in SBND `_
+
+This network is implemented in both PyTorch and TensorFlow. To select between the networks,
+you can use the ``--framework`` parameter. It accepts either ``tensorflow`` or ``torch``.
+The model is available in a development version with sparse convolutions in the torch framework.
+
+This example is broken down into three components. First, configure the executor for Polaris at
+ALCF. The configuration will use the :class:`~parsl.providers.PBSProProvider` to connect to the batch scheduler.
+With the goal of running MPI applications, we set ``enable_mpi_mode=True`` and ``mpi_launcher="mpiexec"`` on the executor:
+
+.. code-block:: python
+
+    import parsl
+    from typing import Dict
+    from parsl.config import Config
+
+    # PBSPro is the right provider for Polaris:
+    from parsl.providers import PBSProProvider
+    # The high throughput executor is for scaling to HPC systems:
+    from parsl.executors import HighThroughputExecutor
+    # address_by_interface is needed for the HighThroughputExecutor:
+    from parsl.addresses import address_by_interface
+
+    user_opts = {
+        # Make sure to setup a conda environment before using this config
+        "worker_init": "module load conda; conda activate parsl_mpi_py310",
+        "scheduler_options": "#PBS -l filesystems=home:eagle:grand\n#PBS -l place=scatter",
+        "account": SET_YOUR_ALCF_ALLOCATION_HERE,
+        "queue": "debug-scaling",
+        "walltime": "1:00:00",
+        "nodes_per_block": 8,
+        "available_accelerators": 4,  # Each Polaris node has 4 GPUs, setting this ensures one worker per GPU
+        "cores_per_worker": 8,  # this will set the number of cpu hardware threads per worker.
+    }
+
+    config = Config(
+        executors=[
+            HighThroughputExecutor(
+                label="htex",
+                enable_mpi_mode=True,
+                mpi_launcher="mpiexec",
+                cores_per_worker=user_opts["cores_per_worker"],
+                address=address_by_interface("bond0"),
+                provider=PBSProProvider(
+                    account=user_opts["account"],
+                    queue=user_opts["queue"],
+                    # PBS directives (header lines): for array jobs pass '-J' option
+                    scheduler_options=user_opts["scheduler_options"],
+                    # Command to be run before starting a worker, such as:
+                    worker_init=user_opts["worker_init"],
+                    # number of compute nodes allocated for each block
+                    nodes_per_block=user_opts["nodes_per_block"],
+                    init_blocks=1,
+                    min_blocks=0,
+                    max_blocks=1,  # Can increase more to have more parallel jobs
+                    walltime=user_opts["walltime"]
+                ),
+            ),
+        ],
+    )
+
+
+Next we define the CosmicTagger MPI application.
+
+.. code-block:: python
+
+    @parsl.bash_app
+    def cosmic_tagger(workdir: str,
+                      datatype: str = "float32",
+                      batchsize: int = 8,
+                      framework: str = "torch",
+                      iterations: int = 500,
+                      trial: int = 2,
+                      stdout=parsl.AUTO_LOGNAME,
+                      stderr=parsl.AUTO_LOGNAME,
+                      parsl_resource_specification: Dict = {}):
+        NRANKS = parsl_resource_specification['num_ranks']
+
+        return f"""
+        module purge
+        module use /soft/modulefiles/
+        module load conda/2023-10-04
+        conda activate
+
+        echo "PARSL_MPI_PREFIX : $PARSL_MPI_PREFIX"
+
+        $PARSL_MPI_PREFIX --cpu-bind numa \
+            python {workdir}/bin/exec.py --config-name a21 \
+            run.id=run_plrs_ParslDemo_g{NRANKS}_{datatype}_b{batchsize}_{framework}_i{iterations}_T{trial} \
+            run.compute_mode=GPU \
+            run.distributed=True \
+            framework={framework} \
+            run.minibatch_size={batchsize} \
+            run.precision={datatype} \
+            mode.optimizer.loss_balance_scheme=light \
+            run.iterations={iterations}
+        """
+
+In this example, we run a simple test that does an exploration over the ``batchsize`` parameter
+while launching the application over 2-4 nodes.
+
+.. code-block:: python
+
+    def run_cosmic_tagger():
+        futures = {}
+        for num_nodes in [2, 4]:
+            for batchsize in [2, 4, 8]:
+
+                parsl_res_spec = {"num_nodes": num_nodes,
+                                  "num_ranks": num_nodes * 4,
+                                  "ranks_per_node": 4}
+                future = cosmic_tagger(workdir="/home/yadunand/CosmicTagger",
+                                       datatype="float32",
+                                       batchsize=str(batchsize),
+                                       parsl_resource_specification=parsl_res_spec)
+
+                print(f"Stdout : {future.stdout}")
+                print(f"Stderr : {future.stderr}")
+                futures[(num_nodes, batchsize)] = future
+
+        for key in futures:
+            print(f"Got result for {key}: {futures[key].result()}")
+
+    run_cosmic_tagger()

- Use these Apps for testing! Submit many task using one of these Apps then ensure
- the number of unique nodes in the "std.out" files
- is the same as the number per block.

Limitations
+++++++++++

-Support for MPI tasks in HTEx is limited:
+Support for MPI tasks in HTEX is limited. It is designed for running many multi-node MPI applications within a single
+batch job.

-#. All tasks must use the same number of nodes, which is fixed when creating the executor.
#. MPI tasks may not span across nodes from more than one block.
#. Parsl does not correctly determine the number of execution slots per block (`Issue #1647 `_)
-#. The executor uses a Python process per task, which can use a lot of memory (`Issue #2264 `_)
+#.
The executor uses a Python process per task, which can use a lot of memory (`Issue #2264 `_) \ No newline at end of file diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index 2c4aada20f..dfea11acb3 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -24,7 +24,7 @@ from parsl.executors.errors import ScalingFailed from parsl.providers import LocalProvider from parsl.providers.base import ExecutionProvider -from parsl.serialize import pack_apply_message, deserialize +from parsl.serialize import deserialize, pack_res_spec_apply_message from parsl.serialize.errors import SerializationError from parsl.app.errors import AppException @@ -284,8 +284,10 @@ def submit( infile = os.path.join(self.working_dir, f"{task_id}_in{os.extsep}pkl") outfile = os.path.join(self.working_dir, f"{task_id}_out{os.extsep}pkl") try: - fn_buf = pack_apply_message( - func, args, kwargs, buffer_threshold=1024 * 1024 + fn_buf = pack_res_spec_apply_message( + func, args, kwargs, + resource_specification={}, + buffer_threshold=1024 * 1024 ) except TypeError: raise SerializationError(func.__name__) diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 5bd5c5121b..d76a1896f2 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -11,7 +11,8 @@ from typing import List, Optional, Tuple, Union, Callable import math -from parsl.serialize import pack_apply_message, deserialize +import parsl.launchers +from parsl.serialize import pack_res_spec_apply_message, deserialize from parsl.serialize.errors import SerializationError, DeserializationError from parsl.app.errors import RemoteExceptionWrapper from parsl.jobs.states import JobStatus, JobState @@ -19,7 +20,10 @@ from parsl.executors.high_throughput import interchange from parsl.executors.errors import ( BadMessage, ScalingFailed, - UnsupportedFeatureError +) +from parsl.executors.high_throughput.mpi_prefix_composer import ( + VALID_LAUNCHERS, + validate_resource_spec ) from parsl import curvezmq @@ -50,6 +54,8 @@ "{address_probe_timeout_string} " "--hb_threshold={heartbeat_threshold} " "--cpu-affinity {cpu_affinity} " + "{enable_mpi_mode} " + "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") @@ -193,6 +199,17 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin): worker_logdir_root : string In case of a remote file system, specify the path to where logs will be kept. + enable_mpi_mode: bool + If enabled, MPI launch prefixes will be composed for the batch scheduler based on + the nodes available in each batch job and the resource_specification dict passed + from the app. This is an experimental feature, please refer to the following doc section + before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html + + mpi_launcher: str + This field is only used if enable_mpi_mode is set. Select one from the + list of supported MPI launchers = ("srun", "aprun", "mpiexec"). + default: "mpiexec" + encrypted : bool Flag to enable/disable encryption (CurveZMQ). Default is False. 
""" @@ -220,6 +237,8 @@ def __init__(self, poll_period: int = 10, address_probe_timeout: Optional[int] = None, worker_logdir_root: Optional[str] = None, + enable_mpi_mode: bool = False, + mpi_launcher: str = "mpiexec", block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, encrypted: bool = False): @@ -281,6 +300,15 @@ def __init__(self, self.encrypted = encrypted self.cert_dir = None + self.enable_mpi_mode = enable_mpi_mode + assert mpi_launcher in VALID_LAUNCHERS, \ + f"mpi_launcher must be set to one of {VALID_LAUNCHERS}" + if self.enable_mpi_mode: + assert isinstance(self.provider.launcher, parsl.launchers.SingleNodeLauncher), \ + "mpi_mode requires the provider to be configured to use a SingleNodeLauncher" + + self.mpi_launcher = mpi_launcher + if not launch_cmd: launch_cmd = DEFAULT_LAUNCH_CMD self.launch_cmd = launch_cmd @@ -302,6 +330,7 @@ def initialize_scaling(self): """ debug_opts = "--debug" if self.worker_debug else "" max_workers = "" if self.max_workers == float('inf') else "--max_workers={}".format(self.max_workers) + enable_mpi_opts = "--enable_mpi_mode " if self.enable_mpi_mode else "" address_probe_timeout_string = "" if self.address_probe_timeout: @@ -323,6 +352,8 @@ def initialize_scaling(self): cert_dir=self.cert_dir, logdir=self.worker_logdir, cpu_affinity=self.cpu_affinity, + enable_mpi_mode=enable_mpi_opts, + mpi_launcher=self.mpi_launcher, accelerators=" ".join(self.available_accelerators)) self.launch_cmd = l_cmd logger.debug("Launch command: {}".format(self.launch_cmd)) @@ -584,10 +615,7 @@ def submit(self, func, resource_specification, *args, **kwargs): Returns: Future """ - if resource_specification: - logger.error("Ignoring the call specification. " - "Parsl call specification is not supported in HighThroughput Executor.") - raise UnsupportedFeatureError('resource specification', 'HighThroughput Executor', None) + validate_resource_spec(resource_specification) if self.bad_state_is_set: raise self.executor_exception @@ -605,8 +633,9 @@ def submit(self, func, resource_specification, *args, **kwargs): self.tasks[task_id] = fut try: - fn_buf = pack_apply_message(func, args, kwargs, - buffer_threshold=1024 * 1024) + fn_buf = pack_res_spec_apply_message(func, args, kwargs, + resource_specification=resource_specification, + buffer_threshold=1024 * 1024) except TypeError: raise SerializationError(func.__name__) diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py new file mode 100644 index 0000000000..8b38be8549 --- /dev/null +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -0,0 +1,137 @@ +import logging +from typing import Dict, List, Tuple, Set + +logger = logging.getLogger(__name__) + +VALID_LAUNCHERS = ('srun', + 'aprun', + 'mpiexec') + + +class InvalidResourceSpecification(Exception): + """Exception raised when Invalid keys are supplied via resource specification""" + + def __init__(self, invalid_keys: Set[str]): + self.invalid_keys = invalid_keys + + def __str__(self): + return f"Invalid resource specification options supplied: {self.invalid_keys}" + + +def validate_resource_spec(resource_spec: Dict[str, str]): + """Basic validation of keys in the resource_spec + + Raises: InvalidResourceSpecification if the resource_spec + is invalid (e.g, contains invalid keys) + """ + user_keys = set(resource_spec.keys()) + legal_keys = set(("ranks_per_node", + "num_nodes", + "num_ranks", + "launcher_options", + )) + invalid_keys = user_keys 
- legal_keys + if invalid_keys: + raise InvalidResourceSpecification(invalid_keys) + if "num_nodes" in resource_spec: + if not resource_spec.get("num_ranks") and resource_spec.get("ranks_per_node"): + resource_spec["num_ranks"] = str(int(resource_spec["num_nodes"]) * int(resource_spec["ranks_per_node"])) + if not resource_spec.get("ranks_per_node") and resource_spec.get("num_ranks"): + resource_spec["ranks_per_node"] = str(int(resource_spec["num_ranks"]) / int(resource_spec["num_nodes"])) + return + + +def compose_mpiexec_launch_cmd( + resource_spec: Dict, node_hostnames: List[str] +) -> Tuple[str, str]: + """Compose mpiexec launch command prefix""" + + node_str = ",".join(node_hostnames) + args = [ + "mpiexec", + "-n", + resource_spec.get("num_ranks"), + "-ppn", + resource_spec.get("ranks_per_node"), + "-hosts", + node_str, + resource_spec.get("launcher_options", ""), + ] + prefix = " ".join(str(arg) for arg in args) + return "PARSL_MPIEXEC_PREFIX", prefix + + +def compose_srun_launch_cmd( + resource_spec: Dict, node_hostnames: List[str] +) -> Tuple[str, str]: + """Compose srun launch command prefix""" + + num_nodes = str(len(node_hostnames)) + args = [ + "srun", + "--ntasks", + resource_spec.get("num_ranks"), + "--ntasks-per-node", + resource_spec.get("ranks_per_node"), + "--nodelist", + ",".join(node_hostnames), + "--nodes", + num_nodes, + resource_spec.get("launcher_options", ""), + ] + + prefix = " ".join(str(arg) for arg in args) + return "PARSL_SRUN_PREFIX", prefix + + +def compose_aprun_launch_cmd( + resource_spec: Dict, node_hostnames: List[str] +) -> Tuple[str, str]: + """Compose aprun launch command prefix""" + + node_str = ",".join(node_hostnames) + args = [ + "aprun", + "-n", + resource_spec.get("num_ranks"), + "-N", + resource_spec.get("ranks_per_node"), + "-node-list", + node_str, + resource_spec.get("launcher_options", ""), + ] + prefix = " ".join(str(arg) for arg in args) + return "PARSL_APRUN_PREFIX", prefix + + +def compose_all( + mpi_launcher: str, resource_spec: Dict, node_hostnames: List[str] +) -> Dict[str, str]: + """Compose all launch command prefixes and set the default""" + + all_prefixes = {} + composers = [ + compose_aprun_launch_cmd, + compose_srun_launch_cmd, + compose_mpiexec_launch_cmd, + ] + for composer in composers: + try: + key, prefix = composer(resource_spec, node_hostnames) + all_prefixes[key] = prefix + except Exception: + logging.exception( + f"Failed to compose launch prefix with {composer} from {resource_spec}" + ) + pass + + if mpi_launcher == "srun": + all_prefixes["PARSL_MPI_PREFIX"] = all_prefixes["PARSL_SRUN_PREFIX"] + elif mpi_launcher == "aprun": + all_prefixes["PARSL_MPI_PREFIX"] = all_prefixes["PARSL_APRUN_PREFIX"] + elif mpi_launcher == "mpiexec": + all_prefixes["PARSL_MPI_PREFIX"] = all_prefixes["PARSL_MPIEXEC_PREFIX"] + else: + raise RuntimeError(f"Unknown mpi_launcher:{mpi_launcher}") + + return all_prefixes diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py new file mode 100644 index 0000000000..4434749827 --- /dev/null +++ b/parsl/executors/high_throughput/mpi_resource_management.py @@ -0,0 +1,217 @@ +import logging +import multiprocessing +import os +import pickle +import queue +import subprocess +from enum import Enum +from typing import Dict, List + +from parsl.multiprocessing import SpawnContext +from parsl.serialize import (pack_res_spec_apply_message, + unpack_res_spec_apply_message) + +logger = logging.getLogger(__name__) + + +class 
Scheduler(Enum): + Unknown = 0 + Slurm = 1 + PBS = 2 + Cobalt = 3 + + +def get_slurm_hosts_list() -> List[str]: + """Get list of slurm hosts from scontrol""" + cmd = "scontrol show hostname $SLURM_NODELIST" + b_output = subprocess.check_output( + cmd, stderr=subprocess.STDOUT, shell=True + ) # bytes + output = b_output.decode().strip().split() + return output + + +def get_pbs_hosts_list() -> List[str]: + """Get list of PBS hosts from envvar: PBS_NODEFILE""" + nodefile_name = os.environ["PBS_NODEFILE"] + with open(nodefile_name) as f: + return [line.strip() for line in f.readlines()] + + +def get_cobalt_hosts_list() -> List[str]: + """Get list of COBALT hosts from envvar: COBALT_NODEFILE""" + nodefile_name = os.environ["COBALT_NODEFILE"] + with open(nodefile_name) as f: + return [line.strip() for line in f.readlines()] + + +def get_nodes_in_batchjob(scheduler: Scheduler) -> List[str]: + """Get nodelist from all supported schedulers""" + nodelist = [] + if scheduler == Scheduler.Slurm: + nodelist = get_slurm_hosts_list() + elif scheduler == Scheduler.PBS: + nodelist = get_pbs_hosts_list() + elif scheduler == Scheduler.Cobalt: + nodelist = get_cobalt_hosts_list() + else: + raise RuntimeError(f"mpi_mode does not support scheduler:{scheduler}") + return nodelist + + +def identify_scheduler() -> Scheduler: + """Use envvars to determine batch scheduler""" + if os.environ.get("SLURM_NODELIST"): + return Scheduler.Slurm + elif os.environ.get("PBS_NODEFILE"): + return Scheduler.PBS + elif os.environ.get("COBALT_NODEFILE"): + return Scheduler.Cobalt + else: + return Scheduler.Unknown + + +class MPINodesUnavailable(Exception): + """Raised if there are no free nodes available for an MPI request""" + + def __init__(self, requested: int, available: int): + self.requested = requested + self.available = available + + def __str__(self): + return f"MPINodesUnavailable(requested={self.requested} available={self.available})" + + +class TaskScheduler: + """Default TaskScheduler that does no taskscheduling + + This class simply acts as an abstraction over the task_q and result_q + that can be extended to implement more complex task scheduling logic + """ + def __init__( + self, + pending_task_q: multiprocessing.Queue, + pending_result_q: multiprocessing.Queue, + ): + self.pending_task_q = pending_task_q + self.pending_result_q = pending_result_q + + def put_task(self, task) -> None: + return self.pending_task_q.put(task) + + def get_result(self, block: bool, timeout: float): + return self.pending_result_q.get(block, timeout=timeout) + + +class MPITaskScheduler(TaskScheduler): + """Extends TaskScheduler to schedule MPI functions over provisioned nodes + The MPITaskScheduler runs on a Manager on the lead node of a batch job, as + such it is expected to control task placement over this single batch job. + + The MPITaskScheduler adds the following functionality: + 1) Determine list of nodes attached to current batch job + 2) put_task for execution onto workers: + a) if resources are available attach resource list + b) if unavailable place tasks into backlog + 3) get_result will fetch a result and relinquish nodes, + and attempt to schedule tasks in backlog if any. 
+ """ + def __init__( + self, + pending_task_q: multiprocessing.Queue, + pending_result_q: multiprocessing.Queue, + ): + super().__init__(pending_task_q, pending_result_q) + self.scheduler = identify_scheduler() + # PriorityQueue is threadsafe + self._backlog_queue: queue.PriorityQueue = queue.PriorityQueue() + self._map_tasks_to_nodes: Dict[str, List[str]] = {} + self.available_nodes = get_nodes_in_batchjob(self.scheduler) + self._free_node_counter = SpawnContext.Value("i", len(self.available_nodes)) + # mp.Value has issues with mypy + # issue https://github.com/python/typeshed/issues/8799 + # from mypy 0.981 onwards + self.nodes_q: queue.Queue = queue.Queue() + for node in self.available_nodes: + self.nodes_q.put(node) + + logger.info( + f"Starting MPITaskScheduler with {len(self.available_nodes)}" + ) + + def _get_nodes(self, num_nodes: int) -> List[str]: + """Thread safe method to acquire num_nodes from free resources + + Raises: MPINodesUnavailable if there aren't enough resources + Returns: List of nodenames:str + """ + logger.debug( + f"Requesting : {num_nodes=} we have {self._free_node_counter}" + ) + acquired_nodes = [] + with self._free_node_counter.get_lock(): + if num_nodes <= self._free_node_counter.value: # type: ignore[attr-defined] + self._free_node_counter.value -= num_nodes # type: ignore[attr-defined] + else: + raise MPINodesUnavailable( + requested=num_nodes, available=self._free_node_counter.value # type: ignore[attr-defined] + ) + + for i in range(num_nodes): + node = self.nodes_q.get() + acquired_nodes.append(node) + return acquired_nodes + + def _return_nodes(self, nodes: List[str]) -> None: + """Threadsafe method to return a list of nodes""" + for node in nodes: + self.nodes_q.put(node) + with self._free_node_counter.get_lock(): + self._free_node_counter.value += len(nodes) # type: ignore[attr-defined] + + def put_task(self, task_package: dict): + """Schedule task if resources are available otherwise backlog the task""" + user_ns = locals() + user_ns.update({"__builtins__": __builtins__}) + _f, _args, _kwargs, resource_spec = unpack_res_spec_apply_message( + task_package["buffer"], user_ns, copy=False + ) + + nodes_needed = resource_spec.get("num_nodes") + if nodes_needed: + try: + allocated_nodes = self._get_nodes(nodes_needed) + except MPINodesUnavailable: + logger.warning("Not enough resources, placing task into backlog") + self._backlog_queue.put((nodes_needed, task_package)) + return + else: + resource_spec["MPI_NODELIST"] = ",".join(allocated_nodes) + self._map_tasks_to_nodes[task_package["task_id"]] = allocated_nodes + buffer = pack_res_spec_apply_message(_f, _args, _kwargs, resource_spec) + task_package["buffer"] = buffer + + self.pending_task_q.put(task_package) + + def _schedule_backlog_tasks(self): + """Attempt to schedule backlogged tasks""" + try: + _nodes_requested, task_package = self._backlog_queue.get(block=False) + self.put_task(task_package) + except queue.Empty: + return + else: + # Keep attempting to schedule tasks till we are out of resources + self._schedule_backlog_tasks() + + def get_result(self, block: bool, timeout: float): + """Return result and relinquish provisioned nodes""" + result_pkl = self.pending_result_q.get(block, timeout=timeout) + result_dict = pickle.loads(result_pkl) + if result_dict["type"] == "result": + task_id = result_dict["task_id"] + nodes_to_reallocate = self._map_tasks_to_nodes[task_id] + self._return_nodes(nodes_to_reallocate) + self._schedule_backlog_tasks() + + return result_pkl diff --git 
a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 6f5be16a4c..54460b561d 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -10,7 +10,7 @@ import time import queue import uuid -from typing import Sequence, Optional +from typing import Sequence, Optional, Dict, List import zmq import math @@ -27,7 +27,13 @@ from parsl.executors.high_throughput.errors import WorkerLost from parsl.executors.high_throughput.probe import probe_addresses from parsl.multiprocessing import SpawnContext -from parsl.serialize import unpack_apply_message, serialize +from parsl.serialize import unpack_res_spec_apply_message, serialize +from parsl.executors.high_throughput.mpi_resource_management import ( + TaskScheduler, + MPITaskScheduler +) + +from parsl.executors.high_throughput.mpi_prefix_composer import compose_all, VALID_LAUNCHERS HEARTBEAT_CODE = (2 ** 32) - 1 @@ -64,6 +70,8 @@ def __init__(self, *, heartbeat_period, poll_period, cpu_affinity, + enable_mpi_mode: bool = False, + mpi_launcher: str = "mpiexec", available_accelerators: Sequence[str], cert_dir: Optional[str]): """ @@ -120,6 +128,14 @@ def __init__(self, *, available_accelerators: list of str List of accelerators available to the workers. + enable_mpi_mode: bool + When set to true, the manager assumes ownership of the batch job and each worker + claims a subset of nodes from a shared pool to execute multi-node mpi tasks. Node + info is made available to workers via env vars. + + mpi_launcher: str + Set to one of the supported MPI launchers: ("srun", "aprun", "mpiexec") + cert_dir : str | None Path to the certificate directory. """ @@ -159,6 +175,9 @@ def __init__(self, *, self.uid = uid self.block_id = block_id + self.enable_mpi_mode = enable_mpi_mode + self.mpi_launcher = mpi_launcher + if os.environ.get('PARSL_CORES'): cores_on_node = int(os.environ['PARSL_CORES']) else: @@ -186,6 +205,17 @@ def __init__(self, *, self.monitoring_queue = self._mp_manager.Queue() self.pending_task_queue = SpawnContext.Queue() self.pending_result_queue = SpawnContext.Queue() + self.task_scheduler: TaskScheduler + if self.enable_mpi_mode: + self.task_scheduler = MPITaskScheduler( + self.pending_task_queue, + self.pending_result_queue, + ) + else: + self.task_scheduler = TaskScheduler( + self.pending_task_queue, + self.pending_result_queue + ) self.ready_worker_count = SpawnContext.Value("i", 0) self.max_queue_size = self.prefetch_capacity + self.worker_count @@ -286,9 +316,7 @@ def pull_tasks(self, kill_event): logger.debug("Got executor tasks: {}, cumulative count of tasks: {}".format([t['task_id'] for t in tasks], task_recv_counter)) for task in tasks: - self.pending_task_queue.put(task) - # logger.debug("Ready tasks: {}".format( - # [i['task_id'] for i in self.pending_task_queue])) + self.task_scheduler.put_task(task) else: logger.debug("No incoming tasks") @@ -327,7 +355,7 @@ def push_results(self, kill_event): while not kill_event.is_set(): try: logger.debug("Starting pending_result_queue get") - r = self.pending_result_queue.get(block=True, timeout=push_poll_period) + r = self.task_scheduler.get_result(block=True, timeout=push_poll_period) logger.debug("Got a result item") items.append(r) except queue.Empty: @@ -497,6 +525,7 @@ def _start_worker(self, worker_id: int): os.getpid(), args.logdir, args.debug, + self.mpi_launcher, ), name="HTEX-Worker-{}".format(worker_id), ) @@ -504,7 +533,13 @@ def _start_worker(self, 
worker_id: int): return p -def execute_task(bufs): +def update_resource_spec_env_vars(mpi_launcher: str, resource_spec: Dict, node_info: List[str]) -> None: + prefix_table = compose_all(mpi_launcher, resource_spec=resource_spec, node_hostnames=node_info) + for key in prefix_table: + os.environ[key] = prefix_table[key] + + +def execute_task(bufs, mpi_launcher: Optional[str] = None): """Deserialize the buffer and execute the task. Returns the result or throws exception. @@ -512,8 +547,20 @@ def execute_task(bufs): user_ns = locals() user_ns.update({'__builtins__': __builtins__}) - f, args, kwargs = unpack_apply_message(bufs, user_ns, copy=False) + f, args, kwargs, resource_spec = unpack_res_spec_apply_message(bufs, user_ns, copy=False) + + for varname in resource_spec: + envname = "PARSL_" + str(varname).upper() + os.environ[envname] = str(resource_spec[varname]) + if resource_spec.get("MPI_NODELIST"): + worker_id = os.environ['PARSL_WORKER_RANK'] + nodes_for_task = resource_spec["MPI_NODELIST"].split(',') + logger.info(f"Launching task on provisioned nodes: {nodes_for_task}") + assert mpi_launcher + update_resource_spec_env_vars(mpi_launcher, + resource_spec=resource_spec, + node_info=nodes_for_task) # We might need to look into callability of the function from itself # since we change it's name in the new namespace prefix = "parsl_" @@ -550,6 +597,7 @@ def worker( manager_pid: int, logdir: str, debug: bool, + mpi_launcher: str, ): """ @@ -668,7 +716,7 @@ def manager_is_alive(): worker_enqueued = False try: - result = execute_task(req['buffer']) + result = execute_task(req['buffer'], mpi_launcher=mpi_launcher) serialized_result = serialize(result, buffer_threshold=1000000) except Exception as e: logger.info('Caught an exception: {}'.format(e)) @@ -768,6 +816,10 @@ def strategyorlist(s: str): help="Whether/how workers should control CPU affinity.") parser.add_argument("--available-accelerators", type=str, nargs="*", help="Names of available accelerators") + parser.add_argument("--enable_mpi_mode", action='store_true', + help="Enable MPI mode") + parser.add_argument("--mpi-launcher", type=str, choices=VALID_LAUNCHERS, + help="MPI launcher to use iff enable_mpi_mode=true") args = parser.parse_args() @@ -797,6 +849,8 @@ def strategyorlist(s: str): logger.info("Heartbeat period: {}".format(args.hb_period)) logger.info("CPU affinity: {}".format(args.cpu_affinity)) logger.info("Accelerators: {}".format(" ".join(args.available_accelerators))) + logger.info("enable_mpi_mode: {}".format(args.enable_mpi_mode)) + logger.info("mpi_launcher: {}".format(args.mpi_launcher)) manager = Manager(task_port=args.task_port, result_port=args.result_port, @@ -812,6 +866,8 @@ def strategyorlist(s: str): heartbeat_period=int(args.hb_period), poll_period=int(args.poll), cpu_affinity=args.cpu_affinity, + enable_mpi_mode=args.enable_mpi_mode, + mpi_launcher=args.mpi_launcher, available_accelerators=args.available_accelerators, cert_dir=None if args.cert_dir == "None" else args.cert_dir) manager.start() diff --git a/parsl/executors/radical/executor.py b/parsl/executors/radical/executor.py index 7cbfcd45f1..2200fefe26 100644 --- a/parsl/executors/radical/executor.py +++ b/parsl/executors/radical/executor.py @@ -23,7 +23,7 @@ from parsl.app.errors import BashExitFailure from parsl.executors.base import ParslExecutor from parsl.app.errors import RemoteExceptionWrapper -from parsl.serialize import pack_apply_message, deserialize +from parsl.serialize import deserialize, pack_res_spec_apply_message from parsl.serialize.errors 
import SerializationError, DeserializationError try: @@ -400,8 +400,11 @@ def task_translate(self, tid, func, parsl_resource_specification, args, kwargs): def _pack_and_apply_message(self, func, args, kwargs): try: - buffer = pack_apply_message(func, args, kwargs, - buffer_threshold=1024 * 1024) + buffer = pack_res_spec_apply_message(func, + args, + kwargs, + resource_specification={}, + buffer_threshold=1024 * 1024) task_func = rp.utils.serialize_bson(buffer) except TypeError: raise SerializationError(func.__name__) diff --git a/parsl/executors/radical/rpex_worker.py b/parsl/executors/radical/rpex_worker.py index 9684147e5f..2225499e89 100644 --- a/parsl/executors/radical/rpex_worker.py +++ b/parsl/executors/radical/rpex_worker.py @@ -3,7 +3,7 @@ import parsl.app.errors as pe from parsl.app.bash import remote_side_bash_executor -from parsl.serialize import unpack_apply_message, serialize +from parsl.serialize import unpack_res_spec_apply_message, serialize from parsl.executors.high_throughput.process_worker_pool import execute_task @@ -32,7 +32,7 @@ def _dispatch_proc(self, task): try: buffer = rp.utils.deserialize_bson(task['description']['executable']) - func, args, kwargs = unpack_apply_message(buffer, {}, copy=False) + func, args, kwargs, _resource_spec = unpack_res_spec_apply_message(buffer, {}, copy=False) ret = remote_side_bash_executor(func, *args, **kwargs) exc = (None, None) val = None diff --git a/parsl/serialize/__init__.py b/parsl/serialize/__init__.py index 83ef2ca1c5..90585bda90 100644 --- a/parsl/serialize/__init__.py +++ b/parsl/serialize/__init__.py @@ -1,6 +1,11 @@ -from parsl.serialize.facade import serialize, deserialize, pack_apply_message, unpack_apply_message +from parsl.serialize.facade import (serialize, deserialize, pack_apply_message, + unpack_apply_message, unpack_res_spec_apply_message, + pack_res_spec_apply_message) __all__ = ['serialize', 'deserialize', 'pack_apply_message', - 'unpack_apply_message'] + 'unpack_apply_message', + 'unpack_res_spec_apply_message', + 'pack_res_spec_apply_message' + ] diff --git a/parsl/serialize/facade.py b/parsl/serialize/facade.py index bb7e360967..f8e76f174b 100644 --- a/parsl/serialize/facade.py +++ b/parsl/serialize/facade.py @@ -62,13 +62,44 @@ def pack_apply_message(func: Any, args: Any, kwargs: Any, buffer_threshold: int return packed_buffer +def pack_res_spec_apply_message(func: Any, args: Any, kwargs: Any, resource_specification: Any, buffer_threshold: int = int(128 * 1e6)) -> bytes: + """Serialize and pack function, parameters, and resource_specification + + Parameters + ---------- + + func: Function + A function to ship + + args: Tuple/list of objects + positional parameters as a list + + kwargs: Dict + Dict containing named parameters + + resource_specification: Dict + Dict containing application resource specification + + buffer_threshold: int + Limits buffer to specified size in bytes. Exceeding this limit would give you + a warning in the log. Default is 128MB. 
+ """ + return pack_apply_message(func, args, (kwargs, resource_specification), buffer_threshold=buffer_threshold) + + def unpack_apply_message(packed_buffer: bytes, user_ns: Any = None, copy: Any = False) -> List[Any]: """ Unpack and deserialize function and parameters - """ return [deserialize(buf) for buf in unpack_buffers(packed_buffer)] +def unpack_res_spec_apply_message(packed_buffer: bytes, user_ns: Any = None, copy: Any = False) -> List[Any]: + """ Unpack and deserialize function, parameters, and resource_specification + """ + func, args, (kwargs, resource_spec) = unpack_apply_message(packed_buffer, user_ns=user_ns, copy=copy) + return [func, args, kwargs, resource_spec] + + def serialize(obj: Any, buffer_threshold: int = int(1e6)) -> bytes: """ Try available serialization methods one at a time diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 4aa7091e35..336080afae 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -2,6 +2,8 @@ from parsl.app.app import python_app from parsl.executors.errors import UnsupportedFeatureError, ExecutorError from parsl.executors import WorkQueueExecutor +from parsl.executors.high_throughput.mpi_prefix_composer import InvalidResourceSpecification +from parsl.executors.high_throughput.executor import HighThroughputExecutor @python_app @@ -22,6 +24,8 @@ def test_resource(n=2): fut = double(n, parsl_resource_specification=spec) try: fut.result() + except InvalidResourceSpecification: + assert isinstance(executor, HighThroughputExecutor) except UnsupportedFeatureError: assert not isinstance(executor, WorkQueueExecutor) except Exception as e: @@ -33,6 +37,8 @@ def test_resource(n=2): fut = double(n, parsl_resource_specification=spec) try: fut.result() + except InvalidResourceSpecification: + assert isinstance(executor, HighThroughputExecutor) except UnsupportedFeatureError: assert not isinstance(executor, WorkQueueExecutor) except Exception as e: diff --git a/parsl/tests/test_mpi_apps/__init__.py b/parsl/tests/test_mpi_apps/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/parsl/tests/test_mpi_apps/mocks/pbs_nodefile b/parsl/tests/test_mpi_apps/mocks/pbs_nodefile new file mode 100644 index 0000000000..42e3ab71ef --- /dev/null +++ b/parsl/tests/test_mpi_apps/mocks/pbs_nodefile @@ -0,0 +1,4 @@ +node1 +node2 +node3 +node4 diff --git a/parsl/tests/test_mpi_apps/mocks/pbs_nodefile.8 b/parsl/tests/test_mpi_apps/mocks/pbs_nodefile.8 new file mode 100644 index 0000000000..643d60f36a --- /dev/null +++ b/parsl/tests/test_mpi_apps/mocks/pbs_nodefile.8 @@ -0,0 +1,8 @@ +NODE001 +NODE002 +NODE003 +NODE004 +NODE005 +NODE006 +NODE007 +NODE008 diff --git a/parsl/tests/test_mpi_apps/test_bad_mpi_config.py b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py new file mode 100644 index 0000000000..0254f172b3 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_bad_mpi_config.py @@ -0,0 +1,41 @@ +import pytest + +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.launchers import SrunLauncher, SingleNodeLauncher, SimpleLauncher, AprunLauncher +from parsl.providers import SlurmProvider + + +@pytest.mark.local +def test_bad_launcher_with_mpi_mode(): + """AssertionError if a launcher other than SingleNodeLauncher is supplied""" + + for launcher in [SrunLauncher(), SimpleLauncher(), AprunLauncher()]: + with pytest.raises(AssertionError): + Config(executors=[ + 
HighThroughputExecutor( + enable_mpi_mode=True, + provider=SlurmProvider(launcher=launcher), + ) + ]) + + +@pytest.mark.local +def test_correct_launcher_with_mpi_mode(): + """Confirm that SingleNodeLauncer works with mpi_mode""" + + config = Config(executors=[ + HighThroughputExecutor( + enable_mpi_mode=True, + provider=SlurmProvider(launcher=SingleNodeLauncher()), + ) + ]) + assert isinstance(config.executors[0].provider.launcher, SingleNodeLauncher) + + config = Config(executors=[ + HighThroughputExecutor( + enable_mpi_mode=True, + provider=SlurmProvider(), + ) + ]) + assert isinstance(config.executors[0].provider.launcher, SingleNodeLauncher) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py new file mode 100644 index 0000000000..52856ffdb5 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_disabled.py @@ -0,0 +1,51 @@ +import logging +from typing import Dict +import pytest +import parsl +from parsl import python_app +from parsl.tests.configs.htex_local import fresh_config + +EXECUTOR_LABEL = "MPI_TEST" + + +def local_setup(): + config = fresh_config() + config.executors[0].label = EXECUTOR_LABEL + config.executors[0].max_workers = 1 + config.executors[0].enable_mpi_mode = False + parsl.load(config) + + +def local_teardown(): + parsl.dfk().cleanup() + parsl.clear() + + +@python_app +def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: + import os + + parsl_vars = {} + for key in os.environ: + if key.startswith("PARSL_"): + parsl_vars[key] = os.environ[key] + return parsl_vars + + +@pytest.mark.local +def test_only_resource_specs_set(): + """Confirm that resource_spec env vars are set while launch prefixes are not + when enable_mpi_mode = False""" + resource_spec = { + "num_nodes": 4, + "ranks_per_node": 2, + } + + future = get_env_vars(parsl_resource_specification=resource_spec) + + result = future.result() + assert isinstance(result, Dict) + assert "PARSL_DEFAULT_PREFIX" not in result + assert "PARSL_SRUN_PREFIX" not in result + assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) + assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py new file mode 100644 index 0000000000..e0b2faff88 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -0,0 +1,171 @@ +import logging +import random +from typing import Dict +import pytest +import parsl +from parsl import python_app, bash_app +from parsl.tests.configs.htex_local import fresh_config + +import os + +EXECUTOR_LABEL = "MPI_TEST" + + +def local_setup(): + config = fresh_config() + config.executors[0].label = EXECUTOR_LABEL + config.executors[0].max_workers = 2 + config.executors[0].enable_mpi_mode = True + config.executors[0].mpi_launcher = "mpiexec" + + cwd = os.path.abspath(os.path.dirname(__file__)) + pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile") + + config.executors[0].provider.worker_init = f"export PBS_NODEFILE={pbs_nodefile}" + + parsl.load(config) + + +def local_teardown(): + parsl.dfk().cleanup() + parsl.clear() + + +@python_app +def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict: + import os + + parsl_vars = {} + for key in os.environ: + if key.startswith("PARSL_"): + parsl_vars[key] = os.environ[key] + return parsl_vars + + +@pytest.mark.local +def test_only_resource_specs_set(): + """Confirm that resource_spec env vars are set while launch 
prefixes are not + when enable_mpi_mode = False""" + resource_spec = { + "num_nodes": 2, + "ranks_per_node": 2, + } + + future = get_env_vars(parsl_resource_specification=resource_spec) + + result = future.result() + assert isinstance(result, Dict) + logging.warning(f"Got table: {result}") + assert "PARSL_MPI_PREFIX" in result + assert "PARSL_MPIEXEC_PREFIX" in result + assert result["PARSL_MPI_PREFIX"] == result["PARSL_MPIEXEC_PREFIX"] + assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"]) + assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"]) + assert result["PARSL_NUM_RANKS"] == str(resource_spec["ranks_per_node"] * resource_spec["num_nodes"]) + + +@bash_app +def echo_launch_cmd( + parsl_resource_specification: Dict, + stdout=parsl.AUTO_LOGNAME, + stderr=parsl.AUTO_LOGNAME, +): + return 'echo "$PARSL_MPI_PREFIX hostname"' + + +@pytest.mark.local +def test_bash_default_prefix_set(): + """Confirm that resource_spec env vars are set while launch prefixes are not + when enable_mpi_mode = False""" + resource_spec = { + "num_nodes": 2, + "ranks_per_node": 2, + } + + future = echo_launch_cmd(parsl_resource_specification=resource_spec) + + result = future.result() + assert result == 0 + with open(future.stdout) as f: + output = f.readlines() + assert output[0].startswith("mpiexec") + logging.warning(f"output : {output}") + + +@pytest.mark.local +def test_bash_multiple_set(): + """Confirm that multiple apps can run without blocking each other out + when enable_mpi_mode = False""" + resource_spec = { + "num_nodes": 2, + "num_ranks": 4, + } + futures = [] + for i in range(4): + resource_spec["num_nodes"] = i + 1 + future = echo_launch_cmd(parsl_resource_specification=resource_spec) + futures.append(future) + + for future in futures: + result = future.result() + assert result == 0 + with open(future.stdout) as f: + output = f.readlines() + assert output[0].startswith("mpiexec") + + +@bash_app +def bash_resource_spec(parsl_resource_specification=None, stdout=parsl.AUTO_LOGNAME): + total_ranks = ( + parsl_resource_specification["ranks_per_node"] * parsl_resource_specification["num_nodes"] + ) + return f'echo "{total_ranks}"' + + +@pytest.mark.local +def test_bash_app_using_resource_spec(): + resource_spec = { + "num_nodes": 2, + "ranks_per_node": 2, + } + future = bash_resource_spec(parsl_resource_specification=resource_spec) + assert future.result() == 0 + with open(future.stdout) as f: + output = f.readlines() + total_ranks = resource_spec["num_nodes"] * resource_spec["ranks_per_node"] + assert int(output[0].strip()) == total_ranks + + +@python_app +def mock_app(sleep_dur: float = 0.0, parsl_resource_specification: Dict = {}): + import os + import time + time.sleep(sleep_dur) + + total_ranks = int(os.environ["PARSL_NUM_NODES"]) * int(os.environ["PARSL_RANKS_PER_NODE"]) + nodes = os.environ["PARSL_MPI_NODELIST"].split(',') + + return total_ranks, nodes + + +@pytest.mark.local +def test_simulated_load(rounds: int = 100): + + node_choices = (1, 2, 4) + sleep_choices = (0, 0.01, 0.02, 0.04) + ranks_per_node = (4, 8) + + futures = {} + for i in range(rounds): + resource_spec = { + "num_nodes": random.choice(node_choices), + "ranks_per_node": random.choice(ranks_per_node), + } + future = mock_app(sleep_dur=random.choice(sleep_choices), + parsl_resource_specification=resource_spec) + futures[future] = resource_spec + + for future in futures: + total_ranks, nodes = future.result(timeout=10) + assert len(nodes) == futures[future]["num_nodes"] + assert total_ranks == 
futures[future]["num_nodes"] * futures[future]["ranks_per_node"] diff --git a/parsl/tests/test_mpi_apps/test_mpi_prefix.py b/parsl/tests/test_mpi_apps/test_mpi_prefix.py new file mode 100644 index 0000000000..37c5dc7553 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_mpi_prefix.py @@ -0,0 +1,71 @@ +import logging +import pytest + +from parsl.executors.high_throughput.mpi_resource_management import Scheduler +from parsl.executors.high_throughput.mpi_prefix_composer import ( + compose_srun_launch_cmd, + compose_aprun_launch_cmd, + compose_mpiexec_launch_cmd, + compose_all, +) + + +resource_spec = {"num_nodes": 2, + "num_ranks": 8, + "ranks_per_node": 4} + + +@pytest.mark.local +def test_srun_launch_cmd(): + prefix_name, composed_prefix = compose_srun_launch_cmd( + resource_spec=resource_spec, node_hostnames=["node1", "node2"] + ) + assert prefix_name == "PARSL_SRUN_PREFIX" + logging.warning(composed_prefix) + + assert "None" not in composed_prefix + + +@pytest.mark.local +def test_aprun_launch_cmd(): + prefix_name, composed_prefix = compose_aprun_launch_cmd( + resource_spec=resource_spec, node_hostnames=["node1", "node2"] + ) + logging.warning(composed_prefix) + assert prefix_name == "PARSL_APRUN_PREFIX" + assert "None" not in composed_prefix + + +@pytest.mark.local +def test_mpiexec_launch_cmd(): + prefix_name, composed_prefix = compose_mpiexec_launch_cmd( + resource_spec=resource_spec, node_hostnames=["node1", "node2"] + ) + logging.warning(composed_prefix) + assert prefix_name == "PARSL_MPIEXEC_PREFIX" + assert "None" not in composed_prefix + + +@pytest.mark.local +def test_slurm_launch_cmd(): + table = compose_all( + mpi_launcher="srun", + resource_spec=resource_spec, + node_hostnames=["NODE001", "NODE002"], + ) + + assert "PARSL_MPI_PREFIX" in table + assert "PARSL_SRUN_PREFIX" in table + + +@pytest.mark.local +def test_default_launch_cmd(): + table = compose_all( + mpi_launcher="srun", + resource_spec=resource_spec, + node_hostnames=["NODE001", "NODE002"], + ) + + assert "PARSL_MPI_PREFIX" in table + assert "PARSL_SRUN_PREFIX" in table + assert table["PARSL_MPI_PREFIX"] == table["PARSL_SRUN_PREFIX"] diff --git a/parsl/tests/test_mpi_apps/test_mpi_scheduler.py b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py new file mode 100644 index 0000000000..5e8f76e7b9 --- /dev/null +++ b/parsl/tests/test_mpi_apps/test_mpi_scheduler.py @@ -0,0 +1,158 @@ +import logging +import os +import mock +import pytest +import pickle +from parsl.executors.high_throughput.mpi_resource_management import TaskScheduler, MPITaskScheduler +from parsl.multiprocessing import SpawnContext +from parsl.serialize import pack_res_spec_apply_message, unpack_res_spec_apply_message + + +@pytest.fixture(autouse=True) +def set_pbs_nodefile_envvars(): + cwd = os.path.abspath(os.path.dirname(__file__)) + pbs_nodefile = os.path.join(cwd, "mocks", "pbs_nodefile.8") + with mock.patch.dict(os.environ, {"PBS_NODEFILE": pbs_nodefile}): + yield + + +@pytest.mark.local +def test_NoopScheduler(): + task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue() + scheduler = TaskScheduler(task_q, result_q) + + scheduler.put_task("TaskFoo") + assert task_q.get() == "TaskFoo" + + result_q.put("Result1") + assert scheduler.get_result(True, 1) == "Result1" + + +@pytest.mark.local +def test_MPISched_put_task(): + task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue() + scheduler = MPITaskScheduler(task_q, result_q) + + assert scheduler.available_nodes + assert len(scheduler.available_nodes) == 8 + assert 
+
+    mock_task_buffer = pack_res_spec_apply_message("func",
+                                                   "args",
+                                                   "kwargs",
+                                                   resource_specification={"num_nodes": 2,
+                                                                           "ranks_per_node": 2})
+    task_package = {"task_id": 1, "buffer": mock_task_buffer}
+    scheduler.put_task(task_package)
+
+    assert scheduler._free_node_counter.value == 6
+
+
+@pytest.mark.local
+def test_MPISched_get_result():
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    assert scheduler.available_nodes
+    assert len(scheduler.available_nodes) == 8
+    assert scheduler._free_node_counter.value == 8
+
+    nodes = [scheduler.nodes_q.get() for _ in range(4)]
+    scheduler._free_node_counter.value = 4
+    scheduler._map_tasks_to_nodes[1] = nodes
+
+    result_package = pickle.dumps({"task_id": 1, "type": "result", "buffer": "Foo"})
+    result_q.put(result_package)
+    result_received = scheduler.get_result(block=True, timeout=1)
+    assert result_received == result_package
+
+    assert scheduler._free_node_counter.value == 8
+
+
+@pytest.mark.local
+def test_MPISched_roundtrip():
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    assert scheduler.available_nodes
+    assert len(scheduler.available_nodes) == 8
+
+    for round in range(1, 9):
+        assert scheduler._free_node_counter.value == 8
+
+        mock_task_buffer = pack_res_spec_apply_message("func",
+                                                       "args",
+                                                       "kwargs",
+                                                       resource_specification={"num_nodes": round,
+                                                                               "ranks_per_node": 2})
+        task_package = {"task_id": round, "buffer": mock_task_buffer}
+        scheduler.put_task(task_package)
+
+        assert scheduler._free_node_counter.value == 8 - round
+
+        # Pop in a mock result
+        result_pkl = pickle.dumps({"task_id": round, "type": "result", "buffer": "RESULT BUF"})
+        result_q.put(result_pkl)
+
+        got_result = scheduler.get_result(True, 1)
+        assert got_result == result_pkl
+
+
+@pytest.mark.local
+def test_MPISched_contention():
+    """Second task has to wait for the first task due to insufficient resources"""
+    task_q, result_q = SpawnContext.Queue(), SpawnContext.Queue()
+    scheduler = MPITaskScheduler(task_q, result_q)
+
+    assert scheduler.available_nodes
+    assert len(scheduler.available_nodes) == 8
+
+    assert scheduler._free_node_counter.value == 8
+
+    mock_task_buffer = pack_res_spec_apply_message("func",
+                                                   "args",
+                                                   "kwargs",
+                                                   resource_specification={
+                                                       "num_nodes": 8,
+                                                       "ranks_per_node": 2
+                                                   })
+    task_package = {"task_id": 1, "buffer": mock_task_buffer}
+    scheduler.put_task(task_package)
+
+    assert scheduler._free_node_counter.value == 0
+    assert scheduler._backlog_queue.empty()
+
+    mock_task_buffer = pack_res_spec_apply_message("func",
+                                                   "args",
+                                                   "kwargs",
+                                                   resource_specification={
+                                                       "num_nodes": 8,
+                                                       "ranks_per_node": 2
+                                                   })
+    task_package = {"task_id": 2, "buffer": mock_task_buffer}
+    scheduler.put_task(task_package)
+
+    # Second task should now be in the backlog_queue
+    assert not scheduler._backlog_queue.empty()
+
+    # Confirm that the first task is available and has all 8 nodes provisioned
+    task_on_worker_side = task_q.get()
+    assert task_on_worker_side['task_id'] == 1
+    _, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
+    assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
+    assert task_q.empty()  # Confirm that task 2 is not yet scheduled
+
+    # Simulate worker returning result and the scheduler picking up result
+    result_pkl = pickle.dumps({"task_id": 1, "type": "result", "buffer": "RESULT BUF"})
+    result_q.put(result_pkl)
+    got_result = scheduler.get_result(True, 1)
+    assert got_result == result_pkl
+
+    # Now task2 must be scheduled
+    assert scheduler._backlog_queue.empty()
+
+    # Confirm that task 2 is now scheduled with all 8 nodes provisioned
+    task_on_worker_side = task_q.get()
+    assert task_on_worker_side['task_id'] == 2
+    _, _, _, resource_spec = unpack_res_spec_apply_message(task_on_worker_side['buffer'])
+    assert len(resource_spec['MPI_NODELIST'].split(',')) == 8
diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py
new file mode 100644
index 0000000000..582b79052c
--- /dev/null
+++ b/parsl/tests/test_mpi_apps/test_resource_spec.py
@@ -0,0 +1,145 @@
+import contextlib
+import logging
+import os
+import typing
+
+
+import pytest
+import unittest.mock
+
+import parsl
+from parsl.app.app import python_app
+from parsl.tests.configs.htex_local import fresh_config
+from typing import Dict
+from parsl.executors.high_throughput.mpi_resource_management import (
+    get_pbs_hosts_list,
+    get_slurm_hosts_list,
+    get_nodes_in_batchjob,
+    identify_scheduler,
+)
+from parsl.executors.high_throughput.mpi_prefix_composer import (
+    validate_resource_spec,
+    InvalidResourceSpecification
+)
+
+EXECUTOR_LABEL = "MPI_TEST"
+
+
+def local_setup():
+    config = fresh_config()
+    config.executors[0].label = EXECUTOR_LABEL
+    config.executors[0].max_workers = 1
+    parsl.load(config)
+
+
+def local_teardown():
+    logging.warning("Exiting")
+    parsl.dfk().cleanup()
+    parsl.clear()
+
+
+@python_app
+def double(x, resource_spec=None):
+    return x * 2
+
+
+@python_app
+def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict:
+    import os
+
+    parsl_vars = {}
+    for key in os.environ:
+        if key.startswith("PARSL_"):
+            parsl_vars[key] = os.environ[key]
+    return parsl_vars
+
+
+@pytest.mark.local
+def test_resource_spec_env_vars():
+    resource_spec = {
+        "num_nodes": 4,
+        "ranks_per_node": 2,
+    }
+
+    assert double(5).result() == 10
+
+    future = get_env_vars(parsl_resource_specification=resource_spec)
+
+    result = future.result()
+    assert isinstance(result, Dict)
+    assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"])
+    assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"])
+
+
+@pytest.mark.local
+@unittest.mock.patch("subprocess.check_output", return_value=b"c203-031\nc203-032\n")
+def test_slurm_mocked_mpi_fetch(subprocess_check):
+    nodeinfo = get_slurm_hosts_list()
+    assert isinstance(nodeinfo, list)
+    assert len(nodeinfo) == 2
+
+
+@contextlib.contextmanager
+def add_to_path(path: os.PathLike) -> typing.Generator[None, None, None]:
+    old_path = os.environ["PATH"]
+    try:
+        os.environ["PATH"] += os.pathsep + str(path)
+        yield
+    finally:
+        os.environ["PATH"] = old_path
+
+
+@pytest.mark.local
+@pytest.mark.skip
+def test_slurm_mpi_fetch():
+    logging.warning(f"Current pwd : {os.path.dirname(__file__)}")
+    with add_to_path(os.path.dirname(__file__)):
+        logging.warning(f"PATH: {os.environ['PATH']}")
+        nodeinfo = get_slurm_hosts_list()
+        logging.warning(f"Got : {nodeinfo}")
+
+
+@contextlib.contextmanager
+def mock_pbs_nodefile(nodefile: str = "pbs_nodefile") -> typing.Generator[None, None, None]:
+    cwd = os.path.abspath(os.path.dirname(__file__))
+    filename = os.path.join(cwd, "mocks", nodefile)
+    try:
+        os.environ["PBS_NODEFILE"] = filename
+        yield
+    finally:
+        del os.environ["PBS_NODEFILE"]
+
+
+@pytest.mark.local
+def test_get_pbs_hosts_list():
+    with mock_pbs_nodefile():
+        nodelist = get_pbs_hosts_list()
+        assert nodelist
+        assert len(nodelist) == 4
+
+
+@pytest.mark.local
+def test_top_level():
+    with mock_pbs_nodefile():
+        scheduler = identify_scheduler()
+        nodelist = get_nodes_in_batchjob(scheduler)
+        assert len(nodelist) > 0
+
+
+@pytest.mark.local
+@pytest.mark.parametrize(
+    "resource_spec, exception",
+    (
+        ({"num_nodes": 2, "ranks_per_node": 1}, None),
+        ({"launcher_options": "--debug_foo"}, None),
+        ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification),
+        ({}, None),
+    )
+)
+def test_resource_spec(resource_spec: Dict, exception):
+    if exception:
+        with pytest.raises(exception):
+            validate_resource_spec(resource_spec)
+    else:
+        result = validate_resource_spec(resource_spec)
+        assert result is None
diff --git a/parsl/tests/test_serialization/test_pack_resource_spec.py b/parsl/tests/test_serialization/test_pack_resource_spec.py
new file mode 100644
index 0000000000..922e124cfc
--- /dev/null
+++ b/parsl/tests/test_serialization/test_pack_resource_spec.py
@@ -0,0 +1,22 @@
+import pytest
+from parsl.serialize import unpack_res_spec_apply_message, pack_res_spec_apply_message
+
+
+def double(x: int, y: int = 2) -> int:
+    return x * y
+
+
+@pytest.mark.local
+def test_pack_and_unpack():
+    args = (5,)
+    kwargs = {'y': 10}
+    resource_spec = {'num_nodes': 4}
+    packed = pack_res_spec_apply_message(double, args, kwargs, resource_specification=resource_spec)
+
+    unpacked = unpack_res_spec_apply_message(packed)
+    assert len(unpacked) == 4
+    u_fn, u_args, u_kwargs, u_res_spec = unpacked
+    assert u_fn == double
+    assert u_args == args
+    assert u_kwargs == kwargs
+    assert u_res_spec == resource_spec
diff --git a/test-requirements.txt b/test-requirements.txt
index ee7d1649bf..3958c45d3b 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -5,6 +5,7 @@ pytest>=7.4.0,<8
 pytest-cov
 pytest-random-order
 mock>=1.0.0
+types-mock
 nbsphinx
 sphinx_rtd_theme
 mypy==1.5.1