import logging
import random
from datetime import datetime
from time import sleep

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    check_node_not_in_map,
    exclude_node_from_network_map,
    include_node_to_network_map,
    remove_nodes_from_map_morph,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.s3 import s3_helper
from frostfs_testlib.steps.s3.s3_helper import search_nodes_with_bucket
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import expect_not_raises
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_keeper import FileKeeper
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash

logger = logging.getLogger("NeoLogger")

# Storage nodes that a test has excluded from the network map and not yet returned.
# Shared with `empty_map_offline_teardown`, which returns any leftovers after the test.
stopped_nodes: list[StorageNode] = []


@pytest.fixture(scope="function")
@allure.title("Provide File Keeper")
def file_keeper():
    """Yield a fresh FileKeeper and call its `restore_files()` after the test finishes."""
    keeper = FileKeeper()
    yield keeper
    keeper.restore_files()


@pytest.mark.failover
@pytest.mark.failover_storage
class TestFailoverStorage(ClusterTestBase):
    @allure.title("Shutdown and start node (stop_mode={stop_mode})")
    @pytest.mark.parametrize("stop_mode", ["hard", "soft"])
    @pytest.mark.failover_reboot
    def test_lose_storage_node_host(
        self,
        default_wallet,
        stop_mode: str,
        require_multiple_hosts,
        simple_object_size: ObjectSize,
        cluster: Cluster,
        cluster_state_controller: ClusterStateController,
    ):
        """Stop, one by one, the hosts of both nodes holding a REP 2 object and check the object survives.

        After each host stop the test waits until the object is replicated to 2 of the remaining
        nodes. It then verifies the object's content, starts the stopped hosts again and verifies
        the content once more.
        """
        wallet = default_wallet
        placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)
        stopped_hosts_nodes = []

        with reporter.step("Create container and put object"):
            cid = create_container(
                wallet,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
            )
            oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster)

        with reporter.step("Wait for replication and get nodes with object"):
            nodes_with_object = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes)

        with reporter.step("Stop 2 nodes with object and wait replication one by one"):
            for storage_node in random.sample(nodes_with_object, 2):
                stopped_hosts_nodes.append(storage_node)

                cluster_node = cluster.node(storage_node)
                cluster_state_controller.stop_node_host(cluster_node, stop_mode)

                # Replication is awaited only among nodes whose hosts are still running.
                replicated_nodes = wait_object_replication(
                    cid,
                    oid,
                    2,
                    shell=self.shell,
                    nodes=list(set(self.cluster.storage_nodes) - {*stopped_hosts_nodes}),
                )

        with reporter.step("Check object data is not corrupted"):
            got_file_path = get_object(
                wallet, cid, oid, endpoint=replicated_nodes[0].get_rpc_endpoint(), shell=self.shell
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

        with reporter.step("Return all hosts"):
            cluster_state_controller.start_stopped_hosts()

        with reporter.step("Check object data is not corrupted"):
            replicated_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes)
            got_file_path = get_object(
                wallet, cid, oid, shell=self.shell, endpoint=replicated_nodes[0].get_rpc_endpoint()
            )
            assert get_file_hash(source_file_path) == get_file_hash(got_file_path)

    @pytest.mark.parametrize("s3_policy", ["pytest_tests/resources/files/policy.json"], indirect=True)
    @allure.title("Do not ignore unhealthy tree endpoints (s3_client={s3_client})")
    def test_unhealthy_tree(
        self,
        s3_client: S3ClientWrapper,
        default_wallet: WalletInfo,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """Check that an object stays listable after most storage nodes are stopped.

        The test first restarts the S3 gate and storage service on the first cluster node. It then
        creates a "rep-1" bucket and puts an object. Next it stops storage on every cluster node
        except the one returned by `search_nodes_with_bucket`, switches the S3 client to that
        node's gate, and checks that the object is still listed.
        """
        default_node = self.cluster.cluster_nodes[0]

        with reporter.step("Turn S3 GW off on default node"):
            cluster_state_controller.stop_service_of_type(default_node, S3Gate)

        with reporter.step("Turn off storage on default node"):
            cluster_state_controller.stop_service_of_type(default_node, StorageNode)

        with reporter.step("Turn on S3 GW on default node"):
            cluster_state_controller.start_service_of_type(default_node, S3Gate)

        with reporter.step("Turn on storage on default node"):
            cluster_state_controller.start_service_of_type(default_node, StorageNode)

        with reporter.step("Create bucket with REP 1 SELECT 1 policy"):
            bucket = s3_client.create_bucket(
                location_constraint="rep-1",
            )

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        with reporter.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        node_bucket = search_nodes_with_bucket(
            cluster=self.cluster,
            bucket_name=bucket,
            wallet=default_wallet,
            shell=self.shell,
            endpoint=self.cluster.storage_nodes[0].get_rpc_endpoint(),
        )[0]

        with reporter.step("Turn off all storage nodes except bucket node"):
            for node in [node_to_stop for node_to_stop in self.cluster.cluster_nodes if node_to_stop != node_bucket]:
                with reporter.step(f"Stop storage service on node: {node}"):
                    cluster_state_controller.stop_service_of_type(node, StorageNode)

        with reporter.step("Change s3 endpoint to bucket node"):
            s3_client.set_endpoint(node_bucket.s3_gate.get_endpoint())

        with reporter.step("Check that object is available"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with reporter.step("Start storage nodes"):
            cluster_state_controller.start_all_stopped_services()


@pytest.mark.failover
@pytest.mark.failover_empty_map
class TestEmptyMap(ClusterTestBase):
    """
    A set of tests that make the network map empty and verify that objects can still be read afterwards.
    """

    @reporter.step("Teardown after EmptyMap offline test")
    @pytest.fixture()
    def empty_map_offline_teardown(self):
        """After the test, return every node still listed in `stopped_nodes` to the network map."""
        yield
        with reporter.step("Return all storage nodes to network map"):
            # Iterate over a snapshot: removing from the list being iterated would skip
            # every other node and leave it outside the network map.
            for node in list(stopped_nodes):
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

    @pytest.mark.failover_empty_map_offlne
    @allure.title("Empty network map via offline all storage nodes (s3_client={s3_client})")
    def test_offline_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: ObjectSize,
        empty_map_offline_teardown,
    ):
        """
        The test makes the network map empty (sets OFFLINE status on all storage nodes), then returns
        all nodes to the map and checks that the object can be read through S3.

        Steps:
        1. PUT object into bucket
        2. Check that object exists in bucket
        3. Exclude all storage nodes from network map (set status OFFLINE)
        4. Return all storage nodes to network map
        5. Check that we can read object from #1
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        with reporter.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with reporter.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        storage_nodes = self.cluster.storage_nodes
        with reporter.step("Exclude all storage nodes from network map"):
            for node in storage_nodes:
                # Record the node first so the teardown can return it if exclusion fails midway.
                stopped_nodes.append(node)
                exclude_node_from_network_map(node, node, shell=self.shell, cluster=self.cluster)

        with reporter.step("Return all storage nodes to network map"):
            for node in storage_nodes:
                include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster)
                stopped_nodes.remove(node)

        with reporter.step("Check that we can read object"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @reporter.step("Teardown after EmptyMap stop service test")
    @pytest.fixture()
    def empty_map_stop_service_teardown(self, cluster_state_controller: ClusterStateController):
        """After the test, start all stopped services and check that nodes in `stopped_nodes` are in the map."""
        yield
        with reporter.step("Return all storage nodes to network map"):
            cluster_state_controller.start_all_stopped_services()
            for node in stopped_nodes:
                check_node_in_map(node, shell=self.shell, alive_node=node)

    @pytest.mark.failover_empty_map_stop_service
    @allure.title("Empty network map via stop all storage services (s3_client={s3_client})")
    def test_stop_all_storage_nodes(
        self,
        s3_client: S3ClientWrapper,
        bucket: str,
        simple_object_size: ObjectSize,
        empty_map_stop_service_teardown,
        cluster_state_controller: ClusterStateController,
    ):
        """
        The test makes the network map empty (stops the storage service on all nodes, then uses
        'frostfs-adm morph delete-nodes' to delete the nodes from the map), then starts all services
        and checks that the object can be read through S3.

        Steps:
        1. PUT object into bucket
        2. Check that object exists in bucket
        3. Exclude all storage nodes from network map (stop storage service and manually exclude from map)
        4. Return all storage nodes to network map
        5. Check that we can read object from #1
        Args:
            bucket: bucket which contains tested object
            simple_object_size: size of object
        """
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        bucket_objects = [file_name]

        with reporter.step("Put object into bucket"):
            s3_client.put_object(bucket, file_path)

        with reporter.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

        with reporter.step("Stop all storage nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Remove all nodes from network map"):
            remove_nodes_from_map_morph(
                shell=self.shell, cluster=self.cluster, remove_nodes=self.cluster.services(StorageNode)
            )

        with reporter.step("Return all storage nodes to network map"):
            self.return_nodes_after_stop_with_check_empty_map(cluster_state_controller)

        with reporter.step("Check that object exists in bucket"):
            s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects)

    @reporter.step("Return all nodes to cluster with check empty map first")
    def return_nodes_after_stop_with_check_empty_map(self, cluster_state_controller: ClusterStateController) -> None:
        """Start storage nodes one by one, checking that the map is empty first and then that each node joins.

        The first cluster node is started first, and the test checks that no storage node is in
        its view of the map. Every other node is then started; after each start the test waits one
        morph block, ticks an epoch and checks that the node is in the map seen by the first node.
        """
        first_node = self.cluster.cluster_nodes[0].service(StorageNode)

        with reporter.step("Start first node and check network map"):
            cluster_state_controller.start_service_of_type(self.cluster.cluster_nodes[0], StorageNode)
            wait_for_node_to_be_ready(first_node)

            for check_node in self.cluster.storage_nodes:
                check_node_not_in_map(check_node, shell=self.shell, alive_node=first_node)

        for node in self.cluster.cluster_nodes[1:]:
            storage_node = node.service(StorageNode)

            cluster_state_controller.start_service_of_type(node, StorageNode)
            wait_for_node_to_be_ready(storage_node)

            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epochs(1)

            check_node_in_map(storage_node, shell=self.shell, alive_node=first_node)

    @allure.title("Object loss from fstree/blobovnicza (versioning=enabled, s3_client={s3_client})")
    def test_s3_fstree_blobovnicza_loss_versioning_on(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        bucket: str,
    ):
        """Wipe blobovnicza and fstree on all nodes, then delete every version of a versioned object and the bucket."""
        s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        object_versions = []
        with reporter.step("Put object into one bucket"):
            put_object = s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
            object_versions.append(put_object)

        with reporter.step("Stop all storage nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with reporter.step("Start all storage nodes"):
            cluster_state_controller.start_all_stopped_services()

        # need to get Delete Marker first
        with reporter.step("Delete the object from the bucket"):
            delete_object = s3_client.delete_object(bucket, file_name)
            object_versions.append(delete_object["VersionId"])

        # and now delete all versions of object (including Delete Markers)
        with reporter.step("Delete all versions of the object from the bucket"):
            for version in object_versions:
                s3_client.delete_object(bucket, file_name, version_id=version)

        with reporter.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @allure.title("Object loss from fstree/blobovnicza (versioning=disabled, s3_client={s3_client})")
    def test_s3_fstree_blobovnicza_loss_versioning_off(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        bucket: str,
    ):
        """Wipe blobovnicza and fstree on all nodes, then delete an unversioned object and the bucket."""
        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with reporter.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with reporter.step("Stop all storage nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Delete blobovnicza and fstree from all nodes"):
            for node in self.cluster.storage_nodes:
                node.delete_blobovnicza()
                node.delete_fstree()

        with reporter.step("Start all storage nodes"):
            cluster_state_controller.start_all_stopped_services()

        with reporter.step("Delete the object from the bucket"):
            s3_client.delete_object(bucket, file_name)

        with reporter.step("Delete bucket"):
            s3_client.delete_bucket(bucket)

    @pytest.mark.skip(reason="Need to increase cache lifetime")
    @pytest.mark.parametrize(
        # versioning should NOT be VersioningStatus.SUSPENDED, it needs to be undefined
        "versioning_status",
        [VersioningStatus.ENABLED, VersioningStatus.UNDEFINED],
    )
    @allure.title(
        "After Pilorama.db loss on all nodes list objects should return nothing in second listing (versioning_status={versioning_status}, s3_client={s3_client})"
    )
    def test_s3_pilorama_loss(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        versioning_status: VersioningStatus,
        cluster_state_controller: ClusterStateController,
        bucket: str,
    ):
        """Delete pilorama.db on every shard of every node; the first listing is non-empty, the second is empty."""
        s3_helper.set_bucket_versioning(s3_client, bucket, versioning_status)

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        with reporter.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

        with reporter.step("Stop all storage nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Delete pilorama.db from all nodes"):
            for node in self.cluster.storage_nodes:
                for shard in node.get_shards():
                    node.delete_file(shard.pilorama)

        with reporter.step("Start all storage nodes"):
            cluster_state_controller.start_all_stopped_services()

        with reporter.step("Check list objects first time"):
            objects_list = s3_client.list_objects(bucket)
            assert objects_list, "Expected not empty bucket"

        with reporter.step("Check list objects second time"):
            objects_list = s3_client.list_objects(bucket)
            assert not objects_list, f"Expected empty bucket, got {objects_list}"

        with reporter.step("Delete bucket"):
            s3_client.delete_bucket(bucket)


@pytest.mark.failover
@pytest.mark.failover_data_loss
class TestStorageDataLoss(ClusterTestBase):
    @allure.title(
        "After metabase loss on all nodes operations on objects and buckets should be still available via S3 (s3_client={s3_client})"
    )
    @pytest.mark.metabase_loss
    def test_metabase_loss(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        complex_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        file_keeper: FileKeeper,
        bucket: str,
    ):
        """Delete metabases on all nodes, enable `resync_metabase`, restart, then delete the objects and the bucket.

        Modified shard configs are registered in `file_keeper` before they are saved; the keeper's
        `restore_files()` runs in the fixture teardown.
        """
        with reporter.step("Put objects into bucket"):
            simple_object_path = generate_file(simple_object_size.value)
            simple_object_key = s3_helper.object_key_from_file_path(simple_object_path)

            complex_object_path = generate_file(complex_object_size.value)
            complex_object_key = s3_helper.object_key_from_file_path(complex_object_path)

            s3_client.put_object(bucket, simple_object_path)
            s3_client.put_object(bucket, complex_object_path)

        with reporter.step("Check objects are in bucket"):
            s3_helper.check_objects_in_bucket(
                s3_client, bucket, expected_objects=[simple_object_key, complex_object_key]
            )

        with reporter.step("Stop storage services on all nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Delete metabase from all nodes"):
            for node in cluster_state_controller.cluster.storage_nodes:
                node.delete_metabase()

        with reporter.step("Enable resync_metabase option for storage services"):
            for storage_node in cluster_state_controller.cluster.storage_nodes:
                with reporter.step(f"Enable resync_metabase option for {storage_node}"):
                    config_file_path, config = storage_node.get_shards_config()
                    shard_defaults = config["storage"]["shard"]["default"]
                    # The option may be absent from the config entirely; treat a missing key as disabled
                    # instead of failing with KeyError.
                    if not shard_defaults.get("resync_metabase"):
                        file_keeper.add(storage_node, config_file_path)
                        shard_defaults["resync_metabase"] = True
                        storage_node.save_config(config, config_file_path)

        with reporter.step("Start storage services on all nodes"):
            cluster_state_controller.start_all_stopped_services()

        with reporter.step("Wait for tree rebalance"):
            # TODO: Use product metric when we have proper ones for this check
            sleep(30)

        with reporter.step("Delete objects from bucket"):
            with reporter.step("Delete simple object from bucket"):
                with expect_not_raises():
                    s3_client.delete_object(bucket, simple_object_key)

            with reporter.step("Delete complex object from bucket"):
                with expect_not_raises():
                    s3_client.delete_object(bucket, complex_object_key)

        with reporter.step("Delete bucket"):
            with expect_not_raises():
                s3_client.delete_bucket(bucket)

    @allure.title("Write cache loss on one node should not affect shards and should not produce errors in log")
    @pytest.mark.write_cache_loss
    def test_write_cache_loss_on_one_node(
        self,
        node_under_test: ClusterNode,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        shards_watcher: ShardsWatcher,
        default_wallet: WalletInfo,
        test_start_time: datetime,
    ):
        """Delete the write cache of one node and check objects, shard errors, shard modes and logs.

        Failures are collected into `exception_messages` and asserted together at the end, so
        every check is reported in a single run.
        """
        exception_messages = []
        with reporter.step(f"Create container on node {node_under_test}"):
            locode = node_under_test.storage_node.get_un_locode()
            # Pin the container to the node under test via its UN-LOCODE attribute.
            placement_rule = f"""REP 1 IN X CBF 1 SELECT 1 FROM C AS X FILTER 'UN-LOCODE' EQ '{locode}' AS C"""
            cid = create_container(
                default_wallet,
                self.shell,
                node_under_test.storage_node.get_rpc_endpoint(),
                rule=placement_rule,
            )
            container = StorageContainer(
                StorageContainerInfo(cid, default_wallet),
                self.shell,
                cluster_state_controller.cluster,
            )

        with reporter.step(f"Put couple objects to container on node {node_under_test}"):
            storage_objects: list[StorageObjectInfo] = []
            for _ in range(5):
                storage_object = container.generate_object(
                    simple_object_size.value,
                    endpoint=node_under_test.storage_node.get_rpc_endpoint(),
                )
                storage_objects.append(storage_object)

        with reporter.step("Take shards snapshot"):
            shards_watcher.take_shards_snapshot()

        with reporter.step(f"Stop storage service on node {node_under_test}"):
            cluster_state_controller.stop_service_of_type(node_under_test, StorageNode)

        with reporter.step(f"Delete write cache from node {node_under_test}"):
            node_under_test.storage_node.delete_write_cache()

        with reporter.step(f"Start storage service on node {node_under_test}"):
            cluster_state_controller.start_all_stopped_services()

        with reporter.step("Objects should be available"):
            for storage_object in storage_objects:
                get_object(
                    storage_object.wallet,
                    container.get_id(),
                    storage_object.oid,
                    self.shell,
                    node_under_test.storage_node.get_rpc_endpoint(),
                )

        with reporter.step("No shards should have new errors"):
            shards_watcher.take_shards_snapshot()
            shards_with_errors = shards_watcher.get_shards_with_new_errors()
            if shards_with_errors:
                exception_messages.append(f"Shards have new errors: {shards_with_errors}")

        with reporter.step("No shards should have degraded status"):
            snapshot = shards_watcher.get_shards_snapshot()
            for shard in snapshot:
                status = snapshot[shard]["mode"]
                if status != "read-write":
                    exception_messages.append(f"Shard {shard} changed status to {status}")

        with reporter.step("No related errors should be in log"):
            if node_under_test.host.is_message_in_logs(
                message_regex=r"\Wno such file or directory\W", since=test_start_time
            ):
                exception_messages.append(f"Node {node_under_test} have shard errors in logs")

        with reporter.step("Pass test if no errors found"):
            assert not exception_messages, "\n".join(exception_messages)

    @allure.title(
        "Loss of one node should trigger use of tree and storage service in another node (s3_client={s3_client})"
    )
    def test_s3_one_endpoint_loss(
        self,
        bucket,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """Stop storage on the first cluster node and check that an S3 PUT and listing still succeed."""
        # TODO: need to check that s3 gate is connected to localhost (such metric will be supported in 1.3)

        with reporter.step("Stop one node and wait for rebalance connection of s3 gate to storage service"):
            current_node = self.cluster.cluster_nodes[0]
            cluster_state_controller.stop_service_of_type(current_node, StorageNode)
            # waiting for rebalance connection of s3 gate to storage service
            sleep(60)

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)
        with reporter.step("Put object into one bucket"):
            s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])

    @pytest.mark.parametrize("s3_policy", ["pytest_tests/resources/files/policy.json"], indirect=True)
    @allure.title("After Pilorama.db loss on one node object is retrievable (s3_client={s3_client})")
    def test_s3_one_pilorama_loss(
        self,
        s3_client: S3ClientWrapper,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """Delete every pilorama.db on the first storage node and check that the files reappear after sync.

        The test also checks that bucket versioning, listing and version deletion keep working,
        and that the bucket is empty once all versions are deleted.
        """
        bucket = s3_client.create_bucket(
            location_constraint="rep3",
            grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers",
        )
        s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED)

        with reporter.step("Check bucket versioning"):
            bucket_versioning = s3_client.get_bucket_versioning_status(bucket)
            assert bucket_versioning == "Enabled", "Bucket should have enabled versioning"

        file_path = generate_file(simple_object_size.value)
        file_name = s3_helper.object_key_from_file_path(file_path)

        object_versions = []
        with reporter.step("Put object into one bucket"):
            put_object = s3_client.put_object(bucket, file_path)
            s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name])
            object_versions.append(put_object)

        node_to_check = self.cluster.storage_nodes[0]
        piloramas_list_before_removing = []
        with reporter.step("Get list of all pilorama.db on shards"):
            for shard in node_to_check.get_shards():
                piloramas_list_before_removing.append(shard.pilorama)

        with reporter.step("Check that all pilorama.db files exist on node"):
            for pilorama in piloramas_list_before_removing:
                assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"

        with reporter.step("Stop all storage nodes"):
            cluster_state_controller.stop_services_of_type(StorageNode)

        with reporter.step("Delete pilorama.db from one node"):
            for pilorama in piloramas_list_before_removing:
                node_to_check.delete_file(pilorama)

        with reporter.step("Start all storage nodes"):
            cluster_state_controller.start_all_stopped_services()

        # The step text must match the actual wait below (120 s).
        with reporter.step("Tick epoch to trigger sync and then wait for 2 minutes"):
            self.tick_epochs(1)
            sleep(120)

        with reporter.step("Get list of all pilorama.db after sync"):
            for pilorama in piloramas_list_before_removing:
                assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"

        with reporter.step("Check bucket versioning"):
            bucket_versioning = s3_client.get_bucket_versioning_status(bucket)
            assert bucket_versioning == "Enabled", "Bucket should have enabled versioning"

        with reporter.step("Check list objects"):
            objects_list = s3_client.list_objects(bucket)
            assert objects_list, "Expected not empty bucket"

        with reporter.step("Delete the object from the bucket"):
            delete_object = s3_client.delete_object(bucket, file_name)
            assert "DeleteMarker" in delete_object, "Delete markers not found"

        with reporter.step("Check list objects"):
            objects_list = s3_client.list_objects_versions(bucket)
            assert objects_list, "Expected not empty bucket"
            object_versions.append(delete_object["VersionId"])

        # and now delete all versions of object (including Delete Markers)
        with reporter.step("Delete all versions of the object from the bucket"):
            for version in object_versions:
                s3_client.delete_object(bucket, file_name, version_id=version)

        with reporter.step("Check list objects"):
            objects_list = s3_client.list_objects_versions(bucket)
            assert not objects_list, "Expected empty bucket"

        with reporter.step("Delete bucket"):
            s3_client.delete_bucket(bucket)