Skip to content

Commit

Permalink
Make RuntimeErrors in DFK into custom error classes (#2878)
Browse files Browse the repository at this point in the history
Fixes issue #2873
  • Loading branch information
benclifford authored Sep 15, 2023
1 parent 465eb43 commit df235e4
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
12 changes: 6 additions & 6 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from parsl.dataflow.rundirs import make_rundir
from parsl.dataflow.states import States, FINAL_STATES, FINAL_FAILURE_STATES
from parsl.dataflow.taskrecord import TaskRecord
from parsl.errors import ConfigurationError
from parsl.errors import ConfigurationError, InternalConsistencyError, NoDataFlowKernelError
from parsl.jobs.job_status_poller import JobStatusPoller
from parsl.jobs.states import JobStatus, JobState
from parsl.usage_tracking.usage import UsageTracker
Expand Down Expand Up @@ -295,7 +295,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None:
task_record['try_time_returned'] = datetime.datetime.now()

if not future.done():
raise RuntimeError("done callback called, despite future not reporting itself as done")
raise InternalConsistencyError("done callback called, despite future not reporting itself as done")

try:
res = self._unwrap_remote_exception_wrapper(future)
Expand Down Expand Up @@ -535,7 +535,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None:
elif self.checkpoint_mode is None:
pass
else:
raise RuntimeError(f"Invalid checkpoint mode {self.checkpoint_mode}")
raise InternalConsistencyError(f"Invalid checkpoint mode {self.checkpoint_mode}")

self.wipe_task(task_id)
return
Expand Down Expand Up @@ -933,7 +933,7 @@ def submit(self,
ignore_for_cache = list(ignore_for_cache)

if self.cleanup_called:
raise RuntimeError("Cannot submit to a DFK that has been cleaned up")
raise NoDataFlowKernelError("Cannot submit to a DFK that has been cleaned up")

task_id = self.task_count
self.task_count += 1
Expand Down Expand Up @@ -1420,7 +1420,7 @@ def load(cls, config: Optional[Config] = None) -> DataFlowKernel:
- DataFlowKernel : The loaded DataFlowKernel object.
"""
if cls._dfk is not None:
raise RuntimeError('Config has already been loaded')
raise ConfigurationError('Config has already been loaded')

if config is None:
cls._dfk = DataFlowKernel(Config())
Expand All @@ -1441,5 +1441,5 @@ def wait_for_current_tasks(cls) -> None:
def dfk(cls) -> DataFlowKernel:
"""Return the currently-loaded DataFlowKernel."""
if cls._dfk is None:
raise RuntimeError('Must first load config')
raise ConfigurationError('Must first load config')
return cls._dfk
10 changes: 10 additions & 0 deletions parsl/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,13 @@ def __str__(self) -> str:
return "The functionality requested requires optional modules {0} which could not be imported, because: {1}".format(
self.module_names, self.reason
)


class InternalConsistencyError(ParslError):
"""Raised when a component enounters an internal inconsistency.
"""


class NoDataFlowKernelError(ParslError):
"""Raised when no DataFlowKernel is available for an operation that needs one.
"""

0 comments on commit df235e4

Please sign in to comment.