diff --git a/pytest_tests/helpers/service_helper.py b/pytest_tests/helpers/service_helper.py index a34ccd4..82c7d65 100644 --- a/pytest_tests/helpers/service_helper.py +++ b/pytest_tests/helpers/service_helper.py @@ -1,5 +1,8 @@ -from contextlib import contextmanager +import json import logging +import re +import time +from contextlib import contextmanager import docker @@ -18,15 +21,25 @@ class LocalDevEnvStorageServiceHelper: Manages storage services running on local devenv. """ def stop_node(self, node_name: str) -> None: - container_name = node_name.split('.')[0] + container_name = _get_storage_container_name(node_name) client = docker.APIClient() client.stop(container_name) def start_node(self, node_name: str) -> None: - container_name = node_name.split('.')[0] + container_name = _get_storage_container_name(node_name) client = docker.APIClient() client.start(container_name) + def wait_for_node_to_start(self, node_name: str) -> None: + container_name = _get_storage_container_name(node_name) + expected_state = "running" + for __attempt in range(10): + container = self._get_container_by_name(container_name) + if container and container["State"] == expected_state: + return + time.sleep(3) + raise AssertionError(f'Container {container_name} is not in {expected_state} state') + def run_control_command(self, node_name: str, command: str) -> str: control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"] wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"] @@ -38,20 +51,50 @@ class LocalDevEnvStorageServiceHelper: output = _cmd_run(cmd) return output + def destroy_node(self, node_name: str) -> None: + container_name = _get_storage_container_name(node_name) + client = docker.APIClient() + client.remove_container(container_name, force=True) + + def get_binaries_version(self) -> dict: + return {} + + def _get_container_by_name(self, container_name: str) -> dict: + client = docker.APIClient() + containers = client.containers() + for container in containers: + if container_name in container["Names"]: + return container + return None + class CloudVmStorageServiceHelper: + STORAGE_SERVICE = "neofs-storage.service" + def stop_node(self, node_name: str) -> None: with _create_ssh_client(node_name) as ssh_client: - cmd = "systemctl stop neofs-storage" + cmd = f"systemctl stop {self.STORAGE_SERVICE}" output = ssh_client.exec_with_confirmation(cmd, [""]) logger.info(f"Stop command output: {output.stdout}") def start_node(self, node_name: str) -> None: with _create_ssh_client(node_name) as ssh_client: - cmd = "systemctl start neofs-storage" + cmd = f"systemctl start {self.STORAGE_SERVICE}" output = ssh_client.exec_with_confirmation(cmd, [""]) logger.info(f"Start command output: {output.stdout}") + def wait_for_node_to_start(self, node_name: str) -> None: + expected_state = 'active (running)' + with _create_ssh_client(node_name) as ssh_client: + for __attempt in range(10): + output = ssh_client.exec(f'systemctl status {self.STORAGE_SERVICE}') + if expected_state in output.stdout: + return + time.sleep(3) + raise AssertionError( + f'Service {self.STORAGE_SERVICE} is not in {expected_state} state' + ) + def run_control_command(self, node_name: str, command: str) -> str: control_endpoint = NEOFS_NETMAP_DICT[node_name]["control"] wallet_path = NEOFS_NETMAP_DICT[node_name]["wallet_path"] @@ -78,25 +121,87 @@ class CloudVmStorageServiceHelper: output = ssh_client.exec_with_confirmation(cmd, [""]) return output.stdout + def destroy_node(self, node_name: str) -> None: + with _create_ssh_client(node_name) as ssh_client: + ssh_client.exec(f'systemctl stop {self.STORAGE_SERVICE}') + ssh_client.exec('rm -rf /srv/neofs/*') + + def get_binaries_version(self) -> dict: + binaries = [ + 'neo-go', + 'neofs-adm', + 'neofs-cli', + 'neofs-http-gw', + 'neofs-ir', + 'neofs-lens', + 'neofs-node', + 'neofs-s3-authmate', + 'neofs-s3-gw', + 'neogo-morph-cn', + ] + + version_map = {} + for node_name in NEOFS_NETMAP_DICT: + with _create_ssh_client(node_name) as ssh_client: + for binary in binaries: + out = ssh_client.exec(f'{binary} --version').stdout + version = re.search(r'version[:\s]*(.+)', out, re.IGNORECASE) + version = version.group(1) if version else 'Unknown' + if not version_map.get(binary.upper()): + version_map[binary.upper()] = version + else: + assert version_map[binary.upper()] == version, \ + f'Expected binary {binary} to have identical version on all nodes ' \ + f'(mismatch on node {node_name})' + return version_map class RemoteDevEnvStorageServiceHelper: """ Manages storage services running on remote devenv. """ def stop_node(self, node_name: str) -> None: - container_name = node_name.split('.')[0] + container_name = _get_storage_container_name(node_name) with _create_ssh_client(node_name) as ssh_client: ssh_client.exec(f'docker stop {container_name}') def start_node(self, node_name: str) -> None: - container_name = node_name.split('.')[0] + container_name = _get_storage_container_name(node_name) with _create_ssh_client(node_name) as ssh_client: ssh_client.exec(f'docker start {container_name}') + def wait_for_node_to_start(self, node_name: str) -> None: + container_name = _get_storage_container_name(node_name) + expected_state = 'running' + for __attempt in range(10): + container = self._get_container_by_name(container_name) + if container and container["State"] == expected_state: + return + time.sleep(3) + raise AssertionError(f'Container {container_name} is not in {expected_state} state') + def run_control_command(self, node_name: str, command: str) -> str: # On remote devenv it works same way as in cloud return CloudVmStorageServiceHelper().run_control_command(node_name, command) + def destroy_node(self, node_name: str) -> None: + container_name = _get_storage_container_name(node_name) + with _create_ssh_client(node_name) as ssh_client: + ssh_client.exec(f'docker rm {container_name} --force') + + def get_binaries_version(self) -> dict: + return {} + + def _get_container_by_name(self, node_name: str, container_name: str) -> dict: + with _create_ssh_client(node_name) as ssh_client: + output = ssh_client.exec('docker ps -a --format "{{json .}}"') + containers = json.loads(output) + + for container in containers: + # unlike docker.API in docker ps output Names seems to be a string, so we check by equality + if container["Names"] == container_name: + return container + return None + def get_storage_service_helper(): if INFRASTRUCTURE_TYPE == "LOCAL_DEVENV": @@ -129,3 +234,11 @@ def _create_ssh_client(node_name: str) -> HostClient: yield ssh_client finally: ssh_client.drop() + + +def _get_storage_container_name(node_name: str) -> str: + """ + Converts name of storage name (as it is listed in netmap) into the name of docker container + that runs instance of this storage node. + """ + return node_name.split('.')[0] diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index ae98378..883d06e 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -12,7 +12,8 @@ from cli_helpers import _cmd_run from common import (ASSETS_DIR, FREE_STORAGE, INFRASTRUCTURE_TYPE, MAINNET_WALLET_PATH, NEOFS_NETMAP_DICT) from payment_neogo import neofs_deposit, transfer_mainnet_gas -from python_keywords.node_management import node_healthcheck, create_ssh_client +from python_keywords.node_management import node_healthcheck +from service_helper import get_storage_service_helper def robot_keyword_adapter(name=None, tags=(), types=()): @@ -26,42 +27,29 @@ logger = logging.getLogger('NeoLogger') @pytest.fixture(scope='session') def cloud_infrastructure_check(): - if not is_cloud_infrastructure(): + if INFRASTRUCTURE_TYPE != "CLOUD_VM": pytest.skip('Test only works on SberCloud infrastructure') yield -def is_cloud_infrastructure(): - return INFRASTRUCTURE_TYPE == "CLOUD_VM" - - @pytest.fixture(scope='session', autouse=True) @allure.title('Check binary versions') def check_binary_versions(request): - environment_dir = request.config.getoption('--alluredir') - is_cloud = is_cloud_infrastructure() - # Collect versions of neo binaries + # Collect versions of local binaries binaries = ['neo-go', 'neofs-cli', 'neofs-authmate'] env_out = _get_binaries_version_local(binaries) - if is_cloud: - binaries = ['neo-go', - 'neofs-adm', - 'neofs-cli', - 'neofs-http-gw', - 'neofs-ir', - 'neofs-lens', - 'neofs-node', - 'neofs-s3-authmate', - 'neofs-s3-gw', - 'neogo-morph-cn'] - env_out = _get_binaries_version_remote(binaries) + # Collect versions of remote binaries + helper = get_storage_service_helper() + remote_binaries = helper.get_binaries_version() + env_out = {**env_out, **remote_binaries} # Get version of aws binary out = _cmd_run('aws --version') out_lines = out.split("\n") env_out["AWS"] = out_lines[0] if out_lines else 'Unknown' + environment_dir = request.config.getoption('--alluredir') if environment_dir: with open(f'{environment_dir}/environment.properties', 'w') as out_file: for env, env_value in env_out.items(): @@ -77,23 +65,6 @@ def _get_binaries_version_local(binaries: list) -> dict: return env_out -def _get_binaries_version_remote(binaries: list) -> dict: - env_out = {} - - for node_name in NEOFS_NETMAP_DICT: - with create_ssh_client(node_name) as ssh_client: - for binary in binaries: - out = ssh_client.exec(f'{binary} --version').stdout - version = re.search(r'version[:\s]*(.+)', out, re.IGNORECASE) - version = version.group(1) if version else 'Unknown' - if not env_out.get(binary.upper()): - env_out[binary.upper()] = version - else: - msg = f'Expected binary {binary} versions on node s1 and {node_name} are the same' - assert env_out[binary.upper()] == version, msg - return env_out - - @pytest.fixture(scope='session', autouse=True) @allure.title('Run health check for all storage nodes') def run_health_check(): diff --git a/pytest_tests/testsuites/network/test_node_management.py b/pytest_tests/testsuites/network/test_node_management.py index a564c7a..628fab0 100644 --- a/pytest_tests/testsuites/network/test_node_management.py +++ b/pytest_tests/testsuites/network/test_node_management.py @@ -11,11 +11,13 @@ from epoch import tick_epoch from python_keywords.container import create_container, get_container from python_keywords.failover_utils import wait_object_replication_on_nodes from python_keywords.neofs_verbs import delete_object, get_object, head_object, put_object -from python_keywords.node_management import (create_ssh_client, drop_object, get_netmap_snapshot, - get_locode, node_healthcheck, node_set_status, - node_shard_list, node_shard_set_mode) +from python_keywords.node_management import (check_node_in_map, delete_node, drop_object, exclude_node_from_network_map, get_netmap_snapshot, get_locode, include_node_to_network_map, + node_healthcheck, node_set_status, + node_shard_list, node_shard_set_mode, start_nodes, stop_nodes) +from service_helper import get_storage_service_helper from storage_policy import get_nodes_with_object, get_simple_object_copies -from utility import placement_policy_from_container, robot_time_to_int, wait_for_gc_pass_on_storage_nodes +from utility import (placement_policy_from_container, robot_time_to_int, + wait_for_gc_pass_on_storage_nodes) from utility_keywords import generate_file from wellknown_acl import PUBLIC_ACL @@ -60,48 +62,19 @@ def after_run_start_all_nodes(): logger.error(f'Node start fails with error:\n{err}') -@pytest.fixture -def after_run_set_all_nodes_online(): - yield - for node in list(NEOFS_NETMAP_DICT.keys()): - try: - node_set_status(node, status="online") - except Exception as err: - logger.error(f"Node status change fails with error:\n{err}") - - -def wait_for_service_started(ssh_client, service_name: str): - expected_state = 'active (running)' - for __attempt in range(10): - output = ssh_client.exec(f'systemctl status {service_name}') - if expected_state in output.stdout: - return - sleep(3) - raise AssertionError(f'Service {service_name} is not in {expected_state} state') - - @pytest.fixture def return_nodes_after_test_run(): yield return_nodes() -def cleanup_node(node_to_cleanup, alive_node): - exclude_node_from_network_map(node_to_cleanup, alive_node) - - with create_ssh_client(node_to_cleanup) as ssh_client: - ssh_client.exec(f'systemctl stop neofs-storage.service') - ssh_client.exec('rm -rf /srv/neofs/*') - sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) - - @allure.step('Return node to cluster') def return_nodes(alive_node: str = None): + helper = get_storage_service_helper() for node in list(check_nodes): - with create_ssh_client(node) as ssh_client: - ssh_client.exec(f'systemctl start neofs-storage.service') - wait_for_service_started(ssh_client, 'neofs-storage.service') - sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) + with allure.step(f'Start node {node}'): + helper.start_node(node) + helper.wait_for_node_to_start(node) with allure.step(f'Move node {node} to online state'): node_set_status(node, status='online', retry=True) @@ -118,50 +91,6 @@ def return_nodes(alive_node: str = None): check_node_in_map(node, alive_node) -def exclude_node_from_network_map(node_to_exclude, alive_node): - node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]['wallet_path'] - node_netmap_key = get_wallet_public_key( - node_wallet_path, - STORAGE_WALLET_PASS, - format="base58" - ) - - with allure.step(f'Move node {node_to_exclude} to offline state'): - node_set_status(node_to_exclude, status='offline') - - sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) - tick_epoch() - - snapshot = get_netmap_snapshot(node_name=alive_node) - assert node_netmap_key not in snapshot, f'Expected node with key {node_netmap_key} not in network map' - - -def include_node_to_network_map(node_to_include, alive_node): - with allure.step(f'Move node {node_to_include} to online state'): - node_set_status(node_to_include, status='online') - - sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) - tick_epoch() - - check_node_in_map(node_to_include, alive_node) - - -@allure.step('Check node {node_name} in network map') -def check_node_in_map(node_name: str, alive_node: str = None): - alive_node = alive_node or node_name - node_wallet_path = NEOFS_NETMAP_DICT[node_name]['wallet_path'] - node_netmap_key = get_wallet_public_key( - node_wallet_path, - STORAGE_WALLET_PASS, - format="base58" - ) - - logger.info(f'Node {node_name} netmap key: {node_netmap_key}') - - snapshot = get_netmap_snapshot(node_name=alive_node) - assert node_netmap_key in snapshot, f'Expected node with key {node_netmap_key} in network map' - - @allure.title('Add one node to cluster') @pytest.mark.add_nodes @pytest.mark.node_mgmt @@ -171,14 +100,16 @@ def test_add_nodes(prepare_tmp_dir, prepare_wallet_and_deposit, return_nodes_aft placement_rule_4 = 'REP 4 IN X CBF 1 SELECT 4 FROM * AS X' source_file_path = generate_file() - additional_node = choice(list( - node for node, node_config in NEOFS_NETMAP_DICT.items() if node_config.get('rpc') != STORAGE_RPC_ENDPOINT_1)) + additional_node = choice([ + node for node, node_config in NEOFS_NETMAP_DICT.items() + if node_config.get('rpc') != STORAGE_RPC_ENDPOINT_1 + ]) alive_node = choice([node for node in NEOFS_NETMAP_DICT if node != additional_node]) check_node_in_map(additional_node, alive_node) with allure.step(f'Exclude node {additional_node} from map and clean it up'): - cleanup_node(additional_node, alive_node) + delete_node(additional_node, alive_node) check_nodes.append(additional_node) cid = create_container(wallet, rule=placement_rule_3, basic_acl=PUBLIC_ACL) diff --git a/robot/resources/lib/python_keywords/node_management.py b/robot/resources/lib/python_keywords/node_management.py index 2059960..c0ffa2d 100644 --- a/robot/resources/lib/python_keywords/node_management.py +++ b/robot/resources/lib/python_keywords/node_management.py @@ -6,13 +6,17 @@ import random import re +import time from dataclasses import dataclass from typing import Optional -from common import NEOFS_NETMAP_DICT +from common import MAINNET_BLOCK_TIME, NEOFS_NETMAP_DICT, STORAGE_WALLET_PASS +from data_formatters import get_wallet_public_key +from epoch import tick_epoch from robot.api import logger from robot.api.deco import keyword from service_helper import get_storage_service_helper +from utility import robot_time_to_int ROBOT_AUTO_KEYWORDS = False @@ -184,6 +188,58 @@ def drop_object(node_name: str, cid: str, oid: str) -> str: return _run_control_command(node_name, command) +def delete_node(node_name: str, alive_node: str) -> None: + exclude_node_from_network_map(node_name, alive_node) + + helper = get_storage_service_helper() + helper.destroy_node(node_name) + time.sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) + + +@keyword('Exclude node {node_to_include} from network map') +def exclude_node_from_network_map(node_to_exclude, alive_node): + node_wallet_path = NEOFS_NETMAP_DICT[node_to_exclude]['wallet_path'] + node_netmap_key = get_wallet_public_key( + node_wallet_path, + STORAGE_WALLET_PASS, + format="base58" + ) + + node_set_status(node_to_exclude, status='offline') + + time.sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) + tick_epoch() + + snapshot = get_netmap_snapshot(node_name=alive_node) + assert node_netmap_key not in snapshot, f'Expected node with key {node_netmap_key} not in network map' + + +@keyword('Include node {node_to_include} into network map') +def include_node_to_network_map(node_to_include: str, alive_node: str) -> None: + node_set_status(node_to_include, status='online') + + time.sleep(robot_time_to_int(MAINNET_BLOCK_TIME)) + tick_epoch() + + check_node_in_map(node_to_include, alive_node) + + +@keyword('Check node {node_name} in network map') +def check_node_in_map(node_name: str, alive_node: str = None): + alive_node = alive_node or node_name + node_wallet_path = NEOFS_NETMAP_DICT[node_name]['wallet_path'] + node_netmap_key = get_wallet_public_key( + node_wallet_path, + STORAGE_WALLET_PASS, + format="base58" + ) + + logger.info(f'Node {node_name} netmap key: {node_netmap_key}') + + snapshot = get_netmap_snapshot(node_name=alive_node) + assert node_netmap_key in snapshot, f'Expected node with key {node_netmap_key} in network map' + + def _run_control_command(node_name: str, command: str, retries: int = 0) -> str: helper = get_storage_service_helper() for attempt in range(1 + retries): # original attempt + specified retries