"""Storage node management tests.

``TestNodeManagement`` covers excluding/including nodes in the network map, dropping objects with the
control API, switching shard modes and putting objects while a node is stopped.
``TestMaintenanceMode`` covers the MAINTENANCE and OFFLINE node states.
"""

import logging
import os
import random
from time import sleep
from typing import Iterator, Optional, Tuple

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.cli.netmap_parser import NetmapParser
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container
from frostfs_testlib.steps.cli.object import (
    delete_object,
    get_object,
    get_object_from_random_node,
    head_object,
    put_object,
    put_object_to_random_node,
    search_object,
)
from frostfs_testlib.steps.node_management import (
    check_node_in_map,
    delete_node_data,
    drop_object,
    exclude_node_from_network_map,
    get_locode_from_random_node,
    include_node_to_network_map,
    node_shard_list,
    node_shard_set_mode,
    storage_node_set_status,
    wait_for_node_to_be_ready,
)
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import datetime_utils, string_utils
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file

from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes

logger = logging.getLogger("NeoLogger")

# Storage nodes that a test has stopped or excluded from the network map. Tests append a node here
# *before* touching it, so that `TestNodeManagement.return_nodes` (also run from fixture teardown)
# can restart it and move it back online.
check_nodes: list[StorageNode] = []


