diff --git a/parsl/dataflow/dflow.py b/parsl/dataflow/dflow.py index e9f9247cc2..fc1b38effc 100644 --- a/parsl/dataflow/dflow.py +++ b/parsl/dataflow/dflow.py @@ -32,7 +32,7 @@ from parsl.dataflow.rundirs import make_rundir from parsl.dataflow.states import States, FINAL_STATES, FINAL_FAILURE_STATES from parsl.dataflow.taskrecord import TaskRecord -from parsl.errors import ConfigurationError +from parsl.errors import ConfigurationError, InternalConsistencyError, NoDataFlowKernelError from parsl.jobs.job_status_poller import JobStatusPoller from parsl.jobs.states import JobStatus, JobState from parsl.usage_tracking.usage import UsageTracker @@ -295,7 +295,7 @@ def handle_exec_update(self, task_record: TaskRecord, future: Future) -> None: task_record['try_time_returned'] = datetime.datetime.now() if not future.done(): - raise RuntimeError("done callback called, despite future not reporting itself as done") + raise InternalConsistencyError("done callback called, despite future not reporting itself as done") try: res = self._unwrap_remote_exception_wrapper(future) @@ -535,7 +535,7 @@ def handle_app_update(self, task_record: TaskRecord, future: AppFuture) -> None: elif self.checkpoint_mode is None: pass else: - raise RuntimeError(f"Invalid checkpoint mode {self.checkpoint_mode}") + raise InternalConsistencyError(f"Invalid checkpoint mode {self.checkpoint_mode}") self.wipe_task(task_id) return @@ -933,7 +933,7 @@ def submit(self, ignore_for_cache = list(ignore_for_cache) if self.cleanup_called: - raise RuntimeError("Cannot submit to a DFK that has been cleaned up") + raise NoDataFlowKernelError("Cannot submit to a DFK that has been cleaned up") task_id = self.task_count self.task_count += 1 @@ -1420,7 +1420,7 @@ def load(cls, config: Optional[Config] = None) -> DataFlowKernel: - DataFlowKernel : The loaded DataFlowKernel object. """ if cls._dfk is not None: - raise RuntimeError('Config has already been loaded') + raise ConfigurationError('Config has already been loaded') if config is None: cls._dfk = DataFlowKernel(Config()) @@ -1441,5 +1441,5 @@ def wait_for_current_tasks(cls) -> None: def dfk(cls) -> DataFlowKernel: """Return the currently-loaded DataFlowKernel.""" if cls._dfk is None: - raise RuntimeError('Must first load config') + raise ConfigurationError('Must first load config') return cls._dfk diff --git a/parsl/errors.py b/parsl/errors.py index 571ced1e8d..5de6010dcd 100644 --- a/parsl/errors.py +++ b/parsl/errors.py @@ -25,3 +25,13 @@ def __str__(self) -> str: return "The functionality requested requires optional modules {0} which could not be imported, because: {1}".format( self.module_names, self.reason ) + + +class InternalConsistencyError(ParslError): + """Raised when a component enounters an internal inconsistency. + """ + + +class NoDataFlowKernelError(ParslError): + """Raised when no DataFlowKernel is available for an operation that needs one. + """