diff --git a/.flake8 b/.flake8
index 45a31e7cf6..4c434c7518 100644
--- a/.flake8
+++ b/.flake8
@@ -8,7 +8,7 @@
 # W504: line break after binary operator
 #       (Raised by flake8 even when it is followed)
 ignore = E126, E402, E129, W504
-max-line-length = 155
+max-line-length = 151
 exclude = test_import_fail.py,
     parsl/executors/workqueue/parsl_coprocess.py
 # E741 disallows ambiguous single letter names which look like numbers
diff --git a/docs/README.rst b/docs/README.rst
index 44f91eb16f..f50c03f3d1 100644
--- a/docs/README.rst
+++ b/docs/README.rst
@@ -21,7 +21,15 @@ Local builds
 
 To build the documentation locally, use::
 
-    $ make html
+    $ make clean html
+
+To view the freshly rebuilt docs, use::
+
+    $ cd _build/html
+    $ python3 -m http.server 8080
+
+Once the Python HTTP server is launched, point your browser to `http://localhost:8080 <http://localhost:8080>`_.
+
 
 Regenerate module stubs
 --------------------------
diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst
index 7378157601..5233b30ddc 100644
--- a/docs/userguide/configuring.rst
+++ b/docs/userguide/configuring.rst
@@ -446,16 +446,7 @@ The following snippet shows an example configuration for accessing NSCC's **ASPI
 
 .. literalinclude:: ../../parsl/configs/ASPIRE1.py
 
-Blue Waters (NCSA)
-------------------
-
-.. image:: https://www.cray.com/sites/default/files/images/Solutions_Images/bluewaters.png
-
-The following snippet shows an example configuration for executing remotely on Blue Waters, a flagship machine at the National Center for Supercomputing Applications.
-The configuration assumes the user is running on a login node and uses the `parsl.providers.TorqueProvider` to interface
-with the scheduler, and uses the `parsl.launchers.AprunLauncher` to launch workers.
-
-.. literalinclude:: ../../parsl/configs/bluewaters.py
 
 Illinois Campus Cluster (UIUC)
 ------------------------------
diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst
index 1a61e40e73..4168367f9d 100644
--- a/docs/userguide/execution.rst
+++ b/docs/userguide/execution.rst
@@ -39,7 +39,7 @@ parameters include access keys, instance type, and spot bid price
 Parsl currently supports the following providers:
 
 1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation.
-2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler.
+2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**.
 3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
 4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
 5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
@@ -48,7 +48,8 @@ Parsl currently supports the following providers:
 8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
 9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
 10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster.
-11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler
+11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
+
 
 Executors
 ---------
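Note (editorial, not part of the patch): every provider in the list above plugs into the same ``Config``/executor structure. A minimal sketch, using ``parsl.providers.LocalProvider`` as a stand-in for any of the schedulers listed::

    # Minimal sketch: wiring a provider into a Parsl Config.
    # LocalProvider here stands in for any provider from the list above.
    import parsl
    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.providers import LocalProvider

    config = Config(
        executors=[
            HighThroughputExecutor(
                label="local_htex",
                provider=LocalProvider(init_blocks=1, max_blocks=1),
            )
        ]
    )
    parsl.load(config)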
diff --git a/docs/userguide/mpi_apps.rst b/docs/userguide/mpi_apps.rst
index 588ddf1e94..1bfeb6ce56 100644
--- a/docs/userguide/mpi_apps.rst
+++ b/docs/userguide/mpi_apps.rst
@@ -146,7 +146,7 @@ Writing MPI-Compatible Apps
 ++++++++++++++++++++++++++++
 
 In MPI mode, the :class:`~parsl.executors.high_throughput.executor.HighThroughputExecutor` can execute both Python or Bash Apps which invokes the MPI application.
-However, it is important to not that Python Apps that directly use ``mpi4py`` is not supported.
+However, it is important to note that Python Apps that directly use ``mpi4py`` are not supported.
 
 For multi-node MPI applications, especially when running multiple applications within a single batch
 job, it is important to specify the resource requirements for the app so that the Parsl worker can provision
diff --git a/parsl/configs/bluewaters.py b/parsl/configs/bluewaters.py
deleted file mode 100644
index 38ae9cbad8..0000000000
--- a/parsl/configs/bluewaters.py
+++ /dev/null
@@ -1,28 +0,0 @@
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.launchers import AprunLauncher
-from parsl.providers import TorqueProvider
-
-
-config = Config(
-    executors=[
-        HighThroughputExecutor(
-            label="bw_htex",
-            cores_per_worker=1,
-            worker_debug=False,
-            provider=TorqueProvider(
-                queue='normal',
-                launcher=AprunLauncher(overrides="-b -- bwpy-environ --"),
-                scheduler_options='',  # string to prepend to #SBATCH blocks in the submit script to the scheduler
-                worker_init='',  # command to run before starting a worker, such as 'source activate env'
-                init_blocks=1,
-                max_blocks=1,
-                min_blocks=1,
-                nodes_per_block=2,
-                walltime='00:10:00'
-            ),
-        )
-
-    ],
-
-)
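Note (editorial, not part of the patch): the mpi_apps.rst change above says Python Apps that call ``mpi4py`` directly are not supported, so MPI binaries are launched from a Bash App instead. A minimal sketch, where the ``mpiexec`` command line and the ``./hello_mpi`` binary are illustrative assumptions::

    # Illustrative sketch only: a Bash App that launches an MPI binary,
    # rather than calling mpi4py from Python (which the docs note is unsupported).
    from parsl import bash_app

    @bash_app
    def mpi_hello(ranks: int, stdout: str = "hello.out", stderr: str = "hello.err"):
        # The exact MPI launcher (mpiexec/srun/aprun) depends on the system.
        return f"mpiexec -n {ranks} ./hello_mpi"

    # Requires a loaded Parsl config (e.g. parsl.load(config)) before calling:
    # mpi_hello(4).result()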
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index df427f61c7..ee326c5f66 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -6,6 +6,7 @@
 import queue
 import datetime
 import pickle
+from dataclasses import dataclass
 from multiprocessing import Process, Queue
 from typing import Dict, Sequence
 from typing import List, Optional, Tuple, Union, Callable
@@ -327,7 +328,7 @@ def __init__(self,
     def _warn_deprecated(self, old: str, new: str):
         warnings.warn(
             f"{old} is deprecated and will be removed in a future release. "
-            "Please use {new} instead.",
+            f"Please use {new} instead.",
             DeprecationWarning,
             stacklevel=2
         )
@@ -694,7 +695,7 @@ def create_monitoring_info(self, status):
     def workers_per_node(self) -> Union[int, float]:
         return self._workers_per_node
 
-    def scale_in(self, blocks, max_idletime=None):
+    def scale_in(self, blocks: int, max_idletime: Optional[float] = None) -> List[str]:
         """Scale in the number of active blocks by specified amount.
 
         The scale in method here is very rude. It doesn't give the workers
@@ -721,25 +722,36 @@ def scale_in(self, blocks, max_idletime=None):
             List of block IDs scaled in
         """
         logger.debug(f"Scale in called, blocks={blocks}")
+
+        @dataclass
+        class BlockInfo:
+            tasks: int      # sum of tasks in this block
+            idle: float     # shortest idle time of any manager in this block
+
         managers = self.connected_managers()
-        block_info = {}  # block id -> list( tasks, idle duration )
+        block_info: Dict[str, BlockInfo] = {}
         for manager in managers:
             if not manager['active']:
                 continue
             b_id = manager['block_id']
             if b_id not in block_info:
-                block_info[b_id] = [0, float('inf')]
-            block_info[b_id][0] += manager['tasks']
-            block_info[b_id][1] = min(block_info[b_id][1], manager['idle_duration'])
+                block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
+            block_info[b_id].tasks += manager['tasks']
+            block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])
+
+        # The scaling policy is that longest idle blocks should be scaled down
+        # in preference to least idle (most recently used) blocks.
+        # Other policies could be implemented here.
+
+        sorted_blocks = sorted(block_info.items(), key=lambda item: (-item[1].idle, item[1].tasks))
 
-        sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
         logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
         if max_idletime is None:
             block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
         else:
             block_ids_to_kill = []
             for x in sorted_blocks:
-                if x[1][1] > max_idletime and x[1][0] == 0:
+                if x[1].idle > max_idletime and x[1].tasks == 0:
                     block_ids_to_kill.append(x[0])
                     if len(block_ids_to_kill) == blocks:
                         break
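Note (editorial, not part of the patch): the reworked ``scale_in`` sorts candidate blocks longest-idle first, breaking ties by task count. A standalone sketch of that ordering::

    # Standalone sketch of the scale-in ordering introduced above:
    # blocks sort longest-idle first, ties broken by fewer tasks.
    from dataclasses import dataclass

    @dataclass
    class BlockInfo:
        tasks: int
        idle: float

    block_info = {
        "b1": BlockInfo(tasks=3, idle=2.0),
        "b2": BlockInfo(tasks=0, idle=90.0),
        "b3": BlockInfo(tasks=0, idle=90.0),
    }
    sorted_blocks = sorted(block_info.items(), key=lambda item: (-item[1].idle, item[1].tasks))
    print([b for b, _ in sorted_blocks])  # ['b2', 'b3', 'b1'] -- b2/b3 idle longest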
""" if format_string is None: - format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s" + format_string = ( + + "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d " + "%(processName)s(%(process)d) %(threadName)s " + "%(funcName)s [%(levelname)s] %(message)s" + + ) global logger logger = logging.getLogger(LOGGER_NAME) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index 6e6eb2e425..7c32334786 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -869,7 +869,10 @@ def strategyorlist(s: str): block_id=args.block_id, cores_per_worker=float(args.cores_per_worker), mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker), - max_workers_per_node=args.max_workers_per_node if args.max_workers_per_node == float('inf') else int(args.max_workers_per_node), + max_workers_per_node=( + args.max_workers_per_node if args.max_workers_per_node == float('inf') + else int(args.max_workers_per_node) + ), prefetch_capacity=int(args.prefetch_capacity), heartbeat_threshold=int(args.hb_threshold), heartbeat_period=int(args.hb_period), diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index ebc63b8291..84e25619a4 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -250,6 +250,12 @@ class Resource(Base): 'psutil_process_disk_write', Float, nullable=True) psutil_process_status = Column( 'psutil_process_status', Text, nullable=True) + psutil_cpu_num = Column( + 'psutil_cpu_num', Text, nullable=True) + psutil_process_num_ctx_switches_voluntary = Column( + 'psutil_process_num_ctx_switches_voluntary', Float, nullable=True) + psutil_process_num_ctx_switches_involuntary = Column( + 'psutil_process_num_ctx_switches_involuntary', Float, nullable=True) __table_args__ = ( PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'), ) @@ -518,7 +524,10 @@ def start(self, reprocessable_first_resource_messages.append(msg) else: if task_try_id in deferred_resource_messages: - logger.error("Task {} already has a deferred resource message. Discarding previous message.".format(msg['task_id'])) + logger.error( + "Task {} already has a deferred resource message. " + "Discarding previous message.".format(msg['task_id']) + ) deferred_resource_messages[task_try_id] = msg elif msg['last_msg']: # This assumes that the primary key has been added @@ -544,7 +553,10 @@ def start(self, if reprocessable_last_resource_messages: self._insert(table=STATUS, messages=reprocessable_last_resource_messages) except Exception: - logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost") + logger.exception( + "Exception in db loop: this might have been a malformed message, " + "or some other error. 
diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py
index ebc63b8291..84e25619a4 100644
--- a/parsl/monitoring/db_manager.py
+++ b/parsl/monitoring/db_manager.py
@@ -250,6 +250,12 @@ class Resource(Base):
         'psutil_process_disk_write', Float, nullable=True)
     psutil_process_status = Column(
         'psutil_process_status', Text, nullable=True)
+    psutil_cpu_num = Column(
+        'psutil_cpu_num', Text, nullable=True)
+    psutil_process_num_ctx_switches_voluntary = Column(
+        'psutil_process_num_ctx_switches_voluntary', Float, nullable=True)
+    psutil_process_num_ctx_switches_involuntary = Column(
+        'psutil_process_num_ctx_switches_involuntary', Float, nullable=True)
     __table_args__ = (
         PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'),
     )
@@ -518,7 +524,10 @@ def start(self,
                             reprocessable_first_resource_messages.append(msg)
                         else:
                             if task_try_id in deferred_resource_messages:
-                                logger.error("Task {} already has a deferred resource message. Discarding previous message.".format(msg['task_id']))
+                                logger.error(
+                                    "Task {} already has a deferred resource message. "
+                                    "Discarding previous message.".format(msg['task_id'])
+                                )
                             deferred_resource_messages[task_try_id] = msg
                     elif msg['last_msg']:
                         # This assumes that the primary key has been added
@@ -544,7 +553,10 @@ def start(self,
                     if reprocessable_last_resource_messages:
                         self._insert(table=STATUS, messages=reprocessable_last_resource_messages)
             except Exception:
-                logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost")
+                logger.exception(
+                    "Exception in db loop: this might have been a malformed message, "
+                    "or some other error. Monitoring data may have been lost"
+                )
                 exception_happened = True
         if exception_happened:
             raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py
index 8acc40b70f..b6a168cff2 100644
--- a/parsl/monitoring/monitoring.py
+++ b/parsl/monitoring/monitoring.py
@@ -290,8 +290,12 @@ def close(self) -> None:
             self._dfk_channel.close()
         if exception_msgs:
             for exception_msg in exception_msgs:
-                self.logger.error("{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(exception_msg[0],
-                                                                                                                                    exception_msg[1]))
+                self.logger.error(
+                    "{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
+                        exception_msg[0],
+                        exception_msg[1]
+                    )
+                )
             self.router_proc.terminate()
             self.dbm_proc.terminate()
             self.filesystem_proc.terminate()
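Note (editorial, not part of the patch): once the new columns are populated, they can be inspected directly in the monitoring database. A hedged sketch, assuming the default SQLite database file ``monitoring.db`` and the ``resource`` table backing the models above::

    # Hedged sketch: inspecting the new resource columns in a monitoring DB.
    # The DB path and table name are assumptions based on Parsl defaults.
    import sqlite3

    conn = sqlite3.connect("monitoring.db")
    rows = conn.execute(
        "SELECT task_id, psutil_cpu_num, "
        "psutil_process_num_ctx_switches_voluntary, "
        "psutil_process_num_ctx_switches_involuntary "
        "FROM resource LIMIT 5"
    ).fetchall()
    for row in rows:
        print(row)
    conn.close()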
diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py
index c0a9629cf0..e0a84a1a2f 100644
--- a/parsl/monitoring/remote.py
+++ b/parsl/monitoring/remote.py
@@ -201,6 +201,8 @@ def monitor(pid: int,
 
     children_user_time = {}  # type: Dict[int, float]
     children_system_time = {}  # type: Dict[int, float]
+    children_num_ctx_switches_voluntary = {}  # type: Dict[int, float]
+    children_num_ctx_switches_involuntary = {}  # type: Dict[int, float]
 
     def accumulate_and_prepare() -> Dict[str, Any]:
         d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
@@ -218,6 +220,15 @@ def accumulate_and_prepare() -> Dict[str, Any]:
             logging.debug("got children")
 
         d["psutil_cpu_count"] = psutil.cpu_count()
+
+        # note that this will be the CPU number of the base process, not anything launched by it
+        d["psutil_cpu_num"] = pm.cpu_num()
+
+        pctxsw = pm.num_ctx_switches()
+
+        d["psutil_process_num_ctx_switches_voluntary"] = pctxsw.voluntary
+        d["psutil_process_num_ctx_switches_involuntary"] = pctxsw.involuntary
+
         d['psutil_process_memory_virtual'] = pm.memory_info().vms
         d['psutil_process_memory_resident'] = pm.memory_info().rss
         d['psutil_process_time_user'] = pm.cpu_times().user
@@ -238,6 +249,11 @@ def accumulate_and_prepare() -> Dict[str, Any]:
                 child_system_time = child.cpu_times().system
                 children_user_time[child.pid] = child_user_time
                 children_system_time[child.pid] = child_system_time
+
+                pctxsw = child.num_ctx_switches()
+                children_num_ctx_switches_voluntary[child.pid] = pctxsw.voluntary
+                children_num_ctx_switches_involuntary[child.pid] = pctxsw.involuntary
+
                 d['psutil_process_memory_virtual'] += child.memory_info().vms
                 d['psutil_process_memory_resident'] += child.memory_info().rss
                 try:
@@ -248,14 +264,27 @@ def accumulate_and_prepare() -> Dict[str, Any]:
                     logging.exception("Exception reading IO counters for child {k}. Recorded IO usage may be incomplete".format(k=k), exc_info=True)
                     d['psutil_process_disk_write'] += 0
                     d['psutil_process_disk_read'] += 0
+
         total_children_user_time = 0.0
         for child_pid in children_user_time:
             total_children_user_time += children_user_time[child_pid]
+
        total_children_system_time = 0.0
         for child_pid in children_system_time:
             total_children_system_time += children_system_time[child_pid]
+
+        total_children_num_ctx_switches_voluntary = 0.0
+        for child_pid in children_num_ctx_switches_voluntary:
+            total_children_num_ctx_switches_voluntary += children_num_ctx_switches_voluntary[child_pid]
+
+        total_children_num_ctx_switches_involuntary = 0.0
+        for child_pid in children_num_ctx_switches_involuntary:
+            total_children_num_ctx_switches_involuntary += children_num_ctx_switches_involuntary[child_pid]
+
         d['psutil_process_time_user'] += total_children_user_time
         d['psutil_process_time_system'] += total_children_system_time
+        d['psutil_process_num_ctx_switches_voluntary'] += total_children_num_ctx_switches_voluntary
+        d['psutil_process_num_ctx_switches_involuntary'] += total_children_num_ctx_switches_involuntary
         logging.debug("sending message")
         return d
 
diff --git a/parsl/monitoring/visualization/models.py b/parsl/monitoring/visualization/models.py
index 79459260ea..9ac4a0d8b2 100644
--- a/parsl/monitoring/visualization/models.py
+++ b/parsl/monitoring/visualization/models.py
@@ -102,5 +102,12 @@ class Resource(db.Model):
         'psutil_process_disk_write', db.Float, nullable=True)
     psutil_process_status = db.Column(
         'psutil_process_status', db.Text, nullable=True)
+    psutil_cpu_num = db.Column(
+        'psutil_cpu_num', db.Text, nullable=True)
+    psutil_process_num_ctx_switches_voluntary = db.Column(
+        'psutil_process_num_ctx_switches_voluntary', db.Float, nullable=True)
+    psutil_process_num_ctx_switches_involuntary = db.Column(
+        'psutil_process_num_ctx_switches_involuntary', db.Float, nullable=True)
+
     __table_args__ = (
         db.PrimaryKeyConstraint('task_id', 'run_id', 'timestamp'),)
diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py
index 3f57076f50..e792bc07c6 100644
--- a/parsl/providers/slurm/slurm.py
+++ b/parsl/providers/slurm/slurm.py
@@ -280,7 +280,13 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
             else:
                 logger.error("Could not read job ID from submit command standard output.")
                 logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-                raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
+                raise SubmitException(
+                    job_name,
+                    "Could not read job ID from submit command standard output",
+                    stdout=stdout,
+                    stderr=stderr,
+                    retcode=retcode
+                )
         else:
             logger.error("Submit command failed")
             logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
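Note (editorial, not part of the patch): the monitoring changes above build on two psutil calls. ``num_ctx_switches()`` returns a named tuple with ``voluntary`` and ``involuntary`` fields, and ``cpu_num()`` reports the CPU the process last ran on; ``cpu_num()`` is platform-dependent and unavailable on some OSes (e.g. macOS). A standalone check::

    # Standalone check of the psutil calls used above.
    import os
    import psutil

    pm = psutil.Process(os.getpid())
    pctxsw = pm.num_ctx_switches()
    # cpu_num() may raise AttributeError on platforms that lack it.
    print(pm.cpu_num(), pctxsw.voluntary, pctxsw.involuntary)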