diff --git a/parsl/channels/__init__.py b/parsl/channels/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/channels/local/__init__.py b/parsl/channels/local/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/parsl/channels/local/local.py b/parsl/channels/local/local.py deleted file mode 100644 index feecfa0358..0000000000 --- a/parsl/channels/local/local.py +++ /dev/null @@ -1,40 +0,0 @@ -import logging -import os -import subprocess - -logger = logging.getLogger(__name__) - - -def execute_wait(cmd, walltime=None): - ''' Synchronously execute a commandline string on the shell. - - Args: - - cmd (string) : Commandline string to execute - - walltime (int) : walltime in seconds - - Returns: - - retcode : Return code from the execution - - stdout : stdout string - - stderr : stderr string - ''' - try: - logger.debug("Creating process with command '%s'", cmd) - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True, - preexec_fn=os.setpgrp - ) - logger.debug("Created process with pid %s. Performing communicate", proc.pid) - (stdout, stderr) = proc.communicate(timeout=walltime) - retcode = proc.returncode - logger.debug("Process %s returned %s", proc.pid, proc.returncode) - - except Exception: - logger.exception(f"Execution of command failed:\n{cmd}") - raise - else: - logger.debug("Execution of command in process %s completed normally", proc.pid) - - return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8")) diff --git a/parsl/providers/cluster_provider.py b/parsl/providers/cluster_provider.py index 5e46e7fbfb..db7ef9eaac 100644 --- a/parsl/providers/cluster_provider.py +++ b/parsl/providers/cluster_provider.py @@ -2,11 +2,11 @@ from abc import abstractmethod from string import Template -from parsl.channels.local.local import execute_wait from parsl.launchers.base import Launcher from parsl.launchers.errors import BadLauncher from parsl.providers.base import ExecutionProvider from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError +from parsl.utils import execute_wait logger = logging.getLogger(__name__) diff --git a/parsl/providers/local/local.py b/parsl/providers/local/local.py index bce7fa78db..55994c31c3 100644 --- a/parsl/providers/local/local.py +++ b/parsl/providers/local/local.py @@ -2,7 +2,6 @@ import os import time -from parsl.channels.local.local import execute_wait from parsl.jobs.states import JobState, JobStatus from parsl.launchers import SingleNodeLauncher from parsl.providers.base import ExecutionProvider @@ -11,7 +10,7 @@ ScriptPathError, SubmitException, ) -from parsl.utils import RepresentationMixin +from parsl.utils import RepresentationMixin, execute_wait logger = logging.getLogger(__name__) diff --git a/parsl/tests/test_channels/test_large_output.py b/parsl/tests/test_channels/test_large_output.py index 4c9b7e1021..0558e600c7 100644 --- a/parsl/tests/test_channels/test_large_output.py +++ b/parsl/tests/test_channels/test_large_output.py @@ -1,6 +1,6 @@ import pytest -from parsl.channels.local.local import execute_wait +from parsl.utils import execute_wait @pytest.mark.local diff --git a/parsl/tests/test_channels/test_local_channel.py b/parsl/tests/test_channels/test_local_channel.py index 0c9190e855..6564bacb22 100644 --- a/parsl/tests/test_channels/test_local_channel.py +++ b/parsl/tests/test_channels/test_local_channel.py @@ -1,6 +1,6 @@ import pytest -from parsl.channels.local.local import execute_wait +from parsl.utils import execute_wait @pytest.mark.local diff --git a/parsl/utils.py b/parsl/utils.py index 0ea5d7d9eb..681b677b45 100644 --- a/parsl/utils.py +++ b/parsl/utils.py @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str: raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'") return sanitized + + +def execute_wait(cmd: str, walltime: Union[float, int, None] = None) -> Tuple[int, str, str]: + ''' Synchronously execute a commandline string on the shell. + + Args: + - cmd (string) : Commandline string to execute + - walltime (int) : walltime in seconds + + Returns: + - retcode : Return code from the execution + - stdout : stdout string + - stderr : stderr string + ''' + try: + logger.debug("Creating process with command '%s'", cmd) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True, + preexec_fn=os.setpgrp + ) + logger.debug("Created process with pid %s. Performing communicate", proc.pid) + (stdout, stderr) = proc.communicate(timeout=walltime) + retcode = proc.returncode + logger.debug("Process %s returned %s", proc.pid, proc.returncode) + + except Exception: + logger.exception(f"Execution of command failed:\n{cmd}") + raise + else: + logger.debug("Execution of command in process %s completed normally", proc.pid) + + return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))