diff --git a/pytest_tests/testsuites/replication/test_replication.py b/pytest_tests/testsuites/replication/test_replication.py new file mode 100644 index 0000000..e62c1ac --- /dev/null +++ b/pytest_tests/testsuites/replication/test_replication.py @@ -0,0 +1,101 @@ +import logging +import random +from time import sleep + +import allure +import pytest +from frostfs_testlib.shell import Shell +from frostfs_testlib.steps.cli.container import create_container, delete_container +from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object +from frostfs_testlib.storage.cluster import Cluster +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.utils.file_utils import generate_file + +logger = logging.getLogger("NeoLogger") + +OBJECT_ATTRIBUTES = {"common_key": "common_value"} +WAIT_FOR_REPLICATION = 60 + + +@pytest.mark.replication +class TestReplication(ClusterTestBase): + @pytest.mark.parametrize( + "object_size", + [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")], + ids=["simple object", "complex object"], + ) + @allure.title("Test replication") + def test_replication( + self, + default_wallet: str, + client_shell: Shell, + cluster: Cluster, + object_size, + cluster_state_controller, + ): + nodes_count = len(cluster.cluster_nodes) + node_for_rep = random.choice(cluster.cluster_nodes) + alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep] + cid = create_container( + wallet=default_wallet, + shell=client_shell, + endpoint=cluster.default_rpc_endpoint, + rule=f"REP 1 IN SELF_PLACE REP {nodes_count - 1} IN OTHER_PLACE CBF 1 " + "SELECT 1 FROM SELF AS SELF_PLACE " + f"SELECT {nodes_count - 1} FROM OTHER AS OTHER_PLACE " + f"FILTER 'UN-LOCODE' EQ '{node_for_rep.storage_node.get_un_locode()}' AS SELF " + f"FILTER 'UN-LOCODE' NE '{node_for_rep.storage_node.get_un_locode()}' AS OTHER", + ) + + cluster_state_controller.stop_node_host(node_for_rep, mode="hard") + + file_path = generate_file(object_size) + + with allure.step("Put object"): + oid = put_object( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=client_shell, + attributes=OBJECT_ATTRIBUTES, + copies_number=3, + endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(), + ) + + cluster_state_controller.start_node_host(node_for_rep) + with allure.step(f"Wait for replication. Sleep {WAIT_FOR_REPLICATION}s"): + sleep(WAIT_FOR_REPLICATION) + + for node in cluster.cluster_nodes: + with allure.step(f"Check object on node {node}"): + header_info = head_object( + wallet=default_wallet, + oid=oid, + cid=cid, + shell=self.shell, + endpoint=node.storage_node.get_rpc_endpoint(), + is_direct=True, + )["header"] + attributes = header_info["attributes"] + for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items(): + assert ( + attribute_key in attributes + ), f"{attribute_key} not found in {header_info}" + assert header_info["attributes"].get(attribute_key) == str(attribute_value), ( + f"{attribute_key} value not equal: " + f"got attribute value: {attributes.get(attribute_key)}" + f"expected attribute value: {attribute_value}" + ) + with allure.step("Cleanup"): + delete_object( + cid=cid, + oid=oid, + shell=client_shell, + endpoint=cluster.default_rpc_endpoint, + ) + + delete_container( + cid=cid, + shell=client_shell, + endpoint=cluster.default_rpc_endpoint, + )