frostfs-testcases/pytest_tests/testsuites/replication/test_replication.py

111 lines
4.4 KiB
Python

import logging
import random
from time import sleep
import allure
import pytest
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file
# Module-level logger; obtained under the fixed name "NeoLogger".
logger = logging.getLogger("NeoLogger")
# Attributes attached to the object on PUT and later compared against the
# header returned by HEAD on every node.
OBJECT_ATTRIBUTES = {"common_key": "common_value"}
# Seconds to sleep after the stopped node is started again, before checking
# that the object is present on every node.
WAIT_FOR_REPLICATION = 60
# Adding failover mark because it may make cluster unhealthy
@pytest.mark.failover
@pytest.mark.replication
class TestReplication(ClusterTestBase):
    """Check that an object put while one node is down ends up on every node."""

    @pytest.fixture(autouse=True)
    def start_stopped_nodes_after_test(self, cluster_state_controller: ClusterStateController):
        """Start any still-stopped hosts once the test body has finished.

        The code after ``yield`` runs as fixture teardown, so hosts are started
        again even when the test fails part-way through.
        """
        yield
        cluster_state_controller.start_stopped_hosts()

    @pytest.mark.parametrize(
        "object_size",
        [pytest.lazy_fixture("simple_object_size"), pytest.lazy_fixture("complex_object_size")],
        ids=["simple object size", "complex object size"],
    )
    @allure.title("Test replication for {object_size}")
    def test_replication(
        self,
        default_wallet: str,
        client_shell: Shell,
        cluster: Cluster,
        object_size: ObjectSize,
        cluster_state_controller: ClusterStateController,
    ):
        """Put an object while one node is stopped and verify it reaches all nodes.

        Steps:
            1. Pick a random cluster node and create a container whose placement
               rule asks for one replica among nodes with that node's UN-LOCODE
               and ``nodes_count - 1`` replicas among nodes with a different one.
            2. Hard-stop the picked node's host and put an object through one of
               the remaining nodes.
            3. Start the host again and sleep ``WAIT_FOR_REPLICATION`` seconds.
            4. HEAD the object on every node and compare its attributes with
               ``OBJECT_ATTRIBUTES``.
            5. Delete the object and the container.

        Args:
            default_wallet: Wallet used for every container/object operation.
            client_shell: Shell the CLI steps are executed in.
            cluster: Cluster under test; supplies the nodes and RPC endpoints.
            object_size: Size descriptor; ``object_size.value`` is passed to
                ``generate_file``.
            cluster_state_controller: Used to stop and start the node's host.
        """
        nodes_count = len(cluster.cluster_nodes)
        node_for_rep = random.choice(cluster.cluster_nodes)
        alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep]

        # NOTE(review): the rule only isolates node_for_rep if its UN-LOCODE is
        # unique within the cluster -- confirm against the environment config.
        cid = create_container(
            wallet=default_wallet,
            shell=client_shell,
            endpoint=cluster.default_rpc_endpoint,
            rule=f"REP 1 IN SELF_PLACE REP {nodes_count - 1} IN OTHER_PLACE CBF 1 "
            "SELECT 1 FROM SELF AS SELF_PLACE "
            f"SELECT {nodes_count - 1} FROM OTHER AS OTHER_PLACE "
            f"FILTER 'UN-LOCODE' EQ '{node_for_rep.storage_node.get_un_locode()}' AS SELF "
            f"FILTER 'UN-LOCODE' NE '{node_for_rep.storage_node.get_un_locode()}' AS OTHER",
        )

        # Take the chosen node down before the PUT so it cannot receive the
        # object at write time.
        cluster_state_controller.stop_node_host(node_for_rep, mode="hard")

        file_path = generate_file(object_size.value)

        with allure.step("Put object"):
            oid = put_object(
                wallet=default_wallet,
                path=file_path,
                cid=cid,
                shell=client_shell,
                attributes=OBJECT_ATTRIBUTES,
                copies_number=3,
                endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(),
            )

        cluster_state_controller.start_node_host(node_for_rep)

        with allure.step(f"Wait for replication. Sleep {WAIT_FOR_REPLICATION}s"):
            sleep(WAIT_FOR_REPLICATION)

        for node in cluster.cluster_nodes:
            with allure.step(f"Check object on node {node}"):
                # NOTE(review): this call uses self.shell while every other step
                # uses the client_shell fixture -- confirm both are the same shell.
                # is_direct=True presumably restricts the HEAD to this node only;
                # verify against head_object's documentation.
                header_info = head_object(
                    wallet=default_wallet,
                    oid=oid,
                    cid=cid,
                    shell=self.shell,
                    endpoint=node.storage_node.get_rpc_endpoint(),
                    is_direct=True,
                )["header"]
                attributes = header_info["attributes"]
                for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items():
                    assert (
                        attribute_key in attributes
                    ), f"{attribute_key} not found in {header_info}"
                    # The header value is compared with the string form of the
                    # expected attribute value.
                    assert attributes.get(attribute_key) == str(attribute_value), (
                        f"{attribute_key} value not equal: "
                        f"got attribute value: {attributes.get(attribute_key)}, "
                        f"expected attribute value: {attribute_value}"
                    )

        with allure.step("Cleanup"):
            delete_object(
                wallet=default_wallet,
                cid=cid,
                oid=oid,
                shell=client_shell,
                endpoint=cluster.default_rpc_endpoint,
            )
            delete_container(
                wallet=default_wallet,
                cid=cid,
                shell=client_shell,
                endpoint=cluster.default_rpc_endpoint,
            )