forked from TrueCloudLab/frostfs-testcases
79 lines
3.5 KiB
Python
79 lines
3.5 KiB
Python
import logging
|
|
import random
|
|
|
|
import allure
|
|
import pytest
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.steps.cli.container import delete_container
|
|
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
|
from frostfs_testlib.utils.failover_utils import wait_object_replication
|
|
from frostfs_testlib.utils.file_utils import TestFile
|
|
|
|
from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container
|
|
|
|
logger = logging.getLogger("NeoLogger")

# User attributes passed to put_object in test_replication; the test then
# expects to read the same key/value pairs back from the object header.
OBJECT_ATTRIBUTES = {"common_key": "common_value"}
# NOTE(review): not referenced anywhere in the visible code; presumably a
# replication wait budget in seconds - confirm the unit or remove if unused.
WAIT_FOR_REPLICATION = 60
|
|
|
|
# Adding failover mark because it may make cluster unhealthy
@pytest.mark.sanity
@pytest.mark.failover
@pytest.mark.replication
class TestReplication(ClusterTestBase):
    """Replication test: put an object while one node host is stopped, then verify it after the host returns."""

    @allure.title("Replication (obj_size={object_size})")
    @requires_container(PUBLIC_WITH_POLICY("REP %NODE_COUNT% CBF 1", short_name="Public_all_except_one_node"))
    def test_replication(
        self,
        wallet: WalletInfo,
        client_shell: Shell,
        cluster: Cluster,
        container: str,
        test_file: TestFile,
        cluster_state_controller: ClusterStateController,
    ):
        """Put an object with one node host down and check its attributes on every node after replication.

        Steps:
            1. Hard-stop the host of one randomly chosen cluster node.
            2. Put ``test_file`` with ``OBJECT_ATTRIBUTES`` through a randomly chosen remaining node,
               requesting ``nodes_count - 1`` copies.
            3. Start the stopped host again (also done when the put fails).
            4. Call ``wait_object_replication`` with ``nodes_count`` as the expected copy count.
            5. HEAD the object directly on each node it returned and compare the attributes.
            6. Delete the object and the container.

        Args:
            wallet: Wallet used for all object and container operations.
            client_shell: Shell the CLI commands are executed through.
            cluster: Cluster under test; supplies the node list and the default RPC endpoint.
            container: Container ID provided via ``requires_container``.
            test_file: File to upload. NOTE(review): the ``{object_size}`` placeholder in the allure title
                presumably comes from the parametrization behind this fixture - confirm.
            cluster_state_controller: Controller used to stop and start the node host.
        """
        nodes_count = len(cluster.cluster_nodes)
        node_for_rep = random.choice(cluster.cluster_nodes)
        alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep]

        with reporter.step("Stop container node host"):
            cluster_state_controller.stop_node_host(node_for_rep, mode="hard")

        # Restart the host even if the put fails, so a failed upload does not
        # leave the cluster with a stopped node for subsequent tests.
        try:
            with reporter.step("Put object"):
                oid = put_object(
                    wallet=wallet,
                    path=test_file,
                    cid=container,
                    shell=client_shell,
                    attributes=OBJECT_ATTRIBUTES,
                    # One host is down, so request one copy fewer than the node count.
                    copies_number=nodes_count - 1,
                    endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(),
                    timeout="45s",
                )
        finally:
            with reporter.step("Start container node host"):
                cluster_state_controller.start_node_host(node_for_rep)

        with reporter.step("Wait for replication"):
            object_nodes = wait_object_replication(container, oid, nodes_count, client_shell, self.cluster.storage_nodes)

        with reporter.step("Check attributes"):
            for node in object_nodes:
                header_info = head_object(wallet, container, oid, client_shell, node.get_rpc_endpoint(), is_direct=True).get("header", {})
                attributes = header_info.get("attributes", {})
                for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items():
                    assert attribute_key in attributes, f"{attribute_key} not found in {header_info}"
                    actual_value = attributes.get(attribute_key)
                    # The header value is compared against the string form of the expected value.
                    assert actual_value == str(attribute_value), (
                        f"{attribute_key} value not equal: "
                        f"got attribute value: {actual_value}, "
                        f"expected attribute value: {attribute_value}"
                    )

        with reporter.step("Cleanup"):
            delete_object(wallet, container, oid, client_shell, cluster.default_rpc_endpoint)
            delete_container(wallet, container, client_shell, cluster.default_rpc_endpoint)