From a6e1190f235fe63551c144c8616122859ecbaf03 Mon Sep 17 00:00:00 2001 From: Dmitriy Zayakin Date: Wed, 3 May 2023 11:45:44 +0300 Subject: [PATCH] Add tests for node shutdown and object replication integrity Signed-off-by: Dmitriy Zayakin --- pytest_tests/helpers/__init__.py | 0 pytest_tests/helpers/cluster.py | 119 ++++++++- pytest_tests/helpers/failover_utils.py | 24 +- pytest_tests/helpers/frostfs_verbs.py | 34 ++- pytest_tests/helpers/test_control.py | 37 +++ pytest_tests/steps/cluster_test_base.py | 10 +- .../failovers/test_failover_server.py | 248 ++++++++++++++++++ 7 files changed, 450 insertions(+), 22 deletions(-) delete mode 100644 pytest_tests/helpers/__init__.py create mode 100644 pytest_tests/testsuites/failovers/test_failover_server.py diff --git a/pytest_tests/helpers/__init__.py b/pytest_tests/helpers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pytest_tests/helpers/cluster.py b/pytest_tests/helpers/cluster.py index cafe3177..f2b58f7c 100644 --- a/pytest_tests/helpers/cluster.py +++ b/pytest_tests/helpers/cluster.py @@ -1,9 +1,10 @@ -import random import pathlib +import random import re from dataclasses import dataclass -from typing import Any +from typing import Any, Optional +import pytest import yaml from frostfs_testlib.blockchain import RPCClient from frostfs_testlib.hosting import Host, Hosting @@ -111,14 +112,14 @@ class InnerRingNode(NodeBase): since frostfs network will still treat it as "node" """ - def get_netmap_cleaner_threshold(self) -> str: + def get_netmap_cleaner_threshold(self) -> int: config_file = self.get_remote_config_path() contents = self.host.get_shell().exec(f"cat {config_file}").stdout config = yaml.safe_load(contents) value = config["netmap_cleaner"]["threshold"] - return value + return int(value) class S3Gate(NodeBase): @@ -217,6 +218,73 @@ class StorageNode(NodeBase): return f"{self.name}: {self.get_rpc_endpoint()}" +class ClusterNode: + """ + Represents physical node where multiple different services may be located + """ + + host: Host + storage_node: Optional[StorageNode] = None + ir_node: Optional[InnerRingNode] = None + s3_gate: Optional[S3Gate] = None + http_gate: Optional[HTTPGate] = None + morph_chain: Optional[MorphChain] = None + main_chain: Optional[MainChain] = None + + def __init__(self, host: Host, nodes: list[NodeBase]) -> None: + self.host = host + attributes_map = { + StorageNode: "storage_node", + InnerRingNode: "ir_node", + S3Gate: "s3_gate", + HTTPGate: "http_gate", + MorphChain: "morph_chain", + MainChain: "main_chain", + } + + for node in nodes: + if node.host.config.address == host.config.address: + self.__setattr__(attributes_map[node.__class__], node) + + @property + def host_ip(self): + return self.host.config.address + + def __eq__(self, other): + return self.host.config.address == other.host.config.address + + def __hash__(self): + return id(self.host.config.address) + + def __str__(self): + return self.host.config.address + + def __repr__(self) -> str: + return self.host.config.address + + def get_service_by_type(self, service_type: type[NodeBase]) -> type[NodeBase]: + class_name = service_type.__name__ + class_field_map = { + StorageNode.__name__: self.storage_node, + InnerRingNode.__name__: self.ir_node, + S3Gate.__name__: self.s3_gate, + HTTPGate.__name__: self.http_gate, + MorphChain.__name__: self.morph_chain, + } + if class_name not in class_field_map: + raise pytest.fail(f"Invalid type passed {class_name}") + return class_field_map[class_name] + + def get_list_of_services(self) -> list[str]: + return [ + self.storage_node.get_service_systemctl_name(), + self.ir_node.get_service_systemctl_name(), + self.s3_gate.get_service_systemctl_name(), + self.http_gate.get_service_systemctl_name(), + self.morph_chain.get_service_systemctl_name(), + ] + + class Cluster: """ This class represents a Cluster object for the whole storage based on provided hosting @@ -225,6 +293,7 @@ class Cluster: default_rpc_endpoint: str default_s3_gate_endpoint: str default_http_gate_endpoint: str + cluster_nodes: list[ClusterNode] def __init__(self, hosting: Hosting) -> None: self._hosting = hosting @@ -239,6 +308,17 @@ class Cluster: """ return self._hosting.hosts + @property + def cluster_nodes(self) -> list[ClusterNode]: + """ + Returns list of Cluster Nodes + """ + + return [ + ClusterNode(host, self._find_nodes_by_pattern(f".*{id:02d}$")) + for id, host in enumerate(self.hosts, start=1) + ] + @property def hosting(self) -> Hosting: return self._hosting @@ -304,8 +384,11 @@ class Cluster: """ return self._get_nodes(_ServicesNames.INNER_RING) - def _get_nodes(self, service_name) -> list[StorageNode]: - configs = self.hosting.find_service_configs(f"{service_name}\d*$") + def _get_nodes(self, service_name) -> list[NodeBase]: + return self._find_nodes_by_pattern(f"{service_name}\d*$") + + def _find_nodes_by_pattern(self, pattern) -> list[NodeBase]: + configs = self.hosting.find_service_configs(pattern) class_mapping: dict[str, Any] = { _ServicesNames.STORAGE: StorageNode, @@ -316,15 +399,23 @@ class Cluster: _ServicesNames.MAIN_CHAIN: MainChain, } - cls = class_mapping.get(service_name) - return [ - cls( - self._get_id(config.name), - config.name, - self.hosting.get_host_by_service(config.name), + found_nodes = [] + for config in configs: + # config.name is something like s3-gate01. Cut last digits to know service type + service_type = re.findall(".*\D", config.name)[0] + # exclude unsupported services + if service_type not in class_mapping.keys(): + continue + + cls = class_mapping.get(service_type) + found_nodes.append( + cls( + self._get_id(config.name), + config.name, + self.hosting.get_host_by_service(config.name), + ) ) - for config in configs - ] + return found_nodes def _get_id(self, node_name) -> str: pattern = "\d*$" diff --git a/pytest_tests/helpers/failover_utils.py b/pytest_tests/helpers/failover_utils.py index a9b257b7..093d2a40 100644 --- a/pytest_tests/helpers/failover_utils.py +++ b/pytest_tests/helpers/failover_utils.py @@ -2,11 +2,13 @@ import logging from time import sleep import allure -from frostfs_testlib.shell import Shell +from frostfs_testlib.hosting import Host +from frostfs_testlib.shell import CommandOptions, Shell from pytest_tests.helpers.cluster import Cluster, StorageNode from pytest_tests.helpers.node_management import storage_node_healthcheck from pytest_tests.helpers.storage_policy import get_nodes_with_object +from pytest_tests.helpers.test_control import retry logger = logging.getLogger("NeoLogger") @@ -18,8 +20,9 @@ def wait_object_replication( expected_copies: int, shell: Shell, nodes: list[StorageNode], + sleep_interval: int = 15, + attempts: int = 20, ) -> list[StorageNode]: - sleep_interval, attempts = 15, 20 nodes_with_object = [] for _ in range(attempts): nodes_with_object = get_nodes_with_object(cid, oid, shell=shell, nodes=nodes) @@ -53,3 +56,20 @@ def is_all_storage_nodes_returned(cluster: Cluster) -> bool: if health_check.health_status != "READY" or health_check.network_status != "ONLINE": return False return True + + +@allure.step("Ping node") +def ping_host(shell: Shell, host: Host): + options = CommandOptions(check=False) + return shell.exec(f"ping {host.config.address} -c 1", options).return_code + + +@retry(max_attempts=60, sleep_interval=5, expected_result=1) +@allure.step("Waiting for host of {node} to go offline") +def wait_for_host_offline(shell: Shell, node: StorageNode): + try: + # TODO: Quick solution for now, should be replaced by lib interactions + return ping_host(shell, node.host) + except Exception as err: + logger.warning(f"Host ping fails with error {err}") + return 0 diff --git a/pytest_tests/helpers/frostfs_verbs.py b/pytest_tests/helpers/frostfs_verbs.py index aa58132b..4d5b21a5 100644 --- a/pytest_tests/helpers/frostfs_verbs.py +++ b/pytest_tests/helpers/frostfs_verbs.py @@ -6,7 +6,7 @@ import uuid from typing import Any, Optional import allure -from frostfs_testlib.cli import FrostfsCli +from frostfs_testlib.cli import FrostfsCli, NeoGo from frostfs_testlib.shell import Shell from frostfs_testlib.utils import json_utils @@ -15,6 +15,7 @@ from pytest_tests.resources.common import ( ASSETS_DIR, CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, + NEOGO_EXECUTABLE, WALLET_CONFIG, ) @@ -670,3 +671,34 @@ def head_object( logger.info("decoding simple header") return json_utils.decode_simple_header(decoded) + + +@allure.step("Run neo-go query height") +def neo_go_query_height(shell: Shell, endpoint: str) -> dict: + """ + Run neo-go query height command + + Args: + shell: executor for cli command + endpoint: endpoint to execute + Returns: + dict-> + Latest block: {value} + Validated state: {value} + + """ + neogo = NeoGo(shell, neo_go_exec_path=NEOGO_EXECUTABLE) + output = neogo.query.height(rpc_endpoint=endpoint).stdout + try: + # taking first line from command's output contain the latest block in blockchain + first_line = output.split("\n")[0] + except Exception: + logger.error(f"Got empty output (neo-go query height): {output}") + latest_block = first_line.split(":") + # taking second line from command's output contain wallet key + second_line = output.split("\n")[1] + validated_state = second_line.split(":") + return { + latest_block[0].replace(":", ""): int(latest_block[1]), + validated_state[0].replace(":", ""): int(validated_state[1]), + } diff --git a/pytest_tests/helpers/test_control.py b/pytest_tests/helpers/test_control.py index 5676b96e..51b4cd3c 100644 --- a/pytest_tests/helpers/test_control.py +++ b/pytest_tests/helpers/test_control.py @@ -1,6 +1,7 @@ import logging from functools import wraps from time import sleep, time +from typing import Any from _pytest.outcomes import Failed from pytest import fail @@ -78,3 +79,39 @@ def wait_for_success(max_wait_time: int = 60, interval: int = 1): return impl return wrapper + + +def retry(max_attempts: int, sleep_interval: int = 1, expected_result: Any = None): + """ + Decorator to wait for some conditions/functions to pass successfully. + This is useful if you don't know exact time when something should pass successfully and do not + want to use sleep(X) with too big X. + + Be careful though, wrapped function should only check the state of something, not change it. + """ + + def wrapper(func): + @wraps(func) + def impl(*a, **kw): + last_exception = None + for _ in range(max_attempts): + try: + actual_result = func(*a, **kw) + if expected_result is not None: + assert expected_result == actual_result + return actual_result + except Exception as ex: + logger.debug(ex) + last_exception = ex + sleep(sleep_interval) + except Failed as ex: + logger.debug(ex) + last_exception = ex + sleep(sleep_interval) + + # timeout exceeded with no success, raise last_exception + raise last_exception + + return impl + + return wrapper diff --git a/pytest_tests/steps/cluster_test_base.py b/pytest_tests/steps/cluster_test_base.py index a0e6c965..7ebef916 100644 --- a/pytest_tests/steps/cluster_test_base.py +++ b/pytest_tests/steps/cluster_test_base.py @@ -3,7 +3,7 @@ import pytest from frostfs_testlib.shell import Shell from pytest_tests.helpers import epoch -from pytest_tests.helpers.cluster import Cluster +from pytest_tests.helpers.cluster import Cluster, StorageNode # To skip adding every mandatory singleton dependency to EACH test function @@ -18,12 +18,12 @@ class ClusterTestBase: yield @allure.title("Tick {epochs_to_tick} epochs") - def tick_epochs(self, epochs_to_tick: int): + def tick_epochs(self, epochs_to_tick: int, alive_node: StorageNode): for _ in range(epochs_to_tick): - self.tick_epoch() + self.tick_epoch(alive_node) - def tick_epoch(self): - epoch.tick_epoch(self.shell, self.cluster) + def tick_epoch(self, alive_node: StorageNode): + epoch.tick_epoch(self.shell, self.cluster, alive_node=alive_node) def wait_for_epochs_align(self): epoch.wait_for_epochs_align(self.shell, self.cluster) diff --git a/pytest_tests/testsuites/failovers/test_failover_server.py b/pytest_tests/testsuites/failovers/test_failover_server.py new file mode 100644 index 00000000..57e46cde --- /dev/null +++ b/pytest_tests/testsuites/failovers/test_failover_server.py @@ -0,0 +1,248 @@ +import logging +import os.path +import random +import time + +import allure +import pytest +from frostfs_testlib.resources.common import PUBLIC_ACL +from frostfs_testlib.utils import datetime_utils +from pytest import FixtureRequest + +from pytest_tests.helpers.cluster import ClusterNode, StorageNode +from pytest_tests.helpers.container import StorageContainer, StorageContainerInfo, create_container +from pytest_tests.helpers.failover_utils import wait_for_host_offline, wait_object_replication +from pytest_tests.helpers.file_helper import get_file_hash +from pytest_tests.helpers.frostfs_verbs import get_object +from pytest_tests.helpers.node_management import ( + check_node_in_map, + check_node_not_in_map, + wait_for_node_to_be_ready, +) +from pytest_tests.helpers.storage_object_info import StorageObjectInfo +from pytest_tests.helpers.test_control import wait_for_success +from pytest_tests.helpers.wallet import WalletFile +from pytest_tests.resources.common import MORPH_BLOCK_TIME +from pytest_tests.steps.cluster_test_base import ClusterTestBase + +logger = logging.getLogger("NeoLogger") + + +@pytest.mark.failover +@pytest.mark.failover_server +class TestFailoverServer(ClusterTestBase): + @wait_for_success(max_wait_time=120, interval=1) + def wait_node_not_in_map(self, *args, **kwargs): + check_node_not_in_map(*args, **kwargs) + + @wait_for_success(max_wait_time=120, interval=1) + def wait_node_in_map(self, *args, **kwargs): + check_node_in_map(*args, **kwargs) + + @allure.step("Create {count_containers} containers and {count_files} objects") + @pytest.fixture + def containers( + self, + request: FixtureRequest, + default_wallet: str, + ) -> list[StorageContainer]: + + placement_rule = "REP 2 CBF 2 SELECT 2 FROM * AS X" + + containers = [] + + for _ in range(request.param): + cont_id = create_container( + default_wallet, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + rule=placement_rule, + basic_acl=PUBLIC_ACL, + ) + wallet = WalletFile(path=default_wallet) + storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet) + containers.append( + StorageContainer( + storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster + ) + ) + + return containers + + @pytest.fixture( + params=[ + pytest.lazy_fixture("simple_object_size"), + pytest.lazy_fixture("complex_object_size"), + ], + ids=["simple object", "complex object"], + # Scope session to upload/delete each files set only once + scope="class", + ) + def object_size(self, request): + return request.param + + @allure.step("Create object and delete after test") + @pytest.fixture(scope="class") + def storage_objects( + self, request: FixtureRequest, containers: list[StorageContainer], object_size: int + ) -> StorageObjectInfo: + object_list = [] + for cont in containers: + for _ in range(request.param): + object_list.append(cont.generate_object(size=object_size)) + + yield object_list + + for storage_object in object_list: + os.remove(storage_object.file_path) + + @allure.step("Select random node to stop and start it after test") + @pytest.fixture + def node_to_stop(self) -> ClusterNode: + node = random.choice(self.cluster.cluster_nodes) + yield node + with allure.step(f"start {node.storage_node}"): + node.host.start_host() + with allure.step(f"Waiting status ready for node {node.storage_node}"): + wait_for_node_to_be_ready(node.storage_node) + + @allure.step("Upload object with nodes and compare") + def get_corrupted_objects_list( + self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo] + ) -> list[StorageObjectInfo]: + corrupted_objects = [] + for node in nodes: + for storage_object in storage_objects: + got_file_path = get_object( + storage_object.wallet_file_path, + storage_object.cid, + storage_object.oid, + endpoint=node.get_rpc_endpoint(), + shell=self.shell, + timeout="60s", + ) + if storage_object.file_hash != get_file_hash(got_file_path): + corrupted_objects.append(storage_object) + os.remove(got_file_path) + + return corrupted_objects + + def check_objects_replication( + self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode] + ) -> None: + for storage_object in storage_objects: + wait_object_replication( + storage_object.cid, + storage_object.oid, + 2, + shell=self.shell, + nodes=storage_nodes, + sleep_interval=45, + attempts=60, + ) + + @allure.title("Full shutdown node") + @pytest.mark.parametrize("containers, storage_objects", [(5, 20)], indirect=True) + def test_complete_node_shutdown( + self, + containers: list[StorageContainer], + storage_objects: list[StorageObjectInfo], + default_wallet: str, + node_to_stop: ClusterNode, + ): + with allure.step("Checking that the objects are loader according to the policy"): + self.check_objects_replication(storage_objects, self.cluster.storage_nodes) + + with allure.step(f"Remove {node_to_stop} from the list of nodes"): + alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop}) + + storage_nodes = [cluster.storage_node for cluster in alive_nodes] + + with allure.step("Tick epoch"): + self.tick_epochs(1, storage_nodes[0]) + + with allure.step("Wait 2 block time"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + + with allure.step(f"Stop {node_to_stop} node"): + node_to_stop.host.stop_host(mode="hard") + + with allure.step(f"Check if the node {node_to_stop.storage_node} has stopped"): + wait_for_host_offline(self.shell, node_to_stop.storage_node) + + with allure.step("Wait for objects replication"): + self.check_objects_replication(storage_objects, storage_nodes) + + with allure.step("Verify that there are no corrupted objects"): + corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) + + assert not corrupted_objects_list + + with allure.step(f"check {node_to_stop.storage_node} in map"): + self.wait_node_in_map( + node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] + ) + + count_tick_epoch = alive_nodes[0].ir_node.get_netmap_cleaner_threshold() + 1 + with allure.step(f"Tick {count_tick_epoch} epoch, in {storage_nodes[0]} node"): + self.tick_epochs(count_tick_epoch, storage_nodes[0]) + + with allure.step("Wait 2 block time"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + + with allure.step(f"Check {node_to_stop} in not map"): + self.wait_node_not_in_map( + node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] + ) + + with allure.step( + f"Verify that there are no corrupted objects after {count_tick_epoch} epoch" + ): + corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) + assert not corrupted_objects_list + + @allure.title("Temporarily disable a node") + @pytest.mark.parametrize("containers, storage_objects", [(1, 2)], indirect=True) + def test_temporarily_disable_a_node( + self, + containers: list[StorageContainer], + storage_objects: list[StorageObjectInfo], + default_wallet: str, + node_to_stop, + ): + with allure.step("Checking that the objects are loader according to the policy"): + self.check_objects_replication(storage_objects, self.cluster.storage_nodes) + + with allure.step(f"Remove {node_to_stop} from the list of nodes"): + storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node}) + + with allure.step("Tick epoch"): + self.tick_epochs(1, storage_nodes[0]) + + with allure.step("Wait 2 block time"): + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + + with allure.step(f"Stop {node_to_stop.storage_node} node"): + node_to_stop.host.stop_host(mode="hard") + + with allure.step(f"Check if the node {node_to_stop} has stopped"): + wait_for_host_offline(self.shell, node_to_stop.storage_node) + + with allure.step("Wait for objects replication"): + self.check_objects_replication(storage_objects, storage_nodes) + + with allure.step("Verify that there are no corrupted objects"): + corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) + assert not corrupted_objects_list + + with allure.step(f"Check {node_to_stop} in map"): + self.wait_node_in_map( + node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] + ) + + with allure.step(f"Start {node_to_stop}"): + node_to_stop.host.start_host() + + with allure.step("Verify that there are no corrupted objects"): + corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) + assert not corrupted_objects_list