"""Replication test: an object stored while one node host is down must reach every node."""

import logging
import random

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import delete_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import TestFile

from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container

logger = logging.getLogger("NeoLogger")

# Attributes attached to the object on PUT and verified on every node afterwards.
OBJECT_ATTRIBUTES = {"common_key": "common_value"}
# NOTE(review): not referenced in this module; kept because other modules may import it.
WAIT_FOR_REPLICATION = 60


# Adding failover mark because it may make cluster unhealthy
@pytest.mark.failover
@pytest.mark.replication
class TestReplication(ClusterTestBase):
    """Replication checks that temporarily take a cluster node host offline."""

    @allure.title("Replication (obj_size={object_size})")
    @requires_container(PUBLIC_WITH_POLICY("REP %NODE_COUNT% CBF 1", short_name="Public_all_except_one_node"))
    def test_replication(
        self,
        wallet: WalletInfo,
        client_shell: Shell,
        cluster: Cluster,
        container: str,
        test_file: TestFile,
        cluster_state_controller: ClusterStateController,
    ):
        """Put an object while one node host is stopped, then verify it on all nodes.

        Steps:
            1. Hard-stop the host of one randomly chosen cluster node.
            2. Put the object through a random remaining node, requesting
               ``nodes_count - 1`` copies.
            3. Start the stopped host again and wait until the object is
               reported on ``nodes_count`` nodes.
            4. HEAD the object directly on each returned node and compare its
               attributes with ``OBJECT_ATTRIBUTES``.
            5. Delete the object and the container.
        """
        nodes_count = len(cluster.cluster_nodes)
        node_for_rep = random.choice(cluster.cluster_nodes)
        alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep]

        with reporter.step("Stop container node host"):
            cluster_state_controller.stop_node_host(node_for_rep, mode="hard")

        with reporter.step("Put object"):
            oid = put_object(
                wallet=wallet,
                path=test_file,
                cid=container,
                shell=client_shell,
                attributes=OBJECT_ATTRIBUTES,
                # One node is down, so only the remaining nodes can take a copy.
                copies_number=nodes_count - 1,
                endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(),
                timeout="45s",
            )

        with reporter.step("Start container node host"):
            cluster_state_controller.start_node_host(node_for_rep)

        with reporter.step("Wait for replication"):
            object_nodes = wait_object_replication(
                container,
                oid,
                nodes_count,
                client_shell,
                self.cluster.storage_nodes,
            )

        with reporter.step("Check attributes"):
            for node in object_nodes:
                # is_direct=True is passed so the HEAD targets this specific node's endpoint.
                header_info = head_object(
                    wallet,
                    container,
                    oid,
                    client_shell,
                    node.get_rpc_endpoint(),
                    is_direct=True,
                ).get("header", {})
                attributes = header_info.get("attributes", {})
                for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items():
                    assert attribute_key in attributes, f"{attribute_key} not found in {header_info}"
                    assert attributes.get(attribute_key) == str(attribute_value), (
                        f"{attribute_key} value not equal: "
                        f"got attribute value: {attributes.get(attribute_key)}, "
                        f"expected attribute value: {attribute_value}"
                    )

        with reporter.step("Cleanup"):
            delete_object(wallet, container, oid, client_shell, cluster.default_rpc_endpoint)
            delete_container(wallet, container, client_shell, cluster.default_rpc_endpoint)