Skip to content

Commit

Permalink
router type annotations were incorrectly introduced in PR #1936 in 2021
Browse files Browse the repository at this point in the history
they were introduced as `queue` module queues, but actually the values
(and the annotations used at the other end of the router_starter call)
are multiprocessing.Queue (subclasses...)

mypy does not typecheck this call... a later PR introduces typeguard here
(which detects this)

this hasn't caused execution problems - the APIs of the two queue types are close enough
- but it is incorrect when trying to understand how pieces of monitoring
communicate with each other.
  • Loading branch information
benclifford committed Aug 8, 2024
1 parent 9e2935c commit 467f006
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 20 deletions.
19 changes: 10 additions & 9 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
import multiprocessing.queues as mpq
import os
import queue
import threading
Expand Down Expand Up @@ -307,10 +308,10 @@ def __init__(self,
self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage]

def start(self,
priority_queue: "queue.Queue[TaggedMonitoringMessage]",
node_queue: "queue.Queue[MonitoringMessage]",
block_queue: "queue.Queue[MonitoringMessage]",
resource_queue: "queue.Queue[MonitoringMessage]") -> None:
priority_queue: "mpq.Queue[TaggedMonitoringMessage]",
node_queue: "mpq.Queue[MonitoringMessage]",
block_queue: "mpq.Queue[MonitoringMessage]",
resource_queue: "mpq.Queue[MonitoringMessage]") -> None:

self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
Expand Down Expand Up @@ -722,11 +723,11 @@ def close(self) -> None:

@wrap_with_logs(target="database_manager")
@typeguard.typechecked
def dbm_starter(exception_q: "queue.Queue[Tuple[str, str]]",
priority_msgs: "queue.Queue[TaggedMonitoringMessage]",
node_msgs: "queue.Queue[MonitoringMessage]",
block_msgs: "queue.Queue[MonitoringMessage]",
resource_msgs: "queue.Queue[MonitoringMessage]",
def dbm_starter(exception_q: "mpq.Queue[Tuple[str, str]]",
priority_msgs: "mpq.Queue[TaggedMonitoringMessage]",
node_msgs: "mpq.Queue[MonitoringMessage]",
block_msgs: "mpq.Queue[MonitoringMessage]",
resource_msgs: "mpq.Queue[MonitoringMessage]",
db_url: str,
logdir: str,
logging_level: int) -> None:
Expand Down
22 changes: 11 additions & 11 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import logging
import os
import pickle
import queue
import socket
import threading
import time
from multiprocessing.queues import Queue
from multiprocessing.synchronize import Event
from typing import Optional, Tuple, Union

Expand Down Expand Up @@ -34,10 +34,10 @@ def __init__(self,
logdir: str = ".",
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
priority_msgs: "Queue[AddressedMonitoringMessage]",
node_msgs: "Queue[AddressedMonitoringMessage]",
block_msgs: "Queue[AddressedMonitoringMessage]",
resource_msgs: "Queue[AddressedMonitoringMessage]",
exit_event: Event,
):
""" Initializes a monitoring configuration class.
Expand Down Expand Up @@ -204,12 +204,12 @@ def start_zmq_listener(self) -> None:

@wrap_with_logs
@typeguard.typechecked
def router_starter(comm_q: "queue.Queue[Union[Tuple[int, int], str]]",
exception_q: "queue.Queue[Tuple[str, str]]",
priority_msgs: "queue.Queue[AddressedMonitoringMessage]",
node_msgs: "queue.Queue[AddressedMonitoringMessage]",
block_msgs: "queue.Queue[AddressedMonitoringMessage]",
resource_msgs: "queue.Queue[AddressedMonitoringMessage]",
def router_starter(comm_q: "Queue[Union[Tuple[int, int], str]]",
exception_q: "Queue[Tuple[str, str]]",
priority_msgs: "Queue[AddressedMonitoringMessage]",
node_msgs: "Queue[AddressedMonitoringMessage]",
block_msgs: "Queue[AddressedMonitoringMessage]",
resource_msgs: "Queue[AddressedMonitoringMessage]",
exit_event: Event,

hub_address: str,
Expand Down

0 comments on commit 467f006

Please sign in to comment.