import logging import os.path import random import time import allure import pytest from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.steps.cli.container import ( StorageContainer, StorageContainerInfo, create_container, ) from frostfs_testlib.steps.cli.object import get_object from frostfs_testlib.steps.node_management import ( check_node_in_map, check_node_not_in_map, wait_for_node_to_be_ready, ) from frostfs_testlib.storage.cluster import ClusterNode, StorageNode from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.failover_utils import wait_for_host_offline, wait_object_replication from frostfs_testlib.utils.file_utils import get_file_hash from pytest import FixtureRequest logger = logging.getLogger("NeoLogger") @pytest.mark.failover @pytest.mark.failover_server class TestFailoverServer(ClusterTestBase): @wait_for_success(max_wait_time=120, interval=1) def wait_node_not_in_map(self, *args, **kwargs): check_node_not_in_map(*args, **kwargs) @wait_for_success(max_wait_time=120, interval=1) def wait_node_in_map(self, *args, **kwargs): check_node_in_map(*args, **kwargs) @allure.step("Create {count_containers} containers and {count_files} objects") @pytest.fixture def containers( self, request: FixtureRequest, default_wallet: str, ) -> list[StorageContainer]: placement_rule = "REP 2 CBF 2 SELECT 2 FROM * AS X" containers = [] for _ in range(request.param): cont_id = create_container( default_wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule=placement_rule, basic_acl=PUBLIC_ACL, ) wallet = WalletInfo(path=default_wallet) storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet) containers.append( StorageContainer( storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster ) ) return containers @pytest.fixture( params=[ pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size"), ], ids=["simple object", "complex object"], # Scope session to upload/delete each files set only once scope="class", ) def object_size(self, request): return request.param @allure.step("Create object and delete after test") @pytest.fixture(scope="class") def storage_objects( self, request: FixtureRequest, containers: list[StorageContainer], object_size: int ) -> StorageObjectInfo: object_list = [] for cont in containers: for _ in range(request.param): object_list.append(cont.generate_object(size=object_size)) yield object_list for storage_object in object_list: os.remove(storage_object.file_path) @allure.step("Select random node to stop and start it after test") @pytest.fixture def node_to_stop(self) -> ClusterNode: node = random.choice(self.cluster.cluster_nodes) yield node with allure.step(f"start {node.storage_node}"): node.host.start_host() with allure.step(f"Waiting status ready for node {node.storage_node}"): wait_for_node_to_be_ready(node.storage_node) @allure.step("Upload object with nodes and compare") def get_corrupted_objects_list( self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo] ) -> list[StorageObjectInfo]: corrupted_objects = [] for node in nodes: for storage_object in storage_objects: got_file_path = get_object( storage_object.wallet_file_path, storage_object.cid, storage_object.oid, endpoint=node.get_rpc_endpoint(), shell=self.shell, timeout="60s", ) if storage_object.file_hash != get_file_hash(got_file_path): corrupted_objects.append(storage_object) os.remove(got_file_path) return corrupted_objects def check_objects_replication( self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode] ) -> None: for storage_object in storage_objects: wait_object_replication( storage_object.cid, storage_object.oid, 2, shell=self.shell, nodes=storage_nodes, sleep_interval=45, attempts=60, ) @allure.title("Full shutdown node") @pytest.mark.parametrize("containers, storage_objects", [(5, 20)], indirect=True) def test_complete_node_shutdown( self, containers: list[StorageContainer], storage_objects: list[StorageObjectInfo], default_wallet: str, node_to_stop: ClusterNode, ): with allure.step("Checking that the objects are loader according to the policy"): self.check_objects_replication(storage_objects, self.cluster.storage_nodes) with allure.step(f"Remove {node_to_stop} from the list of nodes"): alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop}) storage_nodes = [cluster.storage_node for cluster in alive_nodes] with allure.step("Tick epoch"): self.tick_epochs(1, storage_nodes[0]) with allure.step("Wait 2 block time"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) with allure.step(f"Stop {node_to_stop} node"): node_to_stop.host.stop_host(mode="hard") with allure.step(f"Check if the node {node_to_stop.storage_node} has stopped"): wait_for_host_offline(self.shell, node_to_stop.storage_node) with allure.step("Wait for objects replication"): self.check_objects_replication(storage_objects, storage_nodes) with allure.step("Verify that there are no corrupted objects"): corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) assert not corrupted_objects_list with allure.step(f"check {node_to_stop.storage_node} in map"): self.wait_node_in_map( node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] ) count_tick_epoch = alive_nodes[0].ir_node.get_netmap_cleaner_threshold() + 1 with allure.step(f"Tick {count_tick_epoch} epoch, in {storage_nodes[0]} node"): self.tick_epochs(count_tick_epoch, storage_nodes[0]) with allure.step("Wait 2 block time"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) with allure.step(f"Check {node_to_stop} in not map"): self.wait_node_not_in_map( node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] ) with allure.step( f"Verify that there are no corrupted objects after {count_tick_epoch} epoch" ): corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) assert not corrupted_objects_list @allure.title("Temporarily disable a node") @pytest.mark.parametrize("containers, storage_objects", [(1, 2)], indirect=True) def test_temporarily_disable_a_node( self, containers: list[StorageContainer], storage_objects: list[StorageObjectInfo], default_wallet: str, node_to_stop, ): with allure.step("Checking that the objects are loader according to the policy"): self.check_objects_replication(storage_objects, self.cluster.storage_nodes) with allure.step(f"Remove {node_to_stop} from the list of nodes"): storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node}) with allure.step("Tick epoch"): self.tick_epochs(1, storage_nodes[0]) with allure.step("Wait 2 block time"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) with allure.step(f"Stop {node_to_stop.storage_node} node"): node_to_stop.host.stop_host(mode="hard") with allure.step(f"Check if the node {node_to_stop} has stopped"): wait_for_host_offline(self.shell, node_to_stop.storage_node) with allure.step("Wait for objects replication"): self.check_objects_replication(storage_objects, storage_nodes) with allure.step("Verify that there are no corrupted objects"): corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) assert not corrupted_objects_list with allure.step(f"Check {node_to_stop} in map"): self.wait_node_in_map( node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0] ) with allure.step(f"Start {node_to_stop}"): node_to_stop.host.start_host() with allure.step("Verify that there are no corrupted objects"): corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects) assert not corrupted_objects_list