"""Failover tests for storage nodes: host loss/panic, empty network map and storage data loss."""

import logging
from datetime import datetime
from time import sleep

import allure
import pytest
from frostfs_testlib.analytics import test_case
from frostfs_testlib.hosting import Host
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps.cli.container import (
    StorageContainer,
    StorageContainerInfo,
    create_container,
)
from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    check_node_not_in_map,
    exclude_node_from_network_map,
    include_node_to_network_map,
    remove_nodes_from_map_morph,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import expect_not_raises
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import (
    wait_all_storage_nodes_returned,
    wait_object_replication,
)
from frostfs_testlib.utils.file_keeper import FileKeeper
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash

logger = logging.getLogger("NeoLogger")

# Module-wide registry of nodes that a test has stopped/excluded and not yet returned.
# Tests append to it; teardown fixtures and helpers drain it.
stopped_nodes: list[StorageNode] = []


@pytest.fixture(scope="function")
@allure.title("Provide File Keeper")
def file_keeper():
    """Yield a FileKeeper and restore every file registered with it after the test."""
    keeper = FileKeeper()
    yield keeper
    keeper.restore_files()


@allure.step("Return all stopped hosts")
@pytest.fixture(scope="function", autouse=True)
def after_run_return_all_stopped_hosts(client_shell: Shell, cluster: Cluster) -> str:
    """Autouse teardown: start every host still listed in ``stopped_nodes`` after each test.

    Yields a human-readable note that tests may use as a report description.
    """
    yield "After this test stopped services will be started automatically via fixture"
    return_stopped_hosts(client_shell, cluster)


@allure.step("Return all stopped storage services after test")
@pytest.fixture(scope="function")
def after_run_return_all_stopped_services(cluster_state_controller: ClusterStateController):
    """Teardown: start storage services that were stopped through the state controller.

    Yields a human-readable note; consumers pass it to ``allure.dynamic.description``,
    so it must be a string rather than ``None``.
    """
    yield "After this test stopped services will be started automatically via fixture"
    cluster_state_controller.start_stopped_storage_services()


def panic_reboot_host(host: Host) -> None:
    """Hard-reboot ``host`` through the kernel magic SysRq interface.

    Args:
        host: host whose shell is used to trigger the reboot.
    """
    shell = host.get_shell()
    # Enable SysRq handling before sending the trigger.
    shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')

    # The trigger command is not expected to complete normally, so do not wait for it
    # (short timeout) and do not fail on its exit status.
    options = CommandOptions(close_stdin=True, timeout=1, check=False)
    shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options)


def return_stopped_hosts(shell: Shell, cluster: Cluster) -> None:
    """Start every host registered in ``stopped_nodes`` and wait for storage nodes to return.

    Args:
        shell: shell passed to the wait helper.
        cluster: cluster whose storage nodes are awaited.
    """
    # Iterate over a snapshot because the registry is mutated inside the loop.
    for node in list(stopped_nodes):
        with allure.step(f"Start host {node}"):
            node.host.start_host()
        stopped_nodes.remove(node)

    wait_all_storage_nodes_returned(shell, cluster)


