Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move execute_wait from vestigial LocalChannel into parsl.utils #3705

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 1 addition & 33 deletions parsl/channels/base.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,7 @@
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Tuple
from abc import ABCMeta, abstractproperty


class Channel(metaclass=ABCMeta):
"""Channels are abstractions that enable ExecutionProviders to talk to
resource managers of remote compute facilities.

For certain resources such as campus clusters or supercomputers at
research laboratories, resource requirements may require authentication.

The only remaining Channel, *LocalChannel*, executes commands locally in a
shell.

Channels provide the ability to execute commands remotely, using the
execute_wait method, and manipulate the remote file system using methods
such as push_file, pull_file and makedirs.

Channels should ensure that each launched command runs in a new process
group, so that providers (such as LocalProvider) which terminate long
running commands using process groups can do so.
"""

@abstractmethod
def execute_wait(self, cmd: str, walltime: int = 0) -> Tuple[int, str, str]:
    """Run ``cmd`` over the channel, bounded by a walltime.

    Concrete channels must override this method; the abstract version
    has no behaviour of its own.

    Args:
        - cmd (string): Command string to execute over the channel
        - walltime (int) : Timeout in seconds

    Returns:
        - (exit_code, stdout, stderr) (int, string, string)
    """

@abstractproperty
def script_dir(self) -> str:
''' This is a property. Returns the directory assigned for storing all internal scripts such as
Expand Down
35 changes: 0 additions & 35 deletions parsl/channels/local/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import os
import subprocess

from parsl.channels.base import Channel
from parsl.utils import RepresentationMixin
Expand All @@ -21,40 +20,6 @@ def __init__(self):
'''
self.script_dir = None

def execute_wait(self, cmd, walltime=None):
    """Run a command line through the local shell and block until it ends.

    The command is started with ``shell=True`` in a new process group
    (``preexec_fn=os.setpgrp``), and its stdout/stderr are captured
    through pipes and decoded as UTF-8.

    Args:
        - cmd (string) : Commandline string to execute
        - walltime (int) : walltime in seconds, passed to
          ``communicate`` as its timeout

    Returns:
        - retcode : Return code from the execution
        - stdout : stdout string
        - stderr : stderr string

    Any exception raised while launching or waiting is logged and
    re-raised unchanged.
    """
    try:
        logger.debug("Creating process with command '%s'", cmd)
        child = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            preexec_fn=os.setpgrp,
        )
        logger.debug("Created process with pid %s. Performing communicate", child.pid)
        raw_out, raw_err = child.communicate(timeout=walltime)
        exit_status = child.returncode
        logger.debug("Process %s returned %s", child.pid, child.returncode)
    except Exception:
        logger.exception(f"Execution of command failed:\n{cmd}")
        raise

    # Only reached when the try body finished without raising.
    logger.debug("Execution of command in process %s completed normally", child.pid)
    decoded_out = raw_out.decode("utf-8")
    decoded_err = raw_err.decode("utf-8")
    return (exit_status, decoded_out, decoded_err)

@property
def script_dir(self):
    """Return the script directory currently held in ``self._script_dir``."""
    return self._script_dir
Expand Down
3 changes: 2 additions & 1 deletion parsl/providers/cluster_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from parsl.launchers.errors import BadLauncher
from parsl.providers.base import ExecutionProvider
from parsl.providers.errors import SchedulerMissingArgs, ScriptPathError
from parsl.utils import execute_wait

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,7 +77,7 @@ def execute_wait(self, cmd, timeout=None):
t = self.cmd_timeout
if timeout is not None:
t = timeout
return self.channel.execute_wait(cmd, t)
return execute_wait(cmd, t)

