Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into benc-remove-channels
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Nov 1, 2024
2 parents ce6f792 + 6dd5845 commit 8c46bc6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
10 changes: 5 additions & 5 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ def __init__(self, config: Config) -> None:
}

if self.monitoring:
self.monitoring.send(MessageType.WORKFLOW_INFO,
workflow_info)
self.monitoring.send((MessageType.WORKFLOW_INFO,
workflow_info))

if config.checkpoint_files is not None:
checkpoints = self.load_checkpoints(config.checkpoint_files)
Expand Down Expand Up @@ -235,7 +235,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
def _send_task_log_info(self, task_record: TaskRecord) -> None:
if self.monitoring:
task_log_info = self._create_task_log_info(task_record)
self.monitoring.send(MessageType.TASK_INFO, task_log_info)
self.monitoring.send((MessageType.TASK_INFO, task_log_info))

def _create_task_log_info(self, task_record: TaskRecord) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -1239,12 +1239,12 @@ def cleanup(self) -> None:

if self.monitoring:
logger.info("Sending final monitoring message")
self.monitoring.send(MessageType.WORKFLOW_INFO,
self.monitoring.send((MessageType.WORKFLOW_INFO,
{'tasks_failed_count': self.task_state_counts[States.failed],
'tasks_completed_count': self.task_state_counts[States.exec_done],
"time_began": self.time_began,
'time_completed': self.time_completed,
'run_id': self.run_id, 'rundir': self.run_dir})
'run_id': self.run_id, 'rundir': self.run_dir}))

logger.info("Terminating monitoring")
self.monitoring.close()
Expand Down
10 changes: 4 additions & 6 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
import time
from multiprocessing import Event, Process
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Literal, Optional, Tuple, Union, cast

import typeguard

from parsl.log_utils import set_file_logger
from parsl.monitoring.errors import MonitoringHubStartError
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import TaggedMonitoringMessage
Expand Down Expand Up @@ -202,10 +201,9 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No

self.hub_zmq_port = zmq_port

# TODO: tighten the Any message format
def send(self, mtype: MessageType, message: Any) -> None:
logger.debug("Sending message type {}".format(mtype))
self.radio.send((mtype, message))
def send(self, message: TaggedMonitoringMessage) -> None:
logger.debug("Sending message type {}".format(message[0]))
self.radio.send(message)

def close(self) -> None:
logger.info("Terminating Monitoring Hub")
Expand Down

0 comments on commit 8c46bc6

Please sign in to comment.