Skip to content

Commit

Permalink
Merge branch 'master' into tmp_function_data
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Aug 21, 2024
2 parents 9f6b037 + 0fc966f commit 5ec7cdb
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 102 deletions.
5 changes: 4 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Parsl - Parallel Scripting Library
==================================
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528|
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|

Parsl extends parallelism in Python beyond a single computer.

Expand Down Expand Up @@ -64,6 +64,9 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |NSF-1550475| image:: https://img.shields.io/badge/NSF-1550475-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550475
:alt: NSF award info
.. |CZI-EOSS| image:: https://chanzuckerberg.github.io/open-science/badges/CZI-EOSS.svg
:target: https://czi.co/EOSS
:alt: CZI's Essential Open Source Software for Science


Quickstart
Expand Down
39 changes: 4 additions & 35 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,35 +308,9 @@ def __init__(self,
self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue()

def start(self,
priority_queue: mpq.Queue,
node_queue: mpq.Queue,
block_queue: mpq.Queue,
resource_queue: mpq.Queue) -> None:

self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
priority_queue, self._kill_event,),
name="Monitoring-migrate-priority",
daemon=True,
)
self._priority_queue_pull_thread.start()

self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
node_queue, self._kill_event,),
name="Monitoring-migrate-node",
daemon=True,
)
self._node_queue_pull_thread.start()

self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
block_queue, self._kill_event,),
name="Monitoring-migrate-block",
daemon=True,
)
self._block_queue_pull_thread.start()

self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
Expand Down Expand Up @@ -372,20 +346,18 @@ def start(self,
while (not self._kill_event.is_set() or
self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or
node_queue.qsize() != 0 or block_queue.qsize() != 0):
resource_queue.qsize() != 0):

"""
WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)
"""
try:
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format(
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
self._kill_event.is_set(),
self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
priority_queue.qsize() != 0, resource_queue.qsize() != 0,
node_queue.qsize() != 0, block_queue.qsize() != 0))
resource_queue.qsize() != 0))

# This is the list of resource messages which can be reprocessed as if they
# had just arrived because the corresponding first task message has been
Expand Down Expand Up @@ -707,9 +679,6 @@ def close(self) -> None:
@wrap_with_logs(target="database_manager")
@typeguard.typechecked
def dbm_starter(exception_q: mpq.Queue,
priority_msgs: mpq.Queue,
node_msgs: mpq.Queue,
block_msgs: mpq.Queue,
resource_msgs: mpq.Queue,
db_url: str,
logdir: str,
Expand All @@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue,
logdir=logdir,
logging_level=logging_level)
logger.info("Starting dbm in dbm starter")
dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
dbm.start(resource_msgs)
except KeyboardInterrupt:
logger.exception("KeyboardInterrupt signal caught")
dbm.close()
Expand Down
28 changes: 5 additions & 23 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from multiprocessing import Event, Process
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast

import typeguard

Expand Down Expand Up @@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.exception_q: Queue[Tuple[str, str]]
self.exception_q = SizedQueue(maxsize=10)

self.priority_msgs: Queue[Tuple[Any, int]]
self.priority_msgs = SizedQueue()

self.resource_msgs: Queue[AddressedMonitoringMessage]
self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]]
self.resource_msgs = SizedQueue()

self.node_msgs: Queue[AddressedMonitoringMessage]
self.node_msgs = SizedQueue()

self.block_msgs: Queue[AddressedMonitoringMessage]
self.block_msgs = SizedQueue()

self.router_exit_event: ms.Event
self.router_exit_event = Event()

