diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 2cb2f4d660..164878a66f 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -162,8 +162,8 @@ def __init__(self, config: Config) -> None: } if self.monitoring: - self.monitoring.send(MessageType.WORKFLOW_INFO, - workflow_info) + self.monitoring.send((MessageType.WORKFLOW_INFO, + workflow_info)) if config.checkpoint_files is not None: checkpoints = self.load_checkpoints(config.checkpoint_files) @@ -238,7 +238,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: def _send_task_log_info(self, task_record: TaskRecord) -> None: if self.monitoring: task_log_info = self._create_task_log_info(task_record) - self.monitoring.send(MessageType.TASK_INFO, task_log_info) + self.monitoring.send((MessageType.TASK_INFO, task_log_info)) def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]: """ @@ -1295,12 +1295,12 @@ def cleanup(self) -> None: if self.monitoring: logger.info("Sending final monitoring message") - self.monitoring.send(MessageType.WORKFLOW_INFO, + self.monitoring.send((MessageType.WORKFLOW_INFO, {'tasks_failed_count': self.task_state_counts[States.failed], 'tasks_completed_count': self.task_state_counts[States.exec_done], "time_began": self.time_began, 'time_completed': self.time_completed, - 'run_id': self.run_id, 'rundir': self.run_dir}) + 'run_id': self.run_id, 'rundir': self.run_dir})) logger.info("Terminating monitoring") self.monitoring.close() diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index f5079112ca..794dbb841f 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -7,13 +7,12 @@ import time from multiprocessing import Event, Process from multiprocessing.queues import Queue -from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast import typeguard from parsl.log_utils import set_file_logger from parsl.monitoring.errors import MonitoringHubStartError -from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import TaggedMonitoringMessage @@ -202,10 +201,9 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No self.hub_zmq_port = zmq_port - # TODO: tighten the Any message format - def send(self, mtype: MessageType, message: Any) -> None: - logger.debug("Sending message type {}".format(mtype)) - self.radio.send((mtype, message)) + def send(self, message: TaggedMonitoringMessage) -> None: + logger.debug("Sending message type {}".format(message[0])) + self.radio.send(message) def close(self) -> None: logger.info("Terminating Monitoring Hub")