From f6ee129354a11abe25ceacfce8a5a4a23252d29d Mon Sep 17 00:00:00 2001 From: Vladimir Domnich Date: Wed, 24 Aug 2022 19:01:07 +0400 Subject: [PATCH] Implement basic version of local shell Also added two simple reporters that can be used by the shell to report command execution details. Signed-off-by: Vladimir Domnich --- reporter/__init__.py | 14 ++ reporter/allure_reporter.py | 36 +++++ reporter/dummy_reporter.py | 21 +++ reporter/interfaces.py | 29 ++++ requirements.txt | 2 + shell/__init__.py | 0 shell/interfaces.py | 59 ++++++++ shell/local_shell.py | 159 ++++++++++++++++++++++ tests/helpers.py | 9 ++ tests/test_local_shell_interactive.py | 78 +++++++++++ tests/test_local_shell_non_interactive.py | 46 +++++++ 11 files changed, 453 insertions(+) create mode 100644 reporter/__init__.py create mode 100644 reporter/allure_reporter.py create mode 100644 reporter/dummy_reporter.py create mode 100644 reporter/interfaces.py create mode 100644 requirements.txt create mode 100644 shell/__init__.py create mode 100644 shell/interfaces.py create mode 100644 shell/local_shell.py create mode 100644 tests/helpers.py create mode 100644 tests/test_local_shell_interactive.py create mode 100644 tests/test_local_shell_non_interactive.py diff --git a/reporter/__init__.py b/reporter/__init__.py new file mode 100644 index 0000000..d312fcc --- /dev/null +++ b/reporter/__init__.py @@ -0,0 +1,14 @@ +import os + +from .allure_reporter import AllureReporter +from .interfaces import Reporter +from .dummy_reporter import DummyReporter + + +def get_reporter() -> Reporter: + # TODO: in scope of reporter implementation task here we will have extendable + # solution for configuring and providing reporter for the library + if os.getenv("TESTLIB_REPORTER_TYPE", "DUMMY") == "DUMMY": + return DummyReporter() + else: + return AllureReporter() diff --git a/reporter/allure_reporter.py b/reporter/allure_reporter.py new file mode 100644 index 0000000..6522859 --- /dev/null +++ b/reporter/allure_reporter.py @@ -0,0 +1,36 @@ +import os +from contextlib import AbstractContextManager +from textwrap import shorten +from typing import Any + +import allure +from allure import attachment_type + +from .interfaces import Reporter + + +class AllureReporter(Reporter): + """ + Implements storing of test artifacts in Allure report. + """ + + def step(self, name: str) -> AbstractContextManager: + name = shorten(name, width=70, placeholder="...") + return allure.step(name) + + def attach(self, body: Any, file_name: str) -> None: + attachment_name, extension = os.path.splitext(file_name) + attachment_type = self._resolve_attachment_type(extension) + + allure.attach(body, attachment_name, attachment_type) + + def _resolve_attachment_type(self, extension: str) -> attachment_type: + """ + Try to find matching Allure attachment type by extension. If no match was found, + default to TXT format. + """ + extension = extension.lower() + return next( + (allure_type for allure_type in attachment_type if allure_type.extension == extension), + attachment_type.TXT, + ) diff --git a/reporter/dummy_reporter.py b/reporter/dummy_reporter.py new file mode 100644 index 0000000..e559193 --- /dev/null +++ b/reporter/dummy_reporter.py @@ -0,0 +1,21 @@ +from contextlib import AbstractContextManager, contextmanager +from typing import Any + +from .interfaces import Reporter + + +@contextmanager +def _dummy_step(): + yield + + +class DummyReporter(Reporter): + """ + Dummy implementation of reporter, does not store artifacts anywhere. + """ + + def step(self, name: str) -> AbstractContextManager: + return _dummy_step() + + def attach(self, content: Any, file_name: str) -> None: + pass diff --git a/reporter/interfaces.py b/reporter/interfaces.py new file mode 100644 index 0000000..de7bcb7 --- /dev/null +++ b/reporter/interfaces.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +from contextlib import AbstractContextManager +from typing import Any + + +class Reporter(ABC): + """ + Interface that supports storage of test artifacts in some reporting tool. + """ + + @abstractmethod + def step(self, name: str) -> AbstractContextManager: + """ + Register a new step in test execution. + + :param str name: Name of the step + :return: step context + """ + pass + + @abstractmethod + def attach(self, content: Any, file_name: str) -> None: + """ + Attach specified content with given file name to the test report. + + :param any name: content to attach. If not a string, it will be converted to a string. + :param str file_name: file name of attachment. + """ + pass diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..babc3a7 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +allure-python-commons==2.9.45 +pexpect==4.8.0 diff --git a/shell/__init__.py b/shell/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/shell/interfaces.py b/shell/interfaces.py new file mode 100644 index 0000000..1e194dd --- /dev/null +++ b/shell/interfaces.py @@ -0,0 +1,59 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class InteractiveInput: + """ + Interactive input for a shell command. + + :attr str prompt_pattern: regular expression that defines expected prompt from the command. + :attr str input: user input that should be supplied to the command in response to the prompt. + """ + prompt_pattern: str + input: str + + +@dataclass +class CommandOptions: + """ + Options that control command execution. + + :attr list interactive_inputs: user inputs that should be interactively supplied to + the command during its' execution. + :attr int timeout: timeout for command execution (in seconds). + :attr bool check: controls whether to check return code of the command. Set to False to + ignore non-zero return codes. + """ + interactive_inputs: Optional[list[InteractiveInput]] = None + timeout: int = 30 + check: bool = True + + +@dataclass +class CommandResult: + """ + Represents a result of a command executed via shell. + """ + stdout: str + stderr: str + return_code: int + + +class Shell(ABC): + """ + Interface of a command shell on some system (local or remote). + """ + + @abstractmethod + def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult: + """ + Executes specified command on this shell. To execute interactive command, user inputs + should be specified in *options*. + + :param str command: command to execute on the shell. + :param CommandOptions options: options that control command execution. + :return command result. + """ + pass diff --git a/shell/local_shell.py b/shell/local_shell.py new file mode 100644 index 0000000..2345ede --- /dev/null +++ b/shell/local_shell.py @@ -0,0 +1,159 @@ +import logging +import subprocess +import tempfile +from datetime import datetime +from typing import IO, Optional + +import pexpect + +from reporter import get_reporter +from shell.interfaces import CommandOptions, CommandResult, Shell + + +logger = logging.getLogger("neofs.testlib.shell") +reporter = get_reporter() + + +class LocalShell(Shell): + def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult: + # If no options were provided, use default options + options = options or CommandOptions() + + logger.info(f"Executing command: {command}") + if options.interactive_inputs: + return self._exec_interactive(command, options) + return self._exec_non_interactive(command, options) + + def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult: + start_time = datetime.utcnow() + log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output + result = None + command_process = None + + try: + command_process = pexpect.spawn(command, timeout=options.timeout) + command_process.delaybeforesend = 1 + command_process.logfile_read = log_file + + for interactive_input in options.interactive_inputs: + command_process.expect(interactive_input.prompt_pattern) + command_process.sendline(interactive_input.input) + + result = self._get_pexpect_process_result(command_process, command) + if options.check and result.return_code != 0: + raise RuntimeError(f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}") + + return result + except pexpect.ExceptionPexpect as exc: + result = self._get_pexpect_process_result(command_process, command) + message = f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}" + if options.check: + raise RuntimeError(message) from exc + else: + logger.exception(message) + return result + except OSError as exc: + result = self._get_pexpect_process_result(command_process, command) + message = f"Command: {command}\nreturn code: {result.return_code}\nOutput: {exc.strerror}" + if options.check: + raise RuntimeError(message) from exc + else: + logger.exception(message) + return result + except Exception: + result = self._get_pexpect_process_result(command_process, command) + raise + finally: + log_file.close() + end_time = datetime.utcnow() + self._report_command_result(command, start_time, end_time, result) + + def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult: + start_time = datetime.utcnow() + result = None + + try: + command_process = subprocess.run( + command, + check=options.check, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + timeout=options.timeout, + shell=True + ) + + result = CommandResult( + stdout=command_process.stdout or "", + stderr=command_process.stderr or "", + return_code=command_process.returncode, + ) + return result + except subprocess.CalledProcessError as exc: + # TODO: always set check flag to false and capture command result normally + result = self._get_failing_command_result(command) + raise RuntimeError(f"Command: {command}\nError:\n" + f"return code: {exc.returncode}\n" + f"output: {exc.output}") from exc + except OSError as exc: + raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc + except Exception as exc: + result = self._get_failing_command_result(command) + raise + finally: + end_time = datetime.utcnow() + self._report_command_result(command, start_time, end_time, result) + + def _get_failing_command_result(self, command: str) -> CommandResult: + return_code, cmd_output = subprocess.getstatusoutput(command) + return CommandResult( + stdout=cmd_output, + stderr="", + return_code=return_code + ) + + def _get_pexpect_process_result(self, command_process: Optional[pexpect.spawn], + command: str) -> CommandResult: + """ + If command process is not None, captures output of this process. + If command process is None, then command fails when we attempt to start it, in this case + we use regular non-interactive process to get it's output. + """ + if command_process is None: + return self._get_failing_command_result(command) + + # Wait for child process to end it's work + if command_process.isalive(): + command_process.expect(pexpect.EOF) + + # Close the process to obtain the exit code + command_process.close() + return_code = command_process.exitstatus + + # Capture output from the log file + log_file: IO[bytes] = command_process.logfile_read + log_file.seek(0) + output = log_file.read().decode() + + return CommandResult(stdout=output, stderr="", return_code=return_code) + + def _report_command_result(self, command: str, start_time: datetime, end_time: datetime, + result: Optional[CommandResult]) -> None: + # TODO: increase logging level if return code is non 0, should be warning at least + logger.info( + f"Command: {command}\n" + f"{'Success:' if result and result.return_code == 0 else 'Error:'}\n" + f"return code: {result.return_code if result else ''} " + f"\nOutput: {result.stdout if result else ''}") + + if result: + elapsed_time = end_time - start_time + command_attachment = ( + f"COMMAND: {command}\n" + f"RETCODE: {result.return_code}\n\n" + f"STDOUT:\n{result.stdout}\n" + f"STDERR:\n{result.stderr}\n" + f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {elapsed_time}" + ) + with reporter.step(f"COMMAND: {command}"): + reporter.attach(command_attachment, "Command execution.txt") diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..6035651 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,9 @@ +import traceback + + +def format_error_details(error: Exception) -> str: + return "".join(traceback.format_exception( + etype=type(error), + value=error, + tb=error.__traceback__) + ) diff --git a/tests/test_local_shell_interactive.py b/tests/test_local_shell_interactive.py new file mode 100644 index 0000000..278d3b1 --- /dev/null +++ b/tests/test_local_shell_interactive.py @@ -0,0 +1,78 @@ +from unittest import TestCase + +from shell.interfaces import CommandOptions, InteractiveInput +from shell.local_shell import LocalShell +from tests.helpers import format_error_details + + +class TestLocalShellInteractive(TestCase): + @classmethod + def setUpClass(cls): + cls.shell = LocalShell() + + def test_command_with_one_prompt(self): + script = "password = input('Password: '); print(password)" + + inputs = [InteractiveInput(prompt_pattern="Password", input="test")] + result = self.shell.exec( + f"python -c \"{script}\"", + CommandOptions(interactive_inputs=inputs) + ) + + self.assertEqual(0, result.return_code) + self.assertOutputLines(["Password: test", "test"], result.stdout) + self.assertEqual("", result.stderr) + + def test_command_with_several_prompts(self): + script = ( + "input1 = input('Input1: '); print(input1); " + "input2 = input('Input2: '); print(input2)" + ) + inputs = [ + InteractiveInput(prompt_pattern="Input1", input="test1"), + InteractiveInput(prompt_pattern="Input2", input="test2"), + ] + + result = self.shell.exec( + f"python -c \"{script}\"", + CommandOptions(interactive_inputs=inputs) + ) + + self.assertEqual(0, result.return_code) + self.assertOutputLines(["Input1: test1", "test1", "Input2: test2", "test2"], result.stdout) + self.assertEqual("", result.stderr) + + def test_failed_command_with_check(self): + script = "invalid script" + inputs = [InteractiveInput(prompt_pattern=".*", input="test")] + + with self.assertRaises(RuntimeError) as exc: + self.shell.exec(f"python -c \"{script}\"", CommandOptions(interactive_inputs=inputs)) + + error = format_error_details(exc.exception) + self.assertIn("Error", error) + # TODO: it would be nice to have return code as well + # self.assertIn("return code: 1", error) + + def test_failed_command_without_check(self): + script = "invalid script" + inputs = [InteractiveInput(prompt_pattern=".*", input="test")] + + result = self.shell.exec( + f"python -c \"{script}\"", + CommandOptions(interactive_inputs=inputs, check=False), + ) + self.assertEqual(1, result.return_code) + + def test_non_existing_binary(self): + inputs = [InteractiveInput(prompt_pattern=".*", input="test")] + + with self.assertRaises(RuntimeError) as exc: + self.shell.exec("not-a-command", CommandOptions(interactive_inputs=inputs)) + + error = format_error_details(exc.exception) + self.assertIn("command was not found or was not executable", error) + + def assertOutputLines(self, expected_lines: list[str], output: str) -> None: + output_lines = [line.strip() for line in output.split("\n") if line.strip()] + self.assertEqual(expected_lines, output_lines) diff --git a/tests/test_local_shell_non_interactive.py b/tests/test_local_shell_non_interactive.py new file mode 100644 index 0000000..3fe04f5 --- /dev/null +++ b/tests/test_local_shell_non_interactive.py @@ -0,0 +1,46 @@ +from unittest import TestCase + +from shell.interfaces import CommandOptions +from shell.local_shell import LocalShell +from tests.helpers import format_error_details + + +class TestLocalShellNonInteractive(TestCase): + @classmethod + def setUpClass(cls): + cls.shell = LocalShell() + + def test_successful_command(self): + script = "print('test')" + + result = self.shell.exec(f"python -c \"{script}\"") + + self.assertEqual(0, result.return_code) + self.assertEqual("test", result.stdout.strip()) + self.assertEqual("", result.stderr) + + def test_failed_command_with_check(self): + script = "invalid script" + + with self.assertRaises(RuntimeError) as exc: + self.shell.exec(f"python -c \"{script}\"") + + error = format_error_details(exc.exception) + self.assertIn("Error", error) + self.assertIn("return code: 1", error) + + def test_failed_command_without_check(self): + script = "invalid script" + + result = self.shell.exec(f"python -c \"{script}\"", CommandOptions(check=False)) + + self.assertEqual(1, result.return_code) + self.assertIn("Error", result.stdout) + + def test_non_existing_binary(self): + with self.assertRaises(RuntimeError) as exc: + self.shell.exec(f"not-a-command") + + error = format_error_details(exc.exception) + self.assertIn("Error", error) + self.assertIn("return code: 127", error)