Skip to content

Commit

Permalink
Support for running MPI applications with HighThroughputExecutor (#…
Browse files Browse the repository at this point in the history
…3016)

The HighThroughputExecutor (HTEX) is designed to follow the pilot job model, in which nodes are provisioned and an agent is launched on each node to execute tasks. However, this prevents the executor from launching MPI-aware applications, which are not limited to single-node execution. This PR extends HTEX to:

Support an MPI mode that can be toggled with the enable_mpi_mode: bool option. In this mode, a single manager is launched per batch job; it keeps track of the provisioned nodes and can schedule multi-node MPI applications.
Support passing MPI application requirements, such as ranks_per_node, via the special parsl_resource_specification kwarg.
MPI applications with resource specifications defined are launched with environment variables such as PARSL_MPI_PREFIX that can be used to launch the application from within bash_apps.
I've got user documentation here -> https://github.com/Parsl/parsl/blob/mpi_experimental_3/docs/userguide/mpi_apps.rst

Tested on :

Polaris@ALCF with CosmicTagger and LAMMPS
Perlmutter@NERSC with LAMMPS
  • Loading branch information
yadudoc authored Feb 16, 2024
1 parent 7f0eca4 commit a21635e
Show file tree
Hide file tree
Showing 22 changed files with 1,473 additions and 120 deletions.
379 changes: 287 additions & 92 deletions docs/userguide/mpi_apps.rst

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions parsl/executors/flux/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from parsl.executors.errors import ScalingFailed
from parsl.providers import LocalProvider
from parsl.providers.base import ExecutionProvider
from parsl.serialize import pack_apply_message, deserialize
from parsl.serialize import deserialize, pack_res_spec_apply_message
from parsl.serialize.errors import SerializationError
from parsl.app.errors import AppException

Expand Down Expand Up @@ -284,8 +284,10 @@ def submit(
infile = os.path.join(self.working_dir, f"{task_id}_in{os.extsep}pkl")
outfile = os.path.join(self.working_dir, f"{task_id}_out{os.extsep}pkl")
try:
fn_buf = pack_apply_message(
func, args, kwargs, buffer_threshold=1024 * 1024
fn_buf = pack_res_spec_apply_message(
func, args, kwargs,
resource_specification={},
buffer_threshold=1024 * 1024
)
except TypeError:
raise SerializationError(func.__name__)
Expand Down
45 changes: 37 additions & 8 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
from typing import List, Optional, Tuple, Union, Callable
import math

from parsl.serialize import pack_apply_message, deserialize
import parsl.launchers
from parsl.serialize import pack_res_spec_apply_message, deserialize
from parsl.serialize.errors import SerializationError, DeserializationError
from parsl.app.errors import RemoteExceptionWrapper
from parsl.jobs.states import JobStatus, JobState
from parsl.executors.high_throughput import zmq_pipes
from parsl.executors.high_throughput import interchange
from parsl.executors.errors import (
BadMessage, ScalingFailed,
UnsupportedFeatureError
)
from parsl.executors.high_throughput.mpi_prefix_composer import (
VALID_LAUNCHERS,
validate_resource_spec
)

from parsl import curvezmq
Expand Down Expand Up @@ -50,6 +54,8 @@
"{address_probe_timeout_string} "
"--hb_threshold={heartbeat_threshold} "
"--cpu-affinity {cpu_affinity} "
"{enable_mpi_mode} "
"--mpi-launcher={mpi_launcher} "
"--available-accelerators {accelerators}")


Expand Down Expand Up @@ -193,6 +199,17 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin):
worker_logdir_root : string
In case of a remote file system, specify the path to where logs will be kept.
enable_mpi_mode: bool
If enabled, MPI launch prefixes will be composed for the batch scheduler based on
the nodes available in each batch job and the resource_specification dict passed
from the app. This is an experimental feature, please refer to the following doc section
before use: https://parsl.readthedocs.io/en/stable/userguide/mpi_apps.html
mpi_launcher: str
This field is only used if enable_mpi_mode is set. Select one from the
list of supported MPI launchers = ("srun", "aprun", "mpiexec").
default: "mpiexec"
encrypted : bool
Flag to enable/disable encryption (CurveZMQ). Default is False.
"""
Expand Down Expand Up @@ -220,6 +237,8 @@ def __init__(self,
poll_period: int = 10,
address_probe_timeout: Optional[int] = None,
worker_logdir_root: Optional[str] = None,
enable_mpi_mode: bool = False,
mpi_launcher: str = "mpiexec",
block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True,
encrypted: bool = False):

Expand Down Expand Up @@ -281,6 +300,15 @@ def __init__(self,
self.encrypted = encrypted
self.cert_dir = None

self.enable_mpi_mode = enable_mpi_mode
assert mpi_launcher in VALID_LAUNCHERS, \
f"mpi_launcher must be set to one of {VALID_LAUNCHERS}"
if self.enable_mpi_mode:
assert isinstance(self.provider.launcher, parsl.launchers.SingleNodeLauncher), \
"mpi_mode requires the provider to be configured to use a SingleNodeLauncher"

self.mpi_launcher = mpi_launcher

if not launch_cmd:
launch_cmd = DEFAULT_LAUNCH_CMD
self.launch_cmd = launch_cmd
Expand All @@ -302,6 +330,7 @@ def initialize_scaling(self):
"""
debug_opts = "--debug" if self.worker_debug else ""
max_workers = "" if self.max_workers == float('inf') else "--max_workers={}".format(self.max_workers)
enable_mpi_opts = "--enable_mpi_mode " if self.enable_mpi_mode else ""

