Commit 64c1b54

Merge branch 'master' into master

benclifford authored Jul 30, 2024
2 parents 3f3b3ab + 64e163c commit 64c1b54

Showing 20 changed files with 212 additions and 193 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/parsl+flux.yaml
@@ -31,12 +31,12 @@ jobs:
       run: |
         pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/local_threads.py --random-order --durations 10
-      - name: Start Flux and Test Parsl with Flux
+      - name: Test Parsl with Flux
         run: |
-          flux start pytest parsl/tests/test_flux.py --config local --random-order
+          pytest parsl/tests/test_flux.py --config local --random-order
       - name: Test Parsl with Flux Config
         run: |
-          flux start pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10
+          pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10
2 changes: 1 addition & 1 deletion README.rst
@@ -109,7 +109,7 @@ For Developers

3. Install::

-   $ cd parsl
+   $ cd parsl  # only if you didn't enter the top-level directory in step 2 above
$ python3 setup.py install

4. Use Parsl!
16 changes: 9 additions & 7 deletions docs/userguide/checkpoints.rst
@@ -49,15 +49,17 @@ during development. Using app caching will ensure that only modified apps are re
App equivalence
^^^^^^^^^^^^^^^

-Parsl determines app equivalence by storing the hash
-of the app function. Thus, any changes to the app code (e.g.,
-its signature, its body, or even the docstring within the body)
-will invalidate cached values.
+Parsl determines app equivalence using the name of the app function:
+if two apps have the same name, then they are equivalent under this
+relation.

-However, Parsl does not traverse the call graph of the app function,
-so changes inside functions called by an app will not invalidate
+Changes inside the app, or in functions called by an app, will not invalidate
 cached values.

+There are many other ways functions might be compared for equivalence,
+and `parsl.dataflow.memoization.id_for_memo` provides a hook to plug in
+alternate application-specific implementations.
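
For example, under this name-based scheme (a minimal sketch, assuming a
default local config; the ``double`` app and both of its bodies are
hypothetical), redefining an app without renaming it can silently reuse
stale results::

    import parsl
    from parsl import python_app
    from parsl.config import Config
    from parsl.executors import ThreadPoolExecutor

    parsl.load(Config(executors=[ThreadPoolExecutor()]))

    @python_app(cache=True)
    def double(x):
        return x * 2

    print(double(7).result())  # executes and memoizes under the name "double"

    @python_app(cache=True)
    def double(x):  # same name, new body: equivalent under this relation
        return x * 3

    print(double(7).result())  # may return the cached 14 rather than 21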


Invocation equivalence
^^^^^^^^^^^^^^^^^^^^^^
@@ -92,7 +94,7 @@ Attempting to cache apps invoked with other, non-hashable, data types will
lead to an exception at invocation.

 In that case, mechanisms to hash new types can be registered by a program by
-implementing the ``parsl.dataflow.memoization.id_for_memo`` function for
+implementing the `parsl.dataflow.memoization.id_for_memo` function for
 the new type.
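
A sketch of such a registration (assuming ``id_for_memo`` is the
``functools.singledispatch`` function in ``parsl.dataflow.memoization``;
the ``Point`` type here is hypothetical)::

    import pickle

    from parsl.dataflow.memoization import id_for_memo

    class Point:
        def __init__(self, x, y):
            self.x = x
            self.y = y

    @id_for_memo.register(Point)
    def id_for_memo_point(value, output_ref=False):
        # Identify a Point by its coordinates: two Points with equal
        # coordinates memoize to the same entry.
        return pickle.dumps((value.x, value.y))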

Ignoring arguments
3 changes: 2 additions & 1 deletion docs/userguide/configuring.rst
@@ -346,7 +346,8 @@ Provide either the number of executors (Parsl will assume they are named in inte
strategy='none',
)
For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to make use of Nvidia's `Multi-Process Service (MPS) <https://docs.nvidia.com/deploy/mps/>`_, available on many of their GPUs, which allows users to run multiple concurrent processes on a single GPU. The user needs to include in ``worker_init`` the commands that start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks: workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3.
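
A hedged configuration sketch of the 4-GPU example above (the MPS startup
command and the provider settings are illustrative and machine dependent)::

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="htex_mps",
                available_accelerators=32,  # 4 physical GPUs x 8 workers per GPU
                provider=SlurmProvider(
                    nodes_per_block=1,
                    # Start the MPS daemon on each node (machine dependent):
                    worker_init="nvidia-cuda-mps-control -d",
                ),
            )
        ]
    )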

Multi-Threaded Applications
---------------------------

7 changes: 7 additions & 0 deletions docs/userguide/mpi_apps.rst
@@ -60,6 +60,13 @@ An example for ALCF's Polaris supercomputer that will run 3 MPI tasks of 2 nodes
)
+.. warning::
+   Please note that ``Provider`` options that specify per-task or per-node resources,
+   for example ``SlurmProvider(cores_per_node=N, ...)``, should not be used with
+   :class:`~parsl.executors.high_throughput.MPIExecutor`. Parsl primarily uses a
+   pilot job model, and assumptions from that context do not translate to the MPI
+   context. For more information, refer to
+   `GitHub issue #3006 <https://github.com/Parsl/parsl/issues/3006>`_.
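
Following that advice, block-level sizing stays on the provider while per-task
shape goes to the executor; a hedged sketch (the Slurm options here are
placeholders, not Polaris-specific values)::

    from parsl.config import Config
    from parsl.executors import MPIExecutor
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            MPIExecutor(
                max_workers_per_block=3,  # at most 3 concurrent MPI tasks
                provider=SlurmProvider(
                    nodes_per_block=6,  # size the whole block; no per-task options
                    walltime="01:00:00",
                ),
            )
        ]
    )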

Writing an MPI App
------------------

12 changes: 3 additions & 9 deletions parsl/dataflow/dflow.py
@@ -113,14 +113,10 @@ def __init__(self, config: Config) -> None:
         self.monitoring: Optional[MonitoringHub]
         self.monitoring = config.monitoring

-        # hub address and port for interchange to connect
-        self.hub_address = None  # type: Optional[str]
-        self.hub_zmq_port = None  # type: Optional[int]
         if self.monitoring:
             if self.monitoring.logdir is None:
                 self.monitoring.logdir = self.run_dir
-            self.hub_address = self.monitoring.hub_address
-            self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
+            self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)

         self.time_began = datetime.datetime.now()
         self.time_completed: Optional[datetime.datetime] = None
@@ -1181,9 +1177,9 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None:
         for executor in executors:
             executor.run_id = self.run_id
             executor.run_dir = self.run_dir
-            executor.hub_address = self.hub_address
-            executor.hub_zmq_port = self.hub_zmq_port
             if self.monitoring:
+                executor.hub_address = self.monitoring.hub_address
+                executor.hub_zmq_port = self.monitoring.hub_zmq_port
                 executor.monitoring_radio = self.monitoring.radio
             if hasattr(executor, 'provider'):
                 if hasattr(executor.provider, 'script_dir'):
@@ -1460,8 +1456,6 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str,
         Returns:
             - dict containing, hashed -> future mappings
         """
-        self.memo_lookup_table = None
-
         if checkpointDirs:
             return self._load_checkpoints(checkpointDirs)
         else:
8 changes: 4 additions & 4 deletions parsl/executors/base.py
@@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

-from parsl.monitoring.radios import MonitoringRadio
+from parsl.monitoring.radios import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
@@ -52,7 +52,7 @@ def __init__(
             *,
             hub_address: Optional[str] = None,
             hub_zmq_port: Optional[int] = None,
-            monitoring_radio: Optional[MonitoringRadio] = None,
+            monitoring_radio: Optional[MonitoringRadioSender] = None,
             run_dir: str = ".",
             run_id: Optional[str] = None,
     ):
@@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
         self._hub_zmq_port = value

     @property
-    def monitoring_radio(self) -> Optional[MonitoringRadio]:
+    def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
         """Local radio for sending monitoring messages
         """
         return self._monitoring_radio

     @monitoring_radio.setter
-    def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None:
+    def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
         self._monitoring_radio = value
14 changes: 7 additions & 7 deletions parsl/executors/flux/executor.py
@@ -200,7 +200,6 @@ def __init__(
             raise EnvironmentError("Cannot find Flux installation in PATH")
         self.flux_path = os.path.abspath(flux_path)
         self._task_id_counter = itertools.count()
-        self._socket = zmq.Context().socket(zmq.REP)
         # Assumes a launch command cannot be None or empty
         self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD
         self._submission_queue: queue.Queue = queue.Queue()
@@ -213,7 +212,6 @@ def __init__(
             args=(
                 self._submission_queue,
                 self._stop_event,
-                self._socket,
                 self.working_dir,
                 self.flux_executor_kwargs,
                 self.provider,
@@ -306,11 +304,13 @@ def _submit_wrapper(
     If an exception is thrown, error out all submitted tasks.
     """
-    try:
-        _submit_flux_jobs(submission_queue, stop_event, *args, **kwargs)
-    except Exception as exc:
-        _error_out_jobs(submission_queue, stop_event, exc)
-        raise
+    with zmq.Context() as ctx:
+        with ctx.socket(zmq.REP) as socket:
+            try:
+                _submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs)
+            except Exception as exc:
+                _error_out_jobs(submission_queue, stop_event, exc)
+                raise


def _error_out_jobs(
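
The restructuring above moves ZMQ context and socket creation out of
``__init__`` and into the thread that uses them; a common motivation is that
plain ZeroMQ sockets are not thread-safe. A generic sketch of that pattern
(the endpoint and payload are illustrative)::

    import threading

    import zmq

    def serve() -> None:
        # Create the context and socket in the thread that uses them;
        # sharing a socket made in __init__ with a worker thread is unsafe.
        with zmq.Context() as ctx:
            with ctx.socket(zmq.REP) as sock:
                sock.bind("tcp://127.0.0.1:5555")
                sock.send(sock.recv())  # echo one request, then clean up

    t = threading.Thread(target=serve)
    t.start()

    with zmq.Context() as ctx, ctx.socket(zmq.REQ) as req:
        req.connect("tcp://127.0.0.1:5555")
        req.send(b"ping")
        print(req.recv())  # b'ping'

    t.join()
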
103 changes: 48 additions & 55 deletions parsl/executors/high_throughput/executor.py
@@ -56,7 +56,7 @@
"--mpi-launcher={mpi_launcher} "
"--available-accelerators {accelerators}")

DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py"
DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"]

GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
@@ -78,9 +78,9 @@
        cores_per_worker, nodes_per_block, heartbeat_period, heartbeat_threshold, logdir). For example:
        launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}"

-    interchange_launch_cmd : str
-        Custom command line string to launch the interchange process from the executor. If undefined,
-        the executor will use the default "interchange.py" command.
+    interchange_launch_cmd : Sequence[str]
+        Custom sequence of command line tokens to launch the interchange process from the executor. If
+        undefined, the executor will use the default "interchange.py" command.
address : string
An address to connect to the main Parsl process which is reachable from the network in which
@@ -238,7 +238,7 @@ def __init__(self,
                 label: str = 'HighThroughputExecutor',
                 provider: ExecutionProvider = LocalProvider(),
                 launch_cmd: Optional[str] = None,
-                interchange_launch_cmd: Optional[str] = None,
+                interchange_launch_cmd: Optional[Sequence[str]] = None,
                 address: Optional[str] = None,
                 worker_ports: Optional[Tuple[int, int]] = None,
                 worker_port_range: Optional[Tuple[int, int]] = (54000, 55000),
@@ -456,8 +456,6 @@ def _result_queue_worker(self):
"task_id" : <task_id>
"exception" : serialized exception object, on failure
}
The `None` message is a die request.
"""
logger.debug("Result queue worker starting")

@@ -475,58 +473,53 @@

             else:

-                if msgs is None:
-                    logger.debug("Got None, exiting")
-                    return
-
-                else:
-                    for serialized_msg in msgs:
-                        try:
-                            msg = pickle.loads(serialized_msg)
-                        except pickle.UnpicklingError:
-                            raise BadMessage("Message received could not be unpickled")
-
-                        if msg['type'] == 'heartbeat':
-                            continue
-                        elif msg['type'] == 'result':
-                            try:
-                                tid = msg['task_id']
-                            except Exception:
-                                raise BadMessage("Message received does not contain 'task_id' field")
-
-                            if tid == -1 and 'exception' in msg:
-                                logger.warning("Executor shutting down due to exception from interchange")
-                                exception = deserialize(msg['exception'])
-                                self.set_bad_state_and_fail_all(exception)
-                                break
-
-                            task_fut = self.tasks.pop(tid)
-
-                            if 'result' in msg:
-                                result = deserialize(msg['result'])
-                                task_fut.set_result(result)
-
-                            elif 'exception' in msg:
-                                try:
-                                    s = deserialize(msg['exception'])
-                                    # s should be a RemoteExceptionWrapper... so we can reraise it
-                                    if isinstance(s, RemoteExceptionWrapper):
-                                        try:
-                                            s.reraise()
-                                        except Exception as e:
-                                            task_fut.set_exception(e)
-                                    elif isinstance(s, Exception):
-                                        task_fut.set_exception(s)
-                                    else:
-                                        raise ValueError("Unknown exception-like type received: {}".format(type(s)))
-                                except Exception as e:
-                                    # TODO could be a proper wrapped exception?
-                                    task_fut.set_exception(
-                                        DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
-                            else:
-                                raise BadMessage("Message received is neither result or exception")
-                        else:
-                            raise BadMessage("Message received with unknown type {}".format(msg['type']))
+                for serialized_msg in msgs:
+                    try:
+                        msg = pickle.loads(serialized_msg)
+                    except pickle.UnpicklingError:
+                        raise BadMessage("Message received could not be unpickled")
+
+                    if msg['type'] == 'heartbeat':
+                        continue
+                    elif msg['type'] == 'result':
+                        try:
+                            tid = msg['task_id']
+                        except Exception:
+                            raise BadMessage("Message received does not contain 'task_id' field")
+
+                        if tid == -1 and 'exception' in msg:
+                            logger.warning("Executor shutting down due to exception from interchange")
+                            exception = deserialize(msg['exception'])
+                            self.set_bad_state_and_fail_all(exception)
+                            break
+
+                        task_fut = self.tasks.pop(tid)
+
+                        if 'result' in msg:
+                            result = deserialize(msg['result'])
+                            task_fut.set_result(result)
+
+                        elif 'exception' in msg:
+                            try:
+                                s = deserialize(msg['exception'])
+                                # s should be a RemoteExceptionWrapper... so we can reraise it
+                                if isinstance(s, RemoteExceptionWrapper):
+                                    try:
+                                        s.reraise()
+                                    except Exception as e:
+                                        task_fut.set_exception(e)
+                                elif isinstance(s, Exception):
+                                    task_fut.set_exception(s)
+                                else:
+                                    raise ValueError("Unknown exception-like type received: {}".format(type(s)))
+                            except Exception as e:
+                                # TODO could be a proper wrapped exception?
+                                task_fut.set_exception(
+                                    DeserializationError("Received exception, but handling also threw an exception: {}".format(e)))
+                        else:
+                            raise BadMessage("Message received is neither result or exception")
+                    else:
+                        raise BadMessage("Message received with unknown type {}".format(msg['type']))

         logger.info("Result queue worker finished")

@@ -555,7 +548,7 @@ def _start_local_interchange_process(self) -> None:

         config_pickle = pickle.dumps(interchange_config)

-        self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE)
+        self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE)
         stdin = self.interchange_proc.stdin
         assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

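
With the list form of ``interchange_launch_cmd`` above, wrapper commands can
be passed to ``Popen`` token by token, with no shell string to quote or
encode; a hedged sketch (the valgrind wrapper is purely illustrative, not a
Parsl recommendation)::

    from parsl.executors import HighThroughputExecutor

    htex = HighThroughputExecutor(
        label="wrapped_htex",
        # Each token is passed to Popen as-is; "interchange.py" is the
        # console script that the default command would run on its own.
        interchange_launch_cmd=["valgrind", "interchange.py"],
    )
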
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/interchange.py
@@ -410,6 +410,7 @@ def process_task_outgoing_incoming(
         self._ready_managers[manager_id] = {'last_heartbeat': time.time(),
                                             'idle_since': time.time(),
                                             'block_id': None,
+                                            'start_time': msg['start_time'],
                                             'max_capacity': 0,
                                             'worker_count': 0,
                                             'active': True,
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/manager_record.py
@@ -6,6 +6,7 @@

 class ManagerRecord(TypedDict, total=False):
     block_id: Optional[str]
+    start_time: float
     tasks: List[Any]
     worker_count: int
     max_capacity: int
(Diffs for the remaining 9 changed files are not shown.)
