From bfd02531ef379de7d01a423b7b5594b10af3ea16 Mon Sep 17 00:00:00 2001 From: Vladimir Domnich Date: Sun, 9 Oct 2022 20:01:59 +0000 Subject: [PATCH] Integrate with hosting from testlib Replace service_helper with hosting class from the testlib. Instead of invoking commands on remote via ssh_helper, we now use shell from the hosting. Signed-off-by: Vladimir Domnich --- .devenv.hosting.yaml | 84 +++++ pytest_tests/helpers/binary_version_helper.py | 72 ++++ pytest_tests/helpers/iptables_helper.py | 1 + pytest_tests/helpers/sbercloud_helper.py | 237 ------------ pytest_tests/helpers/service_helper.py | 342 ------------------ pytest_tests/requirements.txt | 2 +- pytest_tests/testsuites/acl/test_eacl.py | 5 +- pytest_tests/testsuites/conftest.py | 69 ++-- .../failovers/test_failover_network.py | 5 +- .../failovers/test_failover_storage.py | 43 +-- .../network/test_node_management.py | 85 ++--- .../testsuites/services/test_binaries.py | 8 +- .../lib/python_keywords/failover_utils.py | 9 +- .../lib/python_keywords/node_management.py | 83 +++-- robot/variables/common.py | 28 +- 15 files changed, 324 insertions(+), 749 deletions(-) create mode 100644 .devenv.hosting.yaml create mode 100644 pytest_tests/helpers/binary_version_helper.py delete mode 100644 pytest_tests/helpers/sbercloud_helper.py delete mode 100644 pytest_tests/helpers/service_helper.py diff --git a/.devenv.hosting.yaml b/.devenv.hosting.yaml new file mode 100644 index 0000000..33a7614 --- /dev/null +++ b/.devenv.hosting.yaml @@ -0,0 +1,84 @@ +hosts: +- address: localhost + plugin_name: docker + services: + - name: s01 + attributes: + container_name: s01 + config_path: ../neofs-dev-env/services/storage/.storage.env + wallet_path: ../neofs-dev-env/services/storage/wallet01.json + wallet_password: "" + volume_name: storage_storage_s01 + rpc_endpoint: s01.neofs.devenv:8080 + control_endpoint: s01.neofs.devenv:8081 + un_locode: "RU MOW" + - name: s02 + attributes: + container_name: s02 + config_path: ../neofs-dev-env/services/storage/.storage.env + wallet_path: ../neofs-dev-env/services/storage/wallet02.json + wallet_password: "" + volume_name: storage_storage_s02 + rpc_endpoint: s02.neofs.devenv:8080 + control_endpoint: s02.neofs.devenv:8081 + un_locode: "RU LED" + - name: s03 + attributes: + container_name: s03 + config_path: ../neofs-dev-env/services/storage/.storage.env + wallet_path: ../neofs-dev-env/services/storage/wallet03.json + wallet_password: "" + volume_name: storage_storage_s03 + rpc_endpoint: s03.neofs.devenv:8080 + control_endpoint: s03.neofs.devenv:8081 + un_locode: "SE STO" + - name: s04 + attributes: + container_name: s04 + config_path: ../neofs-dev-env/services/storage/.storage.env + wallet_path: ../neofs-dev-env/services/storage/wallet04.json + wallet_password: "" + volume_name: storage_storage_s04 + rpc_endpoint: s04.neofs.devenv:8080 + control_endpoint: s04.neofs.devenv:8081 + un_locode: "FI HEL" + - name: s3-gate01 + attributes: + container_name: s3_gate + config_path: ../neofs-dev-env/services/s3_gate/.s3.env + wallet_path: ../neofs-dev-env/services/s3_gate/wallet.json + wallet_password: "s3" + endpoint: https://s3.neofs.devenv:8080 + - name: http-gate01 + attributes: + container_name: http_gate + config_path: ../neofs-dev-env/services/http_gate/.http.env + wallet_path: ../neofs-dev-env/services/http_gate/wallet.json + wallet_password: "one" + endpoint: http://http.neofs.devenv + - name: ir01 + attributes: + container_name: ir01 + config_path: ../neofs-dev-env/services/ir/.ir.env + wallet_path: ../neofs-dev-env/services/ir/az.json + wallet_password: "one" + - name: morph-chain01 + attributes: + container_name: morph_chain + config_path: ../neofs-dev-env/services/morph_chain/protocol.privnet.yml + wallet_path: ../neofs-dev-env/services/morph_chain/node-wallet.json + wallet_password: "one" + endpoint: http://morph-chain.neofs.devenv:30333 + - name: main-chain01 + attributes: + container_name: main_chain + config_path: ../neofs-dev-env/services/chain/protocol.privnet.yml + wallet_path: ../neofs-dev-env/services/chain/node-wallet.json + wallet_password: "one" + endpoint: http://main-chain.neofs.devenv:30333 + - name: coredns01 + attributes: + container_name: coredns + clis: + - name: neofs-cli + exec_path: neofs-cli diff --git a/pytest_tests/helpers/binary_version_helper.py b/pytest_tests/helpers/binary_version_helper.py new file mode 100644 index 0000000..4076b89 --- /dev/null +++ b/pytest_tests/helpers/binary_version_helper.py @@ -0,0 +1,72 @@ +import logging +import re + +from common import NEOFS_ADM_EXEC, NEOFS_CLI_EXEC, WALLET_CONFIG +from neofs_testlib.cli import NeofsAdm, NeofsCli +from neofs_testlib.hosting import Hosting +from neofs_testlib.shell import Shell + +logger = logging.getLogger("NeoLogger") + + +def get_local_binaries_versions(shell: Shell) -> dict[str, str]: + versions = {} + + for binary in ["neo-go", "neofs-authmate"]: + out = shell.exec(f"{binary} --version").stdout + versions[binary] = _parse_version(out) + + neofs_cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG) + versions["neofs-cli"] = _parse_version(neofs_cli.version.get()) + + try: + neofs_adm = NeofsAdm(shell, NEOFS_ADM_EXEC) + versions["neofs-adm"] = _parse_version(neofs_adm.version.get()) + except RuntimeError: + logger.info(f"neofs-adm not installed") + + out = shell.exec("aws --version").stdout + out_lines = out.split("\n") + versions["AWS"] = out_lines[0] if out_lines else "Unknown" + + return versions + + +def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]: + versions_by_host = {} + for host in hosting.hosts: + binaries = [] + for service_config in host.config.services: + exec_path = service_config.attributes.get("exec_path") + if exec_path: + binaries.append(exec_path) + for cli_config in host.config.clis: + binaries.append(cli_config.exec_path) + + shell = host.get_shell() + versions_at_host = {} + for binary in binaries: + try: + result = shell.exec(f"{binary} --version") + versions_at_host[service_config.name] = _parse_version(result.stdout) + except Exception as exc: + logger.error(f"Cannot get version for {exec_path} because of\n{exc}") + versions_at_host[service_config.name] = "Unknown" + continue + versions_by_host[host.config.address] = versions_at_host + + # Consolidate versions across all hosts + versions = {} + for host, binary_versions in versions_by_host.items(): + for name, version in binary_versions.items(): + captured_version = versions.get(name) + if captured_version: + assert captured_version == version, f"Binary {name} has inconsistent version on host {host}" + else: + versions[name] = version + return versions + + +def _parse_version(version_output: str) -> str: + version = re.search(r"version[:\s]*v?(.+)", version_output, re.IGNORECASE) + return version.group(1).strip() if version else "Unknown" diff --git a/pytest_tests/helpers/iptables_helper.py b/pytest_tests/helpers/iptables_helper.py index dc0a257..338f034 100644 --- a/pytest_tests/helpers/iptables_helper.py +++ b/pytest_tests/helpers/iptables_helper.py @@ -1,6 +1,7 @@ from ssh_helper import HostClient +# TODO: convert to shell from hosting class IpTablesHelper: @staticmethod def drop_input_traffic_to_port(client: HostClient, ports: list[str]): diff --git a/pytest_tests/helpers/sbercloud_helper.py b/pytest_tests/helpers/sbercloud_helper.py deleted file mode 100644 index 3e465ea..0000000 --- a/pytest_tests/helpers/sbercloud_helper.py +++ /dev/null @@ -1,237 +0,0 @@ -import binascii -import hashlib -import hmac -import json -import os -from dataclasses import dataclass -from datetime import datetime -from typing import Optional -from urllib.parse import quote, unquote - -import requests -import yaml - - -@dataclass -class SberCloudConfig: - access_key_id: Optional[str] = None - secret_key: Optional[str] = None - ecs_endpoint: Optional[str] = None - project_id: Optional[str] = None - - @staticmethod - def from_dict(config_dict: dict) -> "SberCloudConfig": - return SberCloudConfig(**config_dict) - - @staticmethod - def from_yaml(config_path: str) -> "SberCloudConfig": - with open(config_path) as file: - config_dict = yaml.load(file, Loader=yaml.FullLoader) - return SberCloudConfig.from_dict(config_dict["sbercloud"]) - - @staticmethod - def from_env() -> "SberCloudConfig": - config_dict = { - "access_key_id": os.getenv("SBERCLOUD_ACCESS_KEY_ID"), - "secret_key": os.getenv("SBERCLOUD_SECRET_KEY"), - "ecs_endpoint": os.getenv("SBERCLOUD_ECS_ENDPOINT"), - "project_id": os.getenv("SBERCLOUD_PROJECT_ID"), - } - return SberCloudConfig.from_dict(config_dict) - - -class SberCloudAuthRequests: - """ - Implements authentication mechanism with access key+secret key in accordance with: - https://support.hc.sbercloud.ru/devg/apisign/api-sign-algorithm.html - - endpoint - represents endpoint of a specific service (listed at https://support.hc.sbercloud.ru/en-us/endpoint/index.html) - base_path - is prefix for all request path's that will be sent via this instance. - """ - - ENCODING = "utf-8" - ALGORITHM = "SDK-HMAC-SHA256" - TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ" - - def __init__( - self, endpoint: str, access_key_id: str, secret_key: str, base_path: str = "" - ) -> None: - self.endpoint = endpoint - self.base_path = base_path - self.access_key_id = access_key_id - self.secret_key = secret_key - - def get(self, path: str, query: Optional[dict] = None) -> requests.Response: - return self._send_request("GET", path, query, data=None) - - def post( - self, path: str, query: Optional[dict] = None, data: Optional[dict] = None - ) -> requests.Response: - return self._send_request("POST", path, query, data) - - def _send_request( - self, method: str, path: str, query: Optional[dict], data: Optional[dict] - ) -> requests.Response: - if self.base_path: - path = self.base_path + path - - timestamp = datetime.strftime(datetime.utcnow(), self.TIMESTAMP_FORMAT) - headers = self._build_original_headers(timestamp) - - content = "" - if data: - # At the moment we support json content only - content = json.dumps(data) - headers["Content-Type"] = "application/json" - body = content.encode(self.ENCODING) - - signed_headers = self._build_signed_headers(headers) - canonical_request = self._build_canonical_request( - method, path, query, body, headers, signed_headers - ) - signature = self._build_signature(timestamp, canonical_request) - headers["Authorization"] = self._build_authorization_header(signature, signed_headers) - - query_string = "?" + self._build_canonical_query_string(query) if query else "" - url = f"https://{self.endpoint}{path}{query_string}" - - response = requests.request(method, url, headers=headers, data=body) - if response.status_code < 200 or response.status_code >= 300: - raise AssertionError( - f"Request to url={url} failed: status={response.status_code} " - f"response={response.text})" - ) - return response - - def _build_original_headers(self, timestamp: str) -> dict[str, str]: - return { - "X-Sdk-Date": timestamp, - "host": self.endpoint, - } - - def _build_signed_headers(self, headers: dict[str, str]) -> list[str]: - return sorted(header_name.lower() for header_name in headers) - - def _build_canonical_request( - self, - method: str, - path: str, - query: Optional[dict], - body: bytes, - headers: dict[str, str], - signed_headers: list[str], - ) -> str: - canonical_headers = self._build_canonical_headers(headers, signed_headers) - body_hash = self._calc_sha256_hash(body) - canonical_url = self._build_canonical_url(path) - canonical_query_string = self._build_canonical_query_string(query) - - return "\n".join( - [ - method.upper(), - canonical_url, - canonical_query_string, - canonical_headers, - ";".join(signed_headers), - body_hash, - ] - ) - - def _build_canonical_headers(self, headers: dict[str, str], signed_headers: list[str]) -> str: - normalized_headers = {} - for key, value in headers.items(): - normalized_key = key.lower() - normalized_value = value.strip() - normalized_headers[normalized_key] = normalized_value - # Re-encode header in request itself (iso-8859-1 comes from HTTP 1.1 standard) - headers[key] = normalized_value.encode(self.ENCODING).decode("iso-8859-1") - - # Join headers in the same order as they are sorted in signed_headers list - joined_headers = "\n".join(f"{key}:{normalized_headers[key]}" for key in signed_headers) - return joined_headers + "\n" - - def _calc_sha256_hash(self, value: bytes) -> str: - sha256 = hashlib.sha256() - sha256.update(value) - return sha256.hexdigest() - - def _build_canonical_url(self, path: str) -> str: - path_parts = unquote(path).split("/") - canonical_url = "/".join(quote(path_part) for path_part in path_parts) - - if not canonical_url.endswith("/"): - canonical_url += "/" - return canonical_url - - def _build_canonical_query_string(self, query: Optional[dict]) -> str: - if not query: - return "" - - key_value_pairs = [] - for key in sorted(query.keys()): - # NOTE: we do not support list values, as they are not used in API at the moment - encoded_key = quote(key) - encoded_value = quote(str(query[key])) - key_value_pairs.append(f"{encoded_key}={encoded_value}") - return "&".join(key_value_pairs) - - def _build_signature(self, timestamp: str, canonical_request: str) -> str: - canonical_request_hash = self._calc_sha256_hash(canonical_request.encode(self.ENCODING)) - string_to_sign = f"{self.ALGORITHM}\n{timestamp}\n{canonical_request_hash}" - - hmac_digest = hmac.new( - key=self.secret_key.encode(self.ENCODING), - msg=string_to_sign.encode(self.ENCODING), - digestmod=hashlib.sha256, - ).digest() - signature = binascii.hexlify(hmac_digest).decode() - - return signature - - def _build_authorization_header(self, signature: str, signed_headers: list[str]) -> str: - joined_signed_headers = ";".join(signed_headers) - return f"{self.ALGORITHM} Access={self.access_key_id}, SignedHeaders={joined_signed_headers}, Signature={signature}" - - -class SberCloud: - """ - Manages resources in Sbercloud via API. - - API reference: - https://docs.sbercloud.ru/terraform/ug/topics/quickstart.html - https://support.hc.sbercloud.ru/en-us/api/ecs/en-us_topic_0020212668.html - """ - - def __init__(self, config: SberCloudConfig) -> None: - self.ecs_requests = SberCloudAuthRequests( - endpoint=config.ecs_endpoint, - base_path=f"/v1/{config.project_id}/cloudservers", - access_key_id=config.access_key_id, - secret_key=config.secret_key, - ) - self.ecs_node_by_ip = {} # Cached list of ecs servers - - def find_ecs_node_by_ip(self, ip: str) -> str: - if ip not in self.ecs_node_by_ip: - self.ecs_node_by_ip[ip] = self.get_ecs_node_id(ip) - assert ip in self.ecs_node_by_ip - return self.ecs_node_by_ip[ip] - - def get_ecs_node_id(self, ip: str) -> str: - response = self.ecs_requests.get("/detail", {"ip": ip}).json() - return response["servers"][0]["id"] - - def start_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None) -> None: - data = {"os-start": {"servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}]}} - self.ecs_requests.post("/action", data=data) - - def stop_node( - self, node_id: Optional[str] = None, node_ip: Optional[str] = None, hard: bool = False - ) -> None: - data = { - "os-stop": { - "type": "HARD" if hard else "SOFT", - "servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}], - } - } - self.ecs_requests.post("/action", data=data) diff --git a/pytest_tests/helpers/service_helper.py b/pytest_tests/helpers/service_helper.py deleted file mode 100644 index 6df5a62..0000000 --- a/pytest_tests/helpers/service_helper.py +++ /dev/null @@ -1,342 +0,0 @@ -import json -import logging -import os -import re -import time -from contextlib import contextmanager -from datetime import datetime -from typing import Optional - -import docker -from cli_helpers import _cmd_run -from common import ( - INFRASTRUCTURE_TYPE, - NEOFS_CLI_EXEC, - NEOFS_NETMAP_DICT, - STORAGE_NODE_BIN_PATH, - STORAGE_NODE_SSH_PASSWORD, - STORAGE_NODE_SSH_PRIVATE_KEY_PATH, - STORAGE_NODE_SSH_USER, - WALLET_CONFIG, -) -from requests import HTTPError -from ssh_helper import HostClient - -logger = logging.getLogger("NeoLogger") - - -class LocalDevEnvStorageServiceHelper: - """ - Manages storage services running on local devenv. - """ - - # Names of all containers that are running neofs code - ALL_CONTAINERS = [ - "s3_gate", - "http_gate", - "s03", - "s01", - "s02", - "s04", - "ir01", - "morph_chain", - "main_chain", - ] - - def stop_node(self, node_name: str, wait: bool = True) -> None: - container_name = _get_storage_container_name(node_name) - client = self._get_docker_client(node_name) - client.stop(container_name) - - if wait: - self._wait_for_container_to_be_in_state(node_name, container_name, "exited") - - def start_node(self, node_name: str, wait: bool = True) -> None: - container_name = _get_storage_container_name(node_name) - client = self._get_docker_client(node_name) - client.start(container_name) - - if wait: - self._wait_for_container_to_be_in_state(node_name, container_name, "running") - - def run_control_command(self, node_name: str, command: str) -> str: - control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"] - wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"] - - cmd = ( - f"{NEOFS_CLI_EXEC} {command} --endpoint {control_endpoint} " - f"--wallet {wallet_path} --config {WALLET_CONFIG}" - ) - output = _cmd_run(cmd) - return output - - def delete_node_data(self, node_name: str) -> None: - volume_name = _get_storage_volume_name(node_name) - - client = self._get_docker_client(node_name) - volume_info = client.inspect_volume(volume_name) - volume_path = volume_info["Mountpoint"] - - _cmd_run(f"rm -rf {volume_path}/*") - - def get_binaries_version(self) -> dict: - return {} - - def dump_logs( - self, directory_path: str, since: Optional[datetime], until: Optional[datetime] - ) -> None: - # All containers are running on the same host, so we can use 1st node to collect all logs - first_node_name = next(iter(NEOFS_NETMAP_DICT)) - client = self._get_docker_client(first_node_name) - - for container_name in self.ALL_CONTAINERS: - try: - logs = client.logs(container_name, since=since, until=until) - except HTTPError as exc: - logger.info(f"Got exception while dumping container '{container_name}' logs: {exc}") - continue - # Dump logs to the directory - file_path = os.path.join(directory_path, f"{container_name}-log.txt") - with open(file_path, "wb") as file: - file.write(logs) - - def _get_container_by_name(self, node_name: str, container_name: str) -> dict: - client = self._get_docker_client(node_name) - containers = client.containers(all=True) - - logger.info(f"Current containers state\n:{json.dumps(containers, indent=2)}") - - for container in containers: - # Names in local docker environment are prefixed with / - clean_names = set(name.strip("/") for name in container["Names"]) - if container_name in clean_names: - return container - return None - - def _wait_for_container_to_be_in_state( - self, node_name: str, container_name: str, expected_state: str - ) -> None: - for __attempt in range(10): - container = self._get_container_by_name(node_name, container_name) - logger.info(f"Container info:\n{json.dumps(container, indent=2)}") - if container and container["State"] == expected_state: - return - time.sleep(5) - - raise AssertionError(f"Container {container_name} is not in {expected_state} state.") - - def _get_docker_client(self, node_name: str) -> docker.APIClient: - # For local docker we use default docker client that talks to unix socket - client = docker.APIClient() - return client - - -class CloudVmStorageServiceHelper: - STORAGE_SERVICE = "neofs-storage.service" - - def stop_node(self, node_name: str, wait: bool = True) -> None: - with _create_ssh_client(node_name) as ssh_client: - cmd = f"sudo systemctl stop {self.STORAGE_SERVICE}" - output = ssh_client.exec_with_confirmation(cmd, [""]) - logger.info(f"Stop command output: {output.stdout}") - - if wait: - self._wait_for_service_to_be_in_state(node_name, self.STORAGE_SERVICE, "inactive") - - def start_node(self, node_name: str, wait: bool = True) -> None: - with _create_ssh_client(node_name) as ssh_client: - cmd = f"sudo systemctl start {self.STORAGE_SERVICE}" - output = ssh_client.exec_with_confirmation(cmd, [""]) - logger.info(f"Start command output: {output.stdout}") - - if wait: - self._wait_for_service_to_be_in_state( - node_name, self.STORAGE_SERVICE, "active (running)" - ) - - def run_control_command(self, node_name: str, command: str) -> str: - control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"] - wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"] - - # Private control endpoint is accessible only from the host where storage node is running - # So, we connect to storage node host and run CLI command from there - with _create_ssh_client(node_name) as ssh_client: - # Copy wallet content on storage node host - with open(wallet_path, "r") as file: - wallet = file.read() - remote_wallet_path = f"/tmp/{node_name}-wallet.json" - ssh_client.exec_with_confirmation(f"echo '{wallet}' > {remote_wallet_path}", [""]) - - # Put config on storage node host - remote_config_path = f"/tmp/{node_name}-config.yaml" - remote_config = 'password: ""' - ssh_client.exec_with_confirmation( - f"echo '{remote_config}' > {remote_config_path}", [""] - ) - - # Execute command - cmd = ( - f"sudo {STORAGE_NODE_BIN_PATH}/neofs-cli {command} --endpoint {control_endpoint} " - f"--wallet {remote_wallet_path} --config {remote_config_path}" - ) - output = ssh_client.exec_with_confirmation(cmd, [""]) - return output.stdout - - def _wait_for_service_to_be_in_state( - self, node_name: str, service_name: str, expected_state: str - ) -> None: - with _create_ssh_client(node_name) as ssh_client: - for __attempt in range(10): - # Run command to get service status (set --lines=0 to suppress logs output) - # Also we don't verify return code, because for an inactive service return code will be 3 - command = f"sudo systemctl status {service_name} --lines=0" - output = ssh_client.exec(command, verify=False) - if expected_state in output.stdout: - return - time.sleep(3) - raise AssertionError(f"Service {service_name} is not in {expected_state} state") - - def delete_node_data(self, node_name: str) -> None: - with _create_ssh_client(node_name) as ssh_client: - ssh_client.exec("sudo rm -rf /srv/neofs/*") - - def get_binaries_version(self, binaries: list = None) -> dict: - default_binaries = [ - "neo-go", - "neofs-adm", - "neofs-cli", - "neofs-http-gw", - "neofs-ir", - "neofs-lens", - "neofs-node", - "neofs-s3-authmate", - "neofs-s3-gw", - "neogo-morph-cn", - ] - binaries = binaries or default_binaries - - version_map = {} - for node_name in NEOFS_NETMAP_DICT: - with _create_ssh_client(node_name) as ssh_client: - for binary in binaries: - try: - out = ssh_client.exec(f"sudo {binary} --version").stdout - except AssertionError as err: - logger.error(f"Can not get version for {binary} because of\n{err}") - version_map[binary] = "Can not get version" - continue - version = re.search(r"version[:\s]*v?(.+)", out, re.IGNORECASE) - version = version.group(1).strip() if version else "Unknown" - if not version_map.get(binary): - version_map[binary] = version - else: - assert version_map[binary] == version, ( - f"Expected binary {binary} to have identical version on all nodes " - f"(mismatch on node {node_name})" - ) - return version_map - - def dump_logs( - self, directory_path: str, since: Optional[datetime], until: Optional[datetime] - ) -> None: - for node_name, node_info in NEOFS_NETMAP_DICT.items(): - with _create_ssh_client(node_name) as ssh_client: - # We do not filter out logs of neofs services, because system logs might contain - # information that is useful for troubleshooting - filters = " ".join( - [ - f"--since '{since:%Y-%m-%d %H:%M:%S}'" if since else "", - f"--until '{until:%Y-%m-%d %H:%M:%S}'" if until else "", - ] - ) - result = ssh_client.exec(f"journalctl --no-pager {filters}") - logs = result.stdout - - # Dump logs to the directory. We include node endpoint in file name, because almost - # everywhere in Allure report we are logging endpoints rather than node names - file_path = os.path.join(directory_path, f"{node_name}-{node_info['rpc']}-log.txt") - with open(file_path, "w") as file: - file.write(logs) - - -class RemoteDevEnvStorageServiceHelper(LocalDevEnvStorageServiceHelper): - """ - Manages storage services running on remote devenv. - - Most of operations are identical to local devenv, however, any interactions - with host resources (files, etc.) require ssh into the remote host machine. - """ - - def _get_docker_client(self, node_name: str) -> docker.APIClient: - # For remote devenv we use docker client that talks to tcp socket 2375: - # https://docs.docker.com/engine/reference/commandline/dockerd/#daemon-socket-option - host = _get_node_host(node_name) - client = docker.APIClient(base_url=f"tcp://{host}:2375") - return client - - def delete_node_data(self, node_name: str) -> None: - volume_name = _get_storage_volume_name(node_name) - - client = self._get_docker_client(node_name) - volume_info = client.inspect_volume(volume_name) - volume_path = volume_info["Mountpoint"] - - # SSH into remote machine and delete files in host directory that is mounted as docker volume - with _create_ssh_client(node_name) as ssh_client: - # TODO: add sudo prefix after we change a user - ssh_client.exec(f"rm -rf {volume_path}/*") - - -def get_storage_service_helper(): - if INFRASTRUCTURE_TYPE == "LOCAL_DEVENV": - return LocalDevEnvStorageServiceHelper() - if INFRASTRUCTURE_TYPE == "REMOTE_DEVENV": - return RemoteDevEnvStorageServiceHelper() - if INFRASTRUCTURE_TYPE == "CLOUD_VM": - return CloudVmStorageServiceHelper() - - raise EnvironmentError(f"Infrastructure type is not supported: {INFRASTRUCTURE_TYPE}") - - -@contextmanager -def _create_ssh_client(node_name: str) -> HostClient: - host = _get_node_host(node_name) - ssh_client = HostClient( - host, - login=STORAGE_NODE_SSH_USER, - password=STORAGE_NODE_SSH_PASSWORD, - private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH, - ) - - try: - yield ssh_client - finally: - ssh_client.drop() - - -def _get_node_host(node_name: str) -> str: - if node_name not in NEOFS_NETMAP_DICT: - raise AssertionError(f"Node {node_name} is not found!") - - # We use rpc endpoint to determine host address, because control endpoint - # (if it is private) will be a local address on the host machine - node_config = NEOFS_NETMAP_DICT.get(node_name) - host = node_config.get("rpc").split(":")[0] - return host - - -def _get_storage_container_name(node_name: str) -> str: - """ - Converts name of storage node (as it is listed in netmap) into the name of docker container - that runs instance of this storage node. - """ - return node_name.split(".")[0] - - -def _get_storage_volume_name(node_name: str) -> str: - """ - Converts name of storage node (as it is listed in netmap) into the name of docker volume - that contains data of this storage node. - """ - container_name = _get_storage_container_name(node_name) - return f"storage_storage_{container_name}" diff --git a/pytest_tests/requirements.txt b/pytest_tests/requirements.txt index 785cd4d..fe42f50 100644 --- a/pytest_tests/requirements.txt +++ b/pytest_tests/requirements.txt @@ -35,7 +35,7 @@ neo-mamba==0.10.0 neo3crypto==0.2.1 neo3vm==0.9.0 neo3vm-stubs==0.9.0 -neofs-testlib==0.1.0 +neofs-testlib==0.2.0 netaddr==0.8.0 orjson==3.6.8 packaging==21.3 diff --git a/pytest_tests/testsuites/acl/test_eacl.py b/pytest_tests/testsuites/acl/test_eacl.py index d5f32d9..27c32d0 100644 --- a/pytest_tests/testsuites/acl/test_eacl.py +++ b/pytest_tests/testsuites/acl/test_eacl.py @@ -2,6 +2,7 @@ import allure import pytest from common import NEOFS_NETMAP_DICT from failover_utils import wait_object_replication_on_nodes +from neofs_testlib.hosting import Hosting from python_keywords.acl import ( EACLAccess, EACLOperation, @@ -194,7 +195,7 @@ class TestEACLContainer: @allure.title("Testcase to validate NeoFS replication with eACL deny rules.") def test_extended_acl_deny_replication( - self, wallets, client_shell, eacl_full_placement_container_with_object, file_path + self, wallets, client_shell, hosting: Hosting, eacl_full_placement_container_with_object, file_path ): user_wallet = wallets.get_wallet() cid, oid, file_path = eacl_full_placement_container_with_object @@ -217,7 +218,7 @@ class TestEACLContainer: wait_for_cache_expired() with allure.step("Drop object to check replication"): - drop_object(node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid) + drop_object(hosting, node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid) storage_wallet_path = NEOFS_NETMAP_DICT[[*NEOFS_NETMAP_DICT][0]]["wallet_path"] with allure.step("Wait for dropped object replicated"): diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index 82935db..c0081dd 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -1,30 +1,26 @@ import logging import os -import re import shutil from datetime import datetime import allure import pytest import wallet -from cli_helpers import _cmd_run +import yaml +from binary_version_helper import get_local_binaries_versions, get_remote_binaries_versions from common import ( ASSETS_DIR, FREE_STORAGE, + HOSTING_CONFIG_FILE, INFRASTRUCTURE_TYPE, MAINNET_WALLET_PATH, - NEOFS_ADM_EXEC, - NEOFS_CLI_EXEC, NEOFS_NETMAP_DICT, - WALLET_CONFIG, ) -from neofs_testlib.cli import NeofsAdm, NeofsCli - from env_properties import save_env_properties +from neofs_testlib.hosting import Hosting from neofs_testlib.shell import LocalShell, Shell from payment_neogo import neofs_deposit, transfer_mainnet_gas from python_keywords.node_management import node_healthcheck -from service_helper import get_storage_service_helper logger = logging.getLogger("NeoLogger") @@ -41,41 +37,24 @@ def cloud_infrastructure_check(): yield +@pytest.fixture(scope="session") +def hosting() -> Hosting: + with open(HOSTING_CONFIG_FILE, "r") as file: + hosting_config = yaml.full_load(file) + + hosting_instance = Hosting() + hosting_instance.configure(hosting_config) + yield hosting_instance + + @pytest.fixture(scope="session", autouse=True) @allure.title("Check binary versions") -def check_binary_versions(request, client_shell): - # Collect versions of local binaries - binaries = ["neo-go", "neofs-authmate"] - local_binaries = _get_binaries_version_local(binaries) +def check_binary_versions(request, hosting: Hosting, client_shell: Shell): + local_versions = get_local_binaries_versions(client_shell) + remote_versions = get_remote_binaries_versions(hosting) - try: - local_binaries["neofs-adm"] = NeofsAdm(client_shell, NEOFS_ADM_EXEC).version.get() - except RuntimeError: - logger.info(f"neofs-adm not installed") - local_binaries["neofs-cli"] = NeofsCli( - client_shell, NEOFS_CLI_EXEC, WALLET_CONFIG - ).version.get() - # Collect versions of remote binaries - - helper = get_storage_service_helper() - remote_binaries = helper.get_binaries_version() - all_binaries = {**local_binaries, **remote_binaries} - - # Get version of aws binary - out = _cmd_run("aws --version") - out_lines = out.split("\n") - all_binaries["AWS"] = out_lines[0] if out_lines else "Unknown" - - save_env_properties(request.config, all_binaries) - - -def _get_binaries_version_local(binaries: list) -> dict: - env_out = {} - for binary in binaries: - out = _cmd_run(f"{binary} --version") - version = re.search(r"version[:\s]*(.+)", out, re.IGNORECASE) - env_out[binary] = version.group(1).strip() if version else "Unknown" - return env_out + all_versions = {**local_versions, **remote_versions} + save_env_properties(request.config, all_versions) @pytest.fixture(scope="session") @@ -90,7 +69,7 @@ def prepare_tmp_dir(): @pytest.fixture(scope="session", autouse=True) @allure.title("Collect logs") -def collect_logs(prepare_tmp_dir): +def collect_logs(prepare_tmp_dir, hosting: Hosting): start_time = datetime.utcnow() yield end_time = datetime.utcnow() @@ -99,8 +78,8 @@ def collect_logs(prepare_tmp_dir): logs_dir = os.path.join(prepare_tmp_dir, "logs") os.makedirs(logs_dir) - helper = get_storage_service_helper() - helper.dump_logs(logs_dir, since=start_time, until=end_time) + for host in hosting.hosts: + host.dump_logs(logs_dir, since=start_time, until=end_time) # Zip all files and attach to Allure because it is more convenient to download a single # zip with all logs rather than mess with individual logs files per service or node @@ -110,10 +89,10 @@ def collect_logs(prepare_tmp_dir): @pytest.fixture(scope="session", autouse=True) @allure.title("Run health check for all storage nodes") -def run_health_check(collect_logs): +def run_health_check(collect_logs, hosting: Hosting): failed_nodes = [] for node_name in NEOFS_NETMAP_DICT.keys(): - health_check = node_healthcheck(node_name) + health_check = node_healthcheck(hosting, node_name) if health_check.health_status != "READY" or health_check.network_status != "ONLINE": failed_nodes.append(node_name) diff --git a/pytest_tests/testsuites/failovers/test_failover_network.py b/pytest_tests/testsuites/failovers/test_failover_network.py index 96d92c2..0b8f205 100644 --- a/pytest_tests/testsuites/failovers/test_failover_network.py +++ b/pytest_tests/testsuites/failovers/test_failover_network.py @@ -12,6 +12,7 @@ from common import ( from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes from file_helper import generate_file, get_file_hash from iptables_helper import IpTablesHelper +from neofs_testlib.hosting import Hosting from python_keywords.container import create_container from python_keywords.neofs_verbs import get_object, put_object from ssh_helper import HostClient @@ -26,7 +27,7 @@ blocked_hosts = [] @pytest.fixture(autouse=True) @allure.step("Restore network") -def restore_network(): +def restore_network(hosting: Hosting): yield not_empty = len(blocked_hosts) != 0 @@ -42,7 +43,7 @@ def restore_network(): IpTablesHelper.restore_input_traffic_to_port(client, PORTS_TO_BLOCK) blocked_hosts.remove(host) if not_empty: - wait_all_storage_node_returned() + wait_all_storage_node_returned(hosting) @allure.title("Block Storage node traffic") diff --git a/pytest_tests/testsuites/failovers/test_failover_storage.py b/pytest_tests/testsuites/failovers/test_failover_storage.py index 18aedf4..d8cefab 100644 --- a/pytest_tests/testsuites/failovers/test_failover_storage.py +++ b/pytest_tests/testsuites/failovers/test_failover_storage.py @@ -9,9 +9,9 @@ from common import ( ) from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes from file_helper import generate_file, get_file_hash +from neofs_testlib.hosting import Hosting from python_keywords.container import create_container from python_keywords.neofs_verbs import get_object, put_object -from sbercloud_helper import SberCloud, SberCloudConfig from ssh_helper import HostClient from wellknown_acl import PUBLIC_ACL @@ -19,21 +19,11 @@ logger = logging.getLogger("NeoLogger") stopped_hosts = [] -@pytest.fixture(scope="session") -def sbercloud_client(): - with allure.step("Connect to SberCloud"): - try: - config = SberCloudConfig.from_env() - yield SberCloud(config) - except Exception as err: - pytest.fail(f"SberCloud infrastructure not available. Error\n{err}") - - @pytest.fixture(autouse=True) @allure.step("Return all storage nodes") -def return_all_storage_nodes_fixture(sbercloud_client): +def return_all_storage_nodes_fixture(hosting: Hosting): yield - return_all_storage_nodes(sbercloud_client) + return_all_storage_nodes(hosting) def panic_reboot_host(ip: str = None): @@ -50,13 +40,14 @@ def panic_reboot_host(ip: str = None): ssh_stdin.close() -def return_all_storage_nodes(sbercloud_client: SberCloud) -> None: - for host in list(stopped_hosts): - with allure.step(f"Start storage node {host}"): - sbercloud_client.start_node(node_ip=host.split(":")[-2]) - stopped_hosts.remove(host) +def return_all_storage_nodes(hosting: Hosting) -> None: + for host_address in list(stopped_hosts): + with allure.step(f"Start host {host_address}"): + host = hosting.get_host_by_address(host_address) + host.start_host() + stopped_hosts.remove(host_address) - wait_all_storage_node_returned() + wait_all_storage_node_returned(hosting) @allure.title("Lost and returned nodes") @@ -65,8 +56,7 @@ def return_all_storage_nodes(sbercloud_client: SberCloud) -> None: def test_lost_storage_node( prepare_wallet_and_deposit, client_shell, - sbercloud_client: SberCloud, - cloud_infrastructure_check, + hosting: Hosting, hard_reboot: bool, ): wallet = prepare_wallet_and_deposit @@ -78,19 +68,18 @@ def test_lost_storage_node( new_nodes = [] for node in nodes: - stopped_hosts.append(node) + host = hosting.get_host_by_service(node) + stopped_hosts.append(host.config.address) with allure.step(f"Stop storage node {node}"): - sbercloud_client.stop_node(node_ip=node.split(":")[-2], hard=hard_reboot) - new_nodes = wait_object_replication_on_nodes( - wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node] - ) + host.stop_host("hard" if hard_reboot else "soft") + new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node]) assert not [node for node in nodes if node in new_nodes] got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0]) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) with allure.step(f"Return storage nodes"): - return_all_storage_nodes(sbercloud_client) + return_all_storage_nodes(hosting) new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell) diff --git a/pytest_tests/testsuites/network/test_node_management.py b/pytest_tests/testsuites/network/test_node_management.py index 20b8b11..727438c 100644 --- a/pytest_tests/testsuites/network/test_node_management.py +++ b/pytest_tests/testsuites/network/test_node_management.py @@ -1,6 +1,7 @@ import logging from random import choice from time import sleep +from typing import Optional import allure import pytest @@ -17,6 +18,7 @@ from data_formatters import get_wallet_public_key from epoch import tick_epoch from file_helper import generate_file from grpc_responses import OBJECT_NOT_FOUND, error_matches_status +from neofs_testlib.hosting import Hosting from python_keywords.container import create_container, get_container from python_keywords.failover_utils import wait_object_replication_on_nodes from python_keywords.neofs_verbs import delete_object, get_object, head_object, put_object @@ -35,7 +37,6 @@ from python_keywords.node_management import ( start_nodes, stop_nodes, ) -from service_helper import get_storage_service_helper from storage_policy import get_nodes_with_object, get_simple_object_copies from utility import parse_time, placement_policy_from_container, wait_for_gc_pass_on_storage_nodes from wellknown_acl import PUBLIC_ACL @@ -46,7 +47,7 @@ check_nodes = [] @pytest.fixture @allure.title("Create container and pick the node with data") -def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell): +def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell, hosting: Hosting): wallet = prepare_wallet_and_deposit file_path = generate_file() placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" @@ -64,45 +65,45 @@ def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell): yield cid, node_name - shards = node_shard_list(node_name) + shards = node_shard_list(hosting, node_name) assert shards for shard in shards: - node_shard_set_mode(node_name, shard, "read-write") + node_shard_set_mode(hosting, node_name, shard, "read-write") - node_shard_list(node_name) + node_shard_list(hosting, node_name) @pytest.fixture -def after_run_start_all_nodes(): +def after_run_start_all_nodes(hosting: Hosting): yield try: - start_nodes(list(NEOFS_NETMAP_DICT.keys())) + start_nodes(hosting, list(NEOFS_NETMAP_DICT.keys())) except Exception as err: logger.error(f"Node start fails with error:\n{err}") @pytest.fixture -def return_nodes_after_test_run(client_shell): +def return_nodes_after_test_run(client_shell: Shell, hosting: Hosting): yield - return_nodes(shell=client_shell) + return_nodes(client_shell, hosting) @allure.step("Return node to cluster") -def return_nodes(shell: Shell, alive_node: str = None): - helper = get_storage_service_helper() +def return_nodes(shell: Shell, hosting: Hosting, alive_node: Optional[str] = None) -> None: for node in list(check_nodes): with allure.step(f"Start node {node}"): - helper.start_node(node) + host = hosting.get_host_by_service(node) + host.start_service(node) with allure.step(f"Waiting status ready for node {node}"): - wait_for_node_to_be_ready(node) + wait_for_node_to_be_ready(hosting, node) # We need to wait for node to establish notifications from morph-chain # Otherwise it will hang up when we will try to set status sleep(parse_time(MORPH_BLOCK_TIME)) with allure.step(f"Move node {node} to online state"): - node_set_status(node, status="online", retries=2) + node_set_status(hosting, node, status="online", retries=2) check_nodes.remove(node) sleep(parse_time(MORPH_BLOCK_TIME)) @@ -120,7 +121,7 @@ def return_nodes(shell: Shell, alive_node: str = None): @pytest.mark.add_nodes @pytest.mark.node_mgmt def test_add_nodes( - prepare_tmp_dir, client_shell, prepare_wallet_and_deposit, return_nodes_after_test_run + prepare_tmp_dir, client_shell, prepare_wallet_and_deposit, return_nodes_after_test_run, hosting: Hosting ): wallet = prepare_wallet_and_deposit placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X" @@ -140,8 +141,8 @@ def test_add_nodes( # Add node to recovery list before messing with it check_nodes.append(additional_node) - exclude_node_from_network_map(additional_node, alive_node) - delete_node_data(additional_node) + exclude_node_from_network_map(hosting, additional_node, alive_node) + delete_node_data(hosting, additional_node) cid = create_container(wallet, rule=placement_rule_3, basic_acl=PUBLIC_ACL) oid = put_object( @@ -149,16 +150,16 @@ def test_add_nodes( ) wait_object_replication_on_nodes(wallet, cid, oid, 3) - return_nodes(shell=client_shell, alive_node=alive_node) + return_nodes(shell=client_shell, hosting=hosting, alive_node=alive_node) with allure.step("Check data could be replicated to new node"): random_node = choice( [node for node in NEOFS_NETMAP_DICT if node not in (additional_node, alive_node)] ) - exclude_node_from_network_map(random_node, alive_node) + exclude_node_from_network_map(hosting, random_node, alive_node) wait_object_replication_on_nodes(wallet, cid, oid, 3, excluded_nodes=[random_node]) - include_node_to_network_map(random_node, alive_node, shell=client_shell) + include_node_to_network_map(hosting, random_node, alive_node, shell=client_shell) wait_object_replication_on_nodes(wallet, cid, oid, 3) with allure.step("Check container could be created with new node"): @@ -171,7 +172,7 @@ def test_add_nodes( @allure.title("Control Operations with storage nodes") @pytest.mark.node_mgmt -def test_nodes_management(prepare_tmp_dir, client_shell): +def test_nodes_management(prepare_tmp_dir, client_shell, hosting: Hosting): """ This test checks base control operations with storage nodes (healthcheck, netmap-snapshot, set-status). """ @@ -188,29 +189,29 @@ def test_nodes_management(prepare_tmp_dir, client_shell): with allure.step("Run health check for all storage nodes"): for node_name in NEOFS_NETMAP_DICT.keys(): - health_check = node_healthcheck(node_name) + health_check = node_healthcheck(hosting, node_name) assert health_check.health_status == "READY" and health_check.network_status == "ONLINE" with allure.step(f"Move node {random_node} to offline state"): - node_set_status(random_node, status="offline") + node_set_status(hosting, random_node, status="offline") sleep(parse_time(MORPH_BLOCK_TIME)) tick_epoch() with allure.step(f"Check node {random_node} went to offline"): - health_check = node_healthcheck(random_node) + health_check = node_healthcheck(hosting, random_node) assert health_check.health_status == "READY" and health_check.network_status == "OFFLINE" snapshot = get_netmap_snapshot(node_name=alive_node, shell=client_shell) assert random_node_netmap_key not in snapshot, f"Expected node {random_node} not in netmap" with allure.step(f"Check node {random_node} went to online"): - node_set_status(random_node, status="online") + node_set_status(hosting, random_node, status="online") sleep(parse_time(MORPH_BLOCK_TIME)) tick_epoch() with allure.step(f"Check node {random_node} went to online"): - health_check = node_healthcheck(random_node) + health_check = node_healthcheck(hosting, random_node) assert health_check.health_status == "READY" and health_check.network_status == "ONLINE" snapshot = get_netmap_snapshot(node_name=alive_node, shell=client_shell) assert random_node_netmap_key in snapshot, f"Expected node {random_node} in netmap" @@ -325,7 +326,7 @@ def test_placement_policy_negative(prepare_wallet_and_deposit, placement_rule, e @pytest.mark.skip(reason="We cover this scenario for Sbercloud in failover tests") @pytest.mark.node_mgmt @allure.title("NeoFS object replication on node failover") -def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes): +def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes, hosting: Hosting): """ Test checks object replication on storage not failover and come back. """ @@ -342,22 +343,22 @@ def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes): ), f"Expected {expected_nodes_count} copies, got {len(nodes)}" node_names = [name for name, config in NEOFS_NETMAP_DICT.items() if config.get("rpc") in nodes] - stopped_nodes = stop_nodes(1, node_names) + stopped_nodes = stop_nodes(hosting, 1, node_names) wait_for_expected_object_copies(wallet, cid, oid) - start_nodes(stopped_nodes) + start_nodes(hosting, stopped_nodes) tick_epoch() for node_name in node_names: - wait_for_node_go_online(node_name) + wait_for_node_go_online(hosting, node_name) wait_for_expected_object_copies(wallet, cid, oid) @pytest.mark.node_mgmt @allure.title("NeoFS object could be dropped using control command") -def test_drop_object(prepare_wallet_and_deposit): +def test_drop_object(prepare_wallet_and_deposit, hosting: Hosting): """ Test checks object could be dropped using `neofs-cli control drop-objects` command. """ @@ -383,7 +384,7 @@ def test_drop_object(prepare_wallet_and_deposit): with allure.step(f"Drop object {oid}"): get_object(wallet, cid, oid) head_object(wallet, cid, oid) - drop_object(node_name, cid, oid) + drop_object(hosting, node_name, cid, oid) wait_for_obj_dropped(wallet, cid, oid, get_object) wait_for_obj_dropped(wallet, cid, oid, head_object) @@ -391,7 +392,7 @@ def test_drop_object(prepare_wallet_and_deposit): @pytest.mark.node_mgmt @pytest.mark.skip(reason="Need to clarify scenario") @allure.title("Control Operations with storage nodes") -def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node): +def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node, hosting: Hosting): wallet = prepare_wallet_and_deposit file_path = generate_file() @@ -400,13 +401,13 @@ def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node): # for mode in ('read-only', 'degraded'): for mode in ("degraded",): - shards = node_shard_list(node_name) + shards = node_shard_list(hosting, node_name) assert shards for shard in shards: - node_shard_set_mode(node_name, shard, mode) + node_shard_set_mode(hosting, node_name, shard, mode) - shards = node_shard_list(node_name) + shards = node_shard_list(hosting, node_name) assert shards with pytest.raises(RuntimeError): @@ -419,9 +420,9 @@ def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node): get_object(wallet, cid, original_oid) for shard in shards: - node_shard_set_mode(node_name, shard, "read-write") + node_shard_set_mode(hosting, node_name, shard, "read-write") - shards = node_shard_list(node_name) + shards = node_shard_list(hosting, node_name) assert shards oid = put_object(wallet, file_path, cid) @@ -442,11 +443,11 @@ def validate_object_copies(wallet: str, placement_rule: str, file_path: str, exp @allure.step("Wait for node {node_name} goes online") -def wait_for_node_go_online(node_name: str) -> None: +def wait_for_node_go_online(hosting: Hosting, node_name: str) -> None: timeout, attempts = 5, 20 for _ in range(attempts): try: - health_check = node_healthcheck(node_name) + health_check = node_healthcheck(hosting, node_name) assert health_check.health_status == "READY" and health_check.network_status == "ONLINE" return except Exception as err: @@ -458,11 +459,11 @@ def wait_for_node_go_online(node_name: str) -> None: @allure.step("Wait for node {node_name} is ready") -def wait_for_node_to_be_ready(node_name: str) -> None: +def wait_for_node_to_be_ready(hosting: Hosting, node_name: str) -> None: timeout, attempts = 30, 6 for _ in range(attempts): try: - health_check = node_healthcheck(node_name) + health_check = node_healthcheck(hosting, node_name) if health_check.health_status == "READY": return except Exception as err: diff --git a/pytest_tests/testsuites/services/test_binaries.py b/pytest_tests/testsuites/services/test_binaries.py index 2aa133b..5800f61 100644 --- a/pytest_tests/testsuites/services/test_binaries.py +++ b/pytest_tests/testsuites/services/test_binaries.py @@ -5,9 +5,10 @@ from re import match import allure import pytest import requests +from binary_version_helper import get_remote_binaries_versions from common import BIN_VERSIONS_FILE from env_properties import read_env_properties, save_env_properties -from service_helper import get_storage_service_helper +from neofs_testlib.hosting import Hosting logger = logging.getLogger("NeoLogger") @@ -15,7 +16,7 @@ logger = logging.getLogger("NeoLogger") @allure.title("Check binaries versions") @pytest.mark.check_binaries @pytest.mark.skip("Skipped due to https://j.yadro.com/browse/OBJECT-628") -def test_binaries_versions(request): +def test_binaries_versions(request, hosting: Hosting): """ Compare binaries versions from external source (url) and deployed on servers. """ @@ -24,8 +25,7 @@ def test_binaries_versions(request): binaries_to_check = download_versions_info(BIN_VERSIONS_FILE) with allure.step("Get binaries versions from servers"): - helper = get_storage_service_helper() - got_versions = helper.get_binaries_version(binaries=list(binaries_to_check.keys())) + got_versions = get_remote_binaries_versions(hosting) env_properties = read_env_properties(request.config) diff --git a/robot/resources/lib/python_keywords/failover_utils.py b/robot/resources/lib/python_keywords/failover_utils.py index d9088fe..1ea4aeb 100644 --- a/robot/resources/lib/python_keywords/failover_utils.py +++ b/robot/resources/lib/python_keywords/failover_utils.py @@ -5,6 +5,7 @@ from typing import Optional import allure from common import NEOFS_NETMAP_DICT from neofs_testlib.shell import Shell +from neofs_testlib.hosting import Hosting from python_keywords.node_management import node_healthcheck from storage_policy import get_nodes_with_object @@ -35,20 +36,20 @@ def wait_object_replication_on_nodes( @allure.step("Wait for storage node returned to cluster") -def wait_all_storage_node_returned(): +def wait_all_storage_node_returned(hosting: Hosting) -> None: sleep_interval, attempts = 15, 20 for __attempt in range(attempts): - if is_all_storage_node_returned(): + if is_all_storage_node_returned(hosting): return sleep(sleep_interval) raise AssertionError("Storage node(s) is broken") -def is_all_storage_node_returned() -> bool: +def is_all_storage_node_returned(hosting: Hosting) -> bool: with allure.step("Run health check for all storage nodes"): for node_name in NEOFS_NETMAP_DICT.keys(): try: - health_check = node_healthcheck(node_name) + health_check = node_healthcheck(hosting, node_name) except Exception as err: logger.warning(f"Node healthcheck fails with error {err}") return False diff --git a/robot/resources/lib/python_keywords/node_management.py b/robot/resources/lib/python_keywords/node_management.py index 84aa927..3f67309 100644 --- a/robot/resources/lib/python_keywords/node_management.py +++ b/robot/resources/lib/python_keywords/node_management.py @@ -1,5 +1,3 @@ -#!/usr/bin/python3.9 - import logging import random import re @@ -19,7 +17,7 @@ from data_formatters import get_wallet_public_key from epoch import tick_epoch from neofs_testlib.cli import NeofsCli from neofs_testlib.shell import Shell -from service_helper import get_storage_service_helper +from neofs_testlib.hosting import Hosting from utility import parse_time logger = logging.getLogger("NeoLogger") @@ -42,7 +40,7 @@ class HealthStatus: @allure.step("Stop storage nodes") -def stop_nodes(number: int, nodes: list[str]) -> list[str]: +def stop_nodes(hosting: Hosting, number: int, nodes: list[str]) -> list[str]: """ Shuts down the given number of randomly selected storage nodes. Args: @@ -51,23 +49,23 @@ def stop_nodes(number: int, nodes: list[str]) -> list[str]: Returns: (list): the list of nodes that were shut down """ - helper = get_storage_service_helper() nodes_to_stop = random.sample(nodes, number) for node in nodes_to_stop: - helper.stop_node(node) + host = hosting.get_host_by_service(node) + host.stop_service(node) return nodes_to_stop @allure.step("Start storage nodes") -def start_nodes(nodes: list[str]) -> None: +def start_nodes(hosting: Hosting, nodes: list[str]) -> None: """ The function starts specified storage nodes. Args: nodes (list): the list of nodes to start """ - helper = get_storage_service_helper() for node in nodes: - helper.start(node) + host = hosting.get_host_by_service(node) + host.start_service(node) @allure.step("Get Locode") @@ -79,7 +77,7 @@ def get_locode() -> str: @allure.step("Healthcheck for node {node_name}") -def node_healthcheck(node_name: str) -> HealthStatus: +def node_healthcheck(hosting: Hosting, node_name: str) -> HealthStatus: """ The function returns node's health status. Args: @@ -88,12 +86,12 @@ def node_healthcheck(node_name: str) -> HealthStatus: health status as HealthStatus object. """ command = "control healthcheck" - output = _run_control_command(node_name, command) + output = _run_control_command_with_retries(hosting, node_name, command) return HealthStatus.from_stdout(output) @allure.step("Set status for node {node_name}") -def node_set_status(node_name: str, status: str, retries: int = 0) -> None: +def node_set_status(hosting: Hosting, node_name: str, status: str, retries: int = 0) -> None: """ The function sets particular status for given node. Args: @@ -102,7 +100,7 @@ def node_set_status(node_name: str, status: str, retries: int = 0) -> None: retries (optional, int): number of retry attempts if it didn't work from the first time """ command = f"control set-status --status {status}" - _run_control_command(node_name, command, retries) + _run_control_command_with_retries(hosting, node_name, command, retries) @allure.step("Get netmap snapshot") @@ -123,7 +121,7 @@ def get_netmap_snapshot(node_name: str, shell: Shell) -> str: @allure.step("Get shard list for node {node_name}") -def node_shard_list(node_name: str) -> list[str]: +def node_shard_list(hosting: Hosting, node_name: str) -> list[str]: """ The function returns list of shards for specified node. Args: @@ -132,46 +130,46 @@ def node_shard_list(node_name: str) -> list[str]: list of shards. """ command = "control shards list" - output = _run_control_command(node_name, command) + output = _run_control_command_with_retries(hosting, node_name, command) return re.findall(r"Shard (.*):", output) @allure.step("Shard set for node {node_name}") -def node_shard_set_mode(node_name: str, shard: str, mode: str) -> str: +def node_shard_set_mode(hosting: Hosting, node_name: str, shard: str, mode: str) -> str: """ The function sets mode for specified shard. Args: node_name str: node name on which shard mode should be set. """ command = f"control shards set-mode --id {shard} --mode {mode}" - return _run_control_command(node_name, command) + return _run_control_command_with_retries(hosting, node_name, command) @allure.step("Drop object from node {node_name}") -def drop_object(node_name: str, cid: str, oid: str) -> str: +def drop_object(hosting: Hosting, node_name: str, cid: str, oid: str) -> str: """ The function drops object from specified node. Args: node_name str: node name from which object should be dropped. """ command = f"control drop-objects -o {cid}/{oid}" - return _run_control_command(node_name, command) + return _run_control_command_with_retries(hosting, node_name, command) @allure.step("Delete data of node {node_name}") -def delete_node_data(node_name: str) -> None: - helper = get_storage_service_helper() - helper.stop_node(node_name) - helper.delete_node_data(node_name) +def delete_node_data(hosting: Hosting, node_name: str) -> None: + host = hosting.get_host_by_service(node_name) + host.stop_service(node_name) + host.delete_storage_node_data(node_name) time.sleep(parse_time(MORPH_BLOCK_TIME)) @allure.step("Exclude node {node_to_exclude} from network map") -def exclude_node_from_network_map(node_to_exclude: str, alive_node: str, shell: Shell) -> None: +def exclude_node_from_network_map(hosting: Hosting, node_to_exclude: str, alive_node: str, shell: Shell) -> None: node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]["wallet_path"] node_netmap_key = get_wallet_public_key(node_wallet_path, STORAGE_WALLET_PASS) - node_set_status(node_to_exclude, status="offline") + node_set_status(hosting, node_to_exclude, status="offline") time.sleep(parse_time(MORPH_BLOCK_TIME)) tick_epoch() @@ -183,8 +181,8 @@ def exclude_node_from_network_map(node_to_exclude: str, alive_node: str, shell: @allure.step("Include node {node_to_include} into network map") -def include_node_to_network_map(node_to_include: str, alive_node: str, shell: Shell) -> None: - node_set_status(node_to_include, status="online") +def include_node_to_network_map(hosting: Hosting, node_to_include: str, alive_node: str, shell: Shell) -> None: + node_set_status(hosting, node_to_include, status="online") time.sleep(parse_time(MORPH_BLOCK_TIME)) tick_epoch() @@ -204,13 +202,38 @@ def check_node_in_map(node_name: str, shell: Shell, alive_node: Optional[str] = assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} in network map" -def _run_control_command(node_name: str, command: str, retries: int = 0) -> str: - helper = get_storage_service_helper() +def _run_control_command_with_retries( + hosting: Hosting, node_name: str, command: str, retries: int = 0 +) -> str: for attempt in range(1 + retries): # original attempt + specified retries try: - return helper.run_control_command(node_name, command) + return _run_control_command(hosting, node_name, command) except AssertionError as err: if attempt < retries: logger.warning(f"Command {command} failed with error {err} and will be retried") continue raise AssertionError(f"Command {command} failed with error {err}") from err + + +def _run_control_command(hosting: Hosting, service_name: str, command: str) -> None: + host = hosting.get_host_by_service(service_name) + + service_config = host.get_service_config(service_name) + wallet_path = service_config.attributes["wallet_path"] + wallet_password = service_config.attributes["wallet_password"] + control_endpoint = service_config.attributes["control_endpoint"] + + shell = host.get_shell() + wallet_config_path = f"/tmp/{service_name}-config.yaml" + wallet_config = f'password: "{wallet_password}"' + shell.exec(f"echo '{wallet_config}' > {wallet_config_path}") + + cli_config = host.get_cli_config("neofs-cli") + + # TODO: implement cli.control + # cli = NeofsCli(shell, cli_config.exec_path, wallet_config_path) + result = shell.exec( + f"{cli_config.exec_path} {command} --endpoint {control_endpoint} " + f"--wallet {wallet_path} --config {wallet_config_path}" + ) + return result.stdout diff --git a/robot/variables/common.py b/robot/variables/common.py index bd535c0..329a123 100644 --- a/robot/variables/common.py +++ b/robot/variables/common.py @@ -17,12 +17,13 @@ NEOFS_CONTRACT_CACHE_TIMEOUT = os.getenv("NEOFS_CONTRACT_CACHE_TIMEOUT", "30s") # of 1min plus 15 seconds for GC pass itself) STORAGE_GC_TIME = os.getenv("STORAGE_GC_TIME", "75s") +# TODO: we should use hosting instead of these endpoints NEOFS_ENDPOINT = os.getenv("NEOFS_ENDPOINT", "s01.neofs.devenv:8080") - NEO_MAINNET_ENDPOINT = os.getenv("NEO_MAINNET_ENDPOINT", "http://main-chain.neofs.devenv:30333") MORPH_ENDPOINT = os.getenv("MORPH_ENDPOINT", "http://morph-chain.neofs.devenv:30333") HTTP_GATE = os.getenv("HTTP_GATE", "http://http.neofs.devenv") S3_GATE = os.getenv("S3_GATE", "https://s3.neofs.devenv:8080") + GAS_HASH = os.getenv("GAS_HASH", "0xd2a4cff31913016155e38e474a2c06d08be276cf") NEOFS_CONTRACT = os.getenv("NEOFS_IR_CONTRACTS_NEOFS") @@ -30,12 +31,11 @@ NEOFS_CONTRACT = os.getenv("NEOFS_IR_CONTRACTS_NEOFS") ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir") DEVENV_PATH = os.getenv("DEVENV_PATH", os.path.join("..", "neofs-dev-env")) -MORPH_MAGIC = os.getenv("MORPH_MAGIC") - # Password of wallet owned by user on behalf of whom we are running tests WALLET_PASS = os.getenv("WALLET_PASS", "") # Configuration of storage nodes +# TODO: we should use hosting instead of all these variables STORAGE_RPC_ENDPOINT_1 = os.getenv("STORAGE_RPC_ENDPOINT_1", "s01.neofs.devenv:8080") STORAGE_RPC_ENDPOINT_2 = os.getenv("STORAGE_RPC_ENDPOINT_2", "s02.neofs.devenv:8080") STORAGE_RPC_ENDPOINT_3 = os.getenv("STORAGE_RPC_ENDPOINT_3", "s03.neofs.devenv:8080") @@ -89,7 +89,13 @@ NEOFS_NETMAP_DICT = { } NEOFS_NETMAP = [node["rpc"] for node in NEOFS_NETMAP_DICT.values()] -# Paths to CLI executables +# Parameters that control SSH connection to storage node +# TODO: we should use hosting instead +STORAGE_NODE_SSH_USER = os.getenv("STORAGE_NODE_SSH_USER") +STORAGE_NODE_SSH_PASSWORD = os.getenv("STORAGE_NODE_SSH_PASSWORD") +STORAGE_NODE_SSH_PRIVATE_KEY_PATH = os.getenv("STORAGE_NODE_SSH_PRIVATE_KEY_PATH") + +# Paths to CLI executables on machine that runs tests NEOGO_EXECUTABLE = os.getenv("NEOGO_EXECUTABLE", "neo-go") NEOFS_CLI_EXEC = os.getenv("NEOFS_CLI_EXEC", "neofs-cli") NEOFS_AUTHMATE_EXEC = os.getenv("NEOFS_AUTHMATE_EXEC", "neofs-authmate") @@ -109,21 +115,17 @@ S3_GATE_WALLET_PATH = os.getenv( ) S3_GATE_WALLET_PASS = os.getenv("S3_GATE_WALLET_PASS", "s3") -# Parameters that control SSH connection to storage node -STORAGE_NODE_SSH_USER = os.getenv("STORAGE_NODE_SSH_USER") -STORAGE_NODE_SSH_PASSWORD = os.getenv("STORAGE_NODE_SSH_PASSWORD") -STORAGE_NODE_SSH_PRIVATE_KEY_PATH = os.getenv("STORAGE_NODE_SSH_PRIVATE_KEY_PATH") - -# Path to directory with CLI binaries on storage node (currently we need only neofs-cli) -STORAGE_NODE_BIN_PATH = os.getenv("STORAGE_NODE_BIN_PATH", f"{DEVENV_PATH}/vendor") - # Config for neofs-adm utility. Optional if tests are running against devenv NEOFS_ADM_CONFIG_PATH = os.getenv("NEOFS_ADM_CONFIG_PATH") -INFRASTRUCTURE_TYPE = os.getenv("INFRASTRUCTURE_TYPE", "LOCAL_DEVENV") FREE_STORAGE = os.getenv("FREE_STORAGE", "false").lower() == "true" BIN_VERSIONS_FILE = os.getenv("BIN_VERSIONS_FILE") +# TODO: instead of specifying infrastructure type we should use attributes of hosts +INFRASTRUCTURE_TYPE = os.getenv("INFRASTRUCTURE_TYPE", "LOCAL_DEVENV") + +HOSTING_CONFIG_FILE = os.getenv("HOSTING_CONFIG_FILE", ".devenv.hosting.yaml") + # Generate wallet configs # TODO: we should move all info about wallet configs to fixtures WALLET_CONFIG = os.path.join(os.getcwd(), "wallet_config.yml")