Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect pytest -s flag, streaming stdout/stderr as the test runs #200

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ Changelog
.. This document is user facing. Please word the changes in such a way
.. that users understand how the changes affect the new version.

unreleased
---------------------------
+ ``-s`` pytest flag is now supported, streaming stdout/stderr as the tests run

version 2.1.0
---------------------------
+ Python version 3.7 support is dropped because it is deprecated. Python
Expand Down
11 changes: 10 additions & 1 deletion src/pytest_workflow/content_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def check_content(strings: Iterable[str],

class ContentTestCollector(pytest.Collector):
def __init__(self, name: str, parent: pytest.Collector,
filepath: Path,
filepath: Optional[Path],
content_test: ContentTest,
workflow: Workflow,
content_name: Optional[str] = None):
Expand Down Expand Up @@ -105,6 +105,11 @@ def find_strings(self):
When a file we test is not produced, we save the FileNotFoundError so
we can give an accurate repr_failure."""
self.workflow.wait()

if self.filepath is None:
self.file_not_found = True
return

strings_to_check = (self.content_test.contains +
self.content_test.must_not_contain)
patterns_to_check = (self.content_test.contains_regex +
Expand Down Expand Up @@ -195,6 +200,10 @@ def __init__(self, parent: ContentTestCollector, string: str,
name = f"{contain} '{string}'"
super().__init__(name, parent=parent)
self.parent: ContentTestCollector = parent # explicitly declare type
assert self.parent.filepath is not None, (
f"Invalid test {content_name}, unknown file to validate. "
"This can happen if you specify stdout/stderr tests while "
"specifying a different capture method.")
self.should_contain = should_contain
self.string = string
self.content_name = content_name
Expand Down
3 changes: 2 additions & 1 deletion src/pytest_workflow/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,8 @@ def queue_workflow(self):
workflow = Workflow(command=self.workflow_test.command,
cwd=tempdir,
name=self.workflow_test.name,
desired_exit_code=self.workflow_test.exit_code)
desired_exit_code=self.workflow_test.exit_code,
capture=self.config.getoption("capture"))

# Add the workflow to the workflow queue.
self.config.workflow_queue.put(workflow)
Expand Down
61 changes: 45 additions & 16 deletions src/pytest_workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@
import threading
import time
from pathlib import Path
from typing import List, Optional
from typing import List, Literal, Optional


# pytest does not export this type
CaptureMethod = Literal["fd", "sys", "no", "tee-sys"]


class Workflow(object):
Expand All @@ -36,7 +40,8 @@ def __init__(self,
command: str,
cwd: Optional[Path] = None,
name: Optional[str] = None,
desired_exit_code: int = 0):
desired_exit_code: int = 0,
capture: CaptureMethod = "fd"):
"""
Initiates a workflow object
:param command: The string that represents the command to be run
Expand All @@ -48,24 +53,35 @@ def __init__(self,
if command == "":
raise ValueError("command can not be an empty string")
self.command = command

# Always ensure a name. command.split()[0] can't fail because we tested
# for emptiness.
self.name = name or command.split()[0]
self.cwd = cwd or Path()

# For long running workflows it is best to save the stdout and stderr
# to a file which can be checked with ``tail -f``.
# stdout and stderr will be written to a tempfile if no CWD is given
# to prevent clutter created when testing.
self.stdout_file = (
Path(tempfile.NamedTemporaryFile(prefix=self.name,
suffix=".out").name)
if cwd is None
else self.cwd / Path("log.out"))
self.stderr_file = (
Path(tempfile.NamedTemporaryFile(prefix=self.name,
suffix=".err").name)
if cwd is None
else self.cwd / Path("log.err"))
supported_capture_methods = ["no", "fd"]
if capture not in supported_capture_methods:
raise ValueError("only capture methods "
f"{supported_capture_methods} are supported, "
f"found {capture}")
self.capture = capture
self.stdout_file = None
self.stderr_file = None
if self.capture != "no":
self.stdout_file = (
Path(tempfile.NamedTemporaryFile(prefix=self.name,
suffix=".out").name)
if cwd is None
else self.cwd / Path("log.out"))
self.stderr_file = (
Path(tempfile.NamedTemporaryFile(prefix=self.name,
suffix=".err").name)
if cwd is None
else self.cwd / Path("log.err"))
self._popen: Optional[subprocess.Popen] = None
self._started = False
self.errors: List[Exception] = []
Expand All @@ -79,9 +95,12 @@ def start(self):
# is started from multiple threads.
with self.start_lock:
if not self._started:
stdout_h = None
stderr_h = None
try:
stdout_h = self.stdout_file.open('wb')
stderr_h = self.stderr_file.open('wb')
if self.capture != "no":
stdout_h = self.stdout_file.open('wb')
stderr_h = self.stderr_file.open('wb')
sub_process_args = shlex.split(self.command)
self._popen = subprocess.Popen(
sub_process_args, stdout=stdout_h,
Expand All @@ -91,8 +110,10 @@ def start(self):
self.errors.append(error)
finally:
self._started = True
stdout_h.close()
stderr_h.close()
if stdout_h is not None:
stdout_h.close()
if stderr_h is not None:
stderr_h.close()
else:
raise ValueError("Workflows can only be started once")

Expand Down Expand Up @@ -148,12 +169,20 @@ def matching_exitcode(self) -> bool:
@property
def stdout(self) -> bytes:
self.wait()
if self.stdout_file is None:
raise ValueError(
f"Stdout not available with capture={self.capture}"
)
with self.stdout_file.open('rb') as stdout:
return stdout.read()

@property
def stderr(self) -> bytes:
self.wait()
if self.stderr_file is None:
raise ValueError(
f"Stdout not available with capture={self.capture}"
)
with self.stderr_file.open('rb') as stderr:
return stderr.read()

Expand Down
16 changes: 16 additions & 0 deletions tests/test_fail_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,19 @@ def test_messages_exitcode(test: str, message: str, pytester):
# possible due to multiple levels of process launching.
result = pytester.runpytest("-v", "--sb", "5")
assert message in result.stdout.str()


def test_invalid_test_capture(pytester):
test_yml_contents = """\
- name: tee test
command: bash -c "echo foo"
stdout:
contains:
- foo
"""
pytester.makefile(".yml", textwrap.dedent(test_yml_contents))
result = pytester.runpytest("-v", "-s")
assert (
"Invalid test 'tee test': stdout, unknown file to validate"
in result.stdout.str()
)
14 changes: 14 additions & 0 deletions tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,17 @@ def test_workflow_matching_exit_code():
workflow2 = Workflow("grep", desired_exit_code=2)
workflow2.run()
assert workflow2.matching_exitcode()


def test_capture_unsupported():
with pytest.raises(ValueError) as error:
Workflow("echo moo", capture="tee-sys")
error.match("only capture methods")


def test_capture_no():
workflow = Workflow("echo moo", capture="no")
workflow.run()
with pytest.raises(ValueError) as error:
workflow.stdout
error.match("Stdout not available")