Skip to content

Commit

Permalink
Attempt to re-apply previously reverted #1512 garbage collection code
Browse files Browse the repository at this point in the history
I reverted this because it was hard to type-check, but subsequently this has become easier
  • Loading branch information
benclifford committed May 10, 2020
1 parent 7746e5a commit 9e4d086
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 54 deletions.
4 changes: 4 additions & 0 deletions docs/userguide/checkpoints.rst
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ Parsl provides four checkpointing modes:

In all cases the checkpoint file is written out to the ``runinfo/RUN_ID/checkpoint/`` directory.

.. Note:: Checkpoint modes `periodic`, `dfk_exit`, and `manual` can interfere with garbage collection.
In these modes task information will be retained after completion, until checkpointing events are triggered.


Creating a checkpoint
^^^^^^^^^^^^^^^^^^^^^

Expand Down
5 changes: 3 additions & 2 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,8 @@ def handle_exec_update(self, task_id: int, future: Future) -> None:
res = future.result()
if isinstance(res, RemoteExceptionWrapper):
res.reraise()
self.tasks[task_id]['app_fu'].set_result(future.result())

self.tasks[task_id]['app_fu'].set_result(future.result())
except Exception as e:
if self.tasks[task_id]['retries_left'] > 0:
# ignore this exception, because assume some later
Expand Down Expand Up @@ -856,7 +856,7 @@ def callback_adapter(dep_fut: Future) -> None:

self.launch_if_ready(task_id)

return task_def['app_fu']
return app_fu

# it might also be interesting to assert that all DFK
# tasks are in a "final" state (3,4,5) when the DFK
Expand Down Expand Up @@ -1059,6 +1059,7 @@ def checkpoint(self, tasks: Optional[List[int]] = None) -> str:
with open(checkpoint_tasks, 'ab') as f:
for task_id in checkpoint_queue:
if task_id in self.tasks and \
not self.tasks[task_id]['checkpoint'] and \
self.tasks[task_id]['app_fu'] is not None and \
self.tasks[task_id]['app_fu'].done() and \
self.tasks[task_id]['app_fu'].exception() is None:
Expand Down
40 changes: 20 additions & 20 deletions parsl/tests/test_checkpointing/test_python_checkpoint_1.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
import argparse
import os
import time

import pytest

import parsl
from parsl.app.app import python_app
from parsl import python_app
from parsl.tests.configs.local_threads import config


local_config = config
@python_app(cache=True)
def random_app(i):
import random
return random.randint(i, 100000)


@python_app(cache=True)
def slow_double(x, sleep_dur=1):
import time
time.sleep(sleep_dur)
return x * 2
def launch_n_random(n=2):
"""1. Launch a few apps and write the checkpoint once a few have completed
"""

d = [random_app(i) for i in range(0, n)]
print("Done launching")

# Block till done
return [i.result() for i in d]


@pytest.mark.local
def test_initial_checkpoint_write(n=2):
"""1. Launch a few apps and write the checkpoint once a few have completed
"""
parsl.load(config)
results = launch_n_random(n)

d = {}
time.time()
print("Launching : ", n)
for i in range(0, n):
d[i] = slow_double(i)
print("Done launching")

for i in range(0, n):
d[i].result()
print("Done sleeping")
cpt_dir = parsl.dfk().checkpoint()

cptpath = cpt_dir + '/dfk.pkl'
Expand All @@ -46,7 +43,10 @@ def test_initial_checkpoint_write(n=2):
assert os.path.exists(
cptpath), "Tasks checkpoint missing: {0}".format(cptpath)

return parsl.dfk().run_dir
run_dir = parsl.dfk().run_dir
parsl.clear()

return run_dir, results


if __name__ == '__main__':
Expand Down
44 changes: 20 additions & 24 deletions parsl/tests/test_checkpointing/test_python_checkpoint_2.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,46 @@
import argparse
import os
import time

import pytest

import parsl
from parsl.app.app import python_app
import parsl.tests.test_checkpointing.test_python_checkpoint_1 as test1
from parsl import python_app

from parsl.tests.configs.local_threads import config
from parsl.tests.configs.local_threads_checkpoint import fresh_config


@python_app(cache=True)
def slow_double(x, sleep_dur=1):
import time
time.sleep(sleep_dur)
return x * 2
def random_app(i):
import random
return random.randint(i, 100000)


def launch_n_random(n=2):
"""1. Launch a few apps and write the checkpoint once a few have completed
"""
d = [random_app(i) for i in range(0, n)]
return [i.result() for i in d]


@pytest.mark.local
def test_loading_checkpoint(n=2):
"""Load memoization table from previous checkpoint
"""

config.checkpoint_mode = 'task_exit'
parsl.load(config)
rundir = test1.test_initial_checkpoint_write()
results = launch_n_random(n)
rundir = parsl.dfk().run_dir
parsl.dfk().cleanup()
parsl.clear()

local_config = fresh_config()
local_config.checkpoint_files = [os.path.join(rundir, 'checkpoint')]
parsl.load(local_config)

d = {}

start = time.time()
print("Launching : ", n)
for i in range(0, n):
d[i] = slow_double(i)
print("Done launching")

for i in range(0, n):
d[i].result()
print("Done sleeping")
relaunched = launch_n_random(n)
assert len(relaunched) == len(results) == n, "Expected all results to have n items"

delta = time.time() - start
assert delta < 1, "Took longer than a second ({}), assuming restore from checkpoint failed".format(delta)
for i in range(n):
assert relaunched[i] == results[i], "Expected relaunched to contain cached results from first run"
parsl.clear()


Expand Down
10 changes: 2 additions & 8 deletions parsl/tests/test_python_apps/test_garbage_collect.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import parsl
import time

from parsl.app.app import python_app


Expand All @@ -21,13 +19,9 @@ def test_garbage_collect():
assert parsl.dfk().tasks[x.tid]['app_fu'] == x, "Tasks table should have app_fu ref before done"

x.result()
if parsl.dfk().checkpoint_mode is not None:
# We explicit call checkpoint if checkpoint_mode is enabled covering
# cases like manual/periodic where checkpointing may be deferred.
parsl.dfk().checkpoint()
parsl.dfk().checkpoint()

time.sleep(0.2) # Give enough time for task wipes to work
assert x.tid not in parsl.dfk().tasks, "Task record should be wiped after task completion"
assert parsl.dfk().tasks[x.tid]['app_fu'] is None, "Tasks should have app_fu ref wiped after task completion"


if __name__ == '__main__':
Expand Down

0 comments on commit 9e4d086

Please sign in to comment.