Implement basic version of local shell

Also added two simple reporters that can be used by the shell to report command execution details.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
Vladimir Domnich 2022-08-24 19:01:07 +04:00 committed by Vladimir
parent c0bbfd1705
commit f6ee129354
11 changed files with 453 additions and 0 deletions

14
reporter/__init__.py Normal file
View file

@ -0,0 +1,14 @@
import os
from .allure_reporter import AllureReporter
from .interfaces import Reporter
from .dummy_reporter import DummyReporter
def get_reporter() -> Reporter:
# TODO: in scope of reporter implementation task here we will have extendable
# solution for configuring and providing reporter for the library
if os.getenv("TESTLIB_REPORTER_TYPE", "DUMMY") == "DUMMY":
return DummyReporter()
else:
return AllureReporter()

View file

@ -0,0 +1,36 @@
import os
from contextlib import AbstractContextManager
from textwrap import shorten
from typing import Any
import allure
from allure import attachment_type
from .interfaces import Reporter
class AllureReporter(Reporter):
"""
Implements storing of test artifacts in Allure report.
"""
def step(self, name: str) -> AbstractContextManager:
name = shorten(name, width=70, placeholder="...")
return allure.step(name)
def attach(self, body: Any, file_name: str) -> None:
attachment_name, extension = os.path.splitext(file_name)
attachment_type = self._resolve_attachment_type(extension)
allure.attach(body, attachment_name, attachment_type)
def _resolve_attachment_type(self, extension: str) -> attachment_type:
"""
Try to find matching Allure attachment type by extension. If no match was found,
default to TXT format.
"""
extension = extension.lower()
return next(
(allure_type for allure_type in attachment_type if allure_type.extension == extension),
attachment_type.TXT,
)

View file

@ -0,0 +1,21 @@
from contextlib import AbstractContextManager, contextmanager
from typing import Any
from .interfaces import Reporter
@contextmanager
def _dummy_step():
yield
class DummyReporter(Reporter):
"""
Dummy implementation of reporter, does not store artifacts anywhere.
"""
def step(self, name: str) -> AbstractContextManager:
return _dummy_step()
def attach(self, content: Any, file_name: str) -> None:
pass

29
reporter/interfaces.py Normal file
View file

@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from contextlib import AbstractContextManager
from typing import Any
class Reporter(ABC):
"""
Interface that supports storage of test artifacts in some reporting tool.
"""
@abstractmethod
def step(self, name: str) -> AbstractContextManager:
"""
Register a new step in test execution.
:param str name: Name of the step
:return: step context
"""
pass
@abstractmethod
def attach(self, content: Any, file_name: str) -> None:
"""
Attach specified content with given file name to the test report.
:param any name: content to attach. If not a string, it will be converted to a string.
:param str file_name: file name of attachment.
"""
pass

2
requirements.txt Normal file
View file

@ -0,0 +1,2 @@
allure-python-commons==2.9.45
pexpect==4.8.0

0
shell/__init__.py Normal file
View file

59
shell/interfaces.py Normal file
View file

@ -0,0 +1,59 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class InteractiveInput:
"""
Interactive input for a shell command.
:attr str prompt_pattern: regular expression that defines expected prompt from the command.
:attr str input: user input that should be supplied to the command in response to the prompt.
"""
prompt_pattern: str
input: str
@dataclass
class CommandOptions:
"""
Options that control command execution.
:attr list interactive_inputs: user inputs that should be interactively supplied to
the command during its' execution.
:attr int timeout: timeout for command execution (in seconds).
:attr bool check: controls whether to check return code of the command. Set to False to
ignore non-zero return codes.
"""
interactive_inputs: Optional[list[InteractiveInput]] = None
timeout: int = 30
check: bool = True
@dataclass
class CommandResult:
"""
Represents a result of a command executed via shell.
"""
stdout: str
stderr: str
return_code: int
class Shell(ABC):
"""
Interface of a command shell on some system (local or remote).
"""
@abstractmethod
def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult:
"""
Executes specified command on this shell. To execute interactive command, user inputs
should be specified in *options*.
:param str command: command to execute on the shell.
:param CommandOptions options: options that control command execution.
:return command result.
"""
pass

