Skip to content

Commit

Permalink
Merge master, work on serializer cache efficiency failure
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Aug 1, 2023
2 parents c13bdda + 4b8b0b8 commit c8508ce
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 21 deletions.
4 changes: 2 additions & 2 deletions parsl/app/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ def get_exception(self) -> BaseException:

def wrap_error(func: Callable[P, R]) -> Callable[P, Union[R, RemoteExceptionWrapper]]:
@wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[R, RemoteExceptionWrapper]:
def parsl_error_wrapper(*args: P.args, **kwargs: P.kwargs) -> Union[R, RemoteExceptionWrapper]:
import sys
from parsl.app.errors import RemoteExceptionWrapper
try:
return func(*args, **kwargs)
except Exception:
return RemoteExceptionWrapper(*sys.exc_info())
return wrapper
return parsl_error_wrapper
25 changes: 15 additions & 10 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import typing
from concurrent.futures import Future
import typeguard
import logging
Expand Down Expand Up @@ -524,18 +525,22 @@ def hold_worker(self, worker_id: str) -> None:
logger.debug("Sent hold request to manager: {}".format(worker_id))

@property
def outstanding(self):
outstanding_c = self.command_client.run("OUTSTANDING_C")
return outstanding_c
def outstanding(self) -> int:
"""Returns the count of tasks outstanding across the interchange
and managers"""
return self.command_client.run("OUTSTANDING_C")

@property
def connected_workers(self):
workers = self.command_client.run("WORKERS")
return workers

def connected_managers(self):
managers = self.command_client.run("MANAGERS")
return managers
def connected_workers(self) -> int:
"""Returns the count of workers across all connected managers"""
return self.command_client.run("WORKERS")

def connected_managers(self) -> List[Dict[str, typing.Any]]:
"""Returns a list of dicts one for each connected managers.
The dict contains info on manager(str:manager_id), block_id,
worker_count, tasks(int), idle_durations(float), active(bool)
"""
return self.command_client.run("MANAGERS")

def _hold_block(self, block_id):
""" Sends hold command to all managers which are in a specific block
Expand Down
6 changes: 3 additions & 3 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ def monitor_wrapper(f: Any,
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
logger = start_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.DEBUG)
level=logging.INFO)

logger.info("Starting filesystem radio receiver")
setproctitle("parsl: monitoring filesystem receiver")
Expand All @@ -345,7 +345,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]
os.makedirs(new_dir, exist_ok=True)

while True: # this loop will end on process termination
logger.info("Start filesystem radio receiver loop")
logger.debug("Start filesystem radio receiver loop")

# iterate over files in new_dir
for filename in os.listdir(new_dir):
Expand All @@ -354,7 +354,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]
full_path_filename = f"{new_dir}/{filename}"
with open(full_path_filename, "rb") as f:
message = deserialize(f.read())
logger.info(f"Message received is: {message}")
logger.debug(f"Message received is: {message}")
assert isinstance(message, tuple)
q.put(cast(AddressedMonitoringMessage, message))
os.remove(full_path_filename)
Expand Down
16 changes: 11 additions & 5 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import logging
import datetime
import functools

from parsl.multiprocessing import ForkProcess
from multiprocessing import Event, Queue
Expand All @@ -17,7 +18,6 @@
monitoring_wrapper_cache: Dict
monitoring_wrapper_cache = {}


def monitor_wrapper(f: Any, # per app
args: Sequence, # per invocation
kwargs: Dict, # per invocation
Expand All @@ -40,11 +40,17 @@ def monitor_wrapper(f: Any, # per app
cache_key = (run_id, f, radio_mode)

if cache_key in monitoring_wrapper_cache:
wrapped = monitoring_wrapper_cache[cache_key]
parsl_monitoring_wrapper = monitoring_wrapper_cache[cache_key]

else:

def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
# This is all of functools.WRAPPER_ASSIGNMENTS except __module__.
# Assigning __module__ in @wraps is causing the entire module to be
# serialized. This doesn't happen on the underlying wrapped function
# and doesn't happen if no @wraps is specified.
# I am unsure why.
@functools.wraps(f, assigned = ('__name__', '__qualname__', '__doc__', '__annotations__'))
def parsl_monitoring_wrapper(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
task_id = kwargs.pop('_parsl_monitoring_task_id')
try_id = kwargs.pop('_parsl_monitoring_try_id')
terminate_event = Event()
Expand Down Expand Up @@ -156,13 +162,13 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
else:
return ret_v

monitoring_wrapper_cache[cache_key] = wrapped
monitoring_wrapper_cache[cache_key] = parsl_monitoring_wrapper

new_kwargs = kwargs.copy()
new_kwargs['_parsl_monitoring_task_id'] = x_task_id
new_kwargs['_parsl_monitoring_try_id'] = x_try_id

return (wrapped, args, new_kwargs)
return (parsl_monitoring_wrapper, args, new_kwargs)


@wrap_with_logs
Expand Down
2 changes: 1 addition & 1 deletion parsl/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Set module version.
"""
VERSION = '2023.07.24-dev+desc-2023.07.28b'
VERSION = '2023.07.31-dev+desc-2023.08.01b'

0 comments on commit c8508ce

Please sign in to comment.