self.router_proc = ForkProcess(target=router_starter,
kwargs={"comm_q": comm_q,
"exception_q": self.exception_q,
"priority_msgs": self.priority_msgs,
"node_msgs": self.node_msgs,
"block_msgs": self.block_msgs,
"resource_msgs": self.resource_msgs,
"exit_event": self.router_exit_event,
"hub_address": self.hub_address,
Expand All @@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.router_proc.start()

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,),
args=(self.exception_q, self.resource_msgs,),
kwargs={"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
Expand All @@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadioSender(self.block_msgs)
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
Expand Down Expand Up @@ -249,7 +237,7 @@ def close(self) -> None:
logger.debug("Finished waiting for router termination")
if len(exception_msgs) == 0:
logger.debug("Sending STOP to DBM")
self.priority_msgs.put(("STOP", 0))
self.resource_msgs.put(("STOP", 0))
else:
logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
logger.debug("Waiting for DB termination")
Expand All @@ -267,14 +255,8 @@ def close(self) -> None:
logger.info("Closing monitoring multiprocessing queues")
self.exception_q.close()
self.exception_q.join_thread()
self.priority_msgs.close()
self.priority_msgs.join_thread()
self.resource_msgs.close()
self.resource_msgs.join_thread()
self.node_msgs.close()
self.node_msgs.join_thread()
self.block_msgs.close()
self.block_msgs.join_thread()
logger.info("Closed monitoring multiprocessing queues")


Expand Down
36 changes: 3 additions & 33 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import zmq

from parsl.log_utils import set_file_logger
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
from parsl.process_loggers import wrap_with_logs
from parsl.utils import setproctitle
Expand All @@ -34,9 +33,6 @@ def __init__(self,
logdir: str = ".",
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
priority_msgs: mpq.Queue,
node_msgs: mpq.Queue,
block_msgs: mpq.Queue,
resource_msgs: mpq.Queue,
exit_event: Event,
):
Expand All @@ -57,8 +53,8 @@ def __init__(self,
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
*_msgs : Queue
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
resource_msgs : multiprocessing.Queue
A multiprocessing queue to receive messages to be routed onwards to the database process
exit_event : Event
An event that the main Parsl process will set to signal that the monitoring router should shut down.
Expand Down Expand Up @@ -102,9 +98,6 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

self.priority_msgs = priority_msgs
self.node_msgs = node_msgs
self.block_msgs = block_msgs
self.resource_msgs = resource_msgs
self.exit_event = exit_event

Expand Down Expand Up @@ -170,24 +163,7 @@ def start_zmq_listener(self) -> None:
msg_0: AddressedMonitoringMessage
msg_0 = (msg, 0)

if msg[0] == MessageType.NODE_INFO:
self.node_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO:
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.BLOCK_INFO:
self.block_msgs.put(msg_0)
elif msg[0] == MessageType.TASK_INFO:
self.priority_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
self.priority_msgs.put(msg_0)
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
# but there is no verification that the message
# received from zmq_receiver_channel.recv_pyobj() is actually
# of that type.
self.logger.error("Discarding message " # type: ignore[unreachable]
f"from interchange with unknown type {msg[0].value}")
self.resource_msgs.put(msg_0)
except zmq.Again:
pass
except Exception:
Expand All @@ -207,9 +183,6 @@ def start_zmq_listener(self) -> None:
def router_starter(*,
comm_q: mpq.Queue,
exception_q: mpq.Queue,
priority_msgs: mpq.Queue,
node_msgs: mpq.Queue,
block_msgs: mpq.Queue,
resource_msgs: mpq.Queue,
exit_event: Event,

Expand All @@ -226,9 +199,6 @@ def router_starter(*,
zmq_port_range=zmq_port_range,
logdir=logdir,
logging_level=logging_level,
priority_msgs=priority_msgs,
node_msgs=node_msgs,
block_msgs=block_msgs,
resource_msgs=resource_msgs,
exit_event=exit_event)
except Exception as e:
Expand Down
50 changes: 40 additions & 10 deletions parsl/providers/slurm/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
logger = logging.getLogger(__name__)

# From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES
translate_table = {
sacct_translate_table = {
'PENDING': JobState.PENDING,
'RUNNING': JobState.RUNNING,
'CANCELLED': JobState.CANCELLED,
Expand All @@ -37,6 +37,20 @@
'REQUEUED': JobState.PENDING
}

# Maps the compact job state codes printed by squeue's %t format field to
# parsl JobState values. Used in place of sacct_translate_table when the
# provider falls back to squeue because sacct is unavailable. Codes that are
# not listed here are logged as unrecognized and reported as JobState.UNKNOWN
# by _status().
squeue_translate_table = {
'PD': JobState.PENDING,
'R': JobState.RUNNING,
'CA': JobState.CANCELLED,
'CF': JobState.PENDING, # configuring
'CG': JobState.RUNNING, # completing (still counted as running)
'CD': JobState.COMPLETED,
'F': JobState.FAILED, # failed
'TO': JobState.TIMEOUT, # timeout
'NF': JobState.FAILED, # node failure
'RV': JobState.FAILED, # revoked
'SE': JobState.FAILED # special exit state
}


class SlurmProvider(ClusterProvider, RepresentationMixin):
"""Slurm Execution Provider
Expand Down Expand Up @@ -155,6 +169,23 @@ def __init__(self,

self.regex_job_id = regex_job_id
self.worker_init = worker_init + '\n'
# Check if sacct works and if not fall back to squeue
cmd = "sacct -X"
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
# If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled"
logger.debug(f"sacct returned retcode={retcode} stderr={stderr}")
if retcode == 0:
logger.debug("using sacct to get job status")
# Using state%20 to get enough characters to not truncate output
# of the state. Without it, output can look like "<job_id> CANCELLED+"
self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'"
self._translate_table = sacct_translate_table
else:
logger.debug(f"sacct failed with retcode={retcode}")
logger.debug("falling back to using squeue to get job status")
self._cmd = "squeue --noheader --format='%i %t' --job '{0}'"
self._translate_table = squeue_translate_table

def _status(self):
'''Returns the status list for a list of job_ids
Expand All @@ -172,16 +203,14 @@ def _status(self):
logger.debug('No active jobs, skipping status update')
return

# Using state%20 to get enough characters to not truncate output
# of the state. Without output can look like "<job_id> CANCELLED+"
cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list)
cmd = self._cmd.format(job_id_list)
logger.debug("Executing %s", cmd)
retcode, stdout, stderr = self.execute_wait(cmd)
logger.debug("sacct returned %s %s", stdout, stderr)
logger.debug("sacct/squeue returned %s %s", stdout, stderr)

# Execute_wait failed. Do not update
if retcode != 0:
logger.warning("sacct failed with non-zero exit code {}".format(retcode))
logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode))
return

jobs_missing = set(self.resources.keys())
Expand All @@ -193,19 +222,20 @@ def _status(self):
# For example "<job_id> CANCELLED by <user_id>"
# This splits and ignores anything past the first two unpacked values
job_id, slurm_state, *ignore = line.split()
if slurm_state not in translate_table:
if slurm_state not in self._translate_table:
logger.warning(f"Slurm status {slurm_state} is not recognized")
status = translate_table.get(slurm_state, JobState.UNKNOWN)
status = self._translate_table.get(slurm_state, JobState.UNKNOWN)
logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status))
self.resources[job_id]['status'] = JobStatus(status,
stdout_path=self.resources[job_id]['job_stdout_path'],
stderr_path=self.resources[job_id]['job_stderr_path'])
jobs_missing.remove(job_id)

# sacct can get job info after jobs have completed so this path shouldn't be hit
# log a warning if there are missing jobs for some reason
# squeue does not report on jobs that are not running, so we fill in the
# blanks for missing jobs. We might lose some information about why the jobs failed.
for missing_job in jobs_missing:
logger.warning("Updating missing job {} to completed status".format(missing_job))
logger.debug("Updating missing job {} to completed status".format(missing_job))
self.resources[missing_job]['status'] = JobStatus(
JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'],
stderr_path=self.resources[missing_job]['job_stderr_path'])
Expand Down

0 comments on commit 5ec7cdb

Please sign in to comment.