"""Storage-node failover tests.

Covers losing/rebooting storage hosts, kernel-panic reboots via magic SysRq,
making the network map empty (OFFLINE status or stopped services), and S3
behaviour after blobovnicza/fstree data is deleted from every storage node.
"""

import logging
import os
from time import sleep

import allure
import pytest
from frostfs_testlib.analytics import test_case
from frostfs_testlib.hosting import Host
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    check_node_not_in_map,
    exclude_node_from_network_map,
    include_node_to_network_map,
    remove_nodes_from_map_morph,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_object_replication,
)
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash

logger = logging.getLogger("NeoLogger")

# Nodes that a test has stopped or taken out of the network map. Tests append to
# this list; the teardown fixtures/helpers below drain it to restore the cluster.
stopped_nodes: list[StorageNode] = []


@allure.step("Return all stopped hosts")
@pytest.fixture(scope="function", autouse=True)
def after_run_return_all_stopped_hosts(client_shell: Shell, cluster: Cluster):
    """After every test, start any hosts still registered in ``stopped_nodes``."""
    yield
    return_stopped_hosts(client_shell, cluster)


def panic_reboot_host(host: Host) -> None:
    """Force an immediate reboot of ``host`` through the kernel's magic SysRq interface.

    Args:
        host: host to reboot; commands run through its own shell.
    """
    shell = host.get_shell()
    # Enable all SysRq functions so the trigger below is honoured.
    shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')

    # SysRq 'b' reboots at once, so the session is expected to drop mid-command:
    # don't check the exit status and don't wait long for it.
    options = CommandOptions(close_stdin=True, timeout=1, check=False)
    shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)


def return_stopped_hosts(shell: Shell, cluster: Cluster) -> None:
    """Start every host in ``stopped_nodes``, then wait for all storage nodes to return.

    Args:
        shell: shell used to poll storage node status.
        cluster: cluster whose storage nodes must all come back.
    """
    # Iterate over a copy because the registry is mutated inside the loop.
    for node in list(stopped_nodes):
        with allure.step(f"Start host {node}"):
            node.host.start_host()
        stopped_nodes.remove(node)

    wait_all_storage_nodes_returned(shell, cluster)


@pytest.mark.failover
class TestFailoverStorage(ClusterTestBase):
    """Failover scenarios where whole storage hosts go down."""

    @allure.title("Lose and return storage node's host")
    @pytest.mark.parametrize("hard_reboot", [True, False])
    @pytest.mark.failover_reboot
    def test_lose_storage_node_host(
        self, default_wallet, hard_reboot: bool, require_multiple_hosts, simple_object_size
    ):
        """Stop the hosts holding an object and check it is re-replicated elsewhere intact.

        Afterwards the hosts are started again and the object is re-read.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )
        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )

        for node in nodes:
            # Register before stopping so the autouse teardown restarts the host
            # even if anything below fails.
            stopped_nodes.append(node)

            with allure.step(f"Stop host {node}"):
                node.host.stop_host("hard" if hard_reboot else "soft")

            new_nodes = wait_object_replication(
                cid,
                oid,
                2,
                shell=self.shell,
                nodes=list(set(self.cluster.storage_nodes) - {node}),
            )
        assert all(old_node not in new_nodes for old_node in nodes)

        with allure.step("Check object data is not corrupted"):
            got_file_path = get_object(
                wallet,
                cid,
                oid,
                endpoint=new_nodes[0].get_rpc_endpoint(),
                shell=self.shell,
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        with allure.step("Return all hosts"):
            return_stopped_hosts(self.shell, self.cluster)

        with allure.step("Check object data is not corrupted"):
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

    @allure.title("Panic storage node's host")
    @pytest.mark.parametrize("sequence", [True, False])
    @pytest.mark.failover_panic
    def test_panic_storage_node_host(
        self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size
    ):
        """Panic-reboot every host holding an object and check the object survives.

        With ``sequence`` the replica placement is checked after each reboot;
        otherwise only once after all hosts were rebooted.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )

        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        allure.attach(
            "\n".join([str(node) for node in nodes]),
            "Current nodes with object",
            allure.attachment_type.TEXT,
        )

        new_nodes: list[StorageNode] = []
        for node in nodes:
            with allure.step(f"Hard reboot host {node} via magic SysRq option"):
                panic_reboot_host(node.host)
                if sequence:
                    try:
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=list(set(self.cluster.storage_nodes) - {node}),
                        )
                    except AssertionError:
                        # The rebooted host may already be back and still hold a
                        # replica, so retry with every storage node as a candidate.
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=self.cluster.storage_nodes,
                        )

                    allure.attach(
                        "\n".join([str(new_node) for new_node in new_nodes]),
                        f"Nodes with object after {node} fail",
                        allure.attachment_type.TEXT,
                    )

        if not sequence:
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            allure.attach(
                "\n".join([str(new_node) for new_node in new_nodes]),
                "Nodes with object after nodes fail",
                allure.attachment_type.TEXT,
            )

        got_file_path = get_object(
            wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
        )
        assert get_file_hash(source_file_path) == get_file_hash(got_file_path)


