diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index bd62b1b9ea..c6bd7fdedf 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -1260,6 +1260,7 @@ def cleanup(self) -> None: self.log_task_states() + # TODO: do this in the basic memoizer # Checkpointing takes priority over the rest of the tasks # checkpoint if any valid checkpoint method is specified if self.checkpoint_mode is not None: @@ -1272,6 +1273,10 @@ def cleanup(self) -> None: logger.info("Stopping checkpoint timer") self._checkpoint_timer.close() + logger.info("Closing memoizer") + self.memoizer.close() + logger.info("Closed memoizer") + # Send final stats logger.info("Sending end message for usage tracking") self.usage_tracker.send_end_message() diff --git a/parsl/dataflow/memoization.py b/parsl/dataflow/memoization.py index a50b411a9d..0a5f541b9c 100644 --- a/parsl/dataflow/memoization.py +++ b/parsl/dataflow/memoization.py @@ -161,6 +161,9 @@ class Memoizer: def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: Sequence[str], run_dir: str) -> None: raise NotImplementedError + def close(self) -> None: + raise NotImplementedError + def update_memo(self, task: TaskRecord, r: Future[Any]) -> None: raise NotImplementedError @@ -236,6 +239,9 @@ def start(self, *, dfk: DataFlowKernel, memoize: bool = True, checkpoint_files: logger.info("App caching disabled for all apps") self.memo_lookup_table = {} + def close(self) -> None: + pass # nothing to close but more should move here + def check_memo(self, task: TaskRecord) -> Optional[Future[Any]]: """Create a hash of the task and its inputs and check the lookup table for this hash.