Integrate with hosting from testlib

Replace service_helper with hosting class from the testlib.
Instead of invoking commands on remote via ssh_helper, we now use
shell from the hosting.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
develop
Vladimir Domnich 2022-10-09 20:01:59 +00:00 committed by Vladimir
parent 88da942b03
commit bfd02531ef
15 changed files with 324 additions and 749 deletions

View File

@ -0,0 +1,84 @@
hosts:
- address: localhost
plugin_name: docker
services:
- name: s01
attributes:
container_name: s01
config_path: ../neofs-dev-env/services/storage/.storage.env
wallet_path: ../neofs-dev-env/services/storage/wallet01.json
wallet_password: ""
volume_name: storage_storage_s01
rpc_endpoint: s01.neofs.devenv:8080
control_endpoint: s01.neofs.devenv:8081
un_locode: "RU MOW"
- name: s02
attributes:
container_name: s02
config_path: ../neofs-dev-env/services/storage/.storage.env
wallet_path: ../neofs-dev-env/services/storage/wallet02.json
wallet_password: ""
volume_name: storage_storage_s02
rpc_endpoint: s02.neofs.devenv:8080
control_endpoint: s02.neofs.devenv:8081
un_locode: "RU LED"
- name: s03
attributes:
container_name: s03
config_path: ../neofs-dev-env/services/storage/.storage.env
wallet_path: ../neofs-dev-env/services/storage/wallet03.json
wallet_password: ""
volume_name: storage_storage_s03
rpc_endpoint: s03.neofs.devenv:8080
control_endpoint: s03.neofs.devenv:8081
un_locode: "SE STO"
- name: s04
attributes:
container_name: s04
config_path: ../neofs-dev-env/services/storage/.storage.env
wallet_path: ../neofs-dev-env/services/storage/wallet04.json
wallet_password: ""
volume_name: storage_storage_s04
rpc_endpoint: s04.neofs.devenv:8080
control_endpoint: s04.neofs.devenv:8081
un_locode: "FI HEL"
- name: s3-gate01
attributes:
container_name: s3_gate
config_path: ../neofs-dev-env/services/s3_gate/.s3.env
wallet_path: ../neofs-dev-env/services/s3_gate/wallet.json
wallet_password: "s3"
endpoint: https://s3.neofs.devenv:8080
- name: http-gate01
attributes:
container_name: http_gate
config_path: ../neofs-dev-env/services/http_gate/.http.env
wallet_path: ../neofs-dev-env/services/http_gate/wallet.json
wallet_password: "one"
endpoint: http://http.neofs.devenv
- name: ir01
attributes:
container_name: ir01
config_path: ../neofs-dev-env/services/ir/.ir.env
wallet_path: ../neofs-dev-env/services/ir/az.json
wallet_password: "one"
- name: morph-chain01
attributes:
container_name: morph_chain
config_path: ../neofs-dev-env/services/morph_chain/protocol.privnet.yml
wallet_path: ../neofs-dev-env/services/morph_chain/node-wallet.json
wallet_password: "one"
endpoint: http://morph-chain.neofs.devenv:30333
- name: main-chain01
attributes:
container_name: main_chain
config_path: ../neofs-dev-env/services/chain/protocol.privnet.yml
wallet_path: ../neofs-dev-env/services/chain/node-wallet.json
wallet_password: "one"
endpoint: http://main-chain.neofs.devenv:30333
- name: coredns01
attributes:
container_name: coredns
clis:
- name: neofs-cli
exec_path: neofs-cli

View File

@ -0,0 +1,72 @@
import logging
import re
from common import NEOFS_ADM_EXEC, NEOFS_CLI_EXEC, WALLET_CONFIG
from neofs_testlib.cli import NeofsAdm, NeofsCli
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import Shell
logger = logging.getLogger("NeoLogger")
def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
versions = {}
for binary in ["neo-go", "neofs-authmate"]:
out = shell.exec(f"{binary} --version").stdout
versions[binary] = _parse_version(out)
neofs_cli = NeofsCli(shell, NEOFS_CLI_EXEC, WALLET_CONFIG)
versions["neofs-cli"] = _parse_version(neofs_cli.version.get())
try:
neofs_adm = NeofsAdm(shell, NEOFS_ADM_EXEC)
versions["neofs-adm"] = _parse_version(neofs_adm.version.get())
except RuntimeError:
logger.info(f"neofs-adm not installed")
out = shell.exec("aws --version").stdout
out_lines = out.split("\n")
versions["AWS"] = out_lines[0] if out_lines else "Unknown"
return versions
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
versions_by_host = {}
for host in hosting.hosts:
binaries = []
for service_config in host.config.services:
exec_path = service_config.attributes.get("exec_path")
if exec_path:
binaries.append(exec_path)
for cli_config in host.config.clis:
binaries.append(cli_config.exec_path)
shell = host.get_shell()
versions_at_host = {}
for binary in binaries:
try:
result = shell.exec(f"{binary} --version")
versions_at_host[service_config.name] = _parse_version(result.stdout)
except Exception as exc:
logger.error(f"Cannot get version for {exec_path} because of\n{exc}")
versions_at_host[service_config.name] = "Unknown"
continue
versions_by_host[host.config.address] = versions_at_host
# Consolidate versions across all hosts
versions = {}
for host, binary_versions in versions_by_host.items():
for name, version in binary_versions.items():
captured_version = versions.get(name)
if captured_version:
assert captured_version == version, f"Binary {name} has inconsistent version on host {host}"
else:
versions[name] = version
return versions
def _parse_version(version_output: str) -> str:
version = re.search(r"version[:\s]*v?(.+)", version_output, re.IGNORECASE)
return version.group(1).strip() if version else "Unknown"

View File

@ -1,6 +1,7 @@
from ssh_helper import HostClient
# TODO: convert to shell from hosting
class IpTablesHelper:
@staticmethod
def drop_input_traffic_to_port(client: HostClient, ports: list[str]):

View File