def _write_submit_script(self, template, script_filename, job_name, configs):
"""Generate submit script and write it to a file.
Expand Down
10 changes: 5 additions & 5 deletions parsl/providers/local/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
ScriptPathError,
SubmitException,
)
from parsl.utils import RepresentationMixin
from parsl.utils import RepresentationMixin, execute_wait

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,7 +118,7 @@ def status(self, job_ids):
return [self.resources[jid]['status'] for jid in job_ids]

def _is_alive(self, job_dict):
retcode, stdout, stderr = self.channel.execute_wait(
retcode, stdout, stderr = execute_wait(
'ps -p {} > /dev/null 2> /dev/null; echo "STATUS:$?" '.format(
job_dict['remote_pid']), self.cmd_timeout)
for line in stdout.split('\n'):
Expand Down Expand Up @@ -223,11 +223,11 @@ def submit(self, command, tasks_per_node, job_name="parsl.localprovider"):
# cancel the task later.
#
# We need to do the >/dev/null 2>&1 so that bash closes stdout, otherwise
# channel.execute_wait hangs reading the process stdout until all the
# execute_wait hangs reading the process stdout until all the
# background commands complete.
cmd = '/bin/bash -c \'echo - >{0}.ec && {{ {{ bash {0} 1>{0}.out 2>{0}.err ; ' \
'echo $? > {0}.ec ; }} >/dev/null 2>&1 & echo "PID:$!" ; }}\''.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
if retcode != 0:
raise SubmitException(job_name, "Launch command exited with code {0}".format(retcode),
stdout, stderr)
Expand Down Expand Up @@ -258,7 +258,7 @@ def cancel(self, job_ids):
job_dict['cancelled'] = True
logger.debug("Terminating job/process ID: {0}".format(job))
cmd = "kill -- -$(ps -o pgid= {} | grep -o '[0-9]*')".format(job_dict['remote_pid'])
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
retcode, stdout, stderr = execute_wait(cmd, self.cmd_timeout)
if retcode != 0:
logger.warning("Failed to kill PID: {} and child processes on {}".format(job_dict['remote_pid'],
self.label))
Expand Down
Empty file.
22 changes: 0 additions & 22 deletions parsl/tests/test_channels/test_large_output.py

This file was deleted.

19 changes: 0 additions & 19 deletions parsl/tests/test_channels/test_local_channel.py

This file was deleted.

35 changes: 35 additions & 0 deletions parsl/tests/test_utils/test_execute_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import pytest

from parsl.utils import execute_wait


@pytest.mark.local
def test_env():
    """Regression test for libsubmit issue #27 (Parsl/libsubmit#27).

    Runs ``env`` through execute_wait and checks that both PATH and HOME
    show up in the environment seen by the executed command.
    """
    _, output, _ = execute_wait("env", 1)
    env_lines = output.split('\n')

    assert any(entry.startswith("PATH=") for entry in env_lines), "PATH not found"
    assert any(entry.startswith("HOME=") for entry in env_lines), "HOME not found"
Comment on lines +6 to +18
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this is just moving the test, but I also don't understand what it's testing. What does this have to do with the referenced issue #27, which looks related to documentation?

Meanwhile, PATH/HOME not found is restating the logic. Is the goal to just ensure we're executing something successfully? If so, something like Did not execute: canary value missing! might be a better assertion message. Or is the environment actually important to verify? I don't see any reference to a special environment in execute_wait ... ?

And of course, a simplification of the test itself might be:

assert any(s.startswith("PATH=") for s in stdout), "Bad environment!"
...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The #27 is a red herring -- around 2017 there was a somewhat misguided activity to separate out the provider layer into its own github repo called libsubmit. This refers to libsubmit issue 27: Parsl/libsubmit#27.

Go back far enough in history and the commit that introduces this test does stuff in the libsubmit/ subdirectory from the subsequently re-merged libsubmit history.

I'm not going to tidy these tests any time soon - this PR sequence really is meant to be moving, deleting and then moving on.



@pytest.mark.local
def test_large_output_2210():
    """Regression test for #2210.

    execute_wait used to hang when the command produced too much output,
    because of a race between the process exiting and the pipes filling
    up.
    """
    # 128 blocks of 1024 bytes: this writes 128kb to stdout.
    noisy_cmd = "yes | dd count=128 bs=1024"

    # If the hang regresses, execute_wait should raise a timeout
    # exception once the walltime expires. The contents of the output
    # are not verified by this test.
    execute_wait(noisy_cmd, walltime=60)
Comment on lines +22 to +35
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar question (and understanding of simply moving) as to the provenance of this test, though its utility is more clear to me.

Meanwhile I'm unsure of the value of these particular numbers. Why stop at "only" 128K? If it's going to hang, perhaps we make sure it does so we catch it, nominally with a much higher value. 1G would be overkill enough: yes | dd count=1K bs=1M

Similarly, if we're going to fail, this should fail super quick, why wait for a full minute? 5s should be beyond plenty of time, no?

35 changes: 35 additions & 0 deletions parsl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,38 @@ def sanitize_dns_subdomain_rfc1123(raw_string: str) -> str:
raise ValueError(f"Sanitized DNS subdomain is empty for input '{raw_string}'")

return sanitized


def execute_wait(cmd: str, walltime: Optional[int] = None) -> Tuple[int, str, str]:
    ''' Synchronously execute a commandline string on the shell.

    The command is run with ``shell=True`` in its own process group
    (``preexec_fn=os.setpgrp``). Its stdout and stderr are captured through
    pipes and decoded as UTF-8.

    If launching or waiting fails - most commonly because ``walltime``
    expires - the failure is logged, a still-running command has its whole
    process group killed, the pipes are closed, and the original exception
    is re-raised. ``Popen.communicate`` does not kill the child on timeout
    by itself, so without this cleanup a timed-out command would keep
    running with its pipes left open.

    Args:
         - cmd (string) : Commandline string to execute
         - walltime (int) : walltime in seconds; None waits without a limit

    Returns:
         - retcode : Return code from the execution
         - stdout : stdout string
         - stderr : stderr string

    Raises:
         - subprocess.TimeoutExpired : cmd did not finish within walltime
         - Exception : any other error from launching or waiting on the
           command, re-raised after being logged
    '''
    # Imported locally so this function does not depend on a module-level
    # import being present.
    import signal

    proc = None
    try:
        logger.debug("Creating process with command '%s'", cmd)
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True,
            preexec_fn=os.setpgrp
        )
        logger.debug("Created process with pid %s. Performing communicate", proc.pid)
        (stdout, stderr) = proc.communicate(timeout=walltime)
        retcode = proc.returncode
        logger.debug("Process %s returned %s", proc.pid, proc.returncode)

    except Exception:
        logger.exception("Execution of command failed:\n%s", cmd)
        if proc is not None:
            if proc.poll() is None:
                # The shell is still alive (e.g. the walltime expired).
                # os.setpgrp made it a group leader, so its pid is also the
                # process group id: kill the whole group so that commands
                # spawned by the shell do not outlive the failed call.
                try:
                    os.killpg(proc.pid, signal.SIGKILL)
                except ProcessLookupError:
                    # The group vanished between poll() and killpg().
                    pass
                # Reap the shell so it does not linger as a zombie.
                proc.wait()
            # Close our ends of the pipes rather than reading them to EOF:
            # a process outside the killed group could still hold the write
            # ends, and reading would then block indefinitely.
            for pipe in (proc.stdout, proc.stderr):
                if pipe is not None:
                    pipe.close()
        raise
    else:
        logger.debug("Execution of command in process %s completed normally", proc.pid)

    return (retcode, stdout.decode("utf-8"), stderr.decode("utf-8"))