Skip to content

Commit

Permalink
Remove a hard-coded indirection to monitor_wrapper (#3309)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford authored Mar 29, 2024
1 parent ce63913 commit 6210703
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 28 deletions.
17 changes: 9 additions & 8 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from parsl.executors.status_handling import BlockProviderExecutor
from parsl.executors.threads import ThreadPoolExecutor
from parsl.monitoring import MonitoringHub
from parsl.monitoring.remote import monitor_wrapper
from parsl.process_loggers import wrap_with_logs
from parsl.providers.base import ExecutionProvider
from parsl.utils import get_version, get_std_fname_mode, get_all_checkpoints, Timer
Expand Down Expand Up @@ -713,14 +714,14 @@ def launch_task(self, task_record: TaskRecord) -> Future:

if self.monitoring is not None and self.monitoring.resource_monitoring_enabled:
wrapper_logging_level = logging.DEBUG if self.monitoring.monitoring_debug else logging.INFO
(function, args, kwargs) = self.monitoring.monitor_wrapper(function, args, kwargs, try_id, task_id,
self.monitoring.monitoring_hub_url,
self.run_id,
wrapper_logging_level,
self.monitoring.resource_monitoring_interval,
executor.radio_mode,
executor.monitor_resources(),
self.run_dir)
(function, args, kwargs) = monitor_wrapper(function, args, kwargs, try_id, task_id,
self.monitoring.monitoring_hub_url,
self.run_id,
wrapper_logging_level,
self.monitoring.resource_monitoring_interval,
executor.radio_mode,
executor.monitor_resources(),
self.run_dir)

with self.submitter_lock:
exec_fu = executor.submit(function, task_record['resource_specification'], *args, **kwargs)
Expand Down
21 changes: 1 addition & 20 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

import queue

import parsl.monitoring.remote

from parsl.multiprocessing import ForkProcess, SizedQueue
from multiprocessing import Process
from multiprocessing.queues import Queue
Expand All @@ -23,7 +21,7 @@
from parsl.monitoring.router import router_starter
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.types import AddressedMonitoringMessage
from typing import cast, Any, Callable, Dict, Optional, Sequence, Tuple, Union, TYPE_CHECKING
from typing import cast, Any, Optional, Tuple, Union, TYPE_CHECKING

_db_manager_excepts: Optional[Exception]

Expand Down Expand Up @@ -269,23 +267,6 @@ def close(self) -> None:
self.filesystem_proc.terminate()
self.filesystem_proc.join()

@staticmethod
def monitor_wrapper(f: Any,
args: Sequence,
kwargs: Dict,
try_id: int,
task_id: int,
monitoring_hub_url: str,
run_id: str,
logging_level: int,
sleep_dur: float,
radio_mode: str,
monitor_resources: bool,
run_dir: str) -> Tuple[Callable, Sequence, Dict]:
return parsl.monitoring.remote.monitor_wrapper(f, args, kwargs, try_id, task_id, monitoring_hub_url,
run_id, logging_level, sleep_dur, radio_mode,
monitor_resources, run_dir)


@wrap_with_logs
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
Expand Down

0 comments on commit 6210703

Please sign in to comment.