diff --git a/parsl/app/errors.py b/parsl/app/errors.py index 4a15558d37..327096be41 100644 --- a/parsl/app/errors.py +++ b/parsl/app/errors.py @@ -139,11 +139,11 @@ def get_exception(self) -> BaseException: def wrap_error(func: Callable[P, R]) -> Callable[P, Union[R, RemoteExceptionWrapper]]: @wraps(func) - def wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[R, RemoteExceptionWrapper]: + def parsl_error_wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[R, RemoteExceptionWrapper]: import sys from parsl.app.errors import RemoteExceptionWrapper try: return func(*args, **kwargs) except Exception: return RemoteExceptionWrapper(*sys.exc_info()) - return wrapper + return parsl_error_wrapper diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 10e5a9fab4..15fc990e79 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -1,3 +1,4 @@ +import typing from concurrent.futures import Future import typeguard import logging @@ -524,18 +525,22 @@ def hold_worker(self, worker_id: str) -> None: logger.debug("Sent hold request to manager: {}".format(worker_id)) @property - def outstanding(self): - outstanding_c = self.command_client.run("OUTSTANDING_C") - return outstanding_c + def outstanding(self) -> int: + """Returns the count of tasks outstanding across the interchange + and managers""" + return self.command_client.run("OUTSTANDING_C") @property - def connected_workers(self): - workers = self.command_client.run("WORKERS") - return workers - - def connected_managers(self): - managers = self.command_client.run("MANAGERS") - return managers + def connected_workers(self) -> int: + """Returns the count of workers across all connected managers""" + return self.command_client.run("WORKERS") + + def connected_managers(self) -> List[Dict[str, typing.Any]]: + """Returns a list of dicts one for each connected managers. + The dict contains info on manager(str:manager_id), block_id, + worker_count, tasks(int), idle_durations(float), active(bool) + """ + return self.command_client.run("MANAGERS") def _hold_block(self, block_id): """ Sends hold command to all managers which are in a specific block diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 45d3ac4be8..f1311c52e8 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -332,7 +332,7 @@ def monitor_wrapper(f: Any, def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None: logger = start_file_logger("{}/monitoring_filesystem_radio.log".format(logdir), name="monitoring_filesystem_radio", - level=logging.DEBUG) + level=logging.INFO) logger.info("Starting filesystem radio receiver") setproctitle("parsl: monitoring filesystem receiver") @@ -345,7 +345,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage] os.makedirs(new_dir, exist_ok=True) while True: # this loop will end on process termination - logger.info("Start filesystem radio receiver loop") + logger.debug("Start filesystem radio receiver loop") # iterate over files in new_dir for filename in os.listdir(new_dir): @@ -354,7 +354,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage] full_path_filename = f"{new_dir}/{filename}" with open(full_path_filename, "rb") as f: message = deserialize(f.read()) - logger.info(f"Message received is: {message}") + logger.debug(f"Message received is: {message}") assert isinstance(message, tuple) q.put(cast(AddressedMonitoringMessage, message)) os.remove(full_path_filename) diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 49e06540ce..161fd87444 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -2,6 +2,7 @@ import time import logging import datetime +import functools from parsl.multiprocessing import ForkProcess from multiprocessing import Event, Queue @@ -17,7 +18,6 @@ monitoring_wrapper_cache: Dict monitoring_wrapper_cache = {} - def monitor_wrapper(f: Any, # per app args: Sequence, # per invocation kwargs: Dict, # per invocation @@ -40,11 +40,17 @@ def monitor_wrapper(f: Any, # per app cache_key = (run_id, f, radio_mode) if cache_key in monitoring_wrapper_cache: - wrapped = monitoring_wrapper_cache[cache_key] + parsl_monitoring_wrapper = monitoring_wrapper_cache[cache_key] else: - def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: + # This is all of functools.WRAPPER_ASSIGNMENTS except __module__. + # Assigning __module__ in @wraps is causing the entire module to be + # serialized. This doesn't happen on the underlying wrapped function + # and doesn't happen if no @wraps is specified. + # I am unsure why. + @functools.wraps(f, assigned = ('__name__', '__qualname__', '__doc__', '__annotations__')) + def parsl_monitoring_wrapper(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: task_id = kwargs.pop('_parsl_monitoring_task_id') try_id = kwargs.pop('_parsl_monitoring_try_id') terminate_event = Event() @@ -156,13 +162,13 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: else: return ret_v - monitoring_wrapper_cache[cache_key] = wrapped + monitoring_wrapper_cache[cache_key] = parsl_monitoring_wrapper new_kwargs = kwargs.copy() new_kwargs['_parsl_monitoring_task_id'] = x_task_id new_kwargs['_parsl_monitoring_try_id'] = x_try_id - return (wrapped, args, new_kwargs) + return (parsl_monitoring_wrapper, args, new_kwargs) @wrap_with_logs diff --git a/parsl/version.py b/parsl/version.py index 5de4b5964e..89061bc60c 100644 --- a/parsl/version.py +++ b/parsl/version.py @@ -1,3 +1,3 @@ """Set module version. """ -VERSION = '2023.07.24-dev+desc-2023.07.28b' +VERSION = '2023.07.31-dev+desc-2023.08.01b'