Skip to content

Commit

Permalink
Merge branch 'master' into tmp_function_data
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Sep 4, 2024
2 parents 5ec7cdb + 3a256de commit cd7229f
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 85 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Launchers
parsl.launchers.SrunMPILauncher
parsl.launchers.GnuParallelLauncher
parsl.launchers.MpiExecLauncher
parsl.launchers.MpiRunLauncher
parsl.launchers.JsrunLauncher
parsl.launchers.WrappedLauncher

Expand Down
17 changes: 17 additions & 0 deletions docs/userguide/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,27 @@ Center's **Expanse** supercomputer. The example is designed to be executed on th
.. literalinclude:: ../../parsl/configs/expanse.py


Improv (Argonne LCRC)
---------------------

.. image:: https://www.lcrc.anl.gov/sites/default/files/styles/965_wide/public/2023-12/20231214_114057.jpg?itok=A-Rz5pP9

**Improv** is a PBS Pro based supercomputer at Argonne's Laboratory Computing Resource
Center (LCRC). The following snippet is an example configuration that uses `parsl.providers.PBSProProvider`
and `parsl.launchers.MpiRunLauncher` to run on multinode jobs.

.. literalinclude:: ../../parsl/configs/improv.py


.. _configuring_nersc_cori:

Perlmutter (NERSC)
------------------

NERSC provides documentation on `how to use Parsl on Perlmutter <https://docs.nersc.gov/jobs/workflow/parsl/>`_.
Perlmutter is a Slurm based HPC system and parsl uses `parsl.providers.SlurmProvider` with `parsl.launchers.SrunLauncher`
to launch tasks onto this machine.


Frontera (TACC)
---------------
Expand Down Expand Up @@ -599,6 +614,8 @@ Polaris (ALCF)
:width: 75%

ALCF provides documentation on `how to use Parsl on Polaris <https://docs.alcf.anl.gov/polaris/workflows/parsl/>`_.
Polaris uses `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` to launch tasks onto the HPC system.



Stampede2 (TACC)
Expand Down
34 changes: 34 additions & 0 deletions parsl/configs/improv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import MpiRunLauncher
from parsl.providers import PBSProProvider

