Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exit choices for the DFK context manager #3468

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions parsl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ class Config(RepresentationMixin, UsageInformation):
``checkpoint_mode='periodic'``.
dependency_resolver: plugin point for custom dependency resolvers. Default: only resolve Futures,
using the `SHALLOW_DEPENDENCY_RESOLVER`.
exit_mode: str, optional
When Parsl is used as a context manager (using ``with parsl.load`` syntax) then this parameter
controls what will happen to running tasks and exceptions at exit. The options are:

* ``cleanup``: cleanup the DFK on exit without waiting for any tasks
* ``skip``: skip all shutdown behaviour when exiting the context manager
* ``wait``: wait for all tasks to complete when exiting normally, but exit immediately when exiting due to an exception.

Default is ``cleanup``.
garbage_collect : bool. optional.
Delete task records from DFK when tasks have completed. Default: True
internal_tasks_max_threads : int, optional
Expand Down Expand Up @@ -97,6 +106,7 @@ def __init__(self,
Literal['manual']] = None,
checkpoint_period: Optional[str] = None,
dependency_resolver: Optional[DependencyResolver] = None,
exit_mode: Literal['cleanup', 'skip', 'wait'] = 'cleanup',
garbage_collect: bool = True,
internal_tasks_max_threads: int = 10,
retries: int = 0,
Expand Down Expand Up @@ -133,6 +143,7 @@ def __init__(self,
checkpoint_period = "00:30:00"
self.checkpoint_period = checkpoint_period
self.dependency_resolver = dependency_resolver
self.exit_mode = exit_mode
self.garbage_collect = garbage_collect
self.internal_tasks_max_threads = internal_tasks_max_threads
self.retries = retries
Expand Down
21 changes: 18 additions & 3 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,24 @@ def __init__(self, config: Config) -> None:
def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
logger.debug("Exiting the context manager, calling cleanup for DFK")
self.cleanup()
def __exit__(self, exc_type, exc_value, traceback) -> None:
mode = self.config.exit_mode
logger.debug("Exiting context manager, with exit mode '%s'", mode)
if mode == "cleanup":
logger.info("Calling cleanup for DFK")
self.cleanup()
elif mode == "skip":
logger.info("Skipping all cleanup handling")
elif mode == "wait":
if exc_type is None:
logger.info("Waiting for all tasks to complete")
self.wait_for_current_tasks()
self.cleanup()
else:
logger.info("There was an exception - cleaning up without waiting for task completion")
self.cleanup()
Comment on lines +223 to +235
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If cleanup is the default in almost all cases, could make the code follow "moral" suit. Consider:

# with typeguard, is this, in fact, necessary?
if mode not in ("cleanup", "wait", "skip"):
    raise InternalConsistencyError(f"Exit case for {mode} should be unreachable, validated by typeguard on Config()")

if mode == "skip":
    logger.info("Skipping all cleanup handling")
    return

if mode == "wait":
     if exc_type is None:
        logger.info("Waiting ...")
        self.wait_for_current_tasks()
     else:
        logger.info("was an exc ...")

logger.info("Calling cleanup for DFK")
self.cleanup()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could do - I think it comes down to seeing this case statement as choosing three distinct behaviours vs a "normal flow" that gets ornamented by different things on the way. I've ended up with more of a preference for being explicit about all three flows as explicit separate options, but I think it would be fine either way.

else:
raise InternalConsistencyError(f"Exit case for {mode} should be unreachable, validated by typeguard on Config()")

def _send_task_log_info(self, task_record: TaskRecord) -> None:
if self.monitoring:
Expand Down
97 changes: 96 additions & 1 deletion parsl/tests/test_python_apps/test_context_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from concurrent.futures import Future
from threading import Event

import pytest

import parsl
from parsl.dataflow.dflow import DataFlowKernel
from parsl.config import Config
from parsl.dataflow.dflow import DataFlowKernel, DataFlowKernelLoader
from parsl.errors import NoDataFlowKernelError
from parsl.tests.configs.local_threads import fresh_config

Expand All @@ -16,6 +20,16 @@ def foo(x, stdout='foo.stdout'):
return f"echo {x + 1}"


@parsl.python_app
def wait_for_event(ev: Event):
ev.wait()


@parsl.python_app
def raise_app():
raise RuntimeError("raise_app deliberate failure")


@pytest.mark.local
def test_within_context_manger(tmpd_cwd):
config = fresh_config()
Expand All @@ -31,3 +45,84 @@ def test_within_context_manger(tmpd_cwd):
with pytest.raises(NoDataFlowKernelError) as excinfo:
square(2).result()
assert str(excinfo.value) == "Must first load config"


@pytest.mark.local
def test_exit_skip():
config = fresh_config()
config.exit_mode = "skip"

with parsl.load(config) as dfk:
ev = Event()
fut = wait_for_event(ev)
# deliberately don't wait for this to finish, so that the context
# manager can exit

assert parsl.dfk() is dfk, "global dfk should be left in place by skip mode"

assert not fut.done(), "wait_for_event should not be done yet"
ev.set()

# now we can wait for that result...
fut.result()
assert fut.done(), "wait_for_event should complete outside of context manager in 'skip' mode"

# now cleanup the DFK that the above `with` block
# deliberately avoided doing...
dfk.cleanup()


# 'wait' mode has two cases to test:
# 1. that we wait when there is no exception
# 2. that we do not wait when there is an exception
@pytest.mark.local
def test_exit_wait_no_exception():
config = fresh_config()
config.exit_mode = "wait"

with parsl.load(config) as dfk:
fut = square(1)
# deliberately don't wait for this to finish, so that the context
# manager can exit

assert fut.done(), "This future should be marked as done before the context manager exits"

assert dfk.cleanup_called, "The DFK should have been cleaned up by the context manager"
assert DataFlowKernelLoader._dfk is None, "The global DFK should have been removed"


@pytest.mark.local
def test_exit_wait_exception():
config = fresh_config()
config.exit_mode = "wait"

with pytest.raises(RuntimeError):
with parsl.load(config) as dfk:
# we'll never fire this future
fut_never = Future()

fut_raise = raise_app()

fut_depend = square(fut_never)

# this should cause an exception, which should cause the context
# manager to exit, without waiting for fut_depend to finish.
fut_raise.result()

assert dfk.cleanup_called, "The DFK should have been cleaned up by the context manager"
assert DataFlowKernelLoader._dfk is None, "The global DFK should have been removed"
assert fut_raise.exception() is not None, "fut_raise should contain an exception"
assert not fut_depend.done(), "fut_depend should have been left un-done (due to dependency failure)"


@pytest.mark.local
def test_exit_wrong_mode():

with pytest.raises(Exception) as ex:
Config(exit_mode="wrongmode")

# with typeguard 4.x this is TypeCheckError,
# with typeguard 2.x this is TypeError
# we can't instantiate TypeCheckError if we're in typeguard 2.x environment
# because it does not exist... so check name using strings.
assert ex.type.__name__ == "TypeCheckError" or ex.type.__name__ == "TypeError"
Loading