diff --git a/.github/workflows/parsl+flux.yaml b/.github/workflows/parsl+flux.yaml index e733f14199..8b8c43d8b2 100644 --- a/.github/workflows/parsl+flux.yaml +++ b/.github/workflows/parsl+flux.yaml @@ -31,12 +31,12 @@ jobs: run: | pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/local_threads.py --random-order --durations 10 - - name: Start Flux and Test Parsl with Flux + - name: Test Parsl with Flux run: | - flux start pytest parsl/tests/test_flux.py --config local --random-order + pytest parsl/tests/test_flux.py --config local --random-order - name: Test Parsl with Flux Config run: | - flux start pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 + pytest parsl/tests/ -k "not cleannet and not unix_filesystem_permissions_required" --config parsl/tests/configs/flux_local.py --random-order --durations 10 diff --git a/README.rst b/README.rst index fb1070e7d7..72048d39f4 100644 --- a/README.rst +++ b/README.rst @@ -109,7 +109,7 @@ For Developers 3. Install:: - $ cd parsl + $ cd parsl # only if you didn't enter the top-level directory in step 2 above $ python3 setup.py install 4. Use Parsl! diff --git a/docs/historical/changelog.rst b/docs/historical/changelog.rst index 18fe6ca5b1..931998f93d 100644 --- a/docs/historical/changelog.rst +++ b/docs/historical/changelog.rst @@ -334,7 +334,7 @@ New Functionality * New launcher: `parsl.launchers.WrappedLauncher` for launching tasks inside containers. -* `parsl.channels.SSHChannel` now supports a ``key_filename`` kwarg `issue#1639 `_ +* ``parsl.channels.SSHChannel`` now supports a ``key_filename`` kwarg `issue#1639 `_ * Newly added Makefile wraps several frequent developer operations such as: @@ -442,7 +442,7 @@ New Functionality module, parsl.data_provider.globus * `parsl.executors.WorkQueueExecutor`: a new executor that integrates functionality from `Work Queue `_ is now available. -* New provider to support for Ad-Hoc clusters `parsl.providers.AdHocProvider` +* New provider to support Ad-Hoc clusters: ``parsl.providers.AdHocProvider`` * New provider added to support LSF on Summit `parsl.providers.LSFProvider` * Support for CPU and Memory resource hints to providers `(github) `_. * The ``logging_level=logging.INFO`` in `parsl.monitoring.MonitoringHub` is replaced with ``monitoring_debug=False``: @@ -468,7 +468,7 @@ New Functionality * Several test-suite improvements that have dramatically reduced test duration. * Several improvements to the Monitoring interface. -* Configurable port on `parsl.channels.SSHChannel`. +* Configurable port on ``parsl.channels.SSHChannel``. * ``suppress_failure`` now defaults to True. * `parsl.executors.HighThroughputExecutor` is the recommended executor, and ``IPyParallelExecutor`` is deprecated. * `parsl.executors.HighThroughputExecutor` will expose worker information via environment variables: ``PARSL_WORKER_RANK`` and ``PARSL_WORKER_COUNT`` @@ -532,7 +532,7 @@ New Functionality * Cleaner user app file log management. * Updated configurations using `parsl.executors.HighThroughputExecutor` in the configuration section of the userguide. -* Support for OAuth based SSH with `parsl.channels.OAuthSSHChannel`.
Bug Fixes ^^^^^^^^^ diff --git a/docs/reference.rst b/docs/reference.rst index 1af850792c..d8e18bd244 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -38,15 +38,9 @@ Configuration Channels ======== -.. autosummary:: - :toctree: stubs - :nosignatures: - - parsl.channels.base.Channel - parsl.channels.LocalChannel - parsl.channels.SSHChannel - parsl.channels.OAuthSSHChannel - parsl.channels.SSHInteractiveLoginChannel +Channels are deprecated in Parsl. See +`issue 3515 `_ +for further discussion. Data management =============== @@ -109,7 +103,6 @@ Providers :toctree: stubs :nosignatures: - parsl.providers.AdHocProvider parsl.providers.AWSProvider parsl.providers.CobaltProvider parsl.providers.CondorProvider diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index dbcfcfc760..8867107b7a 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -49,15 +49,17 @@ during development. Using app caching will ensure that only modified apps are re App equivalence ^^^^^^^^^^^^^^^ -Parsl determines app equivalence by storing the hash -of the app function. Thus, any changes to the app code (e.g., -its signature, its body, or even the docstring within the body) -will invalidate cached values. +Parsl determines app equivalence using the name of the app function: +if two apps have the same name, then they are equivalent under this +relation. -However, Parsl does not traverse the call graph of the app function, -so changes inside functions called by an app will not invalidate +Changes inside the app, or inside functions called by an app, will not invalidate cached values. +There are many other ways functions might be compared for equivalence, +and `parsl.dataflow.memoization.id_for_memo` provides a hook to plug in +alternate application-specific implementations. + Invocation equivalence ^^^^^^^^^^^^^^^^^^^^^^ @@ -92,7 +94,7 @@ Attempting to cache apps invoked with other, non-hashable, data types will lead to an exception at invocation. In that case, mechanisms to hash new types can be registered by a program by -implementing the ``parsl.dataflow.memoization.id_for_memo`` function for +implementing the `parsl.dataflow.memoization.id_for_memo` function for the new type. Ignoring arguments diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst index 24ce0ca938..f3fe5cc407 100644 --- a/docs/userguide/configuring.rst +++ b/docs/userguide/configuring.rst @@ -15,7 +15,7 @@ queues, durations, and data management options. The following example shows a basic configuration object (:class:`~parsl.config.Config`) for the Frontera supercomputer at TACC. This config uses the `parsl.executors.HighThroughputExecutor` to submit -tasks from a login node (`parsl.channels.LocalChannel`). It requests an allocation of +tasks from a login node. It requests an allocation of 128 nodes, deploying 1 worker for each of the 56 cores per node, from the normal partition. To limit network connections to just the internal network the config specifies the address used by the infiniband interface with ``address_by_interface('ib0')`` @@ -23,7 +23,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` ..
code-block:: python from parsl.config import Config - from parsl.channels import LocalChannel from parsl.providers import SlurmProvider from parsl.executors import HighThroughputExecutor from parsl.launchers import SrunLauncher @@ -36,7 +35,6 @@ used by the infiniband interface with ``address_by_interface('ib0')`` address=address_by_interface('ib0'), max_workers_per_node=56, provider=SlurmProvider( - channel=LocalChannel(), nodes_per_block=128, init_blocks=1, partition='normal', @@ -197,22 +195,6 @@ Stepping through the following question should help formulate a suitable configu are on a **native Slurm** system like :ref:`configuring_nersc_cori` -4) Where will the main Parsl program run and how will it communicate with the apps? - -+------------------------+--------------------------+---------------------------------------------------+ -| Parsl program location | App execution target | Suitable channel | -+========================+==========================+===================================================+ -| Laptop/Workstation | Laptop/Workstation | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Cloud Resources | No channel is needed | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with no 2FA | `parsl.channels.SSHChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Laptop/Workstation | Clusters with 2FA | `parsl.channels.SSHInteractiveLoginChannel` | -+------------------------+--------------------------+---------------------------------------------------+ -| Login node | Cluster/Supercomputer | `parsl.channels.LocalChannel` | -+------------------------+--------------------------+---------------------------------------------------+ - Heterogeneous Resources ----------------------- @@ -324,9 +306,13 @@ and Work Queue does not require Python to run. Accelerators ------------ -Many modern clusters provide multiple accelerators per compute note, yet many applications are best suited to using a single accelerator per task. -Parsl supports pinning each worker to difference accelerators using ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. -Provide either the number of executors (Parsl will assume they are named in integers starting from zero) or a list of the names of the accelerators available on the node. +Many modern clusters provide multiple accelerators per compute node, yet many applications are best suited to using a +single accelerator per task. Parsl supports pinning each worker to different accelerators using +the ``available_accelerators`` option of the :class:`~parsl.executors.HighThroughputExecutor`. Provide either the number of +accelerators (Parsl will assume they are named as integers starting from zero) or a list of the names of the accelerators +available on the node. Parsl will limit the number of workers it launches to the number of accelerators specified; +in other words, you cannot have more workers per node than there are accelerators. By default, Parsl will launch +as many workers as there are accelerators specified via ``available_accelerators``. ..
code-block:: python @@ -337,7 +323,6 @@ Provide either the number of executors (Parsl will assume they are named in inte worker_debug=True, available_accelerators=2, provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -346,7 +331,38 @@ Provide either the number of executors (Parsl will assume they are named in inte strategy='none', ) -For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUS. This is intended to make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their GPUs that allows users to run multiple concurrent processes on a single GPU. The user needs to set in the ``worker_init`` commands to start MPS on every node in the block (this is machine dependent). The ``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3. +It is possible to bind multiple or specific accelerators to each worker by specifying a list of comma-separated strings, +each naming a set of accelerators. In the context of binding to NVIDIA GPUs, this works by setting ``CUDA_VISIBLE_DEVICES`` +on each worker to a specific string in the list supplied to ``available_accelerators``. + +Here's an example: + +.. code-block:: python + + # The following config is trimmed for clarity + local_config = Config( + executors=[ + HighThroughputExecutor( + # Starts 2 workers per node, each bound to 2 GPUs + available_accelerators=["0,1", "2,3"], + + # Start a single worker bound to all 4 GPUs + # available_accelerators=["0,1,2,3"] + ) + ], + ) + +GPU Oversubscription +"""""""""""""""""""" + +For hardware that uses Nvidia devices, Parsl allows for the oversubscription of workers to GPUs. This is intended to +make use of Nvidia's `Multi-Process Service (MPS) `_ available on many of their +GPUs, which allows users to run multiple concurrent processes on a single GPU. The user needs to include in +``worker_init`` the commands to start MPS on every node in the block (this is machine dependent). The +``available_accelerators`` option should then be set to the total number of GPU partitions run on a single node in the +block. For example, for a node with 4 Nvidia GPUs, to create 8 workers per GPU, set ``available_accelerators=32``. +GPUs will be assigned to workers in ascending order in contiguous blocks. In the example, workers 0-7 will be placed +on GPU 0, workers 8-15 on GPU 1, workers 16-23 on GPU 2, and workers 24-31 on GPU 3. Multi-Threaded Applications --------------------------- @@ -372,7 +388,6 @@ Select the best blocking strategy for processor's cache hierarchy (choose ``alte worker_debug=True, cpu_affinity='alternating', provider=LocalProvider( - channel=LocalChannel(), init_blocks=1, max_blocks=1, ), @@ -412,18 +427,12 @@ These include ``OMP_NUM_THREADS``, ``GOMP_COMP_AFFINITY``, and ``KMP_THREAD_AFFI Ad-Hoc Clusters --------------- -Any collection of compute nodes without a scheduler can be considered an -ad-hoc cluster. Often these machines have a shared file system such as NFS or Lustre. -In order to use these resources with Parsl, they need to set-up for password-less SSH access.
- -To use these ssh-accessible collection of nodes as an ad-hoc cluster, we use -the `parsl.providers.AdHocProvider` with an `parsl.channels.SSHChannel` to each node. An example -configuration follows. +Parsl's support for ad-hoc clusters of compute nodes without a scheduler +is deprecated. -.. literalinclude:: ../../parsl/configs/ad_hoc.py - -.. note:: - Multiple blocks should not be assigned to each node when using the `parsl.executors.HighThroughputExecutor` +See +`issue #3515 `_ +for further discussion. Amazon Web Services ------------------- diff --git a/docs/userguide/examples/config.py b/docs/userguide/examples/config.py index 166faaf4ac..68057d2b01 100644 --- a/docs/userguide/examples/config.py +++ b/docs/userguide/examples/config.py @@ -1,4 +1,3 @@ -from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor from parsl.providers import LocalProvider @@ -8,9 +7,7 @@ HighThroughputExecutor( label="htex_local", cores_per_worker=1, - provider=LocalProvider( - channel=LocalChannel(), - ), + provider=LocalProvider(), ) ], ) diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst index 4168367f9d..df17dc458f 100644 --- a/docs/userguide/execution.rst +++ b/docs/userguide/execution.rst @@ -47,8 +47,7 @@ Parsl currently supports the following providers: 7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services. 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud. 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster. -10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster. -11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. +10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler. diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst index a40c03e004..82123123b6 100644 --- a/docs/userguide/mpi_apps.rst +++ b/docs/userguide/mpi_apps.rst @@ -60,6 +60,13 @@ An example for ALCF's Polaris supercomputer that will run 3 MPI tasks of 2 nodes ) +.. warning:: + Please note that ``Provider`` options that specify per-task or per-node resources, for example, + ``SlurmProvider(cores_per_node=N, ...)``, should not be used with :class:`~parsl.executors.high_throughput.MPIExecutor`. + Parsl primarily uses a pilot job model and assumptions from that context do not translate to the MPI context. For + more information, refer to + `github issue #3006 `_ + Writing an MPI App ------------------ diff --git a/docs/userguide/plugins.rst b/docs/userguide/plugins.rst index 4ecff86cfe..c3c38dea63 100644 --- a/docs/userguide/plugins.rst +++ b/docs/userguide/plugins.rst @@ -16,8 +16,8 @@ executor to run code on the local submitting host, while another executor can run the same code on a large supercomputer. -Providers, Launchers and Channels ---------------------------------- +Providers and Launchers +----------------------- Some executors are based on blocks of workers (for example the `parsl.executors.HighThroughputExecutor`: the submit side requires a batch system (eg slurm, kubernetes) to start worker processes, which then @@ -34,10 +34,9 @@ add on any wrappers that are needed to launch the command (eg srun inside slurm).
Providers and launchers are usually paired together for a particular system type. -A `Channel` allows the commands used to interact with an `ExecutionProvider` to be -executed on a remote system. The default channel executes commands on the -local system, but a few variants of an `parsl.channels.SSHChannel` are provided. - +Parsl also has a deprecated ``Channel`` abstraction. See +`issue 3515 `_ +for further discussion. File staging ------------ diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py index 7cd7137faf..c71649851c 100644 --- a/parsl/channels/__init__.py +++ b/parsl/channels/__init__.py @@ -3,16 +3,10 @@ if TYPE_CHECKING: from parsl.channels.base import Channel from parsl.channels.local.local import LocalChannel - from parsl.channels.oauth_ssh.oauth_ssh import OAuthSSHChannel - from parsl.channels.ssh.ssh import SSHChannel - from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel lazys = { 'Channel': 'parsl.channels.base', - 'SSHChannel': 'parsl.channels.ssh.ssh', 'LocalChannel': 'parsl.channels.local.local', - 'SSHInteractiveLoginChannel': 'parsl.channels.ssh_il.ssh_il', - 'OAuthSSHChannel': 'parsl.channels.oauth_ssh.oauth_ssh', } import parsl.channels as px @@ -33,4 +27,4 @@ def lazy_loader(name): px.__getattr__ = lazy_loader # type: ignore[method-assign] -__all__ = ['Channel', 'SSHChannel', 'LocalChannel', 'SSHInteractiveLoginChannel', 'OAuthSSHChannel'] +__all__ = ['Channel', 'LocalChannel'] diff --git a/parsl/channels/oauth_ssh/oauth_ssh.py b/parsl/channels/oauth_ssh/oauth_ssh.py index c9efa27767..3173b163a8 100644 --- a/parsl/channels/oauth_ssh/oauth_ssh.py +++ b/parsl/channels/oauth_ssh/oauth_ssh.py @@ -3,7 +3,7 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.errors import OptionalModuleMissing try: @@ -17,7 +17,7 @@ logger = logging.getLogger(__name__) -class OAuthSSHChannel(SSHChannel): +class DeprecatedOAuthSSHChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel uses Globus based OAuth tokens for authentication. """ diff --git a/parsl/channels/ssh/ssh.py b/parsl/channels/ssh/ssh.py index bf33727e63..38b8afe47b 100644 --- a/parsl/channels/ssh/ssh.py +++ b/parsl/channels/ssh/ssh.py @@ -24,7 +24,7 @@ def _auth(self, username, *args): return -class SSHChannel(Channel, RepresentationMixin): +class DeprecatedSSHChannel(Channel, RepresentationMixin): ''' SSH persistent channel. This enables remote execution on sites accessible via ssh. It is assumed that the user has setup host keys so as to ssh to the remote host. Which goes to say that the following diff --git a/parsl/channels/ssh_il/ssh_il.py b/parsl/channels/ssh_il/ssh_il.py index 02e7a58cd4..3a5e0c5096 100644 --- a/parsl/channels/ssh_il/ssh_il.py +++ b/parsl/channels/ssh_il/ssh_il.py @@ -3,12 +3,12 @@ import paramiko -from parsl.channels.ssh.ssh import SSHChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel logger = logging.getLogger(__name__) -class SSHInteractiveLoginChannel(SSHChannel): +class DeprecatedSSHInteractiveLoginChannel(DeprecatedSSHChannel): """SSH persistent channel. This enables remote execution on sites accessible via ssh. This channel supports interactive login and is appropriate when keys are not set up. 
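As a concrete illustration of the provider/launcher pairing described in docs/userguide/plugins.rst above, here is a minimal sketch, assuming a Slurm system; the partition name and node count are illustrative placeholders, not values taken from this patch:

.. code-block:: python

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.launchers import SrunLauncher
    from parsl.providers import SlurmProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                provider=SlurmProvider(
                    partition='normal',       # batch system queue (placeholder)
                    nodes_per_block=2,        # nodes requested per block (placeholder)
                    launcher=SrunLauncher(),  # wraps the worker command in srun inside the job
                ),
            )
        ],
    )

The provider submits the batch job; the launcher determines how the worker command is started on the allocated nodes.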
diff --git a/parsl/configs/ad_hoc.py b/parsl/configs/ad_hoc.py deleted file mode 100644 index 05b0e8190d..0000000000 --- a/parsl/configs/ad_hoc.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.usage_tracking.levels import LEVEL_1 - -user_opts: Dict[str, Dict[str, Any]] -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } - - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', - usage_tracking=LEVEL_1, -) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 8c3e029718..de1c028fbf 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -114,14 +114,10 @@ def __init__(self, config: Config) -> None: self.monitoring: Optional[MonitoringHub] self.monitoring = config.monitoring - # hub address and port for interchange to connect - self.hub_address = None # type: Optional[str] - self.hub_zmq_port = None # type: Optional[int] if self.monitoring: if self.monitoring.logdir is None: self.monitoring.logdir = self.run_dir - self.hub_address = self.monitoring.hub_address - self.hub_zmq_port = self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir) + self.monitoring.start(self.run_dir, self.config.run_dir) self.time_began = datetime.datetime.now() self.time_completed: Optional[datetime.datetime] = None @@ -1253,10 +1249,10 @@ def add_executors(self, executors: Sequence[ParslExecutor]) -> None: for executor in executors: executor.run_id = self.run_id executor.run_dir = self.run_dir - executor.hub_address = self.hub_address - executor.hub_zmq_port = self.hub_zmq_port if self.monitoring: - executor.monitoring_radio = self.monitoring.radio + executor.hub_address = self.monitoring.hub_address + executor.hub_zmq_port = self.monitoring.hub_zmq_port + executor.submit_monitoring_radio = self.monitoring.radio if hasattr(executor, 'provider'): if hasattr(executor.provider, 'script_dir'): executor.provider.script_dir = os.path.join(self.run_dir, 'submit_scripts') @@ -1536,8 +1532,6 @@ def load_checkpoints(self, checkpointDirs: Optional[Sequence[str]]) -> Dict[str, Returns: - dict containing, hashed -> future mappings """ - self.memo_lookup_table = None - if checkpointDirs: return self._load_checkpoints(checkpointDirs) else: diff --git a/parsl/executors/base.py b/parsl/executors/base.py index b00aa55680..a112b9eb00 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadio +from parsl.monitoring.radios import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): @@ -52,13 +52,13 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadio] = None, 
+ submit_monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port - self.monitoring_radio = monitoring_radio + self.submit_monitoring_radio = submit_monitoring_radio self.run_dir = os.path.abspath(run_dir) self.run_id = run_id @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadio]: + def submit_monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ - return self._monitoring_radio + return self._submit_monitoring_radio - @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None: - self._monitoring_radio = value + @submit_monitoring_radio.setter + def submit_monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: + self._submit_monitoring_radio = value diff --git a/parsl/executors/flux/executor.py b/parsl/executors/flux/executor.py index c4926abb68..f1b981f7e0 100644 --- a/parsl/executors/flux/executor.py +++ b/parsl/executors/flux/executor.py @@ -200,7 +200,6 @@ def __init__( raise EnvironmentError("Cannot find Flux installation in PATH") self.flux_path = os.path.abspath(flux_path) self._task_id_counter = itertools.count() - self._socket = zmq.Context().socket(zmq.REP) # Assumes a launch command cannot be None or empty self.launch_cmd = launch_cmd or self.DEFAULT_LAUNCH_CMD self._submission_queue: queue.Queue = queue.Queue() @@ -213,7 +212,6 @@ def __init__( args=( self._submission_queue, self._stop_event, - self._socket, self.working_dir, self.flux_executor_kwargs, self.provider, @@ -306,11 +304,13 @@ def _submit_wrapper( If an exception is thrown, error out all submitted tasks. """ - try: - _submit_flux_jobs(submission_queue, stop_event, *args, **kwargs) - except Exception as exc: - _error_out_jobs(submission_queue, stop_event, exc) - raise + with zmq.Context() as ctx: + with ctx.socket(zmq.REP) as socket: + try: + _submit_flux_jobs(submission_queue, stop_event, socket, *args, **kwargs) + except Exception as exc: + _error_out_jobs(submission_queue, stop_event, exc) + raise def _error_out_jobs( diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index ffd21a13ec..772a76e6eb 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -20,6 +20,10 @@ from parsl.executors.errors import BadMessage, ScalingFailed from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput.errors import CommandClientTimeoutError +from parsl.executors.high_throughput.manager_selector import ( + ManagerSelector, + RandomManagerSelector, +) from parsl.executors.high_throughput.mpi_prefix_composer import ( VALID_LAUNCHERS, validate_resource_spec, @@ -56,7 +60,7 @@ "--mpi-launcher={mpi_launcher} " "--available-accelerators {accelerators}") -DEFAULT_INTERCHANGE_LAUNCH_CMD = "interchange.py" +DEFAULT_INTERCHANGE_LAUNCH_CMD = ["interchange.py"] GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider` Provider to access computation resources. Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`, @@ -78,9 +82,9 @@ cores_per_worker, nodes_per_block, heartbeat_period ,heartbeat_threshold, logdir). 
For example: launch_cmd="process_worker_pool.py {debug} -c {cores_per_worker} --task_url={task_url} --result_url={result_url}" - interchange_launch_cmd : str - Custom command line string to launch the interchange process from the executor. If undefined, - the executor will use the default "interchange.py" command. + interchange_launch_cmd : Sequence[str] + Custom sequence of command line tokens to launch the interchange process from the executor. If + undefined, the executor will use the default "interchange.py" command. address : string An address to connect to the main Parsl process which is reachable from the network in which @@ -238,7 +242,7 @@ def __init__(self, label: str = 'HighThroughputExecutor', provider: ExecutionProvider = LocalProvider(), launch_cmd: Optional[str] = None, - interchange_launch_cmd: Optional[str] = None, + interchange_launch_cmd: Optional[Sequence[str]] = None, address: Optional[str] = None, worker_ports: Optional[Tuple[int, int]] = None, worker_port_range: Optional[Tuple[int, int]] = (54000, 55000), @@ -261,6 +265,7 @@ def __init__(self, worker_logdir_root: Optional[str] = None, enable_mpi_mode: bool = False, mpi_launcher: str = "mpiexec", + manager_selector: ManagerSelector = RandomManagerSelector(), block_error_handler: Union[bool, Callable[[BlockProviderExecutor, Dict[str, JobStatus]], None]] = True, encrypted: bool = False): @@ -276,6 +281,7 @@ def __init__(self, self.prefetch_capacity = prefetch_capacity self.address = address self.address_probe_timeout = address_probe_timeout + self.manager_selector = manager_selector if self.address: self.all_addresses = address else: @@ -456,8 +462,6 @@ def _result_queue_worker(self): "task_id" : "exception" : serialized exception object, on failure } - - The `None` message is a die request. """ logger.debug("Result queue worker starting") @@ -475,58 +479,53 @@ def _result_queue_worker(self): else: - if msgs is None: - logger.debug("Got None, exiting") - return + for serialized_msg in msgs: + try: + msg = pickle.loads(serialized_msg) + except pickle.UnpicklingError: + raise BadMessage("Message received could not be unpickled") - else: - for serialized_msg in msgs: + if msg['type'] == 'heartbeat': + continue + elif msg['type'] == 'result': try: - msg = pickle.loads(serialized_msg) - except pickle.UnpicklingError: - raise BadMessage("Message received could not be unpickled") + tid = msg['task_id'] + except Exception: + raise BadMessage("Message received does not contain 'task_id' field") + + if tid == -1 and 'exception' in msg: + logger.warning("Executor shutting down due to exception from interchange") + exception = deserialize(msg['exception']) + self.set_bad_state_and_fail_all(exception) + break + + task_fut = self.tasks.pop(tid) + + if 'result' in msg: + result = deserialize(msg['result']) + task_fut.set_result(result) - if msg['type'] == 'heartbeat': - continue - elif msg['type'] == 'result': + elif 'exception' in msg: try: - tid = msg['task_id'] - except Exception: - raise BadMessage("Message received does not contain 'task_id' field") - - if tid == -1 and 'exception' in msg: - logger.warning("Executor shutting down due to exception from interchange") - exception = deserialize(msg['exception']) - self.set_bad_state_and_fail_all(exception) - break - - task_fut = self.tasks.pop(tid) - - if 'result' in msg: - result = deserialize(msg['result']) - task_fut.set_result(result) - - elif 'exception' in msg: - try: - s = deserialize(msg['exception']) - # s should be a RemoteExceptionWrapper... 
so we can reraise it - if isinstance(s, RemoteExceptionWrapper): - try: - s.reraise() - except Exception as e: - task_fut.set_exception(e) - elif isinstance(s, Exception): - task_fut.set_exception(s) - else: - raise ValueError("Unknown exception-like type received: {}".format(type(s))) - except Exception as e: - # TODO could be a proper wrapped exception? - task_fut.set_exception( - DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) - else: - raise BadMessage("Message received is neither result or exception") + s = deserialize(msg['exception']) + # s should be a RemoteExceptionWrapper... so we can reraise it + if isinstance(s, RemoteExceptionWrapper): + try: + s.reraise() + except Exception as e: + task_fut.set_exception(e) + elif isinstance(s, Exception): + task_fut.set_exception(s) + else: + raise ValueError("Unknown exception-like type received: {}".format(type(s))) + except Exception as e: + # TODO could be a proper wrapped exception? + task_fut.set_exception( + DeserializationError("Received exception, but handling also threw an exception: {}".format(e))) else: - raise BadMessage("Message received with unknown type {}".format(msg['type'])) + raise BadMessage("Message received is neither result or exception") + else: + raise BadMessage("Message received with unknown type {}".format(msg['type'])) logger.info("Result queue worker finished") @@ -551,11 +550,13 @@ def _start_local_interchange_process(self) -> None: "poll_period": self.poll_period, "logging_level": logging.DEBUG if self.worker_debug else logging.INFO, "cert_dir": self.cert_dir, + "manager_selector": self.manager_selector, + "run_id": self.run_id, } config_pickle = pickle.dumps(interchange_config) - self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd.encode("utf-8"), stdin=subprocess.PIPE) + self.interchange_proc = subprocess.Popen(self.interchange_launch_cmd, stdin=subprocess.PIPE) stdin = self.interchange_proc.stdin assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode" @@ -836,7 +837,7 @@ def shutdown(self, timeout: float = 10.0): try: self.interchange_proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - logger.info("Unable to terminate Interchange process; sending SIGKILL") + logger.warning("Unable to terminate Interchange process; sending SIGKILL") self.interchange_proc.kill() logger.info("Closing ZMQ pipes") diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index c6a3895d36..b1ddcbdba9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -6,7 +6,6 @@ import pickle import platform import queue -import random import signal import sys import threading @@ -19,7 +18,9 @@ from parsl.app.errors import RemoteExceptionWrapper from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch from parsl.executors.high_throughput.manager_record import ManagerRecord +from parsl.executors.high_throughput.manager_selector import ManagerSelector from parsl.monitoring.message_type import MessageType +from parsl.monitoring.radios import MonitoringRadioSender, ZMQRadioSender from parsl.process_loggers import wrap_with_logs from parsl.serialize import serialize as serialize_object from parsl.utils import setproctitle @@ -53,6 +54,8 @@ def __init__(self, logging_level: int, poll_period: int, cert_dir: Optional[str], + manager_selector: ManagerSelector, + run_id: str, ) -> None: """ 
Parameters @@ -123,6 +126,8 @@ def __init__(self, self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2])) logger.info("Connected to client") + self.run_id = run_id + self.hub_address = hub_address self.hub_zmq_port = hub_zmq_port @@ -160,6 +165,8 @@ def __init__(self, self.heartbeat_threshold = heartbeat_threshold + self.manager_selector = manager_selector + self.current_platform = {'parsl_v': PARSL_VERSION, 'python_v': "{}.{}.{}".format(sys.version_info.major, sys.version_info.minor, @@ -216,27 +223,16 @@ def task_puller(self) -> NoReturn: task_counter += 1 logger.debug(f"Fetched {task_counter} tasks so far") - def _create_monitoring_channel(self) -> Optional[zmq.Socket]: - if self.hub_address and self.hub_zmq_port: - logger.info("Connecting to MonitoringHub") - # This is a one-off because monitoring is unencrypted - hub_channel = zmq.Context().socket(zmq.DEALER) - hub_channel.set_hwm(0) - hub_channel.connect("tcp://{}:{}".format(self.hub_address, self.hub_zmq_port)) - logger.info("Connected to MonitoringHub") - return hub_channel - else: - return None - - def _send_monitoring_info(self, hub_channel: Optional[zmq.Socket], manager: ManagerRecord) -> None: - if hub_channel: + def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None: + if monitoring_radio: logger.info("Sending message {} to MonitoringHub".format(manager)) d: Dict = cast(Dict, manager.copy()) d['timestamp'] = datetime.datetime.now() d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat']) + d['run_id'] = self.run_id - hub_channel.send_pyobj((MessageType.NODE_INFO, d)) + monitoring_radio.send((MessageType.NODE_INFO, d)) @wrap_with_logs(target="interchange") def _command_server(self) -> NoReturn: @@ -244,8 +240,11 @@ def _command_server(self) -> NoReturn: """ logger.debug("Command Server Starting") - # Need to create a new ZMQ socket for command server thread - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + else: + monitoring_radio = None reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...) @@ -296,7 +295,7 @@ def _command_server(self) -> NoReturn: if manager_id in self._ready_managers: m = self._ready_managers[manager_id] m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) else: logger.warning("Worker to hold was not in ready managers list") @@ -331,9 +330,14 @@ def start(self) -> None: # parent-process-inheritance problems. 
signal.signal(signal.SIGTERM, signal.SIG_DFL) - logger.info("Incoming ports bound") + logger.info("Starting main interchange method") - hub_channel = self._create_monitoring_channel() + if self.hub_address is not None and self.hub_zmq_port is not None: + logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) + monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) + logger.debug("Created monitoring radio") + else: + monitoring_radio = None poll_period = self.poll_period @@ -364,10 +368,10 @@ def start(self) -> None: while not kill_event.is_set(): self.socks = dict(poller.poll(timeout=poll_period)) - self.process_task_outgoing_incoming(interesting_managers, hub_channel, kill_event) - self.process_results_incoming(interesting_managers, hub_channel) - self.expire_bad_managers(interesting_managers, hub_channel) - self.expire_drained_managers(interesting_managers, hub_channel) + self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event) + self.process_results_incoming(interesting_managers, monitoring_radio) + self.expire_bad_managers(interesting_managers, monitoring_radio) + self.expire_drained_managers(interesting_managers, monitoring_radio) self.process_tasks_to_send(interesting_managers) self.zmq_context.destroy() @@ -378,7 +382,7 @@ def start(self) -> None: def process_task_outgoing_incoming( self, interesting_managers: Set[bytes], - hub_channel: Optional[zmq.Socket], + monitoring_radio: Optional[MonitoringRadioSender], kill_event: threading.Event ) -> None: """Process one message from manager on the task_outgoing channel. @@ -411,6 +415,7 @@ def process_task_outgoing_incoming( self._ready_managers[manager_id] = {'last_heartbeat': time.time(), 'idle_since': time.time(), 'block_id': None, + 'start_time': msg['start_time'], 'max_capacity': 0, 'worker_count': 0, 'active': True, @@ -431,7 +436,7 @@ def process_task_outgoing_incoming( m.update(msg) # type: ignore[typeddict-item] logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): @@ -462,7 +467,7 @@ def process_task_outgoing_incoming( logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") - def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: for manager_id in list(interesting_managers): # is it always true that a draining manager will be in interesting managers? 
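The hunks around here thread a ``MonitoringRadioSender`` through the interchange in place of a raw ZMQ socket. Any object implementing the one abstract ``send`` method satisfies the interface; a minimal hypothetical in-memory sender (useful, for example, when exercising these code paths in tests — not part of this patch) could look like:

.. code-block:: python

    from parsl.monitoring.radios import MonitoringRadioSender

    class InMemoryRadioSender(MonitoringRadioSender):
        """Hypothetical sender that records messages instead of
        transmitting them, matching the abstract send() interface."""

        def __init__(self) -> None:
            self.messages: list = []

        def send(self, message: object) -> None:
            self.messages.append(message)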
@@ -475,7 +480,7 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], hub_channel: self._ready_managers.pop(manager_id) m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers @@ -485,8 +490,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: interesting=len(interesting_managers))) if interesting_managers and not self.pending_task_queue.empty(): - shuffled_managers = list(interesting_managers) - random.shuffle(shuffled_managers) + shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) while shuffled_managers and not self.pending_task_queue.empty(): # cf. the if statement above... manager_id = shuffled_managers.pop() @@ -519,7 +523,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") - def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: # Receive any results and forward to client if self.results_incoming in self.socks and self.socks[self.results_incoming] == zmq.POLLIN: logger.debug("entering results_incoming section") @@ -539,11 +543,11 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel elif r['type'] == 'monitoring': # the monitoring code makes the assumption that no # monitoring messages will be received if monitoring - # is not configured, and that hub_channel will only + # is not configured, and that monitoring_radio will only # be None when monitoring is not configurated. 
- assert hub_channel is not None + assert monitoring_radio is not None - hub_channel.send_pyobj(r['payload']) + monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") b_messages.append((p_message, r)) @@ -587,7 +591,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], hub_channel interesting_managers.add(manager_id) logger.debug("leaving results_incoming section") - def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Optional[zmq.Socket]) -> None: + def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None: bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if time.time() - m['last_heartbeat'] > self.heartbeat_threshold] for (manager_id, m) in bad_managers: @@ -595,7 +599,7 @@ def expire_bad_managers(self, interesting_managers: Set[bytes], hub_channel: Opt logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager") if m['active']: m['active'] = False - self._send_monitoring_info(hub_channel, m) + self._send_monitoring_info(monitoring_radio, m) logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager") for tid in m['tasks']: diff --git a/parsl/executors/high_throughput/manager_record.py b/parsl/executors/high_throughput/manager_record.py index 7e58b53954..a48c18cbd9 100644 --- a/parsl/executors/high_throughput/manager_record.py +++ b/parsl/executors/high_throughput/manager_record.py @@ -6,6 +6,7 @@ class ManagerRecord(TypedDict, total=False): block_id: Optional[str] + start_time: float tasks: List[Any] worker_count: int max_capacity: int diff --git a/parsl/executors/high_throughput/manager_selector.py b/parsl/executors/high_throughput/manager_selector.py new file mode 100644 index 0000000000..0ede28ee7d --- /dev/null +++ b/parsl/executors/high_throughput/manager_selector.py @@ -0,0 +1,25 @@ +import random +from abc import ABCMeta, abstractmethod +from typing import Dict, List, Set + +from parsl.executors.high_throughput.manager_record import ManagerRecord + + +class ManagerSelector(metaclass=ABCMeta): + + @abstractmethod + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + """ Sort a given list of managers. + + Any operations pertaining to the sorting and rearrangement of the + interesting_managers Set should be performed here. 
+ """ + pass + + +class RandomManagerSelector(ManagerSelector): + + def sort_managers(self, ready_managers: Dict[bytes, ManagerRecord], manager_list: Set[bytes]) -> List[bytes]: + c_manager_list = list(manager_list) + random.shuffle(c_manager_list) + return c_manager_list diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 5c766123d7..59efe501f1 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -184,6 +184,7 @@ def __init__(self, *, self.uid = uid self.block_id = block_id + self.start_time = time.time() self.enable_mpi_mode = enable_mpi_mode self.mpi_launcher = mpi_launcher @@ -263,6 +264,7 @@ def create_reg_message(self): 'worker_count': self.worker_count, 'uid': self.uid, 'block_id': self.block_id, + 'start_time': self.start_time, 'prefetch_capacity': self.prefetch_capacity, 'max_capacity': self.worker_count + self.prefetch_capacity, 'os': platform.system(), diff --git a/parsl/executors/status_handling.py b/parsl/executors/status_handling.py index 7956992f2e..34db2300f6 100644 --- a/parsl/executors/status_handling.py +++ b/parsl/executors/status_handling.py @@ -12,7 +12,7 @@ from parsl.executors.base import ParslExecutor from parsl.executors.errors import BadStateException, ScalingFailed from parsl.jobs.error_handlers import noop_error_handler, simple_error_handler -from parsl.jobs.states import JobState, JobStatus +from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus from parsl.monitoring.message_type import MessageType from parsl.providers.base import ExecutionProvider from parsl.utils import AtomicIDCounter @@ -59,20 +59,28 @@ def __init__(self, *, else: self.block_error_handler = block_error_handler - # errors can happen during the submit call to the provider; this is used - # to keep track of such errors so that they can be handled in one place - # together with errors reported by status() - self._simulated_status: Dict[str, JobStatus] = {} self._executor_bad_state = threading.Event() self._executor_exception: Optional[Exception] = None self._block_id_counter = AtomicIDCounter() self._tasks = {} # type: Dict[object, Future] + + self._last_poll_time = 0.0 + + # these four structures track, in loosely coordinated fashion, the + # existence of blocks and jobs and how to map between their + # identifiers. 
self.blocks_to_job_id = {} # type: Dict[str, str] self.job_ids_to_block = {} # type: Dict[str, str] - self._last_poll_time = 0.0 + # errors can happen during the submit call to the provider; this is used + # to keep track of such errors so that they can be handled in one place + # together with errors reported by status() + self._simulated_status: Dict[str, JobStatus] = {} + + # this stores an approximation (sometimes delayed) of the latest status + # of pending, active and recently terminated blocks self._status = {} # type: Dict[str, JobStatus] def _make_status_dict(self, block_ids: List[str], status_list: List[JobStatus]) -> Dict[str, JobStatus]: @@ -159,41 +167,82 @@ def tasks(self) -> Dict[object, Future]: def provider(self): return self._provider - def _filter_scale_in_ids(self, to_kill, killed): + def _filter_scale_in_ids(self, to_kill: Sequence[Any], killed: Sequence[bool]) -> Sequence[Any]: """ Filter out job id's that were not killed """ assert len(to_kill) == len(killed) + + if False in killed: + killed_job_ids = [jid for jid, k in zip(to_kill, killed) if k] + not_killed_job_ids = [jid for jid, k in zip(to_kill, killed) if not k] + logger.warning("Some jobs were not killed successfully: " + f"killed jobs: {killed_job_ids}, " + f"not-killed jobs: {not_killed_job_ids}") + # Filters first iterable by bool values in second return list(compress(to_kill, killed)) - def scale_out(self, blocks: int = 1) -> List[str]: + def scale_out_facade(self, n: int) -> List[str]: """Scales out the number of blocks by "blocks" """ if not self.provider: raise ScalingFailed(self, "No execution provider available") block_ids = [] - logger.info(f"Scaling out by {blocks} blocks") - for _ in range(blocks): + monitoring_status_changes = {} + logger.info(f"Scaling out by {n} blocks") + for _ in range(n): block_id = str(self._block_id_counter.get_id()) logger.info(f"Allocated block ID {block_id}") try: job_id = self._launch_block(block_id) + + pending_status = JobStatus(JobState.PENDING) + self.blocks_to_job_id[block_id] = job_id self.job_ids_to_block[job_id] = block_id + self._status[block_id] = pending_status + + monitoring_status_changes[block_id] = pending_status block_ids.append(block_id) + except Exception as ex: - self._simulated_status[block_id] = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + failed_status = JobStatus(JobState.FAILED, "Failed to start block {}: {}".format(block_id, ex)) + self._simulated_status[block_id] = failed_status + self._status[block_id] = failed_status + + self.send_monitoring_info(monitoring_status_changes) return block_ids - @abstractmethod def scale_in(self, blocks: int) -> List[str]: """Scale in method. Cause the executor to reduce the number of blocks by count. + The default implementation will kill blocks without regard to their + status or whether they are executing tasks. Executors with more + nuanced scaling strategies might override this method to work with + that strategy - see the HighThroughputExecutor for an example of that. + :return: A list of block ids corresponding to the blocks that were removed.
""" - pass + + active_blocks = [block_id for block_id, status in self._status.items() + if status.state not in TERMINAL_STATES] + + block_ids_to_kill = active_blocks[:blocks] + + job_ids_to_kill = [self.blocks_to_job_id[block] for block in block_ids_to_kill] + + # Cancel the blocks provisioned + if self.provider: + logger.info(f"Scaling in jobs: {job_ids_to_kill}") + r = self.provider.cancel(job_ids_to_kill) + job_ids = self._filter_scale_in_ids(job_ids_to_kill, r) + block_ids_killed = [self.job_ids_to_block[job_id] for job_id in job_ids] + return block_ids_killed + else: + logger.error("No execution provider available to scale in") + return [] def _launch_block(self, block_id: str) -> Any: launch_cmd = self._get_launch_command(block_id) @@ -227,10 +276,10 @@ def workers_per_node(self) -> Union[int, float]: def send_monitoring_info(self, status: Dict) -> None: # Send monitoring info for HTEX when monitoring enabled - if self.monitoring_radio: + if self.submit_monitoring_radio: msg = self.create_monitoring_info(status) - logger.debug("Sending message {} to hub from job status poller".format(msg)) - self.monitoring_radio.send((MessageType.BLOCK_INFO, msg)) + logger.debug("Sending block monitoring message: %r", msg) + self.submit_monitoring_radio.send((MessageType.BLOCK_INFO, msg)) def create_monitoring_info(self, status: Dict[str, JobStatus]) -> Sequence[object]: """Create a monitoring message for each block based on the poll status. @@ -302,13 +351,3 @@ def scale_in_facade(self, n: int, max_idletime: Optional[float] = None) -> List[ del self._status[block_id] self.send_monitoring_info(new_status) return block_ids - - def scale_out_facade(self, n: int) -> List[str]: - block_ids = self.scale_out(n) - if block_ids is not None: - new_status = {} - for block_id in block_ids: - new_status[block_id] = JobStatus(JobState.PENDING) - self.send_monitoring_info(new_status) - self._status.update(new_status) - return block_ids diff --git a/parsl/executors/taskvine/executor.py b/parsl/executors/taskvine/executor.py index 6cfedf92bb..2e1efb211f 100644 --- a/parsl/executors/taskvine/executor.py +++ b/parsl/executors/taskvine/executor.py @@ -573,24 +573,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return 1 - def scale_in(self, count: int) -> List[str]: - """Scale in method. Cancel a given number of blocks - """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the TaskVine system submission. 
@@ -607,11 +589,13 @@ def shutdown(self, *args, **kwargs): # Join all processes before exiting logger.debug("Joining on submit process") self._submit_process.join() + self._submit_process.close() logger.debug("Joining on collector thread") self._collector_thread.join() if self.worker_launch_method == 'factory': logger.debug("Joining on factory process") self._factory_process.join() + self._factory_process.close() # Shutdown multiprocessing queues self._ready_task_queue.close() diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index 29f13494ae..198dab0236 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -731,24 +731,6 @@ def outstanding(self) -> int: def workers_per_node(self) -> Union[int, float]: return self.scaling_cores_per_worker - def scale_in(self, count: int) -> List[str]: - """Scale in method. - """ - # Obtain list of blocks to kill - to_kill = list(self.blocks_to_job_id.keys())[:count] - kill_ids = [self.blocks_to_job_id[block] for block in to_kill] - - # Cancel the blocks provisioned - if self.provider: - logger.info(f"Scaling in jobs: {kill_ids}") - r = self.provider.cancel(kill_ids) - job_ids = self._filter_scale_in_ids(kill_ids, r) - block_ids_killed = [self.job_ids_to_block[jid] for jid in job_ids] - return block_ids_killed - else: - logger.error("No execution provider available to scale in") - return [] - def shutdown(self, *args, **kwargs): """Shutdown the executor. Sets flag to cancel the submit process and collector thread, which shuts down the Work Queue system submission. @@ -764,6 +746,8 @@ def shutdown(self, *args, **kwargs): logger.debug("Joining on submit process") self.submit_process.join() + self.submit_process.close() + logger.debug("Joining on collector thread") self.collector_thread.join() diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 90536253b6..ab8b88296f 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -283,7 +283,7 @@ def __init__(self, ): self.workflow_end = False - self.workflow_start_message = None # type: Optional[MonitoringMessage] + self.workflow_start_message: Optional[MonitoringMessage] = None self.logdir = logdir os.makedirs(self.logdir, exist_ok=True) @@ -299,10 +299,10 @@ def __init__(self, self.batching_interval = batching_interval self.batching_threshold = batching_threshold - self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage] - self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] - self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage] + self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue() + self.pending_node_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_block_queue: queue.Queue[MonitoringMessage] = queue.Queue() + self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, priority_queue: "queue.Queue[TaggedMonitoringMessage]", @@ -351,18 +351,18 @@ def start(self, If that happens, the message will be added to deferred_resource_messages and processed later. 
""" - inserted_tasks = set() # type: Set[object] + inserted_tasks: Set[object] = set() """ like inserted_tasks but for task,try tuples """ - inserted_tries = set() # type: Set[Any] + inserted_tries: Set[Any] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set). # The code prior to this patch will discard previous message in # the case of multiple messages to defer. - deferred_resource_messages = {} # type: MonitoringMessage + deferred_resource_messages: MonitoringMessage = {} exception_happened = False @@ -505,7 +505,7 @@ def start(self, "Got {} messages from block queue".format(len(block_info_messages))) # block_info_messages is possibly a nested list of dict (at different polling times) # Each dict refers to the info of a job/block at one polling time - block_messages_to_insert = [] # type: List[Any] + block_messages_to_insert: List[Any] = [] for block_msg in block_info_messages: block_messages_to_insert.extend(block_msg) self._insert(table=BLOCK, messages=block_messages_to_insert) @@ -687,7 +687,7 @@ def _insert(self, table: str, messages: List[MonitoringMessage]) -> None: logger.exception("Rollback failed") def _get_messages_in_batch(self, msg_queue: "queue.Queue[X]") -> List[X]: - messages = [] # type: List[X] + messages: List[X] = [] start = time.time() while True: if time.time() - start >= self.batching_interval or len(messages) >= self.batching_threshold: diff --git a/parsl/monitoring/errors.py b/parsl/monitoring/errors.py new file mode 100644 index 0000000000..f41225ff44 --- /dev/null +++ b/parsl/monitoring/errors.py @@ -0,0 +1,6 @@ +from parsl.errors import ParslError + + +class MonitoringHubStartError(ParslError): + def __str__(self) -> str: + return "Hub failed to start" diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 852e2bb8bc..69e4fb5966 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -12,8 +12,9 @@ import typeguard from parsl.log_utils import set_file_logger +from parsl.monitoring.errors import MonitoringHubStartError from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MultiprocessingQueueRadio +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import AddressedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue @@ -105,7 +106,7 @@ def __init__(self, self.resource_monitoring_enabled = resource_monitoring_enabled self.resource_monitoring_interval = resource_monitoring_interval - def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> int: + def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None: logger.debug("Starting MonitoringHub") @@ -160,7 +161,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat "zmq_port_range": self.hub_port_range, "logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, - "run_id": run_id }, name="Monitoring-Router-Process", daemon=True, @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadio(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.block_msgs) try: comm_q_result = 
comm_q.get(block=True, timeout=120) @@ -195,7 +195,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat comm_q.join_thread() except queue.Empty: logger.error("Hub has not completed initialization in 120s. Aborting") - raise Exception("Hub failed to start") + raise MonitoringHubStartError() if isinstance(comm_q_result, str): logger.error(f"MonitoringRouter sent an error message: {comm_q_result}") @@ -207,7 +207,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat logger.info("Monitoring Hub initialized") - return zmq_port + self.hub_zmq_port = zmq_port # TODO: tighten the Any message format def send(self, mtype: MessageType, message: Any) -> None: diff --git a/parsl/monitoring/node_reporter.py b/parsl/monitoring/node_reporter.py index a867fa31de..4a09416266 100755 --- a/parsl/monitoring/node_reporter.py +++ b/parsl/monitoring/node_reporter.py @@ -21,7 +21,7 @@ import psutil from parsl.log_utils import set_stream_logger -from parsl.monitoring.radios import FilesystemRadio +from parsl.monitoring.radios import FilesystemRadioSender logger = logging.getLogger("parsl.monitoring.node_reporter") @@ -83,8 +83,8 @@ def send_msg(*, active, uid, radio): run_dir = "/home/benc/parsl/src/parsl/runinfo/000/" # TODO at least get the real version of this value, no matter how badly - radio = FilesystemRadio(monitoring_url="", # TODO: monitoring_hub_url and source_id real values? - source_id=0, run_dir=run_dir) + radio = FilesystemRadioSender(monitoring_url="", # TODO: monitoring_hub_url and source_id real values? + source_id=0, run_dir=run_dir) uid = str(uuid.uuid4()) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index e781aba025..df5b8c94be 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -7,6 +7,8 @@ from multiprocessing.queues import Queue from typing import Optional +import zmq + from parsl.serialize import serialize _db_manager_excepts: Optional[Exception] @@ -32,14 +34,14 @@ result_radio_queue = [] -class MonitoringRadio(metaclass=ABCMeta): +class MonitoringRadioSender(metaclass=ABCMeta): @abstractmethod def send(self, message: object) -> None: pass -class FilesystemRadio(MonitoringRadio): - """A MonitoringRadio that sends messages over a shared filesystem. +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. The message directory structure is based on maildir, https://en.wikipedia.org/wiki/Maildir @@ -53,7 +55,7 @@ class FilesystemRadio(MonitoringRadio): This avoids a race condition of reading partially written messages. This radio is likely to give higher shared filesystem load compared to - the UDPRadio, but should be much more reliable. + the UDP radio, but should be much more reliable.
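The `FilesystemRadioSender` docstring above describes maildir-style delivery: write the message under a `tmp/` directory, then rename it into `new/`. A rough illustration of why that avoids readers seeing partial messages; the directory layout and pickle framing here are assumptions for the sketch, not Parsl's exact on-disk format, and both subdirectories are assumed to already exist:

```python
import os
import pickle
import uuid


def send_message(base_dir: str, message: object) -> None:
    name = str(uuid.uuid4())
    tmp_path = os.path.join(base_dir, "tmp", name)
    new_path = os.path.join(base_dir, "new", name)
    with open(tmp_path, "wb") as f:
        f.write(pickle.dumps(message))
    # rename() is atomic within a single filesystem, so a receiver
    # scanning new/ only ever observes fully written files
    os.rename(tmp_path, new_path)
```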
""" def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): @@ -83,7 +85,7 @@ def send(self, message: object) -> None: os.rename(tmp_filename, new_filename) -class HTEXRadio(MonitoringRadio): +class HTEXRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -137,7 +139,7 @@ def send(self, message: object) -> None: return -class ResultsRadio(MonitoringRadio): +class ResultsRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): pass @@ -147,7 +149,7 @@ def send(self, message: object) -> None: # raise RuntimeError(f"BENC: appended {message} to {result_radio_queue}") -class UDPRadio(MonitoringRadio): +class UDPRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -201,7 +203,7 @@ def send(self, message: object) -> None: return -class MultiprocessingQueueRadio(MonitoringRadio): +class MultiprocessingQueueRadioSender(MonitoringRadioSender): """A monitoring radio which connects over a multiprocessing Queue. This radio is intended to be used on the submit side, where components in the submit process, or processes launched by multiprocessing, will have @@ -213,3 +215,17 @@ def __init__(self, queue: Queue) -> None: def send(self, message: object) -> None: self.queue.put((message, 0)) + + +class ZMQRadioSender(MonitoringRadioSender): + """A monitoring radio which connects over ZMQ. This radio is not + thread-safe, because its use of ZMQ is not thread-safe. + """ + + def __init__(self, hub_address: str, hub_zmq_port: int) -> None: + self._hub_channel = zmq.Context().socket(zmq.DEALER) + self._hub_channel.set_hwm(0) + self._hub_channel.connect(f"tcp://{hub_address}:{hub_zmq_port}") + + def send(self, message: object) -> None: + self._hub_channel.send_pyobj(message) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index c72302a8c3..18ff3568cb 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -9,11 +9,11 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import ( - FilesystemRadio, - HTEXRadio, - MonitoringRadio, - ResultsRadio, - UDPRadio, + FilesystemRadioSender, + HTEXRadioSender, + MonitoringRadioSender, + ResultsRadioSender, + UDPRadioSender, ) from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs @@ -150,20 +150,20 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio: - radio: MonitoringRadio +def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: + radio: MonitoringRadioSender if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) + radio = UDPRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) + radio = HTEXRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) + radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, + source_id=task_id, run_dir=run_dir) elif radio_mode == "results": - radio = ResultsRadio(monitoring_url=monitoring_hub_url, - source_id=task_id) + radio = 
ResultsRadioSender(monitoring_url=monitoring_hub_url, + source_id=task_id) else: raise RuntimeError(f"Unknown radio mode: {radio_mode}") return radio @@ -253,10 +253,10 @@ def monitor(pid: int, pm = psutil.Process(pid) - children_user_time = {} # type: Dict[int, float] - children_system_time = {} # type: Dict[int, float] - children_num_ctx_switches_voluntary = {} # type: Dict[int, float] - children_num_ctx_switches_involuntary = {} # type: Dict[int, float] + children_user_time: Dict[int, float] = {} + children_system_time: Dict[int, float] = {} + children_num_ctx_switches_voluntary: Dict[int, float] = {} + children_num_ctx_switches_involuntary: Dict[int, float] = {} def accumulate_and_prepare() -> Dict[str, Any]: d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple} diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 70b4862295..4be454b797 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -5,6 +5,7 @@ import pickle import queue import socket +import threading import time from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -30,9 +31,13 @@ def __init__(self, monitoring_hub_address: str = "127.0.0.1", logdir: str = ".", - run_id: str, logging_level: int = logging.INFO, - atexit_timeout: int = 3 # in seconds + atexit_timeout: int = 3, # in seconds + priority_msgs: "queue.Queue[AddressedMonitoringMessage]", + node_msgs: "queue.Queue[AddressedMonitoringMessage]", + block_msgs: "queue.Queue[AddressedMonitoringMessage]", + resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -51,7 +56,11 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. + *_msgs : Queue + Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + exit_event : Event + An event that the main Parsl process will set to signal that the monitoring router should shut down. 
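The `ZMQRadioSender` added to radios.py above is a thin wrapper over a pyzmq DEALER socket. A self-contained sketch of the same send path, where the address, port, and message payload are placeholders:

```python
import zmq

context = zmq.Context()
channel = context.socket(zmq.DEALER)
channel.set_hwm(0)  # no high-water mark: buffer rather than drop
channel.connect("tcp://127.0.0.1:55055")  # placeholder hub address/port
channel.send_pyobj(("hello", 0))  # pickles the object onto the wire
channel.close(linger=0)  # don't block process exit on undelivered messages
context.term()
```

As the class docstring notes, none of this is thread-safe, so each sending thread would need its own socket.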
""" os.makedirs(logdir, exist_ok=True) self.logger = set_file_logger("{}/monitoring_router.log".format(logdir), @@ -61,7 +70,6 @@ def __init__(self, self.hub_address = hub_address self.atexit_timeout = atexit_timeout - self.run_id = run_id self.loop_freq = 10.0 # milliseconds @@ -93,22 +101,60 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - def start(self, - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", - exit_event: Event) -> None: + self.priority_msgs = priority_msgs + self.node_msgs = node_msgs + self.block_msgs = block_msgs + self.resource_msgs = resource_msgs + self.exit_event = exit_event + + @wrap_with_logs(target="monitoring_router") + def start(self) -> None: + self.logger.info("Starting UDP listener thread") + udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener, daemon=True) + udp_radio_receiver_thread.start() + + self.logger.info("Starting ZMQ listener thread") + zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener, daemon=True) + zmq_radio_receiver_thread.start() + + self.logger.info("Joining on ZMQ listener thread") + zmq_radio_receiver_thread.join() + self.logger.info("Joining on UDP listener thread") + udp_radio_receiver_thread.join() + self.logger.info("Joined on both ZMQ and UDP listener threads") + + @wrap_with_logs(target="monitoring_router") + def start_udp_listener(self) -> None: try: - while not exit_event.is_set(): + while not self.exit_event.is_set(): try: data, addr = self.udp_sock.recvfrom(2048) resource_msg = pickle.loads(data) self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg)) - resource_msgs.put((resource_msg, addr)) + self.resource_msgs.put((resource_msg, addr)) except socket.timeout: pass + self.logger.info("UDP listener draining") + last_msg_received_time = time.time() + while time.time() - last_msg_received_time < self.atexit_timeout: + try: + data, addr = self.udp_sock.recvfrom(2048) + msg = pickle.loads(data) + self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) + self.resource_msgs.put((msg, addr)) + last_msg_received_time = time.time() + except socket.timeout: + pass + + self.logger.info("UDP listener finishing normally") + finally: + self.logger.info("UDP listener finished") + + @wrap_with_logs(target="monitoring_router") + def start_zmq_listener(self) -> None: + try: + while not self.exit_event.is_set(): try: dfk_loop_start = time.time() while time.time() - dfk_loop_start < 1.0: # TODO make configurable @@ -124,16 +170,15 @@ def start(self, msg_0 = (msg, 0) if msg[0] == MessageType.NODE_INFO: - msg[1]['run_id'] = self.run_id - node_msgs.put(msg_0) + self.node_msgs.put(msg_0) elif msg[0] == MessageType.RESOURCE_INFO: - resource_msgs.put(msg_0) + self.resource_msgs.put(msg_0) elif msg[0] == MessageType.BLOCK_INFO: - block_msgs.put(msg_0) + self.block_msgs.put(msg_0) elif msg[0] == MessageType.TASK_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) elif msg[0] == MessageType.WORKFLOW_INFO: - priority_msgs.put(msg_0) + self.priority_msgs.put(msg_0) else: # There is a type: ignore here because if msg[0] # is of the correct type, this code is unreachable, @@ -151,21 +196,9 @@ def start(self, # thing to do. 
self.logger.warning("Failure processing a ZMQ message", exc_info=True) - self.logger.info("Monitoring router draining") - last_msg_received_time = time.time() - while time.time() - last_msg_received_time < self.atexit_timeout: - try: - data, addr = self.udp_sock.recvfrom(2048) - msg = pickle.loads(data) - self.logger.debug("Got UDP Message from {}: {}".format(addr, msg)) - resource_msgs.put((msg, addr)) - last_msg_received_time = time.time() - except socket.timeout: - pass - - self.logger.info("Monitoring router finishing normally") + self.logger.info("ZMQ listener finishing normally") finally: - self.logger.info("Monitoring router finished") + self.logger.info("ZMQ listener finished") @wrap_with_logs @@ -182,8 +215,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range: Tuple[int, int], logdir: str, - logging_level: int, - run_id: str) -> None: + logging_level: int) -> None: setproctitle("parsl: monitoring router") try: router = MonitoringRouter(hub_address=hub_address, @@ -191,7 +223,11 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - run_id=run_id) + priority_msgs=priority_msgs, + node_msgs=node_msgs, + block_msgs=block_msgs, + resource_msgs=resource_msgs, + exit_event=exit_event) except Exception as e: logger.error("MonitoringRouter construction failed.", exc_info=True) comm_q.put(f"Monitoring router construction failed: {e}") @@ -200,7 +236,7 @@ def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", router.logger.info("Starting MonitoringRouter in router_starter") try: - router.start(priority_msgs, node_msgs, block_msgs, resource_msgs, exit_event) + router.start() except Exception as e: router.logger.exception("router.start exception") exception_q.put(('Hub', str(e))) diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py index 1f744215e5..c2ac29528c 100644 --- a/parsl/providers/__init__.py +++ b/parsl/providers/__init__.py @@ -1,12 +1,6 @@ - - from typing import TYPE_CHECKING if TYPE_CHECKING: - # Workstation Provider - - from parsl.providers.ad_hoc.ad_hoc import AdHocProvider - # Cloud Providers from parsl.providers.aws.aws import AWSProvider from parsl.providers.azure.azure import AzureProvider @@ -37,7 +31,6 @@ 'TorqueProvider': 'parsl.providers.torque.torque', 'PBSProProvider': 'parsl.providers.pbspro.pbspro', 'LSFProvider': 'parsl.providers.lsf.lsf', - 'AdHocProvider': 'parsl.providers.ad_hoc.ad_hoc', # Cloud Providers 'AWSProvider': 'parsl.providers.aws.aws', @@ -72,7 +65,6 @@ def lazy_loader(name): 'SlurmProvider', 'TorqueProvider', 'LSFProvider', - 'AdHocProvider', 'PBSProProvider', 'AWSProvider', 'GoogleCloudProvider', diff --git a/parsl/providers/ad_hoc/ad_hoc.py b/parsl/providers/ad_hoc/ad_hoc.py index 207dd55738..9059648101 100644 --- a/parsl/providers/ad_hoc/ad_hoc.py +++ b/parsl/providers/ad_hoc/ad_hoc.py @@ -12,8 +12,12 @@ logger = logging.getLogger(__name__) -class AdHocProvider(ExecutionProvider, RepresentationMixin): - """ Ad-hoc execution provider +class DeprecatedAdHocProvider(ExecutionProvider, RepresentationMixin): + """ Deprecated ad-hoc execution provider + + The (former) AdHocProvider is deprecated. See + `issue #3515 `_ + for further discussion. This provider is used to provision execution resources over one or more ad hoc nodes that are each accessible over a Channel (say, ssh) but otherwise lack a cluster scheduler. 
diff --git a/parsl/tests/configs/ad_hoc_cluster_htex.py b/parsl/tests/configs/ad_hoc_cluster_htex.py deleted file mode 100644 index 0949b82392..0000000000 --- a/parsl/tests/configs/ad_hoc_cluster_htex.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import Any, Dict - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -user_opts = {'adhoc': - {'username': 'YOUR_USERNAME', - 'script_dir': 'YOUR_SCRIPT_DIR', - 'remote_hostnames': ['REMOTE_HOST_URL_1', 'REMOTE_HOST_URL_2'] - } - } # type: Dict[str, Dict[str, Any]] - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - max_workers_per_node=2, - worker_logdir_root=user_opts['adhoc']['script_dir'], - encrypted=True, - provider=AdHocProvider( - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. - worker_init='', - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], - # AdHoc Clusters should not be setup with scaling strategy. - strategy='none', -) diff --git a/parsl/tests/configs/htex_ad_hoc_cluster.py b/parsl/tests/configs/htex_ad_hoc_cluster.py deleted file mode 100644 index db24b42ab2..0000000000 --- a/parsl/tests/configs/htex_ad_hoc_cluster.py +++ /dev/null @@ -1,26 +0,0 @@ -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider -from parsl.tests.configs.user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='remote_htex', - cores_per_worker=1, - worker_debug=False, - address=user_opts['public_ip'], - encrypted=True, - provider=AdHocProvider( - move_files=False, - parallelism=1, - worker_init=user_opts['adhoc']['worker_init'], - channels=[SSHChannel(hostname=m, - username=user_opts['adhoc']['username'], - script_dir=user_opts['adhoc']['script_dir'], - ) for m in user_opts['adhoc']['remote_hostnames']] - ) - ) - ], -) diff --git a/parsl/tests/configs/local_adhoc.py b/parsl/tests/configs/local_adhoc.py index 25b1f38d61..9b1f951842 100644 --- a/parsl/tests/configs/local_adhoc.py +++ b/parsl/tests/configs/local_adhoc.py @@ -1,7 +1,7 @@ from parsl.channels import LocalChannel from parsl.config import Config from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider +from parsl.providers.ad_hoc.ad_hoc import DeprecatedAdHocProvider def fresh_config(): @@ -10,7 +10,7 @@ def fresh_config(): HighThroughputExecutor( label='AdHoc', encrypted=True, - provider=AdHocProvider( + provider=DeprecatedAdHocProvider( channels=[LocalChannel(), LocalChannel()] ) ) diff --git a/parsl/tests/configs/swan_htex.py b/parsl/tests/configs/swan_htex.py deleted file mode 100644 index 3b1b6785ab..0000000000 --- a/parsl/tests/configs/swan_htex.py +++ /dev/null @@ -1,43 +0,0 @@ -""" -================== Block -| ++++++++++++++ | Node -| | | | -| | Task | | . . . 
-| | | | -| ++++++++++++++ | -================== -""" -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import TorqueProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label='swan_htex', - encrypted=True, - provider=TorqueProvider( - channel=SSHChannel( - hostname='swan.cray.com', - username=user_opts['swan']['username'], - script_dir=user_opts['swan']['script_dir'], - ), - nodes_per_block=1, - init_blocks=1, - max_blocks=1, - launcher=AprunLauncher(), - scheduler_options=user_opts['swan']['scheduler_options'], - worker_init=user_opts['swan']['worker_init'], - ), - ) - ] -) diff --git a/parsl/tests/integration/test_channels/test_scp_1.py b/parsl/tests/integration/test_channels/test_scp_1.py deleted file mode 100644 index c11df3c663..0000000000 --- a/parsl/tests/integration/test_channels/test_scp_1.py +++ /dev/null @@ -1,45 +0,0 @@ -import os - -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - out = '' - conn = SSH(hostname, username=username) - conn.push_file(os.path.abspath('remote_run.sh'), '/home/davidk/') - # ec, out, err = conn.execute_wait("ls /tmp/remote_run.sh; bash /tmp/remote_run.sh") - conn.close() - return out - - -script = '''#!/bin/bash -echo "Hostname: $HOSTNAME" -echo "Cpu info -----" -cat /proc/cpuinfo -echo "Done----------" -''' - - -def test_connect_1(): - with open('remote_run.sh', 'w') as f: - f.write(script) - - sites = { - 'midway': { - 'url': 'midway.rcc.uchicago.edu', - 'uname': 'yadunand' - }, - 'swift': { - 'url': 'swift.rcc.uchicago.edu', - 'uname': 'yadunand' - } - } - - for site in sites.values(): - out = connect_and_list(site['url'], site['uname']) - print("Sitename :{0} hostname:{1}".format(site['url'], out)) - - -if __name__ == "__main__": - - test_connect_1() diff --git a/parsl/tests/integration/test_channels/test_ssh_1.py b/parsl/tests/integration/test_channels/test_ssh_1.py deleted file mode 100644 index 61ab3f2705..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_1.py +++ /dev/null @@ -1,40 +0,0 @@ -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_midway(): - ''' Test ssh channels to midway - ''' - url = 'midway.rcc.uchicago.edu' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_beagle(): - ''' Test ssh channels to beagle - ''' - url = 'login04.beagle.ci.uchicago.edu' - uname = 'yadunandb' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -def test_osg(): - ''' Test ssh connectivity to osg - ''' - url = 'login.osgconnect.net' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - - -if __name__ == "__main__": - - pass diff --git 
a/parsl/tests/integration/test_channels/test_ssh_errors.py b/parsl/tests/integration/test_channels/test_ssh_errors.py deleted file mode 100644 index 7483e30a5c..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_errors.py +++ /dev/null @@ -1,46 +0,0 @@ -from parsl.channels.errors import BadHostKeyException, SSHException -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_error_1(): - try: - connect_and_list("bad.url.gov", "ubuntu") - except Exception as e: - assert type(e) is SSHException, "Expected SSException, got: {0}".format(e) - - -def test_error_2(): - try: - connect_and_list("swift.rcc.uchicago.edu", "mango") - except SSHException: - print("Caught the right exception") - else: - raise Exception("Expected SSException, got: {0}".format(e)) - - -def test_error_3(): - ''' This should work - ''' - try: - connect_and_list("edison.nersc.gov", "yadunand") - except BadHostKeyException as e: - print("Caught exception BadHostKeyException: ", e) - else: - assert False, "Expected SSException, got: {0}".format(e) - - -if __name__ == "__main__": - - tests = [test_error_1, test_error_2, test_error_3] - - for test in tests: - print("---------Running : {0}---------------".format(test)) - test() - print("----------------------DONE--------------------------") diff --git a/parsl/tests/integration/test_channels/test_ssh_file_transport.py b/parsl/tests/integration/test_channels/test_ssh_file_transport.py deleted file mode 100644 index 61672c3ff5..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_file_transport.py +++ /dev/null @@ -1,41 +0,0 @@ -import parsl -from parsl.channels.ssh.ssh import SSHChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_push(conn, fname="test001.txt"): - - with open(fname, 'w') as f: - f.write("Hello from parsl.ssh testing\n") - - conn.push_file(fname, "/tmp") - ec, out, err = conn.execute_wait("ls /tmp/{0}".format(fname)) - print(ec, out, err) - - -def test_pull(conn, fname="test001.txt"): - - local = "foo" - conn.pull_file("/tmp/{0}".format(fname), local) - - with open("{0}/{1}".format(local, fname), 'r') as f: - print(f.readlines()) - - -if __name__ == "__main__": - - parsl.set_stream_logger() - - # This is for testing - conn = SSH("midway.rcc.uchicago.edu", username="yadunand") - - test_push(conn) - test_pull(conn) - - conn.close() diff --git a/parsl/tests/integration/test_channels/test_ssh_interactive.py b/parsl/tests/integration/test_channels/test_ssh_interactive.py deleted file mode 100644 index c6f9b9dea9..0000000000 --- a/parsl/tests/integration/test_channels/test_ssh_interactive.py +++ /dev/null @@ -1,24 +0,0 @@ -import parsl -from parsl.channels.ssh_il.ssh_il import SSHInteractiveLoginChannel as SSH - - -def connect_and_list(hostname, username): - conn = SSH(hostname, username=username) - ec, out, err = conn.execute_wait("echo $HOSTNAME") - conn.close() - return out - - -def test_cooley(): - ''' Test ssh channels to midway - ''' - url = 'cooley.alcf.anl.gov' - uname = 'yadunand' - out = connect_and_list(url, uname) - print("Sitename :{0} hostname:{1}".format(url, out)) - return - - -if __name__ == "__main__": - parsl.set_stream_logger() - test_cooley() diff --git a/parsl/tests/manual_tests/test_ad_hoc_htex.py 
b/parsl/tests/manual_tests/test_ad_hoc_htex.py deleted file mode 100644 index dfa34ec0d1..0000000000 --- a/parsl/tests/manual_tests/test_ad_hoc_htex.py +++ /dev/null @@ -1,49 +0,0 @@ -import parsl -from parsl import python_app - -parsl.set_stream_logger() - -from parsl.channels import SSHChannel -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.providers import AdHocProvider - -remotes = ['midway2-login2.rcc.uchicago.edu', 'midway2-login1.rcc.uchicago.edu'] - -config = Config( - executors=[ - HighThroughputExecutor( - label='AdHoc', - max_workers_per_node=2, - worker_logdir_root="/scratch/midway2/yadunand/parsl_scripts", - encrypted=True, - provider=AdHocProvider( - worker_init="source /scratch/midway2/yadunand/parsl_env_setup.sh", - channels=[SSHChannel(hostname=m, - username="yadunand", - script_dir="/scratch/midway2/yadunand/parsl_cluster") - for m in remotes] - ) - ) - ] -) - - -@python_app -def platform(sleep=2, stdout=None): - import platform - import time - time.sleep(sleep) - return platform.uname() - - -def test_raw_provider(): - - parsl.load(config) - - x = [platform() for i in range(10)] - print([i.result() for i in x]) - - -if __name__ == "__main__": - test_raw_provider() diff --git a/parsl/tests/manual_tests/test_oauth_ssh.py b/parsl/tests/manual_tests/test_oauth_ssh.py deleted file mode 100644 index 3d464bcc0e..0000000000 --- a/parsl/tests/manual_tests/test_oauth_ssh.py +++ /dev/null @@ -1,13 +0,0 @@ -from parsl.channels import OAuthSSHChannel - - -def test_channel(): - channel = OAuthSSHChannel(hostname='ssh.demo.globus.org', username='yadunand') - x, stdout, stderr = channel.execute_wait('ls') - print(x, stdout, stderr) - assert x == 0, "Expected exit code 0, got {}".format(x) - - -if __name__ == '__main__': - - test_channel() diff --git a/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py new file mode 100644 index 0000000000..b2fa507aca --- /dev/null +++ b/parsl/tests/test_htex/test_disconnected_blocks_failing_provider.py @@ -0,0 +1,71 @@ +import logging + +import pytest + +import parsl +from parsl import Config +from parsl.executors import HighThroughputExecutor +from parsl.executors.errors import BadStateException +from parsl.jobs.states import JobState, JobStatus +from parsl.providers import LocalProvider + + +class FailingProvider(LocalProvider): + def submit(*args, **kwargs): + raise RuntimeError("Deliberate failure of provider.submit") + + +def local_config(): + """Config to simulate failing blocks without connecting""" + return Config( + executors=[ + HighThroughputExecutor( + label="HTEX", + heartbeat_period=1, + heartbeat_threshold=2, + poll_period=100, + max_workers_per_node=1, + provider=FailingProvider( + init_blocks=0, + max_blocks=2, + min_blocks=0, + ), + ) + ], + max_idletime=0.5, + strategy='htex_auto_scale', + strategy_period=0.1 + # this strategy period needs to be a few times smaller than the + # status_polling_interval of FailingProvider, which is 5s at + # time of writing + ) + + +@parsl.python_app +def double(x): + return x * 2 + + +@pytest.mark.local +def test_disconnected_blocks(): + """Test reporting of blocks that fail to connect from HTEX""" + dfk = parsl.dfk() + executor = dfk.executors["HTEX"] + + connected_blocks = executor.connected_blocks() + assert not connected_blocks, "Expected 0 blocks" + + future = double(5) + with pytest.raises(BadStateException): + future.result() + + assert 
isinstance(future.exception(), BadStateException) + + status_dict = executor.status() + assert len(status_dict) == 1, "Expected exactly 1 block" + for status in status_dict.values(): + assert isinstance(status, JobStatus) + assert status.state == JobState.MISSING + + connected_blocks = executor.connected_blocks() + assert connected_blocks == [], "Expected exactly 0 connected blocks" diff --git a/parsl/tests/test_htex/test_htex.py b/parsl/tests/test_htex/test_htex.py index 2d1aafda85..810236c1b4 100644 --- a/parsl/tests/test_htex/test_htex.py +++ b/parsl/tests/test_htex/test_htex.py @@ -1,6 +1,7 @@ +import logging import pathlib -import warnings from subprocess import Popen, TimeoutExpired +from typing import Optional, Sequence from unittest import mock import pytest @@ -71,12 +72,11 @@ def test_htex_start_encrypted( @pytest.mark.local @pytest.mark.parametrize("started", (True, False)) @pytest.mark.parametrize("timeout_expires", (True, False)) -@mock.patch(f"{_MOCK_BASE}.logger") def test_htex_shutdown( - mock_logger: mock.MagicMock, started: bool, timeout_expires: bool, htex: HighThroughputExecutor, + caplog ): mock_ix_proc = mock.Mock(spec=Popen) @@ -108,22 +108,22 @@ def kill_interchange(*args, **kwargs): mock_ix_proc.terminate.side_effect = kill_interchange - htex.shutdown() + with caplog.at_level(logging.INFO): + htex.shutdown() - mock_logs = mock_logger.info.call_args_list if started: assert mock_ix_proc.terminate.called assert mock_ix_proc.wait.called assert {"timeout": 10} == mock_ix_proc.wait.call_args[1] if timeout_expires: - assert "Unable to terminate Interchange" in mock_logs[1][0][0] + assert "Unable to terminate Interchange" in caplog.text assert mock_ix_proc.kill.called - assert "Attempting" in mock_logs[0][0][0] - assert "Finished" in mock_logs[-1][0][0] + assert "Attempting HighThroughputExecutor shutdown" in caplog.text + assert "Finished HighThroughputExecutor shutdown" in caplog.text else: assert not mock_ix_proc.terminate.called assert not mock_ix_proc.wait.called - assert "has not started" in mock_logs[0][0][0] + assert "HighThroughputExecutor has not started" in caplog.text @pytest.mark.local @@ -139,13 +139,22 @@ def test_max_workers_per_node(): @pytest.mark.local -def test_htex_launch_cmd(): - htex = HighThroughputExecutor() - assert htex.launch_cmd.startswith("process_worker_pool.py") - assert htex.interchange_launch_cmd == "interchange.py" - - launch_cmd = "custom-launch-cmd" - ix_launch_cmd = "custom-ix-launch-cmd" - htex = HighThroughputExecutor(launch_cmd=launch_cmd, interchange_launch_cmd=ix_launch_cmd) - assert htex.launch_cmd == launch_cmd - assert htex.interchange_launch_cmd == ix_launch_cmd +@pytest.mark.parametrize("cmd", (None, "custom-launch-cmd")) +def test_htex_worker_pool_launch_cmd(cmd: Optional[str]): + if cmd: + htex = HighThroughputExecutor(launch_cmd=cmd) + assert htex.launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.launch_cmd.startswith("process_worker_pool.py") + + +@pytest.mark.local +@pytest.mark.parametrize("cmd", (None, ["custom", "launch", "cmd"])) +def test_htex_interchange_launch_cmd(cmd: Optional[Sequence[str]]): + if cmd: + htex = HighThroughputExecutor(interchange_launch_cmd=cmd) + assert htex.interchange_launch_cmd == cmd + else: + htex = HighThroughputExecutor() + assert htex.interchange_launch_cmd == ["interchange.py"] diff --git a/parsl/tests/test_htex/test_zmq_binding.py b/parsl/tests/test_htex/test_zmq_binding.py index 1194e632d0..e21c065d0d 100644 --- a/parsl/tests/test_htex/test_zmq_binding.py +++ 
b/parsl/tests/test_htex/test_zmq_binding.py @@ -9,6 +9,7 @@ from parsl import curvezmq from parsl.executors.high_throughput.interchange import Interchange +from parsl.executors.high_throughput.manager_selector import RandomManagerSelector def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[str]) -> Interchange: @@ -23,7 +24,9 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s heartbeat_threshold=60, logdir=".", logging_level=logging.INFO, - poll_period=10) + manager_selector=RandomManagerSelector(), + poll_period=10, + run_id="test_run_id") @pytest.fixture diff --git a/parsl/tests/test_monitoring/test_basic.py b/parsl/tests/test_monitoring/test_basic.py index c900670ec8..1c792a9d82 100644 --- a/parsl/tests/test_monitoring/test_basic.py +++ b/parsl/tests/test_monitoring/test_basic.py @@ -25,10 +25,23 @@ def this_app(): # a configuration that is suitably configured for monitoring. def htex_config(): + """This config will use htex's default htex-specific monitoring radio mode""" from parsl.tests.configs.htex_local_alternate import fresh_config return fresh_config() +def htex_udp_config(): + """This config will force UDP""" + from parsl.tests.configs.htex_local_alternate import fresh_config + c = fresh_config() + assert len(c.executors) == 1 + + assert c.executors[0].radio_mode == "htex", "precondition: htex has a radio mode attribute, configured for htex radio" + c.executors[0].radio_mode = "udp" + + return c + + def workqueue_config(): from parsl.tests.configs.workqueue_ex import fresh_config c = fresh_config() @@ -48,7 +61,7 @@ def taskvine_config(): @pytest.mark.local -@pytest.mark.parametrize("fresh_config", [htex_config, workqueue_config, taskvine_config]) +@pytest.mark.parametrize("fresh_config", [htex_config, htex_udp_config, workqueue_config, taskvine_config]) def test_row_counts(tmpd_cwd, fresh_config): # this is imported here rather than at module level because # it isn't available in a plain parsl install, so this module diff --git a/parsl/tests/test_monitoring/test_fuzz_zmq.py b/parsl/tests/test_monitoring/test_fuzz_zmq.py index 36f048efb3..3f50385564 100644 --- a/parsl/tests/test_monitoring/test_fuzz_zmq.py +++ b/parsl/tests/test_monitoring/test_fuzz_zmq.py @@ -44,8 +44,8 @@ def test_row_counts(): # the latter is what i'm most suspicious of in my present investigation # dig out the interchange port... 
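The `htex_udp_config` fixture above shows the pattern for overriding an executor's monitoring radio before loading a config. A stripped-down sketch, assuming the test suite's `htex_local_alternate` module is importable:

```python
from parsl.tests.configs.htex_local_alternate import fresh_config

c = fresh_config()
# HTEX defaults to its executor-specific "htex" radio; reassigning
# radio_mode before parsl.load(c) routes monitoring over UDP instead
c.executors[0].radio_mode = "udp"
```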
- hub_address = parsl.dfk().hub_address - hub_zmq_port = parsl.dfk().hub_zmq_port + hub_address = parsl.dfk().monitoring.hub_address + hub_zmq_port = parsl.dfk().monitoring.hub_zmq_port # this will send a string to a new socket connection with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: diff --git a/parsl/tests/test_mpi_apps/test_mpiex.py b/parsl/tests/test_mpi_apps/test_mpiex.py index 1b3e86e0b8..a85547abea 100644 --- a/parsl/tests/test_mpi_apps/test_mpiex.py +++ b/parsl/tests/test_mpi_apps/test_mpiex.py @@ -44,7 +44,7 @@ def test_init(): new_kwargs = {'max_workers_per_block'} excluded_kwargs = {'available_accelerators', 'enable_mpi_mode', 'cores_per_worker', 'max_workers_per_node', - 'mem_per_worker', 'cpu_affinity', 'max_workers'} + 'mem_per_worker', 'cpu_affinity', 'max_workers', 'manager_selector'} # Get the kwargs from both HTEx and MPIEx htex_kwargs = set(signature(HighThroughputExecutor.__init__).parameters) diff --git a/parsl/tests/test_providers/test_local_provider.py b/parsl/tests/test_providers/test_local_provider.py index c6844b00c0..497c13370d 100644 --- a/parsl/tests/test_providers/test_local_provider.py +++ b/parsl/tests/test_providers/test_local_provider.py @@ -11,7 +11,8 @@ import pytest -from parsl.channels import LocalChannel, SSHChannel +from parsl.channels import LocalChannel +from parsl.channels.ssh.ssh import DeprecatedSSHChannel from parsl.jobs.states import JobState from parsl.launchers import SingleNodeLauncher from parsl.providers import LocalProvider @@ -92,10 +93,10 @@ def test_ssh_channel(): # already exist, so create it here. pathlib.Path('{}/known.hosts'.format(config_dir)).touch(mode=0o600) script_dir = tempfile.mkdtemp() - channel = SSHChannel('127.0.0.1', port=server_port, - script_dir=remote_script_dir, - host_keys_filename='{}/known.hosts'.format(config_dir), - key_filename=priv_key) + channel = DeprecatedSSHChannel('127.0.0.1', port=server_port, + script_dir=remote_script_dir, + host_keys_filename='{}/known.hosts'.format(config_dir), + key_filename=priv_key) try: p = LocalProvider(channel=channel, launcher=SingleNodeLauncher(debug=False)) diff --git a/parsl/version.py b/parsl/version.py index b91e22c53e..b0bd6ea77b 100644 --- a/parsl/version.py +++ b/parsl/version.py @@ -1,3 +1,3 @@ """Set module version. """ -VERSION = '2024.07.15+desc-2024.07.18b' +VERSION = '2024.08.12+desc-2024.08.14a'
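The reworked `test_htex_shutdown` earlier in this patch replaces a mocked module logger with pytest's built-in `caplog` fixture, asserting on captured log text instead of inspecting mock call lists. A generic sketch of that pattern, with an invented test body that logs directly rather than driving a real executor:

```python
import logging


def test_shutdown_logs(caplog):  # caplog is provided by pytest
    logger = logging.getLogger("parsl.executors.high_throughput.executor")
    with caplog.at_level(logging.INFO):
        logger.info("Attempting HighThroughputExecutor shutdown")
        logger.info("Finished HighThroughputExecutor shutdown")
    assert "Attempting HighThroughputExecutor shutdown" in caplog.text
    assert "Finished HighThroughputExecutor shutdown" in caplog.text
```

This keeps the assertions robust against reformatting of the log calls, since they match on message text rather than on the exact order of `logger.info` invocations.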