From 62107037adf4a84d33fd54c3bc722f96fc8da6da Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Fri, 29 Mar 2024 05:15:05 -0500 Subject: [PATCH] Remove a hard-coded indirection to monitor_wrapper (#3309) --- parsl/dataflow/dflow.py | 17 +++++++++-------- parsl/monitoring/monitoring.py | 21 +-------------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 2f37cae131..eff326d759 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -39,6 +39,7 @@ from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.threads import ThreadPoolExecutor from parsl.monitoring import MonitoringHub +from parsl.monitoring.remote import monitor_wrapper from parsl.process_loggers import wrap_with_logs from parsl.providers.base import ExecutionProvider from parsl.utils import get_version, get_std_fname_mode, get_all_checkpoints, Timer @@ -713,14 +714,14 @@ def launch_task(self, task_record: TaskRecord) -> Future: if self.monitoring is not None and self.monitoring.resource_monitoring_enabled: wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO - (function, args, kwargs) = self.monitoring.monitor_wrapper(function, args, kwargs, try_id, task_id, - self.monitoring.monitoring_hub_url, - self.run_id, - wrapper_logging_level, - self.monitoring.resource_monitoring_interval, - executor.radio_mode, - executor.monitor_resources(), - self.run_dir) + (function, args, kwargs) = monitor_wrapper(function, args, kwargs, try_id, task_id, + self.monitoring.monitoring_hub_url, + self.run_id, + wrapper_logging_level, + self.monitoring.resource_monitoring_interval, + executor.radio_mode, + executor.monitor_resources(), + self.run_dir) with self.submitter_lock: exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs) diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 0d5b26c8e0..3a6b88e113 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -8,8 +8,6 @@ import queue -import parsl.monitoring.remote - from parsl.multiprocessing import ForkProcess, SizedQueue from multiprocessing import Process from multiprocessing.queues import Queue @@ -23,7 +21,7 @@ from parsl.monitoring.router import router_starter from parsl.monitoring.message_type import MessageType from parsl.monitoring.types import AddressedMonitoringMessage -from typing import cast, Any, Callable, Dict, Optional, Sequence, Tuple, Union, TYPE_CHECKING +from typing import cast, Any, Optional, Tuple, Union, TYPE_CHECKING _db_manager_excepts: Optional[Exception] @@ -269,23 +267,6 @@ def close(self) -> None: self.filesystem_proc.terminate() self.filesystem_proc.join() - @staticmethod - def monitor_wrapper(f: Any, - args: Sequence, - kwargs: Dict, - try_id: int, - task_id: int, - monitoring_hub_url: str, - run_id: str, - logging_level: int, - sleep_dur: float, - radio_mode: str, - monitor_resources: bool, - run_dir: str) -> Tuple[Callable, Sequence, Dict]: - return parsl.monitoring.remote.monitor_wrapper(f, args, kwargs, try_id, task_id, monitoring_hub_url, - run_id, logging_level, sleep_dur, radio_mode, - monitor_resources, run_dir) - @wrap_with_logs def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None: