diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index 8f9f302640..853bc4c3c7 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -1,11 +1,14 @@ import datetime import logging +import multiprocessing.queues as mpq import os import queue import threading import time from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +import typeguard + from parsl.dataflow.states import States from parsl.errors import OptionalModuleMissing from parsl.log_utils import set_file_logger @@ -305,10 +308,10 @@ def __init__(self, self.pending_resource_queue: queue.Queue[MonitoringMessage] = queue.Queue() def start(self, - priority_queue: "queue.Queue[TaggedMonitoringMessage]", - node_queue: "queue.Queue[MonitoringMessage]", - block_queue: "queue.Queue[MonitoringMessage]", - resource_queue: "queue.Queue[MonitoringMessage]") -> None: + priority_queue: mpq.Queue, + node_queue: mpq.Queue, + block_queue: mpq.Queue, + resource_queue: mpq.Queue) -> None: self._kill_event = threading.Event() self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal, @@ -719,11 +722,12 @@ def close(self) -> None: @wrap_with_logs(target="database_manager") -def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[TaggedMonitoringMessage]", - node_msgs: "queue.Queue[MonitoringMessage]", - block_msgs: "queue.Queue[MonitoringMessage]", - resource_msgs: "queue.Queue[MonitoringMessage]", +@typeguard.typechecked +def dbm_starter(exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, db_url: str, logdir: str, logging_level: int) -> None: diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index 4be454b797..7cce223048 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -1,15 +1,16 @@ from __future__ import annotations import logging +import multiprocessing.queues as mpq import os import pickle -import queue import socket import threading import time from multiprocessing.synchronize import Event -from typing import Optional, Tuple, Union +from typing import Optional, Tuple +import typeguard import zmq from parsl.log_utils import set_file_logger @@ -33,10 +34,10 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -202,12 +203,13 @@ def start_zmq_listener(self) -> None: @wrap_with_logs -def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]", - exception_q: "queue.Queue[Tuple[str, str]]", - priority_msgs: "queue.Queue[AddressedMonitoringMessage]", - node_msgs: "queue.Queue[AddressedMonitoringMessage]", - block_msgs: "queue.Queue[AddressedMonitoringMessage]", - resource_msgs: "queue.Queue[AddressedMonitoringMessage]", +@typeguard.typechecked +def router_starter(comm_q: mpq.Queue, + exception_q: mpq.Queue, + priority_msgs: mpq.Queue, + node_msgs: mpq.Queue, + block_msgs: mpq.Queue, + resource_msgs: mpq.Queue, exit_event: Event, hub_address: str,