import logging import random from datetime import datetime from time import sleep import allure import pytest from frostfs_testlib.hosting import Host from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus from frostfs_testlib.shell import CommandOptions from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node from frostfs_testlib.steps.node_management import ( check_node_in_map, check_node_not_in_map, exclude_node_from_network_map, include_node_to_network_map, remove_nodes_from_map_morph, wait_for_node_to_be_ready, ) from frostfs_testlib.steps.s3 import s3_helper from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.test_control import expect_not_raises from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.failover_utils import wait_object_replication from frostfs_testlib.utils.file_keeper import FileKeeper from frostfs_testlib.utils.file_utils import generate_file, get_file_hash logger = logging.getLogger("NeoLogger") stopped_nodes: list[StorageNode] = [] def pytest_generate_tests(metafunc: pytest.Metafunc): if "s3_client" in metafunc.fixturenames: metafunc.parametrize("s3_client", [AwsCliClient, Boto3ClientWrapper], indirect=True) @pytest.fixture(scope="function") @allure.title("Provide File Keeper") def file_keeper(): keeper = FileKeeper() yield keeper keeper.restore_files() @allure.step("Return all stopped hosts") @pytest.fixture(scope="function", autouse=True) def after_run_return_all_stopped_hosts(cluster_state_controller: ClusterStateController) -> str: yield cluster_state_controller.start_stopped_hosts() @allure.step("Return all stopped storage services after test") @pytest.fixture(scope="function") def after_run_return_all_stopped_services(cluster_state_controller: ClusterStateController): yield cluster_state_controller.start_stopped_storage_services() @allure.step("Return all stopped S3 GateWay services after test") @pytest.fixture(scope="function") def after_run_return_all_stopped_s3(cluster_state_controller: ClusterStateController): yield cluster_state_controller.start_stopped_s3_gates() def panic_reboot_host(host: Host) -> None: shell = host.get_shell() shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"') options = CommandOptions(close_stdin=True, timeout=1, check=False) shell.exec('sudo sh -c "echo b > /proc/sysrq-trigger"', options) @pytest.mark.failover @pytest.mark.failover_storage class TestFailoverStorage(ClusterTestBase): @allure.title("Shutdown and start node (stop_mode={stop_mode})") @pytest.mark.parametrize("stop_mode", ["hard", "soft"]) @pytest.mark.failover_reboot def test_lose_storage_node_host( self, default_wallet, stop_mode: str, require_multiple_hosts, simple_object_size: ObjectSize, cluster: Cluster, cluster_state_controller: ClusterStateController, ): wallet = default_wallet placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X" source_file_path = generate_file(simple_object_size.value) stopped_hosts_nodes = [] with allure.step(f"Create container and put object"): cid = create_container( wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule=placement_rule, basic_acl=PUBLIC_ACL, ) oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster) with allure.step(f"Wait for replication and get nodes with object"): nodes_with_object = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) with allure.step(f"Stop 2 nodes with object and wait replication one by one"): for storage_node in random.sample(nodes_with_object, 2): stopped_hosts_nodes.append(storage_node) cluster_node = cluster.node(storage_node) cluster_state_controller.stop_node_host(cluster_node, stop_mode) replicated_nodes = wait_object_replication( cid, oid, 2, shell=self.shell, nodes=list(set(self.cluster.storage_nodes) - {*stopped_hosts_nodes}), ) with allure.step("Check object data is not corrupted"): got_file_path = get_object( wallet, cid, oid, endpoint=replicated_nodes[0].get_rpc_endpoint(), shell=self.shell ) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) with allure.step("Return all hosts"): cluster_state_controller.start_stopped_hosts() with allure.step("Check object data is not corrupted"): replicated_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) got_file_path = get_object( wallet, cid, oid, shell=self.shell, endpoint=replicated_nodes[0].get_rpc_endpoint() ) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) @allure.title("Panic reboot nodes (sequenced_reboots={sequence})") @pytest.mark.parametrize("sequence", [True, False]) @pytest.mark.failover_panic def test_panic_storage_node_host( self, default_wallet, require_multiple_hosts, sequence: bool, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, ): wallet = default_wallet placement_rule = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X" source_file_path = generate_file(simple_object_size.value) cid = create_container( wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, rule=placement_rule, basic_acl=PUBLIC_ACL, ) oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster) nodes_with_object = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) allure.attach( "\n".join([str(node) for node in nodes_with_object]), "Current nodes with object", allure.attachment_type.TEXT, ) new_nodes: list[StorageNode] = [] with allure.step(f"Hard reboot hosts via magic SysRq option"): for storage_node in nodes_with_object: cluster_state_controller.panic_reboot_host(self.cluster.node(storage_node)) if sequence: new_nodes = wait_object_replication( cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes, ) allure.attach( "\n".join([str(new_node) for new_node in new_nodes]), f"Nodes with object after {storage_node} fail", allure.attachment_type.TEXT, ) if not sequence: new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) allure.attach( "\n".join([str(new_node) for new_node in new_nodes]), "Nodes with object after nodes fail", allure.attachment_type.TEXT, ) got_file_path = get_object(wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) @allure.title("Do not ignore unhealthy tree endpoints (s3_client={s3_client})") def test_unhealthy_tree( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, after_run_return_all_stopped_s3, after_run_return_all_stopped_services, ): default_node = self.cluster.cluster_nodes[0] with allure.step("Turn S3 GW off on default node"): cluster_state_controller.stop_s3_gate(default_node) with allure.step("Turn off storage on default node"): cluster_state_controller.stop_storage_service(default_node) with allure.step("Turn on S3 GW on default node"): cluster_state_controller.start_s3_gate(default_node) with allure.step("Turn on storage on default node"): cluster_state_controller.start_storage_service(default_node) with allure.step("Create bucket with REP 1 SELECT 1 policy"): bucket = s3_client.create_bucket( location_constraint="load-1-1", ) file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with allure.step("Put object into bucket"): put_object = s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) with allure.step("Turn off all storage nodes except default"): for node in self.cluster.cluster_nodes[1:]: with allure.step(f"Stop storage service on node: {node}"): cluster_state_controller.stop_storage_service(node) with allure.step("Check that object is available"): s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) with allure.step("Start storage nodes"): cluster_state_controller.start_stopped_storage_services() @pytest.mark.failover @pytest.mark.failover_empty_map class TestEmptyMap(ClusterTestBase): """ A set of tests for makes map empty and verify that we can read objects after that """ @allure.step("Teardown after EmptyMap offline test") @pytest.fixture() def empty_map_offline_teardown(self): yield with allure.step("Return all storage nodes to network map"): for node in stopped_nodes: include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster) stopped_nodes.remove(node) @pytest.mark.failover_empty_map_offlne @allure.title("Empty network map via offline all storage nodes (s3_client={s3_client})") def test_offline_all_storage_nodes( self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize, empty_map_offline_teardown, ): """ The test makes network map empty (set offline status on all storage nodes) then returns all nodes to map and checks that object can read through s3. Steps: 1. Check that bucket is empty 2: PUT object into bucket 3: Check that object exists in bucket 4: Exclude all storage nodes from network map (set status OFFLINE) 5: Return all storage nodes to network map 6: Check that we can read object from #2 Args: bucket: bucket which contains tested object simple_object_size: size of object """ file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) bucket_objects = [file_name] objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with allure.step("Put object into bucket"): s3_client.put_object(bucket, file_path) with allure.step("Check that object exists in bucket"): s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects) storage_nodes = self.cluster.storage_nodes with allure.step("Exclude all storage nodes from network map"): for node in storage_nodes: stopped_nodes.append(node) exclude_node_from_network_map(node, node, shell=self.shell, cluster=self.cluster) with allure.step("Return all storage nodes to network map"): for node in storage_nodes: include_node_to_network_map(node, node, shell=self.shell, cluster=self.cluster) stopped_nodes.remove(node) with allure.step("Check that we can read object"): s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects) @allure.step("Teardown after EmptyMap stop service test") @pytest.fixture() def empty_map_stop_service_teardown(self, cluster_state_controller: ClusterStateController): yield with allure.step("Return all storage nodes to network map"): cluster_state_controller.start_stopped_storage_services() for node in stopped_nodes: check_node_in_map(node, shell=self.shell, alive_node=node) @pytest.mark.failover_empty_map_stop_service @allure.title("Empty network map via stop all storage services (s3_client={s3_client})") def test_stop_all_storage_nodes( self, s3_client: S3ClientWrapper, bucket: str, simple_object_size: ObjectSize, empty_map_stop_service_teardown, cluster_state_controller: ClusterStateController, ): """ The test makes network map empty (stop storage service on all nodes then use 'frostfs-adm morph delete-nodes' to delete nodes from map) then start all services and checks that object can read through s3. Steps: 1. Check that bucket is empty 2: PUT object into bucket 3: Check that object exists in bucket 4: Exclude all storage nodes from network map (stop storage service and manual exclude from map) 5: Return all storage nodes to network map 6: Check that we can read object from #2 Args: bucket: bucket which contains tested object simple_object_size: size of object """ file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) bucket_objects = [file_name] objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with allure.step("Put object into bucket"): s3_client.put_object(bucket, file_path) with allure.step("Check that object exists in bucket"): s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects) with allure.step("Stop all storage nodes"): cluster_state_controller.stop_all_storage_services() with allure.step("Remove all nodes from network map"): remove_nodes_from_map_morph( shell=self.shell, cluster=self.cluster, remove_nodes=self.cluster.services(StorageNode) ) with allure.step("Return all storage nodes to network map"): self.return_nodes_after_stop_with_check_empty_map(cluster_state_controller) with allure.step("Check that object exists in bucket"): s3_helper.check_objects_in_bucket(s3_client, bucket, bucket_objects) @allure.step("Return all nodes to cluster with check empty map first") def return_nodes_after_stop_with_check_empty_map(self, cluster_state_controller: ClusterStateController) -> None: first_node = self.cluster.cluster_nodes[0].service(StorageNode) with allure.step("Start first node and check network map"): cluster_state_controller.start_storage_service(self.cluster.cluster_nodes[0]) wait_for_node_to_be_ready(first_node) for check_node in self.cluster.storage_nodes: check_node_not_in_map(check_node, shell=self.shell, alive_node=first_node) for node in self.cluster.cluster_nodes[1:]: storage_node = node.service(StorageNode) cluster_state_controller.start_storage_service(node) wait_for_node_to_be_ready(storage_node) sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) self.tick_epochs(1) check_node_in_map(storage_node, shell=self.shell, alive_node=first_node) @allure.title("Object loss from fstree/blobovnicza (versioning=enabled, s3_client={s3_client})") def test_s3_fstree_blobovnicza_loss_versioning_on( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, ): bucket = s3_client.create_bucket() s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) object_versions = [] with allure.step("Put object into one bucket"): put_object = s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) object_versions.append(put_object) with allure.step("Stop all storage nodes"): for node in self.cluster.cluster_nodes: with allure.step(f"Stop storage service on node: {node}"): cluster_state_controller.stop_storage_service(node) with allure.step("Delete blobovnicza and fstree from all nodes"): for node in self.cluster.storage_nodes: node.delete_blobovnicza() node.delete_fstree() with allure.step("Start all storage nodes"): cluster_state_controller.start_stopped_storage_services() # need to get Delete Marker first with allure.step("Delete the object from the bucket"): delete_object = s3_client.delete_object(bucket, file_name) object_versions.append(delete_object["VersionId"]) # and now delete all versions of object (including Delete Markers) with allure.step("Delete all versions of the object from the bucket"): for version in object_versions: delete_object = s3_client.delete_object(bucket, file_name, version_id=version) with allure.step("Delete bucket"): s3_client.delete_bucket(bucket) @allure.title("Object loss from fstree/blobovnicza (versioning=disabled, s3_client={s3_client})") def test_s3_fstree_blobovnicza_loss_versioning_off( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, ): bucket = s3_client.create_bucket() file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with allure.step("Put object into one bucket"): s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) with allure.step("Stop all storage nodes"): for node in self.cluster.cluster_nodes: with allure.step(f"Stop storage service on node: {node}"): cluster_state_controller.stop_storage_service(node) with allure.step("Delete blobovnicza and fstree from all nodes"): for node in self.cluster.storage_nodes: node.delete_blobovnicza() node.delete_fstree() with allure.step("Start all storage nodes"): cluster_state_controller.start_stopped_storage_services() with allure.step("Delete the object from the bucket"): s3_client.delete_object(bucket, file_name) with allure.step("Delete bucket"): s3_client.delete_bucket(bucket) @pytest.mark.skip(reason="Need to increase cache lifetime") @pytest.mark.parametrize( # versioning should NOT be VersioningStatus.SUSPENDED, it needs to be undefined "versioning_status", [VersioningStatus.ENABLED, VersioningStatus.UNDEFINED], ) @allure.title( "After Pilorama.db loss on all nodes list objects should return nothing in second listing (versioning_status={versioning_status}, s3_client={s3_client})" ) def test_s3_pilorama_loss( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, versioning_status: VersioningStatus, cluster_state_controller: ClusterStateController, ): bucket = s3_client.create_bucket() s3_helper.set_bucket_versioning(s3_client, bucket, versioning_status) file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with allure.step("Put object into one bucket"): s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) with allure.step("Stop all storage nodes"): for node in self.cluster.cluster_nodes: with allure.step(f"Stop storage service on node: {node}"): cluster_state_controller.stop_storage_service(node) with allure.step("Delete pilorama.db from all nodes"): for node in self.cluster.storage_nodes: node.delete_pilorama() with allure.step("Start all storage nodes"): cluster_state_controller.start_stopped_storage_services() with allure.step("Check list objects first time"): objects_list = s3_client.list_objects(bucket) assert objects_list, f"Expected not empty bucket" with allure.step("Check list objects second time"): objects_list = s3_client.list_objects(bucket) assert not objects_list, f"Expected empty bucket, got {objects_list}" with allure.step("Delete bucket"): s3_client.delete_bucket(bucket) @pytest.mark.failover @pytest.mark.failover_data_loss class TestStorageDataLoss(ClusterTestBase): @allure.step("Get list of all piloramas on node") def get_piloramas_list(self, node: StorageNode) -> list: data_directory_path = node.get_data_directory() cmd = f"sudo ls -1 {data_directory_path}/meta*/pilorama*" shell = node.host.get_shell() stdout = shell.exec(cmd).stdout piloramas = stdout.split("\n") return piloramas @allure.title( "After metabase loss on all nodes operations on objects and buckets should be still available via S3 (s3_client={s3_client})" ) @pytest.mark.metabase_loss def test_metabase_loss( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, complex_object_size: ObjectSize, cluster_state_controller: ClusterStateController, after_run_return_all_stopped_services: str, file_keeper: FileKeeper, ): allure.dynamic.description(after_run_return_all_stopped_services) with allure.step("Create bucket"): bucket = s3_client.create_bucket() with allure.step("Put objects into bucket"): simple_object_path = generate_file(simple_object_size.value) simple_object_key = s3_helper.object_key_from_file_path(simple_object_path) complex_object_path = generate_file(complex_object_size.value) complex_object_key = s3_helper.object_key_from_file_path(complex_object_path) s3_client.put_object(bucket, simple_object_path) s3_client.put_object(bucket, complex_object_path) with allure.step("Check objects are in bucket"): s3_helper.check_objects_in_bucket( s3_client, bucket, expected_objects=[simple_object_key, complex_object_key] ) with allure.step("Stop storage services on all nodes"): cluster_state_controller.stop_all_storage_services() with allure.step("Delete metabase from all nodes"): for node in cluster_state_controller.cluster.storage_nodes: node.delete_metabase() with allure.step("Enable resync_metabase option for storage services"): for storage_node in cluster_state_controller.cluster.storage_nodes: with allure.step(f"Enable resync_metabase option for {storage_node}"): config_file_path, config = storage_node.get_shards_config() if not config["storage"]["shard"]["default"]["resync_metabase"]: file_keeper.add(storage_node, config_file_path) config["storage"]["shard"]["default"]["resync_metabase"] = True storage_node.save_config(config, config_file_path) with allure.step("Start storage services on all nodes"): cluster_state_controller.start_stopped_storage_services() with allure.step("Delete objects from bucket"): with allure.step("Delete simple object from bucket"): with expect_not_raises(): s3_client.delete_object(bucket, simple_object_key) with allure.step("Delete complex object from bucket"): with expect_not_raises(): s3_client.delete_object(bucket, complex_object_key) with allure.step("Delete bucket"): with expect_not_raises(): s3_client.delete_bucket(bucket) @allure.title("Write cache loss on one node should not affect shards and should not produce errors in log") @pytest.mark.write_cache_loss def test_write_cache_loss_on_one_node( self, node_under_test: ClusterNode, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, shards_watcher: ShardsWatcher, default_wallet: str, test_start_time: datetime, after_run_return_all_stopped_services: str, ): exception_messages = [] allure.dynamic.description(after_run_return_all_stopped_services) with allure.step(f"Create container on node {node_under_test}"): locode = node_under_test.storage_node.get_un_locode() placement_rule = f"""REP 1 IN X CBF 1 SELECT 1 FROM C AS X FILTER 'UN-LOCODE' EQ '{locode}' AS C""" cid = create_container( default_wallet, self.shell, node_under_test.storage_node.get_rpc_endpoint(), rule=placement_rule, ) container = StorageContainer( StorageContainerInfo(cid, WalletInfo(default_wallet)), self.shell, cluster_state_controller.cluster, ) with allure.step(f"Put couple objects to container on node {node_under_test}"): storage_objects: list[StorageObjectInfo] = [] for _ in range(5): storage_object = container.generate_object( simple_object_size.value, endpoint=node_under_test.storage_node.get_rpc_endpoint(), ) storage_objects.append(storage_object) with allure.step("Take shards snapshot"): shards_watcher.take_shards_snapshot() with allure.step(f"Stop storage service on node {node_under_test}"): cluster_state_controller.stop_storage_service(node_under_test) with allure.step(f"Delete write cache from node {node_under_test}"): node_under_test.storage_node.delete_write_cache() with allure.step(f"Start storage service on node {node_under_test}"): cluster_state_controller.start_storage_service(node_under_test) with allure.step("Objects should be available"): for storage_object in storage_objects: get_object( storage_object.wallet_file_path, container.get_id(), storage_object.oid, self.shell, node_under_test.storage_node.get_rpc_endpoint(), ) with allure.step("No shards should have new errors"): shards_watcher.take_shards_snapshot() shards_with_errors = shards_watcher.get_shards_with_new_errors() if shards_with_errors: exception_messages.append(f"Shards have new errors: {shards_with_errors}") with allure.step("No shards should have degraded status"): snapshot = shards_watcher.get_shards_snapshot() for shard in snapshot: status = snapshot[shard]["mode"] if status != "read-write": exception_messages.append(f"Shard {shard} changed status to {status}") with allure.step("No related errors should be in log"): if node_under_test.host.is_message_in_logs( message_regex=r"\Wno such file or directory\W", since=test_start_time ): exception_messages.append(f"Node {node_under_test} have shard errors in logs") with allure.step("Pass test if no errors found"): assert not exception_messages, "\n".join(exception_messages) @allure.title( "Loss of one node should trigger use of tree and storage service in another node (s3_client={s3_client})" ) def test_s3_one_endpoint_loss( self, bucket, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, after_run_return_all_stopped_services, cluster_state_controller: ClusterStateController, ): # TODO: need to check that s3 gate is connected to localhost (such metric will be supported in 1.3) with allure.step("Stop one node and wait for rebalance connection of s3 gate to storage service"): current_node = self.cluster.cluster_nodes[0] cluster_state_controller.stop_storage_service(current_node) # waiting for rebalance connection of s3 gate to storage service sleep(60) file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) with allure.step("Put object into one bucket"): put_object = s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) @allure.title("After Pilorama.db loss on one node object is retrievable (s3_client={s3_client})") def test_s3_one_pilorama_loss( self, s3_client: S3ClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, ): bucket = s3_client.create_bucket( location_constraint="load-1-4", grant_read="uri=http://acs.amazonaws.com/groups/global/AllUsers", ) s3_helper.set_bucket_versioning(s3_client, bucket, VersioningStatus.ENABLED) with allure.step("Check bucket versioning"): bucket_versioning = s3_client.get_bucket_versioning_status(bucket) assert bucket_versioning == "Enabled", "Bucket should have enabled versioning" file_path = generate_file(simple_object_size.value) file_name = s3_helper.object_key_from_file_path(file_path) object_versions = [] with allure.step("Put object into one bucket"): put_object = s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) object_versions.append(put_object) node_to_check = self.cluster.storage_nodes[0] piloramas_list_before_removing = {} with allure.step("Get list of all pilorama.db"): piloramas_list_before_removing = self.get_piloramas_list(node_to_check) with allure.step("Stop all storage nodes"): for node in self.cluster.cluster_nodes: with allure.step(f"Stop storage service on node: {node}"): cluster_state_controller.stop_storage_service(node) with allure.step("Delete pilorama.db from one node"): node_to_check.delete_pilorama() with allure.step("Start all storage nodes"): cluster_state_controller.start_stopped_storage_services() with allure.step("Tick epoch to trigger sync and then wait for 1 minute"): self.tick_epochs(1) sleep(120) piloramas_list_afrer_removing = {} with allure.step("Get list of all pilorama.db after sync"): piloramas_list_afrer_removing = self.get_piloramas_list(node_to_check) assert piloramas_list_afrer_removing == piloramas_list_before_removing, "List of pilorama.db is different" with allure.step("Check bucket versioning"): bucket_versioning = s3_client.get_bucket_versioning_status(bucket) assert bucket_versioning == "Enabled", "Bucket should have enabled versioning" with allure.step("Check list objects"): objects_list = s3_client.list_objects(bucket) assert objects_list, f"Expected not empty bucket" with allure.step("Delete the object from the bucket"): delete_object = s3_client.delete_object(bucket, file_name) assert "DeleteMarker" in delete_object.keys(), "Delete markers not found" with allure.step("Check list objects"): objects_list = s3_client.list_objects_versions(bucket) assert objects_list, f"Expected not empty bucket" object_versions.append(delete_object["VersionId"]) # and now delete all versions of object (including Delete Markers) with allure.step("Delete all versions of the object from the bucket"): for version in object_versions: delete_object = s3_client.delete_object(bucket, file_name, version_id=version) with allure.step("Check list objects"): objects_list = s3_client.list_objects_versions(bucket) assert not objects_list, f"Expected empty bucket" with allure.step("Delete bucket"): s3_client.delete_bucket(bucket)