diff --git a/parsl/monitoring/router.py b/parsl/monitoring/router.py index a9c2c4bcd4..73e36d31af 100644 --- a/parsl/monitoring/router.py +++ b/parsl/monitoring/router.py @@ -1,12 +1,12 @@ from __future__ import annotations import logging +import multiprocessing.queues as mpq import os import pickle import socket import threading import time -from multiprocessing.queues import Queue from multiprocessing.synchronize import Event from typing import Optional, Tuple, Union @@ -34,10 +34,10 @@ def __init__(self, logdir: str = ".", logging_level: int = logging.INFO, atexit_timeout: int = 3, # in seconds - priority_msgs: "Queue[AddressedMonitoringMessage]", - node_msgs: "Queue[AddressedMonitoringMessage]", - block_msgs: "Queue[AddressedMonitoringMessage]", - resource_msgs: "Queue[AddressedMonitoringMessage]", + priority_msgs: "mpq.Queue[AddressedMonitoringMessage]", + node_msgs: "mpq.Queue[AddressedMonitoringMessage]", + block_msgs: "mpq.Queue[AddressedMonitoringMessage]", + resource_msgs: "mpq.Queue[AddressedMonitoringMessage]", exit_event: Event, ): """ Initializes a monitoring configuration class. @@ -204,12 +204,12 @@ def start_zmq_listener(self) -> None: @wrap_with_logs @typeguard.typechecked -def router_starter(comm_q: "Queue[Union[Tuple[int, int], str]]", - exception_q: "Queue[Tuple[str, str]]", - priority_msgs: "Queue[AddressedMonitoringMessage]", - node_msgs: "Queue[AddressedMonitoringMessage]", - block_msgs: "Queue[AddressedMonitoringMessage]", - resource_msgs: "Queue[AddressedMonitoringMessage]", +def router_starter(comm_q: "mpq.Queue[Union[Tuple[int, int], str]]", + exception_q: "mpq.Queue[Tuple[str, str]]", + priority_msgs: "mpq.Queue[AddressedMonitoringMessage]", + node_msgs: "mpq.Queue[AddressedMonitoringMessage]", + block_msgs: "mpq.Queue[AddressedMonitoringMessage]", + resource_msgs: "mpq.Queue[AddressedMonitoringMessage]", exit_event: Event, hub_address: str,