@pytest.mark.failover
class TestFailoverStorage(ClusterTestBase):
    """Tests that stop or panic-reboot hosts holding an object and verify it stays readable."""

    @pytest.fixture(scope="function", autouse=True)
    def start_stopped_services(self, cluster_state_controller: ClusterStateController):
        """Autouse teardown: start storage services stopped through the state controller."""
        yield
        cluster_state_controller.start_stopped_storage_services()

    @allure.title("Lose and return storage node's host")
    @pytest.mark.parametrize("hard_reboot", [True, False])
    @pytest.mark.failover_reboot
    def test_lose_storage_node_host(
        self, default_wallet, hard_reboot: bool, require_multiple_hosts, simple_object_size
    ):
        """Stop each host holding the object, check re-replication, then return the hosts."""
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )
        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )

        for node in nodes:
            # Register before stopping so teardown restarts the host even if the stop fails.
            stopped_nodes.append(node)

            with allure.step(f"Stop host {node}"):
                node.host.stop_host("hard" if hard_reboot else "soft")

            new_nodes = wait_object_replication(
                cid,
                oid,
                2,
                shell=self.shell,
                nodes=list(set(self.cluster.storage_nodes) - {node}),
            )
        assert all(old_node not in new_nodes for old_node in nodes)

        with allure.step("Check object data is not corrupted"):
            got_file_path = get_object(
                wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        with allure.step("Return all hosts"):
            return_stopped_hosts(self.shell, self.cluster)

        with allure.step("Check object data is not corrupted"):
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

    @allure.title("Panic storage node's host")
    @pytest.mark.parametrize("sequence", [True, False])
    @pytest.mark.failover_panic
    def test_panic_storage_node_host(
        self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size
    ):
        """Panic-reboot each host holding the object and check the object stays readable.

        With ``sequence=True`` replication is awaited after every reboot; otherwise it is
        awaited once after all reboots.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size)
        cid = create_container(
            wallet,
            shell=self.shell,
            endpoint=self.cluster.default_rpc_endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(
            wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster
        )

        nodes = wait_object_replication(
            cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        allure.attach(
            "\n".join([str(node) for node in nodes]),
            "Current nodes with object",
            allure.attachment_type.TEXT,
        )

        new_nodes: list[StorageNode] = []
        for node in nodes:
            with allure.step(f"Hard reboot host {node} via magic SysRq option"):
                panic_reboot_host(node.host)
                if sequence:
                    try:
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=list(set(self.cluster.storage_nodes) - {node}),
                        )
                    except AssertionError:
                        # Fall back to checking all nodes, including the rebooted one.
                        new_nodes = wait_object_replication(
                            cid,
                            oid,
                            2,
                            shell=self.shell,
                            nodes=self.cluster.storage_nodes,
                        )

                    allure.attach(
                        "\n".join([str(new_node) for new_node in new_nodes]),
                        f"Nodes with object after {node} fail",
                        allure.attachment_type.TEXT,
                    )

        if not sequence:
            new_nodes = wait_object_replication(
                cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes
            )
            allure.attach(
                "\n".join([str(new_node) for new_node in new_nodes]),
                "Nodes with object after nodes fail",
                allure.attachment_type.TEXT,
            )

        got_file_path = get_object(
            wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()
        )
        assert get_file_hash(source_file_path) == get_file_hash(got_file_path)


def pytest_generate_tests(metafunc: pytest.Metafunc):
    """Parametrize every test requesting ``s3_client`` with both S3 client implementations."""
    if "s3_client" in metafunc.fixturenames:
        metafunc.parametrize("s3_client", [AwsCliClient, Boto3ClientWrapper], indirect=True)


@pytest.mark.failover
@pytest.mark.failover_empty_map
class TestEmptyMap(ClusterTestBase):
    """
    A set of tests that make the network map empty and verify that objects can be read after that
    """

    @allure.step("Teardown after EmptyMap offline test")
    @pytest.fixture()
    def empty_map_offline_teardown(self):
        """Teardown: include every node still in ``stopped_nodes`` back into the network map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            # Snapshot: the registry is mutated inside the loop.
            for node in list(stopped_nodes):
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

    @test_case.title("Test makes network map empty (offline all storage nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    # NOTE(review): "offlne" looks like a typo of "offline"; kept as-is because marker-based
    # test selection may depend on the current name.
    @pytest.mark.failover_empty_map_offlne
    @allure.title("Test makes network map empty (offline all storage nodes)")
    def test_offline_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: int,
        empty_map_offline_teardown,
    ):
        """
        The test makes network map empty (set offline status on all storage nodes) then returns
        all nodes to map and checks that object can read through s3.

        Steps:
        1. Check that bucket is empty
        2: PUT object into bucket
        3: Check that object exists in bucket
        4: Exclude all storage nodes from network map (set status OFFLINE)
        5: Return all storage nodes to network map
        6: Check that we can read object from #2
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_client.list_objects(bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        storage_nodes = self.cluster.storage_nodes
        with allure.step("Exclude all storage nodes from network map"):
            for node in storage_nodes:
                exclude_node_from_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.append(node)

        with allure.step("Return all storage nodes to network map"):
            for node in storage_nodes:
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

        with allure.step("Check that we can read object"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @allure.step("Teardown after EmptyMap stop service test")
    @pytest.fixture()
    def empty_map_stop_service_teardown(self):
        """Teardown: restart every node still in ``stopped_nodes`` and wait until it is in the map."""
        yield
        with allure.step("Return all storage nodes to network map"):
            # Snapshot: the registry is mutated inside the loop.
            for node in list(stopped_nodes):
                with allure.step(f"Start node {node}"):
                    node.start_service()
                with allure.step(f"Waiting status ready for node {node}"):
                    wait_for_node_to_be_ready(node)

                sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
                self.tick_epochs(1)
                check_node_in_map(node, shell=self.shell, alive_node=node)
                stopped_nodes.remove(node)

    @test_case.title("Test makes network map empty (stop storage service on all nodes)")
    @test_case.priority(test_case.TestCasePriority.HIGH)
    @test_case.suite_name("failovers")
    @test_case.suite_section("test_failover_storage")
    @pytest.mark.failover_empty_map_stop_service
    @allure.title("Test makes network map empty (stop storage service on all nodes)")
    def test_stop_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: int,
        empty_map_stop_service_teardown,
    ):
        """
        The test makes network map empty (stop storage service on all nodes
        then use 'frostfs-adm morph delete-nodes' to delete nodes from map)
        then start all services and checks that object can read through s3.

        Steps:
        1. Check that bucket is empty
        2: PUT object into bucket
        3: Check that object exists in bucket
        4: Exclude all storage nodes from network map (stop storage service
           and manual exclude from map)
        5: Return all storage nodes to network map
        6: Check that we can read object from #2
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        objects_list = s3_client.list_objects(bucket)
        assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.storage_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    node.stop_service()
                    stopped_nodes.append(node)

        with allure.step("Remove all nodes from network map"):
            remove_nodes_from_map_morph(
                shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes
            )

        with allure.step("Return all storage nodes to network map"):
            self.return_nodes_after_stop_with_check_empty_map(stopped_nodes)

        with allure.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @allure.step("Return all nodes to cluster with check empty map first")
    def return_nodes_after_stop_with_check_empty_map(self, return_nodes=None) -> None:
        """Start the given nodes one by one, verifying the map is empty before the first returns.

        Args:
            return_nodes: nodes to start; when ``None``, every node currently registered in
                ``stopped_nodes`` is used.

        Each returned node is removed from ``stopped_nodes``.
        """
        # Snapshot once: the caller may pass ``stopped_nodes`` itself, which is mutated below.
        nodes_to_return = list(stopped_nodes if return_nodes is None else return_nodes)

        first_node = True
        for node in nodes_to_return:
            with allure.step(f"Start node {node}"):
                node.start_service()
            with allure.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            with allure.step("Make sure that network map is empty"):
                # Only meaningful before the first node has re-joined the map.
                if first_node:
                    for check_node in nodes_to_return:
                        check_node_not_in_map(check_node, shell=self.shell, alive_node=node)
                    first_node = False

            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epochs(1)
            check_node_in_map(node, shell=self.shell, alive_node=node)
            stopped_nodes.remove(node)

    @allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is enabled")
    def test_s3_fstree_blobovnicza_loss_versioning_on(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: int,
        cluster_state_controller: ClusterStateController,
        after_run_return_all_stopped_services: str,
    ):
        """Delete fstree/blobovnicza on all nodes and check a versioned object can be deleted.

        ``after_run_return_all_stopped_services`` is requested so that storage services are
        restarted even when the test fails while they are stopped.
        """
        bucket = s3_client.create_bucket()
        s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)

        object_versions = []
        with allure.step("Put object into one bucket"):
            put_object = s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
            object_versions.append(put_object)

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.cluster_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    cluster_state_controller.stop_storage_service(node)

        with allure.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with allure.step("Start all storage nodes"):
            cluster_state_controller.start_stopped_storage_services()

        # need to get Delete Marker first
        with allure.step("Delete the object from the bucket"):
            delete_object = s3_client.delete_object(bucket, file_name)
            object_versions.append(delete_object["VersionId"])

        # and now delete all versions of object (including Delete Markers)
        with allure.step("Delete all versions of the object from the bucket"):
            for version in object_versions:
                s3_client.delete_object(bucket, file_name, version_id=version)

        with allure.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @allure.title("Test S3 Object loss from fstree/blobovnicza, versioning is disabled")
    def test_s3_fstree_blobovnicza_loss_versioning_off(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: int,
        cluster_state_controller: ClusterStateController,
        after_run_return_all_stopped_services: str,
    ):
        """Delete fstree/blobovnicza on all nodes and check an unversioned object can be deleted.

        ``after_run_return_all_stopped_services`` is requested so that storage services are
        restarted even when the test fails while they are stopped.
        """
        bucket = s3_client.create_bucket()

        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with allure.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.cluster_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    cluster_state_controller.stop_storage_service(node)

        with allure.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with allure.step("Start all storage nodes"):
            cluster_state_controller.start_stopped_storage_services()

        with allure.step("Delete the object from the bucket"):
            s3_client.delete_object(bucket, file_name)

        with allure.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @pytest.mark.skip(reason="Need to increase cache lifetime")
    @pytest.mark.parametrize(
        # versioning should NOT be VersioningStatus.SUSPENDED, it needs to be undefined
        "versioning_status",
        [VersioningStatus.ENABLED, None],
    )
    @allure.title(
        "After Pilorama.db loss on all nodes list objects should return nothing in second listing"
    )
    def test_s3_pilorama_loss(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: int,
        versioning_status: VersioningStatus,
        cluster_state_controller: ClusterStateController,
        after_run_return_all_stopped_services: str,
    ):
        """Delete pilorama on all nodes; the first listing is non-empty, the second is empty.

        ``after_run_return_all_stopped_services`` is requested so that storage services are
        restarted even when the test fails while they are stopped.
        """
        bucket = s3_client.create_bucket()
        # ``None`` means versioning is left undefined for the bucket.
        if versioning_status:
            s3_helper.set_bucket_versioning(s3_client, bucket, versioning_status)

        file_path = generate_file(simple_object_size)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with allure.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with allure.step("Stop all storage nodes"):
            for node in self.cluster.cluster_nodes:
                with allure.step(f"Stop storage service on node: {node}"):
                    cluster_state_controller.stop_storage_service(node)

        with allure.step("Delete pilorama.db from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_pilorama()

        with allure.step("Start all storage nodes"):
            cluster_state_controller.start_stopped_storage_services()

        with allure.step("Check list objects first time"):
            objects_list = s3_client.list_objects(bucket)
            assert objects_list, "Expected not empty bucket"

        with allure.step("Check list objects second time"):
            objects_list = s3_client.list_objects(bucket)
            assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with allure.step("Delete bucket"):
            s3_client.delete_bucket(bucket)


@pytest.mark.failover
@pytest.mark.failover_data_loss
class TestStorageDataLoss(ClusterTestBase):
    """Tests that delete storage-node data (metabase, write cache) and check the cluster copes."""

    @allure.title(
        "After metabase loss on all nodes operations on objects and buckets should be still available via S3"
    )
    @pytest.mark.metabase_loss
    def test_metabase_loss(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: int,
        complex_object_size: int,
        cluster_state_controller: ClusterStateController,
        after_run_return_all_stopped_services: str,
        file_keeper: FileKeeper,
    ):
        """Delete the metabase on all nodes, enable resync, and check S3 deletes still succeed."""
        allure.dynamic.description(after_run_return_all_stopped_services)

        with allure.step("Create bucket"):
            bucket = s3_client.create_bucket()

        with allure.step("Put objects into bucket"):
            simple_object_path = generate_file(simple_object_size)
            simple_object_key = s3_helper.object_key_from_file_path(simple_object_path)

            complex_object_path = generate_file(complex_object_size)
            complex_object_key = s3_helper.object_key_from_file_path(complex_object_path)

            s3_client.put_object(bucket, simple_object_path)
            s3_client.put_object(bucket, complex_object_path)

        with allure.step("Check objects are in bucket"):
            s3_helper.check_objects_in_bucket(
                s3_client, bucket, expected_objects=[simple_object_key, complex_object_key]
            )

        with allure.step("Stop storage services on all nodes"):
            cluster_state_controller.stop_all_storage_services()

        with allure.step("Delete metabase from all nodes"):
            for node in cluster_state_controller.cluster.storage_nodes:
                node.delete_metabase()

        with allure.step("Enable resync_metabase option for storage services"):
            for storage_node in cluster_state_controller.cluster.storage_nodes:
                with allure.step(f"Enable resync_metabase option for {storage_node}"):
                    config_file_path, config = storage_node.get_config()
                    if not config["storage"]["shard"]["default"]["resync_metabase"]:
                        # Register the config with the keeper before changing it so the
                        # file_keeper fixture restores it after the test.
                        file_keeper.add(storage_node, config_file_path)
                        config["storage"]["shard"]["default"]["resync_metabase"] = True
                        storage_node.save_config(config)

        with allure.step("Start storage services on all nodes"):
            cluster_state_controller.start_stopped_storage_services()

        with allure.step("Delete objects from bucket"):
            with allure.step("Delete simple object from bucket"):
                with expect_not_raises():
                    s3_client.delete_object(bucket, simple_object_key)

            with allure.step("Delete complex object from bucket"):
                with expect_not_raises():
                    s3_client.delete_object(bucket, complex_object_key)

        with allure.step("Delete bucket"):
            with expect_not_raises():
                s3_client.delete_bucket(bucket)

    @allure.title(
        "Write cache loss on one node should not affect shards and should not produce errors in log"
    )
    @pytest.mark.write_cache_loss
    def test_write_cache_loss_on_one_node(
        self,
        node_under_test: ClusterNode,
        simple_object_size: int,
        cluster_state_controller: ClusterStateController,
        shards_watcher: ShardsWatcher,
        default_wallet: str,
        test_start_time: datetime,
        after_run_return_all_stopped_services: str,
    ):
        """Delete the write cache on one node and check objects, shard state and logs.

        All findings are collected and asserted together at the end, so a single run reports
        every problem instead of stopping at the first.
        """
        exception_messages = []
        allure.dynamic.description(after_run_return_all_stopped_services)

        with allure.step(f"Create container on node {node_under_test}"):
            locode = node_under_test.storage_node.get_un_locode()
            placement_rule = f"""REP 1 IN X CBF 1 SELECT 1 FROM C AS X FILTER 'UN-LOCODE' EQ '{locode}' AS C"""
            cid = create_container(
                default_wallet,
                self.shell,
                node_under_test.storage_node.get_rpc_endpoint(),
                rule=placement_rule,
            )
            container = StorageContainer(
                StorageContainerInfo(cid, WalletInfo(default_wallet)),
                self.shell,
                cluster_state_controller.cluster,
            )

        with allure.step(f"Put couple objects to container on node {node_under_test}"):
            storage_objects: list[StorageObjectInfo] = []
            for _ in range(5):
                storage_object = container.generate_object(
                    simple_object_size, endpoint=node_under_test.storage_node.get_rpc_endpoint()
                )
                storage_objects.append(storage_object)

        with allure.step("Take shards snapshot"):
            shards_watcher.take_shards_snapshot()

        with allure.step(f"Stop storage service on node {node_under_test}"):
            cluster_state_controller.stop_storage_service(node_under_test)

        with allure.step(f"Delete write cache from node {node_under_test}"):
            node_under_test.storage_node.delete_write_cache()

        with allure.step(f"Start storage service on node {node_under_test}"):
            cluster_state_controller.start_storage_service(node_under_test)

        with allure.step("Objects should be available"):
            for storage_object in storage_objects:
                get_object(
                    storage_object.wallet_file_path,
                    container.get_id(),
                    storage_object.oid,
                    self.shell,
                    node_under_test.storage_node.get_rpc_endpoint(),
                )

        with allure.step("No shards should have new errors"):
            shards_watcher.take_shards_snapshot()
            shards_with_errors = shards_watcher.get_shards_with_new_errors()
            if shards_with_errors:
                exception_messages.append(f"Shards have new errors: {shards_with_errors}")

        with allure.step("No shards should have degraded status"):
            snapshot = shards_watcher.get_shards_snapshot()
            for shard in snapshot:
                status = snapshot[shard]["mode"]
                if status != "read-write":
                    exception_messages.append(f"Shard {shard} changed status to {status}")

        with allure.step("No related errors should be in log"):
            if node_under_test.host.is_message_in_logs(
                message_regex=r"\Wno such file or directory\W", since=test_start_time
            ):
                exception_messages.append(f"Node {node_under_test} have shard errors in logs")

        with allure.step("Pass test if no errors found"):
            assert not exception_messages, "\n".join(exception_messages)