diff --git a/parsl/executors/errors.py b/parsl/executors/errors.py index 9725105dd9..fed43228be 100644 --- a/parsl/executors/errors.py +++ b/parsl/executors/errors.py @@ -1,4 +1,6 @@ """Exceptions raise by Executors.""" +from typing import Set + from parsl.errors import ParslError from parsl.executors.base import ParslExecutor @@ -44,6 +46,17 @@ def __str__(self): self.current_executor) +class InvalidResourceSpecification(ExecutorError): + """Error raised when Invalid input is supplied via resource Specification""" + + def __init__(self, invalid_keys: Set[str], message: str = ''): + self.invalid_keys = invalid_keys + self.message = message + + def __str__(self): + return f"Invalid Resource Specification Supplied: {self.invalid_keys}. {self.message}" + + class ScalingFailed(ExecutorError): """Scaling failed due to error in Execution provider.""" diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py index 1988d2e5fd..2b9fea9f2a 100644 --- a/parsl/executors/high_throughput/executor.py +++ b/parsl/executors/high_throughput/executor.py @@ -16,16 +16,17 @@ from parsl.addresses import get_all_addresses from parsl.app.errors import RemoteExceptionWrapper from parsl.data_provider.staging import Staging -from parsl.executors.errors import BadMessage, ScalingFailed +from parsl.executors.errors import ( + BadMessage, + InvalidResourceSpecification, + ScalingFailed, +) from parsl.executors.high_throughput import zmq_pipes from parsl.executors.high_throughput.errors import CommandClientTimeoutError from parsl.executors.high_throughput.manager_selector import ( ManagerSelector, RandomManagerSelector, ) -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) from parsl.executors.status_handling import BlockProviderExecutor from parsl.jobs.states import TERMINAL_STATES, JobState, JobStatus from parsl.process_loggers import wrap_with_logs diff --git a/parsl/executors/high_throughput/mpi_prefix_composer.py b/parsl/executors/high_throughput/mpi_prefix_composer.py index 0125d9a532..689fbb771f 100644 --- a/parsl/executors/high_throughput/mpi_prefix_composer.py +++ b/parsl/executors/high_throughput/mpi_prefix_composer.py @@ -1,5 +1,7 @@ import logging -from typing import Dict, List, Set, Tuple +from typing import Dict, List, Tuple + +from parsl.executors.errors import InvalidResourceSpecification logger = logging.getLogger(__name__) @@ -8,27 +10,6 @@ 'mpiexec') -class MissingResourceSpecification(Exception): - """Exception raised when input is not supplied a resource specification""" - - def __init__(self, reason: str): - self.reason = reason - - def __str__(self): - return f"Missing resource specification: {self.reason}" - - -class InvalidResourceSpecification(Exception): - """Exception raised when Invalid input is supplied via resource specification""" - - def __init__(self, invalid_keys: Set[str], message: str = ''): - self.invalid_keys = invalid_keys - self.message = message - - def __str__(self): - return f"Invalid resource specification options supplied: {self.invalid_keys} {self.message}" - - def validate_resource_spec(resource_spec: Dict[str, str]): """Basic validation of keys in the resource_spec @@ -40,7 +21,8 @@ def validate_resource_spec(resource_spec: Dict[str, str]): # empty resource_spec when mpi_mode is set causes parsl to hang # ref issue #3427 if len(user_keys) == 0: - raise MissingResourceSpecification('MPI mode requires optional parsl_resource_specification keyword argument to be configured') + raise InvalidResourceSpecification(user_keys, + 'MPI mode requires optional parsl_resource_specification keyword argument to be configured') legal_keys = set(("ranks_per_node", "num_nodes", diff --git a/parsl/executors/threads.py b/parsl/executors/threads.py index cbc813d9d4..9b3b0df5ce 100644 --- a/parsl/executors/threads.py +++ b/parsl/executors/threads.py @@ -6,7 +6,7 @@ from parsl.data_provider.staging import Staging from parsl.executors.base import ParslExecutor -from parsl.executors.errors import UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.utils import RepresentationMixin logger = logging.getLogger(__name__) @@ -54,7 +54,8 @@ def submit(self, func, resource_specification, *args, **kwargs): if resource_specification: logger.error("Ignoring the resource specification. " "Parsl resource specification is not supported in ThreadPool Executor.") - raise UnsupportedFeatureError('resource specification', 'ThreadPool Executor', None) + raise InvalidResourceSpecification(set(resource_specification.keys()), + "Parsl resource specification is not supported in ThreadPool Executor.") return self.executor.submit(func, *args, **kwargs) diff --git a/parsl/executors/workqueue/executor.py b/parsl/executors/workqueue/executor.py index ae39f8c118..155c990ab5 100644 --- a/parsl/executors/workqueue/executor.py +++ b/parsl/executors/workqueue/executor.py @@ -28,7 +28,7 @@ from parsl.data_provider.files import File from parsl.data_provider.staging import Staging from parsl.errors import OptionalModuleMissing -from parsl.executors.errors import ExecutorError +from parsl.executors.errors import ExecutorError, InvalidResourceSpecification from parsl.executors.status_handling import BlockProviderExecutor from parsl.executors.workqueue import exec_parsl_function from parsl.process_loggers import wrap_with_logs @@ -419,7 +419,7 @@ def submit(self, func, resource_specification, *args, **kwargs): message = "Task resource specification only accepts these types of resources: {}".format( ', '.join(acceptable_fields)) logger.error(message) - raise ExecutorError(self, message) + raise InvalidResourceSpecification(keys, message) # this checks that either all of the required resource types are specified, or # that none of them are: the `required_resource_types` are not actually required, @@ -430,9 +430,10 @@ def submit(self, func, resource_specification, *args, **kwargs): logger.error("Running with `autolabel=False`. In this mode, " "task resource specification requires " "three resources to be specified simultaneously: cores, memory, and disk") - raise ExecutorError(self, "Task resource specification requires " - "three resources to be specified simultaneously: cores, memory, and disk. " - "Try setting autolabel=True if you are unsure of the resource usage") + raise InvalidResourceSpecification(keys, + "Task resource specification requires " + "three resources to be specified simultaneously: cores, memory, and disk. " + "Try setting autolabel=True if you are unsure of the resource usage") for k in keys: if k == 'cores': diff --git a/parsl/tests/test_error_handling/test_resource_spec.py b/parsl/tests/test_error_handling/test_resource_spec.py index 871df68512..4616219be2 100644 --- a/parsl/tests/test_error_handling/test_resource_spec.py +++ b/parsl/tests/test_error_handling/test_resource_spec.py @@ -1,11 +1,9 @@ import parsl from parsl.app.app import python_app from parsl.executors import WorkQueueExecutor -from parsl.executors.errors import ExecutorError, UnsupportedFeatureError +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.threads import ThreadPoolExecutor @python_app @@ -27,11 +25,10 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) # Specify resources with wrong types # 'cpus' is incorrect, should be 'cores' @@ -40,8 +37,7 @@ def test_resource(n=2): try: fut.result() except InvalidResourceSpecification: - assert isinstance(executor, HighThroughputExecutor) - except UnsupportedFeatureError: - assert not isinstance(executor, WorkQueueExecutor) - except Exception as e: - assert isinstance(e, ExecutorError) + assert ( + isinstance(executor, HighThroughputExecutor) or + isinstance(executor, WorkQueueExecutor) or + isinstance(executor, ThreadPoolExecutor)) diff --git a/parsl/tests/test_htex/test_resource_spec_validation.py b/parsl/tests/test_htex/test_resource_spec_validation.py index ac0c580c20..5587891650 100644 --- a/parsl/tests/test_htex/test_resource_spec_validation.py +++ b/parsl/tests/test_htex/test_resource_spec_validation.py @@ -4,9 +4,7 @@ import pytest from parsl.executors import HighThroughputExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, -) +from parsl.executors.errors import InvalidResourceSpecification def double(x): diff --git a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py index aff2501674..63005a8df7 100644 --- a/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py +++ b/parsl/tests/test_mpi_apps/test_mpi_mode_enabled.py @@ -8,9 +8,7 @@ import parsl from parsl import Config, bash_app, python_app from parsl.executors import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - MissingResourceSpecification, -) +from parsl.executors.errors import InvalidResourceSpecification from parsl.launchers import SimpleLauncher from parsl.providers import LocalProvider @@ -185,6 +183,6 @@ def test_simulated_load(rounds: int = 100): @pytest.mark.local def test_missing_resource_spec(): - with pytest.raises(MissingResourceSpecification): + with pytest.raises(InvalidResourceSpecification): future = mock_app(sleep_dur=0.4) future.result(timeout=10) diff --git a/parsl/tests/test_mpi_apps/test_resource_spec.py b/parsl/tests/test_mpi_apps/test_resource_spec.py index f180c67d52..058a7ef425 100644 --- a/parsl/tests/test_mpi_apps/test_resource_spec.py +++ b/parsl/tests/test_mpi_apps/test_resource_spec.py @@ -10,12 +10,9 @@ import pytest from parsl.app.app import python_app +from parsl.executors.errors import InvalidResourceSpecification from parsl.executors.high_throughput.executor import HighThroughputExecutor from parsl.executors.high_throughput.mpi_executor import MPIExecutor -from parsl.executors.high_throughput.mpi_prefix_composer import ( - InvalidResourceSpecification, - MissingResourceSpecification, -) from parsl.executors.high_throughput.mpi_resource_management import ( get_nodes_in_batchjob, get_pbs_hosts_list, @@ -105,7 +102,7 @@ def test_top_level(): ({"num_nodes": 2, "ranks_per_node": 1}, None), ({"launcher_options": "--debug_foo"}, None), ({"num_nodes": 2, "BAD_OPT": 1}, InvalidResourceSpecification), - ({}, MissingResourceSpecification), + ({}, InvalidResourceSpecification), ) ) def test_mpi_resource_spec(resource_spec: Dict, exception):