import logging import random from time import sleep import allure import pytest from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.container import create_container, delete_container from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object from frostfs_testlib.storage.cluster import Cluster from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.utils.file_utils import generate_file logger = logging.getLogger("NeoLogger") OBJECT_ATTRIBUTES = {"common_key": "common_value"} WAIT_FOR_REPLICATION = 60 # Adding failover mark because it may make cluster unhealthy @pytest.mark.failover @pytest.mark.replication class TestReplication(ClusterTestBase): @pytest.fixture(autouse=True) def start_stopped_nodes_after_test(self, cluster_state_controller: ClusterStateController): yield cluster_state_controller.start_stopped_hosts() @pytest.mark.parametrize( "object_size", [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")], ids=["simple object", "complex object"], ) @allure.title("Test replication") def test_replication( self, default_wallet: str, client_shell: Shell, cluster: Cluster, object_size, cluster_state_controller: ClusterStateController, ): nodes_count = len(cluster.cluster_nodes) node_for_rep = random.choice(cluster.cluster_nodes) alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep] cid = create_container( wallet=default_wallet, shell=client_shell, endpoint=cluster.default_rpc_endpoint, rule=f"REP 1 IN SELF_PLACE REP {nodes_count - 1} IN OTHER_PLACE CBF 1 " "SELECT 1 FROM SELF AS SELF_PLACE " f"SELECT {nodes_count - 1} FROM OTHER AS OTHER_PLACE " f"FILTER 'UN-LOCODE' EQ '{node_for_rep.storage_node.get_un_locode()}' AS SELF " f"FILTER 'UN-LOCODE' NE '{node_for_rep.storage_node.get_un_locode()}' AS OTHER", ) cluster_state_controller.stop_node_host(node_for_rep, mode="hard") file_path = generate_file(object_size) with allure.step("Put object"): oid = put_object( wallet=default_wallet, path=file_path, cid=cid, shell=client_shell, attributes=OBJECT_ATTRIBUTES, copies_number=3, endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(), ) cluster_state_controller.start_node_host(node_for_rep) with allure.step(f"Wait for replication. Sleep {WAIT_FOR_REPLICATION}s"): sleep(WAIT_FOR_REPLICATION) for node in cluster.cluster_nodes: with allure.step(f"Check object on node {node}"): header_info = head_object( wallet=default_wallet, oid=oid, cid=cid, shell=self.shell, endpoint=node.storage_node.get_rpc_endpoint(), is_direct=True, )["header"] attributes = header_info["attributes"] for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items(): assert ( attribute_key in attributes ), f"{attribute_key} not found in {header_info}" assert header_info["attributes"].get(attribute_key) == str(attribute_value), ( f"{attribute_key} value not equal: " f"got attribute value: {attributes.get(attribute_key)}" f"expected attribute value: {attribute_value}" ) with allure.step("Cleanup"): delete_object( wallet=default_wallet, cid=cid, oid=oid, shell=client_shell, endpoint=cluster.default_rpc_endpoint, ) delete_container( wallet=default_wallet, cid=cid, shell=client_shell, endpoint=cluster.default_rpc_endpoint, )