import logging
import random

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file

logger = logging.getLogger("NeoLogger")

# Attributes attached to the uploaded object and verified on every replica.
OBJECT_ATTRIBUTES = {"common_key": "common_value"}
# NOTE(review): not referenced in this module; kept in case other code imports it.
WAIT_FOR_REPLICATION = 60


# Adding failover mark because it may make cluster unhealthy
@pytest.mark.sanity
@pytest.mark.failover
@pytest.mark.replication
class TestReplication(ClusterTestBase):
    """Check that an object put while one node is down gets replicated to it."""

    @pytest.fixture(autouse=True)
    def start_stopped_nodes_after_test(self, cluster_state_controller: ClusterStateController):
        """Call ``start_stopped_hosts()`` on the controller after every test.

        The test hard-stops a host; running this as teardown means the host is
        started again even when the test body fails before restarting it.
        """
        yield
        cluster_state_controller.start_stopped_hosts()

    @allure.title("Replication (obj_size={object_size})")
    def test_replication(
        self,
        default_wallet: WalletInfo,
        client_shell: Shell,
        cluster: Cluster,
        object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """Put an object while one node is stopped, then verify replication.

        Steps:
            1. Pick a random cluster node and create a container whose policy
               asks for 1 replica on nodes matching that node's UN-LOCODE and
               ``nodes_count - 1`` replicas on nodes with a different UN-LOCODE.
            2. Hard-stop the picked node's host and put an object through one
               of the remaining nodes.
            3. Start the host again and wait until the object has as many
               copies as there are cluster nodes.
            4. HEAD the object directly on every node that holds it and check
               that all ``OBJECT_ATTRIBUTES`` are present with expected values.
        """
        nodes_count = len(cluster.cluster_nodes)
        node_for_rep = random.choice(cluster.cluster_nodes)
        alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep]

        cid = create_container(
            wallet=default_wallet,
            shell=client_shell,
            endpoint=cluster.default_rpc_endpoint,
            rule=f"REP 1 IN SELF_PLACE REP {nodes_count - 1} IN OTHER_PLACE CBF 1 "
            "SELECT 1 FROM SELF AS SELF_PLACE "
            f"SELECT {nodes_count - 1} FROM OTHER AS OTHER_PLACE "
            f"FILTER 'UN-LOCODE' EQ '{node_for_rep.storage_node.get_un_locode()}' AS SELF "
            f"FILTER 'UN-LOCODE' NE '{node_for_rep.storage_node.get_un_locode()}' AS OTHER",
        )

        # Take the target node down before the upload so that its replica can
        # only appear through replication after the host comes back.
        cluster_state_controller.stop_node_host(node_for_rep, mode="hard")

        file_path = generate_file(object_size.value)

        with reporter.step("Put object"):
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=client_shell,
                attributes=OBJECT_ATTRIBUTES,
                # NOTE(review): hard-coded 3 presumably matches nodes_count - 1
                # on a 4-node cluster -- confirm before running elsewhere.
                copies_number=3,
                endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(),
                timeout="45s",
            )

        cluster_state_controller.start_node_host(node_for_rep)

        with reporter.step("Wait for replication."):
            # NOTE(review): self.cluster is presumably the same object as the
            # `cluster` fixture (provided by ClusterTestBase) -- confirm.
            object_nodes = wait_object_replication(
                cid=cid,
                oid=oid,
                expected_copies=len(self.cluster.cluster_nodes),
                shell=client_shell,
                nodes=self.cluster.storage_nodes,
            )

        with reporter.step("Check attributes"):
            for node in object_nodes:
                header_info = head_object(
                    wallet=default_wallet,
                    oid=oid,
                    cid=cid,
                    shell=self.shell,
                    endpoint=node.get_rpc_endpoint(),
                    is_direct=True,
                )["header"]
                attributes = header_info["attributes"]
                for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items():
                    assert attribute_key in attributes, f"{attribute_key} not found in {header_info}"
                    assert attributes.get(attribute_key) == str(attribute_value), (
                        f"{attribute_key} value not equal: "
                        f"got attribute value: {attributes.get(attribute_key)}, "
                        f"expected attribute value: {attribute_value}"
                    )

        # TODO: Research why this fails
        # with reporter.step("Cleanup"):
        #     delete_object(
        #         wallet=default_wallet,
        #         cid=cid,
        #         oid=oid,
        #         shell=client_shell,
        #         endpoint=cluster.default_rpc_endpoint,
        #     )
        #     delete_container(
        #         wallet=default_wallet,
        #         cid=cid,
        #         shell=client_shell,
        #         endpoint=cluster.default_rpc_endpoint,
        #     )