diff --git a/docs/userguide/checkpoints.rst b/docs/userguide/checkpoints.rst index 9fe729cd23..26840b18a9 100644 --- a/docs/userguide/checkpoints.rst +++ b/docs/userguide/checkpoints.rst @@ -58,6 +58,10 @@ Parsl provides four checkpointing modes: In all cases the checkpoint file is written out to the ``runinfo/RUN_ID/checkpoint/`` directory. +.. Note:: Checkpoint modes `periodic`, `dfk_exit`, and `manual` can interfere with garbage collection. + In these modes task information will be retained after completion, until checkpointing events are triggered. + + Creating a checkpoint ^^^^^^^^^^^^^^^^^^^^^ diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index 1d3a618f9d..b9e3b5cbf3 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -355,8 +355,8 @@ def handle_exec_update(self, task_id: int, future: Future) -> None: res = future.result() if isinstance(res, RemoteExceptionWrapper): res.reraise() - self.tasks[task_id]['app_fu'].set_result(future.result()) + self.tasks[task_id]['app_fu'].set_result(future.result()) except Exception as e: if self.tasks[task_id]['retries_left'] > 0: # ignore this exception, because assume some later @@ -856,7 +856,7 @@ def callback_adapter(dep_fut: Future) -> None: self.launch_if_ready(task_id) - return task_def['app_fu'] + return app_fu # it might also be interesting to assert that all DFK # tasks are in a "final" state (3,4,5) when the DFK @@ -1059,6 +1059,7 @@ def checkpoint(self, tasks: Optional[List[int]] = None) -> str: with open(checkpoint_tasks, 'ab') as f: for task_id in checkpoint_queue: if task_id in self.tasks and \ + not self.tasks[task_id]['checkpoint'] and \ self.tasks[task_id]['app_fu'] is not None and \ self.tasks[task_id]['app_fu'].done() and \ self.tasks[task_id]['app_fu'].exception() is None: diff --git a/parsl/tests/test_checkpointing/test_python_checkpoint_1.py b/parsl/tests/test_checkpointing/test_python_checkpoint_1.py index 8b7dcb7be5..54dbc3d46f 100644 --- a/parsl/tests/test_checkpointing/test_python_checkpoint_1.py +++ b/parsl/tests/test_checkpointing/test_python_checkpoint_1.py @@ -1,39 +1,36 @@ import argparse import os -import time - import pytest import parsl -from parsl.app.app import python_app +from parsl import python_app from parsl.tests.configs.local_threads import config -local_config = config +@python_app(cache=True) +def random_app(i): + import random + return random.randint(i, 100000) -@python_app(cache=True) -def slow_double(x, sleep_dur=1): - import time - time.sleep(sleep_dur) - return x * 2 +def launch_n_random(n=2): + """1. Launch a few apps and write the checkpoint once a few have completed + """ + + d = [random_app(i) for i in range(0, n)] + print("Done launching") + + # Block till done + return [i.result() for i in d] @pytest.mark.local def test_initial_checkpoint_write(n=2): """1. Launch a few apps and write the checkpoint once a few have completed """ + parsl.load(config) + results = launch_n_random(n) - d = {} - time.time() - print("Launching : ", n) - for i in range(0, n): - d[i] = slow_double(i) - print("Done launching") - - for i in range(0, n): - d[i].result() - print("Done sleeping") cpt_dir = parsl.dfk().checkpoint() cptpath = cpt_dir + '/dfk.pkl' @@ -46,7 +43,10 @@ def test_initial_checkpoint_write(n=2): assert os.path.exists( cptpath), "Tasks checkpoint missing: {0}".format(cptpath) - return parsl.dfk().run_dir + run_dir = parsl.dfk().run_dir + parsl.clear() + + return run_dir, results if __name__ == '__main__': diff --git a/parsl/tests/test_checkpointing/test_python_checkpoint_2.py b/parsl/tests/test_checkpointing/test_python_checkpoint_2.py index 611515379e..cbe424bedd 100644 --- a/parsl/tests/test_checkpointing/test_python_checkpoint_2.py +++ b/parsl/tests/test_checkpointing/test_python_checkpoint_2.py @@ -1,50 +1,46 @@ import argparse import os -import time - import pytest - import parsl -from parsl.app.app import python_app -import parsl.tests.test_checkpointing.test_python_checkpoint_1 as test1 +from parsl import python_app + from parsl.tests.configs.local_threads import config from parsl.tests.configs.local_threads_checkpoint import fresh_config @python_app(cache=True) -def slow_double(x, sleep_dur=1): - import time - time.sleep(sleep_dur) - return x * 2 +def random_app(i): + import random + return random.randint(i, 100000) + + +def launch_n_random(n=2): + """1. Launch a few apps and write the checkpoint once a few have completed + """ + d = [random_app(i) for i in range(0, n)] + return [i.result() for i in d] @pytest.mark.local def test_loading_checkpoint(n=2): """Load memoization table from previous checkpoint """ - + config.checkpoint_mode = 'task_exit' parsl.load(config) - rundir = test1.test_initial_checkpoint_write() + results = launch_n_random(n) + rundir = parsl.dfk().run_dir + parsl.dfk().cleanup() parsl.clear() local_config = fresh_config() local_config.checkpoint_files = [os.path.join(rundir, 'checkpoint')] parsl.load(local_config) - d = {} - - start = time.time() - print("Launching : ", n) - for i in range(0, n): - d[i] = slow_double(i) - print("Done launching") - - for i in range(0, n): - d[i].result() - print("Done sleeping") + relaunched = launch_n_random(n) + assert len(relaunched) == len(results) == n, "Expected all results to have n items" - delta = time.time() - start - assert delta < 1, "Took longer than a second ({}), assuming restore from checkpoint failed".format(delta) + for i in range(n): + assert relaunched[i] == results[i], "Expected relaunched to contain cached results from first run" parsl.clear() diff --git a/parsl/tests/test_python_apps/test_garbage_collect.py b/parsl/tests/test_python_apps/test_garbage_collect.py index 20bd3a9739..a7a71855ef 100644 --- a/parsl/tests/test_python_apps/test_garbage_collect.py +++ b/parsl/tests/test_python_apps/test_garbage_collect.py @@ -1,6 +1,4 @@ import parsl -import time - from parsl.app.app import python_app @@ -21,13 +19,9 @@ def test_garbage_collect(): assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done" x.result() - if parsl.dfk().checkpoint_mode is not None: - # We explicit call checkpoint if checkpoint_mode is enabled covering - # cases like manual/periodic where checkpointing may be deferred. - parsl.dfk().checkpoint() + parsl.dfk().checkpoint() - time.sleep(0.2) # Give enough time for task wipes to work - assert x.tid not in parsl.dfk().tasks, "Task record should be wiped after task completion" + assert parsl.dfk().tasks[x.tid]['app_fu'] is None, "Tasks should have app_fu ref wiped after task completion" if __name__ == '__main__':