def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Run every test that requests ``s3_client`` once per S3 client implementation."""
    if "s3_client" in metafunc.fixturenames:
        metafunc.parametrize("s3_client", [AwsCliClient, Boto3ClientWrapper], indirect=True)


@pytest.mark.failover
@pytest.mark.failover_empty_map
class TestEmptyMap(ClusterTestBase):
    """
    A set of tests that make the network map empty and verify that objects
    can still be read afterwards.
    """

    @allure.step("Teardown after EmptyMap offline test")
    @pytest.fixture()
    def empty_map_offline_teardown(self):
        """After the test, include every node left in ``stopped_nodes`` back into the map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            for node in list(stopped_nodes):
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

    @test_case.title("Test makes network map empty (offline all storage nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    # NOTE(review): marker name has a typo ("offlne"); kept as-is because CI
    # selections / marker registration may rely on the exact name.
    @pytest.mark.failover_empty_map_offlne
    @allure.title("Test makes network map empty (offline all storage nodes)")
    def test_offline_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: int,
        empty_map_offline_teardown,
    ):
        """
        The test makes the network map empty (sets OFFLINE status on all storage nodes),
        then returns all nodes to the map and checks that the object can be read through S3.

        Steps:
        1. Check that bucket is empty
        2. PUT object into bucket
        3. Check that object exists in bucket
        4. Exclude all storage nodes from network map (set status OFFLINE)
        5. Return all storage nodes to network map
        6. Check that we can read object from #2
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_client.list_objects(bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        storage_nodes = self.cluster.storage_nodes
        with allure.step("Exclude all storage nodes from network map"):
            for node in storage_nodes:
                exclude_node_from_network_map(node, node, shell=self.shell, cluster=self.cluster)
                # Registered so the teardown fixture re-includes it if the test fails.
                stopped_nodes.append(node)

        with allure.step("Return all storage nodes to network map"):
            for node in storage_nodes:
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

        with allure.step("Check that we can read object"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @allure.step("Teardown after EmptyMap stop service test")
    @pytest.fixture()
    def empty_map_stop_service_teardown(self):
        """After the test, start every node left in ``stopped_nodes`` and wait for it in the map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            for node in list(stopped_nodes):
                with allure.step(f"Start node {node}"):
                    node.start_service()
                with allure.step(f"Waiting status ready for node {node}"):
                    wait_for_node_to_be_ready(node)

                # Give the morph chain a block, then advance an epoch so the
                # started node can appear in the network map.
                sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
                self.tick_epochs(1)
                check_node_in_map(node, shell=self.shell, alive_node=node)
                stopped_nodes.remove(node)

    @test_case.title("Test makes network map empty (stop storage service on all nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    @pytest.mark.failover_empty_map_stop_service
    @allure.title("Test makes network map empty (stop storage service on all nodes)")
    def test_stop_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: int,
        empty_map_stop_service_teardown,
    ):
        """
        The test makes the network map empty (stops the storage service on all nodes, then uses
        'frostfs-adm morph delete-nodes' to delete the nodes from the map), then starts all
        services and checks that the object can be read through S3.

        Steps:
        1. Check that bucket is empty
        2. PUT object into bucket
        3. Check that object exists in bucket
        4. Exclude all storage nodes from network map (stop storage service
           and manually exclude from map)
        5. Return all storage nodes to network map
        6. Check that we can read object from #2
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_client.list_objects(bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.storage_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    node.stop_service()
                    stopped_nodes.append(node)

        with allure.step("Remove all nodes from network map"):
            remove_nodes_from_map_morph(
                shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes
            )

        with allure.step("Return all storage nodes to network map"):
            self.return_nodes_after_stop_with_check_empty_map(stopped_nodes)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @allure.step("Return all nodes to cluster with check empty map first")
    def return_nodes_after_stop_with_check_empty_map(self, return_nodes=None) -> None:
        """Start each node and wait until it is back in the network map.

        Right after the first node becomes ready, checks that none of the nodes
        being returned is in the map yet (i.e. the map is still empty). Every
        returned node is removed from ``stopped_nodes``.

        Args:
            return_nodes: nodes to bring back; when None, every node currently
                registered in ``stopped_nodes`` (previously None raised TypeError).
        """
        # Snapshot once: callers pass ``stopped_nodes`` itself, which is mutated below.
        nodes_to_return = list(stopped_nodes if return_nodes is None else return_nodes)

        for index, node in enumerate(nodes_to_return):
            with allure.step(f"Start node {node}"):
                node.start_service()
            with allure.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            if index == 0:
                with allure.step("Make sure that network map is empty"):
                    for check_node in nodes_to_return:
                        check_node_not_in_map(check_node, shell=self.shell, alive_node=node)

            # Give the morph chain a block, then advance an epoch so the
            # started node can appear in the network map.
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epochs(1)
            check_node_in_map(node, shell=self.shell, alive_node=node)
            stopped_nodes.remove(node)

    @allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is enabled")
    def test_s3_fstree_blobovnicza_loss_versioning_on(
        self, s3_client: S3ClientWrapper, simple_object_size
    ):
        """Wipe blobovnicza/fstree on all nodes, then delete every version of a versioned object."""
        bucket = s3_client.create_bucket()
        s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)

        object_versions = []
        with allure.step("Put object into one bucket"):
            put_version = s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
            object_versions.append(put_version)

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.storage_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    node.stop_service()
                    stopped_nodes.append(node)

        with allure.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with allure.step("Start all storage nodes"):
            for node in list(stopped_nodes):
                with allure.step(f"Start node {node}"):
                    node.start_service()
                    stopped_nodes.remove(node)
                with allure.step(f"Waiting status ready for node {node}"):
                    wait_for_node_to_be_ready(node)

        # need to get Delete Marker first
        with allure.step("Delete the object from the bucket"):
            delete_object = s3_client.delete_object(bucket, file_name)
            object_versions.append(delete_object["VersionId"])

        # and now delete all versions of object (including Delete Markers)
        with allure.step("Delete all versions of the object from the bucket"):
            for version in object_versions:
                s3_client.delete_object(bucket, file_name, version_id=version)

        with allure.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is disabled")
    def test_s3_fstree_blobovnicza_loss_versioning_off(
        self, s3_client: S3ClientWrapper, simple_object_size
    ):
        """Wipe blobovnicza/fstree on all nodes, then delete the object and its bucket."""
        bucket = s3_client.create_bucket()

        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with allure.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.storage_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    node.stop_service()
                    stopped_nodes.append(node)

        with allure.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with allure.step("Start all storage nodes"):
            for node in list(stopped_nodes):
                with allure.step(f"Start node {node}"):
                    node.start_service()
                    stopped_nodes.remove(node)
                with allure.step(f"Waiting status ready for node {node}"):
                    wait_for_node_to_be_ready(node)

        with allure.step("Delete the object from the bucket"):
            s3_client.delete_object(bucket, file_name)

        with allure.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @allure.title("Test S3 Loss of one node should trigger use of tree service in another node")
    def test_s3_one_endpoint_loss(self, bucket, s3_client: S3ClientWrapper, simple_object_size):
        """With one storage service stopped, an S3 PUT must still succeed and be listable.

        The stopped service is always restarted, even if the checks fail.
        """
        current_node = self.cluster.storage_nodes[0]
        try:
            with allure.step("Stop one node and wait for rebalance"):
                current_node.stop_service()
                # waiting for rebalance
                sleep(60)

            file_path = generate_file(simple_object_size)
            file_name = s3_helper.object_key_from_file_path(file_path)

            with allure.step("Put object into one bucket"):
                s3_client.put_object(bucket, file_path)
                s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
        finally:
            # The node is not tracked in ``stopped_nodes``, so no fixture would restart it.
            with allure.step(f"Start node {current_node}"):
                current_node.start_service()
            with allure.step(f"Waiting status ready for node {current_node}"):
                wait_for_node_to_be_ready(current_node)