config = Config(
executors=[
HighThroughputExecutor(
label="Improv_multinode",
max_workers_per_node=32,
provider=PBSProProvider(
account="YOUR_ALLOCATION_ON_IMPROV",
# PBS directives (header lines), for example:
# scheduler_options='#PBS -l mem=4gb',
scheduler_options='',

queue="compute",

# Command to be run before starting a worker:
# **WARNING** Improv requires an openmpi module to be
# loaded for the MpiRunLauncher. Add additional env
# load commands to this multiline string.
worker_init='''
module load gcc/13.2.0;
module load openmpi/5.0.3-gcc-13.2.0; ''',
launcher=MpiRunLauncher(),

# number of compute nodes allocated for each block
nodes_per_block=2,
walltime='00:10:00'
),
),
],
)
21 changes: 3 additions & 18 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,6 @@ class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageIn
will check the available memory at startup and limit the number of workers such that
the there's sufficient memory for each worker. Default: None
max_workers : int
Deprecated. Please use max_workers_per_node instead.
max_workers_per_node : int
Caps the number of workers launched per node. Default: None
Expand Down Expand Up @@ -239,7 +236,6 @@ def __init__(self,
worker_debug: bool = False,
cores_per_worker: float = 1.0,
mem_per_worker: Optional[float] = None,
max_workers: Optional[Union[int, float]] = None,
max_workers_per_node: Optional[Union[int, float]] = None,
cpu_affinity: str = 'none',
available_accelerators: Union[int, Sequence[str]] = (),
Expand Down Expand Up @@ -272,9 +268,7 @@ def __init__(self,
else:
self.all_addresses = ','.join(get_all_addresses())

if max_workers:
self._warn_deprecated("max_workers", "max_workers_per_node")
self.max_workers_per_node = max_workers_per_node or max_workers or float("inf")
self.max_workers_per_node = max_workers_per_node or float("inf")

mem_slots = self.max_workers_per_node
cpu_slots = self.max_workers_per_node
Expand Down Expand Up @@ -335,16 +329,6 @@ def _warn_deprecated(self, old: str, new: str):
stacklevel=2
)

@property
def max_workers(self):
self._warn_deprecated("max_workers", "max_workers_per_node")
return self.max_workers_per_node

@max_workers.setter
def max_workers(self, val: Union[int, float]):
self._warn_deprecated("max_workers", "max_workers_per_node")
self.max_workers_per_node = val

@property
def logdir(self):
return "{}/{}".format(self.run_dir, self.label)
Expand Down Expand Up @@ -790,7 +774,8 @@ def status(self) -> Dict[str, JobStatus]:
connected_blocks = self.connected_blocks()
for job_id in job_status:
job_info = job_status[job_id]
if job_info.terminal and job_id not in connected_blocks:
if job_info.terminal and job_id not in connected_blocks and job_info.state != JobState.SCALED_IN:
logger.debug("Rewriting job %s from status %s to MISSING", job_id, job_info)
job_status[job_id].state = JobState.MISSING
if job_status[job_id].message is None:
job_status[job_id].message = (
Expand Down
60 changes: 31 additions & 29 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ def start(self) -> None:

self.zmq_context.destroy()
delta = time.time() - start
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_outgoing_incoming(
Expand All @@ -396,17 +396,16 @@ def process_task_outgoing_incoming(
try:
msg = json.loads(message[1].decode('utf-8'))
except Exception:
logger.warning("Got Exception reading message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True)
logger.debug("Message:\n %r\n", message[1])
return

# perform a bit of validation on the structure of the deserialized
# object, at least enough to behave like a deserialization error
# in obviously malformed cases
if not isinstance(msg, dict) or 'type' not in msg:
logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}")
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.debug("Message:\n %r\n", message[1])
return

if msg['type'] == 'registration':
Expand All @@ -425,7 +424,7 @@ def process_task_outgoing_incoming(
self.connected_block_history.append(msg['block_id'])

interesting_managers.add(manager_id)
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
logger.info(f"Adding manager: {manager_id!r} to ready queue")
m = self._ready_managers[manager_id]

# m is a ManagerRecord, but msg is a dict[Any,Any] and so can
Expand All @@ -434,12 +433,12 @@ def process_task_outgoing_incoming(
# later.
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
logger.info(f"Registration info for manager {manager_id!r}: {msg}")
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange")
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
Expand All @@ -452,16 +451,15 @@ def process_task_outgoing_incoming(
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}")
logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}")
elif msg['type'] == 'heartbeat':
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
elif msg['type'] == 'drain':
self._ready_managers[manager_id]['draining'] = True
logger.debug(f"Manager {manager_id!r} requested drain")
logger.debug("Manager %r requested drain", manager_id)
else:
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")
Expand All @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers

logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
total=len(self._ready_managers),
interesting=len(interesting_managers)))
logger.debug(
"Managers count (interesting/total): %d/%d",
len(interesting_managers),
len(self._ready_managers)
)

if interesting_managers and not self.pending_task_queue.empty():
shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
Expand All @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tasks_inflight = len(m['tasks'])
real_capacity = m['max_capacity'] - tasks_inflight

if (real_capacity and m['active'] and not m['draining']):
if real_capacity and m["active"] and not m["draining"]:
tasks = self.get_tasks(real_capacity)
if tasks:
self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)])
Expand All @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tids = [t['task_id'] for t in tasks]
m['tasks'].extend(tids)
m['idle_since'] = None
logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
# recompute real_capacity after sending tasks
real_capacity = m['max_capacity'] - tasks_inflight
if real_capacity > 0:
logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
# ... so keep it in the interesting_managers list
else:
logger.debug("Manager {!r} is now saturated".format(manager_id))
logger.debug("Manager %r is now saturated", manager_id)
interesting_managers.remove(manager_id)
else:
interesting_managers.remove(manager_id)
# logger.debug("Nothing to send to manager {}".format(manager_id))
logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers)))
logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

Expand All @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
logger.warning(f"Received a result from a un-registered manager: {manager_id!r}")
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")
logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id)

