diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index d377338b..131bbfcb 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta, timezone from typing import Optional import allure +from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper import pytest from dateutil import parser from frostfs_testlib import plugins, reporter @@ -201,6 +202,12 @@ def frostfs_cli(client_shell: Shell, default_wallet: WalletInfo) -> FrostfsCli: return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, default_wallet.config_path) +@pytest.fixture(scope="session") +@allure.title("Init CliClientWrapper with local Frostfs CLI") +def grpc_client(frostfs_cli: FrostfsCli) -> CliClientWrapper: + return CliClientWrapper(frostfs_cli) + + # By default we want all tests to be executed with both storage policies. # This can be overriden in choosen tests if needed. @pytest.fixture( diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py index bb560415..4899c81a 100644 --- a/pytest_tests/testsuites/replication/test_ec_replication.py +++ b/pytest_tests/testsuites/replication/test_ec_replication.py @@ -4,6 +4,8 @@ import time import allure from frostfs_testlib.shell.interfaces import Shell +from frostfs_testlib.steps.node_management import check_node_in_map, include_node_to_network_map, remove_nodes_from_map_morph, storage_node_set_status +from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper from frostfs_testlib.utils import datetime_utils import pytest import yaml @@ -92,14 +94,6 @@ class TestECReplication(ClusterTestBase): rep_count *= int(COMPLEX_OBJECT_CHUNKS_COUNT) + 1 if COMPLEX_OBJECT_TAIL_SIZE else int(COMPLEX_OBJECT_CHUNKS_COUNT) return rep_count - @wait_for_success(120, 5) - def wait_for_nodes_appears_in_map(self, frostfs_cli: FrostfsCli, alive_node: ClusterNode, desired_nodes_count: int) -> bool: - self.tick_epoch(alive_node, 2) - netmap = parse_netmap_output( - frostfs_cli.netmap.snapshot(alive_node.storage_node.get_rpc_endpoint(), timeout=CLI_DEFAULT_TIMEOUT).stdout - ) - assert len(netmap) == desired_nodes_count - @wait_for_success(120, 5) def wait_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str, success: bool = True) -> None: if not success: @@ -207,6 +201,18 @@ class TestECReplication(ClusterTestBase): with reporter.step(f"Enable policer for nodes"): cluster_state_controller.start_stopped_hosts() cluster_state_controller.manager(ConfigStateManager).revert_all() + + @wait_for_success(300, 15) + @reporter.step("Check count nodes chunks") + def wait_sync_count_chunks_nodes(self, grpc_client: CliClientWrapper, cid: str, oid: str, count: int): + all_chunks_after_include_node = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) + chunks_nodes = [node for chunk in all_chunks_after_include_node for node in chunk.confirmed_nodes] + assert len(chunks_nodes) == count + + @wait_for_success(300, 15) + def include_node_in_map(self, alive_storage_node: StorageNode, include_storage_node: StorageNode): + self.tick_epoch(alive_storage_node, 2) + check_node_in_map(include_storage_node, self.shell, alive_storage_node) @allure.title("Create container with EC policy (size={object_size})") def test_create_container_with_ec_policy( @@ -352,43 +358,50 @@ class TestECReplication(ClusterTestBase): self, simple_object_size: ObjectSize, frostfs_cli: FrostfsCli, - default_user: User, - cluster_state_controller: ClusterStateController, + grpc_client: CliClientWrapper, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object on container."): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks nodes on 3."): assert self.check_replication(3, frostfs_cli, cid, oid) - with reporter.step("Stop node with chunk."): - data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) - first_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid) - node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0] - cluster_state_controller.stop_node_host(node_data_chunk, "hard") + with reporter.step("Search node with chunk."): + data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) + node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0] + first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) + + with reporter.step("Remove chunk node from network map"): + remove_nodes_from_map_morph( + shell=self.shell, cluster=self.cluster, remove_nodes=[node_data_chunk.storage_node] + ) - with reporter.step("Tick epoch and wait update network map."): + with reporter.step("Tick epoch."): alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0] - self.wait_for_nodes_appears_in_map(frostfs_cli, alive_node, 3) + self.tick_epoch(alive_node.storage_node, 2) with reporter.step("Wait replication chunk with different node."): - node = self.search_node_not_chunks(first_all_chunks, frostfs_cli, endpoint=alive_node.storage_node.get_rpc_endpoint())[0] + node = grpc_client.object.chunks.search_node_without_chunks(first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint())[0] self.wait_replication(3, frostfs_cli, cid, oid) with reporter.step("Get new chunks"): - second_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, node.storage_node.get_rpc_endpoint()) + second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), cid, oid) with reporter.step("Check that oid no change."): assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] - with reporter.step("Start stopped host, and check delete 4 chunk."): - cluster_state_controller.start_node_host(node_data_chunk) - all_chunks_after_start_node = self.get_all_chunks_object(frostfs_cli, cid, oid) - assert len(all_chunks_after_start_node) == 3 + with reporter.step("Include node in netmap"): + include_node_to_network_map( + node_to_include=node_data_chunk.storage_node, + alive_node=alive_node.storage_node, + shell=self.shell, + cluster=self.cluster) + + self.wait_sync_count_chunks_nodes(grpc_client, cid, oid, 3) @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})") def test_create_container_with_difference_count_nodes( @@ -644,7 +657,6 @@ class TestECReplication(ClusterTestBase): frostfs_cli: FrostfsCli, object_size: ObjectSize, default_user: User, - cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") @@ -673,20 +685,24 @@ class TestECReplication(ClusterTestBase): with pytest.raises(RuntimeError, match="object is locked"): frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) - with reporter.step("Stop chunk node."): + with reporter.step("Remove node in netmap."): chunk_node = self.get_chunk_node(frostfs_cli, chunks_object[0]) - cluster_state_controller.stop_node_host(chunk_node[0], "hard") - cluster_state_controller.start_node_host(chunk_node[0]) + alive_node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0] + remove_nodes_from_map_morph(self.shell, self.cluster, [chunk_node[0].storage_node], alive_node.storage_node) with reporter.step("Check don`t delete chunk."): for chunk in chunks_object: - with pytest.raises(RuntimeError, match="Lock EC chunk failed"): + with pytest.raises(RuntimeError, match="Lock EC chunk failed|object not found"): frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) with reporter.step("Check enable LOCK object"): with pytest.raises(RuntimeError, match="object is locked"): frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) + with reporter.step("Include node in netmap"): + storage_node_set_status(chunk_node[0].storage_node, "online") + self.include_node_in_map(alive_node.storage_node, chunk_node[0].storage_node) + @allure.title("Output MaxEC* params in frostf-scli (type={type_shards})") @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"]) def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None: