diff --git a/parsl/curvezmq.py b/parsl/curvezmq.py index e90e13a5bd..f1de25b05f 100644 --- a/parsl/curvezmq.py +++ b/parsl/curvezmq.py @@ -144,7 +144,7 @@ def _start_auth_thread(self) -> ThreadAuthenticator: auth_thread.start() # Only allow certs that are in the cert dir assert self.cert_dir # For mypy - auth_thread.configure_curve(domain="*", location=self.cert_dir) + auth_thread.configure_curve(domain="*", location=str(self.cert_dir)) return auth_thread def socket(self, socket_type: int, *args, **kwargs) -> zmq.Socket: diff --git a/parsl/data_provider/files.py b/parsl/data_provider/files.py index 286212e783..9d37d41813 100644 --- a/parsl/data_provider/files.py +++ b/parsl/data_provider/files.py @@ -49,8 +49,8 @@ def __init__(self, url: Union[os.PathLike, str], uu_id: Optional[uuid.UUID] = No self.path = parsed_url.path self.filename = os.path.basename(self.path) # let the DFK set these values, if needed - self.size = None - self.md5sum = None + self.size: Optional[int] = None + self.md5sum: Optional[str] = None self.timestamp = timestamp self.local_path: Optional[str] = None diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index fff183062e..d0bfdd80f4 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -304,7 +304,7 @@ def std_spec_to_name(name, spec): def _send_file_log_info(self, file: Union[File, DataFuture], task_record: TaskRecord, is_output: bool) -> None: """ Generate a message for the monitoring db about a file. """ - if self.file_provenance: + if self.monitoring and self.file_provenance: file_log_info = self._create_file_log_info(file, task_record) # make sure the task_id is None for inputs if not is_output: @@ -343,7 +343,7 @@ def _create_file_log_info(self, file: Union[File, DataFuture], def register_as_input(self, f: Union[File, DataFuture], task_record: TaskRecord): """ Register a file as an input to a task. """ - if self.file_provenance: + if self.monitoring and self.file_provenance: self._send_file_log_info(f, task_record, False) file_input_info = self._create_file_io_info(f, task_record) self.monitoring.send((MessageType.INPUT_FILE, file_input_info)) @@ -351,7 +351,7 @@ def register_as_input(self, f: Union[File, DataFuture], def register_as_output(self, f: Union[File, DataFuture], task_record: TaskRecord): """ Register a file as an output of a task. """ - if self.file_provenance: + if self.monitoring and self.file_provenance: self._send_file_log_info(f, task_record, True) file_output_info = self._create_file_io_info(f, task_record) self.monitoring.send((MessageType.OUTPUT_FILE, file_output_info)) @@ -370,7 +370,7 @@ def _create_file_io_info(self, file: Union[File, DataFuture], def _register_env(self, environ: ParslExecutor) -> None: """ Capture the environment information for the monitoring db. """ - if self.file_provenance: + if self.monitoring and self.file_provenance: environ_info = self._create_env_log_info(environ) self.monitoring.send((MessageType.ENVIRONMENT_INFO, environ_info)) @@ -387,7 +387,7 @@ def _create_env_log_info(self, environ: ParslExecutor) -> Dict[str, Any]: provider = getattr(environ, 'provider', None) if provider is not None: env_log_info['provider'] = provider.label - env_log_info['launcher'] = type(getattr(provider, 'launcher', None)) + env_log_info['launcher'] = str(type(getattr(provider, 'launcher', None))) env_log_info['worker_init'] = getattr(provider, 'worker_init', None) return env_log_info diff --git a/parsl/dataflow/taskrecord.py b/parsl/dataflow/taskrecord.py index f48aecceb9..1d3961a023 100644 --- a/parsl/dataflow/taskrecord.py +++ b/parsl/dataflow/taskrecord.py @@ -103,3 +103,6 @@ class TaskRecord(TypedDict, total=False): """Restricts access to end-of-join behavior to ensure that joins only complete once, even if several joining Futures complete close together in time.""" + + environment: str + """The environment in which the task is being executed.""" diff --git a/parsl/monitoring/db_manager.py b/parsl/monitoring/db_manager.py index bb6b3e1cef..9e71c1b18c 100644 --- a/parsl/monitoring/db_manager.py +++ b/parsl/monitoring/db_manager.py @@ -5,7 +5,7 @@ import queue import threading import time -from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, cast +from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union, cast import typeguard @@ -395,10 +395,10 @@ def start(self, """ like inserted_tasks but for Files """ - inserted_files = dict() # type: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]] - input_inserted_files = dict() # type: Dict[str, List[str]] - output_inserted_files = dict() # type: Dict[str, List[str]] - inserted_envs = set() # type: Set[object] + inserted_files: Dict[str, Dict[str, Union[None, datetime.datetime, str, int]]] = dict() + input_inserted_files: Dict[str, List[str]] = dict() + output_inserted_files: Dict[str, List[str]] = dict() + inserted_envs: Set[object] = set() # for any task ID, we can defer exactly one message, which is the # assumed-to-be-unique first message (with first message flag set).