From 1d22162d2f2012f0472a735722488d6aa07d1824 Mon Sep 17 00:00:00 2001 From: Dmitriy Zayakin Date: Tue, 10 Oct 2023 16:39:24 +0300 Subject: [PATCH] Add network data interface test Signed-off-by: Dmitriy Zayakin --- .../failovers/test_failover_network.py | 152 +++++++++++++++--- 1 file changed, 127 insertions(+), 25 deletions(-) diff --git a/pytest_tests/testsuites/failovers/test_failover_network.py b/pytest_tests/testsuites/failovers/test_failover_network.py index 851ff2b3..233b480c 100644 --- a/pytest_tests/testsuites/failovers/test_failover_network.py +++ b/pytest_tests/testsuites/failovers/test_failover_network.py @@ -1,4 +1,5 @@ import logging +import os import random from time import sleep @@ -6,15 +7,14 @@ import allure import pytest from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.steps.cli.container import create_container -from frostfs_testlib.steps.cli.object import get_object, put_object_to_random_node +from frostfs_testlib.steps.cli.object import get_object, get_object_nodes, put_object_to_random_node +from frostfs_testlib.steps.storage_object import delete_objects from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase -from frostfs_testlib.utils.failover_utils import ( - wait_all_storage_nodes_returned, - wait_object_replication, -) +from frostfs_testlib.utils.failover_utils import wait_all_storage_nodes_returned, wait_object_replication from frostfs_testlib.utils.file_utils import generate_file, get_file_hash logger = logging.getLogger("NeoLogger") @@ -22,6 +22,13 @@ STORAGE_NODE_COMMUNICATION_PORT = "8080" STORAGE_NODE_COMMUNICATION_PORT_TLS = "8082" PORTS_TO_BLOCK = [STORAGE_NODE_COMMUNICATION_PORT, STORAGE_NODE_COMMUNICATION_PORT_TLS] blocked_nodes: list[ClusterNode] = [] +file_wait_delete = [] + +OBJECT_ATTRIBUTES = [ + None, + {"key1": 1, "key2": "abc", "common_key": "common_value"}, + {"key1": 2, "common_key": "common_value"}, +] @pytest.mark.failover @@ -40,6 +47,60 @@ class TestFailoverNetwork(ClusterTestBase): if not_empty: wait_all_storage_nodes_returned(self.shell, self.cluster) + @pytest.fixture() + @allure.title("Restore drop traffic to system") + def restore_traffic(self, cluster_state_controller: ClusterStateController): + yield + cluster_state_controller.restore_interfaces() + + @pytest.fixture() + @allure.title("Delete file after test") + def delete_file_after_test(self) -> None: + yield + for path in file_wait_delete: + os.remove(path) + + @pytest.fixture() + def storage_objects(self, simple_object_size: ObjectSize, default_wallet: str) -> list[StorageObjectInfo]: + + file_path = generate_file(simple_object_size.value) + file_hash = get_file_hash(file_path) + + with allure.step("Create container"): + placement_rule = "REP 1" + cid = create_container( + wallet=default_wallet, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + rule=placement_rule, + await_mode=True, + ) + + storage_objects = [] + + with allure.step("Put object"): + for attribute in OBJECT_ATTRIBUTES: + oid = put_object_to_random_node( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + cluster=self.cluster, + ) + + storage_object = StorageObjectInfo(cid=cid, oid=oid) + storage_object.size = simple_object_size.value + storage_object.wallet_file_path = default_wallet + storage_object.file_path = file_path + storage_object.file_hash = file_hash + storage_object.attributes = attribute + + storage_objects.append(storage_object) + + yield storage_objects + + delete_objects(storage_objects, self.shell, self.cluster) + @allure.title("Block Storage node traffic") def test_block_storage_node_traffic( self, @@ -64,13 +125,9 @@ class TestFailoverNetwork(ClusterTestBase): rule=placement_rule, basic_acl=PUBLIC_ACL, ) - oid = put_object_to_random_node( - wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster - ) + oid = put_object_to_random_node(wallet, source_file_path, cid, shell=self.shell, cluster=self.cluster) - nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes - ) + nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) logger.info(f"Nodes are {nodes}") nodes_to_block = nodes @@ -82,9 +139,7 @@ class TestFailoverNetwork(ClusterTestBase): for node in nodes_to_block: with allure.step(f"Block incoming traffic at node {node} on port {PORTS_TO_BLOCK}"): block_node = [ - cluster_node - for cluster_node in self.cluster.cluster_nodes - if cluster_node.storage_node == node + cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node == node ] blocked_nodes.append(*block_node) excluded_nodes.append(node) @@ -106,28 +161,75 @@ class TestFailoverNetwork(ClusterTestBase): assert node not in new_nodes with allure.step("Check object data is not corrupted"): - got_file_path = get_object( - wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell - ) + got_file_path = get_object(wallet, cid, oid, endpoint=new_nodes[0].get_rpc_endpoint(), shell=self.shell) assert get_file_hash(source_file_path) == get_file_hash(got_file_path) for node in nodes_to_block: with allure.step(f"Unblock incoming traffic at host {node} on port {PORTS_TO_BLOCK}"): cluster_state_controller.restore_traffic(mode="ports", node=node) block_node = [ - cluster_node - for cluster_node in self.cluster.cluster_nodes - if cluster_node.storage_node == node + cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node == node ] blocked_nodes.remove(*block_node) sleep(wakeup_node_timeout) with allure.step("Check object data is not corrupted"): - new_nodes = wait_object_replication( - cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes + new_nodes = wait_object_replication(cid, oid, 2, shell=self.shell, nodes=self.cluster.storage_nodes) + + got_file_path = get_object(wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint()) + assert get_file_hash(source_file_path) == get_file_hash(got_file_path) + + @allure.title("Block DATA interface node") + def test_block_data_interface( + self, + cluster_state_controller: ClusterStateController, + default_wallet: str, + restore_traffic: None, + delete_file_after_test: None, + storage_objects: list[StorageObjectInfo], + ): + storage_object = storage_objects[0] + + with allure.step("Search nodes with object"): + nodes_with_object = get_object_nodes( + cluster=self.cluster, + wallet=default_wallet, + cid=storage_object.cid, + oid=storage_object.oid, + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, ) - got_file_path = get_object( - wallet, cid, oid, shell=self.shell, endpoint=new_nodes[0].get_rpc_endpoint() + with allure.step("Get data interface to node"): + data = "data" + config_interfaces = list(nodes_with_object[0].host.config.interfaces.keys()) + with allure.step(f"Get {data} in {config_interfaces}"): + data_interfaces = [interface for interface in config_interfaces if data in interface] + + with allure.step("Block data interfaces for node"): + for interface in data_interfaces: + cluster_state_controller.down_interface(nodes=nodes_with_object, interface=interface) + + with allure.step("Tick epoch and wait 2 block"): + nodes_without_an_object = list(set(self.cluster.cluster_nodes) - set(nodes_with_object)) + self.tick_epochs(1, alive_node=nodes_without_an_object[0].storage_node, wait_block=2) + + with allure.step("Get object for target nodes to data interfaces, expect false"): + with pytest.raises(RuntimeError, match="return code: 1"): + get_object( + wallet=default_wallet, + cid=storage_object.cid, + oid=storage_object.oid, + shell=self.shell, + endpoint=nodes_with_object[0].storage_node.get_rpc_endpoint(), + ) + + with allure.step(f"Get object others nodes, expect true"): + input_file = get_object( + wallet=default_wallet, + cid=storage_object.cid, + oid=storage_object.oid, + shell=self.shell, + endpoint=nodes_without_an_object[0].storage_node.get_rpc_endpoint(), ) - assert get_file_hash(source_file_path) == get_file_hash(got_file_path) + file_wait_delete.append(input_file)