import logging
import os
from time import sleep

import allure
import pytest
from frostfs_testlib.analytics import test_case
from frostfs_testlib.hosting import Host
from frostfs_testlib.resources.common import PUBLIC_ACL
from frostfs_testlib.shell import CommandOptions
from frostfs_testlib.utils import datetime_utils

from pytest_tests.helpers.aws_cli_client import AwsCliClient
from pytest_tests.helpers.cluster import Cluster, StorageNode
from pytest_tests.helpers.container import create_container
from pytest_tests.helpers.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_object_replication,
)
from pytest_tests.helpers.file_helper import generate_file, get_file_hash
from pytest_tests.helpers.frostfs_verbs import get_object, put_object_to_random_node
from pytest_tests.helpers.node_management import (
    check_node_in_map,
    check_node_not_in_map,
    exclude_node_from_network_map,
    include_node_to_network_map,
    remove_nodes_from_map_morph,
    stop_random_storage_nodes,
    wait_for_node_to_be_ready,
)
from pytest_tests.helpers.s3_helper import check_objects_in_bucket
from pytest_tests.resources.common import FROSTFS_CONTRACT_CACHE_TIMEOUT, MORPH_BLOCK_TIME
from pytest_tests.steps import s3_gate_object
from pytest_tests.steps.cluster_test_base import ClusterTestBase
from pytest_tests.steps.s3_gate_base import TestS3GateBase

logger = logging.getLogger("NeoLogger")

# Nodes that a test has stopped / excluded and not yet brought back.
# Tests append to this list; the teardown fixtures and helpers below drain it.
stopped_nodes: list[StorageNode] = []


@allure.step("Return all stopped hosts")
@pytest.fixture(scope="function", autouse=True)
def after_run_return_all_stopped_hosts(cluster: Cluster):
    """Autouse teardown: after every test, start the hosts left in ``stopped_nodes``."""
    yield
    return_stopped_hosts(cluster)


def panic_reboot_host(host: Host) -> None:
    """Reboot ``host`` through the magic SysRq interface.

    Writes ``1`` to ``/proc/sys/kernel/sysrq`` and then ``b`` to
    ``/proc/sysrq-trigger`` using the host's shell.
    """
    shell = host.get_shell()
    shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')

    # NOTE(review): check=False with a 1s timeout presumably exists because the
    # host goes down before the command can report a result - confirm.
    options = CommandOptions(close_stdin=True, timeout=1, check=False)
    shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)


def return_stopped_hosts(cluster: Cluster) -> None:
    """Start the host of every node in ``stopped_nodes`` and wait for the cluster.

    Each node is removed from ``stopped_nodes`` once its host has been started;
    afterwards ``wait_all_storage_nodes_returned`` is called for ``cluster``.
    """
    # Iterate over a copy: the list is mutated inside the loop.
    for node in list(stopped_nodes):
        with allure.step(f"Start host {node}"):
            node.host.start_host()
        stopped_nodes.remove(node)

    wait_all_storage_nodes_returned(cluster)