@ -1,237 +0,0 @@
import binascii
import hashlib
import hmac
import json
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from urllib.parse import quote, unquote
import requests
import yaml
@dataclass
class SberCloudConfig:
access_key_id: Optional[str] = None
secret_key: Optional[str] = None
ecs_endpoint: Optional[str] = None
project_id: Optional[str] = None
@staticmethod
def from_dict(config_dict: dict) -> "SberCloudConfig":
return SberCloudConfig(**config_dict)
@staticmethod
def from_yaml(config_path: str) -> "SberCloudConfig":
with open(config_path) as file:
config_dict = yaml.load(file, Loader=yaml.FullLoader)
return SberCloudConfig.from_dict(config_dict["sbercloud"])
@staticmethod
def from_env() -> "SberCloudConfig":
config_dict = {
"access_key_id": os.getenv("SBERCLOUD_ACCESS_KEY_ID"),
"secret_key": os.getenv("SBERCLOUD_SECRET_KEY"),
"ecs_endpoint": os.getenv("SBERCLOUD_ECS_ENDPOINT"),
"project_id": os.getenv("SBERCLOUD_PROJECT_ID"),
}
return SberCloudConfig.from_dict(config_dict)
class SberCloudAuthRequests:
"""
Implements authentication mechanism with access key+secret key in accordance with:
https://support.hc.sbercloud.ru/devg/apisign/api-sign-algorithm.html
endpoint - represents endpoint of a specific service (listed at https://support.hc.sbercloud.ru/en-us/endpoint/index.html)
base_path - is prefix for all request path's that will be sent via this instance.
"""
ENCODING = "utf-8"
ALGORITHM = "SDK-HMAC-SHA256"
TIMESTAMP_FORMAT = "%Y%m%dT%H%M%SZ"
def __init__(
self, endpoint: str, access_key_id: str, secret_key: str, base_path: str = ""
) -> None:
self.endpoint = endpoint
self.base_path = base_path
self.access_key_id = access_key_id
self.secret_key = secret_key
def get(self, path: str, query: Optional[dict] = None) -> requests.Response:
return self._send_request("GET", path, query, data=None)
def post(
self, path: str, query: Optional[dict] = None, data: Optional[dict] = None
) -> requests.Response:
return self._send_request("POST", path, query, data)
def _send_request(
self, method: str, path: str, query: Optional[dict], data: Optional[dict]
) -> requests.Response:
if self.base_path:
path = self.base_path + path
timestamp = datetime.strftime(datetime.utcnow(), self.TIMESTAMP_FORMAT)
headers = self._build_original_headers(timestamp)
content = ""
if data:
# At the moment we support json content only
content = json.dumps(data)
headers["Content-Type"] = "application/json"
body = content.encode(self.ENCODING)
signed_headers = self._build_signed_headers(headers)
canonical_request = self._build_canonical_request(
method, path, query, body, headers, signed_headers
)
signature = self._build_signature(timestamp, canonical_request)
headers["Authorization"] = self._build_authorization_header(signature, signed_headers)
query_string = "?" + self._build_canonical_query_string(query) if query else ""
url = f"https://{self.endpoint}{path}{query_string}"
response = requests.request(method, url, headers=headers, data=body)
if response.status_code < 200 or response.status_code >= 300:
raise AssertionError(
f"Request to url={url} failed: status={response.status_code} "
f"response={response.text})"
)
return response
def _build_original_headers(self, timestamp: str) -> dict[str, str]:
return {
"X-Sdk-Date": timestamp,
"host": self.endpoint,
}
def _build_signed_headers(self, headers: dict[str, str]) -> list[str]:
return sorted(header_name.lower() for header_name in headers)
def _build_canonical_request(
self,
method: str,
path: str,
query: Optional[dict],
body: bytes,
headers: dict[str, str],
signed_headers: list[str],
) -> str:
canonical_headers = self._build_canonical_headers(headers, signed_headers)
body_hash = self._calc_sha256_hash(body)
canonical_url = self._build_canonical_url(path)
canonical_query_string = self._build_canonical_query_string(query)
return "\n".join(
[
method.upper(),
canonical_url,
canonical_query_string,
canonical_headers,
";".join(signed_headers),
body_hash,
]
)
def _build_canonical_headers(self, headers: dict[str, str], signed_headers: list[str]) -> str:
normalized_headers = {}
for key, value in headers.items():
normalized_key = key.lower()
normalized_value = value.strip()
normalized_headers[normalized_key] = normalized_value
# Re-encode header in request itself (iso-8859-1 comes from HTTP 1.1 standard)
headers[key] = normalized_value.encode(self.ENCODING).decode("iso-8859-1")
# Join headers in the same order as they are sorted in signed_headers list
joined_headers = "\n".join(f"{key}:{normalized_headers[key]}" for key in signed_headers)
return joined_headers + "\n"
def _calc_sha256_hash(self, value: bytes) -> str:
sha256 = hashlib.sha256()
sha256.update(value)
return sha256.hexdigest()
def _build_canonical_url(self, path: str) -> str:
path_parts = unquote(path).split("/")
canonical_url = "/".join(quote(path_part) for path_part in path_parts)
if not canonical_url.endswith("/"):
canonical_url += "/"
return canonical_url
def _build_canonical_query_string(self, query: Optional[dict]) -> str:
if not query:
return ""
key_value_pairs = []
for key in sorted(query.keys()):
# NOTE: we do not support list values, as they are not used in API at the moment
encoded_key = quote(key)
encoded_value = quote(str(query[key]))
key_value_pairs.append(f"{encoded_key}={encoded_value}")
return "&".join(key_value_pairs)
def _build_signature(self, timestamp: str, canonical_request: str) -> str:
canonical_request_hash = self._calc_sha256_hash(canonical_request.encode(self.ENCODING))
string_to_sign = f"{self.ALGORITHM}\n{timestamp}\n{canonical_request_hash}"
hmac_digest = hmac.new(
key=self.secret_key.encode(self.ENCODING),
msg=string_to_sign.encode(self.ENCODING),
digestmod=hashlib.sha256,
).digest()
signature = binascii.hexlify(hmac_digest).decode()
return signature
def _build_authorization_header(self, signature: str, signed_headers: list[str]) -> str:
joined_signed_headers = ";".join(signed_headers)
return f"{self.ALGORITHM} Access={self.access_key_id}, SignedHeaders={joined_signed_headers}, Signature={signature}"
class SberCloud:
"""
Manages resources in Sbercloud via API.
API reference:
https://docs.sbercloud.ru/terraform/ug/topics/quickstart.html
https://support.hc.sbercloud.ru/en-us/api/ecs/en-us_topic_0020212668.html
"""
def __init__(self, config: SberCloudConfig) -> None:
self.ecs_requests = SberCloudAuthRequests(
endpoint=config.ecs_endpoint,
base_path=f"/v1/{config.project_id}/cloudservers",
access_key_id=config.access_key_id,
secret_key=config.secret_key,
)
self.ecs_node_by_ip = {} # Cached list of ecs servers
def find_ecs_node_by_ip(self, ip: str) -> str:
if ip not in self.ecs_node_by_ip:
self.ecs_node_by_ip[ip] = self.get_ecs_node_id(ip)
assert ip in self.ecs_node_by_ip
return self.ecs_node_by_ip[ip]
def get_ecs_node_id(self, ip: str) -> str:
response = self.ecs_requests.get("/detail", {"ip": ip}).json()
return response["servers"][0]["id"]
def start_node(self, node_id: Optional[str] = None, node_ip: Optional[str] = None) -> None:
data = {"os-start": {"servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}]}}
self.ecs_requests.post("/action", data=data)
def stop_node(
self, node_id: Optional[str] = None, node_ip: Optional[str] = None, hard: bool = False
) -> None:
data = {
"os-stop": {
"type": "HARD" if hard else "SOFT",
"servers": [{"id": node_id or self.find_ecs_node_by_ip(node_ip)}],
}
}
self.ecs_requests.post("/action", data=data)

View File

@ -1,342 +0,0 @@
import json
import logging
import os
import re
import time
from contextlib import contextmanager
from datetime import datetime
from typing import Optional
import docker
from cli_helpers import _cmd_run
from common import (
INFRASTRUCTURE_TYPE,
NEOFS_CLI_EXEC,
NEOFS_NETMAP_DICT,
STORAGE_NODE_BIN_PATH,
STORAGE_NODE_SSH_PASSWORD,
STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
STORAGE_NODE_SSH_USER,
WALLET_CONFIG,
)
from requests import HTTPError
from ssh_helper import HostClient
logger = logging.getLogger("NeoLogger")
class LocalDevEnvStorageServiceHelper:
"""
Manages storage services running on local devenv.
"""
# Names of all containers that are running neofs code
ALL_CONTAINERS = [
"s3_gate",
"http_gate",
"s03",
"s01",
"s02",
"s04",
"ir01",
"morph_chain",
"main_chain",
]
def stop_node(self, node_name: str, wait: bool = True) -> None:
container_name = _get_storage_container_name(node_name)
client = self._get_docker_client(node_name)
client.stop(container_name)
if wait:
self._wait_for_container_to_be_in_state(node_name, container_name, "exited")
def start_node(self, node_name: str, wait: bool = True) -> None:
container_name = _get_storage_container_name(node_name)
client = self._get_docker_client(node_name)
client.start(container_name)
if wait:
self._wait_for_container_to_be_in_state(node_name, container_name, "running")
def run_control_command(self, node_name: str, command: str) -> str:
control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"]
wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"]
cmd = (
f"{NEOFS_CLI_EXEC} {command} --endpoint {control_endpoint} "
f"--wallet {wallet_path} --config {WALLET_CONFIG}"
)
output = _cmd_run(cmd)
return output
def delete_node_data(self, node_name: str) -> None:
volume_name = _get_storage_volume_name(node_name)
client = self._get_docker_client(node_name)
volume_info = client.inspect_volume(volume_name)
volume_path = volume_info["Mountpoint"]
_cmd_run(f"rm -rf {volume_path}/*")
def get_binaries_version(self) -> dict:
return {}
def dump_logs(
self, directory_path: str, since: Optional[datetime], until: Optional[datetime]
) -> None:
# All containers are running on the same host, so we can use 1st node to collect all logs
first_node_name = next(iter(NEOFS_NETMAP_DICT))
client = self._get_docker_client(first_node_name)
for container_name in self.ALL_CONTAINERS:
try:
logs = client.logs(container_name, since=since, until=until)
except HTTPError as exc:
logger.info(f"Got exception while dumping container '{container_name}' logs: {exc}")
continue
# Dump logs to the directory
file_path = os.path.join(directory_path, f"{container_name}-log.txt")
with open(file_path, "wb") as file:
file.write(logs)
def _get_container_by_name(self, node_name: str, container_name: str) -> dict:
client = self._get_docker_client(node_name)
containers = client.containers(all=True)
logger.info(f"Current containers state\n:{json.dumps(containers, indent=2)}")
for container in containers:
# Names in local docker environment are prefixed with /
clean_names = set(name.strip("/") for name in container["Names"])
if container_name in clean_names:
return container
return None
def _wait_for_container_to_be_in_state(
self, node_name: str, container_name: str, expected_state: str
) -> None:
for __attempt in range(10):
container = self._get_container_by_name(node_name, container_name)
logger.info(f"Container info:\n{json.dumps(container, indent=2)}")
if container and container["State"] == expected_state:
return
time.sleep(5)
raise AssertionError(f"Container {container_name} is not in {expected_state} state.")
def _get_docker_client(self, node_name: str) -> docker.APIClient:
# For local docker we use default docker client that talks to unix socket
client = docker.APIClient()
return client
class CloudVmStorageServiceHelper:
STORAGE_SERVICE = "neofs-storage.service"
def stop_node(self, node_name: str, wait: bool = True) -> None:
with _create_ssh_client(node_name) as ssh_client:
cmd = f"sudo systemctl stop {self.STORAGE_SERVICE}"
output = ssh_client.exec_with_confirmation(cmd, [""])
logger.info(f"Stop command output: {output.stdout}")
if wait:
self._wait_for_service_to_be_in_state(node_name, self.STORAGE_SERVICE, "inactive")
def start_node(self, node_name: str, wait: bool = True) -> None:
with _create_ssh_client(node_name) as ssh_client:
cmd = f"sudo systemctl start {self.STORAGE_SERVICE}"
output = ssh_client.exec_with_confirmation(cmd, [""])
logger.info(f"Start command output: {output.stdout}")
if wait:
self._wait_for_service_to_be_in_state(
node_name, self.STORAGE_SERVICE, "active (running)"
)
def run_control_command(self, node_name: str, command: str) -> str:
control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"]
wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"]
# Private control endpoint is accessible only from the host where storage node is running
# So, we connect to storage node host and run CLI command from there
with _create_ssh_client(node_name) as ssh_client:
# Copy wallet content on storage node host
with open(wallet_path, "r") as file:
wallet = file.read()
remote_wallet_path = f"/tmp/{node_name}-wallet.json"
ssh_client.exec_with_confirmation(f"echo '{wallet}' > {remote_wallet_path}", [""])
# Put config on storage node host
remote_config_path = f"/tmp/{node_name}-config.yaml"
remote_config = 'password: ""'
ssh_client.exec_with_confirmation(
f"echo '{remote_config}' > {remote_config_path}", [""]
)
# Execute command
cmd = (
f"sudo {STORAGE_NODE_BIN_PATH}/neofs-cli {command} --endpoint {control_endpoint} "
f"--wallet {remote_wallet_path} --config {remote_config_path}"
)
output = ssh_client.exec_with_confirmation(cmd, [""])
return output.stdout
def _wait_for_service_to_be_in_state(
self, node_name: str, service_name: str, expected_state: str
) -> None:
with _create_ssh_client(node_name) as ssh_client:
for __attempt in range(10):
# Run command to get service status (set --lines=0 to suppress logs output)
# Also we don't verify return code, because for an inactive service return code will be 3
command = f"sudo systemctl status {service_name} --lines=0"
output = ssh_client.exec(command, verify=False)
if expected_state in output.stdout:
return
time.sleep(3)
raise AssertionError(f"Service {service_name} is not in {expected_state} state")
def delete_node_data(self, node_name: str) -> None:
with _create_ssh_client(node_name) as ssh_client:
ssh_client.exec("sudo rm -rf /srv/neofs/*")
def get_binaries_version(self, binaries: list = None) -> dict:
default_binaries = [
"neo-go",
"neofs-adm",
"neofs-cli",
"neofs-http-gw",
"neofs-ir",
"neofs-lens",
"neofs-node",
"neofs-s3-authmate",
"neofs-s3-gw",
"neogo-morph-cn",
]
binaries = binaries or default_binaries
version_map = {}
for node_name in NEOFS_NETMAP_DICT:
with _create_ssh_client(node_name) as ssh_client:
for binary in binaries:
try:
out = ssh_client.exec(f"sudo {binary} --version").stdout
except AssertionError as err:
logger.error(f"Can not get version for {binary} because of\n{err}")
version_map[binary] = "Can not get version"
continue
version = re.search(r"version[:\s]*v?(.+)", out, re.IGNORECASE)
version = version.group(1).strip() if version else "Unknown"
if not version_map.get(binary):
version_map[binary] = version
else:
assert version_map[binary] == version, (
f"Expected binary {binary} to have identical version on all nodes "
f"(mismatch on node {node_name})"
)
return version_map
def dump_logs(
self, directory_path: str, since: Optional[datetime], until: Optional[datetime]
) -> None:
for node_name, node_info in NEOFS_NETMAP_DICT.items():
with _create_ssh_client(node_name) as ssh_client:
# We do not filter out logs of neofs services, because system logs might contain
# information that is useful for troubleshooting
filters = " ".join(
[
f"--since '{since:%Y-%m-%d %H:%M:%S}'" if since else "",
f"--until '{until:%Y-%m-%d %H:%M:%S}'" if until else "",
]
)
result = ssh_client.exec(f"journalctl --no-pager {filters}")
logs = result.stdout
# Dump logs to the directory. We include node endpoint in file name, because almost
# everywhere in Allure report we are logging endpoints rather than node names
file_path = os.path.join(directory_path, f"{node_name}-{node_info['rpc']}-log.txt")
with open(file_path, "w") as file:
file.write(logs)
class RemoteDevEnvStorageServiceHelper(LocalDevEnvStorageServiceHelper):
"""
Manages storage services running on remote devenv.
Most of operations are identical to local devenv, however, any interactions
with host resources (files, etc.) require ssh into the remote host machine.
"""
def _get_docker_client(self, node_name: str) -> docker.APIClient:
# For remote devenv we use docker client that talks to tcp socket 2375:
# https://docs.docker.com/engine/reference/commandline/dockerd/#daemon-socket-option
host = _get_node_host(node_name)
client = docker.APIClient(base_url=f"tcp://{host}:2375")
return client
def delete_node_data(self, node_name: str) -> None:
volume_name = _get_storage_volume_name(node_name)
client = self._get_docker_client(node_name)
volume_info = client.inspect_volume(volume_name)
volume_path = volume_info["Mountpoint"]
# SSH into remote machine and delete files in host directory that is mounted as docker volume
with _create_ssh_client(node_name) as ssh_client:
# TODO: add sudo prefix after we change a user
ssh_client.exec(f"rm -rf {volume_path}/*")
def get_storage_service_helper():
if INFRASTRUCTURE_TYPE == "LOCAL_DEVENV":
return LocalDevEnvStorageServiceHelper()
if INFRASTRUCTURE_TYPE == "REMOTE_DEVENV":
return RemoteDevEnvStorageServiceHelper()
if INFRASTRUCTURE_TYPE == "CLOUD_VM":
return CloudVmStorageServiceHelper()
raise EnvironmentError(f"Infrastructure type is not supported: {INFRASTRUCTURE_TYPE}")
@contextmanager
def _create_ssh_client(node_name: str) -> HostClient:
host = _get_node_host(node_name)
ssh_client = HostClient(
host,
login=STORAGE_NODE_SSH_USER,
password=STORAGE_NODE_SSH_PASSWORD,
private_key_path=STORAGE_NODE_SSH_PRIVATE_KEY_PATH,
)
try:
yield ssh_client
finally:
ssh_client.drop()
def _get_node_host(node_name: str) -> str:
if node_name not in NEOFS_NETMAP_DICT:
raise AssertionError(f"Node {node_name} is not found!")
# We use rpc endpoint to determine host address, because control endpoint
# (if it is private) will be a local address on the host machine
node_config = NEOFS_NETMAP_DICT.get(node_name)
host = node_config.get("rpc").split(":")[0]
return host
def _get_storage_container_name(node_name: str) -> str:
"""
Converts name of storage node (as it is listed in netmap) into the name of docker container
that runs instance of this storage node.
"""
return node_name.split(".")[0]
def _get_storage_volume_name(node_name: str) -> str:
"""
Converts name of storage node (as it is listed in netmap) into the name of docker volume
that contains data of this storage node.
"""
container_name = _get_storage_container_name(node_name)
return f"storage_storage_{container_name}"

View File

@ -35,7 +35,7 @@ neo-mamba==0.10.0
neo3crypto==0.2.1
neo3vm==0.9.0
neo3vm-stubs==0.9.0
neofs-testlib==0.1.0
neofs-testlib==0.2.0
netaddr==0.8.0
orjson==3.6.8
packaging==21.3

View File

@ -2,6 +2,7 @@ import allure
import pytest
from common import NEOFS_NETMAP_DICT
from failover_utils import wait_object_replication_on_nodes
from neofs_testlib.hosting import Hosting
from python_keywords.acl import (
EACLAccess,
EACLOperation,
@ -194,7 +195,7 @@ class TestEACLContainer:
@allure.title("Testcase to validate NeoFS replication with eACL deny rules.")
def test_extended_acl_deny_replication(
self, wallets, client_shell, eacl_full_placement_container_with_object, file_path
self, wallets, client_shell, hosting: Hosting, eacl_full_placement_container_with_object, file_path
):
user_wallet = wallets.get_wallet()
cid, oid, file_path = eacl_full_placement_container_with_object
@ -217,7 +218,7 @@ class TestEACLContainer:
wait_for_cache_expired()
with allure.step("Drop object to check replication"):
drop_object(node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid)
drop_object(hosting, node_name=[*NEOFS_NETMAP_DICT][0], cid=cid, oid=oid)
storage_wallet_path = NEOFS_NETMAP_DICT[[*NEOFS_NETMAP_DICT][0]]["wallet_path"]
with allure.step("Wait for dropped object replicated"):

View File

@ -1,30 +1,26 @@
import logging
import os
import re
import shutil
from datetime import datetime
import allure
import pytest
import wallet
from cli_helpers import _cmd_run
import yaml
from binary_version_helper import get_local_binaries_versions, get_remote_binaries_versions
from common import (
ASSETS_DIR,
FREE_STORAGE,
HOSTING_CONFIG_FILE,
INFRASTRUCTURE_TYPE,
MAINNET_WALLET_PATH,
NEOFS_ADM_EXEC,
NEOFS_CLI_EXEC,
NEOFS_NETMAP_DICT,
WALLET_CONFIG,
)
from neofs_testlib.cli import NeofsAdm, NeofsCli
from env_properties import save_env_properties
from neofs_testlib.hosting import Hosting
from neofs_testlib.shell import LocalShell, Shell
from payment_neogo import neofs_deposit, transfer_mainnet_gas
from python_keywords.node_management import node_healthcheck
from service_helper import get_storage_service_helper
logger = logging.getLogger("NeoLogger")
@ -41,41 +37,24 @@ def cloud_infrastructure_check():
yield
@pytest.fixture(scope="session")
def hosting() -> Hosting:
with open(HOSTING_CONFIG_FILE, "r") as file:
hosting_config = yaml.full_load(file)
hosting_instance = Hosting()
hosting_instance.configure(hosting_config)
yield hosting_instance
@pytest.fixture(scope="session", autouse=True)
@allure.title("Check binary versions")
def check_binary_versions(request, client_shell):
# Collect versions of local binaries
binaries = ["neo-go", "neofs-authmate"]
local_binaries = _get_binaries_version_local(binaries)
def check_binary_versions(request, hosting: Hosting, client_shell: Shell):
local_versions = get_local_binaries_versions(client_shell)
remote_versions = get_remote_binaries_versions(hosting)
try:
local_binaries["neofs-adm"] = NeofsAdm(client_shell, NEOFS_ADM_EXEC).version.get()
except RuntimeError:
logger.info(f"neofs-adm not installed")
local_binaries["neofs-cli"] = NeofsCli(
client_shell, NEOFS_CLI_EXEC, WALLET_CONFIG
).version.get()
# Collect versions of remote binaries
helper = get_storage_service_helper()
remote_binaries = helper.get_binaries_version()
all_binaries = {**local_binaries, **remote_binaries}
# Get version of aws binary
out = _cmd_run("aws --version")
out_lines = out.split("\n")
all_binaries["AWS"] = out_lines[0] if out_lines else "Unknown"
save_env_properties(request.config, all_binaries)
def _get_binaries_version_local(binaries: list) -> dict:
env_out = {}
for binary in binaries:
out = _cmd_run(f"{binary} --version")
version = re.search(r"version[:\s]*(.+)", out, re.IGNORECASE)
env_out[binary] = version.group(1).strip() if version else "Unknown"
return env_out
all_versions = {**local_versions, **remote_versions}
save_env_properties(request.config, all_versions)
@pytest.fixture(scope="session")
@ -90,7 +69,7 @@ def prepare_tmp_dir():
@pytest.fixture(scope="session", autouse=True)
@allure.title("Collect logs")
def collect_logs(prepare_tmp_dir):
def collect_logs(prepare_tmp_dir, hosting: Hosting):
start_time = datetime.utcnow()
yield
end_time = datetime.utcnow()
@ -99,8 +78,8 @@ def collect_logs(prepare_tmp_dir):
logs_dir = os.path.join(prepare_tmp_dir, "logs")
os.makedirs(logs_dir)
helper = get_storage_service_helper()
helper.dump_logs(logs_dir, since=start_time, until=end_time)
for host in hosting.hosts:
host.dump_logs(logs_dir, since=start_time, until=end_time)
# Zip all files and attach to Allure because it is more convenient to download a single
# zip with all logs rather than mess with individual logs files per service or node
@ -110,10 +89,10 @@ def collect_logs(prepare_tmp_dir):
@pytest.fixture(scope="session", autouse=True)
@allure.title("Run health check for all storage nodes")
def run_health_check(collect_logs):
def run_health_check(collect_logs, hosting: Hosting):
failed_nodes = []
for node_name in NEOFS_NETMAP_DICT.keys():
health_check = node_healthcheck(node_name)
health_check = node_healthcheck(hosting, node_name)
if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
failed_nodes.append(node_name)

View File

@ -12,6 +12,7 @@ from common import (
from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes
from file_helper import generate_file, get_file_hash
from iptables_helper import IpTablesHelper
from neofs_testlib.hosting import Hosting
from python_keywords.container import create_container
from python_keywords.neofs_verbs import get_object, put_object
from ssh_helper import HostClient
@ -26,7 +27,7 @@ blocked_hosts = []
@pytest.fixture(autouse=True)
@allure.step("Restore network")
def restore_network():
def restore_network(hosting: Hosting):
yield
not_empty = len(blocked_hosts) != 0
@ -42,7 +43,7 @@ def restore_network():
IpTablesHelper.restore_input_traffic_to_port(client, PORTS_TO_BLOCK)
blocked_hosts.remove(host)
if not_empty:
wait_all_storage_node_returned()
wait_all_storage_node_returned(hosting)
@allure.title("Block Storage node traffic")

View File

@ -9,9 +9,9 @@ from common import (
)
from failover_utils import wait_all_storage_node_returned, wait_object_replication_on_nodes
from file_helper import generate_file, get_file_hash
from neofs_testlib.hosting import Hosting
from python_keywords.container import create_container
from python_keywords.neofs_verbs import get_object, put_object
from sbercloud_helper import SberCloud, SberCloudConfig
from ssh_helper import HostClient
from wellknown_acl import PUBLIC_ACL
@ -19,21 +19,11 @@ logger = logging.getLogger("NeoLogger")
stopped_hosts = []
@pytest.fixture(scope="session")
def sbercloud_client():
with allure.step("Connect to SberCloud"):
try:
config = SberCloudConfig.from_env()
yield SberCloud(config)
except Exception as err:
pytest.fail(f"SberCloud infrastructure not available. Error\n{err}")
@pytest.fixture(autouse=True)
@allure.step("Return all storage nodes")
def return_all_storage_nodes_fixture(sbercloud_client):
def return_all_storage_nodes_fixture(hosting: Hosting):
yield
return_all_storage_nodes(sbercloud_client)
return_all_storage_nodes(hosting)
def panic_reboot_host(ip: str = None):
@ -50,13 +40,14 @@ def panic_reboot_host(ip: str = None):
ssh_stdin.close()
def return_all_storage_nodes(sbercloud_client: SberCloud) -> None:
for host in list(stopped_hosts):
with allure.step(f"Start storage node {host}"):
sbercloud_client.start_node(node_ip=host.split(":")[-2])
stopped_hosts.remove(host)
def return_all_storage_nodes(hosting: Hosting) -> None:
for host_address in list(stopped_hosts):
with allure.step(f"Start host {host_address}"):
host = hosting.get_host_by_address(host_address)
host.start_host()
stopped_hosts.remove(host_address)
wait_all_storage_node_returned()
wait_all_storage_node_returned(hosting)
@allure.title("Lost and returned nodes")
@ -65,8 +56,7 @@ def return_all_storage_nodes(sbercloud_client: SberCloud) -> None:
def test_lost_storage_node(
prepare_wallet_and_deposit,
client_shell,
sbercloud_client: SberCloud,
cloud_infrastructure_check,
hosting: Hosting,
hard_reboot: bool,
):
wallet = prepare_wallet_and_deposit
@ -78,19 +68,18 @@ def test_lost_storage_node(
new_nodes = []
for node in nodes:
stopped_hosts.append(node)
host = hosting.get_host_by_service(node)
stopped_hosts.append(host.config.address)
with allure.step(f"Stop storage node {node}"):
sbercloud_client.stop_node(node_ip=node.split(":")[-2], hard=hard_reboot)
new_nodes = wait_object_replication_on_nodes(
wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node]
)
host.stop_host("hard" if hard_reboot else "soft")
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell, excluded_nodes=[node])
assert not [node for node in nodes if node in new_nodes]
got_file_path = get_object(wallet, cid, oid, shell=client_shell, endpoint=new_nodes[0])
assert get_file_hash(source_file_path) == get_file_hash(got_file_path)
with allure.step(f"Return storage nodes"):
return_all_storage_nodes(sbercloud_client)
return_all_storage_nodes(hosting)
new_nodes = wait_object_replication_on_nodes(wallet, cid, oid, 2, shell=client_shell)

View File

@ -1,6 +1,7 @@
import logging
from random import choice
from time import sleep
from typing import Optional
import allure
import pytest
@ -17,6 +18,7 @@ from data_formatters import get_wallet_public_key
from epoch import tick_epoch
from file_helper import generate_file
from grpc_responses import OBJECT_NOT_FOUND, error_matches_status
from neofs_testlib.hosting import Hosting
from python_keywords.container import create_container, get_container
from python_keywords.failover_utils import wait_object_replication_on_nodes
from python_keywords.neofs_verbs import delete_object, get_object, head_object, put_object
@ -35,7 +37,6 @@ from python_keywords.node_management import (
start_nodes,
stop_nodes,
)
from service_helper import get_storage_service_helper
from storage_policy import get_nodes_with_object, get_simple_object_copies
from utility import parse_time, placement_policy_from_container, wait_for_gc_pass_on_storage_nodes
from wellknown_acl import PUBLIC_ACL
@ -46,7 +47,7 @@ check_nodes = []
@pytest.fixture
@allure.title("Create container and pick the node with data")
def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell):
def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell, hosting: Hosting):
wallet = prepare_wallet_and_deposit
file_path = generate_file()
placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
@ -64,45 +65,45 @@ def create_container_and_pick_node(prepare_wallet_and_deposit, client_shell):
yield cid, node_name
shards = node_shard_list(node_name)
shards = node_shard_list(hosting, node_name)
assert shards
for shard in shards:
node_shard_set_mode(node_name, shard, "read-write")
node_shard_set_mode(hosting, node_name, shard, "read-write")
node_shard_list(node_name)
node_shard_list(hosting, node_name)
@pytest.fixture
def after_run_start_all_nodes():
def after_run_start_all_nodes(hosting: Hosting):
yield
try:
start_nodes(list(NEOFS_NETMAP_DICT.keys()))
start_nodes(hosting, list(NEOFS_NETMAP_DICT.keys()))
except Exception as err:
logger.error(f"Node start fails with error:\n{err}")
@pytest.fixture
def return_nodes_after_test_run(client_shell):
def return_nodes_after_test_run(client_shell: Shell, hosting: Hosting):
yield
return_nodes(shell=client_shell)
return_nodes(client_shell, hosting)
@allure.step("Return node to cluster")
def return_nodes(shell: Shell, alive_node: str = None):
helper = get_storage_service_helper()
def return_nodes(shell: Shell, hosting: Hosting, alive_node: Optional[str] = None) -> None:
for node in list(check_nodes):
with allure.step(f"Start node {node}"):
helper.start_node(node)
host = hosting.get_host_by_service(node)
host.start_service(node)
with allure.step(f"Waiting status ready for node {node}"):
wait_for_node_to_be_ready(node)
wait_for_node_to_be_ready(hosting, node)
# We need to wait for node to establish notifications from morph-chain
# Otherwise it will hang up when we will try to set status
sleep(parse_time(MORPH_BLOCK_TIME))
with allure.step(f"Move node {node} to online state"):
node_set_status(node, status="online", retries=2)
node_set_status(hosting, node, status="online", retries=2)
check_nodes.remove(node)
sleep(parse_time(MORPH_BLOCK_TIME))
@ -120,7 +121,7 @@ def return_nodes(shell: Shell, alive_node: str = None):
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
def test_add_nodes(
prepare_tmp_dir, client_shell, prepare_wallet_and_deposit, return_nodes_after_test_run
prepare_tmp_dir, client_shell, prepare_wallet_and_deposit, return_nodes_after_test_run, hosting: Hosting
):
wallet = prepare_wallet_and_deposit
placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
@ -140,8 +141,8 @@ def test_add_nodes(
# Add node to recovery list before messing with it
check_nodes.append(additional_node)
exclude_node_from_network_map(additional_node, alive_node)
delete_node_data(additional_node)
exclude_node_from_network_map(hosting, additional_node, alive_node)
delete_node_data(hosting, additional_node)
cid = create_container(wallet, rule=placement_rule_3, basic_acl=PUBLIC_ACL)
oid = put_object(
@ -149,16 +150,16 @@ def test_add_nodes(
)
wait_object_replication_on_nodes(wallet, cid, oid, 3)
return_nodes(shell=client_shell, alive_node=alive_node)
return_nodes(shell=client_shell, hosting=hosting, alive_node=alive_node)
with allure.step("Check data could be replicated to new node"):
random_node = choice(
[node for node in NEOFS_NETMAP_DICT if node not in (additional_node, alive_node)]
)
exclude_node_from_network_map(random_node, alive_node)
exclude_node_from_network_map(hosting, random_node, alive_node)
wait_object_replication_on_nodes(wallet, cid, oid, 3, excluded_nodes=[random_node])
include_node_to_network_map(random_node, alive_node, shell=client_shell)
include_node_to_network_map(hosting, random_node, alive_node, shell=client_shell)
wait_object_replication_on_nodes(wallet, cid, oid, 3)
with allure.step("Check container could be created with new node"):
@ -171,7 +172,7 @@ def test_add_nodes(
@allure.title("Control Operations with storage nodes")
@pytest.mark.node_mgmt
def test_nodes_management(prepare_tmp_dir, client_shell):
def test_nodes_management(prepare_tmp_dir, client_shell, hosting: Hosting):
"""
This test checks base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
"""
@ -188,29 +189,29 @@ def test_nodes_management(prepare_tmp_dir, client_shell):
with allure.step("Run health check for all storage nodes"):
for node_name in NEOFS_NETMAP_DICT.keys():
health_check = node_healthcheck(node_name)
health_check = node_healthcheck(hosting, node_name)
assert health_check.health_status == "READY" and health_check.network_status == "ONLINE"
with allure.step(f"Move node {random_node} to offline state"):
node_set_status(random_node, status="offline")
node_set_status(hosting, random_node, status="offline")
sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch()
with allure.step(f"Check node {random_node} went to offline"):
health_check = node_healthcheck(random_node)
health_check = node_healthcheck(hosting, random_node)
assert health_check.health_status == "READY" and health_check.network_status == "OFFLINE"
snapshot = get_netmap_snapshot(node_name=alive_node, shell=client_shell)
assert random_node_netmap_key not in snapshot, f"Expected node {random_node} not in netmap"
with allure.step(f"Check node {random_node} went to online"):
node_set_status(random_node, status="online")
node_set_status(hosting, random_node, status="online")
sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch()
with allure.step(f"Check node {random_node} went to online"):
health_check = node_healthcheck(random_node)
health_check = node_healthcheck(hosting, random_node)
assert health_check.health_status == "READY" and health_check.network_status == "ONLINE"
snapshot = get_netmap_snapshot(node_name=alive_node, shell=client_shell)
assert random_node_netmap_key in snapshot, f"Expected node {random_node} in netmap"
@ -325,7 +326,7 @@ def test_placement_policy_negative(prepare_wallet_and_deposit, placement_rule, e
@pytest.mark.skip(reason="We cover this scenario for Sbercloud in failover tests")
@pytest.mark.node_mgmt
@allure.title("NeoFS object replication on node failover")
def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes):
def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes, hosting: Hosting):
"""
Test checks object replication on storage not failover and come back.
"""
@ -342,22 +343,22 @@ def test_replication(prepare_wallet_and_deposit, after_run_start_all_nodes):
), f"Expected {expected_nodes_count} copies, got {len(nodes)}"
node_names = [name for name, config in NEOFS_NETMAP_DICT.items() if config.get("rpc") in nodes]
stopped_nodes = stop_nodes(1, node_names)
stopped_nodes = stop_nodes(hosting, 1, node_names)
wait_for_expected_object_copies(wallet, cid, oid)
start_nodes(stopped_nodes)
start_nodes(hosting, stopped_nodes)
tick_epoch()
for node_name in node_names:
wait_for_node_go_online(node_name)
wait_for_node_go_online(hosting, node_name)
wait_for_expected_object_copies(wallet, cid, oid)
@pytest.mark.node_mgmt
@allure.title("NeoFS object could be dropped using control command")
def test_drop_object(prepare_wallet_and_deposit):
def test_drop_object(prepare_wallet_and_deposit, hosting: Hosting):
"""
Test checks object could be dropped using `neofs-cli control drop-objects` command.
"""
@ -383,7 +384,7 @@ def test_drop_object(prepare_wallet_and_deposit):
with allure.step(f"Drop object {oid}"):
get_object(wallet, cid, oid)
head_object(wallet, cid, oid)
drop_object(node_name, cid, oid)
drop_object(hosting, node_name, cid, oid)
wait_for_obj_dropped(wallet, cid, oid, get_object)
wait_for_obj_dropped(wallet, cid, oid, head_object)
@ -391,7 +392,7 @@ def test_drop_object(prepare_wallet_and_deposit):
@pytest.mark.node_mgmt
@pytest.mark.skip(reason="Need to clarify scenario")
@allure.title("Control Operations with storage nodes")
def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node):
def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node, hosting: Hosting):
wallet = prepare_wallet_and_deposit
file_path = generate_file()
@ -400,13 +401,13 @@ def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node):
# for mode in ('read-only', 'degraded'):
for mode in ("degraded",):
shards = node_shard_list(node_name)
shards = node_shard_list(hosting, node_name)
assert shards
for shard in shards:
node_shard_set_mode(node_name, shard, mode)
node_shard_set_mode(hosting, node_name, shard, mode)
shards = node_shard_list(node_name)
shards = node_shard_list(hosting, node_name)
assert shards
with pytest.raises(RuntimeError):
@ -419,9 +420,9 @@ def test_shards(prepare_wallet_and_deposit, create_container_and_pick_node):
get_object(wallet, cid, original_oid)
for shard in shards:
node_shard_set_mode(node_name, shard, "read-write")
node_shard_set_mode(hosting, node_name, shard, "read-write")
shards = node_shard_list(node_name)
shards = node_shard_list(hosting, node_name)
assert shards
oid = put_object(wallet, file_path, cid)
@ -442,11 +443,11 @@ def validate_object_copies(wallet: str, placement_rule: str, file_path: str, exp
@allure.step("Wait for node {node_name} goes online")
def wait_for_node_go_online(node_name: str) -> None:
def wait_for_node_go_online(hosting: Hosting, node_name: str) -> None:
timeout, attempts = 5, 20
for _ in range(attempts):
try:
health_check = node_healthcheck(node_name)
health_check = node_healthcheck(hosting, node_name)
assert health_check.health_status == "READY" and health_check.network_status == "ONLINE"
return
except Exception as err:
@ -458,11 +459,11 @@ def wait_for_node_go_online(node_name: str) -> None:
@allure.step("Wait for node {node_name} is ready")
def wait_for_node_to_be_ready(node_name: str) -> None:
def wait_for_node_to_be_ready(hosting: Hosting, node_name: str) -> None:
timeout, attempts = 30, 6
for _ in range(attempts):
try:
health_check = node_healthcheck(node_name)
health_check = node_healthcheck(hosting, node_name)
if health_check.health_status == "READY":
return
except Exception as err:

View File

@ -5,9 +5,10 @@ from re import match
import allure
import pytest
import requests
from binary_version_helper import get_remote_binaries_versions
from common import BIN_VERSIONS_FILE
from env_properties import read_env_properties, save_env_properties
from service_helper import get_storage_service_helper
from neofs_testlib.hosting import Hosting
logger = logging.getLogger("NeoLogger")
@ -15,7 +16,7 @@ logger = logging.getLogger("NeoLogger")
@allure.title("Check binaries versions")
@pytest.mark.check_binaries
@pytest.mark.skip("Skipped due to https://j.yadro.com/browse/OBJECT-628")
def test_binaries_versions(request):
def test_binaries_versions(request, hosting: Hosting):
"""
Compare binaries versions from external source (url) and deployed on servers.
"""
@ -24,8 +25,7 @@ def test_binaries_versions(request):
binaries_to_check = download_versions_info(BIN_VERSIONS_FILE)
with allure.step("Get binaries versions from servers"):
helper = get_storage_service_helper()
got_versions = helper.get_binaries_version(binaries=list(binaries_to_check.keys()))
got_versions = get_remote_binaries_versions(hosting)
env_properties = read_env_properties(request.config)

View File

@ -5,6 +5,7 @@ from typing import Optional
import allure
from common import NEOFS_NETMAP_DICT
from neofs_testlib.shell import Shell
from neofs_testlib.hosting import Hosting
from python_keywords.node_management import node_healthcheck
from storage_policy import get_nodes_with_object
@ -35,20 +36,20 @@ def wait_object_replication_on_nodes(
@allure.step("Wait for storage node returned to cluster")
def wait_all_storage_node_returned():
def wait_all_storage_node_returned(hosting: Hosting) -> None:
sleep_interval, attempts = 15, 20
for __attempt in range(attempts):
if is_all_storage_node_returned():
if is_all_storage_node_returned(hosting):
return
sleep(sleep_interval)
raise AssertionError("Storage node(s) is broken")
def is_all_storage_node_returned() -> bool:
def is_all_storage_node_returned(hosting: Hosting) -> bool:
with allure.step("Run health check for all storage nodes"):
for node_name in NEOFS_NETMAP_DICT.keys():
try:
health_check = node_healthcheck(node_name)
health_check = node_healthcheck(hosting, node_name)
except Exception as err:
logger.warning(f"Node healthcheck fails with error {err}")
return False

View File

@ -1,5 +1,3 @@
#!/usr/bin/python3.9
import logging
import random
import re
@ -19,7 +17,7 @@ from data_formatters import get_wallet_public_key
from epoch import tick_epoch
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
from service_helper import get_storage_service_helper
from neofs_testlib.hosting import Hosting
from utility import parse_time
logger = logging.getLogger("NeoLogger")
@ -42,7 +40,7 @@ class HealthStatus:
@allure.step("Stop storage nodes")
def stop_nodes(number: int, nodes: list[str]) -> list[str]:
def stop_nodes(hosting: Hosting, number: int, nodes: list[str]) -> list[str]:
"""
Shuts down the given number of randomly selected storage nodes.
Args:
@ -51,23 +49,23 @@ def stop_nodes(number: int, nodes: list[str]) -> list[str]:
Returns:
(list): the list of nodes that were shut down
"""
helper = get_storage_service_helper()
nodes_to_stop = random.sample(nodes, number)
for node in nodes_to_stop:
helper.stop_node(node)
host = hosting.get_host_by_service(node)
host.stop_service(node)
return nodes_to_stop
@allure.step("Start storage nodes")
def start_nodes(nodes: list[str]) -> None:
def start_nodes(hosting: Hosting, nodes: list[str]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes (list): the list of nodes to start
"""
helper = get_storage_service_helper()
for node in nodes:
helper.start(node)
host = hosting.get_host_by_service(node)
host.start_service(node)
@allure.step("Get Locode")
@ -79,7 +77,7 @@ def get_locode() -> str:
@allure.step("Healthcheck for node {node_name}")
def node_healthcheck(node_name: str) -> HealthStatus:
def node_healthcheck(hosting: Hosting, node_name: str) -> HealthStatus:
"""
The function returns node's health status.
Args:
@ -88,12 +86,12 @@ def node_healthcheck(node_name: str) -> HealthStatus:
health status as HealthStatus object.
"""
command = "control healthcheck"
output = _run_control_command(node_name, command)
output = _run_control_command_with_retries(hosting, node_name, command)
return HealthStatus.from_stdout(output)
@allure.step("Set status for node {node_name}")
def node_set_status(node_name: str, status: str, retries: int = 0) -> None:
def node_set_status(hosting: Hosting, node_name: str, status: str, retries: int = 0) -> None:
"""
The function sets particular status for given node.
Args:
@ -102,7 +100,7 @@ def node_set_status(node_name: str, status: str, retries: int = 0) -> None:
retries (optional, int): number of retry attempts if it didn't work from the first time
"""
command = f"control set-status --status {status}"
_run_control_command(node_name, command, retries)
_run_control_command_with_retries(hosting, node_name, command, retries)
@allure.step("Get netmap snapshot")
@ -123,7 +121,7 @@ def get_netmap_snapshot(node_name: str, shell: Shell) -> str:
@allure.step("Get shard list for node {node_name}")
def node_shard_list(node_name: str) -> list[str]:
def node_shard_list(hosting: Hosting, node_name: str) -> list[str]:
"""
The function returns list of shards for specified node.
Args:
@ -132,46 +130,46 @@ def node_shard_list(node_name: str) -> list[str]:
list of shards.
"""
command = "control shards list"
output = _run_control_command(node_name, command)
output = _run_control_command_with_retries(hosting, node_name, command)
return re.findall(r"Shard (.*):", output)
@allure.step("Shard set for node {node_name}")
def node_shard_set_mode(node_name: str, shard: str, mode: str) -> str:
def node_shard_set_mode(hosting: Hosting, node_name: str, shard: str, mode: str) -> str:
"""
The function sets mode for specified shard.
Args:
node_name str: node name on which shard mode should be set.
"""
command = f"control shards set-mode --id {shard} --mode {mode}"
return _run_control_command(node_name, command)
return _run_control_command_with_retries(hosting, node_name, command)
@allure.step("Drop object from node {node_name}")
def drop_object(node_name: str, cid: str, oid: str) -> str:
def drop_object(hosting: Hosting, node_name: str, cid: str, oid: str) -> str:
"""
The function drops object from specified node.
Args:
node_name str: node name from which object should be dropped.
"""
command = f"control drop-objects -o {cid}/{oid}"
return _run_control_command(node_name, command)
return _run_control_command_with_retries(hosting, node_name, command)
@allure.step("Delete data of node {node_name}")
def delete_node_data(node_name: str) -> None:
helper = get_storage_service_helper()
helper.stop_node(node_name)
helper.delete_node_data(node_name)
def delete_node_data(hosting: Hosting, node_name: str) -> None:
host = hosting.get_host_by_service(node_name)
host.stop_service(node_name)
host.delete_storage_node_data(node_name)
time.sleep(parse_time(MORPH_BLOCK_TIME))
@allure.step("Exclude node {node_to_exclude} from network map")
def exclude_node_from_network_map(node_to_exclude: str, alive_node: str, shell: Shell) -> None:
def exclude_node_from_network_map(hosting: Hosting, node_to_exclude: str, alive_node: str, shell: Shell) -> None:
node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]["wallet_path"]
node_netmap_key = get_wallet_public_key(node_wallet_path, STORAGE_WALLET_PASS)
node_set_status(node_to_exclude, status="offline")
node_set_status(hosting, node_to_exclude, status="offline")
time.sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch()
@ -183,8 +181,8 @@ def exclude_node_from_network_map(node_to_exclude: str, alive_node: str, shell:
@allure.step("Include node {node_to_include} into network map")
def include_node_to_network_map(node_to_include: str, alive_node: str, shell: Shell) -> None:
node_set_status(node_to_include, status="online")
def include_node_to_network_map(hosting: Hosting, node_to_include: str, alive_node: str, shell: Shell) -> None:
node_set_status(hosting, node_to_include, status="online")
time.sleep(parse_time(MORPH_BLOCK_TIME))
tick_epoch()
@ -204,13 +202,38 @@ def check_node_in_map(node_name: str, shell: Shell, alive_node: Optional[str] =
assert node_netmap_key in snapshot, f"Expected node with key {node_netmap_key} in network map"
def _run_control_command(node_name: str, command: str, retries: int = 0) -> str:
helper = get_storage_service_helper()
def _run_control_command_with_retries(
hosting: Hosting, node_name: str, command: str, retries: int = 0
) -> str:
for attempt in range(1 + retries): # original attempt + specified retries
try:
return helper.run_control_command(node_name, command)
return _run_control_command(hosting, node_name, command)
except AssertionError as err:
if attempt < retries:
logger.warning(f"Command {command} failed with error {err} and will be retried")
continue
raise AssertionError(f"Command {command} failed with error {err}") from err
def _run_control_command(hosting: Hosting, service_name: str, command: str) -> None:
host = hosting.get_host_by_service(service_name)
service_config = host.get_service_config(service_name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
control_endpoint = service_config.attributes["control_endpoint"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{service_name}-config.yaml"
wallet_config = f'password: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
cli_config = host.get_cli_config("neofs-cli")
# TODO: implement cli.control
# cli = NeofsCli(shell, cli_config.exec_path, wallet_config_path)
result = shell.exec(
f"{cli_config.exec_path} {command} --endpoint {control_endpoint} "
f"--wallet {wallet_path} --config {wallet_config_path}"
)
return result.stdout

View File

@ -17,12 +17,13 @@ NEOFS_CONTRACT_CACHE_TIMEOUT = os.getenv("NEOFS_CONTRACT_CACHE_TIMEOUT", "30s")
# of 1min plus 15 seconds for GC pass itself)
STORAGE_GC_TIME = os.getenv("STORAGE_GC_TIME", "75s")
# TODO: we should use hosting instead of these endpoints
NEOFS_ENDPOINT = os.getenv("NEOFS_ENDPOINT", "s01.neofs.devenv:8080")
NEO_MAINNET_ENDPOINT = os.getenv("NEO_MAINNET_ENDPOINT", "http://main-chain.neofs.devenv:30333")
MORPH_ENDPOINT = os.getenv("MORPH_ENDPOINT", "http://morph-chain.neofs.devenv:30333")
HTTP_GATE = os.getenv("HTTP_GATE", "http://http.neofs.devenv")
S3_GATE = os.getenv("S3_GATE", "https://s3.neofs.devenv:8080")
GAS_HASH = os.getenv("GAS_HASH", "0xd2a4cff31913016155e38e474a2c06d08be276cf")
NEOFS_CONTRACT = os.getenv("NEOFS_IR_CONTRACTS_NEOFS")
@ -30,12 +31,11 @@ NEOFS_CONTRACT = os.getenv("NEOFS_IR_CONTRACTS_NEOFS")
ASSETS_DIR = os.getenv("ASSETS_DIR", "TemporaryDir")
DEVENV_PATH = os.getenv("DEVENV_PATH", os.path.join("..", "neofs-dev-env"))
MORPH_MAGIC = os.getenv("MORPH_MAGIC")
# Password of wallet owned by user on behalf of whom we are running tests
WALLET_PASS = os.getenv("WALLET_PASS", "")
# Configuration of storage nodes
# TODO: we should use hosting instead of all these variables
STORAGE_RPC_ENDPOINT_1 = os.getenv("STORAGE_RPC_ENDPOINT_1", "s01.neofs.devenv:8080")
STORAGE_RPC_ENDPOINT_2 = os.getenv("STORAGE_RPC_ENDPOINT_2", "s02.neofs.devenv:8080")
STORAGE_RPC_ENDPOINT_3 = os.getenv("STORAGE_RPC_ENDPOINT_3", "s03.neofs.devenv:8080")
@ -89,7 +89,13 @@ NEOFS_NETMAP_DICT = {
}
NEOFS_NETMAP = [node["rpc"] for node in NEOFS_NETMAP_DICT.values()]
# Paths to CLI executables
# Parameters that control SSH connection to storage node
# TODO: we should use hosting instead
STORAGE_NODE_SSH_USER = os.getenv("STORAGE_NODE_SSH_USER")
STORAGE_NODE_SSH_PASSWORD = os.getenv("STORAGE_NODE_SSH_PASSWORD")
STORAGE_NODE_SSH_PRIVATE_KEY_PATH = os.getenv("STORAGE_NODE_SSH_PRIVATE_KEY_PATH")
# Paths to CLI executables on machine that runs tests
NEOGO_EXECUTABLE = os.getenv("NEOGO_EXECUTABLE", "neo-go")
NEOFS_CLI_EXEC = os.getenv("NEOFS_CLI_EXEC", "neofs-cli")
NEOFS_AUTHMATE_EXEC = os.getenv("NEOFS_AUTHMATE_EXEC", "neofs-authmate")
@ -109,21 +115,17 @@ S3_GATE_WALLET_PATH = os.getenv(
)
S3_GATE_WALLET_PASS = os.getenv("S3_GATE_WALLET_PASS", "s3")
# Parameters that control SSH connection to storage node
STORAGE_NODE_SSH_USER = os.getenv("STORAGE_NODE_SSH_USER")
STORAGE_NODE_SSH_PASSWORD = os.getenv("STORAGE_NODE_SSH_PASSWORD")
STORAGE_NODE_SSH_PRIVATE_KEY_PATH = os.getenv("STORAGE_NODE_SSH_PRIVATE_KEY_PATH")
# Path to directory with CLI binaries on storage node (currently we need only neofs-cli)
STORAGE_NODE_BIN_PATH = os.getenv("STORAGE_NODE_BIN_PATH", f"{DEVENV_PATH}/vendor")
# Config for neofs-adm utility. Optional if tests are running against devenv
NEOFS_ADM_CONFIG_PATH = os.getenv("NEOFS_ADM_CONFIG_PATH")
INFRASTRUCTURE_TYPE = os.getenv("INFRASTRUCTURE_TYPE", "LOCAL_DEVENV")
FREE_STORAGE = os.getenv("FREE_STORAGE", "false").lower() == "true"
BIN_VERSIONS_FILE = os.getenv("BIN_VERSIONS_FILE")
# TODO: instead of specifying infrastructure type we should use attributes of hosts
INFRASTRUCTURE_TYPE = os.getenv("INFRASTRUCTURE_TYPE", "LOCAL_DEVENV")
HOSTING_CONFIG_FILE = os.getenv("HOSTING_CONFIG_FILE", ".devenv.hosting.yaml")
# Generate wallet configs
# TODO: we should move all info about wallet configs to fixtures
WALLET_CONFIG = os.path.join(os.getcwd(), "wallet_config.yml")