diff --git a/.wci.yml b/.wci.yml
index c11a2e82b4..ccea1300a6 100644
--- a/.wci.yml
+++ b/.wci.yml
@@ -33,7 +33,6 @@ execution_environment:
     - Slurm
     - LSF
     - PBS
-    - Cobalt
     - Flux
     - GridEngine
     - HTCondor
diff --git a/docs/faq.rst b/docs/faq.rst
index ca4bd82bdb..f58d2639e7 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -257,15 +257,15 @@ There are a few common situations in which a Parsl script might hang:
 
    .. code-block:: python

-      from libsubmit.providers import Cobalt
       from parsl.config import Config
+      from parsl.providers import SlurmProvider
       from parsl.executors import HighThroughputExecutor
 
       config = Config(
          executors=[
             HighThroughputExecutor(
-                label='ALCF_theta_local',
-                provider=Cobalt(),
-                worer_port_range=('50000,55000'),
-                interchange_port_range=('50000,55000')
+                label='htex',
+                provider=SlurmProvider(),
+                worker_port_range=(50000, 55000),
+                interchange_port_range=(50000, 55000)
             )
diff --git a/docs/reference.rst b/docs/reference.rst
index d1cab26038..1424e08106 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -115,7 +115,6 @@ Providers
    :nosignatures:
 
    parsl.providers.AWSProvider
-   parsl.providers.CobaltProvider
   parsl.providers.CondorProvider
   parsl.providers.GoogleCloudProvider
   parsl.providers.GridEngineProvider
diff --git a/docs/userguide/configuring.rst b/docs/userguide/configuring.rst
index 8a8eba8d42..88d4456a26 100644
--- a/docs/userguide/configuring.rst
+++ b/docs/userguide/configuring.rst
@@ -123,9 +123,6 @@ Stepping through the following question should help formulate a suitable configu
 | Torque/PBS based    | * `parsl.executors.HighThroughputExecutor`    | `parsl.providers.TorqueProvider`       |
 | system              | * `parsl.executors.WorkQueueExecutor`         |                                        |
 +---------------------+-----------------------------------------------+----------------------------------------+
-| Cobalt based system | * `parsl.executors.HighThroughputExecutor`    | `parsl.providers.CobaltProvider`       |
-|                     | * `parsl.executors.WorkQueueExecutor`         |                                        |
-+---------------------+-----------------------------------------------+----------------------------------------+
 | GridEngine based    | * `parsl.executors.HighThroughputExecutor`    | `parsl.providers.GridEngineProvider`   |
 | system              | * `parsl.executors.WorkQueueExecutor`         |                                        |
 +---------------------+-----------------------------------------------+----------------------------------------+
@@ -185,8 +182,6 @@ Stepping through the following question should help formulate a suitable configu
 | `parsl.providers.TorqueProvider`    | Any                      | * `parsl.launchers.AprunLauncher`                  |
 |                                     |                          | * `parsl.launchers.MpiExecLauncher`                |
 +-------------------------------------+--------------------------+----------------------------------------------------+
-| `parsl.providers.CobaltProvider`    | Any                      | * `parsl.launchers.AprunLauncher`                  |
-+-------------------------------------+--------------------------+----------------------------------------------------+
 | `parsl.providers.SlurmProvider`     | Any                      | * `parsl.launchers.SrunLauncher` if native slurm   |
 |                                     |                          | * `parsl.launchers.AprunLauncher`, otherwise       |
 +-------------------------------------+--------------------------+----------------------------------------------------+
diff --git a/docs/userguide/execution.rst b/docs/userguide/execution.rst
index df17dc458f..832985c164 100644
--- a/docs/userguide/execution.rst
+++ b/docs/userguide/execution.rst
@@ -27,8 +27,7 @@
 retrieve the status of an allocation (e.g., squeue), and cancel a running job
 (e.g., scancel).
 
 Parsl implements providers for local execution (fork), for various cloud platforms using cloud-specific APIs, and for clusters and supercomputers that use a Local Resource Manager
-(LRM) to manage access to resources, such as Slurm, HTCondor,
-and Cobalt.
+(LRM) to manage access to resources, such as Slurm and HTCondor.
 Each provider implementation may allow users to specify additional parameters for further configuration. Parameters are generally mapped to LRM submission script or cloud API options. Examples of LRM-specific options are partition, wall clock time,
@@ -39,15 +38,14 @@
 parameters include access keys, instance type, and spot bid price
 
 Parsl currently supports the following providers:
 
 1. `parsl.providers.LocalProvider`: The provider allows you to run locally on your laptop or workstation.
-2. `parsl.providers.CobaltProvider`: This provider allows you to schedule resources via the Cobalt scheduler. **This provider is deprecated and will be removed by 2024.04**.
-3. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
-4. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
-5. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
-6. `parsl.providers.TorqueProvider`: This provider allows you to schedule resources via the Torque scheduler.
-7. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services.
-8. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
-9. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
-10. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
+2. `parsl.providers.SlurmProvider`: This provider allows you to schedule resources via the Slurm scheduler.
+3. `parsl.providers.CondorProvider`: This provider allows you to schedule resources via the Condor scheduler.
+4. `parsl.providers.GridEngineProvider`: This provider allows you to schedule resources via the GridEngine scheduler.
+5. `parsl.providers.TorqueProvider`: This provider allows you to schedule resources via the Torque scheduler.
+6. `parsl.providers.AWSProvider`: This provider allows you to provision and manage cloud nodes from Amazon Web Services.
+7. `parsl.providers.GoogleCloudProvider`: This provider allows you to provision and manage cloud nodes from Google Cloud.
+8. `parsl.providers.KubernetesProvider`: This provider allows you to provision and manage containers on a Kubernetes cluster.
+9. `parsl.providers.LSFProvider`: This provider allows you to schedule resources via IBM's LSF scheduler.
 
diff --git a/parsl/executors/high_throughput/executor.py b/parsl/executors/high_throughput/executor.py
index ab1498efc4..a1def0466a 100644
--- a/parsl/executors/high_throughput/executor.py
+++ b/parsl/executors/high_throughput/executor.py
@@ -63,7 +63,6 @@ GENERAL_HTEX_PARAM_DOCS = """provider : :class:`~parsl.providers.base.ExecutionProvider`
     Provider to access computation resources.
 
     Can be one of :class:`~parsl.providers.aws.aws.EC2Provider`,
-        :class:`~parsl.providers.cobalt.cobalt.Cobalt`,
         :class:`~parsl.providers.condor.condor.Condor`,
         :class:`~parsl.providers.googlecloud.googlecloud.GoogleCloud`,
         :class:`~parsl.providers.gridEngine.gridEngine.GridEngine`,
diff --git a/parsl/executors/high_throughput/mpi_resource_management.py b/parsl/executors/high_throughput/mpi_resource_management.py
index 3dd3294648..3f3fc33ea4 100644
--- a/parsl/executors/high_throughput/mpi_resource_management.py
+++ b/parsl/executors/high_throughput/mpi_resource_management.py
@@ -17,7 +17,6 @@ class Scheduler(Enum):
     Unknown = 0
     Slurm = 1
     PBS = 2
-    Cobalt = 3
 
 
 def get_slurm_hosts_list() -> List[str]:
@@ -37,13 +36,6 @@ def get_pbs_hosts_list() -> List[str]:
         return [line.strip() for line in f.readlines()]
 
 
-def get_cobalt_hosts_list() -> List[str]:
-    """Get list of COBALT hosts from envvar: COBALT_NODEFILE"""
-    nodefile_name = os.environ["COBALT_NODEFILE"]
-    with open(nodefile_name) as f:
-        return [line.strip() for line in f.readlines()]
-
-
 def get_nodes_in_batchjob(scheduler: Scheduler) -> List[str]:
     """Get nodelist from all supported schedulers"""
     nodelist = []
@@ -51,8 +43,6 @@ def get_nodes_in_batchjob(scheduler: Scheduler) -> List[str]:
     if scheduler == Scheduler.Slurm:
         nodelist = get_slurm_hosts_list()
     elif scheduler == Scheduler.PBS:
         nodelist = get_pbs_hosts_list()
-    elif scheduler == Scheduler.Cobalt:
-        nodelist = get_cobalt_hosts_list()
     else:
         raise RuntimeError(f"mpi_mode does not support scheduler:{scheduler}")
     return nodelist
@@ -64,8 +54,6 @@ def identify_scheduler() -> Scheduler:
         return Scheduler.Slurm
     elif os.environ.get("PBS_NODEFILE"):
         return Scheduler.PBS
-    elif os.environ.get("COBALT_NODEFILE"):
-        return Scheduler.Cobalt
     else:
         return Scheduler.Unknown
 
diff --git a/parsl/providers/__init__.py b/parsl/providers/__init__.py
index 150f425f3d..6855915ba7 100644
--- a/parsl/providers/__init__.py
+++ b/parsl/providers/__init__.py
@@ -1,7 +1,6 @@
 # Cloud Providers
 from parsl.providers.aws.aws import AWSProvider
 from parsl.providers.azure.azure import AzureProvider
-from parsl.providers.cobalt.cobalt import CobaltProvider
 from parsl.providers.condor.condor import CondorProvider
 from parsl.providers.googlecloud.googlecloud import GoogleCloudProvider
 from parsl.providers.grid_engine.grid_engine import GridEngineProvider
@@ -15,7 +14,6 @@
 from parsl.providers.torque.torque import TorqueProvider
 
 __all__ = ['LocalProvider',
-           'CobaltProvider',
            'CondorProvider',
            'GridEngineProvider',
            'SlurmProvider',
diff --git a/parsl/providers/base.py b/parsl/providers/base.py
index c8c17c6f5b..d7d236cacb 100644
--- a/parsl/providers/base.py
+++ b/parsl/providers/base.py
@@ -11,7 +11,7 @@ class ExecutionProvider(metaclass=ABCMeta):
     """Execution providers are responsible for managing execution resources
     that have a Local Resource Manager (LRM). For instance, campus clusters
     and supercomputers generally have LRMs (schedulers) such as Slurm,
-    Torque/PBS, Condor and Cobalt. Clouds, on the other hand, have API
+    Torque/PBS, and Condor. Clouds, on the other hand, have API
     interfaces that allow much more fine-grained composition of an execution
     environment. An execution provider abstracts these types of resources and
     provides a single uniform interface to them.
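With Cobalt gone, MPI-mode node discovery covers only Slurm and PBS. A minimal worker-side sketch of the remaining code path, using only the functions shown in the hunks above (`identify_scheduler` keys off the `SLURM_NODELIST` or `PBS_NODEFILE` environment variables; anything else maps to `Scheduler.Unknown`):

.. code-block:: python

    from parsl.executors.high_throughput.mpi_resource_management import (
        Scheduler,
        get_nodes_in_batchjob,
        identify_scheduler,
    )

    # Returns Scheduler.Slurm, Scheduler.PBS, or Scheduler.Unknown, depending
    # on which batch-system environment variables are set; COBALT_NODEFILE is
    # no longer consulted after this change.
    scheduler = identify_scheduler()

    if scheduler is not Scheduler.Unknown:
        # Dispatches to get_slurm_hosts_list() or get_pbs_hosts_list();
        # an unsupported scheduler would raise RuntimeError.
        nodes = get_nodes_in_batchjob(scheduler)
        print(nodes)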
diff --git a/parsl/providers/cobalt/__init__.py b/parsl/providers/cobalt/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/parsl/providers/cobalt/cobalt.py b/parsl/providers/cobalt/cobalt.py
deleted file mode 100644
index 4039dfcbea..0000000000
--- a/parsl/providers/cobalt/cobalt.py
+++ /dev/null
@@ -1,236 +0,0 @@
-import logging
-import os
-import time
-import warnings
-
-from parsl.channels import LocalChannel
-from parsl.jobs.states import JobState, JobStatus
-from parsl.launchers import AprunLauncher
-from parsl.providers.cluster_provider import ClusterProvider
-from parsl.providers.cobalt.template import template_string
-from parsl.providers.errors import ScaleOutFailed
-from parsl.utils import RepresentationMixin, wtime_to_minutes
-
-logger = logging.getLogger(__name__)
-
-translate_table = {
-    'QUEUED': JobState.PENDING,
-    'STARTING': JobState.PENDING,
-    'RUNNING': JobState.RUNNING,
-    'EXITING': JobState.COMPLETED,
-    'KILLING': JobState.COMPLETED
-}
-
-
-class CobaltProvider(ClusterProvider, RepresentationMixin):
-    """ Cobalt Execution Provider
-
-    WARNING: CobaltProvider is deprecated and will be removed by 2024.04
-
-    This provider uses cobalt to submit (qsub), obtain the status of (qstat), and cancel (qdel)
-    jobs. Theo script to be used is created from a template file in this
-    same module.
-
-    Parameters
-    ----------
-    channel : Channel
-        Channel for accessing this provider. Possible channels include
-        :class:`~parsl.channels.LocalChannel` (the default),
-        :class:`~parsl.channels.SSHChannel`, or
-        :class:`~parsl.channels.SSHInteractiveLoginChannel`.
-    nodes_per_block : int
-        Nodes to provision per block.
-    min_blocks : int
-        Minimum number of blocks to maintain.
-    max_blocks : int
-        Maximum number of blocks to maintain.
-    walltime : str
-        Walltime requested per block in HH:MM:SS.
-    account : str
-        Account that the job will be charged against.
-    queue : str
-        Torque queue to request blocks from.
-    scheduler_options : str
-        String to prepend to the submit script to the scheduler.
-    worker_init : str
-        Command to be run before starting a worker, such as 'module load Anaconda; source activate env'.
-    launcher : Launcher
-        Launcher for this provider. Possible launchers include
-        :class:`~parsl.launchers.AprunLauncher` (the default) or,
-        :class:`~parsl.launchers.SingleNodeLauncher`
-    """
-    def __init__(self,
-                 channel=LocalChannel(),
-                 nodes_per_block=1,
-                 init_blocks=0,
-                 min_blocks=0,
-                 max_blocks=1,
-                 parallelism=1,
-                 walltime="00:10:00",
-                 account=None,
-                 queue=None,
-                 scheduler_options='',
-                 worker_init='',
-                 launcher=AprunLauncher(),
-                 cmd_timeout=10):
-        label = 'cobalt'
-        super().__init__(label,
-                         channel=channel,
-                         nodes_per_block=nodes_per_block,
-                         init_blocks=init_blocks,
-                         min_blocks=min_blocks,
-                         max_blocks=max_blocks,
-                         parallelism=parallelism,
-                         walltime=walltime,
-                         launcher=launcher,
-                         cmd_timeout=cmd_timeout)
-
-        self.account = account
-        self.queue = queue
-        self.scheduler_options = scheduler_options
-        self.worker_init = worker_init
-        warnings.warn("CobaltProvider is deprecated; This will be removed after 2024-04",
-                      DeprecationWarning,
-                      stacklevel=2)
-
-    def _status(self):
-        """Returns the status list for a list of job_ids
-
-        Args:
-            self
-
-        Returns:
-            [status...] : Status list of all jobs
-        """
-
-        jobs_missing = list(self.resources.keys())
-
-        retcode, stdout, stderr = self.execute_wait("qstat -u $USER")
-
-        # Execute_wait failed. Do no update.
-        if retcode != 0:
-            return
-
-        for line in stdout.split('\n'):
-            if line.startswith('='):
-                continue
-
-            parts = line.upper().split()
-            if parts and parts[0] != 'JOBID':
-                job_id = parts[0]
-
-                if job_id not in self.resources:
-                    continue
-
-                status = translate_table.get(parts[4], JobState.UNKNOWN)
-
-                self.resources[job_id]['status'] = JobStatus(status)
-                jobs_missing.remove(job_id)
-
-        # squeue does not report on jobs that are not running. So we are filling in the
-        # blanks for missing jobs, we might lose some information about why the jobs failed.
-        for missing_job in jobs_missing:
-            self.resources[missing_job]['status'] = JobStatus(JobState.COMPLETED)
-
-    def submit(self, command, tasks_per_node, job_name="parsl.cobalt"):
-        """ Submits the command onto an Local Resource Manager job of parallel elements.
-        Submit returns an ID that corresponds to the task that was just submitted.
-
-        If tasks_per_node < 1 : ! This is illegal. tasks_per_node should be integer
-
-        If tasks_per_node == 1:
-            A single node is provisioned
-
-        If tasks_per_node > 1 :
-            tasks_per_node number of nodes are provisioned.
-
-        Args:
-             - command  :(String) Commandline invocation to be made on the remote side.
-             - tasks_per_node (int) : command invocations to be launched per node
-
-        Kwargs:
-             - job_name (String): Name for job, must be unique
-
-        Returns:
-             - None: At capacity, cannot provision more
-             - job_id: (string) Identifier for the job
-
-        """
-
-        account_opt = '-A {}'.format(self.account) if self.account is not None else ''
-
-        job_name = "parsl.{0}.{1}".format(job_name, time.time())
-
-        script_path = "{0}/{1}.submit".format(self.script_dir, job_name)
-        script_path = os.path.abspath(script_path)
-
-        job_config = {}
-        job_config["scheduler_options"] = self.scheduler_options
-        job_config["worker_init"] = self.worker_init
-
-        logger.debug("Requesting nodes_per_block:%s tasks_per_node:%s",
-                     self.nodes_per_block, tasks_per_node)
-
-        # Wrap the command
-        job_config["user_script"] = self.launcher(command, tasks_per_node, self.nodes_per_block)
-
-        queue_opt = '-q {}'.format(self.queue) if self.queue is not None else ''
-
-        logger.debug("Writing submit script")
-        self._write_submit_script(template_string, script_path, job_name, job_config)
-
-        channel_script_path = self.channel.push_file(script_path, self.channel.script_dir)
-
-        command = 'qsub -n {0} {1} -t {2} {3} {4}'.format(
-            self.nodes_per_block, queue_opt, wtime_to_minutes(self.walltime), account_opt, channel_script_path)
-        logger.debug("Executing {}".format(command))
-
-        retcode, stdout, stderr = self.execute_wait(command)
-
-        # TODO : FIX this block
-        if retcode != 0:
-            logger.error("Failed command: {0}".format(command))
-            logger.error("Launch failed stdout:\n{0} \nstderr:{1}\n".format(stdout, stderr))
-
-        logger.debug("Retcode:%s STDOUT:%s STDERR:%s", retcode, stdout.strip(), stderr.strip())
-
-        job_id = None
-
-        if retcode == 0:
-            # We should be getting only one line back
-            job_id = stdout.strip()
-            self.resources[job_id] = {'job_id': job_id, 'status': JobStatus(JobState.PENDING)}
-        else:
-            logger.error("Submit command failed: {0}".format(stderr))
-            raise ScaleOutFailed(self.__class__, "Request to submit job to local scheduler failed")
-
-        logger.debug("Returning job id : {0}".format(job_id))
-        return job_id
-
-    def cancel(self, job_ids):
-        """ Cancels the jobs specified by a list of job ids
-
-        Args:
-        job_ids : [<job_id> ...]
-
-        Returns :
-        [True/False...] : If the cancel operation fails the entire list will be False.
- """ - - job_id_list = ' '.join(job_ids) - retcode, stdout, stderr = self.execute_wait("qdel {0}".format(job_id_list)) - rets = None - if retcode == 0: - for jid in job_ids: - # ??? - # self.resources[jid]['status'] = translate_table['KILLING'] # Setting state to cancelled - self.resources[jid]['status'] = JobStatus(JobState.COMPLETED) - rets = [True for i in job_ids] - else: - rets = [False for i in job_ids] - - return rets - - @property - def status_polling_interval(self): - return 60 diff --git a/parsl/providers/cobalt/template.py b/parsl/providers/cobalt/template.py deleted file mode 100755 index 07141dd136..0000000000 --- a/parsl/providers/cobalt/template.py +++ /dev/null @@ -1,17 +0,0 @@ -template_string = '''#!/bin/bash -el -${scheduler_options} - -${worker_init} - -echo "Starting Cobalt job script" - -echo "----Cobalt Nodefile: -----" -cat $$COBALT_NODEFILE -echo "--------------------------" - -export JOBNAME="${jobname}" - -$user_script - -echo "End of Cobalt job" -''' diff --git a/parsl/tests/configs/cooley_htex.py b/parsl/tests/configs/cooley_htex.py deleted file mode 100644 index 202379d0af..0000000000 --- a/parsl/tests/configs/cooley_htex.py +++ /dev/null @@ -1,37 +0,0 @@ -# UNTESTED - -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import MpiRunLauncher -from parsl.providers import CobaltProvider - -# If you are a developer running tests, make sure to update parsl/tests/configs/user_opts.py -# If you are a user copying-and-pasting this as an example, make sure to either -# 1) create a local `user_opts.py`, or -# 2) delete the user_opts import below and replace all appearances of `user_opts` with the literal value -# (i.e., user_opts['swan']['username'] -> 'your_username') -from .user_opts import user_opts - -config = Config( - executors=[ - HighThroughputExecutor( - label="cooley_htex", - worker_debug=False, - cores_per_worker=1, - encrypted=True, - provider=CobaltProvider( - queue='debug', - account=user_opts['cooley']['account'], - launcher=MpiRunLauncher(), # UNTESTED COMPONENT - scheduler_options=user_opts['cooley']['scheduler_options'], - worker_init=user_opts['cooley']['worker_init'], - init_blocks=1, - max_blocks=1, - min_blocks=1, - nodes_per_block=4, - cmd_timeout=60, - walltime='00:10:00', - ), - ) - ] -) diff --git a/parsl/tests/configs/theta.py b/parsl/tests/configs/theta.py deleted file mode 100644 index 71ed25142b..0000000000 --- a/parsl/tests/configs/theta.py +++ /dev/null @@ -1,37 +0,0 @@ -from parsl.config import Config -from parsl.executors import HighThroughputExecutor -from parsl.launchers import AprunLauncher -from parsl.providers import CobaltProvider - -from .user_opts import user_opts - - -def fresh_config(): - return Config( - executors=[ - HighThroughputExecutor( - label='theta_local_htex_multinode', - max_workers_per_node=1, - encrypted=True, - provider=CobaltProvider( - queue=user_opts['theta']['queue'], - account=user_opts['theta']['account'], - launcher=AprunLauncher(overrides="-d 64"), - walltime='00:10:00', - nodes_per_block=2, - init_blocks=1, - max_blocks=1, - # string to prepend to #COBALT blocks in the submit - # script to the scheduler eg: '#COBALT -t 50' - scheduler_options='', - # Command to be run before starting a worker, such as: - # 'module load Anaconda; source activate parsl_env'. 
-                    worker_init=user_opts['theta']['worker_init'],
-                    cmd_timeout=120,
-                ),
-            )
-        ],
-    )
-
-
-config = fresh_config()
diff --git a/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py b/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py
deleted file mode 100644
index b780fd47b4..0000000000
--- a/parsl/tests/manual_tests/test_fan_in_out_htex_remote.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import logging
-
-import parsl
-from parsl.app.app import python_app
-from parsl.config import Config
-from parsl.executors import HighThroughputExecutor
-from parsl.launchers import AprunLauncher
-from parsl.monitoring.monitoring import MonitoringHub
-from parsl.providers import CobaltProvider
-
-
-def local_setup():
-    threads_config = Config(
-        executors=[
-            HighThroughputExecutor(
-                label="theta_htex",
-                # worker_debug=True,
-                cores_per_worker=4,
-                encrypted=True,
-                provider=CobaltProvider(
-                    queue='debug-flat-quad',
-                    account='CSC249ADCD01',
-                    launcher=AprunLauncher(overrides="-d 64"),
-                    worker_init='source activate parsl-issues',
-                    init_blocks=1,
-                    max_blocks=1,
-                    min_blocks=1,
-                    nodes_per_block=4,
-                    cmd_timeout=60,
-                    walltime='00:10:00',
-                ),
-            )
-        ],
-        monitoring=MonitoringHub(
-            hub_port=55055,
-            logging_level=logging.DEBUG,
-            resource_monitoring_interval=10),
-        strategy='none')
-    parsl.load(threads_config)
-
-
-def local_teardown():
-    parsl.clear()
-
-
-@python_app
-def inc(x):
-    import time
-    start = time.time()
-    sleep_duration = 30.0
-    while True:
-        x += 1
-        end = time.time()
-        if end - start >= sleep_duration:
-            break
-    return x
-
-
-@python_app
-def add_inc(inputs=[]):
-    import time
-    start = time.time()
-    sleep_duration = 30.0
-    res = sum(inputs)
-    while True:
-        res += 1
-        end = time.time()
-        if end - start >= sleep_duration:
-            break
-    return res
-
-
-if __name__ == "__main__":
-
-    total = 200
-    half = int(total / 2)
-    one_third = int(total / 3)
-    two_third = int(total / 3 * 2)
-    futures_1 = [inc(i) for i in range(total)]
-    futures_2 = [add_inc(inputs=futures_1[0:half]),
-                 add_inc(inputs=futures_1[half:total])]
-    futures_3 = [inc(futures_2[0]) for _ in range(half)] + [inc(futures_2[1]) for _ in range(half)]
-    futures_4 = [add_inc(inputs=futures_3[0:one_third]),
-                 add_inc(inputs=futures_3[one_third:two_third]),
-                 add_inc(inputs=futures_3[two_third:total])]
-
-    print([f.result() for f in futures_4])
-    print("Done")
diff --git a/parsl/tests/site_tests/site_config_selector.py b/parsl/tests/site_tests/site_config_selector.py
index 8e41a9103f..921df197b9 100644
--- a/parsl/tests/site_tests/site_config_selector.py
+++ b/parsl/tests/site_tests/site_config_selector.py
@@ -7,12 +7,7 @@ def fresh_config():
     hostname = os.getenv('PARSL_HOSTNAME', platform.uname().node)
     print("Loading config for {}".format(hostname))
 
-    if 'thetalogin' in hostname:
-        from parsl.tests.configs.theta import fresh_config
-        config = fresh_config()
-        print("Loading Theta config")
-
-    elif 'frontera' in hostname:
+    if 'frontera' in hostname:
         print("Loading Frontera config")
         from parsl.tests.configs.frontera import fresh_config
         config = fresh_config()
diff --git a/parsl/tests/test_providers/test_cobalt_deprecation_warning.py b/parsl/tests/test_providers/test_cobalt_deprecation_warning.py
deleted file mode 100644
index 249a4d90b9..0000000000
--- a/parsl/tests/test_providers/test_cobalt_deprecation_warning.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import warnings
-
-import pytest
-
-from parsl.providers import CobaltProvider
-
-
-@pytest.mark.local
-def test_deprecation_warning():
-
-    with warnings.catch_warnings(record=True) as w:
-        warnings.simplefilter("always")
-
-        CobaltProvider()
-
-    assert len(w) == 1
-    assert issubclass(w[-1].category, DeprecationWarning)
-    assert "CobaltProvider" in str(w[-1].message)
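Sites that previously scheduled through `CobaltProvider` need to move to one of the remaining providers. A hedged migration sketch follows, assuming a machine that moved from Cobalt to PBS Pro; `parsl.providers.PBSProProvider` and `parsl.launchers.MpiExecLauncher` exist in current Parsl releases, and the queue, account, and `worker_init` values below are placeholders, not real site settings:

.. code-block:: python

    from parsl.config import Config
    from parsl.executors import HighThroughputExecutor
    from parsl.launchers import MpiExecLauncher
    from parsl.providers import PBSProProvider

    # Placeholder values throughout; adjust queue, account, worker_init,
    # and block sizing for your site before use.
    config = Config(
        executors=[
            HighThroughputExecutor(
                label='htex',
                encrypted=True,
                provider=PBSProProvider(
                    queue='debug',
                    account='MY_ALLOCATION',
                    launcher=MpiExecLauncher(),
                    worker_init='module load conda; conda activate parsl',
                    nodes_per_block=2,
                    init_blocks=1,
                    max_blocks=1,
                    walltime='00:10:00',
                ),
            )
        ]
    )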