Skip to content

Commit

Permalink
Merge branch 'master' into parsl+flux
Browse files Browse the repository at this point in the history
  • Loading branch information
mercybassey authored Mar 6, 2024
2 parents 5c7a6c0 + 9334059 commit 2bf4de1
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 55 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# W504: line break after binary operator
# (Raised by flake8 even when it is followed)
ignore = E126, E402, E129, W504
max-line-length = 155
max-line-length = 151
exclude = test_import_fail.py,
parsl/executors/workqueue/parsl_coprocess.py
# E741 disallows ambiguous single letter names which look like numbers
Expand Down
10 changes: 9 additions & 1 deletion docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,15 @@ Local builds

To build the documentation locally, use::

$ make html
$ make clean html

To view the freshly rebuilt docs, use::

$ cd _build/html
$ python3 -m http.server 8080

Once the python http server is launched, point your browser to `http://localhost:8080 <http://localhost:8080>`_


Regenerate module stubs
--------------------------
Expand Down
9 changes: 0 additions & 9 deletions docs/userguide/configuring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -446,16 +446,7 @@ The following snippet shows an example configuration for accessing NSCC's **ASPI
.. literalinclude:: ../../parsl/configs/ASPIRE1.py


Blue Waters (NCSA)
------------------

.. image:: https://www.cray.com/sites/default/files/images/Solutions_Images/bluewaters.png

The following snippet shows an example configuration for executing remotely on Blue Waters, a flagship machine at the National Center for Supercomputing Applications.
The configuration assumes the user is running on a login node and uses the `parsl.providers.TorqueProvider` to interface
with the scheduler, and uses the `parsl.launchers.AprunLauncher` to launch workers.

.. literalinclude:: ../../parsl/configs/bluewaters.py

Illinois Campus Cluster (UIUC)
------------------------------
Expand Down
5 changes: 3 additions & 2 deletions docs/userguide/execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ parameters include access keys, instance type, and spot bid price
Parsl currently supports the following providers:

1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation.
2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler.
2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**.
3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
Expand All @@ -48,7 +48,8 @@ Parsl currently supports the following providers:
8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
10. `parsl.providers.AdHocProvider`: This provider allows you manage execution over a collection of nodes to form an ad-hoc cluster.
11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler
11. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.



Executors
Expand Down
28 changes: 0 additions & 28 deletions parsl/configs/bluewaters.py

This file was deleted.

26 changes: 19 additions & 7 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import queue
import datetime
import pickle
from dataclasses import dataclass
from multiprocessing import Process, Queue
from typing import Dict, Sequence
from typing import List, Optional, Tuple, Union, Callable
Expand Down Expand Up @@ -694,7 +695,7 @@ def create_monitoring_info(self, status):
def workers_per_node(self) -> Union[int, float]:
return self._workers_per_node

def scale_in(self, blocks, max_idletime=None):
def scale_in(self, blocks: int, max_idletime: Optional[float] = None) -> List[str]:
"""Scale in the number of active blocks by specified amount.
The scale in method here is very rude. It doesn't give the workers
Expand All @@ -721,25 +722,36 @@ def scale_in(self, blocks, max_idletime=None):
List of block IDs scaled in
"""
logger.debug(f"Scale in called, blocks={blocks}")

@dataclass
class BlockInfo:
tasks: int # sum of tasks in this block
idle: float # shortest idle time of any manager in this block

managers = self.connected_managers()
block_info = {} # block id -> list( tasks, idle duration )
block_info: Dict[str, BlockInfo] = {}
for manager in managers:
if not manager['active']:
continue
b_id = manager['block_id']
if b_id not in block_info:
block_info[b_id] = [0, float('inf')]
block_info[b_id][0] += manager['tasks']
block_info[b_id][1] = min(block_info[b_id][1], manager['idle_duration'])
block_info[b_id] = BlockInfo(tasks=0, idle=float('inf'))
block_info[b_id].tasks += manager['tasks']
block_info[b_id].idle = min(block_info[b_id].idle, manager['idle_duration'])

# The scaling policy is that longest idle blocks should be scaled down
# in preference to least idle (most recently used) blocks.
# Other policies could be implemented here.

sorted_blocks = sorted(block_info.items(), key=lambda item: (-item[1].idle, item[1].tasks))