@allure.title("Add one node to cluster")
@pytest.mark.add_nodes
@pytest.mark.node_mgmt
class TestNodeManagement(ClusterTestBase):
    @pytest.fixture
    @allure.title("Create container and pick the node with data")
    def create_container_and_pick_node(
        self, default_wallet: WalletInfo, simple_object_size: ObjectSize
    ) -> Iterator[Tuple[str, StorageNode]]:
        """Create a single-replica container holding one object and yield ``(cid, node holding the object)``.

        On teardown every shard of that node is switched back to ``read-write``.
        """
        file_path = generate_file(simple_object_size.value)
        placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
        endpoint = self.cluster.default_rpc_endpoint

        cid = create_container(
            default_wallet,
            shell=self.shell,
            endpoint=endpoint,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
        )
        oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)

        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert len(nodes) == 1
        node = nodes[0]

        yield cid, node

        # Tests using this fixture may have switched shard modes; restore them.
        shards = node_shard_list(node)
        assert shards

        for shard in shards:
            node_shard_set_mode(node, shard, "read-write")

        node_shard_list(node)

    @reporter.step("Tick epoch with retries")
    def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: Optional[int] = None):
        """Tick a new epoch, retrying on ``RuntimeError``.

        Args:
            attempts: total number of tick attempts.
            timeout: seconds to sleep between a failed attempt and the next one.
            wait_block: forwarded to ``tick_epoch``.

        Raises:
            RuntimeError: the error of the last attempt when every attempt fails.
        """
        for attempt in range(attempts):
            try:
                self.tick_epoch(wait_block=wait_block)
            except RuntimeError:
                if attempt >= attempts - 1:
                    # Out of attempts: fail right away instead of sleeping first.
                    raise
                sleep(timeout)
                continue
            return

    @pytest.fixture
    def after_run_start_all_nodes(self):
        """After the test, bring every node listed in ``check_nodes`` back into the cluster."""
        yield
        self.return_nodes()

    @pytest.fixture
    def return_nodes_after_test_run(self):
        """After the test, bring every node listed in ``check_nodes`` back into the cluster."""
        yield
        self.return_nodes()

    @reporter.step("Return node to cluster")
    def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
        """Start every node in ``check_nodes``, set it online and check that it is in the network map.

        A node is removed from ``check_nodes`` as soon as its status has been set to online.

        Args:
            alive_node: node passed to ``check_node_in_map`` for the network map check (may be None).
        """
        # Iterate over a copy: nodes are removed from the global list inside the loop.
        for node in list(check_nodes):
            with reporter.step(f"Start node {node}"):
                node.start_service()
            with reporter.step(f"Waiting status ready for node {node}"):
                wait_for_node_to_be_ready(node)

            # We need to wait for node to establish notifications from morph-chain
            # Otherwise it will hang up when we will try to set status
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))

            with reporter.step(f"Move node {node} to online state"):
                storage_node_set_status(node, status="online", retries=2)

            check_nodes.remove(node)
            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
            self.tick_epoch_with_retries(3, wait_block=2)
            check_node_in_map(node, shell=self.shell, alive_node=alive_node)

    @allure.title("Add one node to cluster")
    @pytest.mark.add_nodes
    def test_add_nodes(
        self,
        default_wallet: WalletInfo,
        simple_object_size: ObjectSize,
        return_nodes_after_test_run,
    ):
        """
        This test removes one node from the cluster, then adds it back.
        Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
        """
        wallet = default_wallet
        placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X"
        placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)

        storage_nodes = self.cluster.storage_nodes
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice(
            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
        )

        check_node_in_map(random_node, shell=self.shell, alive_node=alive_node)

        # Add node to recovery list before messing with it
        check_nodes.append(random_node)
        exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
        delete_node_data(random_node)

        cid = create_container(
            wallet,
            rule=placement_rule_3,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        oid = put_object(
            wallet,
            source_file_path,
            cid,
            shell=self.shell,
            endpoint=alive_node.get_rpc_endpoint(),
        )
        wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        self.return_nodes(alive_node)

        with reporter.step("Check data could be replicated to new node"):
            random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node}))
            # Add node to recovery list before messing with it
            check_nodes.append(random_node)
            exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)

            wait_object_replication(
                cid,
                oid,
                3,
                shell=self.shell,
                nodes=list(set(storage_nodes) - {random_node}),
            )
            include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster)
            wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes)

        with reporter.step("Check container could be created with new node"):
            cid = create_container(
                wallet,
                rule=placement_rule_4,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            oid = put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
            wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes)

    @pytest.mark.node_mgmt
    @allure.title("Drop object using control command")
    def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize):
        """
        Test checks object could be dropped using `frostfs-cli control drop-objects` command.
        """
        wallet = default_wallet
        endpoint = self.cluster.default_rpc_endpoint
        file_path_simple = generate_file(simple_object_size.value)
        file_path_complex = generate_file(complex_object_size.value)

        # Pin the container to a single node by filtering on one node's UN-LOCODE.
        locode = get_locode_from_random_node(self.cluster)
        rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC"
        cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint)
        oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster)
        oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster)

        for oid in (oid_simple, oid_complex):
            get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
            head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)

        nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes)
        random_node = random.choice(nodes_with_object)

        for oid in (oid_simple, oid_complex):
            with reporter.step(f"Drop object {oid}"):
                get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster)
                head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
                drop_object(random_node, cid, oid)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object)
                self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object)

    @pytest.mark.node_mgmt
    @pytest.mark.skip(reason="Need to clarify scenario")
    @allure.title("Control Operations with storage nodes")
    def test_shards(
        self,
        default_wallet,
        create_container_and_pick_node,
        simple_object_size: ObjectSize,
    ):
        """Check put/delete/get behaviour while every shard of the data node is in a restricted mode."""
        wallet = default_wallet
        file_path = generate_file(simple_object_size.value)

        cid, node = create_container_and_pick_node
        original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

        # for mode in ('read-only', 'degraded'):
        for mode in ("degraded",):
            shards = node_shard_list(node)
            assert shards

            for shard in shards:
                node_shard_set_mode(node, shard, mode)

            shards = node_shard_list(node)
            assert shards

            with pytest.raises(RuntimeError):
                put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)

            with pytest.raises(RuntimeError):
                delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint)

            get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster)

            for shard in shards:
                node_shard_set_mode(node, shard, "read-write")

            shards = node_shard_list(node)
            assert shards

            oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster)
            delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint)

    @pytest.mark.node_mgmt
    @allure.title("Put object with stopped node")
    def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize):
        """Check that an object can still be put while one storage node is stopped."""
        wallet = default_wallet
        placement_rule = "REP 3 IN X SELECT 4 FROM * AS X"
        source_file_path = generate_file(simple_object_size.value)
        storage_nodes = self.cluster.storage_nodes
        random_node = random.choice(storage_nodes[1:])
        alive_node = random.choice(
            [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id]
        )

        cid = create_container(
            wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=random_node.get_rpc_endpoint(),
        )
        with reporter.step("Stop the random node"):
            # Register the node for recovery before stopping it.
            check_nodes.append(random_node)
            random_node.stop_service()
        with reporter.step("Try to put an object and expect success"):
            put_object(
                wallet,
                source_file_path,
                cid,
                shell=self.shell,
                endpoint=alive_node.get_rpc_endpoint(),
            )
        self.return_nodes(alive_node)

    @reporter.step("Wait for object to be dropped")
    def wait_for_obj_dropped(self, wallet: WalletInfo, cid: str, oid: str, endpoint: str, checker) -> None:
        """Poll ``checker`` (e.g. ``get_object``/``head_object``) until it fails with ``OBJECT_NOT_FOUND``.

        Makes up to 3 attempts. Between attempts it waits for a GC pass on the storage nodes.

        Raises:
            AssertionError: if ``checker`` fails with any other error, or still succeeds after 3 attempts.
        """
        for _ in range(3):
            try:
                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
            except Exception as err:
                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
                    return
                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}') from err
            # The object is still readable. Let GC run before checking again; this call is kept
            # outside the `try` so its own failures are not mistaken for the checker's result.
            wait_for_gc_pass_on_storage_nodes()

        raise AssertionError(f"Object {oid} was not dropped from node")


