diff --git a/pytest_tests/testsuites/failovers/test_failover_storage.py b/pytest_tests/testsuites/failovers/test_failover_storage.py index f90f423d..8200a6c3 100644 --- a/pytest_tests/testsuites/failovers/test_failover_storage.py +++ b/pytest_tests/testsuites/failovers/test_failover_storage.py @@ -9,11 +9,7 @@ from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper, S3ClientWrapper, VersioningStatus from frostfs_testlib.shell import CommandOptions, Shell -from frostfs_testlib.steps.cli.container import ( - StorageContainer, - StorageContainerInfo, - create_container, -) +from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node from frostfs_testlib.steps.node_management import ( check_node_in_map, @@ -32,10 +28,7 @@ from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.test_control import expect_not_raises from frostfs_testlib.utils import datetime_utils -from frostfs_testlib.utils.failover_utils import ( - wait_all_storage_nodes_returned, - wait_object_replication, -) +from frostfs_testlib.utils.failover_utils import wait_all_storage_nodes_returned, wait_object_replication from frostfs_testlib.utils.file_keeper import FileKeeper from frostfs_testlib.utils.file_utils import generate_file, get_file_hash @@ -116,12 +109,8 @@ class TestFailoverStorage(ClusterTestBase): rule=placement_rule, basic_acl=PUBLIC_ACL, ) - oid = put_object_to_random_node( - wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster - ) - nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes - ) + oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster) + nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) for node in nodes: stopped_nodes.append(node) @@ -139,21 +128,15 @@ class TestFailoverStorage(ClusterTestBase): assert all(old_node not in new_nodes for old_node in nodes) with allure.step("Check object data is not corrupted"): - got_file_path = get_object( - wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell - ) + got_file_path = get_object(wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) with allure.step("Return all hosts"): return_stopped_hosts(self.shell, self.cluster) with allure.step("Check object data is not corrupted"): - new_nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes - ) - got_file_path = get_object( - wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint() - ) + new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) + got_file_path = get_object(wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) @allure.title("Panic reboot nodes (sequenced_reboots={sequence})") @@ -172,13 +155,9 @@ class TestFailoverStorage(ClusterTestBase): rule=placement_rule, basic_acl=PUBLIC_ACL, ) - oid = put_object_to_random_node( - wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster - ) + oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster) - nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes - ) + nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) allure.attach( "\n".join([str(node) for node in nodes]), "Current nodes with object", @@ -214,18 +193,14 @@ class TestFailoverStorage(ClusterTestBase): ) if not sequence: - new_nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes - ) + new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) allure.attach( "\n".join([str(new_node) for new_node in new_nodes]), "Nodes with object after nodes fail", allure.attachment_type.TEXT, ) - got_file_path = get_object( - wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint() - ) + got_file_path = get_object(wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) @allure.title("Do not ignore unhealthy tree endpoints (s3_client={s3_client})") @@ -400,9 +375,7 @@ class TestEmptyMap(ClusterTestBase): node.stop_service() with allure.step("Remove all nodes from network map"): - remove_nodes_from_map_morph( - shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes - ) + remove_nodes_from_map_morph(shell=self.shell, cluster=self.cluster, remove_nodes=stopped_nodes) with allure.step("Return all storage nodes to network map"): self.return_nodes_after_stop_with_check_empty_map(stopped_nodes) @@ -475,9 +448,7 @@ class TestEmptyMap(ClusterTestBase): with allure.step("Delete bucket"): s3_client.delete_bucket(bucket) - @allure.title( - "Object loss from fstree/blobovnicza (versioning=disabled, s3_client={s3_client})" - ) + @allure.title("Object loss from fstree/blobovnicza (versioning=disabled, s3_client={s3_client})") def test_s3_fstree_blobovnicza_loss_versioning_off( self, s3_client: S3ClientWrapper, @@ -619,7 +590,7 @@ class TestStorageDataLoss(ClusterTestBase): with allure.step("Enable resync_metabase option for storage services"): for storage_node in cluster_state_controller.cluster.storage_nodes: with allure.step(f"Enable resync_metabase option for {storage_node}"): - config_file_path, config = storage_node.get_config() + config_file_path, config = storage_node.get_shard_config_path() if not config["storage"]["shard"]["default"]["resync_metabase"]: file_keeper.add(storage_node, config_file_path) config["storage"]["shard"]["default"]["resync_metabase"] = True @@ -641,9 +612,7 @@ class TestStorageDataLoss(ClusterTestBase): with expect_not_raises(): s3_client.delete_bucket(bucket) - @allure.title( - "Write cache loss on one node should not affect shards and should not produce errors in log" - ) + @allure.title("Write cache loss on one node should not affect shards and should not produce errors in log") @pytest.mark.write_cache_loss def test_write_cache_loss_on_one_node( self, @@ -742,9 +711,7 @@ class TestStorageDataLoss(ClusterTestBase): ): # TODO: need to check that s3 gate is connected to localhost (such metric will be supported in 1.3) - with allure.step( - "Stop one node and wait for rebalance connection of s3 gate to storage service" - ): + with allure.step("Stop one node and wait for rebalance connection of s3 gate to storage service"): current_node = self.cluster.cluster_nodes[0] cluster_state_controller.stop_storage_service(current_node) # waiting for rebalance connection of s3 gate to storage service @@ -756,9 +723,7 @@ class TestStorageDataLoss(ClusterTestBase): put_object = s3_client.put_object(bucket, file_path) s3_helper.check_objects_in_bucket(s3_client, bucket, expected_objects=[file_name]) - @allure.title( - "After Pilorama.db loss on one node object is retrievable (s3_client={s3_client})" - ) + @allure.title("After Pilorama.db loss on one node object is retrievable (s3_client={s3_client})") def test_s3_one_pilorama_loss( self, s3_client: S3ClientWrapper, @@ -787,9 +752,7 @@ class TestStorageDataLoss(ClusterTestBase): node_to_check = self.cluster.storage_nodes[0] piloramas_list_before_removing = {} with allure.step("Get list of all pilorama.db"): - piloramas_list_before_removing = self.get_piloramas_list( - node_to_check, cluster_state_controller - ) + piloramas_list_before_removing = self.get_piloramas_list(node_to_check, cluster_state_controller) with allure.step("Stop all storage nodes"): for node in self.cluster.cluster_nodes: @@ -808,12 +771,8 @@ class TestStorageDataLoss(ClusterTestBase): piloramas_list_afrer_removing = {} with allure.step("Get list of all pilorama.db after sync"): - piloramas_list_afrer_removing = self.get_piloramas_list( - node_to_check, cluster_state_controller - ) - assert ( - piloramas_list_afrer_removing == piloramas_list_before_removing - ), "List of pilorama.db is different" + piloramas_list_afrer_removing = self.get_piloramas_list(node_to_check, cluster_state_controller) + assert piloramas_list_afrer_removing == piloramas_list_before_removing, "List of pilorama.db is different" with allure.step("Check bucket versioning"): bucket_versioning = s3_client.get_bucket_versioning_status(bucket) diff --git a/pytest_tests/testsuites/network/test_node_management.py b/pytest_tests/testsuites/network/test_node_management.py index 1e532c91..73efc0b9 100644 --- a/pytest_tests/testsuites/network/test_node_management.py +++ b/pytest_tests/testsuites/network/test_node_management.py @@ -39,10 +39,7 @@ from frostfs_testlib.utils import datetime_utils, string_utils from frostfs_testlib.utils.failover_utils import wait_object_replication from frostfs_testlib.utils.file_utils import generate_file -from pytest_tests.helpers.utility import ( - placement_policy_from_container, - wait_for_gc_pass_on_storage_nodes, -) +from pytest_tests.helpers.utility import placement_policy_from_container, wait_for_gc_pass_on_storage_nodes logger = logging.getLogger("NeoLogger") check_nodes: list[StorageNode] = [] @@ -85,10 +82,10 @@ class TestNodeManagement(ClusterTestBase): node_shard_list(node) @allure.step("Tick epoch with retries") - def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3): + def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None): for attempt in range(attempts): try: - self.tick_epoch() + self.tick_epoch(wait_block=wait_block) except RuntimeError: sleep(timeout) if attempt >= attempts - 1: @@ -123,7 +120,7 @@ class TestNodeManagement(ClusterTestBase): check_nodes.remove(node) sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) - self.tick_epoch_with_retries(3) + self.tick_epoch_with_retries(3, wait_block=2) check_node_in_map(node, shell=self.shell, alive_node=alive_node) @allure.title("Add one node to cluster") @@ -152,9 +149,7 @@ class TestNodeManagement(ClusterTestBase): # Add node to recovery list before messing with it check_nodes.append(random_node) - exclude_node_from_network_map( - random_node, alive_node, shell=self.shell, cluster=self.cluster - ) + exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) delete_node_data(random_node) cid = create_container( @@ -179,9 +174,7 @@ class TestNodeManagement(ClusterTestBase): random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node})) # Add node to recovery list before messing with it check_nodes.append(random_node) - exclude_node_from_network_map( - random_node, alive_node, shell=self.shell, cluster=self.cluster - ) + exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) wait_object_replication( cid, @@ -190,9 +183,7 @@ class TestNodeManagement(ClusterTestBase): shell=self.shell, nodes=list(set(storage_nodes) - {random_node}), ) - include_node_to_network_map( - random_node, alive_node, shell=self.shell, cluster=self.cluster - ) + include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes) with allure.step("Check container could be created with new node"): @@ -226,9 +217,7 @@ class TestNodeManagement(ClusterTestBase): ) @pytest.mark.node_mgmt @allure.title("Placement policy (copies={expected_copies}, policy={placement_rule})") - def test_placement_policy( - self, default_wallet, placement_rule, expected_copies, simple_object_size: ObjectSize - ): + def test_placement_policy(self, default_wallet, placement_rule, expected_copies, simple_object_size: ObjectSize): """ This test checks object's copies based on container's placement policy. """ @@ -301,38 +290,13 @@ class TestNodeManagement(ClusterTestBase): """ wallet = default_wallet file_path = generate_file(simple_object_size.value) - cid, oid, found_nodes = self.validate_object_copies( - wallet, placement_rule, file_path, expected_copies - ) + cid, oid, found_nodes = self.validate_object_copies(wallet, placement_rule, file_path, expected_copies) - assert ( - found_nodes == expected_nodes_id - ), f"Expected nodes {expected_nodes_id}, got {found_nodes}" - - @pytest.mark.parametrize( - "placement_rule,expected_copies", - [ - ("REP 2 IN X CBF 2 SELECT 6 FROM * AS X", 2), - ], - ) - @pytest.mark.node_mgmt - @allure.title("[NEGATIVE] Placement policy (policy={placement_rule})") - def test_placement_policy_negative( - self, default_wallet, placement_rule, expected_copies, simple_object_size: ObjectSize - ): - """ - Negative test for placement policy. - """ - wallet = default_wallet - file_path = generate_file(simple_object_size.value) - with pytest.raises(RuntimeError, match=".*not enough nodes to SELECT from.*"): - self.validate_object_copies(wallet, placement_rule, file_path, expected_copies) + assert found_nodes == expected_nodes_id, f"Expected nodes {expected_nodes_id}, got {found_nodes}" @pytest.mark.node_mgmt @allure.title("Drop object using control command") - def test_drop_object( - self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize - ): + def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize): """ Test checks object could be dropped using `frostfs-cli control drop-objects` command. """ @@ -342,29 +306,21 @@ class TestNodeManagement(ClusterTestBase): file_path_complex = generate_file(complex_object_size.value) locode = get_locode_from_random_node(self.cluster) - rule = f"REP 1 CBF 1 SELECT 1 FROM * FILTER 'UN-LOCODE' EQ '{locode}' AS LOC" + rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC" cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint) - oid_simple = put_object_to_random_node( - wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster - ) - oid_complex = put_object_to_random_node( - wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster - ) + oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster) + oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster) for oid in (oid_simple, oid_complex): get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) - nodes_with_object = get_nodes_with_object( - cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes - ) + nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes) random_node = random.choice(nodes_with_object) for oid in (oid_simple, oid_complex): with allure.step(f"Drop object {oid}"): - get_object_from_random_node( - wallet, cid, oid, shell=self.shell, cluster=self.cluster - ) + get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) drop_object(random_node, cid, oid) self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object) @@ -400,9 +356,7 @@ class TestNodeManagement(ClusterTestBase): put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) with pytest.raises(RuntimeError): - delete_object( - wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint - ) + delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint) get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster) @@ -417,11 +371,9 @@ class TestNodeManagement(ClusterTestBase): @pytest.mark.node_mgmt @allure.title("Put object with stopped node") - def test_stop_node( - self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize - ): + def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize): wallet = default_wallet - placement_rule = "REP 3 SELECT 4 FROM * AS X" + placement_rule = "REP 3 IN X SELECT 4 FROM * AS X" source_file_path = generate_file(simple_object_size.value) storage_nodes = self.cluster.storage_nodes random_node = random.choice(storage_nodes[1:]) @@ -454,18 +406,14 @@ class TestNodeManagement(ClusterTestBase): self, wallet: str, placement_rule: str, file_path: str, expected_copies: int ) -> set[int]: endpoint = self.cluster.default_rpc_endpoint - cid = create_container( - wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint - ) + cid = create_container(wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) got_policy = placement_policy_from_container( get_container(wallet, cid, json_mode=False, shell=self.shell, endpoint=endpoint) ) assert got_policy.replace("'", "") == placement_rule.replace( "'", "" ), f"Expected \n{placement_rule} and got policy \n{got_policy} are the same" - oid = put_object_to_random_node( - wallet, file_path, cid, shell=self.shell, cluster=self.cluster - ) + oid = put_object_to_random_node(wallet, file_path, cid, shell=self.shell, cluster=self.cluster) nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) nodes_id = {node.id for node in nodes} assert len(nodes) == expected_copies, f"Expected {expected_copies} copies, got {len(nodes)}" @@ -477,22 +425,15 @@ class TestNodeManagement(ClusterTestBase): for _ in range(attempts): try: health_check = storage_node_healthcheck(node) - assert ( - health_check.health_status == "READY" - and health_check.network_status == "ONLINE" - ) + assert health_check.health_status == "READY" and health_check.network_status == "ONLINE" return except Exception as err: logger.warning(f"Node {node} is not online:\n{err}") sleep(timeout) - raise AssertionError( - f"Node {node} hasn't gone to the READY and ONLINE state after {timeout * attempts} second" - ) + raise AssertionError(f"Node {node} hasn't gone to the READY and ONLINE state after {timeout * attempts} second") @allure.step("Wait for {expected_copies} object copies in the wallet") - def wait_for_expected_object_copies( - self, wallet: str, cid: str, oid: str, expected_copies: int = 2 - ) -> None: + def wait_for_expected_object_copies(self, wallet: str, cid: str, oid: str, expected_copies: int = 2) -> None: nodes = self.cluster.storage_nodes for _ in range(2): copies = get_simple_object_copies(wallet, cid, oid, self.shell, nodes)