sync master #1

Merged
dansingjulia merged 4 commits from master into master 2023-01-16 07:04:18 +00:00
7 changed files with 80 additions and 61 deletions

1
.gitignore vendored
View file

@ -7,4 +7,5 @@
# ignore build artifacts
/dist
/build
*.egg-info

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "neofs-testlib"
version = "0.8.0"
version = "0.9.0"
description = "Building blocks and utilities to facilitate development of automated tests for NeoFS system"
readme = "README.md"
authors = [{ name = "NSPCC", email = "info@nspcc.ru" }]
@ -48,7 +48,7 @@ line-length = 100
target-version = ["py39"]
[tool.bumpver]
current_version = "0.8.0"
current_version = "0.9.0"
version_pattern = "MAJOR.MINOR.PATCH"
commit_message = "Bump version {old_version} -> {new_version}"
commit = true

View file

@ -1 +1 @@
__version__ = "0.8.0"
__version__ = "0.9.0"

View file

@ -10,7 +10,7 @@ import docker
from requests import HTTPError
from neofs_testlib.hosting.config import ParsedAttributes
from neofs_testlib.hosting.interfaces import Host
from neofs_testlib.hosting.interfaces import DiskInfo, Host
from neofs_testlib.shell import LocalShell, Shell, SSHShell
from neofs_testlib.shell.command_inspectors import SudoInspector
@ -127,6 +127,15 @@ class DockerHost(Host):
cmd = f"rm -rf {volume_path}/meta*" if cache_only else f"rm -rf {volume_path}/*"
shell.exec(cmd)
def attach_disk(self, device: str, disk_info: DiskInfo) -> None:
raise NotImplementedError("Not supported for docker")
def detach_disk(self, device: str) -> DiskInfo:
raise NotImplementedError("Not supported for docker")
def is_disk_attached(self, device: str, disk_info: DiskInfo) -> bool:
raise NotImplementedError("Not supported for docker")
def dump_logs(
self,
directory_path: str,

View file

@ -1,11 +1,15 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from neofs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from neofs_testlib.shell.interfaces import Shell
class DiskInfo(dict):
"""Dict wrapper for disk_info for disk management commands."""
class Host(ABC):
"""Interface of a host machine where neoFS services are running.
@ -109,6 +113,40 @@ class Host(ABC):
cache_only: To delete cache only.
"""
@abstractmethod
def detach_disk(self, device: str) -> DiskInfo:
"""Detaches disk device to simulate disk offline/failover scenario.
Args:
device: Device name to detach
Returns:
internal service disk info related to host plugin (i.e. volume id for cloud devices),
which may be used to identify or re-attach existing volume back
"""
@abstractmethod
def attach_disk(self, device: str, disk_info: DiskInfo) -> None:
"""Attaches disk device back.
Args:
device: Device name to attach
service_info: any info required for host plugin to identify/attach disk
"""
@abstractmethod
def is_disk_attached(self, device: str, disk_info: DiskInfo) -> bool:
"""Checks if disk device is attached.
Args:
device: Device name to check
service_info: any info required for host plugin to identify disk
Returns:
True if attached
False if detached
"""
@abstractmethod
def dump_logs(
self,

View file

@ -35,53 +35,35 @@ class LocalShell(Shell):
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
result = None
command_process = None
try:
command_process = pexpect.spawn(command, timeout=options.timeout)
except (pexpect.ExceptionPexpect, OSError) as exc:
raise RuntimeError(f"Command: {command}") from exc
command_process.delaybeforesend = 1
command_process.logfile_read = log_file
try:
for interactive_input in options.interactive_inputs:
command_process.expect(interactive_input.prompt_pattern)
command_process.sendline(interactive_input.input)
result = self._get_pexpect_process_result(command_process, command)
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
)
return result
except pexpect.ExceptionPexpect as exc:
result = self._get_pexpect_process_result(command_process, command)
message = (
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {result.stdout}"
)
except (pexpect.ExceptionPexpect, OSError) as exc:
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except OSError as exc:
result = self._get_pexpect_process_result(command_process, command)
message = (
f"Command: {command}\nreturn code: {result.return_code}\nOutput: {exc.strerror}"
)
if options.check:
raise RuntimeError(message) from exc
else:
logger.exception(message)
return result
except Exception:
result = self._get_pexpect_process_result(command_process, command)
raise
raise RuntimeError(f"Command: {command}") from exc
finally:
result = self._get_pexpect_process_result(command_process)
log_file.close()
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
if options.check and result.return_code != 0:
raise RuntimeError(
f"Command: {command}\nreturn code: {result.return_code}\n"
f"Output: {result.stdout}"
)
return result
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
start_time = datetime.utcnow()
result = None
@ -99,13 +81,16 @@ class LocalShell(Shell):
result = CommandResult(
stdout=command_process.stdout or "",
stderr=command_process.stderr or "",
stderr="",
return_code=command_process.returncode,
)
return result
except subprocess.CalledProcessError as exc:
# TODO: always set check flag to false and capture command result normally
result = self._get_failing_command_result(command)
result = CommandResult(
stdout=exc.stdout or "",
stderr="",
return_code=exc.returncode,
)
raise RuntimeError(
f"Command: {command}\nError:\n"
f"return code: {exc.returncode}\n"
@ -113,29 +98,15 @@ class LocalShell(Shell):
) from exc
except OSError as exc:
raise RuntimeError(f"Command: {command}\nOutput: {exc.strerror}") from exc
except Exception as exc:
result = self._get_failing_command_result(command)
raise
finally:
end_time = datetime.utcnow()
self._report_command_result(command, start_time, end_time, result)
return result
def _get_failing_command_result(self, command: str) -> CommandResult:
return_code, cmd_output = subprocess.getstatusoutput(command)
return CommandResult(stdout=cmd_output, stderr="", return_code=return_code)
def _get_pexpect_process_result(
self, command_process: Optional[pexpect.spawn], command: str
) -> CommandResult:
"""Captures output of the process.
If command process is not None, captures output of this process.
If command process is None, then command fails when we attempt to start it, in this case
we use regular non-interactive process to get it's output.
def _get_pexpect_process_result(self, command_process: pexpect.spawn) -> CommandResult:
"""
Captures output of the process.
"""
if command_process is None:
return self._get_failing_command_result(command)
# Wait for child process to end it's work
if command_process.isalive():
command_process.expect(pexpect.EOF)

View file

@ -72,7 +72,7 @@ class TestLocalShellInteractive(TestCase):
self.shell.exec("not-a-command", CommandOptions(interactive_inputs=inputs))
error = format_error_details(exc.exception)
self.assertIn("return code: 127", error)
self.assertIn("The command was not found", error)
class TestLocalShellNonInteractive(TestCase):