From 269caf8e03415ff1707624e3daa47e3b3b3744f7 Mon Sep 17 00:00:00 2001 From: Dmitriy Zayakin Date: Fri, 24 Nov 2023 13:22:00 +0300 Subject: [PATCH] [#154] Add new test Signed-off-by: Dmitriy Zayakin --- pytest_tests/testsuites/conftest.py | 3 +- pytest_tests/testsuites/failovers/conftest.py | 11 + .../failovers/test_failover_server.py | 178 ++++- .../failovers/test_failover_storage.py | 345 +--------- .../management/test_node_management.py | 651 ++++++++++++++++++ .../network/test_node_management.py | 322 --------- 6 files changed, 833 insertions(+), 677 deletions(-) create mode 100644 pytest_tests/testsuites/management/test_node_management.py delete mode 100644 pytest_tests/testsuites/network/test_node_management.py diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index 12549c8..0a145be 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -53,7 +53,8 @@ def pytest_collection_modifyitems(items: list[pytest.Item]): def priority(item: pytest.Item) -> int: is_node_mgmt_test = 1 if item.get_closest_marker("node_mgmt") else 0 is_logs_check_test = 100 if item.get_closest_marker("logs_after_session") else 0 - return is_node_mgmt_test + is_logs_check_test + is_system_time_test = 10 if item.get_closest_marker("system_time") else 0 + return is_node_mgmt_test + is_logs_check_test + is_system_time_test items.sort(key=lambda item: priority(item)) diff --git a/pytest_tests/testsuites/failovers/conftest.py b/pytest_tests/testsuites/failovers/conftest.py index b1f0695..c5732c6 100644 --- a/pytest_tests/testsuites/failovers/conftest.py +++ b/pytest_tests/testsuites/failovers/conftest.py @@ -1,3 +1,4 @@ +import os import random from datetime import datetime @@ -5,6 +6,8 @@ import allure import pytest from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.controllers import ShardsWatcher +from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.utils.file_utils import generate_file @pytest.fixture() @@ -27,3 +30,11 @@ def shards_watcher(node_under_test: ClusterNode) -> ShardsWatcher: def test_start_time() -> datetime: start_time = datetime.utcnow() return start_time + + +@pytest.fixture() +@allure.title("Generate simple size file") +def simple_file(simple_object_size: ObjectSize) -> str: + path_file = generate_file(size=simple_object_size.value) + yield path_file + os.remove(path_file) diff --git a/pytest_tests/testsuites/failovers/test_failover_server.py b/pytest_tests/testsuites/failovers/test_failover_server.py index d01805a..c28b6a0 100644 --- a/pytest_tests/testsuites/failovers/test_failover_server.py +++ b/pytest_tests/testsuites/failovers/test_failover_server.py @@ -1,12 +1,14 @@ import logging import os.path import random +import time import allure import pytest +from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container -from frostfs_testlib.steps.cli.object import get_object +from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object from frostfs_testlib.steps.node_management import check_node_in_map, check_node_not_in_map from frostfs_testlib.storage.cluster import ClusterNode, StorageNode from frostfs_testlib.storage.controllers import ClusterStateController @@ -15,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.storage_object_info import 
StorageObjec
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.testing.test_control import wait_for_success
+from frostfs_testlib.utils import datetime_utils
 from frostfs_testlib.utils.failover_utils import wait_object_replication
 from frostfs_testlib.utils.file_utils import get_file_hash
 from pytest import FixtureRequest
@@ -61,6 +64,22 @@ class TestFailoverServer(ClusterTestBase):
 
         return containers
 
+    @allure.step("Create container")
+    @pytest.fixture()
+    def container(self, default_wallet: str) -> StorageContainer:
+        select = len(self.cluster.cluster_nodes)
+        placement_rule = f"REP {select - 1} CBF 1 SELECT {select} FROM *"
+        cont_id = create_container(
+            default_wallet,
+            shell=self.shell,
+            endpoint=self.cluster.default_rpc_endpoint,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+        )
+        wallet = WalletInfo(path=default_wallet)
+        storage_cont_info = StorageContainerInfo(id=cont_id, wallet_file=wallet)
+        return StorageContainer(storage_container_info=storage_cont_info, shell=self.shell, cluster=self.cluster)
+
     @allure.step("Create object and delete after test")
     @pytest.fixture(scope="class")
     def storage_objects(
@@ -96,20 +115,25 @@ class TestFailoverServer(ClusterTestBase):
         self, nodes: list[StorageNode], storage_objects: list[StorageObjectInfo]
     ) -> list[StorageObjectInfo]:
         corrupted_objects = []
+        errors_get = []
         for node in nodes:
             for storage_object in storage_objects:
-                got_file_path = get_object(
-                    storage_object.wallet_file_path,
-                    storage_object.cid,
-                    storage_object.oid,
-                    endpoint=node.get_rpc_endpoint(),
-                    shell=self.shell,
-                    timeout="60s",
-                )
-                if storage_object.file_hash != get_file_hash(got_file_path):
-                    corrupted_objects.append(storage_object)
-                os.remove(got_file_path)
+                try:
+                    got_file_path = get_object(
+                        storage_object.wallet_file_path,
+                        storage_object.cid,
+                        storage_object.oid,
+                        endpoint=node.get_rpc_endpoint(),
+                        shell=self.shell,
+                        timeout="60s",
+                    )
+                    if storage_object.file_hash != get_file_hash(got_file_path):
+                        corrupted_objects.append(storage_object)
+                    os.remove(got_file_path)
+                except RuntimeError:
+                    errors_get.append(storage_object.oid)
+        assert len(errors_get) == 0, f"Failed to GET the following objects: {errors_get}"
         return corrupted_objects
 
     def check_objects_replication(
@@ -126,6 +150,26 @@ class TestFailoverServer(ClusterTestBase):
             attempts=60,
         )
 
+    @pytest.fixture()
+    def object_and_nodes(
+        self, simple_object_size: ObjectSize, container: StorageContainer, default_wallet: str
+    ) -> tuple[StorageObjectInfo, list[ClusterNode]]:
+        object_info = container.generate_object(simple_object_size.value)
+        object_nodes = get_object_nodes(
+            cluster=self.cluster,
+            wallet=default_wallet,
+            cid=object_info.cid,
+            oid=object_info.oid,
+            shell=self.shell,
+            endpoint=self.cluster.default_rpc_endpoint,
+        )
+        return object_info, object_nodes
+
+    @pytest.fixture()
+    def up_stop_nodes(self, cluster_state_controller: ClusterStateController):
+        yield
+        cluster_state_controller.start_stopped_hosts()
+
     @allure.title("Full shutdown node")
     @pytest.mark.parametrize("containers, storage_objects", [(5, 10)], indirect=True)
     def test_complete_node_shutdown(
@@ -157,7 +201,7 @@ class TestFailoverServer(ClusterTestBase):
 
         count_tick_epoch = int(alive_nodes[0].ir_node.get_netmap_cleaner_threshold()) + 4
         with allure.step(f"Tick {count_tick_epoch} epochs and wait for 2 blocks"):
-            self.tick_epochs(count_tick_epoch, wait_block=2)
+            self.tick_epochs(count_tick_epoch, storage_nodes[0], wait_block=2)
 
         with allure.step(f"Check {node_to_stop} in not map"):
             self.wait_node_not_in_map(node_to_stop.storage_node, self.shell, alive_node=storage_nodes[0])
@@ -195,3 +239,111 @@ class TestFailoverServer(ClusterTestBase):
         with allure.step("Verify that there are no corrupted objects"):
             corrupted_objects_list = self.get_corrupted_objects_list(storage_nodes, storage_objects)
             assert not corrupted_objects_list
+
+    @allure.title("Not enough nodes in the container with policy - 'REP 3 CBF 1 SELECT 4 FROM *'")
+    def test_not_enough_nodes_in_container_rep_3(
+        self,
+        container: StorageContainer,
+        object_and_nodes: tuple[StorageObjectInfo, list[ClusterNode]],
+        default_wallet: str,
+        cluster_state_controller: ClusterStateController,
+        simple_file: str,
+        up_stop_nodes: None,
+    ):
+        object_info, object_nodes = object_and_nodes
+        node_not_object = list(set(self.cluster.cluster_nodes) - set(object_nodes))[0]
+
+        with allure.step("Stop all nodes with the object, except one"):
+            for cluster_node in object_nodes[1:]:
+                cluster_state_controller.stop_node_host(node=cluster_node, mode="hard")
+
+        with allure.step("Get object"):
+            with allure.step(f"Get operation via {node_not_object}, which does not hold the object, expect success"):
+                get_object(
+                    wallet=default_wallet,
+                    cid=object_info.cid,
+                    oid=object_info.oid,
+                    shell=self.shell,
+                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
+                )
+            with allure.step(f"Get operation via {object_nodes[0]}, which holds the object, expect success"):
+                get_object(
+                    wallet=default_wallet,
+                    cid=object_info.cid,
+                    oid=object_info.oid,
+                    shell=self.shell,
+                    endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+                )
+
+        with allure.step("Put operation to node without object, expect error"):
+            with pytest.raises(RuntimeError):
+                put_object(
+                    wallet=default_wallet,
+                    path=simple_file,
+                    cid=object_info.cid,
+                    shell=self.shell,
+                    endpoint=node_not_object.storage_node.get_rpc_endpoint(),
+                )
+
+    @allure.title("Not enough nodes in the container with policy - 'REP 2 CBF 2 SELECT 4 FROM *'")
+    def test_not_enough_nodes_in_container_rep_2(
+        self,
+        default_wallet: str,
+        cluster_state_controller: ClusterStateController,
+        simple_file: str,
+        up_stop_nodes: None,
+    ):
+        with allure.step("Create a container with a full network map"):
+            select = len(self.cluster.cluster_nodes)
+            placement_rule = f"REP {select - 2} IN X CBF 2 SELECT {select} FROM * AS X"
+            cid_1 = create_container(
+                default_wallet,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                rule=placement_rule,
+                basic_acl=PUBLIC_ACL,
+            )
+        with allure.step("Put object"):
+            oid = put_object(
+                wallet=default_wallet,
+                path=simple_file,
+                cid=cid_1,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Search nodes with object"):
+            object_nodes = get_object_nodes(
+                cluster=self.cluster,
+                wallet=default_wallet,
+                cid=cid_1,
+                oid=oid,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Turn off random node with object"):
+            cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")
+        with allure.step("Check PUT operation"):
+            oid_2 = put_object(
+                wallet=default_wallet,
+                path=simple_file,
+                cid=cid_1,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+            )
+        with allure.step("Check GET operation"):
+            get_file = get_object(
+                wallet=default_wallet,
+                cid=cid_1,
+                oid=oid_2,
+                shell=self.shell,
+                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+            )
+            os.remove(get_file)
+        with allure.step("Check container creation"):
+            _ = create_container(
+                default_wallet,
+                shell=self.shell,
+                endpoint=object_nodes[0].storage_node.get_rpc_endpoint(),
+                rule=placement_rule,
+                basic_acl=PUBLIC_ACL,
+            )
diff --git a/pytest_tests/testsuites/failovers/test_failover_storage.py b/pytest_tests/testsuites/failovers/test_failover_storage.py
index bff58e9..aa78db1 100644
--- a/pytest_tests/testsuites/failovers/test_failover_storage.py
+++ b/pytest_tests/testsuites/failovers/test_failover_storage.py
@@ -1,30 +1,15 @@
 import logging
-import os
 import random
 from datetime import datetime
 from time import sleep
 
 import allure
 import pytest
-from frostfs_testlib.cli import FrostfsCli
-from frostfs_testlib.cli.netmap_parser import NetmapParser
-from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC
-from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME
+from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
 from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
 from frostfs_testlib.s3 import S3ClientWrapper, VersioningStatus
-from frostfs_testlib.steps.cli.container import (
-    StorageContainer,
-    StorageContainerInfo,
-    create_container,
-    search_nodes_with_container,
-)
-from frostfs_testlib.steps.cli.object import (
-    delete_object,
-    get_object,
-    put_object,
-    put_object_to_random_node,
-    search_object,
-)
+from frostfs_testlib.steps.cli.container import StorageContainer, StorageContainerInfo, create_container
+from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node
 from frostfs_testlib.steps.node_management import (
     check_node_in_map,
     check_node_not_in_map,
@@ -37,7 +22,7 @@ from frostfs_testlib.steps.s3 import s3_helper
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, S3Gate, StorageNode
 from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
-from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode, StorageObjectInfo
+from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.testing.test_control import expect_not_raises
@@ -727,325 +712,3 @@ class TestStorageDataLoss(ClusterTestBase):
 
         with allure.step("Delete bucket"):
             s3_client.delete_bucket(bucket)
-
-
-@pytest.mark.maintenance
-class TestMaintenanceMode(ClusterTestBase):
-    change_node: ClusterNode = None
-
-    @pytest.fixture()
-    @allure.title("Init Frostfs CLI remote")
-    def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
-        host = node_under_test.host
-        service_config = host.get_service_config(node_under_test.storage_node.name)
-        wallet_path = service_config.attributes["wallet_path"]
-        wallet_password = service_config.attributes["wallet_password"]
-
-        shell = host.get_shell()
-        wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
-        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
-        shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
-        cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
-        return cli
-
-    @pytest.fixture()
-    @allure.title("Init Frostfs CLI remote")
-    def frostfs_cli(self) -> FrostfsCli:
-        cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
-        return cli
-
-    
@pytest.fixture() - def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: str): - yield - cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online") - self.tick_epoch(wait_block=2) - - def basic_operations(self, wallet, cid, oid, shell, endpoint, matchs, object_size): - file_path = generate_file(object_size) - default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint} - operations = { - get_object: {"oid": oid}, - search_object: {}, - delete_object: {"oid": oid}, - put_object: {"path": file_path}, - } - for index, operation, kw in enumerate(operations.items()): - with allure.step(f"Run {operation.__name__} object, waiting response - {matchs[index]}"): - default_kw.update(kw) - if operation == search_object and "Found" in matchs[index]: - operation(**default_kw) - continue - with pytest.raises(RuntimeError, match=matchs[index]): - operation(**default_kw) - os.remove(file_path) - - @allure.title("Test of basic node operations in maintenance mode") - def test_maintenance_mode( - self, - default_wallet: str, - simple_object_size: ObjectSize, - cluster_state_controller: ClusterStateController, - restore_online_mode_node: None, - ): - with allure.step("Create container and create\put object"): - cid = create_container( - wallet=default_wallet, - shell=self.shell, - endpoint=self.cluster.default_rpc_endpoint, - rule="REP 1 CBF 1", - ) - node = search_nodes_with_container( - wallet=default_wallet, - cid=cid, - shell=self.shell, - endpoint=self.cluster.default_rpc_endpoint, - cluster=self.cluster, - ) - self.change_node = node[0] - file_path = generate_file(simple_object_size.value) - oid = put_object( - wallet=default_wallet, - path=file_path, - cid=cid, - shell=self.shell, - endpoint=self.change_node.storage_node.get_rpc_endpoint(), - ) - with allure.step("Enable MaintenanceModeAllowed:"): - cluster_state_controller.set_mode_node( - cluster_node=self.change_node, wallet=default_wallet, status="maintenance" - ) - - other_nodes = list(set(self.cluster.cluster_nodes) - set(node)) - node_and_match = { - self.change_node: ["node is under maintenance"] * 4, - other_nodes[0]: [ - "object not found", - "Found 0 objects", - "object not found", - "node is under maintenance", - ], - } - with allure.step("Run basic operations"): - for cluster_node, matchs in node_and_match.items(): - self.basic_operations( - wallet=default_wallet, - cid=cid, - oid=oid, - shell=self.shell, - endpoint=cluster_node.storage_node.get_rpc_endpoint(), - matchs=matchs, - object_size=simple_object_size.value, - ) - - @pytest.mark.sanity - @allure.title("MAINTENANCE and OFFLINE mode transitions") - def test_mode_transitions( - self, - cluster_state_controller: ClusterStateController, - node_under_test: ClusterNode, - default_wallet: str, - frostfs_cli: FrostfsCli, - restore_online_mode_node: None, - ): - self.change_node = node_under_test - cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test}) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Set node mode to offline"): - cluster_state_controller.set_mode_node( - cluster_node=node_under_test, - wallet=default_wallet, - status=ModeNode.OFFLINE.value, - ) - - with allure.step("Tick epoch to update the network map"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with ((allure.step("Check node mode = 
offline, after update the network map"))): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - netmap = NetmapParser.snapshot_all_nodes(netmap) - assert node_under_test.host_ip not in [ - netmap_node.node for netmap_node in netmap - ], f"Node {node_under_test.host_ip} not in state offline" - - with allure.step("Restart storage service"): - cluster_state_controller.stop_storage_service(node_under_test) - cluster_state_controller.start_storage_service(node_under_test) - - with allure.step("Tick epoch after restart storage service and set mode to offline"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with (allure.step("Check node mode = online, after restart storage service")): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == ModeNode.ONLINE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}" - ) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Set node mode to maintenance"): - cluster_state_controller.set_mode_node( - cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value - ) - with allure.step("Restart storage service"): - cluster_state_controller.stop_storage_service(node_under_test) - cluster_state_controller.start_storage_service(node_under_test) - - with allure.step("Tick epoch after restart storage service"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Check node mode = maintenance, after restart storage service and tick epoch"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == ModeNode.MAINTENANCE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}" - ) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Set node mode to offline"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == ModeNode.OFFLINE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}" - ) - - with allure.step("Stop storage service"): - cluster_state_controller.stop_storage_service(node_under_test) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Start storage service"): - cluster_state_controller.start_storage_service(node_under_test) - - with allure.step("Check node mode = offline, after tick epoch and start storage service"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == 
ModeNode.OFFLINE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}" - ) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Check node mode = online, after start storage service and tick epoch"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == ModeNode.ONLINE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}" - ) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Set node mode to maintenance"): - cluster_state_controller.set_mode_node( - cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value - ) - - with allure.step("Stop storage service"): - cluster_state_controller.stop_storage_service(node_under_test) - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Start storage service"): - cluster_state_controller.start_storage_service(node_under_test) - - with allure.step("Check node mode = maintenance"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - - assert ( - node_state == ModeNode.MAINTENANCE.value.upper() - ), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}" - - with allure.step("Tick epoch"): - self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) - - with allure.step("Check node mode = maintenance"): - netmap = frostfs_cli.netmap.snapshot( - rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout - node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status - assert node_state == ModeNode.MAINTENANCE.value.upper(), ( - f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}" - ) - - @allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network") - def test_maintenance_globally_forbidden( - self, - cluster_state_controller: ClusterStateController, - node_under_test: ClusterNode, - frostfs_cli_remote: FrostfsCli, - frostfs_cli: FrostfsCli, - default_wallet: str, - restore_online_mode_node: None, - ): - self.change_node = node_under_test - control_endpoint = node_under_test.service(StorageNode).get_control_endpoint() - - with allure.step("Set MaintenanceModeAllowed = false"): - cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test) - - with allure.step("Set status node - maintenance"): - with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"): - frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance") - - with allure.step("Set MaintenanceModeAllowed = true"): - cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test) - - with allure.step("Set status node - maintenance"): - output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance") - assert "update request successfully sent" in output.stdout, 
f"Response = {output}" - - with allure.step("Tick epoch"): - self.tick_epoch(wait_block=2) - - with allure.step("Check state node = maintenance "): - netmap_node = NetmapParser.snapshot_one_node( - frostfs_cli.netmap.snapshot( - rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout, - node_under_test, - ) - assert ( - netmap_node.node_status == ModeNode.MAINTENANCE.value.upper() - ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}" - - with allure.step("Set status node - online "): - frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online") - - with allure.step("Tick epoch"): - self.tick_epoch() - - with allure.step("Check state node: online"): - netmap_node = NetmapParser.snapshot_one_node( - frostfs_cli.netmap.snapshot( - rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet - ).stdout, - node_under_test, - ) - assert ( - netmap_node.node_status == ModeNode.ONLINE.value.upper() - ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}" diff --git a/pytest_tests/testsuites/management/test_node_management.py b/pytest_tests/testsuites/management/test_node_management.py new file mode 100644 index 0000000..b199c4a --- /dev/null +++ b/pytest_tests/testsuites/management/test_node_management.py @@ -0,0 +1,651 @@ +import logging +import os +import random +from time import sleep +from typing import Optional, Tuple + +import allure +import pytest +from frostfs_testlib.cli import FrostfsCli +from frostfs_testlib.cli.netmap_parser import NetmapParser +from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC +from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG, MORPH_BLOCK_TIME +from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND +from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL +from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container +from frostfs_testlib.steps.cli.object import ( + delete_object, + get_object, + get_object_from_random_node, + head_object, + put_object, + put_object_to_random_node, + search_object, +) +from frostfs_testlib.steps.node_management import ( + check_node_in_map, + delete_node_data, + drop_object, + exclude_node_from_network_map, + get_locode_from_random_node, + include_node_to_network_map, + node_shard_list, + node_shard_set_mode, + storage_node_set_status, + wait_for_node_to_be_ready, +) +from frostfs_testlib.steps.storage_policy import get_nodes_with_object +from frostfs_testlib.storage.cluster import ClusterNode, StorageNode +from frostfs_testlib.storage.controllers import ClusterStateController +from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.storage.dataclasses.storage_object_info import ModeNode +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.utils import datetime_utils, string_utils +from frostfs_testlib.utils.failover_utils import wait_object_replication +from frostfs_testlib.utils.file_utils import generate_file + +from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes + +logger = logging.getLogger("NeoLogger") +check_nodes: list[StorageNode] = [] + + +@allure.title("Add one node to cluster") +@pytest.mark.add_nodes +@pytest.mark.node_mgmt +class TestNodeManagement(ClusterTestBase): + @pytest.fixture + @allure.title("Create container and pick the node with data") + def create_container_and_pick_node( + self, 
default_wallet: str, simple_object_size: ObjectSize
+    ) -> Tuple[str, StorageNode]:
+        file_path = generate_file(simple_object_size.value)
+        placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
+        endpoint = self.cluster.default_rpc_endpoint
+
+        cid = create_container(
+            default_wallet,
+            shell=self.shell,
+            endpoint=endpoint,
+            rule=placement_rule,
+            basic_acl=PUBLIC_ACL,
+        )
+        oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster)
+
+        nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
+        assert len(nodes) == 1
+        node = nodes[0]
+
+        yield cid, node
+
+        shards = node_shard_list(node)
+        assert shards
+
+        for shard in shards:
+            node_shard_set_mode(node, shard, "read-write")
+
+        node_shard_list(node)
+
+    @allure.step("Tick epoch with retries")
+    def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: Optional[int] = None):
+        for attempt in range(attempts):
+            try:
+                self.tick_epoch(wait_block=wait_block)
+            except RuntimeError:
+                sleep(timeout)
+                if attempt >= attempts - 1:
+                    raise
+                continue
+            return
+
+    @pytest.fixture
+    def after_run_start_all_nodes(self):
+        yield
+        self.return_nodes()
+
+    @pytest.fixture
+    def return_nodes_after_test_run(self):
+        yield
+        self.return_nodes()
+
+    @allure.step("Return node to cluster")
+    def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None:
+        for node in list(check_nodes):
+            with allure.step(f"Start node {node}"):
+                node.start_service()
+            with allure.step(f"Waiting status ready for node {node}"):
+                wait_for_node_to_be_ready(node)
+
+            # We need to wait for the node to establish notifications from the morph chain.
+            # Otherwise it will hang when we try to set its status.
+            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
+
+            with allure.step(f"Move node {node} to online state"):
+                storage_node_set_status(node, status="online", retries=2)
+
+            check_nodes.remove(node)
+            sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME))
+            self.tick_epoch_with_retries(3, wait_block=2)
+            check_node_in_map(node, shell=self.shell, alive_node=alive_node)
+
+    @allure.title("Add one node to cluster")
+    @pytest.mark.add_nodes
+    def test_add_nodes(
+        self,
+        default_wallet: str,
+        simple_object_size: ObjectSize,
+        return_nodes_after_test_run,
+    ):
+        """
+        This test removes one node from the cluster and then adds it back. It uses basic control operations with storage nodes (healthcheck, netmap-snapshot, set-status).
+ """ + wallet = default_wallet + placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X" + placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X" + source_file_path = generate_file(simple_object_size.value) + + storage_nodes = self.cluster.storage_nodes + random_node = random.choice(storage_nodes[1:]) + alive_node = random.choice( + [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id] + ) + + check_node_in_map(random_node, shell=self.shell, alive_node=alive_node) + + # Add node to recovery list before messing with it + check_nodes.append(random_node) + exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) + delete_node_data(random_node) + + cid = create_container( + wallet, + rule=placement_rule_3, + basic_acl=PUBLIC_ACL, + shell=self.shell, + endpoint=alive_node.get_rpc_endpoint(), + ) + oid = put_object( + wallet, + source_file_path, + cid, + shell=self.shell, + endpoint=alive_node.get_rpc_endpoint(), + ) + wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes) + + self.return_nodes(alive_node) + + with allure.step("Check data could be replicated to new node"): + random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node})) + # Add node to recovery list before messing with it + check_nodes.append(random_node) + exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) + + wait_object_replication( + cid, + oid, + 3, + shell=self.shell, + nodes=list(set(storage_nodes) - {random_node}), + ) + include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) + wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes) + + with allure.step("Check container could be created with new node"): + cid = create_container( + wallet, + rule=placement_rule_4, + basic_acl=PUBLIC_ACL, + shell=self.shell, + endpoint=alive_node.get_rpc_endpoint(), + ) + oid = put_object( + wallet, + source_file_path, + cid, + shell=self.shell, + endpoint=alive_node.get_rpc_endpoint(), + ) + wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes) + + @pytest.mark.node_mgmt + @allure.title("Drop object using control command") + def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize): + """ + Test checks object could be dropped using `frostfs-cli control drop-objects` command. 
+ """ + wallet = default_wallet + endpoint = self.cluster.default_rpc_endpoint + file_path_simple = generate_file(simple_object_size.value) + file_path_complex = generate_file(complex_object_size.value) + + locode = get_locode_from_random_node(self.cluster) + rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC" + cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint) + oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster) + oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster) + + for oid in (oid_simple, oid_complex): + get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) + head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) + + nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes) + random_node = random.choice(nodes_with_object) + + for oid in (oid_simple, oid_complex): + with allure.step(f"Drop object {oid}"): + get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) + head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) + drop_object(random_node, cid, oid) + self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object) + self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object) + + @pytest.mark.node_mgmt + @pytest.mark.skip(reason="Need to clarify scenario") + @allure.title("Control Operations with storage nodes") + def test_shards( + self, + default_wallet, + create_container_and_pick_node, + simple_object_size: ObjectSize, + ): + wallet = default_wallet + file_path = generate_file(simple_object_size.value) + + cid, node = create_container_and_pick_node + original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) + + # for mode in ('read-only', 'degraded'): + for mode in ("degraded",): + shards = node_shard_list(node) + assert shards + + for shard in shards: + node_shard_set_mode(node, shard, mode) + + shards = node_shard_list(node) + assert shards + + with pytest.raises(RuntimeError): + put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) + + with pytest.raises(RuntimeError): + delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint) + + get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster) + + for shard in shards: + node_shard_set_mode(node, shard, "read-write") + + shards = node_shard_list(node) + assert shards + + oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) + delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + + @pytest.mark.node_mgmt + @allure.title("Put object with stopped node") + def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize): + wallet = default_wallet + placement_rule = "REP 3 IN X SELECT 4 FROM * AS X" + source_file_path = generate_file(simple_object_size.value) + storage_nodes = self.cluster.storage_nodes + random_node = random.choice(storage_nodes[1:]) + alive_node = random.choice( + [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id] + ) + + cid = create_container( + wallet, + rule=placement_rule, + basic_acl=PUBLIC_ACL, + shell=self.shell, + endpoint=random_node.get_rpc_endpoint(), + ) + with allure.step("Stop the random node"): + check_nodes.append(random_node) + 
random_node.stop_service()
+        with allure.step("Try to put an object and expect success"):
+            put_object(
+                wallet,
+                source_file_path,
+                cid,
+                shell=self.shell,
+                endpoint=alive_node.get_rpc_endpoint(),
+            )
+        self.return_nodes(alive_node)
+
+    @allure.step("Wait for object to be dropped")
+    def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None:
+        for _ in range(3):
+            try:
+                checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint)
+                wait_for_gc_pass_on_storage_nodes()
+            except Exception as err:
+                if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND):
+                    return
+                raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}')
+
+        raise AssertionError(f"Object {oid} was not dropped from node")
+
+
+@pytest.mark.maintenance
+class TestMaintenanceMode(ClusterTestBase):
+    change_node: ClusterNode = None
+
+    @pytest.fixture()
+    @allure.title("Init Frostfs CLI remote")
+    def frostfs_cli_remote(self, node_under_test: ClusterNode) -> FrostfsCli:
+        host = node_under_test.host
+        service_config = host.get_service_config(node_under_test.storage_node.name)
+        wallet_path = service_config.attributes["wallet_path"]
+        wallet_password = service_config.attributes["wallet_password"]
+
+        shell = host.get_shell()
+        wallet_config_path = f"/tmp/{node_under_test.storage_node.name}-config.yaml"
+        wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
+        shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
+        cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
+        return cli
+
+    @pytest.fixture()
+    @allure.title("Init Frostfs CLI local")
+    def frostfs_cli(self) -> FrostfsCli:
+        cli = FrostfsCli(shell=self.shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=DEFAULT_WALLET_CONFIG)
+        return cli
+
+    @pytest.fixture()
+    def restore_online_mode_node(self, cluster_state_controller: ClusterStateController, default_wallet: str):
+        yield
+        cluster_state_controller.set_mode_node(cluster_node=self.change_node, wallet=default_wallet, status="online")
+        self.tick_epoch(wait_block=2)
+
+    def basic_operations(self, wallet, cid, oid, shell, endpoint, matches, object_size):
+        file_path = generate_file(object_size)
+        default_kw = {"wallet": wallet, "cid": cid, "shell": shell, "endpoint": endpoint}
+        operations = {
+            get_object: {"oid": oid},
+            search_object: {},
+            delete_object: {"oid": oid},
+            put_object: {"path": file_path},
+        }
+        for index, (operation, kw) in enumerate(operations.items()):
+            with allure.step(f"Run {operation.__name__}, expect response - {matches[index]}"):
+                default_kw.update(kw)
+                if operation == search_object and "Found" in matches[index]:
+                    operation(**default_kw)
+                    continue
+                with pytest.raises(RuntimeError, match=matches[index]):
+                    operation(**default_kw)
+        os.remove(file_path)
+
+    @allure.title("Test of basic node operations in maintenance mode")
+    def test_maintenance_mode(
+        self,
+        default_wallet: str,
+        simple_object_size: ObjectSize,
+        cluster_state_controller: ClusterStateController,
+        restore_online_mode_node: None,
+    ):
+        with allure.step("Create container and create/put object"):
+            cid = create_container(
+                wallet=default_wallet,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                rule="REP 1 CBF 1",
+            )
+            node = search_nodes_with_container(
+                wallet=default_wallet,
+                cid=cid,
+                shell=self.shell,
+                endpoint=self.cluster.default_rpc_endpoint,
+                cluster=self.cluster,
+            )
+            self.change_node = node[0]
+            file_path = generate_file(simple_object_size.value)
+            oid = put_object(
+                wallet=default_wallet,
+                path=file_path,
+                cid=cid,
+                shell=self.shell,
+                endpoint=self.change_node.storage_node.get_rpc_endpoint(),
+            )
+        with allure.step("Set node mode to maintenance"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=self.change_node, wallet=default_wallet, status="maintenance"
+            )
+
+        other_nodes = list(set(self.cluster.cluster_nodes) - set(node))
+        node_and_match = {
+            self.change_node: ["node is under maintenance"] * 4,
+            other_nodes[0]: [
+                "object not found",
+                "Found 0 objects",
+                "object not found",
+                "node is under maintenance",
+            ],
+        }
+        with allure.step("Run basic operations"):
+            for cluster_node, matches in node_and_match.items():
+                self.basic_operations(
+                    wallet=default_wallet,
+                    cid=cid,
+                    oid=oid,
+                    shell=self.shell,
+                    endpoint=cluster_node.storage_node.get_rpc_endpoint(),
+                    matches=matches,
+                    object_size=simple_object_size.value,
+                )
+
+    @pytest.mark.sanity
+    @allure.title("MAINTENANCE and OFFLINE mode transitions")
+    def test_mode_transitions(
+        self,
+        cluster_state_controller: ClusterStateController,
+        node_under_test: ClusterNode,
+        default_wallet: str,
+        frostfs_cli: FrostfsCli,
+        restore_online_mode_node: None,
+    ):
+        self.change_node = node_under_test
+        cluster_nodes = list(set(self.cluster.cluster_nodes) - {node_under_test})
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to offline"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test,
+                wallet=default_wallet,
+                status=ModeNode.OFFLINE.value,
+            )
+
+        with allure.step("Tick epoch to update the network map"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = offline, after updating the network map"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            netmap = NetmapParser.snapshot_all_nodes(netmap)
+            assert node_under_test.host_ip not in [
+                netmap_node.node for netmap_node in netmap
+            ], f"Node {node_under_test.host_ip} is still in the netmap, expected it to be offline"
+
+        with allure.step("Restart storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Tick epoch after restart storage service and set mode to offline"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = online, after restart storage service"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.ONLINE.value.upper(), (
+                f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
+            )
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to maintenance"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
+            )
+        with allure.step("Restart storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Tick epoch after restart storage service"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = maintenance, after restart storage service and tick epoch"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.MAINTENANCE.value.upper(), (
+                f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}"
+            )
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = offline, after tick epoch"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.OFFLINE.value.upper(), (
+                f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
+            )
+
+        with allure.step("Stop storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Start storage service"):
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Check node mode = offline, after tick epoch and start storage service"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.OFFLINE.value.upper(), (
+                f"Node actual state - {node_state}, " f"expect - {ModeNode.OFFLINE.value}"
+            )
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Check node mode = online, after start storage service and tick epoch"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+            assert node_state == ModeNode.ONLINE.value.upper(), (
+                f"Node actual state - {node_state}, " f"expect - {ModeNode.ONLINE.value}"
+            )
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Set node mode to maintenance"):
+            cluster_state_controller.set_mode_node(
+                cluster_node=node_under_test, wallet=default_wallet, status=ModeNode.MAINTENANCE.value
+            )
+
+        with allure.step("Stop storage service"):
+            cluster_state_controller.stop_storage_service(node_under_test)
+
+        with allure.step("Tick epoch"):
+            self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2)
+
+        with allure.step("Start storage service"):
+            cluster_state_controller.start_storage_service(node_under_test)
+
+        with allure.step("Check node mode = maintenance"):
+            netmap = frostfs_cli.netmap.snapshot(
+                rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet
+            ).stdout
+            node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status
+
+            assert (
+                node_state == 
ModeNode.MAINTENANCE.value.upper() + ), f"Node actual state - {node_state}, expect - {ModeNode.MAINTENANCE.value}" + + with allure.step("Tick epoch"): + self.tick_epochs(epochs_to_tick=2, alive_node=cluster_nodes[0].storage_node, wait_block=2) + + with allure.step("Check node mode = maintenance"): + netmap = frostfs_cli.netmap.snapshot( + rpc_endpoint=cluster_nodes[0].storage_node.get_rpc_endpoint(), wallet=default_wallet + ).stdout + node_state = NetmapParser.snapshot_one_node(netmap, node_under_test).node_status + assert node_state == ModeNode.MAINTENANCE.value.upper(), ( + f"Node actual state - {node_state}, " f"expect - {ModeNode.MAINTENANCE.value}" + ) + + @allure.title("A node cannot go into maintenance if maintenance is prohibited globally in the network") + def test_maintenance_globally_forbidden( + self, + cluster_state_controller: ClusterStateController, + node_under_test: ClusterNode, + frostfs_cli_remote: FrostfsCli, + frostfs_cli: FrostfsCli, + default_wallet: str, + restore_online_mode_node: None, + ): + self.change_node = node_under_test + control_endpoint = node_under_test.service(StorageNode).get_control_endpoint() + + with allure.step("Set MaintenanceModeAllowed = false"): + cluster_state_controller.set_maintenance_mode_allowed("false", node_under_test) + + with allure.step("Set status node - maintenance"): + with pytest.raises(RuntimeError, match="maintenance mode is not allowed by the network"): + frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance") + + with allure.step("Set MaintenanceModeAllowed = true"): + cluster_state_controller.set_maintenance_mode_allowed("true", node_under_test) + + with allure.step("Set status node - maintenance"): + output = frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="maintenance") + assert "update request successfully sent" in output.stdout, f"Response = {output}" + + with allure.step("Tick epoch"): + self.tick_epoch(wait_block=2) + + with allure.step("Check state node = maintenance "): + netmap_node = NetmapParser.snapshot_one_node( + frostfs_cli.netmap.snapshot( + rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet + ).stdout, + node_under_test, + ) + assert ( + netmap_node.node_status == ModeNode.MAINTENANCE.value.upper() + ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.MAINTENANCE.value}" + + with allure.step("Set status node - online "): + frostfs_cli_remote.control.set_status(endpoint=control_endpoint, status="online") + + with allure.step("Tick epoch"): + self.tick_epoch() + + with allure.step("Check state node: online"): + netmap_node = NetmapParser.snapshot_one_node( + frostfs_cli.netmap.snapshot( + rpc_endpoint=node_under_test.storage_node.get_rpc_endpoint(), wallet=default_wallet + ).stdout, + node_under_test, + ) + assert ( + netmap_node.node_status == ModeNode.ONLINE.value.upper() + ), f"Node actual state - {netmap_node.node_status}, expect - {ModeNode.ONLINE.value}" diff --git a/pytest_tests/testsuites/network/test_node_management.py b/pytest_tests/testsuites/network/test_node_management.py deleted file mode 100644 index 2dfa68b..0000000 --- a/pytest_tests/testsuites/network/test_node_management.py +++ /dev/null @@ -1,322 +0,0 @@ -import logging -import random -from time import sleep -from typing import Optional, Tuple - -import allure -import pytest -from frostfs_testlib.resources.common import MORPH_BLOCK_TIME -from frostfs_testlib.resources.error_patterns import OBJECT_NOT_FOUND -from 
frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL -from frostfs_testlib.steps.cli.container import create_container -from frostfs_testlib.steps.cli.object import ( - delete_object, - get_object, - get_object_from_random_node, - head_object, - put_object, - put_object_to_random_node, -) -from frostfs_testlib.steps.node_management import ( - check_node_in_map, - delete_node_data, - drop_object, - exclude_node_from_network_map, - get_locode_from_random_node, - include_node_to_network_map, - node_shard_list, - node_shard_set_mode, - storage_node_set_status, - wait_for_node_to_be_ready, -) -from frostfs_testlib.steps.storage_policy import get_nodes_with_object -from frostfs_testlib.storage.cluster import StorageNode -from frostfs_testlib.storage.dataclasses.object_size import ObjectSize -from frostfs_testlib.testing.cluster_test_base import ClusterTestBase -from frostfs_testlib.utils import datetime_utils, string_utils -from frostfs_testlib.utils.failover_utils import wait_object_replication -from frostfs_testlib.utils.file_utils import generate_file - -from pytest_tests.helpers.utility import wait_for_gc_pass_on_storage_nodes - -logger = logging.getLogger("NeoLogger") -check_nodes: list[StorageNode] = [] - - -@allure.title("Add one node to cluster") -@pytest.mark.add_nodes -@pytest.mark.node_mgmt -class TestNodeManagement(ClusterTestBase): - @pytest.fixture - @allure.title("Create container and pick the node with data") - def create_container_and_pick_node( - self, default_wallet: str, simple_object_size: ObjectSize - ) -> Tuple[str, StorageNode]: - file_path = generate_file(simple_object_size.value) - placement_rule = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" - endpoint = self.cluster.default_rpc_endpoint - - cid = create_container( - default_wallet, - shell=self.shell, - endpoint=endpoint, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - ) - oid = put_object_to_random_node(default_wallet, file_path, cid, self.shell, self.cluster) - - nodes = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(nodes) == 1 - node = nodes[0] - - yield cid, node - - shards = node_shard_list(node) - assert shards - - for shard in shards: - node_shard_set_mode(node, shard, "read-write") - - node_shard_list(node) - - @allure.step("Tick epoch with retries") - def tick_epoch_with_retries(self, attempts: int = 3, timeout: int = 3, wait_block: int = None): - for attempt in range(attempts): - try: - self.tick_epoch(wait_block=wait_block) - except RuntimeError: - sleep(timeout) - if attempt >= attempts - 1: - raise - continue - return - - @pytest.fixture - def after_run_start_all_nodes(self): - yield - self.return_nodes() - - @pytest.fixture - def return_nodes_after_test_run(self): - yield - self.return_nodes() - - @allure.step("Return node to cluster") - def return_nodes(self, alive_node: Optional[StorageNode] = None) -> None: - for node in list(check_nodes): - with allure.step(f"Start node {node}"): - node.start_service() - with allure.step(f"Waiting status ready for node {node}"): - wait_for_node_to_be_ready(node) - - # We need to wait for node to establish notifications from morph-chain - # Otherwise it will hang up when we will try to set status - sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) - - with allure.step(f"Move node {node} to online state"): - storage_node_set_status(node, status="online", retries=2) - - check_nodes.remove(node) - sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) - self.tick_epoch_with_retries(3, wait_block=2) - 
check_node_in_map(node, shell=self.shell, alive_node=alive_node) - - @allure.title("Add one node to cluster") - @pytest.mark.add_nodes - def test_add_nodes( - self, - default_wallet: str, - simple_object_size: ObjectSize, - return_nodes_after_test_run, - ): - """ - This test remove one node from frostfs_testlib.storage.cluster then add it back. Test uses base control operations with storage nodes (healthcheck, netmap-snapshot, set-status). - """ - wallet = default_wallet - placement_rule_3 = "REP 3 IN X CBF 1 SELECT 3 FROM * AS X" - placement_rule_4 = "REP 4 IN X CBF 1 SELECT 4 FROM * AS X" - source_file_path = generate_file(simple_object_size.value) - - storage_nodes = self.cluster.storage_nodes - random_node = random.choice(storage_nodes[1:]) - alive_node = random.choice( - [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id] - ) - - check_node_in_map(random_node, shell=self.shell, alive_node=alive_node) - - # Add node to recovery list before messing with it - check_nodes.append(random_node) - exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) - delete_node_data(random_node) - - cid = create_container( - wallet, - rule=placement_rule_3, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=alive_node.get_rpc_endpoint(), - ) - oid = put_object( - wallet, - source_file_path, - cid, - shell=self.shell, - endpoint=alive_node.get_rpc_endpoint(), - ) - wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes) - - self.return_nodes(alive_node) - - with allure.step("Check data could be replicated to new node"): - random_node = random.choice(list(set(storage_nodes) - {random_node, alive_node})) - # Add node to recovery list before messing with it - check_nodes.append(random_node) - exclude_node_from_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) - - wait_object_replication( - cid, - oid, - 3, - shell=self.shell, - nodes=list(set(storage_nodes) - {random_node}), - ) - include_node_to_network_map(random_node, alive_node, shell=self.shell, cluster=self.cluster) - wait_object_replication(cid, oid, 3, shell=self.shell, nodes=storage_nodes) - - with allure.step("Check container could be created with new node"): - cid = create_container( - wallet, - rule=placement_rule_4, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=alive_node.get_rpc_endpoint(), - ) - oid = put_object( - wallet, - source_file_path, - cid, - shell=self.shell, - endpoint=alive_node.get_rpc_endpoint(), - ) - wait_object_replication(cid, oid, 4, shell=self.shell, nodes=storage_nodes) - - @pytest.mark.node_mgmt - @allure.title("Drop object using control command") - def test_drop_object(self, default_wallet, complex_object_size: ObjectSize, simple_object_size: ObjectSize): - """ - Test checks object could be dropped using `frostfs-cli control drop-objects` command. 
- """ - wallet = default_wallet - endpoint = self.cluster.default_rpc_endpoint - file_path_simple = generate_file(simple_object_size.value) - file_path_complex = generate_file(complex_object_size.value) - - locode = get_locode_from_random_node(self.cluster) - rule = f"REP 1 IN SE CBF 1 SELECT 1 FROM LOC AS SE FILTER 'UN-LOCODE' EQ '{locode}' AS LOC" - cid = create_container(wallet, rule=rule, shell=self.shell, endpoint=endpoint) - oid_simple = put_object_to_random_node(wallet, file_path_simple, cid, shell=self.shell, cluster=self.cluster) - oid_complex = put_object_to_random_node(wallet, file_path_complex, cid, shell=self.shell, cluster=self.cluster) - - for oid in (oid_simple, oid_complex): - get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) - head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) - - nodes_with_object = get_nodes_with_object(cid, oid_simple, shell=self.shell, nodes=self.cluster.storage_nodes) - random_node = random.choice(nodes_with_object) - - for oid in (oid_simple, oid_complex): - with allure.step(f"Drop object {oid}"): - get_object_from_random_node(wallet, cid, oid, shell=self.shell, cluster=self.cluster) - head_object(wallet, cid, oid, shell=self.shell, endpoint=endpoint) - drop_object(random_node, cid, oid) - self.wait_for_obj_dropped(wallet, cid, oid, endpoint, get_object) - self.wait_for_obj_dropped(wallet, cid, oid, endpoint, head_object) - - @pytest.mark.node_mgmt - @pytest.mark.skip(reason="Need to clarify scenario") - @allure.title("Control Operations with storage nodes") - def test_shards( - self, - default_wallet, - create_container_and_pick_node, - simple_object_size: ObjectSize, - ): - wallet = default_wallet - file_path = generate_file(simple_object_size.value) - - cid, node = create_container_and_pick_node - original_oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) - - # for mode in ('read-only', 'degraded'): - for mode in ("degraded",): - shards = node_shard_list(node) - assert shards - - for shard in shards: - node_shard_set_mode(node, shard, mode) - - shards = node_shard_list(node) - assert shards - - with pytest.raises(RuntimeError): - put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) - - with pytest.raises(RuntimeError): - delete_object(wallet, cid, original_oid, self.shell, self.cluster.default_rpc_endpoint) - - get_object_from_random_node(wallet, cid, original_oid, self.shell, self.cluster) - - for shard in shards: - node_shard_set_mode(node, shard, "read-write") - - shards = node_shard_list(node) - assert shards - - oid = put_object_to_random_node(wallet, file_path, cid, self.shell, self.cluster) - delete_object(wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) - - @pytest.mark.node_mgmt - @allure.title("Put object with stopped node") - def test_stop_node(self, default_wallet, return_nodes_after_test_run, simple_object_size: ObjectSize): - wallet = default_wallet - placement_rule = "REP 3 IN X SELECT 4 FROM * AS X" - source_file_path = generate_file(simple_object_size.value) - storage_nodes = self.cluster.storage_nodes - random_node = random.choice(storage_nodes[1:]) - alive_node = random.choice( - [storage_node for storage_node in storage_nodes if storage_node.id != random_node.id] - ) - - cid = create_container( - wallet, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=random_node.get_rpc_endpoint(), - ) - with allure.step("Stop the random node"): - check_nodes.append(random_node) - 
random_node.stop_service() - with allure.step("Try to put an object and expect success"): - put_object( - wallet, - source_file_path, - cid, - shell=self.shell, - endpoint=alive_node.get_rpc_endpoint(), - ) - self.return_nodes(alive_node) - - @allure.step("Wait for object to be dropped") - def wait_for_obj_dropped(self, wallet: str, cid: str, oid: str, endpoint: str, checker) -> None: - for _ in range(3): - try: - checker(wallet, cid, oid, shell=self.shell, endpoint=endpoint) - wait_for_gc_pass_on_storage_nodes() - except Exception as err: - if string_utils.is_str_match_pattern(err, OBJECT_NOT_FOUND): - return - raise AssertionError(f'Expected "{OBJECT_NOT_FOUND}" error, got\n{err}') - - raise AssertionError(f"Object {oid} was not dropped from node")
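
Note: `basic_operations` above drives four object operations from a `{function: kwargs}` mapping and pairs each with an expected response by index. Iterating such a mapping with `enumerate` requires tuple-unpacking each `(function, kwargs)` item yielded by `.items()`. A minimal standalone Python sketch of the pattern, with hypothetical stand-in operations that are not tied to the frostfs testlib API:

    # Iterate a {function: kwargs} mapping with an index; each item from
    # .items() is a (function, kwargs) tuple and must be unpacked as such.
    def run_all(operations: dict, expected: list) -> None:
        for index, (operation, kwargs) in enumerate(operations.items()):
            print(f"step {index}: {operation.__name__}, expecting: {expected[index]}")
            operation(**kwargs)

    def get_demo(oid: str) -> None:  # hypothetical stand-in operation
        print(f"get {oid}")

    def put_demo(path: str) -> None:  # hypothetical stand-in operation
        print(f"put {path}")

    run_all({get_demo: {"oid": "abc"}, put_demo: {"path": "/tmp/file"}}, ["ok", "ok"])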