Switch failover test to hosting from testlib

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
This commit is contained in:
Vladimir Domnich 2022-10-13 16:13:45 +00:00 committed by Vladimir
parent 92c034c10b
commit 48e53b3d86
20 changed files with 242 additions and 441 deletions

View file

@ -17,11 +17,11 @@ def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
versions[binary] = _parse_version(out)
neofs_cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
versions["neofs-cli"] = _parse_version(neofs_cli.version.get())
versions["neofs-cli"] = _parse_version(neofs_cli.version.get().stdout)
try:
neofs_adm = NeofsAdm(shell, NEOFS_ADM_EXEC)
versions["neofs-adm"] = _parse_version(neofs_adm.version.get())
versions["neofs-adm"] = _parse_version(neofs_adm.version.get().stdout)
except RuntimeError:
logger.info(f"neofs-adm not installed")
@ -35,24 +35,23 @@ def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
versions_by_host = {}
for host in hosting.hosts:
binaries = []
binary_path_by_name = {} # Maps binary name to executable path
for service_config in host.config.services:
exec_path = service_config.attributes.get("exec_path")
if exec_path:
binaries.append(exec_path)
binary_path_by_name[service_config.name] = exec_path
for cli_config in host.config.clis:
binaries.append(cli_config.exec_path)
binary_path_by_name[cli_config.name] = cli_config.exec_path
shell = host.get_shell()
versions_at_host = {}
for binary in binaries:
for binary_name, binary_path in binary_path_by_name.items():
try:
result = shell.exec(f"{binary} --version")
versions_at_host[service_config.name] = _parse_version(result.stdout)
result = shell.exec(f"{binary_path} --version")
versions_at_host[binary_name] = _parse_version(result.stdout)
except Exception as exc:
logger.error(f"Cannot get version for {exec_path} because of\n{exc}")
versions_at_host[service_config.name] = "Unknown"
continue
logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
versions_at_host[binary_name] = "Unknown"
versions_by_host[host.config.address] = versions_at_host
# Consolidate versions across all hosts
@ -61,7 +60,9 @@ def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
for name, version in binary_versions.items():
captured_version = versions.get(name)
if captured_version:
assert captured_version == version, f"Binary {name} has inconsistent version on host {host}"
assert (
captured_version == version
), f"Binary {name} has inconsistent version on host {host}"
else:
versions[name] = version
return versions

View file

@ -1,16 +1,13 @@
from ssh_helper import HostClient
from neofs_testlib.shell import Shell
# TODO: convert to shell from hosting
class IpTablesHelper:
@staticmethod
def drop_input_traffic_to_port(client: HostClient, ports: list[str]):
def drop_input_traffic_to_port(shell: Shell, ports: list[str]) -> None:
for port in ports:
cmd_output = client.exec(cmd=f"sudo iptables -A INPUT -p tcp --dport {port} -j DROP")
assert cmd_output.rc == 0
shell.exec(f"sudo iptables -A INPUT -p tcp --dport {port} -j DROP")
@staticmethod
def restore_input_traffic_to_port(client: HostClient, ports: list[str]):
def restore_input_traffic_to_port(shell: Shell, ports: list[str]) -> None:
for port in ports:
cmd_output = client.exec(cmd=f"sudo iptables -D INPUT -p tcp --dport {port} -j DROP")
assert cmd_output.rc == 0
shell.exec(f"sudo iptables -D INPUT -p tcp --dport {port} -j DROP")

View file

@ -4,6 +4,10 @@ import uuid
from typing import Optional
import allure
# This file is broken because tenacity is not registered in requirements.txt.
# The file won't be fixed in the scope of this PR; alipay will fix it himself by
# switching the RemoteProcess and K6 classes from HostClient to the shell from hosting.
from tenacity import retry, stop_after_attempt, wait_fixed
from pytest_tests.helpers.ssh_helper import HostClient

View file

@ -1,250 +0,0 @@
import logging
import socket
import tempfile
import textwrap
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from time import sleep
from typing import ClassVar, Optional
import allure
from paramiko import AutoAddPolicy, RSAKey, SFTPClient, SSHClient, SSHException, ssh_exception
from paramiko.ssh_exception import AuthenticationException
class HostIsNotAvailable(Exception):
    """Raised when a host is not reachable.

    The exception message always starts with ``Host is not available``.

    Args:
        ip: Address of the unreachable host. When given (and non-empty) it is
            appended to the message as `` by ip: <ip>``.
        exc: Underlying error. When given, its string form is appended to the
            message after a period.
    """

    def __init__(self, ip: Optional[str] = None, exc: Optional[Exception] = None):
        msg = "Host is not available"
        if ip:
            msg = f"{msg} by ip: {ip}"
        if exc:
            msg = f"{msg}. {exc}"
        super().__init__(msg)
def log_command(func):
    """Decorate a command-executing method with logging and Allure reporting.

    The wrapped callable is invoked as ``func(host, command, *args, **kwargs)``,
    so the command must be passed positionally (or under the name ``command``).
    ``host`` must expose an ``ip`` attribute and the result of ``func`` must
    expose ``rc``, ``stdout`` and ``stderr``.

    Each call is wrapped in an Allure step titled with a shortened form of the
    command; the full command, its result and its timing are written to the
    log and attached to the Allure report.

    Returns:
        The wrapper, which returns whatever ``func`` returns.
    """
    display_length = 60
    # PowerShell preamble that carries no information for the step title.
    silent_prefix = "$ProgressPreference='SilentlyContinue'\n"

    @wraps(func)
    def wrapper(host: "HostClient", command: str, *args, **kwargs):
        visible = command.removeprefix(silent_prefix)
        short = visible[:display_length]
        # Add the ellipsis only when text was actually cut off; stripping the
        # prefix alone must not mark a short command as truncated.
        if len(visible) > display_length:
            short += "..."

        with allure.step(f"SSH: {short}"):
            logging.info(f'Execute command "{command}" on "{host.ip}"')

            start_time = datetime.utcnow()
            cmd_result = func(host, command, *args, **kwargs)
            end_time = datetime.utcnow()

            log_message = (
                f"HOST: {host.ip}\n"
                f'COMMAND:\n{textwrap.indent(command, " ")}\n'
                f"RC:\n {cmd_result.rc}\n"
                f'STDOUT:\n{textwrap.indent(cmd_result.stdout, " ")}\n'
                f'STDERR:\n{textwrap.indent(cmd_result.stderr, " ")}\n'
                f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {end_time - start_time}"
            )

            logging.info(log_message)
            allure.attach(log_message, "SSH command", allure.attachment_type.TEXT)
        return cmd_result

    return wrapper