sorted_blocks = sorted(block_info.items(), key=lambda item: (item[1][1], item[1][0]))
logger.debug(f"Scale in selecting from {len(sorted_blocks)} blocks")
if max_idletime is None:
block_ids_to_kill = [x[0] for x in sorted_blocks[:blocks]]
else:
block_ids_to_kill = []
for x in sorted_blocks:
if x[1][1] > max_idletime and x[1][0] == 0:
if x[1].idle > max_idletime and x[1].tasks == 0:
block_ids_to_kill.append(x[0])
if len(block_ids_to_kill) == blocks:
break
Expand Down
8 changes: 7 additions & 1 deletion parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,13 @@ def start_file_logger(filename: str, level: int = logging.DEBUG, format_string:
None.
"""
if format_string is None:
format_string = "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d %(processName)s(%(process)d) %(threadName)s %(funcName)s [%(levelname)s] %(message)s"
format_string = (

"%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
"%(processName)s(%(process)d) %(threadName)s "
"%(funcName)s [%(levelname)s] %(message)s"

)

global logger
logger = logging.getLogger(LOGGER_NAME)
Expand Down
5 changes: 4 additions & 1 deletion parsl/executors/high_throughput/process_worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,10 @@ def strategyorlist(s: str):
block_id=args.block_id,
cores_per_worker=float(args.cores_per_worker),
mem_per_worker=None if args.mem_per_worker == 'None' else float(args.mem_per_worker),
max_workers_per_node=args.max_workers_per_node if args.max_workers_per_node == float('inf') else int(args.max_workers_per_node),
max_workers_per_node=(
args.max_workers_per_node if args.max_workers_per_node == float('inf')
else int(args.max_workers_per_node)
),
prefetch_capacity=int(args.prefetch_capacity),
heartbeat_threshold=int(args.hb_threshold),
heartbeat_period=int(args.hb_period),
Expand Down
16 changes: 14 additions & 2 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ class Resource(Base):
'psutil_process_disk_write', Float, nullable=True)
psutil_process_status = Column(
'psutil_process_status', Text, nullable=True)
psutil_cpu_num = Column(
'psutil_cpu_num', Text, nullable=True)
psutil_process_num_ctx_switches_voluntary = Column(
'psutil_process_num_ctx_switches_voluntary', Float, nullable=True)
psutil_process_num_ctx_switches_involuntary = Column(
'psutil_process_num_ctx_switches_involuntary', Float, nullable=True)
__table_args__ = (
PrimaryKeyConstraint('try_id', 'task_id', 'run_id', 'timestamp'),
)
Expand Down Expand Up @@ -518,7 +524,10 @@ def start(self,
reprocessable_first_resource_messages.append(msg)
else:
if task_try_id in deferred_resource_messages:
logger.error("Task {} already has a deferred resource message. Discarding previous message.".format(msg['task_id']))
logger.error(
"Task {} already has a deferred resource message. "
"Discarding previous message.".format(msg['task_id'])
)
deferred_resource_messages[task_try_id] = msg
elif msg['last_msg']:
# This assumes that the primary key has been added
Expand All @@ -544,7 +553,10 @@ def start(self,
if reprocessable_last_resource_messages:
self._insert(table=STATUS, messages=reprocessable_last_resource_messages)
except Exception:
logger.exception("Exception in db loop: this might have been a malformed message, or some other error. monitoring data may have been lost")
logger.exception(
"Exception in db loop: this might have been a malformed message, "
"or some other error. monitoring data may have been lost"
)
exception_happened = True
if exception_happened:
raise RuntimeError("An exception happened sometime during database processing and should have been logged in database_manager.log")
Expand Down
8 changes: 6 additions & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,12 @@ def close(self) -> None:
self._dfk_channel.close()
if exception_msgs:
for exception_msg in exception_msgs:
self.logger.error("{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(exception_msg[0],
exception_msg[1]))
self.logger.error(
"{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
exception_msg[0],
exception_msg[1]
)
)
self.router_proc.terminate()
self.dbm_proc.terminate()
self.filesystem_proc.terminate()
Expand Down
29 changes: 29 additions & 0 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ def monitor(pid: int,

children_user_time = {} # type: Dict[int, float]
children_system_time = {} # type: Dict[int, float]
children_num_ctx_switches_voluntary = {} # type: Dict[int, float]
children_num_ctx_switches_involuntary = {} # type: Dict[int, float]

def accumulate_and_prepare() -> Dict[str, Any]:
d = {"psutil_process_" + str(k): v for k, v in pm.as_dict().items() if k in simple}
Expand All @@ -218,6 +220,15 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.debug("got children")

d["psutil_cpu_count"] = psutil.cpu_count()

# note that this will be the CPU number of the base process, not anything launched by it
d["psutil_cpu_num"] = pm.cpu_num()

pctxsw = pm.num_ctx_switches()

d["psutil_process_num_ctx_switches_voluntary"] = pctxsw.voluntary
d["psutil_process_num_ctx_switches_involuntary"] = pctxsw.involuntary

d['psutil_process_memory_virtual'] = pm.memory_info().vms
d['psutil_process_memory_resident'] = pm.memory_info().rss
d['psutil_process_time_user'] = pm.cpu_times().user
Expand All @@ -238,6 +249,11 @@ def accumulate_and_prepare() -> Dict[str, Any]:
child_system_time = child.cpu_times().system
children_user_time[child.pid] = child_user_time
children_system_time[child.pid] = child_system_time

pctxsw = child.num_ctx_switches()
children_num_ctx_switches_voluntary[child.pid] = pctxsw.voluntary
children_num_ctx_switches_involuntary[child.pid] = pctxsw.involuntary

d['psutil_process_memory_virtual'] += child.memory_info().vms
d['psutil_process_memory_resident'] += child.memory_info().rss
try:
Expand All @@ -248,14 +264,27 @@ def accumulate_and_prepare() -> Dict[str, Any]:
logging.exception("Exception reading IO counters for child {k}. Recorded IO usage may be incomplete".format(k=k), exc_info=True)
d['psutil_process_disk_write'] += 0
d['psutil_process_disk_read'] += 0

total_children_user_time = 0.0
for child_pid in children_user_time:
total_children_user_time += children_user_time[child_pid]

total_children_system_time = 0.0
for child_pid in children_system_time:
total_children_system_time += children_system_time[child_pid]

total_children_num_ctx_switches_voluntary = 0.0
for child_pid in children_num_ctx_switches_voluntary:
total_children_num_ctx_switches_voluntary += children_num_ctx_switches_voluntary[child_pid]

total_children_num_ctx_switches_involuntary = 0.0
for child_pid in children_num_ctx_switches_involuntary:
total_children_num_ctx_switches_involuntary += children_num_ctx_switches_involuntary[child_pid]

d['psutil_process_time_user'] += total_children_user_time
d['psutil_process_time_system'] += total_children_system_time
d['psutil_process_num_ctx_switches_voluntary'] += total_children_num_ctx_switches_voluntary
d['psutil_process_num_ctx_switches_involuntary'] += total_children_num_ctx_switches_involuntary
logging.debug("sending message")
return d

Expand Down
7 changes: 7 additions & 0 deletions parsl/monitoring/visualization/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,12 @@ class Resource(db.Model):
'psutil_process_disk_write', db.Float, nullable=True)
psutil_process_status = db.Column(
'psutil_process_status', db.Text, nullable=True)
psutil_cpu_num = db.Column(
'psutil_cpu_num', db.Text, nullable=True)
psutil_process_num_ctx_switches_voluntary = db.Column(
'psutil_process_num_ctx_switches_voluntary', db.Float, nullable=True)
psutil_process_num_ctx_switches_involuntary = db.Column(
'psutil_process_num_ctx_switches_involuntary', db.Float, nullable=True)

__table_args__ = (
db.PrimaryKeyConstraint('task_id', 'run_id', 'timestamp'),)
8 changes: 7 additions & 1 deletion parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,13 @@ def submit(self, command: str, tasks_per_node: int, job_name="parsl.slurm") -> s
else:
logger.error("Could not read job ID from submit command standard output.")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
raise SubmitException(job_name, "Could not read job ID from submit command standard output", stdout=stdout, stderr=stderr, retcode=retcode)
raise SubmitException(
job_name,
"Could not read job ID from submit command standard output",
stdout=stdout,
stderr=stderr,
retcode=retcode
)
else:
logger.error("Submit command failed")
logger.error("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
Expand Down

0 comments on commit 2bf4de1

Please sign in to comment.