@pytest.mark.failover
class TestFailoverStorage(ClusterTestBase):
    """Failover tests that stop or panic-reboot the hosts of storage nodes."""

    @allure.title("Lose and return storage node's host")
    @pytest.mark.parametrize("hard_reboot", [True, False])
    @pytest.mark.failover_reboot
    def test_lose_storage_node_host(
        self, default_wallet, hard_reboot: bool, require_multiple_hosts, simple_object_size
    ):
        """Stop the hosts holding an object, check it is readable elsewhere, then return them.

        The hosts are stopped in "hard" or "soft" mode depending on ``hard_reboot``.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )
        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )

        for node in nodes:
            # Register the node before stopping it so that teardown brings the
            # host back even if the stop or a later check fails.
            stopped_nodes.append(node)

            with allure.step(f"Stop host {node}"):
                node.host.stop_host("hard" if hard_reboot else "soft")

            new_nodes = wait_object_replication(
                cid,
                oid,
                2,
                shell=self.shell,
                nodes=list(set(self.cluster.storage_nodes) - {node}),
            )
        assert all(old_node not in new_nodes for old_node in nodes)

        with allure.step("Check object data is not corrupted"):
            got_file_path = get_object(
                wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        with allure.step("Return all hosts"):
            return_stopped_hosts(self.cluster)

        with allure.step("Check object data is not corrupted"):
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

    @allure.title("Panic storage node's host")
    @pytest.mark.parametrize("sequence", [True, False])
    @pytest.mark.failover_panic
    def test_panic_storage_node_host(
        self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size
    ):
        """Panic-reboot the hosts holding an object and check the object stays readable.

        With ``sequence=True`` replication is awaited after each reboot; otherwise
        it is awaited once, after all hosts have been rebooted.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )

        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        allure.attach(
            "\n".join([str(node) for node in nodes]),
            "Current nodes with object",
            allure.attachment_type.TEXT,
        )

        new_nodes: list[StorageNode] = []
        for node in nodes:
            with allure.step(f"Hard reboot host {node} via magic SysRq option"):
                panic_reboot_host(node.host)
                if sequence:
                    try:
                        # First look for the replicas among the other nodes only.
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=list(set(self.cluster.storage_nodes) - {node}),
                        )
                    except AssertionError:
                        # Fall back to searching all nodes, rebooted one included.
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=self.cluster.storage_nodes,
                        )

                    allure.attach(
                        "\n".join([str(new_node) for new_node in new_nodes]),
                        f"Nodes with object after {node} fail",
                        allure.attachment_type.TEXT,
                    )

        if not sequence:
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            allure.attach(
                "\n".join([str(new_node) for new_node in new_nodes]),
                "Nodes with object after nodes fail",
                allure.attachment_type.TEXT,
            )

        got_file_path = get_object(
            wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
        )
        assert get_file_hash(source_file_path) == get_file_hash(got_file_path)


def pytest_generate_tests(metafunc):
    """Parametrize tests that use the ``s3_client`` fixture over both S3 clients."""
    if "s3_client" in metafunc.fixturenames:
        metafunc.parametrize("s3_client", ["aws cli", "boto3"], indirect=True)