@dataclass
class SSHCommand:
    """Result of a single command executed over SSH."""

    # Text captured from the command's standard output.
    stdout: str
    # Text captured from the command's standard error.
    stderr: str
    # Exit status reported for the command.
    rc: int
class HostClient:
    """SSH client for one remote host, built on paramiko's ``SSHClient``.

    Authentication uses the private key at ``private_key_path`` when one is
    given, otherwise the password.

    Args:
        ip: Address of the host to connect to.
        login: User name used for authentication.
        password: Password used when no private key path is given.
        private_key_path: Path to an RSA private key file.
        private_key_passphrase: Passphrase passed along when loading the key.
        init_ssh_client: When true, a connection is opened immediately.
    """

    # Current paramiko client. Stays None until the first connection attempt,
    # so that ``if not self.ssh_client`` checks work on lazily created clients.
    ssh_client: Optional[SSHClient] = None
    SSH_CONNECTION_ATTEMPTS: ClassVar[int] = 3
    # Timeout (seconds) passed to paramiko when connecting.
    CONNECTION_TIMEOUT = 90
    # (seconds to sleep between retries, number of retries) for restore_ssh_connection.
    TIMEOUT_RESTORE_CONNECTION = 10, 24

    def __init__(
        self,
        ip: str,
        login: str,
        password: Optional[str] = None,
        private_key_path: Optional[str] = None,
        private_key_passphrase: Optional[str] = None,
        init_ssh_client=True,
    ) -> None:
        self.ip = ip
        self.login = login
        self.password = password
        self.private_key_path = private_key_path
        self.private_key_passphrase = private_key_passphrase
        if init_ssh_client:
            self.create_connection(self.SSH_CONNECTION_ATTEMPTS)

    def exec(self, cmd: str, verify=True, timeout=90) -> SSHCommand:
        """Run ``cmd`` on the host and return its result.

        Raises:
            AssertionError: If ``verify`` is true and the return code is non-zero.
            HostIsNotAvailable: If the command could not be executed after all
                connection attempts.
        """
        cmd_result = self._inner_exec(cmd, timeout)
        if verify:
            assert cmd_result.rc == 0, f'Non zero rc from command: "{cmd}"'
        return cmd_result

    @log_command
    def exec_with_confirmation(
        self, cmd: str, confirmation: list, verify=True, timeout=90
    ) -> SSHCommand:
        """Run ``cmd`` and feed each line of ``confirmation`` to its stdin.

        A trailing newline is added to confirmation lines that lack one. Write
        errors on stdin are logged and do not abort the command.

        Raises:
            AssertionError: If ``verify`` is true and the return code is non-zero.
        """
        if not self.ssh_client:
            self.create_connection()
        ssh_stdin, ssh_stdout, ssh_stderr = self.ssh_client.exec_command(cmd, timeout=timeout)
        for line in confirmation:
            if not line.endswith("\n"):
                line = f"{line}\n"
            try:
                ssh_stdin.write(line)
            except OSError as err:
                logging.error(f"Got error {err} executing command {cmd}")
        ssh_stdin.close()
        output = SSHCommand(
            stdout=ssh_stdout.read().decode(errors="ignore"),
            stderr=ssh_stderr.read().decode(errors="ignore"),
            rc=ssh_stdout.channel.recv_exit_status(),
        )
        if verify:
            debug_info = f"\nSTDOUT: {output.stdout}\nSTDERR: {output.stderr}\nRC: {output.rc}"
            assert output.rc == 0, f'Non zero rc from command: "{cmd}"{debug_info}'
        return output

    @contextmanager
    def as_user(self, user: str, password: str):
        """Temporarily reconnect with another login and password.

        The original credentials are restored and the connection is re-created
        on exit, including when the managed body (or the reconnect itself)
        raises.
        """
        keep_user, keep_password = self.login, self.password
        self.login, self.password = user, password
        try:
            self.create_connection()
            yield
        finally:
            self.login, self.password = keep_user, keep_password
            self.create_connection()

    @contextmanager
    def create_ssh_connection(self) -> "SSHClient":
        """Yield the paramiko client (connecting first if needed) and close it on exit."""
        if not self.ssh_client:
            self.create_connection()
        try:
            yield self.ssh_client
        finally:
            self.drop()

    @allure.step("Restore connection")
    def restore_ssh_connection(self):
        """Re-create the connection, retrying per ``TIMEOUT_RESTORE_CONNECTION``.

        Raises:
            AssertionError: If the host is still unreachable after all retries.
        """
        retry_time, retry_count = self.TIMEOUT_RESTORE_CONNECTION
        for _ in range(retry_count):
            try:
                self.create_connection()
            # create_connection signals an unreachable host with
            # HostIsNotAvailable; AssertionError is kept for compatibility.
            except (AssertionError, HostIsNotAvailable):
                logging.warning(f"Host: Cant reach host: {self.ip}.")
                sleep(retry_time)
            else:
                logging.info(f"Host: connection to host {self.ip} is restored.")
                return
        raise AssertionError(
            f"Host: Cant reach host: {self.ip} after {retry_time * retry_count} seconds.."
        )

    @allure.step("Copy file {host_path_to_file} to local file {path_to_file}")
    def copy_file_from_host(self, host_path_to_file: str, path_to_file: str):
        """Download ``host_path_to_file`` from the host to local ``path_to_file`` over SFTP."""
        with self._sftp_client() as sftp_client:
            sftp_client.get(host_path_to_file, path_to_file)

    def copy_file_to_host(self, path_to_file: str, host_path_to_file: str):
        """Upload local ``path_to_file`` to ``host_path_to_file`` on the host over SFTP."""
        with allure.step(
            f"Copy local file {path_to_file} to remote file {host_path_to_file} on host {self.ip}"
        ):
            with self._sftp_client() as sftp_client:
                sftp_client.put(path_to_file, host_path_to_file)

    @allure.step("Save string to remote file {host_path_to_file}")
    def copy_str_to_host_file(self, string: str, host_path_to_file: str):
        """Write ``string`` to ``host_path_to_file`` on the host via a local temporary file."""
        with tempfile.NamedTemporaryFile(mode="w+") as temp:
            temp.write(string)
            # Flush so the bytes are on disk before SFTP reads the file by name.
            temp.flush()
            with self._sftp_client() as client:
                client.put(temp.name, host_path_to_file)
        # Echo the uploaded file so its content lands in the command log.
        self.exec(f"cat {host_path_to_file}", verify=False)

    def create_connection(self, attempts=SSH_CONNECTION_ATTEMPTS):
        """Open a fresh SSH connection, trying up to ``attempts`` times.

        Returns:
            True once a connection is established.

        Raises:
            AuthenticationException: Immediately, without further attempts.
            HostIsNotAvailable: When every attempt failed with a connection error.
        """
        exc_err = None
        # Close the client being replaced so reconnects do not leak it.
        self.drop()
        for attempt in range(attempts):
            self.ssh_client = SSHClient()
            self.ssh_client.set_missing_host_key_policy(AutoAddPolicy())
            try:
                if self.private_key_path:
                    logging.info(
                        f"Trying to connect to host {self.ip} as {self.login} using SSH key "
                        f"{self.private_key_path} (attempt {attempt})"
                    )
                    self.ssh_client.connect(
                        hostname=self.ip,
                        username=self.login,
                        pkey=RSAKey.from_private_key_file(
                            self.private_key_path, self.private_key_passphrase
                        ),
                        timeout=self.CONNECTION_TIMEOUT,
                    )
                else:
                    logging.info(
                        f"Trying to connect to host {self.ip} as {self.login} using password "
                        f"(attempt {attempt})"
                    )
                    self.ssh_client.connect(
                        hostname=self.ip,
                        username=self.login,
                        password=self.password,
                        timeout=self.CONNECTION_TIMEOUT,
                    )
                return True
            except AuthenticationException as auth_err:
                # Bad credentials will not improve on retry; fail right away.
                logging.error(f"Host: {self.ip}. {auth_err}")
                self.drop()
                raise
            except (
                SSHException,
                ssh_exception.NoValidConnectionsError,
                AttributeError,
                socket.timeout,
                OSError,
            ) as ssh_err:
                exc_err = ssh_err
                self.drop()
                logging.error(f"Host: {self.ip}, connection error. {exc_err}")
        raise HostIsNotAvailable(self.ip, exc_err)

    def drop(self):
        """Close the current SSH client, if there is one."""
        if self.ssh_client is not None:
            self.ssh_client.close()

    @log_command
    def _inner_exec(self, cmd: str, timeout: int) -> SSHCommand:
        """Execute ``cmd``, reconnecting and retrying on connection-level errors.

        Raises:
            HostIsNotAvailable: When all ``SSH_CONNECTION_ATTEMPTS`` attempts failed.
        """
        if not self.ssh_client:
            self.create_connection()
        for _ in range(self.SSH_CONNECTION_ATTEMPTS):
            try:
                _, stdout, stderr = self.ssh_client.exec_command(cmd, timeout=timeout)
                return SSHCommand(
                    stdout=stdout.read().decode(errors="ignore"),
                    stderr=stderr.read().decode(errors="ignore"),
                    rc=stdout.channel.recv_exit_status(),
                )
            except (
                SSHException,
                TimeoutError,
                ssh_exception.NoValidConnectionsError,
                ConnectionResetError,
                AttributeError,
                socket.timeout,
            ) as ssh_err:
                logging.error(f"Host: {self.ip}, exec command error {ssh_err}")
                self.create_connection()
        # Pass the address itself: the exception builds its own message from it.
        raise HostIsNotAvailable(self.ip)

    @contextmanager
    def _sftp_client(self) -> SFTPClient:
        """Yield an SFTP session opened on the current SSH client."""
        with self.ssh_client.open_sftp() as sftp:
            yield sftp