159
shell/local_shell.py Normal file
View file

@ -0,0 +1,159 @@
import logging
import subprocess
import tempfile
from datetime import datetime
from typing import IO, Optional
import pexpect
from reporter import get_reporter
from shell.interfaces import CommandOptions, CommandResult, Shell
logger = logging.getLogger("neofs.testlib.shell")
reporter = get_reporter()
class LocalShell(Shell):
def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult:
# If no options were provided, use default options
options = options or CommandOptions()
logger.info(f"Executing command: {command}")
if options.interactive_inputs:
return self._exec_interactive(command, options)
return self._exec_non_interactive(command, options)
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
result = None
command_process = None
try:
command_process = pexpect.spawn(command, timeout=options.timeout)
command_process.delaybeforesend = 1
command_process.logfile_read = log_file
for interactive_input in options.interactive_inputs:
command_process.expect(interactive_input.prompt_pattern)
command_process.sendline(interactive_input.input)
result = self._get_pexpect_process_result(command_process, command)
if options.check and result.return_code != 0:
raise RuntimeError(f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}")
return result
except pexpect.ExceptionPexpect as exc:
result = self._get_pexpect_process_result(command_process, command)
message = f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except OSError as exc:
result = self._get_pexpect_process_result(command_process, command)
message = f"Command: {command}\nreturn code: {result.return_code}\nOutput: {exc.strerror}"
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except Exception:
result = self._get_pexpect_process_result(command_process, command)
raise
finally:
log_file.close()
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
result = None
try:
command_process = subprocess.run(
command,
check=options.check,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
timeout=options.timeout,
shell=True
)
result = CommandResult(
stdout=command_process.stdout or "",
stderr=command_process.stderr or "",
return_code=command_process.returncode,
)
return result
except subprocess.CalledProcessError as exc:
# TODO: always set check flag to false and capture command result normally
result = self._get_failing_command_result(command)
raise RuntimeError(f"Command: {command}\nError:\n"
f"return code: {exc.returncode}\n"
f"output: {exc.output}") from exc
except OSError as exc:
raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc
except Exception as exc:
result = self._get_failing_command_result(command)
raise
finally:
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
def _get_failing_command_result(self, command: str) -> CommandResult:
return_code, cmd_output = subprocess.getstatusoutput(command)
return CommandResult(
stdout=cmd_output,
stderr="",
return_code=return_code
)
def _get_pexpect_process_result(self, command_process: Optional[pexpect.spawn],
command: str) -> CommandResult:
"""
If command process is not None, captures output of this process.
If command process is None, then command fails when we attempt to start it, in this case
we use regular non-interactive process to get it's output.
"""
if command_process is None:
return self._get_failing_command_result(command)
# Wait for child process to end it's work
if command_process.isalive():
command_process.expect(pexpect.EOF)
# Close the process to obtain the exit code
command_process.close()
return_code = command_process.exitstatus
# Capture output from the log file
log_file: IO[bytes] = command_process.logfile_read
log_file.seek(0)
output = log_file.read().decode()
return CommandResult(stdout=output, stderr="", return_code=return_code)
def _report_command_result(self, command: str, start_time: datetime, end_time: datetime,
result: Optional[CommandResult]) -> None:
# TODO: increase logging level if return code is non 0, should be warning at least
logger.info(
f"Command: {command}\n"
f"{'Success:' if result and result.return_code == 0 else 'Error:'}\n"
f"return code: {result.return_code if result else ''} "
f"\nOutput: {result.stdout if result else ''}")
if result:
elapsed_time = end_time - start_time
command_attachment = (
f"COMMAND: {command}\n"
f"RETCODE: {result.return_code}\n\n"
f"STDOUT:\n{result.stdout}\n"
f"STDERR:\n{result.stderr}\n"
f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {elapsed_time}"
)
with reporter.step(f"COMMAND: {command}"):
reporter.attach(command_attachment, "Command execution.txt")

9
tests/helpers.py Normal file
View file

