import logging
import random
from time import sleep
from typing import Optional, Tuple

import allure
import pytest
from frostfs_testlib.resources.common import FROSTFS_CONTRACT_CACHE_TIMEOUT, MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, get_container
from frostfs_testlib.steps.cli.object import (
    delete_object,
    get_object,
    get_object_from_random_node,
    head_object,
    put_object,
    put_object_to_random_node,
)
from frostfs_testlib.steps.epoch import tick_epoch
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    delete_node_data,
    drop_object,
    exclude_node_from_network_map,
    get_locode_from_random_node,
    include_node_to_network_map,
    node_shard_list,
    node_shard_set_mode,
    storage_node_healthcheck,
    storage_node_set_status,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object, get_simple_object_copies
from frostfs_testlib.storage.cluster import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file

from pytest_tests.helpers.utility import (
    placement_policy_from_container,
    wait_for_gc_pass_on_storage_nodes,
)

logger = logging.getLogger("NeoLogger")

# Module-level "recovery list": nodes that a test has stopped or excluded and that
# `TestNodeManagement.return_nodes` must start and bring back online afterwards.
check_nodes: list[StorageNode] = []


@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
class TestNodeManagement(ClusterTestBase):
    """Node management tests: network map membership, placement policies, control commands."""

    @pytest.fixture
    @allure.title("Create container and pick the node with data")
    def create_container_and_pick_node(
        self, default_wallet: str, simple_object_size: ObjectSize
    ) -> Tuple[str, StorageNode]:
        """
        Create a single-replica container, put one object and yield the node that holds it.

        Yields:
            A ``(cid, node)`` pair: the container id and the only node storing the object.

        On teardown every shard of the picked node is switched back to "read-write".
        """
        file_path = generate_file(simple_object_size.value)
        placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
        endpoint = self.cluster.default_rpc_endpoint

        cid = create_container(
            default_wallet,
            shell=self.shell,
            endpoint=endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)

        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        # REP 1 policy: exactly one node is expected to hold the object.
        assert len(nodes) == 1
        node = nodes[0]

        yield cid, node

        # Teardown: restore the shard mode regardless of what the test set.
        shards = node_shard_list(node)
        assert shards

        for shard in shards:
            node_shard_set_mode(node, shard, "read-write")

        node_shard_list(node)

    @allure.step("Tick epoch with retries")
    def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3):
        """
        Tick the epoch, retrying on ``RuntimeError``.

        Args:
            attempts: maximum number of tick attempts.
            timeout: seconds to sleep between a failed attempt and the next one.

        Raises:
            RuntimeError: re-raised from the last attempt when all attempts fail.
        """
        for attempt in range(attempts):
            try:
                self.tick_epoch()
            except RuntimeError:
                # Re-raise immediately on the final attempt: sleeping first would only
                # delay the failure, since no further retry follows.
                if attempt >= attempts - 1:
                    raise
                sleep(timeout)
                continue
            return

    @pytest.fixture
    def after_run_start_all_nodes(self):
        """Teardown-only fixture: return every node from the recovery list after the test."""
        yield
        self.return_nodes()

    @pytest.fixture
    def return_nodes_after_test_run(self):
        """Teardown-only fixture: return every node from the recovery list after the test."""
        yield
        self.return_nodes()

    @allure.step("Return node to cluster")
    def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
        """
        Start every node in ``check_nodes``, set it online and verify it is in the network map.

        Args:
            alive_node: forwarded to ``check_node_in_map`` as the node used for the check.
        """
        # Iterate over a copy because nodes are removed from `check_nodes` inside the loop.
        for node in list(check_nodes):
            with allure.step(f"Start node {node}"):
                node.start_service()
            with allure.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            # We need to wait for node to establish notifications from morph-chain
            # Otherwise it will hang up when we will try to set status
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))

            with allure.step(f"Move node {node} to online state"):
                storage_node_set_status(node, status="online", retries=2)

            check_nodes.remove(node)
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epoch_with_retries(3)
            check_node_in_map(node, shell=self.shell, alive_node=alive_node)

    @allure.title("Add one node to cluster")
    @pytest.mark.add_nodes
    def test_add_nodes(
        self,
        default_wallet: str,
        simple_object_size: ObjectSize,
        return_nodes_after_test_run,
    ):
        """
        This test removes one node from the cluster, then adds it back. Test uses base control
        operations with storage nodes (healthcheck, netmap-snapshot, set-status).
        """
        wallet = default_wallet
        placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
        placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)

        storage_nodes = self.cluster.storage_nodes
        # The first storage node is never picked for exclusion.
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice(
            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
        )

        check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)

        # Add node to recovery list before messing with it
        check_nodes.append(random_node)
        exclude_node_from_network_map(
            random_node, alive_node, shell=self.shell, cluster=self.cluster
        )
        delete_node_data(random_node)

        cid = create_container(
            wallet,
            rule=placement_rule_3,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        oid = put_object(
            wallet,
            source_file_path,
            cid,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        self.return_nodes(alive_node)

        with allure.step("Check data could be replicated to new node"):
            random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
            # Add node to recovery list before messing with it
            check_nodes.append(random_node)
            exclude_node_from_network_map(
                random_node, alive_node, shell=self.shell, cluster=self.cluster
            )

            wait_object_replication(
                cid,
                oid,
                3,
                shell=self.shell,
                nodes=list(set(storage_nodes) - {random_node}),
            )
            include_node_to_network_map(
                random_node, alive_node, shell=self.shell, cluster=self.cluster
            )
            wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        with allure.step("Check container could be created with new node"):
            cid = create_container(
                wallet,
                rule=placement_rule_4,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            oid = put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)

    @pytest.mark.parametrize(
        "placement_rule,expected_copies",
        [
            ("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", 2),
            ("REP 2 IN X CBF 1 SELECT 2 FROM * AS X", 2),
            ("REP 3 IN X CBF 1 SELECT 3 FROM * AS X", 3),
            ("REP 1 IN X CBF 1 SELECT 1 FROM * AS X", 1),
            ("REP 1 IN X CBF 2 SELECT 1 FROM * AS X", 1),
            ("REP 4 IN X CBF 1 SELECT 4 FROM * AS X", 4),
            ("REP 2 IN X CBF 1 SELECT 4 FROM * AS X", 2),
        ],
    )
    @pytest.mark.node_mgmt
    @allure.title("Placement policy (copies={expected_copies}, policy={placement_rule})")
    def test_placement_policy(
        self, default_wallet, placement_rule, expected_copies, simple_object_size: ObjectSize
    ):
        """
        This test checks object's copies based on container's placement policy.
        """
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)
        self.validate_object_copies(wallet, placement_rule, file_path, expected_copies)

    @pytest.mark.parametrize(
        "placement_rule,expected_copies,expected_nodes_id",
        [
            ("REP 4 IN X CBF 1 SELECT 4 FROM * AS X", 4, {1, 2, 3, 4}),
            (
                "REP 1 IN LOC_PLACE CBF 1 SELECT 1 FROM LOC_SW AS LOC_PLACE FILTER Country EQ Sweden AS LOC_SW",
                1,
                {3},
            ),
            (
                "REP 1 IN SEL_SPB CBF 1 SELECT 1 FROM LOC_SPB AS SEL_SPB FILTER 'UN-LOCODE' EQ 'RU LED' AS LOC_SPB",
                1,
                {2},
            ),
            (
                "REP 1 IN LOC_SPB_PLACE REP 1 IN LOC_MSK_PLACE CBF 1 SELECT 1 FROM LOC_SPB AS LOC_SPB_PLACE "
                "SELECT 1 FROM LOC_MSK AS LOC_MSK_PLACE "
                "FILTER 'UN-LOCODE' EQ 'RU LED' AS LOC_SPB FILTER 'UN-LOCODE' EQ 'RU MOW' AS LOC_MSK",
                2,
                {1, 2},
            ),
            (
                "REP 4 CBF 1 SELECT 4 FROM LOC_EU FILTER Continent EQ Europe AS LOC_EU",
                4,
                {1, 2, 3, 4},
            ),
            (
                "REP 1 IN SEL_SPB CBF 1 SELECT 1 FROM LOC_SPB AS SEL_SPB "
                "FILTER 'UN-LOCODE' NE 'RU MOW' AND 'UN-LOCODE' NE 'SE STO' AND 'UN-LOCODE' NE 'FI HEL' AS LOC_SPB",
                1,
                {2},
            ),
            (
                "REP 2 IN SEL CBF 1 SELECT 2 FROM LOC_RU AS SEL "
                "FILTER SubDivCode NE 'AB' AND SubDivCode NE '18' AS LOC_RU",
                2,
                {1, 2},
            ),
            (
                "REP 2 IN SEL_EU CBF 1 SELECT 2 FROM LOC_RU AS SEL_EU FILTER Country EQ 'Russia' AS LOC_RU",
                2,
                {1, 2},
            ),
            (
                "REP 2 IN SEL_EU CBF 1 SELECT 2 FROM LOC_EU AS SEL_EU FILTER Country NE 'Russia' AS LOC_EU",
                2,
                {3, 4},
            ),
        ],
    )
    @pytest.mark.node_mgmt
    @allure.title("Placement policy (nodes_id={expected_nodes_id}, policy={placement_rule})")
    def test_placement_policy_with_nodes(
        self,
        default_wallet,
        placement_rule,
        expected_copies,
        expected_nodes_id: set[int],
        simple_object_size: ObjectSize,
    ):
        """
        Based on container's placement policy check that storage nodes are picked correctly
        and object has correct copies amount.
        """
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)
        # Only the set of node ids is needed here; container and object ids are ignored.
        _, _, found_nodes = self.validate_object_copies(
            wallet, placement_rule, file_path, expected_copies
        )

        assert (
            found_nodes == expected_nodes_id
        ), f"Expected nodes {expected_nodes_id}, got {found_nodes}"

    @pytest.mark.parametrize(
        "placement_rule,expected_copies",
        [
            ("REP 2 IN X CBF 2 SELECT 6 FROM * AS X", 2),
        ],
    )
    @pytest.mark.node_mgmt
    @allure.title("[NEGATIVE] Placement policy (policy={placement_rule})")
    def test_placement_policy_negative(
        self, default_wallet, placement_rule, expected_copies, simple_object_size: ObjectSize
    ):
        """
        Negative test for placement policy.
        """
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)
        with pytest.raises(RuntimeError, match=".*not enough nodes to SELECT from.*"):
            self.validate_object_copies(wallet, placement_rule, file_path, expected_copies)

    @pytest.mark.node_mgmt
    @allure.title("Drop object using control command")
    def test_drop_object(
        self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize
    ):
        """
        Test checks object could be dropped using `frostfs-cli control drop-objects` command.
        """
        wallet = default_wallet
        endpoint = self.cluster.default_rpc_endpoint
        file_path_simple = generate_file(simple_object_size.value)
        file_path_complex = generate_file(complex_object_size.value)

        locode = get_locode_from_random_node(self.cluster)
        rule = f"REP 1 CBF 1 SELECT 1 FROM * FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
        cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
        oid_simple = put_object_to_random_node(
            wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster
        )
        oid_complex = put_object_to_random_node(
            wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster
        )

        # Sanity check: both objects are readable before anything is dropped.
        for oid in (oid_simple, oid_complex):
            get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
            head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)

        # The node is chosen by the location of the simple object and used for both drops.
        nodes_with_object = get_nodes_with_object(
            cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        random_node = random.choice(nodes_with_object)

        for oid in (oid_simple, oid_complex):
            with allure.step(f"Drop object {oid}"):
                get_object_from_random_node(
                    wallet, cid, oid, shell=self.shell, cluster=self.cluster
                )
                head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                drop_object(random_node, cid, oid)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)

    @pytest.mark.node_mgmt
    @pytest.mark.skip(reason="Need to clarify scenario")
    @allure.title("Control Operations with storage nodes")
    def test_shards(
        self,
        default_wallet,
        create_container_and_pick_node,
        simple_object_size: ObjectSize,
    ):
        """
        Switch the shards of the node holding the data to a restricted mode and check that
        put/delete fail while get still works, then restore "read-write" and check put/delete.
        """
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)

        cid, node = create_container_and_pick_node
        original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

        # NOTE: only "degraded" is exercised; the 'read-only' mode was disabled in this scenario.
        for mode in ("degraded",):
            shards = node_shard_list(node)
            assert shards

            for shard in shards:
                node_shard_set_mode(node, shard, mode)

            shards = node_shard_list(node)
            assert shards

            with pytest.raises(RuntimeError):
                put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

            with pytest.raises(RuntimeError):
                delete_object(
                    wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint
                )

            get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)

            for shard in shards:
                node_shard_set_mode(node, shard, "read-write")

            shards = node_shard_list(node)
            assert shards

            oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
            delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    @pytest.mark.node_mgmt
    @allure.title("Put object with stopped node")
    def test_stop_node(
        self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize
    ):
        """
        Stop one storage node and check that an object can still be put via another node.
        """
        wallet = default_wallet
        placement_rule = "REP 3 SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)
        storage_nodes = self.cluster.storage_nodes
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice(
            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
        )

        cid = create_container(
            wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=random_node.get_rpc_endpoint(),
        )
        with allure.step("Stop the random node"):
            # Register the node for recovery before stopping it so teardown restarts it.
            check_nodes.append(random_node)
            random_node.stop_service()
        with allure.step("Try to put an object and expect success"):
            put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
        self.return_nodes(alive_node)

    @allure.step("Validate object has {expected_copies} copies")
    def validate_object_copies(
        self, wallet: str, placement_rule: str, file_path: str, expected_copies: int
    ) -> tuple[str, str, set[int]]:
        """
        Create a container with the given policy, put an object and check its copies count.

        Args:
            wallet: wallet used for all operations.
            placement_rule: placement policy the container is created with.
            file_path: path to the file to put as an object.
            expected_copies: number of nodes expected to hold the object.

        Returns:
            ``(cid, oid, nodes_id)``: container id, object id and ids of the nodes with the object.

        Raises:
            AssertionError: if the stored policy differs from ``placement_rule`` (quotes ignored)
                or the number of nodes with the object differs from ``expected_copies``.
        """
        endpoint = self.cluster.default_rpc_endpoint
        cid = create_container(
            wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )
        got_policy = placement_policy_from_container(
            get_container(wallet, cid, json_mode=False, shell=self.shell, endpoint=endpoint)
        )
        # Single quotes are stripped on both sides before comparing the policies.
        assert got_policy.replace("'", "") == placement_rule.replace(
            "'", ""
        ), f"Expected \n{placement_rule} and got policy \n{got_policy} are the same"
        oid = put_object_to_random_node(
            wallet, file_path, cid, shell=self.shell, cluster=self.cluster
        )
        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        nodes_id = {node.id for node in nodes}
        assert len(nodes) == expected_copies, f"Expected {expected_copies} copies, got {len(nodes)}"
        return cid, oid, nodes_id

    @allure.step("Wait for node {node} goes online")
    def wait_for_node_go_online(self, node: StorageNode) -> None:
        """
        Poll the node healthcheck until it reports READY and ONLINE.

        Raises:
            AssertionError: if the node does not reach that state within 20 attempts, 5 s apart.
        """
        timeout, attempts = 5, 20
        for _ in range(attempts):
            try:
                health_check = storage_node_healthcheck(node)
                assert (
                    health_check.health_status == "READY"
                    and health_check.network_status == "ONLINE"
                )
                return
            except Exception as err:
                # Any failure (including the assertion above) counts as "not online yet".
                logger.warning(f"Node {node} is not online:\n{err}")
            sleep(timeout)
        raise AssertionError(
            f"Node {node} hasn't gone to the READY and ONLINE state after {timeout * attempts} second"
        )

    @allure.step("Wait for {expected_copies} object copies in the wallet")
    def wait_for_expected_object_copies(
        self, wallet: str, cid: str, oid: str, expected_copies: int = 2
    ) -> None:
        """
        Check that the object has ``expected_copies`` copies, ticking the epoch between checks.

        Raises:
            AssertionError: if the expected count is not observed in two checks.
        """
        nodes = self.cluster.storage_nodes
        for _ in range(2):
            copies = get_simple_object_copies(wallet, cid, oid, self.shell, nodes)
            if copies == expected_copies:
                break
            tick_epoch(self.shell, self.cluster)
            sleep(datetime_utils.parse_time(FROSTFS_CONTRACT_CACHE_TIMEOUT))
        else:
            # Reached only when the loop finished without `break`.
            raise AssertionError(f"There are no {expected_copies} copies during time")

    @allure.step("Wait for object to be dropped")
    def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
        """
        Call ``checker`` until it fails with an error matching ``OBJECT_NOT_FOUND``.

        Args:
            wallet: wallet passed to ``checker``.
            cid: container id.
            oid: object id.
            endpoint: endpoint passed to ``checker``.
            checker: callable invoked as ``checker(wallet, cid, oid, shell=..., endpoint=...)``;
                in this file it is ``get_object`` or ``head_object``.

        Raises:
            AssertionError: if ``checker`` fails with a different error, or still succeeds
                after three attempts.
        """
        for _ in range(3):
            try:
                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                # Object is still reachable: wait before the next attempt.
                wait_for_gc_pass_on_storage_nodes()
            except Exception as err:
                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
                    return
                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}') from err

        raise AssertionError(f"Object {oid} was not dropped from node")