View file

@ -56,7 +56,7 @@ pyparsing==3.0.9
pyrsistent==0.18.1
pytest==7.1.2
python-dateutil==2.8.2
requests==2.27.1
requests==2.28.0
robotframework==4.1.2
s3transfer==0.3.7
six==1.16.0

View file

@ -20,6 +20,7 @@ from common import (
S3_GATE_WALLET_PATH,
)
from data_formatters import get_wallet_public_key
from neofs_testlib.shell import Shell
from python_keywords.container import list_containers
from steps.aws_cli_client import AwsCliClient
@ -42,7 +43,7 @@ class TestS3GateBase:
@pytest.fixture(scope="class", autouse=True)
@allure.title("[Class/Autouse]: Create S3 client")
def s3_client(self, prepare_wallet_and_deposit, request):
def s3_client(self, prepare_wallet_and_deposit, client_shell: Shell, request):
wallet = prepare_wallet_and_deposit
s3_bearer_rules_file = f"{os.getcwd()}/robot/resources/files/s3_bearer_rules.json"
@ -53,7 +54,7 @@ class TestS3GateBase:
secret_access_key,
owner_private_key,
) = init_s3_credentials(wallet, s3_bearer_rules_file=s3_bearer_rules_file)
containers_list = list_containers(wallet)
containers_list = list_containers(wallet, shell=client_shell)
assert cid in containers_list, f"Expected cid {cid} in {containers_list}"
if request.param == "aws cli":