b_messages = []

Expand All @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_

monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
logger.debug("Manager %r sent heartbeat via results connection", manager_id)
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

got_result = False
m = self._ready_managers[manager_id]
Expand All @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id)
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
logger.exception(
"Ignoring exception removing task_id %s for manager %r with task list %s",
r['task_id'],
manager_id,
m['tasks']))
m["tasks"]
)

b_messages_to_send = []
for (b_message, _) in b_messages:
Expand All @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"])
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

Expand Down
7 changes: 5 additions & 2 deletions parsl/executors/status_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,10 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[
if block_ids is not None:
new_status = {}
for block_id in block_ids:
new_status[block_id] = JobStatus(JobState.CANCELLED)
del self._status[block_id]
logger.debug("Marking block %s as SCALED_IN", block_id)
s = JobStatus(JobState.SCALED_IN)
new_status[block_id] = s
self._status[block_id] = s
self._simulated_status[block_id] = s
self.send_monitoring_info(new_status)
return block_ids
7 changes: 6 additions & 1 deletion parsl/jobs/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ class JobState(IntEnum):
bad worker environment or network connectivity issues.
"""

SCALED_IN = 9
"""This job has been deliberately scaled in. Scaling code should not be concerned
that the job never ran (for example for error handling purposes).
"""

def __str__(self) -> str:
return f"{self.__class__.__name__}.{self.name}"


TERMINAL_STATES = [JobState.CANCELLED, JobState.COMPLETED, JobState.FAILED,
JobState.TIMEOUT, JobState.MISSING]
JobState.TIMEOUT, JobState.MISSING, JobState.SCALED_IN]


class JobStatus:
Expand Down
29 changes: 15 additions & 14 deletions parsl/tests/site_tests/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,28 @@ Adding a new site
1. Specialized python builds for the system (for eg, Summit)
2. Anaconda available via modules
3. User's conda installation
* Add a new block to `conda_setup.sh` that installs a fresh environment and writes out
the activation commands to `~/setup_parsl_test_env.sh`
* Add a site config to `parsl/tests/configs/<SITE.py>` and add your local user options
to `parsl/tests/configs/local_user_opts.py`. For eg, `here's mine<https://gist.github.com/yadudoc/b71765284d2db0706c4f43605dd8b8d6>`_
Make sure that the site config uses the `fresh_config` pattern.
* Add a new block to ``conda_setup.sh`` that installs a fresh environment and writes out
the activation commands to ``~/setup_parsl_test_env.sh``
* Add a site config to ``parsl/tests/configs/<SITE.py>`` and add your local user options
to ``parsl/tests/configs/local_user_opts.py``. For example,
`here's mine<https://gist.github.com/yadudoc/b71765284d2db0706c4f43605dd8b8d6>`_
Make sure that the site config uses the ``fresh_config`` pattern.
Please ensure that the site config uses:
* max_workers = 1
* init_blocks = 1
* min_blocks = 0
* ``max_workers_per_node = 1``
* ``init_blocks = 1``
* ``min_blocks = 0``

* Add this site config to `parsl/tests/site_tests/site_config_selector.py`
* Reinstall parsl, using `pip install .`
* Test a single test: `python3 test_site.py -d` to confirm that the site works correctly.
* Once tests are passing run the whole site_test with `make site_test`
* Add this site config to ``parsl/tests/site_tests/site_config_selector.py``
* Reinstall parsl, using ``pip install .``
* Test a single test: ``python3 test_site.py -d`` to confirm that the site works correctly.
* Once tests are passing run the whole site_test with ``make site_test``


Shared filesystem option
------------------------

There is a new env variable "SHARED_FS_OPTIONS" to pass markers to pytest to skip certain tests.
There is a new env variable ``SHARED_FS_OPTIONS`` to pass markers to pytest to skip certain tests.

When there's a shared-FS, the default NoOpStaging works. However, when there's no shared-FS some tests
that uses File objects require a staging provider (eg. rsync). These tests can be turned off with
`-k "not staging_required"`
``-k "not staging_required"``
Loading

0 comments on commit cd7229f

Please sign in to comment.