address_probe_timeout_string = ""
if self.address_probe_timeout:
Expand All @@ -323,6 +352,8 @@ def initialize_scaling(self):
cert_dir=self.cert_dir,
logdir=self.worker_logdir,
cpu_affinity=self.cpu_affinity,
enable_mpi_mode=enable_mpi_opts,
mpi_launcher=self.mpi_launcher,
accelerators=" ".join(self.available_accelerators))
self.launch_cmd = l_cmd
logger.debug("Launch command: {}".format(self.launch_cmd))
Expand Down Expand Up @@ -584,10 +615,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
Returns:
Future
"""
if resource_specification:
logger.error("Ignoring the call specification. "
"Parsl call specification is not supported in HighThroughput Executor.")
raise UnsupportedFeatureError('resource specification', 'HighThroughput Executor', None)
validate_resource_spec(resource_specification)

if self.bad_state_is_set:
raise self.executor_exception
Expand All @@ -605,8 +633,9 @@ def submit(self, func, resource_specification, *args, **kwargs):
self.tasks[task_id] = fut

try:
fn_buf = pack_apply_message(func, args, kwargs,
buffer_threshold=1024 * 1024)
fn_buf = pack_res_spec_apply_message(func, args, kwargs,
resource_specification=resource_specification,
buffer_threshold=1024 * 1024)
except TypeError:
raise SerializationError(func.__name__)

Expand Down
137 changes: 137 additions & 0 deletions parsl/executors/high_throughput/mpi_prefix_composer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import logging
from typing import Dict, List, Tuple, Set

logger = logging.getLogger(__name__)

# Names of the MPI launchers for which a launch prefix can be composed.
VALID_LAUNCHERS = ("srun", "aprun", "mpiexec")


class InvalidResourceSpecification(Exception):
    """Raised when a resource specification contains unsupported keys.

    Attributes:
        invalid_keys: the set of offending keys, exactly as passed to the
            constructor.
    """

    def __init__(self, invalid_keys: Set[str]):
        # Keep the offending keys so callers can inspect them programmatically.
        self.invalid_keys = invalid_keys

    def __str__(self):
        rejected = self.invalid_keys
        return f"Invalid resource specification options supplied: {rejected}"


def validate_resource_spec(resource_spec: Dict[str, str]):
    """Validate the keys of ``resource_spec`` and derive missing rank counts.

    Only ``ranks_per_node``, ``num_nodes``, ``num_ranks`` and
    ``launcher_options`` are accepted as keys.

    When ``num_nodes`` is present and exactly one of ``num_ranks`` /
    ``ranks_per_node`` is set, the missing value is computed and written back
    into ``resource_spec`` (the dict is modified in place) as a string:

    * ``num_ranks = num_nodes * ranks_per_node``
    * ``ranks_per_node = ceil(num_ranks / num_nodes)``

    Args:
        resource_spec: resource specification supplied by the app.

    Raises:
        InvalidResourceSpecification: if ``resource_spec`` contains a key
            other than the four listed above.
    """
    legal_keys = {
        "ranks_per_node",
        "num_nodes",
        "num_ranks",
        "launcher_options",
    }
    invalid_keys = set(resource_spec) - legal_keys
    if invalid_keys:
        raise InvalidResourceSpecification(invalid_keys)

    # Nothing can be derived without a node count.
    if "num_nodes" not in resource_spec:
        return

    num_nodes = resource_spec["num_nodes"]
    num_ranks = resource_spec.get("num_ranks")
    ranks_per_node = resource_spec.get("ranks_per_node")

    if ranks_per_node and not num_ranks:
        resource_spec["num_ranks"] = str(int(num_nodes) * int(ranks_per_node))
    elif num_ranks and not ranks_per_node:
        # Integer ceiling division: true division would yield a float string
        # such as "2.0", which is not a valid per-node rank count for the
        # launchers. Rounding up keeps num_nodes * ranks_per_node >= num_ranks.
        resource_spec["ranks_per_node"] = str(-(-int(num_ranks) // int(num_nodes)))


def compose_mpiexec_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``mpiexec`` launch prefix for the given nodes.

    Args:
        resource_spec: mapping read for ``num_ranks``, ``ranks_per_node`` and
            the optional ``launcher_options``.
        node_hostnames: hostnames passed, comma-joined, to ``-hosts``.

    Returns:
        The pair ``("PARSL_MPIEXEC_PREFIX", prefix)``.
    """
    hosts = ",".join(node_hostnames)

    # A missing rank count is rendered as the text "None"; absent
    # launcher_options contribute an empty final token (trailing space).
    tokens = ["mpiexec"]
    tokens += ["-n", str(resource_spec.get("num_ranks"))]
    tokens += ["-ppn", str(resource_spec.get("ranks_per_node"))]
    tokens += ["-hosts", str(hosts)]
    tokens.append(str(resource_spec.get("launcher_options", "")))

    return "PARSL_MPIEXEC_PREFIX", " ".join(tokens)


