[#361] Update test_replication to work with more than 4 nodes
All checks were successful
DCO check / DCO (pull_request) Successful in 25s

Signed-off-by: a.berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2025-01-22 17:01:02 +03:00 committed by Andrey Berezin
parent 2976d3f936
commit b5d0a6691c

View file

@ -5,15 +5,16 @@ import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import head_object, put_object
from frostfs_testlib.steps.cli.container import delete_container
from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.failover_utils import wait_object_replication
from frostfs_testlib.utils.file_utils import generate_file
from frostfs_testlib.utils.file_utils import TestFile
from ...helpers.container_request import PUBLIC_WITH_POLICY, requires_container
logger = logging.getLogger("NeoLogger")
@ -26,65 +27,45 @@ WAIT_FOR_REPLICATION = 60
@pytest.mark.replication
class TestReplication(ClusterTestBase):
@allure.title("Replication (obj_size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("REP %NODE_COUNT% CBF 1", short_name="Public_all_except_one_node"))
def test_replication(
self,
default_wallet: WalletInfo,
wallet: WalletInfo,
client_shell: Shell,
cluster: Cluster,
object_size: ObjectSize,
container: str,
test_file: TestFile,
cluster_state_controller: ClusterStateController,
):
nodes_count = len(cluster.cluster_nodes)
node_for_rep = random.choice(cluster.cluster_nodes)
alive_nodes = [node for node in cluster.cluster_nodes if node != node_for_rep]
cid = create_container(
wallet=default_wallet,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
rule=f"REP 1 IN SELF_PLACE REP {nodes_count - 1} IN OTHER_PLACE CBF 1 "
"SELECT 1 FROM SELF AS SELF_PLACE "
f"SELECT {nodes_count - 1} FROM OTHER AS OTHER_PLACE "
f"FILTER 'UN-LOCODE' EQ '{node_for_rep.storage_node.get_un_locode()}' AS SELF "
f"FILTER 'UN-LOCODE' NE '{node_for_rep.storage_node.get_un_locode()}' AS OTHER",
)
cluster_state_controller.stop_node_host(node_for_rep, mode="hard")
file_path = generate_file(object_size.value)
with reporter.step("Stop container node host"):
cluster_state_controller.stop_node_host(node_for_rep, mode="hard")
with reporter.step("Put object"):
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
wallet=wallet,
path=test_file,
cid=container,
shell=client_shell,
attributes=OBJECT_ATTRIBUTES,
copies_number=3,
copies_number=nodes_count - 1,
endpoint=random.choice(alive_nodes).storage_node.get_rpc_endpoint(),
timeout="45s",
)
cluster_state_controller.start_node_host(node_for_rep)
with reporter.step(f"Wait for replication."):
object_nodes = wait_object_replication(
cid=cid,
oid=oid,
expected_copies=len(self.cluster.cluster_nodes),
shell=client_shell,
nodes=self.cluster.storage_nodes,
)
with reporter.step("Start container node host"):
cluster_state_controller.start_node_host(node_for_rep)
with reporter.step(f"Wait for replication"):
object_nodes = wait_object_replication(container, oid, nodes_count, client_shell, self.cluster.storage_nodes)
with reporter.step("Check attributes"):
for node in object_nodes:
header_info = head_object(
wallet=default_wallet,
oid=oid,
cid=cid,
shell=self.shell,
endpoint=node.get_rpc_endpoint(),
is_direct=True,
)["header"]
attributes = header_info["attributes"]
header_info = head_object(wallet, container, oid, client_shell, node.get_rpc_endpoint(), is_direct=True).get("header", {})
attributes = header_info.get("attributes", {})
for attribute_key, attribute_value in OBJECT_ATTRIBUTES.items():
assert attribute_key in attributes, f"{attribute_key} not found in {header_info}"
assert header_info["attributes"].get(attribute_key) == str(attribute_value), (
@ -93,19 +74,6 @@ class TestReplication(ClusterTestBase):
f"expected attribute value: {attribute_value}"
)
# TODO: Research why this fails
# with reporter.step("Cleanup"):
# delete_object(
# wallet=default_wallet,
# cid=cid,
# oid=oid,
# shell=client_shell,
# endpoint=cluster.default_rpc_endpoint,
# )
# delete_container(
# wallet=default_wallet,
# cid=cid,
# shell=client_shell,
# endpoint=cluster.default_rpc_endpoint,
# )
with reporter.step("Cleanup"):
delete_object(wallet, container, oid, client_shell, cluster.default_rpc_endpoint)
delete_container(wallet, container, client_shell, cluster.default_rpc_endpoint)