@pytest.mark.maintenance
class TestMaintenanceMode(ClusterTestBase):
    # Node whose mode a test changes. Tests must assign it so that `restore_online_mode_node`
    # can switch it back to online after the test.
    change_node: Optional[ClusterNode] = None

    @pytest.fixture()
    @allure.title("Init Frostfs CLI remote")
    def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
        """Return a CLI that runs on the host of ``node_under_test``, configured with that node's wallet."""
        host = node_under_test.host
        service_config = host.get_service_config(node_under_test.storage_node.name)
        wallet_path = service_config.attributes["wallet_path"]
        wallet_password = service_config.attributes["wallet_password"]

        shell = host.get_shell()
        # Write a CLI config file with the node's wallet on the remote host.
        wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
        shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")

        cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
        return cli

    @pytest.fixture()
    @allure.title("Init Frostfs CLI")
    def frostfs_cli(self) -> FrostfsCli:
        """Return a CLI that runs in the local test shell with the default wallet config."""
        cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
        return cli

    @pytest.fixture()
    def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: WalletInfo):
        """After the test, switch ``self.change_node`` back to ``online`` and tick an epoch."""
        yield
        if self.change_node is None:
            # The test failed before choosing a node, so there is nothing to restore.
            return
        cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
        self.tick_epoch(wait_block=2)

    def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size):
        """Run get, search, delete and put against ``endpoint`` and check each result against ``matchs``.

        ``matchs`` holds one expected pattern per operation, in the order get, search, delete, put.
        Each operation is expected to raise ``RuntimeError`` matching its pattern. The exception is a
        search whose pattern contains "Found": it only has to succeed.
        """
        file_path = generate_file(object_size)
        default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
        operations = {
            get_object: {"oid": oid},
            search_object: {},
            delete_object: {"oid": oid},
            put_object: {"path": file_path},
        }
        try:
            for index, (operation, kw) in enumerate(operations.items()):
                expected = matchs[index]
                with reporter.step(f"Run {operation.__name__} object, waiting response - {expected}"):
                    # Build fresh kwargs per call so one operation's arguments (e.g. `oid`) don't
                    # leak into the next operation's call.
                    call_kw = {**default_kw, **kw}
                    if operation == search_object and "Found" in expected:
                        operation(**call_kw)
                        continue
                    with pytest.raises(RuntimeError, match=expected):
                        operation(**call_kw)
        finally:
            os.remove(file_path)

    @allure.title("Test of basic node operations in maintenance mode")
    def test_maintenance_mode(
        self,
        default_wallet: WalletInfo,
        simple_object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
        restore_online_mode_node: None,
    ):
        """Check object operations on a node in maintenance and on another node of the same container."""
        with reporter.step("Create container and create\\put object"):
            cid = create_container(
                wallet=default_wallet,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                rule="REP 1 CBF 1",
            )
            node = search_nodes_with_container(
                wallet=default_wallet,
                cid=cid,
                shell=self.shell,
                endpoint=self.cluster.default_rpc_endpoint,
                cluster=self.cluster,
            )
            self.change_node = node[0]
            file_path = generate_file(simple_object_size.value)
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=self.shell,
                endpoint=self.change_node.storage_node.get_rpc_endpoint(),
            )
        with reporter.step("Enable MaintenanceModeAllowed:"):
            cluster_state_controller.set_mode_node(
                cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
            )

        other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
        # Expected error pattern per operation (get, search, delete, put) for each node.
        node_and_match = {
            self.change_node: ["node is under maintenance"] * 4,
            other_nodes[0]: [
                "object not found",
                "Found 0 objects",
                "object not found",
                "node is under maintenance",
            ],
        }
        with reporter.step("Run basic operations"):
            for cluster_node, matchs in node_and_match.items():
                self.basic_operations(
                    wallet=default_wallet,
                    cid=cid,
                    oid=oid,
                    shell=self.shell,
                    endpoint=cluster_node.storage_node.get_rpc_endpoint(),
                    matchs=matchs,
                    object_size=simple_object_size.value,
                )

    def _check_node_status_in_netmap(
        self,
        frostfs_cli: FrostfsCli,
        alive_node: StorageNode,
        node_under_test: ClusterNode,
        wallet: WalletInfo,
        expected_status: ModeNode,
    ) -> None:
        """Assert that the netmap snapshot taken through ``alive_node`` reports ``expected_status``
        for ``node_under_test``."""
        netmap = frostfs_cli.netmap.snapshot(rpc_endpoint=alive_node.get_rpc_endpoint(), wallet=wallet).stdout
        node = NetmapParser.snapshot_one_node(netmap, node_under_test)
        assert (
            node.node_status == expected_status.value.upper()
        ), f"{node_under_test} actual state in netmap - {netmap}"

    @pytest.mark.sanity
    @allure.title("MAINTENANCE and OFFLINE mode transitions")
    def test_mode_transitions(
        self,
        cluster_state_controller: ClusterStateController,
        node_under_test: ClusterNode,
        default_wallet: WalletInfo,
        frostfs_cli: FrostfsCli,
        restore_online_mode_node: None,
    ):
        """Walk ``node_under_test`` through OFFLINE/MAINTENANCE/ONLINE transitions combined with service restarts."""
        self.change_node = node_under_test
        cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
        # Every epoch tick and netmap query goes through the same node, which is never switched off.
        alive_node = cluster_nodes[0].storage_node

        def check_status(expected_status: ModeNode) -> None:
            self._check_node_status_in_netmap(frostfs_cli, alive_node, node_under_test, default_wallet, expected_status)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Set node mode to offline"):
            cluster_state_controller.set_mode_node(
                cluster_node=node_under_test,
                wallet=default_wallet,
                status=ModeNode.OFFLINE.value,
            )

        with reporter.step("Tick epoch to update the network map"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Check node mode = offline, after update the network map"):
            netmap = frostfs_cli.netmap.snapshot(
                rpc_endpoint=alive_node.get_rpc_endpoint(), wallet=default_wallet
            ).stdout
            netmap = NetmapParser.snapshot_all_nodes(netmap)
            # An offline node is expected to be absent from the netmap snapshot.
            assert node_under_test.host_ip not in [
                netmap_node.node for netmap_node in netmap
            ], f"Node {node_under_test.host_ip} not in state offline. netmap - {netmap}"

        with reporter.step("Restart storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Tick epoch after restart storage service and set mode to offline"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Check node mode = online, after restart storage service"):
            check_status(ModeNode.ONLINE)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Set node mode to maintenance"):
            cluster_state_controller.set_mode_node(
                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
            )
        with reporter.step("Restart storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Tick epoch after restart storage service"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Check node mode = maintenance, after restart storage service and tick epoch"):
            check_status(ModeNode.MAINTENANCE)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Set node mode to offline"):
            # NOTE(review): despite its title, this step does not change the node mode. It only asserts
            # that the node is already OFFLINE, yet the preceding steps left it in MAINTENANCE. Confirm
            # whether a `set_mode_node(..., status=ModeNode.OFFLINE.value)` call is missing here.
            check_status(ModeNode.OFFLINE)

        with reporter.step("Stop storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Start storage service"):
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Check node mode = offline, after tick epoch and start storage service"):
            check_status(ModeNode.OFFLINE)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Check node mode = online, after start storage service and tick epoch"):
            check_status(ModeNode.ONLINE)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Set node mode to maintenance"):
            cluster_state_controller.set_mode_node(
                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
            )

        with reporter.step("Stop storage service"):
            cluster_state_controller.stop_storage_service(node_under_test)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Start storage service"):
            cluster_state_controller.start_storage_service(node_under_test)

        with reporter.step("Check node mode = maintenance"):
            check_status(ModeNode.MAINTENANCE)

        with reporter.step("Tick epoch"):
            self.tick_epochs(epochs_to_tick=2, alive_node=alive_node, wait_block=2)

        with reporter.step("Check node mode = maintenance"):
            check_status(ModeNode.MAINTENANCE)

    @allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network")
    def test_maintenance_globally_forbidden(
        self,
        cluster_state_controller: ClusterStateController,
        node_under_test: ClusterNode,
        frostfs_cli_remote: FrostfsCli,
        frostfs_cli: FrostfsCli,
        default_wallet: WalletInfo,
        restore_online_mode_node: None,
    ):
        """Check that maintenance is rejected while MaintenanceModeAllowed=false and accepted once it is true."""
        self.change_node = node_under_test
        control_endpoint = node_under_test.service(StorageNode).get_control_endpoint()

        with reporter.step("Set MaintenanceModeAllowed = false"):
            cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test)

        with reporter.step("Set status node - maintenance"):
            with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"):
                frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")

        with reporter.step("Set MaintenanceModeAllowed = true"):
            cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test)

        with reporter.step("Set status node - maintenance"):
            output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance")
            assert "update request successfully sent" in output.stdout, f"Response = {output}"

        with reporter.step("Tick epoch"):
            self.tick_epoch(wait_block=2)

        with reporter.step("Check state node = maintenance "):
            netmap_node = NetmapParser.snapshot_one_node(
                frostfs_cli.netmap.snapshot(
                    rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
                ).stdout,
                node_under_test,
            )
            assert (
                netmap_node.node_status == ModeNode.MAINTENANCE.value.upper()
            ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}"

        with reporter.step("Set status node - online "):
            frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online")

        with reporter.step("Tick epoch"):
            self.tick_epoch()

        with reporter.step("Check state node: online"):
            netmap_node = NetmapParser.snapshot_one_node(
                frostfs_cli.netmap.snapshot(
                    rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet
                ).stdout,
                node_under_test,
            )
            assert (
                netmap_node.node_status == ModeNode.ONLINE.value.upper()
            ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}"