@pytest.mark.failover
@pytest.mark.failover_empty_map
class TestEmptyMap(TestS3GateBase):
    """
    A set of tests that make the network map empty and verify that objects
    can still be read afterwards.
    """

    @allure.step("Teardown after EmptyMap offline test")
    @pytest.fixture()
    def empty_map_offline_teardown(self):
        """Teardown: include every node left in ``stopped_nodes`` back into the network map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            # Iterate over a copy: the list is mutated inside the loop.
            for node in list(stopped_nodes):
                include_node_to_network_map(
                    node, node, shell=self.shell, cluster=self.cluster
                )
                stopped_nodes.remove(node)

    @staticmethod
    def object_key_from_file_path(full_path: str) -> str:
        """Return the final path component of ``full_path``, used as the S3 object key."""
        return os.path.basename(full_path)

    @test_case.title("Test makes network map empty (offline all storage nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    # NOTE(review): "offlne" looks like a typo of "offline"; left unchanged because
    # external marker selections (-m) may depend on the exact name.
    @pytest.mark.failover_empty_map_offlne
    @allure.title("Test makes network map empty (offline all storage nodes)")
    def test_offline_all_storage_nodes(self, bucket, simple_object_size, empty_map_offline_teardown):
        """
        The test makes the network map empty (sets OFFLINE status on all storage nodes),
        then returns all nodes to the map and checks that the object can be read through s3.

        Steps:
        1. Check that bucket is empty
        2. PUT object into bucket
        3. Check that object exists in bucket
        4. Exclude all storage nodes from network map (set status OFFLINE)
        5. Return all storage nodes to network map
        6. Check that we can read object from #2

        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
            empty_map_offline_teardown: teardown that returns excluded nodes to the map
        """
        file_path = generate_file(simple_object_size)
        file_name = self.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_gate_object.put_object_s3(self.s3_client, bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            check_objects_in_bucket(self.s3_client, bucket, bucket_objects)

        storage_nodes = self.cluster.storage_nodes
        with allure.step("Exclude all storage nodes from network map"):
            for node in storage_nodes:
                exclude_node_from_network_map(
                    node, node, shell=self.shell, cluster=self.cluster
                )
                stopped_nodes.append(node)

        with allure.step("Return all storage nodes to network map"):
            for node in storage_nodes:
                include_node_to_network_map(
                    node, node, shell=self.shell, cluster=self.cluster
                )
                stopped_nodes.remove(node)

        with allure.step("Check that we can read object"):
            # The bucket must still contain the uploaded object here, so no
            # "bucket is empty" assertion belongs after this check.
            check_objects_in_bucket(self.s3_client, bucket, bucket_objects)

    @allure.step("Teardown after EmptyMap stop service test")
    @pytest.fixture()
    def empty_map_stop_service_teardown(self):
        """Teardown: restart every node left in ``stopped_nodes`` and check it is in the map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            # Iterate over a copy: the list is mutated inside the loop.
            for node in list(stopped_nodes):
                with allure.step(f"Start node {node}"):
                    node.start_service()
                with allure.step(f"Waiting status ready for node {node}"):
                    wait_for_node_to_be_ready(node)

                sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
                self.tick_epochs(1)
                check_node_in_map(node, shell=self.shell, alive_node=node)
                stopped_nodes.remove(node)

    @test_case.title("Test makes network map empty (stop storage service on all nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    @pytest.mark.failover_empty_map_stop_service
    @allure.title("Test makes network map empty (stop storage service on all nodes)")
    def test_stop_all_storage_nodes(self, bucket, simple_object_size, empty_map_stop_service_teardown):
        """
        The test makes the network map empty (stops the storage service on all nodes,
        then uses 'frostfs-adm morph delete-nodes' to delete the nodes from the map),
        then starts all services and checks that the object can be read through s3.

        Steps:
        1. Check that bucket is empty
        2. PUT object into bucket
        3. Check that object exists in bucket
        4. Exclude all storage nodes from network map (stop storage service
           and manually exclude from map)
        5. Return all storage nodes to network map
        6. Check that we can read object from #2

        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
            empty_map_stop_service_teardown: teardown that restarts stopped nodes
        """
        file_path = generate_file(simple_object_size)
        file_name = self.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_gate_object.list_objects_s3(self.s3_client, bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_gate_object.put_object_s3(self.s3_client, bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            check_objects_in_bucket(self.s3_client, bucket, bucket_objects)

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.storage_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    node.stop_service()
                    stopped_nodes.append(node)

        with allure.step("Remove all nodes from network map"):
            remove_nodes_from_map_morph(shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes)

        with allure.step("Return all storage nodes to network map"):
            self.return_nodes_after_stop_with_check_empty_map(stopped_nodes)

        with allure.step("Check that object exists in bucket"):
            check_objects_in_bucket(self.s3_client, bucket, bucket_objects)

    @allure.step("Return all nodes to cluster with check empty map first")
    def return_nodes_after_stop_with_check_empty_map(self, return_nodes = None) -> None:
        """Start the given nodes one by one, checking first that the map is empty.

        After the first node is started and ready, none of the nodes being
        returned may be present in the network map. Each node is then waited
        for over one morph block time and one epoch tick, checked to be in the
        map, and removed from ``stopped_nodes``.

        Args:
            return_nodes: nodes to start; each must be present in ``stopped_nodes``.
                When ``None``, all nodes currently in ``stopped_nodes`` are returned.
        """
        # Snapshot the input: callers pass ``stopped_nodes`` itself, which is
        # mutated in the loop below. ``None`` falls back to every stopped node.
        nodes_to_return = list(stopped_nodes if return_nodes is None else return_nodes)

        first_node = True
        for node in nodes_to_return:
            with allure.step(f"Start node {node}"):
                node.start_service()
            with allure.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            with allure.step("We need to make sure that network map is empty"):
                # Only meaningful once: before the first node re-enters the map.
                if first_node:
                    for check_node in nodes_to_return:
                        check_node_not_in_map(check_node, shell=self.shell, alive_node=node)
                    first_node = False

            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epochs(1)
            check_node_in_map(node, shell=self.shell, alive_node=node)
            stopped_nodes.remove(node)