From 73f6f657233aa393f829f0361ba89c819218ff79 Mon Sep 17 00:00:00 2001 From: "Daniel S. Katz" Date: Tue, 20 Aug 2024 00:37:37 -0500 Subject: [PATCH 1/5] Add CZI badge to README.rst (#3596) --- README.rst | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 72048d39f4..da7f8245a5 100644 --- a/README.rst +++ b/README.rst @@ -1,6 +1,6 @@ Parsl - Parallel Scripting Library ================================== -|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| +|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS| Parsl extends parallelism in Python beyond a single computer. @@ -64,6 +64,9 @@ then explore the `parallel computing patterns Date: Tue, 20 Aug 2024 04:46:13 -0700 Subject: [PATCH 2/5] Fallback to squeue when sacct is missing in SlurmProvider (#3591) Adds internal check to test whether the slurm provider should use the sacct or squeue command. Some slurm clusters might not use the accounting database sacct uses. This allows slurm clusters that use the database to use the sacct command which can be easier on the slurm scheduler, or if the database is not present switch to the squeue command which will should work on all clusters. Fixes #3590 --- parsl/providers/slurm/slurm.py | 50 +++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/parsl/providers/slurm/slurm.py b/parsl/providers/slurm/slurm.py index ec6abeff56..54b4053fed 100644 --- a/parsl/providers/slurm/slurm.py +++ b/parsl/providers/slurm/slurm.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) # From https://slurm.schedmd.com/sacct.html#SECTION_JOB-STATE-CODES -translate_table = { +sacct_translate_table = { 'PENDING': JobState.PENDING, 'RUNNING': JobState.RUNNING, 'CANCELLED': JobState.CANCELLED, @@ -37,6 +37,20 @@ 'REQUEUED': JobState.PENDING } +squeue_translate_table = { + 'PD': JobState.PENDING, + 'R': JobState.RUNNING, + 'CA': JobState.CANCELLED, + 'CF': JobState.PENDING, # (configuring), + 'CG': JobState.RUNNING, # (completing), + 'CD': JobState.COMPLETED, + 'F': JobState.FAILED, # (failed), + 'TO': JobState.TIMEOUT, # (timeout), + 'NF': JobState.FAILED, # (node failure), + 'RV': JobState.FAILED, # (revoked) and + 'SE': JobState.FAILED # (special exit state) +} + class SlurmProvider(ClusterProvider, RepresentationMixin): """Slurm Execution Provider @@ -155,6 +169,23 @@ def __init__(self, self.regex_job_id = regex_job_id self.worker_init = worker_init + '\n' + # Check if sacct works and if not fall back to squeue + cmd = "sacct -X" + logger.debug("Executing %s", cmd) + retcode, stdout, stderr = self.execute_wait(cmd) + # If sacct fails it should return retcode=1 stderr="Slurm accounting storage is disabled" + logger.debug(f"sacct returned retcode={retcode} stderr={stderr}") + if retcode == 0: + logger.debug("using sacct to get job status") + # Using state%20 to get enough characters to not truncate output + # of the state. Without output can look like " CANCELLED+" + self._cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'" + self._translate_table = sacct_translate_table + else: + logger.debug(f"sacct failed with retcode={retcode}") + logger.debug("falling back to using squeue to get job status") + self._cmd = "squeue --noheader --format='%i %t' --job '{0}'" + self._translate_table = squeue_translate_table def _status(self): '''Returns the status list for a list of job_ids @@ -172,16 +203,14 @@ def _status(self): logger.debug('No active jobs, skipping status update') return - # Using state%20 to get enough characters to not truncate output - # of the state. Without output can look like " CANCELLED+" - cmd = "sacct -X --noheader --format=jobid,state%20 --job '{0}'".format(job_id_list) + cmd = self._cmd.format(job_id_list) logger.debug("Executing %s", cmd) retcode, stdout, stderr = self.execute_wait(cmd) - logger.debug("sacct returned %s %s", stdout, stderr) + logger.debug("sacct/squeue returned %s %s", stdout, stderr) # Execute_wait failed. Do no update if retcode != 0: - logger.warning("sacct failed with non-zero exit code {}".format(retcode)) + logger.warning("sacct/squeue failed with non-zero exit code {}".format(retcode)) return jobs_missing = set(self.resources.keys()) @@ -193,9 +222,9 @@ def _status(self): # For example " CANCELLED by " # This splits and ignores anything past the first two unpacked values job_id, slurm_state, *ignore = line.split() - if slurm_state not in translate_table: + if slurm_state not in self._translate_table: logger.warning(f"Slurm status {slurm_state} is not recognized") - status = translate_table.get(slurm_state, JobState.UNKNOWN) + status = self._translate_table.get(slurm_state, JobState.UNKNOWN) logger.debug("Updating job {} with slurm status {} to parsl state {!s}".format(job_id, slurm_state, status)) self.resources[job_id]['status'] = JobStatus(status, stdout_path=self.resources[job_id]['job_stdout_path'], @@ -203,9 +232,10 @@ def _status(self): jobs_missing.remove(job_id) # sacct can get job info after jobs have completed so this path shouldn't be hit - # log a warning if there are missing jobs for some reason + # squeue does not report on jobs that are not running. So we are filling in the + # blanks for missing jobs, we might lose some information about why the jobs failed. for missing_job in jobs_missing: - logger.warning("Updating missing job {} to completed status".format(missing_job)) + logger.debug("Updating missing job {} to completed status".format(missing_job)) self.resources[missing_job]['status'] = JobStatus( JobState.COMPLETED, stdout_path=self.resources[missing_job]['job_stdout_path'], stderr_path=self.resources[missing_job]['job_stderr_path']) From 0fc966f2a284839df6c6662fd369d3530ae31a20 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Tue, 20 Aug 2024 14:16:18 +0200 Subject: [PATCH 3/5] Collapse 4 monitoring router to db queues into 1 queue (#3593) Prior to this PR, there are four multiprocessing queues from the monitoring router process to the database manager process. (also used by the submit process via MultiprocessingQueueRadioSender but that is not so relevant for this PR) Each message arriving at the router goes into MonitoringRouter.start_zmq_listener where it is dispatched based on tag type into one of these four queues towards the monitoring database. In the monitoring database code, no matter which queue the messages arrive on, they are all passed into DatabaseManager._dispatch_to_internal. The four queues then don't provide much functionality - their effect is maybe some non-deterministic message order shuffling. This PR collapses those four queues into a single queue. # Changed Behaviour Messages will arrive at the database manager in possibly different orders. This might flush out more race conditions. The monitoring router would previous validate that a message tag was one of 5 known message tags (as part of choosing which queue to dispatch to). This PR removes that validation. That validation now happens at the receiving end of the (now single) queue, in DatabaseManager._dispatch_to_internal. Error messages related to invalid tags (which should only be coming from development of new message types) will now appear in the database manager process, rather than the router process. --- parsl/monitoring/db_manager.py | 39 ++++------------------------------ parsl/monitoring/monitoring.py | 28 +++++------------------- parsl/monitoring/router.py | 36 +++---------------------------- 3 files changed, 12 insertions(+), 91 deletions(-) diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 053c98d598..4fcf5ec2e2 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -308,35 +308,9 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: mpq.Queue, - node_queue: mpq.Queue, - block_queue: mpq.Queue, resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() - self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - priority_queue, self._kill_event,), - name="Monitoring-migrate-priority", - daemon=True, - ) - self._priority_queue_pull_thread.start() - - self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - node_queue, self._kill_event,), - name="Monitoring-migrate-node", - daemon=True, - ) - self._node_queue_pull_thread.start() - - self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, - args=( - block_queue, self._kill_event,), - name="Monitoring-migrate-block", - daemon=True, - ) - self._block_queue_pull_thread.start() self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, args=( @@ -372,20 +346,18 @@ def start(self, while (not self._kill_event.is_set() or self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or - priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or - node_queue.qsize() != 0 or block_queue.qsize() != 0): + resource_queue.qsize() != 0): """ WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages) """ try: - logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format( + logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format( self._kill_event.is_set(), self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0, self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0, - priority_queue.qsize() != 0, resource_queue.qsize() != 0, - node_queue.qsize() != 0, block_queue.qsize() != 0)) + resource_queue.qsize() != 0)) # This is the list of resource messages which can be reprocessed as if they # had just arrived because the corresponding first task message has been @@ -707,9 +679,6 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") @typeguard.typechecked def dbm_starter(exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, db_url: str, logdir: str, @@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue, logdir=logdir, logging_level=logging_level) logger.info("Starting dbm in dbm starter") - dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs) + dbm.start(resource_msgs) except KeyboardInterrupt: logger.exception("KeyboardInterrupt signal caught") dbm.close() diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index a76e2cf487..e1de80116c 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -7,7 +7,7 @@ import time from multiprocessing import Event, Process from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast import typeguard @@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.exception_q: Queue[Tuple[str, str]] self.exception_q = SizedQueue(maxsize=10) - self.priority_msgs: Queue[Tuple[Any, int]] - self.priority_msgs = SizedQueue() - - self.resource_msgs: Queue[AddressedMonitoringMessage] + self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]] self.resource_msgs = SizedQueue() - self.node_msgs: Queue[AddressedMonitoringMessage] - self.node_msgs = SizedQueue() - - self.block_msgs: Queue[AddressedMonitoringMessage] - self.block_msgs = SizedQueue() - self.router_exit_event: ms.Event self.router_exit_event = Event() self.router_proc = ForkProcess(target=router_starter, kwargs={"comm_q": comm_q, "exception_q": self.exception_q, - "priority_msgs": self.priority_msgs, - "node_msgs": self.node_msgs, - "block_msgs": self.block_msgs, "resource_msgs": self.resource_msgs, "exit_event": self.router_exit_event, "hub_address": self.hub_address, @@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.router_proc.start() self.dbm_proc = ForkProcess(target=dbm_starter, - args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,), + args=(self.exception_q, self.resource_msgs,), kwargs={"logdir": self.logdir, "logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO, "db_url": self.logging_endpoint, @@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadioSender(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.resource_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) @@ -249,7 +237,7 @@ def close(self) -> None: logger.debug("Finished waiting for router termination") if len(exception_msgs) == 0: logger.debug("Sending STOP to DBM") - self.priority_msgs.put(("STOP", 0)) + self.resource_msgs.put(("STOP", 0)) else: logger.debug("Not sending STOP to DBM, because there were DBM exceptions") logger.debug("Waiting for DB termination") @@ -267,14 +255,8 @@ def close(self) -> None: logger.info("Closing monitoring multiprocessing queues") self.exception_q.close() self.exception_q.join_thread() - self.priority_msgs.close() - self.priority_msgs.join_thread() self.resource_msgs.close() self.resource_msgs.join_thread() - self.node_msgs.close() - self.node_msgs.join_thread() - self.block_msgs.close() - self.block_msgs.join_thread() logger.info("Closed monitoring multiprocessing queues") diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 343410e3a4..e92386c407 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -14,7 +14,6 @@ import zmq from parsl.log_utils import set_file_logger -from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage from parsl.process_loggers import wrap_with_logs from parsl.utils import setproctitle @@ -34,9 +33,6 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, ): @@ -57,8 +53,8 @@ def __init__(self, Logging level as defined in the logging module. Default: logging.INFO atexit_timeout : float, optional The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received. - *_msgs : Queue - Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag. + resource_msgs : multiprocessing.Queue + A multiprocessing queue to receive messages to be routed onwards to the database process exit_event : Event An event that the main Parsl process will set to signal that the monitoring router should shut down. @@ -102,9 +98,6 @@ def __init__(self, min_port=zmq_port_range[0], max_port=zmq_port_range[1]) - self.priority_msgs = priority_msgs - self.node_msgs = node_msgs - self.block_msgs = block_msgs self.resource_msgs = resource_msgs self.exit_event = exit_event @@ -170,24 +163,7 @@ def start_zmq_listener(self) -> None: msg_0: AddressedMonitoringMessage msg_0 = (msg, 0) - if msg[0] == MessageType.NODE_INFO: - self.node_msgs.put(msg_0) - elif msg[0] == MessageType.RESOURCE_INFO: - self.resource_msgs.put(msg_0) - elif msg[0] == MessageType.BLOCK_INFO: - self.block_msgs.put(msg_0) - elif msg[0] == MessageType.TASK_INFO: - self.priority_msgs.put(msg_0) - elif msg[0] == MessageType.WORKFLOW_INFO: - self.priority_msgs.put(msg_0) - else: - # There is a type: ignore here because if msg[0] - # is of the correct type, this code is unreachable, - # but there is no verification that the message - # received from zmq_receiver_channel.recv_pyobj() is actually - # of that type. - self.logger.error("Discarding message " # type: ignore[unreachable] - f"from interchange with unknown type {msg[0].value}") + self.resource_msgs.put(msg_0) except zmq.Again: pass except Exception: @@ -207,9 +183,6 @@ def start_zmq_listener(self) -> None: def router_starter(*, comm_q: mpq.Queue, exception_q: mpq.Queue, - priority_msgs: mpq.Queue, - node_msgs: mpq.Queue, - block_msgs: mpq.Queue, resource_msgs: mpq.Queue, exit_event: Event, @@ -226,9 +199,6 @@ def router_starter(*, zmq_port_range=zmq_port_range, logdir=logdir, logging_level=logging_level, - priority_msgs=priority_msgs, - node_msgs=node_msgs, - block_msgs=block_msgs, resource_msgs=resource_msgs, exit_event=exit_event) except Exception as e: From b284dc1b068e397ad87fe4154e640af60d364c6d Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 21 Aug 2024 12:32:32 -0400 Subject: [PATCH 4/5] Non-functional change: minor log call-site updates (#3597) Massage log statements to use argument style consistent with recent practice --- .../executors/high_throughput/interchange.py | 60 ++++++++++--------- 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index fa0969d398..cd7d0596a9 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -375,7 +375,7 @@ def start(self) -> None: self.zmq_context.destroy() delta = time.time() - start - logger.info("Processed {} tasks in {} seconds".format(self.count, delta)) + logger.info(f"Processed {self.count} tasks in {delta} seconds") logger.warning("Exiting") def process_task_outgoing_incoming( @@ -396,9 +396,8 @@ def process_task_outgoing_incoming( try: msg = json.loads(message[1].decode('utf-8')) except Exception: - logger.warning("Got Exception reading message from manager: {!r}".format( - manager_id), exc_info=True) - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True) + logger.debug("Message:\n %r\n", message[1]) return # perform a bit of validation on the structure of the deserialized @@ -406,7 +405,7 @@ def process_task_outgoing_incoming( # in obviously malformed cases if not isinstance(msg, dict) or 'type' not in msg: logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}") - logger.debug("Message: \n{!r}\n".format(message[1])) + logger.debug("Message:\n %r\n", message[1]) return if msg['type'] == 'registration': @@ -425,7 +424,7 @@ def process_task_outgoing_incoming( self.connected_block_history.append(msg['block_id']) interesting_managers.add(manager_id) - logger.info("Adding manager: {!r} to ready queue".format(manager_id)) + logger.info(f"Adding manager: {manager_id!r} to ready queue") m = self._ready_managers[manager_id] # m is a ManagerRecord, but msg is a dict[Any,Any] and so can @@ -434,12 +433,12 @@ def process_task_outgoing_incoming( # later. m.update(msg) # type: ignore[typeddict-item] - logger.info("Registration info for manager {!r}: {}".format(manager_id, msg)) + logger.info(f"Registration info for manager {manager_id!r}: {msg}") self._send_monitoring_info(monitoring_radio, m) if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or msg['parsl_v'] != self.current_platform['parsl_v']): - logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id)) + logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange") logger.debug("Setting kill event") kill_event.set() e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0], @@ -452,16 +451,15 @@ def process_task_outgoing_incoming( self.results_outgoing.send(pkl_package) logger.error("Sent failure reports, shutting down interchange") else: - logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v'])) - logger.info("Manager {!r} has compatible Python version {}".format(manager_id, - msg['python_v'].rsplit(".", 1)[0])) + logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}") + logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}") elif msg['type'] == 'heartbeat': self._ready_managers[manager_id]['last_heartbeat'] = time.time() - logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id)) + logger.debug("Manager %r sent heartbeat via tasks connection", manager_id) self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE]) elif msg['type'] == 'drain': self._ready_managers[manager_id]['draining'] = True - logger.debug(f"Manager {manager_id!r} requested drain") + logger.debug("Manager %r requested drain", manager_id) else: logger.error(f"Unexpected message type received from manager: {msg['type']}") logger.debug("leaving task_outgoing section") @@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers - logger.debug("Managers count (interesting/total): {interesting}/{total}".format( - total=len(self._ready_managers), - interesting=len(interesting_managers))) + logger.debug( + "Managers count (interesting/total): {}/{}", + len(interesting_managers), + len(self._ready_managers) + ) if interesting_managers and not self.pending_task_queue.empty(): shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers) @@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tasks_inflight = len(m['tasks']) real_capacity = m['max_capacity'] - tasks_inflight - if (real_capacity and m['active'] and not m['draining']): + if real_capacity and m["active"] and not m["draining"]: tasks = self.get_tasks(real_capacity) if tasks: self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)]) @@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: tids = [t['task_id'] for t in tasks] m['tasks'].extend(tids) m['idle_since'] = None - logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id)) + logger.debug("Sent tasks: %s to manager %r", tids, manager_id) # recompute real_capacity after sending tasks real_capacity = m['max_capacity'] - tasks_inflight if real_capacity > 0: - logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity)) + logger.debug("Manager %r has free capacity %s", manager_id, real_capacity) # ... so keep it in the interesting_managers list else: - logger.debug("Manager {!r} is now saturated".format(manager_id)) + logger.debug("Manager %r is now saturated", manager_id) interesting_managers.remove(manager_id) else: interesting_managers.remove(manager_id) # logger.debug("Nothing to send to manager {}".format(manager_id)) - logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers))) + logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers)) else: logger.debug("either no interesting managers or no tasks, so skipping manager pass") @@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ logger.debug("entering results_incoming section") manager_id, *all_messages = self.results_incoming.recv_multipart() if manager_id not in self._ready_managers: - logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id)) + logger.warning(f"Received a result from a un-registered manager: {manager_id!r}") else: - logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}") + logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id) b_messages = [] @@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ monitoring_radio.send(r['payload']) elif r['type'] == 'heartbeat': - logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection") + logger.debug("Manager %r sent heartbeat via results connection", manager_id) b_messages.append((p_message, r)) else: - logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type'])) + logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"]) got_result = False m = self._ready_managers[manager_id] @@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ if r['type'] == 'result': got_result = True try: - logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}") + logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id) m['tasks'].remove(r['task_id']) except Exception: # If we reach here, there's something very wrong. - logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format( + logger.exception( + "Ignoring exception removing task_id %s for manager %r with task list %s", r['task_id'], manager_id, - m['tasks'])) + m["tasks"] + ) b_messages_to_send = [] for (b_message, _) in b_messages: @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_ self.results_outgoing.send_multipart(b_messages_to_send) logger.debug("Sent messages on results_outgoing") - logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}") + logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"]) if len(m['tasks']) == 0 and m['idle_since'] is None: m['idle_since'] = time.time() From 789ee82606b4a625bfa4c8d2c1870f7e09456821 Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Thu, 22 Aug 2024 17:21:33 +0200 Subject: [PATCH 5/5] Fix incorrect string template introduced in PR #3597 (#3599) --- parsl/executors/high_throughput/interchange.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index cd7d0596a9..d61c76fed2 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -483,7 +483,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None: # Check if there are tasks that could be sent to managers logger.debug( - "Managers count (interesting/total): {}/{}", + "Managers count (interesting/total): %d/%d", len(interesting_managers), len(self._ready_managers) )