Merge branch 'Parsl:master' into Glossary
Kanegraffiti authored Aug 23, 2024
2 parents a964e63 + 789ee82 commit 7beda8a
Showing 6 changed files with 87 additions and 131 deletions.
5 changes: 4 additions & 1 deletion README.rst
@@ -1,6 +1,6 @@
Parsl - Parallel Scripting Library
==================================
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528|
|licence| |build-status| |docs| |NSF-1550588| |NSF-1550476| |NSF-1550562| |NSF-1550528| |CZI-EOSS|

Parsl extends parallelism in Python beyond a single computer.

@@ -64,6 +64,9 @@ then explore the `parallel computing patterns <https://parsl.readthedocs.io/en/s
.. |NSF-1550475| image:: https://img.shields.io/badge/NSF-1550475-blue.svg
:target: https://nsf.gov/awardsearch/showAward?AWD_ID=1550475
:alt: NSF award info
.. |CZI-EOSS| image:: https://chanzuckerberg.github.io/open-science/badges/CZI-EOSS.svg
:target: https://czi.co/EOSS
:alt: CZI's Essential Open Source Software for Science


Quickstart
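Aside from the new CZI badge, the README's headline claim is that Parsl extends parallelism in Python beyond a single computer. A minimal sketch of that programming model, assuming `parsl.load()` with no arguments falls back to a local thread-based configuration (the `double` app is invented for the example):

```python
import parsl
from parsl import python_app

parsl.load()  # assumed: no-argument load gives a local, thread-based config

@python_app
def double(x):
    # A decorated function becomes an app that runs asynchronously.
    return x * 2

future = double(21)      # returns an AppFuture immediately
print(future.result())   # blocks until the app completes; prints 42
parsl.dfk().cleanup()    # shut down the DataFlowKernel
```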
60 changes: 31 additions & 29 deletions parsl/executors/high_throughput/interchange.py
@@ -375,7 +375,7 @@ def start(self) -> None:

self.zmq_context.destroy()
delta = time.time() - start
logger.info("Processed {} tasks in {} seconds".format(self.count, delta))
logger.info(f"Processed {self.count} tasks in {delta} seconds")
logger.warning("Exiting")

def process_task_outgoing_incoming(
@@ -396,17 +396,16 @@ def process_task_outgoing_incoming(
try:
msg = json.loads(message[1].decode('utf-8'))
except Exception:
logger.warning("Got Exception reading message from manager: {!r}".format(
manager_id), exc_info=True)
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.warning(f"Got Exception reading message from manager: {manager_id!r}", exc_info=True)
logger.debug("Message:\n %r\n", message[1])
return

# perform a bit of validation on the structure of the deserialized
# object, at least enough to behave like a deserialization error
# in obviously malformed cases
if not isinstance(msg, dict) or 'type' not in msg:
logger.error(f"JSON message was not correctly formatted from manager: {manager_id!r}")
logger.debug("Message: \n{!r}\n".format(message[1]))
logger.debug("Message:\n %r\n", message[1])
return

if msg['type'] == 'registration':
@@ -425,7 +424,7 @@ def process_task_outgoing_incoming(
self.connected_block_history.append(msg['block_id'])

interesting_managers.add(manager_id)
logger.info("Adding manager: {!r} to ready queue".format(manager_id))
logger.info(f"Adding manager: {manager_id!r} to ready queue")
m = self._ready_managers[manager_id]

# m is a ManagerRecord, but msg is a dict[Any,Any] and so can
@@ -434,12 +433,12 @@ def process_task_outgoing_incoming(
# later.
m.update(msg) # type: ignore[typeddict-item]

logger.info("Registration info for manager {!r}: {}".format(manager_id, msg))
logger.info(f"Registration info for manager {manager_id!r}: {msg}")
self._send_monitoring_info(monitoring_radio, m)

if (msg['python_v'].rsplit(".", 1)[0] != self.current_platform['python_v'].rsplit(".", 1)[0] or
msg['parsl_v'] != self.current_platform['parsl_v']):
logger.error("Manager {!r} has incompatible version info with the interchange".format(manager_id))
logger.error(f"Manager {manager_id!r} has incompatible version info with the interchange")
logger.debug("Setting kill event")
kill_event.set()
e = VersionMismatch("py.v={} parsl.v={}".format(self.current_platform['python_v'].rsplit(".", 1)[0],
@@ -452,16 +451,15 @@ def process_task_outgoing_incoming(
self.results_outgoing.send(pkl_package)
logger.error("Sent failure reports, shutting down interchange")
else:
logger.info("Manager {!r} has compatible Parsl version {}".format(manager_id, msg['parsl_v']))
logger.info("Manager {!r} has compatible Python version {}".format(manager_id,
msg['python_v'].rsplit(".", 1)[0]))
logger.info(f"Manager {manager_id!r} has compatible Parsl version {msg['parsl_v']}")
logger.info(f"Manager {manager_id!r} has compatible Python version {msg['python_v'].rsplit('.', 1)[0]}")
elif msg['type'] == 'heartbeat':
self._ready_managers[manager_id]['last_heartbeat'] = time.time()
logger.debug("Manager {!r} sent heartbeat via tasks connection".format(manager_id))
logger.debug("Manager %r sent heartbeat via tasks connection", manager_id)
self.task_outgoing.send_multipart([manager_id, b'', PKL_HEARTBEAT_CODE])
elif msg['type'] == 'drain':
self._ready_managers[manager_id]['draining'] = True
logger.debug(f"Manager {manager_id!r} requested drain")
logger.debug("Manager %r requested drain", manager_id)
else:
logger.error(f"Unexpected message type received from manager: {msg['type']}")
logger.debug("leaving task_outgoing section")
@@ -484,9 +482,11 @@ def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_r
def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
# Check if there are tasks that could be sent to managers

logger.debug("Managers count (interesting/total): {interesting}/{total}".format(
total=len(self._ready_managers),
interesting=len(interesting_managers)))
logger.debug(
"Managers count (interesting/total): %d/%d",
len(interesting_managers),
len(self._ready_managers)
)

if interesting_managers and not self.pending_task_queue.empty():
shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)
@@ -497,7 +497,7 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tasks_inflight = len(m['tasks'])
real_capacity = m['max_capacity'] - tasks_inflight

if (real_capacity and m['active'] and not m['draining']):
if real_capacity and m["active"] and not m["draining"]:
tasks = self.get_tasks(real_capacity)
if tasks:
self.task_outgoing.send_multipart([manager_id, b'', pickle.dumps(tasks)])
@@ -506,19 +506,19 @@ def process_tasks_to_send(self, interesting_managers: Set[bytes]) -> None:
tids = [t['task_id'] for t in tasks]
m['tasks'].extend(tids)
m['idle_since'] = None
logger.debug("Sent tasks: {} to manager {!r}".format(tids, manager_id))
logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
# recompute real_capacity after sending tasks
real_capacity = m['max_capacity'] - tasks_inflight
if real_capacity > 0:
logger.debug("Manager {!r} has free capacity {}".format(manager_id, real_capacity))
logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
# ... so keep it in the interesting_managers list
else:
logger.debug("Manager {!r} is now saturated".format(manager_id))
logger.debug("Manager %r is now saturated", manager_id)
interesting_managers.remove(manager_id)
else:
interesting_managers.remove(manager_id)
# logger.debug("Nothing to send to manager {}".format(manager_id))
logger.debug("leaving _ready_managers section, with {} managers still interesting".format(len(interesting_managers)))
logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))
else:
logger.debug("either no interesting managers or no tasks, so skipping manager pass")

@@ -528,9 +528,9 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
logger.debug("entering results_incoming section")
manager_id, *all_messages = self.results_incoming.recv_multipart()
if manager_id not in self._ready_managers:
logger.warning("Received a result from a un-registered manager: {!r}".format(manager_id))
logger.warning(f"Received a result from a un-registered manager: {manager_id!r}")
else:
logger.debug(f"Got {len(all_messages)} result items in batch from manager {manager_id!r}")
logger.debug("Got %s result items in batch from manager %r", len(all_messages), manager_id)

b_messages = []

@@ -548,10 +548,10 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_

monitoring_radio.send(r['payload'])
elif r['type'] == 'heartbeat':
logger.debug(f"Manager {manager_id!r} sent heartbeat via results connection")
logger.debug("Manager %r sent heartbeat via results connection", manager_id)
b_messages.append((p_message, r))
else:
logger.error("Interchange discarding result_queue message of unknown type: {}".format(r['type']))
logger.error("Interchange discarding result_queue message of unknown type: %s", r["type"])

got_result = False
m = self._ready_managers[manager_id]
@@ -560,14 +560,16 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
if r['type'] == 'result':
got_result = True
try:
logger.debug(f"Removing task {r['task_id']} from manager record {manager_id!r}")
logger.debug("Removing task %s from manager record %r", r["task_id"], manager_id)
m['tasks'].remove(r['task_id'])
except Exception:
# If we reach here, there's something very wrong.
logger.exception("Ignoring exception removing task_id {} for manager {!r} with task list {}".format(
logger.exception(
"Ignoring exception removing task_id %s for manager %r with task list %s",
r['task_id'],
manager_id,
m['tasks']))
m["tasks"]
)

b_messages_to_send = []
for (b_message, _) in b_messages:
Expand All @@ -578,7 +580,7 @@ def process_results_incoming(self, interesting_managers: Set[bytes], monitoring_
self.results_outgoing.send_multipart(b_messages_to_send)
logger.debug("Sent messages on results_outgoing")

logger.debug(f"Current tasks on manager {manager_id!r}: {m['tasks']}")
logger.debug("Current tasks on manager %r: %s", manager_id, m["tasks"])
if len(m['tasks']) == 0 and m['idle_since'] is None:
m['idle_since'] = time.time()

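The bulk of the interchange.py changes replace eagerly formatted log calls (`str.format` and f-strings) with logging's deferred %-style arguments, so the message string is only built when a handler actually emits the record. A minimal sketch of the difference, with an invented logger name and message:

```python
import logging

logging.basicConfig(level=logging.INFO)  # DEBUG records are filtered out
logger = logging.getLogger("interchange")

manager_id = b"manager-001"

# Eager: the f-string is rendered even though the DEBUG record is discarded.
logger.debug(f"Manager {manager_id!r} requested drain")

# Deferred: %r interpolation runs only if the record passes the level check,
# so filtered-out debug calls on hot paths cost almost nothing.
logger.debug("Manager %r requested drain", manager_id)
```

The commit appears to keep f-strings where a message is emitted unconditionally at INFO or ERROR, and to prefer deferred arguments mainly on debug-level paths.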
39 changes: 4 additions & 35 deletions parsl/monitoring/db_manager.py
@@ -308,35 +308,9 @@ def __init__(self,
self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue()

def start(self,
priority_queue: mpq.Queue,
node_queue: mpq.Queue,
block_queue: mpq.Queue,
resource_queue: mpq.Queue) -> None:

self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
priority_queue, self._kill_event,),
name="Monitoring-migrate-priority",
daemon=True,
)
self._priority_queue_pull_thread.start()

self._node_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
node_queue, self._kill_event,),
name="Monitoring-migrate-node",
daemon=True,
)
self._node_queue_pull_thread.start()

self._block_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
block_queue, self._kill_event,),
name="Monitoring-migrate-block",
daemon=True,
)
self._block_queue_pull_thread.start()

self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
@@ -372,20 +346,18 @@ def start(self,
while (not self._kill_event.is_set() or
self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
priority_queue.qsize() != 0 or resource_queue.qsize() != 0 or
node_queue.qsize() != 0 or block_queue.qsize() != 0):
resource_queue.qsize() != 0):

"""
WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)
"""
try:
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}, {}, {}""".format(
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
self._kill_event.is_set(),
self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
priority_queue.qsize() != 0, resource_queue.qsize() != 0,
node_queue.qsize() != 0, block_queue.qsize() != 0))
resource_queue.qsize() != 0))

# This is the list of resource messages which can be reprocessed as if they
# had just arrived because the corresponding first task message has been
@@ -707,9 +679,6 @@ def close(self) -> None:
@wrap_with_logs(target="database_manager")
@typeguard.typechecked
def dbm_starter(exception_q: mpq.Queue,
priority_msgs: mpq.Queue,
node_msgs: mpq.Queue,
block_msgs: mpq.Queue,
resource_msgs: mpq.Queue,
db_url: str,
logdir: str,
@@ -726,7 +695,7 @@ def dbm_starter(exception_q: mpq.Queue,
logdir=logdir,
logging_level=logging_level)
logger.info("Starting dbm in dbm starter")
dbm.start(priority_msgs, node_msgs, block_msgs, resource_msgs)
dbm.start(resource_msgs)
except KeyboardInterrupt:
logger.exception("KeyboardInterrupt signal caught")
dbm.close()
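On the db_manager.py side, the dedicated priority/node/block pull threads and their queues are deleted, leaving one thread draining one resource queue. A reduced sketch of the surviving pattern, with hypothetical names, assuming the puller runs until a kill event is set and the queue is empty:

```python
import queue
import threading

def migrate_logs(q: queue.Queue, kill: threading.Event) -> None:
    # Keep draining until we are told to stop *and* nothing is left behind.
    while not (kill.is_set() and q.empty()):
        try:
            msg = q.get(timeout=0.1)
        except queue.Empty:
            continue
        print("migrated:", msg)  # stand-in for the real database insert

kill_event = threading.Event()
resource_queue: queue.Queue = queue.Queue()
puller = threading.Thread(target=migrate_logs,
                          args=(resource_queue, kill_event),
                          name="Monitoring-migrate-resource",
                          daemon=True)
puller.start()

resource_queue.put({"type": "resource", "payload": "..."})
kill_event.set()  # request shutdown; the puller exits once the queue drains
puller.join()
```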
28 changes: 5 additions & 23 deletions parsl/monitoring/monitoring.py
@@ -7,7 +7,7 @@
import time
from multiprocessing import Event, Process
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast

import typeguard

@@ -138,27 +138,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.exception_q: Queue[Tuple[str, str]]
self.exception_q = SizedQueue(maxsize=10)

self.priority_msgs: Queue[Tuple[Any, int]]
self.priority_msgs = SizedQueue()

self.resource_msgs: Queue[AddressedMonitoringMessage]
self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]]
self.resource_msgs = SizedQueue()

self.node_msgs: Queue[AddressedMonitoringMessage]
self.node_msgs = SizedQueue()

self.block_msgs: Queue[AddressedMonitoringMessage]
self.block_msgs = SizedQueue()

self.router_exit_event: ms.Event
self.router_exit_event = Event()

self.router_proc = ForkProcess(target=router_starter,
kwargs={"comm_q": comm_q,
"exception_q": self.exception_q,
"priority_msgs": self.priority_msgs,
"node_msgs": self.node_msgs,
"block_msgs": self.block_msgs,
"resource_msgs": self.resource_msgs,
"exit_event": self.router_exit_event,
"hub_address": self.hub_address,
@@ -173,7 +161,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.router_proc.start()

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.priority_msgs, self.node_msgs, self.block_msgs, self.resource_msgs,),
args=(self.exception_q, self.resource_msgs,),
kwargs={"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
@@ -192,7 +180,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadioSender(self.block_msgs)
self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
@@ -249,7 +237,7 @@ def close(self) -> None:
logger.debug("Finished waiting for router termination")
if len(exception_msgs) == 0:
logger.debug("Sending STOP to DBM")
self.priority_msgs.put(("STOP", 0))
self.resource_msgs.put(("STOP", 0))
else:
logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
logger.debug("Waiting for DB termination")
@@ -267,14 +255,8 @@ def close(self) -> None:
logger.info("Closing monitoring multiprocessing queues")
self.exception_q.close()
self.exception_q.join_thread()
self.priority_msgs.close()
self.priority_msgs.join_thread()
self.resource_msgs.close()
self.resource_msgs.join_thread()
self.node_msgs.close()
self.node_msgs.join_thread()
self.block_msgs.close()
self.block_msgs.join_thread()
logger.info("Closed monitoring multiprocessing queues")


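With the extra queues gone, monitoring.py routes everything, including shutdown, through `resource_msgs`: its annotation widens to admit a `("STOP", 0)` sentinel, which `close()` now sends where it previously used `priority_msgs`. A minimal sketch of that typed-sentinel handshake, with simplified message shapes:

```python
import multiprocessing
from typing import Literal, Tuple, Union

Message = Tuple[dict, int]                 # simplified stand-in for AddressedMonitoringMessage
Stop = Tuple[Literal["STOP"], Literal[0]]  # the shutdown sentinel

def drain(q: "multiprocessing.Queue[Union[Message, Stop]]") -> None:
    while True:
        msg = q.get()
        if msg == ("STOP", 0):  # sentinel: the producer promises nothing follows
            break
        print("processing:", msg)

if __name__ == "__main__":
    resource_msgs: "multiprocessing.Queue[Union[Message, Stop]]" = multiprocessing.Queue()
    resource_msgs.put(({"type": "resource"}, 0))
    resource_msgs.put(("STOP", 0))
    drain(resource_msgs)
```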