def compose_srun_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``srun`` launch prefix for the given nodes.

    Args:
        resource_spec: mapping read for ``num_ranks``, ``ranks_per_node`` and
            the optional ``launcher_options``.
        node_hostnames: hostnames passed, comma-joined, to ``--nodelist``;
            their count is passed to ``--nodes``.

    Returns:
        The pair ``("PARSL_SRUN_PREFIX", prefix)``.
    """
    node_count = str(len(node_hostnames))

    # (flag, value) pairs in the order they appear on the command line.
    flagged = (
        ("--ntasks", resource_spec.get("num_ranks")),
        ("--ntasks-per-node", resource_spec.get("ranks_per_node")),
        ("--nodelist", ",".join(node_hostnames)),
        ("--nodes", node_count),
    )

    tokens = ["srun"]
    for flag, value in flagged:
        tokens.append(flag)
        tokens.append(str(value))
    # Absent launcher_options contribute an empty final token (trailing space).
    tokens.append(str(resource_spec.get("launcher_options", "")))

    return "PARSL_SRUN_PREFIX", " ".join(tokens)


def compose_aprun_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``aprun`` launch prefix for the given nodes.

    Args:
        resource_spec: mapping read for ``num_ranks``, ``ranks_per_node`` and
            the optional ``launcher_options``.
        node_hostnames: hostnames passed, comma-joined, to ``-node-list``.

    Returns:
        The pair ``("PARSL_APRUN_PREFIX", prefix)``.
    """
    total_ranks = resource_spec.get("num_ranks")
    per_node = resource_spec.get("ranks_per_node")
    hosts = ",".join(node_hostnames)
    extra = resource_spec.get("launcher_options", "")

    # NOTE(review): aprun man pages appear to spell this flag "-L" /
    # "--node-list"; confirm that the single-dash "-node-list" is accepted.
    pieces = ("aprun", "-n", total_ranks, "-N", per_node, "-node-list", hosts, extra)

    return "PARSL_APRUN_PREFIX", " ".join(map(str, pieces))


def compose_all(
    mpi_launcher: str, resource_spec: Dict, node_hostnames: List[str]
) -> Dict[str, str]:
    """Compose the launch prefix for every launcher and select the default.

    Each composer is tried in turn. A composer that raises is logged and
    skipped so that the remaining prefixes are still produced.

    Args:
        mpi_launcher: which launcher's prefix to expose as
            ``PARSL_MPI_PREFIX``; one of ``"srun"``, ``"aprun"``, ``"mpiexec"``.
        resource_spec: resource specification forwarded to each composer.
        node_hostnames: hostnames forwarded to each composer.

    Returns:
        A dict mapping each composed prefix's name (``PARSL_APRUN_PREFIX``,
        ``PARSL_SRUN_PREFIX``, ``PARSL_MPIEXEC_PREFIX``) to its prefix, plus
        ``PARSL_MPI_PREFIX`` set to the prefix of ``mpi_launcher``.

    Raises:
        RuntimeError: if ``mpi_launcher`` is not a known launcher.
        KeyError: if the prefix for ``mpi_launcher`` could not be composed.
    """
    all_prefixes = {}
    composers = [
        compose_aprun_launch_cmd,
        compose_srun_launch_cmd,
        compose_mpiexec_launch_cmd,
    ]
    for composer in composers:
        try:
            key, prefix = composer(resource_spec, node_hostnames)
        except Exception:
            # Best effort: one failing composer must not prevent the others.
            # Use the module logger (not the root logger) so the record
            # carries this module's name and honours its configuration.
            logger.exception(
                "Failed to compose launch prefix with %s from %s",
                composer,
                resource_spec,
            )
        else:
            all_prefixes[key] = prefix

    # Launcher name -> key of the prefix that becomes the default.
    default_key_for = {
        "srun": "PARSL_SRUN_PREFIX",
        "aprun": "PARSL_APRUN_PREFIX",
        "mpiexec": "PARSL_MPIEXEC_PREFIX",
    }
    if mpi_launcher not in default_key_for:
        raise RuntimeError(f"Unknown mpi_launcher:{mpi_launcher}")

    all_prefixes["PARSL_MPI_PREFIX"] = all_prefixes[default_key_for[mpi_launcher]]
    return all_prefixes
Loading

0 comments on commit a21635e

Please sign in to comment.