"""Failover tests that hard-stop storage node hosts and check that objects stay available."""

import logging
import os.path
import random

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object
from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import get_file_hash
from pytest import FixtureRequest

logger = logging.getLogger("NeoLogger")


@pytest.mark.failover
@pytest.mark.failover_server
class TestFailoverServer(ClusterTestBase):
    """Tests that stop node hosts in "hard" mode and verify GET/PUT behaviour afterwards."""

    @wait_for_success(max_wait_time=120, interval=1)
    def wait_node_not_in_map(self, *args, **kwargs):
        """Call ``check_node_not_in_map`` with the given arguments under ``wait_for_success``.

        NOTE(review): ``wait_for_success`` presumably retries the check every
        ``interval`` seconds for up to ``max_wait_time`` seconds - confirm in testlib.
        """
        check_node_not_in_map(*args, **kwargs)

    @wait_for_success(max_wait_time=120, interval=1)
    def wait_node_in_map(self, *args, **kwargs):
        """Call ``check_node_in_map`` with the given arguments under ``wait_for_success``.

        NOTE(review): ``wait_for_success`` presumably retries the check every
        ``interval`` seconds for up to ``max_wait_time`` seconds - confirm in testlib.
        """
        check_node_in_map(*args, **kwargs)

    def _create_storage_container(self, wallet_path: str, placement_rule: str) -> StorageContainer:
        """Create a public container with ``placement_rule`` and wrap it in a ``StorageContainer``.

        Args:
            wallet_path: Path to the wallet used to create the container.
            placement_rule: Placement policy string passed to ``create_container``.

        Returns:
            ``StorageContainer`` bound to this test's shell and cluster.
        """
        cont_id = create_container(
            wallet_path,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        wallet = WalletInfo(path=wallet_path)
        storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
        return StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)

    @reporter.step("Create {count_containers} containers and {count_files} objects")
    @pytest.fixture
    def containers(
        self,
        request: FixtureRequest,
        default_wallet: str,
    ) -> list[StorageContainer]:
        """Create ``request.param`` containers with the policy ``REP 2 CBF 2 SELECT 2 FROM *``.

        The container count comes from indirect parametrization of this fixture.
        """
        placement_rule = "REP 2 CBF 2 SELECT 2 FROM *"
        return [self._create_storage_container(default_wallet, placement_rule) for _ in range(request.param)]

    @reporter.step("Creation container")
    @pytest.fixture()
    def container(self, default_wallet: str) -> StorageContainer:
        """Create one container selecting all cluster nodes and replicating to all but one.

        With N cluster nodes the rule is ``REP N-1 CBF 1 SELECT N FROM *``.
        """
        select = len(self.cluster.cluster_nodes)
        placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
        return self._create_storage_container(default_wallet, placement_rule)

    @reporter.step("Create object and delete after test")
    @pytest.fixture(scope="class")
    def storage_objects(
        self,
        request: FixtureRequest,
        containers: list[StorageContainer],
        simple_object_size: ObjectSize,
        complex_object_size: ObjectSize,
    ) -> list[StorageObjectInfo]:
        """Generate ``request.param`` objects in every container and yield their descriptors.

        Each object's size is picked at random between the simple and the complex
        object size. The local source files are removed right after generation, so
        only the stored descriptors (including ``file_hash``) are used by the tests.
        """
        count_object = request.param
        object_sizes = [simple_object_size, complex_object_size]
        object_list: list[StorageObjectInfo] = [
            cont.generate_object(size=random.choice(object_sizes).value)
            for cont in containers
            for _ in range(count_object)
        ]

        # The local copies are not needed any more; later checks compare hashes only.
        for storage_object in object_list:
            os.remove(storage_object.file_path)

        yield object_list

    @reporter.step("Select random node to stop and start it after test")
    @pytest.fixture
    def node_to_stop(
        self, node_under_test: ClusterNode, cluster_state_controller: ClusterStateController
    ) -> ClusterNode:
        """Yield the node under test and start all stopped hosts again on teardown."""
        yield node_under_test
        with reporter.step(f"start {node_under_test.storage_node}"):
            cluster_state_controller.start_stopped_hosts()

    @reporter.step("Upload object with nodes and compare")
    def get_corrupted_objects_list(
        self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
    ) -> list[StorageObjectInfo]:
        """Download every object from every node and collect those whose hash differs.

        Args:
            nodes: Storage nodes whose RPC endpoints are queried.
            storage_objects: Objects to download; ``file_hash`` is the expected hash.

        Returns:
            Objects whose downloaded content hash does not match ``file_hash``
            (an object appears once per node on which it mismatched).

        Raises:
            AssertionError: If any GET raised ``RuntimeError``; the message lists the
                affected object IDs.
        """
        corrupted_objects = []
        errors_get = []
        for node in nodes:
            for storage_object in storage_objects:
                try:
                    got_file_path = get_object(
                        storage_object.wallet_file_path,
                        storage_object.cid,
                        storage_object.oid,
                        endpoint=node.get_rpc_endpoint(),
                        shell=self.shell,
                        timeout="60s",
                    )
                    if storage_object.file_hash != get_file_hash(got_file_path):
                        corrupted_objects.append(storage_object)
                    os.remove(got_file_path)
                except RuntimeError:
                    # Keep going so that all failed GETs are reported together below.
                    errors_get.append(storage_object.oid)

        assert not errors_get, f"Get failed - {errors_get}"
        return corrupted_objects

    def check_objects_replication(
        self, storage_objects: list[StorageObjectInfo], storage_nodes: list[StorageNode]
    ) -> None:
        """Call ``wait_object_replication`` with an expected copy count of 2 for every object."""
        for storage_object in storage_objects:
            wait_object_replication(
                storage_object.cid,
                storage_object.oid,
                2,
                shell=self.shell,
                nodes=storage_nodes,
                sleep_interval=45,
                attempts=60,
            )

    @pytest.fixture()
    def object_and_nodes(
        self, simple_object_size: ObjectSize, container: StorageContainer, default_wallet: str
    ) -> tuple[StorageObjectInfo, list[ClusterNode]]:
        """Put a simple-size object into ``container`` and return it with the nodes holding it."""
        object_info = container.generate_object(simple_object_size.value)
        object_nodes = get_object_nodes(
            cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
        )
        return object_info, object_nodes

    @pytest.fixture()
    def up_stop_nodes(self, cluster_state_controller: ClusterStateController):
        """Teardown-only fixture: start all stopped hosts after the test has finished."""
        yield
        cluster_state_controller.start_stopped_hosts()

    @allure.title("Full shutdown node")
    @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
    def test_complete_node_shutdown(
        self,
        storage_objects: list[StorageObjectInfo],
        node_to_stop: ClusterNode,
        cluster_state_controller: ClusterStateController,
    ):
        """Hard-stop a node, tick epochs until it leaves the netmap, and verify objects.

        Objects are verified twice on the remaining nodes: right after the stop
        (while the node is still in the map) and after ticking
        ``netmap_cleaner_threshold + 4`` epochs, when the node must be out of the map.
        """
        with reporter.step(f"Remove {node_to_stop} from the list of nodes"):
            alive_nodes = list(set(self.cluster.cluster_nodes) - {node_to_stop})
            storage_nodes = [cluster.storage_node for cluster in alive_nodes]

        with reporter.step("Tick epoch and wait for 2 blocks"):
            self.tick_epochs(1, storage_nodes[0], wait_block=2)

        with reporter.step("Stop node"):
            cluster_state_controller.stop_node_host(node=node_to_stop, mode="hard")

        with reporter.step("Verify that there are no corrupted objects"):
            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
            assert not corrupted_objects_list

        with reporter.step(f"check {node_to_stop.storage_node} in map"):
            self.wait_node_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])

        count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4

        with reporter.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
            self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)

        with reporter.step(f"Check {node_to_stop} in not map"):
            self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])

        with reporter.step(f"Verify that there are no corrupted objects after {count_tick_epoch} epoch"):
            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
            assert not corrupted_objects_list

    @allure.title("Temporarily disable a node")
    @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
    def test_temporarily_disable_a_node(
        self,
        storage_objects: list[StorageObjectInfo],
        node_to_stop: ClusterNode,
        cluster_state_controller: ClusterStateController,
    ):
        """Hard-stop a node, start it again, and verify objects before and after the restart."""
        with reporter.step(f"Remove {node_to_stop} from the list of nodes"):
            storage_nodes = list(set(self.cluster.storage_nodes) - {node_to_stop.storage_node})

        with reporter.step("Tick epoch and wait for 2 blocks"):
            self.tick_epochs(1, storage_nodes[0], wait_block=2)

        with reporter.step("Stop node"):
            cluster_state_controller.stop_node_host(node=node_to_stop, mode="hard")

        with reporter.step("Verify that there are no corrupted objects"):
            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
            assert not corrupted_objects_list

        with reporter.step(f"Check {node_to_stop} in map"):
            self.wait_node_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])

        cluster_state_controller.start_node_host(node_to_stop)

        with reporter.step("Verify that there are no corrupted objects"):
            corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
            assert not corrupted_objects_list

    @allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
    def test_not_enough_nodes_in_container_rep_3(
        self,
        container: StorageContainer,
        object_and_nodes: tuple[StorageObjectInfo, list[ClusterNode]],
        default_wallet: str,
        cluster_state_controller: ClusterStateController,
        simple_file: str,
        up_stop_nodes: None,
    ):
        """Stop all object holders but one: GET must still succeed, PUT must fail.

        NOTE(review): the title quotes 'REP 3 CBF 1 SELECT 4', which matches the
        ``container`` fixture only on a 4-node cluster - the actual rule is derived
        from the cluster size.
        """
        object_info, object_nodes = object_and_nodes
        # Any cluster node that does not hold the object.
        node_not_object = list(set(self.cluster.cluster_nodes) - set(object_nodes))[0]

        with reporter.step("Stop all nodes with object, except 1"):
            for cluster_node in object_nodes[1:]:
                cluster_state_controller.stop_node_host(node=cluster_node, mode="hard")

        with reporter.step("Get object"):
            with reporter.step(f"Get operation to {node_not_object} where it does not exist, expect success"):
                get_object(
                    wallet=default_wallet,
                    cid=object_info.cid,
                    oid=object_info.oid,
                    shell=self.shell,
                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
                )
            with reporter.step(f"Get operation to {object_nodes[0]} with object, expect success"):
                get_object(
                    wallet=default_wallet,
                    cid=object_info.cid,
                    oid=object_info.oid,
                    shell=self.shell,
                    endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
                )

        with reporter.step("Put operation to node with object, expect error"):
            with pytest.raises(RuntimeError):
                put_object(
                    wallet=default_wallet,
                    path=simple_file,
                    cid=object_info.cid,
                    shell=self.shell,
                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
                )

    @allure.title("Not enough nodes in the container with policy - 'REP 2 CBF 2 SELECT 4 FROM *'")
    def test_not_enough_nodes_in_container_rep_2(
        self,
        default_wallet: str,
        cluster_state_controller: ClusterStateController,
        simple_file: str,
        up_stop_nodes: None,
    ):
        """Stop one object holder: PUT, GET and container creation must still succeed.

        With N cluster nodes the container rule is
        ``REP N-2 IN X CBF 2 SELECT N FROM * AS X``.
        """
        with reporter.step("Creating a container with a full network map"):
            select = len(self.cluster.cluster_nodes)
            placement_rule = f"REP {select - 2} IN X CBF 2 SELECT {select} FROM * AS X"
            cid_1 = create_container(
                default_wallet,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
            )

        with reporter.step("Put object"):
            oid = put_object(
                wallet=default_wallet,
                path=simple_file,
                cid=cid_1,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with reporter.step("Search nodes with object"):
            object_nodes = get_object_nodes(
                cluster=self.cluster, cid=cid_1, oid=oid, alive_node=self.cluster.cluster_nodes[0]
            )

        with reporter.step("Turn off random node with object"):
            # object_nodes[0] is kept alive: it serves the GET and container creation below.
            cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")

        with reporter.step("Checking PUT operation"):
            oid_2 = put_object(
                wallet=default_wallet,
                path=simple_file,
                cid=cid_1,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
            )

        with reporter.step("Checking GET operation"):
            get_file = get_object(
                wallet=default_wallet,
                cid=cid_1,
                oid=oid_2,
                shell=self.shell,
                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
            )
            os.remove(get_file)

        with reporter.step("Checking creating container"):
            _ = create_container(
                default_wallet,
                shell=self.shell,
                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
            )