Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RuntimeErrors in DFK into custom error classes #2878

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from parsl.dataflow.rundirs import make_rundir
from parsl.dataflow.states import States, FINAL_STATES, FINAL_FAILURE_STATES
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.errors import ConfigurationError, InternalConsistencyError, NoDataFlowKernelError
from parsl.jobs.job_status_poller import JobStatusPoller
from parsl.jobs.states import JobStatus, JobState
from parsl.usage_tracking.usage import UsageTracker
Expand Down Expand Up @@ -295,7 +295,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
task_record['try_time_returned'] = datetime.datetime.now()

if not future.done():
raise RuntimeError("done callback called, despite future not reporting itself as done")
raise InternalConsistencyError("done callback called, despite future not reporting itself as done")

try:
res = self._unwrap_remote_exception_wrapper(future)
Expand Down Expand Up @@ -535,7 +535,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
elif self.checkpoint_mode is None:
pass
else:
raise RuntimeError(f"Invalid checkpoint mode {self.checkpoint_mode}")
raise InternalConsistencyError(f"Invalid checkpoint mode {self.checkpoint_mode}")

self.wipe_task(task_id)
return
Expand Down Expand Up @@ -933,7 +933,7 @@ def submit(self,
ignore_for_cache = list(ignore_for_cache)

if self.cleanup_called:
raise RuntimeError("Cannot submit to a DFK that has been cleaned up")
raise NoDataFlowKernelError("Cannot submit to a DFK that has been cleaned up")

task_id = self.task_count
self.task_count += 1
Expand Down Expand Up @@ -1420,7 +1420,7 @@ def load(cls, config: Optional[Config] = None) -> DataFlowKernel:
- DataFlowKernel : The loaded DataFlowKernel object.
"""
if cls._dfk is not None:
raise RuntimeError('Config has already been loaded')
raise ConfigurationError('Config has already been loaded')

if config is None:
cls._dfk = DataFlowKernel(Config())
Expand All @@ -1441,5 +1441,5 @@ def wait_for_current_tasks(cls) -> None:
def dfk(cls) -> DataFlowKernel:
"""Return the currently-loaded DataFlowKernel."""
if cls._dfk is None:
raise RuntimeError('Must first load config')
raise ConfigurationError('Must first load config')
return cls._dfk
10 changes: 10 additions & 0 deletions parsl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ def __str__(self) -> str:
return "The functionality requested requires optional modules {0} which could not be imported, because: {1}".format(
self.module_names, self.reason
)


class InternalConsistencyError(ParslError):
"""Raised when a component enounters an internal inconsistency.
"""


class NoDataFlowKernelError(ParslError):
"""Raised when no DataFlowKernel is available for an operation that needs one.
"""