@ -0,0 +1,9 @@
import traceback
def format_error_details(error: Exception) -> str:
return "".join(traceback.format_exception(
etype=type(error),
value=error,
tb=error.__traceback__)
)

View file

@ -0,0 +1,78 @@
from unittest import TestCase
from shell.interfaces import CommandOptions, InteractiveInput
from shell.local_shell import LocalShell
from tests.helpers import format_error_details
class TestLocalShellInteractive(TestCase):
@classmethod
def setUpClass(cls):
cls.shell = LocalShell()
def test_command_with_one_prompt(self):
script = "password = input('Password: '); print(password)"
inputs = [InteractiveInput(prompt_pattern="Password", input="test")]
result = self.shell.exec(
f"python -c \"{script}\"",
CommandOptions(interactive_inputs=inputs)
)
self.assertEqual(0, result.return_code)
self.assertOutputLines(["Password: test", "test"], result.stdout)
self.assertEqual("", result.stderr)
def test_command_with_several_prompts(self):
script = (
"input1 = input('Input1: '); print(input1); "
"input2 = input('Input2: '); print(input2)"
)
inputs = [
InteractiveInput(prompt_pattern="Input1", input="test1"),
InteractiveInput(prompt_pattern="Input2", input="test2"),
]
result = self.shell.exec(
f"python -c \"{script}\"",
CommandOptions(interactive_inputs=inputs)
)
self.assertEqual(0, result.return_code)
self.assertOutputLines(["Input1: test1", "test1", "Input2: test2", "test2"], result.stdout)
self.assertEqual("", result.stderr)
def test_failed_command_with_check(self):
script = "invalid script"
inputs = [InteractiveInput(prompt_pattern=".*", input="test")]
with self.assertRaises(RuntimeError) as exc:
self.shell.exec(f"python -c \"{script}\"", CommandOptions(interactive_inputs=inputs))
error = format_error_details(exc.exception)
self.assertIn("Error", error)
# TODO: it would be nice to have return code as well
# self.assertIn("return code: 1", error)
def test_failed_command_without_check(self):
script = "invalid script"
inputs = [InteractiveInput(prompt_pattern=".*", input="test")]
result = self.shell.exec(
f"python -c \"{script}\"",
CommandOptions(interactive_inputs=inputs, check=False),
)
self.assertEqual(1, result.return_code)
def test_non_existing_binary(self):
inputs = [InteractiveInput(prompt_pattern=".*", input="test")]
with self.assertRaises(RuntimeError) as exc:
self.shell.exec("not-a-command", CommandOptions(interactive_inputs=inputs))
error = format_error_details(exc.exception)
self.assertIn("command was not found or was not executable", error)
def assertOutputLines(self, expected_lines: list[str], output: str) -> None:
output_lines = [line.strip() for line in output.split("\n") if line.strip()]
self.assertEqual(expected_lines, output_lines)

View file

@ -0,0 +1,46 @@
from unittest import TestCase
from shell.interfaces import CommandOptions
from shell.local_shell import LocalShell
from tests.helpers import format_error_details
class TestLocalShellNonInteractive(TestCase):
@classmethod
def setUpClass(cls):
cls.shell = LocalShell()
def test_successful_command(self):
script = "print('test')"
result = self.shell.exec(f"python -c \"{script}\"")
self.assertEqual(0, result.return_code)
self.assertEqual("test", result.stdout.strip())
self.assertEqual("", result.stderr)
def test_failed_command_with_check(self):
script = "invalid script"
with self.assertRaises(RuntimeError) as exc:
self.shell.exec(f"python -c \"{script}\"")
error = format_error_details(exc.exception)
self.assertIn("Error", error)
self.assertIn("return code: 1", error)
def test_failed_command_without_check(self):
script = "invalid script"
result = self.shell.exec(f"python -c \"{script}\"", CommandOptions(check=False))
self.assertEqual(1, result.return_code)
self.assertIn("Error", result.stdout)
def test_non_existing_binary(self):
with self.assertRaises(RuntimeError) as exc:
self.shell.exec(f"not-a-command")
error = format_error_details(exc.exception)
self.assertIn("Error", error)
self.assertIn("return code: 127", error)