View file

@ -1,6 +1,5 @@
import allure
import pytest
from python_keywords.acl import EACLRole
from python_keywords.container import create_container
from python_keywords.container_access import (

View file

@ -3,6 +3,7 @@ import pytest
from common import NEOFS_NETMAP_DICT
from failover_utils import wait_object_replication_on_nodes
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from python_keywords.acl import (
EACLAccess,
EACLOperation,
@ -28,7 +29,6 @@ from python_keywords.object_access import (
can_put_object,
can_search_object,
)
from neofs_testlib.shell import Shell
from wellknown_acl import PUBLIC_ACL
@ -195,7 +195,12 @@ class TestEACLContainer:
@allure.title("Testcase to validate NeoFS replication with eACL deny rules.")
def test_extended_acl_deny_replication(
self, wallets, client_shell, hosting: Hosting, eacl_full_placement_container_with_object, file_path
self,
wallets,
client_shell,
hosting: Hosting,
eacl_full_placement_container_with_object,
file_path,
):
user_wallet = wallets.get_wallet()
cid, oid, file_path = eacl_full_placement_container_with_object

View file

@ -17,6 +17,7 @@ from common import (
)
from env_properties import save_env_properties
from neofs_testlib.hosting import Hosting
from neofs_testlib.reporter import AllureHandler, get_reporter
from neofs_testlib.shell import LocalShell, Shell
from payment_neogo import neofs_deposit, transfer_mainnet_gas
from python_keywords.node_management import node_healthcheck
@ -25,10 +26,26 @@ logger = logging.getLogger("NeoLogger")
@pytest.fixture(scope="session")
def client_shell() -> Shell:
def configure_testlib():
get_reporter().register_handler(AllureHandler())
yield
@pytest.fixture(scope="session")
def client_shell(configure_testlib) -> Shell:
yield LocalShell()
@pytest.fixture(scope="session")
def hosting(configure_testlib) -> Hosting:
with open(HOSTING_CONFIG_FILE, "r") as file:
hosting_config = yaml.full_load(file)
hosting_instance = Hosting()
hosting_instance.configure(hosting_config)
yield hosting_instance
@pytest.fixture(scope="session")
def require_multiple_hosts(hosting: Hosting):
"""Fixture that is applied to tests that require that environment has multiple hosts."""
@ -37,16 +54,6 @@ def require_multiple_hosts(hosting: Hosting):
yield
@pytest.fixture(scope="session")
def hosting() -> Hosting:
with open(HOSTING_CONFIG_FILE, "r") as file:
hosting_config = yaml.full_load(file)
hosting_instance = Hosting()
hosting_instance.configure(hosting_config)
yield hosting_instance
@pytest.fixture(scope="session", autouse=True)
@allure.title("Check binary versions")
def check_binary_versions(request, hosting: Hosting, client_shell: Shell):

View file

@ -4,18 +4,12 @@ from time import sleep
import allure
import pytest
from common import (
STORAGE_NODE_SSH_PASSWORD,
STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
STORAGE_NODE_SSH_USER,
)
from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes
from file_helper import generate_file, get_file_hash
from iptables_helper import IpTablesHelper
from neofs_testlib.hosting import Hosting
from python_keywords.container import create_container
from python_keywords.neofs_verbs import get_object, put_object
from ssh_helper import HostClient
from wellknown_acl import PUBLIC_ACL
logger = logging.getLogger("NeoLogger")
@ -31,16 +25,10 @@ def restore_network(hosting: Hosting):
yield
not_empty = len(blocked_hosts) != 0
for host in list(blocked_hosts):
with allure.step(f"Start storage node {host}"):
client = HostClient(
ip=host,
login=STORAGE_NODE_SSH_USER,
password=STORAGE_NODE_SSH_PASSWORD,
private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
)
with client.create_ssh_connection():
IpTablesHelper.restore_input_traffic_to_port(client, PORTS_TO_BLOCK)
for host_address in list(blocked_hosts):
with allure.step(f"Restore network at host {host_address}"):
host = hosting.get_host_by_address(host_address)
IpTablesHelper.restore_input_traffic_to_port(host.get_shell(), PORTS_TO_BLOCK)
blocked_hosts.remove(host)
if not_empty:
wait_all_storage_node_returned(hosting)
@ -49,67 +37,65 @@ def restore_network(hosting: Hosting):
@allure.title("Block Storage node traffic")
@pytest.mark.failover
@pytest.mark.failover_net
def test_block_storage_node_traffic(prepare_wallet_and_deposit, client_shell, require_multiple_hosts):
def test_block_storage_node_traffic(
prepare_wallet_and_deposit, client_shell, require_multiple_hosts, hosting: Hosting
):
"""
Block storage nodes traffic using iptables and wait for replication for objects.
"""
wallet = prepare_wallet_and_deposit
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
excluded_nodes = []
wakeup_node_timeout = 10 # timeout to let nodes detect that traffic has blocked
nodes_to_block_count = 2
source_file_path = generate_file()
cid = create_container(wallet, shell=client_shell, rule=placement_rule, basic_acl=PUBLIC_ACL)
oid = put_object(wallet, source_file_path, cid, shell=client_shell)
nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
logger.info(f"Nodes are {nodes}")
random_nodes = [(node, node.split(":")[0]) for node in nodes]
if nodes_to_block_count > len(nodes):
random_nodes = [(node, node.split(":")[0]) for node in choices(nodes, k=2)]
# TODO: we need to refactor wait_object_replication_on_nodes so that it returns
# storage node names rather than endpoints
node_endpoints = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
for random_node, random_node_ip in random_nodes:
client = HostClient(
ip=random_node_ip,
login=STORAGE_NODE_SSH_USER,
password=STORAGE_NODE_SSH_PASSWORD,
private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
)
logger.info(f"Nodes are {node_endpoints}")
node_endpoints_to_block = node_endpoints
if nodes_to_block_count > len(node_endpoints):
# TODO: the intent of this logic is not clear, need to revisit
node_endpoints_to_block = choices(node_endpoints, k=2)
with allure.step(f"Block incoming traffic for node {random_node} on port {PORTS_TO_BLOCK}"):
with client.create_ssh_connection():
IpTablesHelper.drop_input_traffic_to_port(client, PORTS_TO_BLOCK)
blocked_hosts.append(random_node_ip)
excluded_nodes.append(random_node)
excluded_nodes = []
for node_endpoint in node_endpoints_to_block:
host_address = node_endpoint.split(":")[0]
host = hosting.get_host_by_address(host_address)
with allure.step(f"Block incoming traffic at host {host_address} on port {PORTS_TO_BLOCK}"):
blocked_hosts.append(host_address)
excluded_nodes.append(node_endpoint)
IpTablesHelper.drop_input_traffic_to_port(host.get_shell(), PORTS_TO_BLOCK)
sleep(wakeup_node_timeout)
new_nodes = wait_object_replication_on_nodes(
wallet, cid, oid, 2, excluded_nodes=excluded_nodes
)
with allure.step(f"Check object is not stored on node {node_endpoint}"):
new_nodes = wait_object_replication_on_nodes(
wallet, cid, oid, 2, shell=client_shell, excluded_nodes=excluded_nodes
)
assert node_endpoint not in new_nodes
assert random_node not in new_nodes
with allure.step(f"Check object data is not corrupted"):
got_file_path = get_object(wallet, cid, oid, endpoint=new_nodes[0], shell=client_shell)
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
for node_endpoint in node_endpoints_to_block:
host_address = node_endpoint.split(":")[0]
host = hosting.get_host_by_address(host_address)
with allure.step(
f"Unblock incoming traffic at host {host_address} on port {PORTS_TO_BLOCK}"
):
IpTablesHelper.restore_input_traffic_to_port(host.get_shell(), PORTS_TO_BLOCK)
blocked_hosts.remove(host_address)
sleep(wakeup_node_timeout)
with allure.step(f"Check object data is not corrupted"):
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
for random_node, random_node_ip in random_nodes:
client = HostClient(
ip=random_node_ip,
login=STORAGE_NODE_SSH_USER,
password=STORAGE_NODE_SSH_PASSWORD,
private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
)
with allure.step(
f"Unblock incoming traffic for node {random_node} on port {PORTS_TO_BLOCK}"
):
with client.create_ssh_connection():
IpTablesHelper.restore_input_traffic_to_port(client, PORTS_TO_BLOCK)
blocked_hosts.remove(random_node_ip)
sleep(wakeup_node_timeout)
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

View file

@ -2,17 +2,12 @@ import logging
import allure
import pytest
from common import (
STORAGE_NODE_SSH_PASSWORD,
STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
STORAGE_NODE_SSH_USER,
)
from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes
from file_helper import generate_file, get_file_hash
from neofs_testlib.hosting import Hosting
from neofs_testlib.hosting import Host, Hosting
from neofs_testlib.shell import CommandOptions
from python_keywords.container import create_container
from python_keywords.neofs_verbs import get_object, put_object
from ssh_helper import HostClient
from wellknown_acl import PUBLIC_ACL
logger = logging.getLogger("NeoLogger")
@ -20,27 +15,21 @@ stopped_hosts = []
@pytest.fixture(autouse=True)
@allure.step("Return all storage nodes")
def return_all_storage_nodes_fixture(hosting: Hosting):
@allure.step("Return all stopped hosts")
def after_run_return_all_stopped_hosts(hosting: Hosting):
yield
return_all_storage_nodes(hosting)
return_stopped_hosts(hosting)
def panic_reboot_host(ip: str = None):
ssh = HostClient(
ip=ip,
login=STORAGE_NODE_SSH_USER,
password=STORAGE_NODE_SSH_PASSWORD,
private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
)
ssh.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
ssh_stdin, _, _ = ssh.ssh_client.exec_command(
'sudo sh -c "echo b > /proc/sysrq-trigger"', timeout=1
)
ssh_stdin.close()
def panic_reboot_host(host: Host) -> None:
shell = host.get_shell()
shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
options = CommandOptions(close_stdin=True, timeout=1, check=False)
shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)
def return_all_storage_nodes(hosting: Hosting) -> None:
def return_stopped_hosts(hosting: Hosting) -> None:
for host_address in list(stopped_hosts):
with allure.step(f"Start host {host_address}"):
host = hosting.get_host_by_address(host_address)
@ -50,10 +39,10 @@ def return_all_storage_nodes(hosting: Hosting) -> None:
wait_all_storage_node_returned(hosting)
@allure.title("Lost and returned nodes")
@allure.title("Lose and return storage node's host")
@pytest.mark.parametrize("hard_reboot", [True, False])
@pytest.mark.failover
def test_lost_storage_node(
def test_lose_storage_node_host(
prepare_wallet_and_deposit,
client_shell,
hosting: Hosting,
@ -65,35 +54,44 @@ def test_lost_storage_node(
source_file_path = generate_file()
cid = create_container(wallet, shell=client_shell, rule=placement_rule, basic_acl=PUBLIC_ACL)
oid = put_object(wallet, source_file_path, cid, shell=client_shell)
nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
node_endpoints = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
new_nodes = []
for node in nodes:
host = hosting.get_host_by_service(node)
for node_endpoint in node_endpoints:
host_address = node_endpoint.split(":")[0]
host = hosting.get_host_by_address(host_address)
stopped_hosts.append(host.config.address)
with allure.step(f"Stop storage node {node}"):
with allure.step(f"Stop host {host_address}"):
host.stop_host("hard" if hard_reboot else "soft")
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node])
assert not [node for node in nodes if node in new_nodes]
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
new_nodes = wait_object_replication_on_nodes(
wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node_endpoint]
)
assert all(old_node not in new_nodes for old_node in node_endpoints)
with allure.step(f"Return storage nodes"):
return_all_storage_nodes(hosting)
with allure.step("Check object data is not corrupted"):
got_file_path = get_object(wallet, cid, oid, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
with allure.step(f"Return all hosts"):
return_stopped_hosts(hosting)
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
with allure.step("Check object data is not corrupted"):
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
@allure.title("Panic storage node(s)")
@allure.title("Panic storage node's host")
@pytest.mark.parametrize("sequence", [True, False])
@pytest.mark.failover_panic
@pytest.mark.failover
def test_panic_storage_node(
prepare_wallet_and_deposit, client_shell, require_multiple_hosts, sequence: bool
def test_panic_storage_node_host(
prepare_wallet_and_deposit,
client_shell,
hosting: Hosting,
require_multiple_hosts,
sequence: bool,
):
wallet = prepare_wallet_and_deposit
placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
@ -101,16 +99,24 @@ def test_panic_storage_node(
cid = create_container(wallet, shell=client_shell, rule=placement_rule, basic_acl=PUBLIC_ACL)
oid = put_object(wallet, source_file_path, cid, shell=client_shell)
nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
node_endpoints = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)
allure.attach(
"\n".join(node_endpoints),
"Current nodes with object",
allure.attachment_type.TEXT,
)
new_nodes: list[str] = []
allure.attach("\n".join(nodes), "Current nodes with object", allure.attachment_type.TEXT)
for node in nodes:
with allure.step(f"Hard reboot host {node} via magic SysRq option"):
panic_reboot_host(ip=node.split(":")[-2])
for node_endpoint in node_endpoints:
host_address = node_endpoint.split(":")[0]
with allure.step(f"Hard reboot host {node_endpoint} via magic SysRq option"):
host = hosting.get_host_by_address(host_address)
panic_reboot_host(host)
if sequence:
try:
new_nodes = wait_object_replication_on_nodes(
wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node]
wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node_endpoint]
)
except AssertionError:
new_nodes = wait_object_replication_on_nodes(
@ -119,7 +125,7 @@ def test_panic_storage_node(
allure.attach(
"\n".join(new_nodes),
f"Nodes with object after {node} fail",
f"Nodes with object after {node_endpoint} fail",
allure.attachment_type.TEXT,
)

View file

@ -5,7 +5,6 @@ from typing import Optional
import allure
import pytest
from neofs_testlib.shell import Shell
from common import (
COMPLEX_OBJ_SIZE,
MORPH_BLOCK_TIME,
@ -19,6 +18,7 @@ from epoch import tick_epoch
from file_helper import generate_file
from grpc_responses import OBJECT_NOT_FOUND, error_matches_status
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from python_keywords.container import create_container, get_container
from python_keywords.failover_utils import wait_object_replication_on_nodes
from python_keywords.neofs_verbs import delete_object, get_object, head_object, put_object
@ -121,7 +121,11 @@ def return_nodes(shell: Shell, hosting: Hosting, alive_node: Optional[str] = Non
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
def test_add_nodes(
prepare_tmp_dir, client_shell, prepare_wallet_and_deposit, return_nodes_after_test_run, hosting: Hosting
prepare_tmp_dir,
client_shell,
prepare_wallet_and_deposit,
return_nodes_after_test_run,
hosting: Hosting,
):
wallet = prepare_wallet_and_deposit
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
@ -141,14 +145,18 @@ def test_add_nodes(
# Add node to recovery list before messing with it
check_nodes.append(additional_node)
exclude_node_from_network_map(hosting, additional_node, alive_node)
exclude_node_from_network_map(hosting, additional_node, alive_node, shell=client_shell)
delete_node_data(hosting, additional_node)
cid = create_container(wallet, rule=placement_rule_3, basic_acl=PUBLIC_ACL)
cid = create_container(wallet, rule=placement_rule_3, basic_acl=PUBLIC_ACL, shell=client_shell)
oid = put_object(
wallet, source_file_path, cid, endpoint=NEOFS_NETMAP_DICT[alive_node].get("rpc")
wallet,
source_file_path,
cid,
endpoint=NEOFS_NETMAP_DICT[alive_node].get("rpc"),
shell=client_shell,
)
wait_object_replication_on_nodes(wallet, cid, oid, 3)
wait_object_replication_on_nodes(wallet, cid, oid, 3, shell=client_shell)
return_nodes(shell=client_shell, hosting=hosting, alive_node=alive_node)
@ -156,16 +164,24 @@ def test_add_nodes(
random_node = choice(
[node for node in NEOFS_NETMAP_DICT if node not in (additional_node, alive_node)]
)
exclude_node_from_network_map(hosting, random_node, alive_node)
exclude_node_from_network_map(hosting, random_node, alive_node, shell=client_shell)
wait_object_replication_on_nodes(wallet, cid, oid, 3, excluded_nodes=[random_node])
wait_object_replication_on_nodes(
wallet, cid, oid, 3, excluded_nodes=[random_node], shell=client_shell
)
include_node_to_network_map(hosting, random_node, alive_node, shell=client_shell)
wait_object_replication_on_nodes(wallet, cid, oid, 3)
wait_object_replication_on_nodes(wallet, cid, oid, 3, shell=client_shell)
with allure.step("Check container could be created with new node"):
cid = create_container(wallet, rule=placement_rule_4, basic_acl=PUBLIC_ACL)
cid = create_container(
wallet, rule=placement_rule_4, basic_acl=PUBLIC_ACL, shell=client_shell
)
oid = put_object(
wallet, source_file_path, cid, endpoint=NEOFS_NETMAP_DICT[alive_node].get("rpc")
wallet,
source_file_path,
cid,
endpoint=NEOFS_NETMAP_DICT[alive_node].get("rpc"),
shell=client_shell,
)
wait_object_replication_on_nodes(wallet, cid, oid, 4, shell=client_shell)
@ -231,13 +247,15 @@ def test_nodes_management(prepare_tmp_dir, client_shell, hosting: Hosting):
)
@pytest.mark.node_mgmt
@allure.title("Test object copies based on placement policy")
def test_placement_policy(prepare_wallet_and_deposit, placement_rule, expected_copies):
def test_placement_policy(
prepare_wallet_and_deposit, placement_rule, expected_copies, client_shell: Shell
):
"""
This test checks object's copies based on container's placement policy.
"""
wallet = prepare_wallet_and_deposit
file_path = generate_file()
validate_object_copies(wallet, placement_rule, file_path, expected_copies)
validate_object_copies(wallet, placement_rule, file_path, expected_copies, shell=client_shell)
@pytest.mark.parametrize(
@ -288,7 +306,7 @@ def test_placement_policy(prepare_wallet_and_deposit, placement_rule, expected_c
@pytest.mark.node_mgmt
@allure.title("Test object copies and storage nodes based on placement policy")
def test_placement_policy_with_nodes(
prepare_wallet_and_deposit, placement_rule, expected_copies, nodes
prepare_wallet_and_deposit, placement_rule, expected_copies, nodes, client_shell: Shell
):
"""
Based on the container's placement policy, check that storage nodes are picked correctly and the object has
@ -297,7 +315,7 @@ def test_placement_policy_with_nodes(
wallet = prepare_wallet_and_deposit
file_path = generate_file()
cid, oid, found_nodes = validate_object_copies(
wallet, placement_rule, file_path, expected_copies
wallet, placement_rule, file_path, expected_copies, shell=client_shell
)
expected_nodes = [NEOFS_NETMAP_DICT[node_name].get("rpc") for node_name in nodes]
assert set(found_nodes) == set(
@ -313,17 +331,21 @@ def test_placement_policy_with_nodes(
)
@pytest.mark.node_mgmt
@allure.title("Negative cases for placement policy")
def test_placement_policy_negative(prepare_wallet_and_deposit, placement_rule, expected_copies):
def test_placement_policy_negative(
prepare_wallet_and_deposit, placement_rule, expected_copies, client_shell: Shell
):
"""
Negative test for placement policy.
"""
wallet = prepare_wallet_and_deposit
file_path = generate_file()
with pytest.raises(RuntimeError, match=".*not enough nodes to SELECT from.*"):
validate_object_copies(wallet, placement_rule, file_path, expected_copies)
validate_object_copies(
wallet, placement_rule, file_path, expected_copies, shell=client_shell
)
@pytest.mark.skip(reason="We cover this scenario for Sbercloud in failover tests")
@pytest.mark.skip(reason="We cover this scenario in failover tests")
@pytest.mark.node_mgmt
@allure.title("NeoFS object replication on node failover")
def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes, hosting: Hosting):
@ -430,14 +452,18 @@ def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node, host
@allure.step("Validate object has {expected_copies} copies")
def validate_object_copies(wallet: str, placement_rule: str, file_path: str, expected_copies: int):
cid = create_container(wallet, rule=placement_rule, basic_acl=PUBLIC_ACL)
got_policy = placement_policy_from_container(get_container(wallet, cid, json_mode=False))
def validate_object_copies(
wallet: str, placement_rule: str, file_path: str, expected_copies: int, shell: Shell
):
cid = create_container(wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=shell)
got_policy = placement_policy_from_container(
get_container(wallet, cid, json_mode=False, shell=shell)
)
assert got_policy == placement_rule.replace(
"'", ""
), f"Expected \n{placement_rule} and got policy \n{got_policy} are the same"
oid = put_object(wallet, file_path, cid)
nodes = get_nodes_with_object(wallet, cid, oid)
oid = put_object(wallet, file_path, cid, shell=shell)
nodes = get_nodes_with_object(wallet, cid, oid, shell=shell)
assert len(nodes) == expected_copies, f"Expected {expected_copies} copies, got {len(nodes)}"
return cid, oid, nodes

View file

@ -8,6 +8,7 @@ from container import create_container
from epoch import get_epoch, tick_epoch
from file_helper import generate_file, get_file_content, get_file_hash
from grpc_responses import OBJECT_ALREADY_REMOVED, OBJECT_NOT_FOUND, error_matches_status
from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import (
delete_object,
get_object,
@ -18,7 +19,6 @@ from python_keywords.neofs_verbs import (
search_object,
)
from python_keywords.storage_policy import get_complex_object_copies, get_simple_object_copies
from neofs_testlib.shell import Shell
from tombstone import verify_head_tombstone
from utility import wait_for_gc_pass_on_storage_nodes

View file

@ -9,6 +9,7 @@ from common import COMPLEX_OBJ_SIZE
from container import create_container
from epoch import get_epoch, tick_epoch
from file_helper import generate_file, get_file_hash
from neofs_testlib.shell import Shell
from python_keywords.http_gate import (
get_via_http_curl,
get_via_http_gate,
@ -17,7 +18,6 @@ from python_keywords.http_gate import (
upload_via_http_gate,
upload_via_http_gate_curl,
)
from neofs_testlib.shell import Shell
from python_keywords.neofs_verbs import get_object, put_object
from python_keywords.storage_policy import get_nodes_without_object
from utility import wait_for_gc_pass_on_storage_nodes
@ -225,7 +225,14 @@ class TestHttpGate:
oid_curl = upload_via_http_gate_curl(cid=cid, filepath=file_path, large_object=True)
self.get_object_and_verify_hashes(oid_gate, file_path, self.wallet, cid, shell=client_shell)
self.get_object_and_verify_hashes(oid_curl, file_path, self.wallet, cid, shell=client_shell, object_getter=get_via_http_curl)
self.get_object_and_verify_hashes(
oid_curl,
file_path,
self.wallet,
cid,
shell=client_shell,
object_getter=get_via_http_curl,
)
@pytest.mark.curl
@allure.title("Test Put/Get over HTTP using Curl utility")
@ -243,7 +250,14 @@ class TestHttpGate:
oid_large = upload_via_http_gate_curl(cid=cid, filepath=file_path_large)
for oid, file_path in ((oid_simple, file_path_simple), (oid_large, file_path_large)):
self.get_object_and_verify_hashes(oid, file_path, self.wallet, cid, shell=client_shell, object_getter=get_via_http_curl)
self.get_object_and_verify_hashes(
oid,
file_path,
self.wallet,
cid,
shell=client_shell,
object_getter=get_via_http_curl,
)
@staticmethod
@allure.step("Try to get object and expect error")

View file

@ -2,10 +2,10 @@ import random
import allure
import pytest
from neofs_testlib.shell import Shell
from common import COMPLEX_OBJ_SIZE, NEOFS_NETMAP_DICT, SIMPLE_OBJ_SIZE
from file_helper import generate_file
from grpc_responses import SESSION_NOT_FOUND
from neofs_testlib.shell import Shell
from payment_neogo import _address_from_wallet
from python_keywords.container import create_container
from python_keywords.neofs_verbs import (

View file

@ -129,7 +129,10 @@ def list_containers(wallet: str, shell: Shell) -> list[str]:
@allure.step("Get Container")
def get_container(
wallet: str, cid: str, shell: Shell, json_mode: bool = True,
wallet: str,
cid: str,
shell: Shell,
json_mode: bool = True,
) -> Union[dict, str]:
"""
A wrapper for `neofs-cli container get` call. It extracts container's
@ -162,9 +165,7 @@ def get_container(
@allure.step("Delete Container")
# TODO: make the error message about a non-found container more user-friendly
# https://github.com/nspcc-dev/neofs-contract/issues/121
def delete_container(
wallet: str, cid: str, shell: Shell, force: bool = False
) -> None:
def delete_container(wallet: str, cid: str, shell: Shell, force: bool = False) -> None:
"""
A wrapper for `neofs-cli container delete` call.
Args:

View file

@ -4,8 +4,8 @@ from typing import Optional
import allure
from common import NEOFS_NETMAP_DICT
from neofs_testlib.shell import Shell
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from python_keywords.node_management import node_healthcheck
from storage_policy import get_nodes_with_object
@ -22,7 +22,7 @@ def wait_object_replication_on_nodes(
excluded_nodes: Optional[list[str]] = None,
) -> list[str]:
excluded_nodes = excluded_nodes or []
sleep_interval, attempts = 10, 18
sleep_interval, attempts = 15, 20
nodes = []
for __attempt in range(attempts):
nodes = get_nodes_with_object(wallet, cid, oid, shell=shell, skip_nodes=excluded_nodes)

View file

@ -16,8 +16,8 @@ from common import (
from data_formatters import get_wallet_public_key
from epoch import tick_epoch
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
from utility import parse_time
logger = logging.getLogger("NeoLogger")
@ -165,7 +165,9 @@ def delete_node_data(hosting: Hosting, node_name: str) -> None:
@allure.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map(hosting: Hosting, node_to_exclude: str, alive_node: str, shell: Shell) -> None:
def exclude_node_from_network_map(
hosting: Hosting, node_to_exclude: str, alive_node: str, shell: Shell
) -> None:
node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]["wallet_path"]
node_netmap_key = get_wallet_public_key(node_wallet_path, STORAGE_WALLET_PASS)
@ -181,13 +183,15 @@ def exclude_node_from_network_map(hosting: Hosting, node_to_exclude: str, alive_
@allure.step("Include node {node_to_include} into network map")
def include_node_to_network_map(hosting: Hosting, node_to_include: str, alive_node: str, shell: Shell) -> None:
def include_node_to_network_map(
hosting: Hosting, node_to_include: str, alive_node: str, shell: Shell
) -> None:
node_set_status(hosting, node_to_include, status="online")
time.sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch()
check_node_in_map(node_to_include, alive_node, shell=shell)
check_node_in_map(node_to_include, shell, alive_node)
@allure.step("Check node {node_name} in network map")

View file

@ -30,7 +30,13 @@ def can_get_object(
with allure.step("Try get object from container"):
try:
got_file_path = get_object(
wallet, cid, oid, bearer_token=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell,
wallet,
cid,
oid,
bearer_token=bearer,
wallet_config=wallet_config,
xhdr=xhdr,
shell=shell,
)
except OPERATION_ERROR_TYPE as err:
assert error_matches_status(

View file

@ -89,12 +89,6 @@ NEOFS_NETMAP_DICT = {
}
NEOFS_NETMAP = [node["rpc"] for node in NEOFS_NETMAP_DICT.values()]
# Parameters that control SSH connection to storage node
# TODO: we should use hosting instead
STORAGE_NODE_SSH_USER = os.getenv("STORAGE_NODE_SSH_USER")
STORAGE_NODE_SSH_PASSWORD = os.getenv("STORAGE_NODE_SSH_PASSWORD")
STORAGE_NODE_SSH_PRIVATE_KEY_PATH = os.getenv("STORAGE_NODE_SSH_PRIVATE_KEY_PATH")
# Paths to CLI executables on machine that runs tests
NEOGO_EXECUTABLE = os.getenv("NEOGO_EXECUTABLE", "neo-go")
NEOFS_CLI_EXEC = os.getenv("NEOFS_CLI_EXEC", "neofs-cli")