diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py index 3d423d1d..9dedc53d 100644 --- a/pytest_tests/testsuites/conftest.py +++ b/pytest_tests/testsuites/conftest.py @@ -6,7 +6,6 @@ from datetime import datetime, timedelta, timezone from typing import Optional import allure -from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper import pytest from dateutil import parser from frostfs_testlib import plugins, reporter @@ -28,6 +27,8 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.policy import PlacementPolicy from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper +from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.testing.test_control import run_optionally, wait_for_success @@ -41,6 +42,7 @@ logger = logging.getLogger("NeoLogger") SERVICE_ACTIVE_TIME = 20 WALLTETS_IN_POOL = 2 + # Add logs check test even if it's not fit to mark selectors def pytest_configure(config: pytest.Config): markers = config.option.markexpr @@ -51,6 +53,8 @@ def pytest_configure(config: pytest.Config): number_key = pytest.StashKey[str]() start_time = pytest.StashKey[int]() test_outcome = pytest.StashKey[str]() + + # pytest hook. Do not rename def pytest_collection_modifyitems(items: list[pytest.Item]): # Change order of tests based on @pytest.mark.order() marker @@ -108,11 +112,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): return metafunc.fixturenames.append("cycle") - metafunc.parametrize( - "cycle", - range(1, TEST_CYCLES_COUNT + 1), - ids=[f"cycle {cycle}" for cycle in range(1, TEST_CYCLES_COUNT + 1)], - ) + metafunc.parametrize("cycle", range(1, TEST_CYCLES_COUNT + 1), ids=[f"cycle {cycle}" for cycle in range(1, TEST_CYCLES_COUNT + 1)]) @pytest.fixture(scope="session") @@ -148,11 +148,7 @@ def require_multiple_interfaces(cluster: Cluster): def max_object_size(cluster: Cluster, client_shell: Shell) -> int: storage_node = cluster.storage_nodes[0] wallet = WalletInfo.from_node(storage_node) - net_info = get_netmap_netinfo( - wallet=wallet, - endpoint=storage_node.get_rpc_endpoint(), - shell=client_shell, - ) + net_info = get_netmap_netinfo(wallet=wallet, endpoint=storage_node.get_rpc_endpoint(), shell=client_shell) yield net_info["maximum_object_size"] @@ -176,8 +172,7 @@ def complex_object_size(max_object_size: int) -> ObjectSize: # By default we want all tests to be executed with both object sizes # This can be overriden in choosen tests if needed @pytest.fixture( - scope="session", - params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)], + scope="session", params=[pytest.param("simple", marks=pytest.mark.simple), pytest.param("complex", marks=pytest.mark.complex)] ) def object_size(simple_object_size: ObjectSize, complex_object_size: ObjectSize, request: pytest.FixtureRequest) -> ObjectSize: if request.param == "simple": @@ -203,17 +198,14 @@ def frostfs_cli(client_shell: Shell, default_wallet: WalletInfo) -> FrostfsCli: @pytest.fixture(scope="session") -@allure.title("Init CliClientWrapper with local Frostfs CLI") -def grpc_client(frostfs_cli: FrostfsCli) -> CliClientWrapper: +@allure.title("Init GrpcClientWrapper with local Frostfs CLI") +def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper: return CliClientWrapper(frostfs_cli) # By default we want all tests to be executed with both storage policies. # This can be overriden in choosen tests if needed. -@pytest.fixture( - scope="session", - params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)], -) +@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)]) def placement_policy( rep_placement_policy: PlacementPolicy, ec_placement_policy: PlacementPolicy, request: pytest.FixtureRequest ) -> PlacementPolicy: diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py index 4899c81a..e27ba6b2 100644 --- a/pytest_tests/testsuites/replication/test_ec_replication.py +++ b/pytest_tests/testsuites/replication/test_ec_replication.py @@ -1,31 +1,25 @@ import json -from dataclasses import dataclass import time import allure -from frostfs_testlib.shell.interfaces import Shell -from frostfs_testlib.steps.node_management import check_node_in_map, include_node_to_network_map, remove_nodes_from_map_morph, storage_node_set_status -from frostfs_testlib.storage.grpc_operations.client_wrappers import CliClientWrapper -from frostfs_testlib.utils import datetime_utils import pytest import yaml -from frostfs_testlib import plugins, reporter +from frostfs_testlib import reporter from frostfs_testlib.cli import FrostfsAdm, FrostfsCli from frostfs_testlib.cli.netmap_parser import NetmapParser -from frostfs_testlib.credentials.interfaces import User from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE, HOSTING_CONFIG_FILE, MORPH_BLOCK_TIME from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper from frostfs_testlib.s3.interfaces import BucketContainerResolver, VersioningStatus -from frostfs_testlib.steps.cli.object import get_object, put_object from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode -from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher +from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager from frostfs_testlib.storage.dataclasses.object_size import ObjectSize -from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo +from frostfs_testlib.storage.dataclasses.storage_object_info import Chunk +from frostfs_testlib.storage.grpc_operations.interfaces import GrpcClientWrapper from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.testing.test_control import wait_for_success -from frostfs_testlib.utils.cli_utils import parse_netmap_output +from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.file_utils import generate_file, get_file_hash @@ -48,19 +42,6 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[node_count])) -@dataclass -class Chunk: - def __init__(self, object_id: str, required_nodes: list, confirmed_nodes: list, ec_parent_object_id: str, ec_index: int) -> None: - self.object_id = object_id - self.required_nodes = required_nodes - self.confirmed_nodes = confirmed_nodes - self.ec_parent_object_id = ec_parent_object_id - self.ec_index = ec_index - - def __str__(self) -> str: - return self.object_id - - @allure.title("Initialized remote FrostfsAdm") @pytest.fixture def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm: @@ -80,11 +61,11 @@ class TestECReplication(ClusterTestBase): @pytest.fixture() def restore_nodes_shards_mode(self): - yield + yield for cli, endpoint in self.cli_change_shards_mode.items(): - cli.shards.set_mode(endpoint, mode='read-write', all=True) - + cli.shards.set_mode(endpoint, mode="read-write", all=True) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) @pytest.fixture() @@ -95,23 +76,11 @@ class TestECReplication(ClusterTestBase): return rep_count @wait_for_success(120, 5) - def wait_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str, success: bool = True) -> None: + def wait_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str, success: bool = True) -> None: if not success: - assert not self.check_replication(total_chunks, local_cli, cid, oid) + assert not self.check_replication(total_chunks, client, cid, oid) else: - assert self.check_replication(total_chunks, local_cli, cid, oid) - - @allure.title("Search shard") - def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: - oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}" - node_shell = node.storage_node.host.get_shell() - shards_watcher = ShardsWatcher(node) - - with reporter.step("Search object file"): - for shard_id, shard_info in shards_watcher.shards_snapshots[-1].items(): - check_dir = node_shell.exec(f" [ -d {shard_info['blobstor'][1]['path']}/{oid_path} ] && echo 1 || echo 0").stdout - if "1" in check_dir.strip(): - return shard_id + assert self.check_replication(total_chunks, client, cid, oid) @allure.title("Restore chunk maximum params in network params ") @pytest.fixture @@ -128,11 +97,6 @@ class TestECReplication(ClusterTestBase): raise object_nodes["errors"] return object_nodes - @reporter.step("Get all chunks object ") - def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]: - chunks = self.get_object_nodes(cli, cid, oid, endpoint) - return [Chunk(**chunk) for chunk in chunks["data_objects"]] - @reporter.step("Get parity chunk ") def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk: chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] @@ -143,222 +107,173 @@ class TestECReplication(ClusterTestBase): chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] return Chunk(**chunks[0]) - @reporter.step("Search node without chunks ") - def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]: - if not endpoint: - self.cluster.default_rpc_endpoint - netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) - chunks_node_key = [] - for chunk in chunks: - chunks_node_key.extend(chunk.confirmed_nodes) - for node_info in netmap.copy(): - if node_info.node_id in chunks_node_key and node_info in netmap: - netmap.remove(node_info) - result = [] - for node_info in netmap: - for cluster_node in self.cluster.cluster_nodes: - if node_info.node == cluster_node.host_ip: - result.append(cluster_node) - return result - - @reporter.step("Create container, policy={policy}") - def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str: - return ( - user_cli.container.create(endpoint, policy=policy, await_mode=True, timeout=CLI_DEFAULT_TIMEOUT) - .stdout.split(" ")[1] - .strip() - .split("\n")[0] - ) - - @reporter.step("Search node chunk {chunk}") - def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]: - netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) - for node_info in netmap: - if node_info.node_id in chunk.confirmed_nodes: - for cluster_node in self.cluster.cluster_nodes: - if cluster_node.host_ip == node_info.node: - return (cluster_node, node_info) - @reporter.step("Check replication chunks={total_chunks} chunks ") - def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool: - object_nodes_info = local_cli.object.nodes( - self.cluster.default_rpc_endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT - ).stdout - object_nodes_info = json.loads(object_nodes_info) - return len(object_nodes_info["data_objects"]) == total_chunks + def check_replication(self, total_chunks: int, client: GrpcClientWrapper, cid: str, oid: str) -> bool: + object_nodes_info = client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) + return len(object_nodes_info) == total_chunks + + @pytest.fixture() + def include_excluded_nodes(self, cluster_state_controller: ClusterStateController): + yield + + cluster_state_controller.include_all_excluded_nodes() @allure.title("Disable Policer on all nodes") @pytest.fixture() - def disable_policer( - self, - cluster_state_controller: ClusterStateController, - ) -> None: - with reporter.step(f"Disable policer for nodes"): + def disable_policer(self, cluster_state_controller: ClusterStateController) -> None: + with reporter.step("Disable policer for nodes"): cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes( service_type=StorageNode, values={"policer": {"unsafe_disable": True}} ) yield - with reporter.step(f"Enable policer for nodes"): + with reporter.step("Enable policer for nodes"): cluster_state_controller.start_stopped_hosts() cluster_state_controller.manager(ConfigStateManager).revert_all() - + @wait_for_success(300, 15) @reporter.step("Check count nodes chunks") - def wait_sync_count_chunks_nodes(self, grpc_client: CliClientWrapper, cid: str, oid: str, count: int): + def wait_sync_count_chunks_nodes(self, grpc_client: GrpcClientWrapper, cid: str, oid: str, count: int): all_chunks_after_include_node = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) chunks_nodes = [node for chunk in all_chunks_after_include_node for node in chunk.confirmed_nodes] assert len(chunks_nodes) == count - - @wait_for_success(300, 15) - def include_node_in_map(self, alive_storage_node: StorageNode, include_storage_node: StorageNode): - self.tick_epoch(alive_storage_node, 2) - check_node_in_map(include_storage_node, self.shell, alive_storage_node) @allure.title("Create container with EC policy (size={object_size})") - def test_create_container_with_ec_policy( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - rep_count: int, - ) -> None: + def test_create_container_with_ec_policy(self, object_size: ObjectSize, rep_count: int, grpc_client: GrpcClientWrapper) -> None: test_file = generate_file(object_size.value) with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container."): - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check replication chunks."): - assert self.check_replication(rep_count, frostfs_cli, cid, oid) + assert self.check_replication(rep_count, grpc_client, cid, oid) @allure.title("Lose node with chunk data") @pytest.mark.failover def test_lose_node_with_data_chunk( self, - frostfs_cli: FrostfsCli, - default_user: User, + grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): - assert self.check_replication(4, frostfs_cli, cid, oid) + assert self.check_replication(4, grpc_client, cid, oid) with reporter.step("Search node data chunk"): - chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) - chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0] + chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk) with reporter.step("Stop node with data chunk."): - cluster_state_controller.stop_node_host(chunk_node, "hard") + cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Get object"): - node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] - get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) + node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0] + grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped node, and check replication chunks."): - cluster_state_controller.start_node_host(chunk_node) - assert self.check_replication(4, frostfs_cli, cid, oid) + cluster_state_controller.start_node_host(chunk_node[0]) + self.wait_replication(4, grpc_client, cid, oid) @allure.title("Lose node with chunk parity") @pytest.mark.failover def test_lose_node_with_parity_chunk( self, - frostfs_cli: FrostfsCli, - default_user: User, + grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): - assert self.check_replication(4, frostfs_cli, cid, oid) + assert self.check_replication(4, grpc_client, cid, oid) with reporter.step("Search node with parity chunk"): - chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid) - chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0] + chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0] with reporter.step("Stop node parity chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") with reporter.step("Get object, expect success."): node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] - get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) + grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stoped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) - assert self.check_replication(4, frostfs_cli, cid, oid) + self.wait_replication(4, grpc_client, cid, oid) @allure.title("Lose nodes with chunk data and parity") @pytest.mark.failover def test_lose_nodes_data_chunk_and_parity( self, - frostfs_cli: FrostfsCli, - default_user: User, + grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks, expect 4."): - assert self.check_replication(4, frostfs_cli, cid, oid) + assert self.check_replication(4, grpc_client, cid, oid) with reporter.step("Search node data chunk and node parity chunk"): - data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) - node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0] - parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid) - node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk)[0] + data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) + data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0] + parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid) + parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0] with reporter.step("Stop node with data chunk."): - cluster_state_controller.stop_node_host(node_data_chunk, "hard") + cluster_state_controller.stop_node_host(data_chunk_node, "hard") with reporter.step("Get object"): - node = list(set(self.cluster.cluster_nodes) - {node_data_chunk, node_parity_chunk})[0] - get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) + node = list(set(self.cluster.cluster_nodes) - {data_chunk_node, parity_chunk_node})[0] + grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped host and check chunks."): - cluster_state_controller.start_node_host(node_data_chunk) - assert self.check_replication(4, frostfs_cli, cid, oid) + cluster_state_controller.start_node_host(data_chunk_node) + self.wait_replication(4, grpc_client, cid, oid) with reporter.step("Stop node with parity chunk and one all node."): - cluster_state_controller.stop_node_host(node_data_chunk, "hard") - cluster_state_controller.stop_node_host(node_parity_chunk, "hard") + cluster_state_controller.stop_node_host(data_chunk_node, "hard") + cluster_state_controller.stop_node_host(parity_chunk_node, "hard") with reporter.step("Get object, expect error."): with pytest.raises(RuntimeError): - get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) + grpc_client.object.get(cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped nodes and check replication chunk."): cluster_state_controller.start_stopped_hosts() - assert self.check_replication(4, frostfs_cli, cid, oid) + self.wait_replication(4, grpc_client, cid, oid) @allure.title("Policer work with chunk") @pytest.mark.failover def test_work_policer_with_nodes( self, simple_object_size: ObjectSize, - frostfs_cli: FrostfsCli, - grpc_client: CliClientWrapper, + grpc_client: GrpcClientWrapper, + cluster_state_controller: ClusterStateController, + include_excluded_nodes: None, ) -> None: with reporter.step("Create container."): cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) @@ -368,25 +283,25 @@ class TestECReplication(ClusterTestBase): oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks nodes on 3."): - assert self.check_replication(3, frostfs_cli, cid, oid) + assert self.check_replication(3, grpc_client, cid, oid) with reporter.step("Search node with chunk."): data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0] first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) - + with reporter.step("Remove chunk node from network map"): - remove_nodes_from_map_morph( - shell=self.shell, cluster=self.cluster, remove_nodes=[node_data_chunk.storage_node] - ) + cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node]) with reporter.step("Tick epoch."): alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0] self.tick_epoch(alive_node.storage_node, 2) with reporter.step("Wait replication chunk with different node."): - node = grpc_client.object.chunks.search_node_without_chunks(first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint())[0] - self.wait_replication(3, frostfs_cli, cid, oid) + node = grpc_client.object.chunks.search_node_without_chunks( + first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint() + )[0] + self.wait_replication(3, grpc_client, cid, oid) with reporter.step("Get new chunks"): second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), cid, oid) @@ -395,328 +310,262 @@ class TestECReplication(ClusterTestBase): assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] with reporter.step("Include node in netmap"): - include_node_to_network_map( - node_to_include=node_data_chunk.storage_node, - alive_node=alive_node.storage_node, - shell=self.shell, - cluster=self.cluster) - + cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node) + self.wait_sync_count_chunks_nodes(grpc_client, cid, oid, 3) @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})") def test_create_container_with_difference_count_nodes( - self, - node_count: int, - ec_policy: str, - object_size: ObjectSize, - default_user: User, - frostfs_cli: FrostfsCli, + self, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper ) -> None: with reporter.step("Create container."): expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1]) if "complex" in object_size.name: expected_chunks *= 4 - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, ec_policy) + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=ec_policy, await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check count object chunks."): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) assert len(chunks) == expected_chunks with reporter.step("get object and check hash."): - file_with_node = get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + file_with_node = grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint) assert get_file_hash(test_file) == get_file_hash(file_with_node) @allure.title("Request PUT with copies_number flag") - def test_put_object_with_copies_number( - self, - frostfs_cli: FrostfsCli, - default_user: User, - simple_object_size: ObjectSize, - ) -> None: + def test_put_object_with_copies_number(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container with copies number = 1"): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1) with reporter.step("Check that count chunks > 1."): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) assert len(chunks) > 1 @allure.title("Request PUT and 1 node off") @pytest.mark.failover def test_put_object_with_off_cnr_node( - self, - frostfs_cli: FrostfsCli, - cluster_state_controller: ClusterStateController, - default_user: User, - simple_object_size: ObjectSize, + self, grpc_client: GrpcClientWrapper, cluster_state_controller: ClusterStateController, simple_object_size: ObjectSize ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 3.1", await_mode=True) with reporter.step("Stop one node in container nodes"): cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard") with reporter.step("Put object in container, expect success for EC container."): test_file = generate_file(simple_object_size.value) - put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint, copies_number=1) @allure.title("Request PUT (size={object_size})") - def test_put_object_with_ec_cnr( - self, - frostfs_cli: FrostfsCli, - default_user: User, - object_size: ObjectSize, - ) -> None: + def test_put_object_with_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) - with reporter.step("Put object in container with --prepare-locally."): + with reporter.step("Put object in container"): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Get chunks object."): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) with reporter.step("Check header chunks object"): for chunk in chunks: - chunk_head = frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, raw=True).stdout + chunk_head = grpc_client.object.head( + cid, chunk.object_id, self.cluster.default_rpc_endpoint, is_raw=True, json_output=False + ).stdout assert "EC header:" in chunk_head - @allure.title("Request PUT with copies number") - def test_put_object_with_copies_number( - self, - frostfs_cli: FrostfsCli, - default_user: User, - simple_object_size: ObjectSize, - ) -> None: - with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") - - with reporter.step("Put object in container with --copies-number=1."): - test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) - - with reporter.step("Check len chunks object."): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) - assert len(chunks) > 1 - @allure.title("Request GET (size={object_size})") - def test_get_object_in_ec_cnr( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - ) -> None: + def test_get_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True) with reporter.step("Put object in container"): test_file = generate_file(object_size.value) hash_origin_file = get_file_hash(test_file) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Get id all chunks."): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) with reporter.step("Search chunk node and not chunks node."): - chunk_node = self.get_chunk_node(frostfs_cli, chunks[0]) - not_chunk_node = self.search_node_not_chunks(chunks, frostfs_cli, self.cluster.default_rpc_endpoint) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0] + not_chunk_node = grpc_client.object.chunks.search_node_without_chunks(chunks, self.cluster, self.cluster.default_rpc_endpoint)[ + 0 + ] with reporter.step("GET request with chunk node, expect success"): - file_one = get_object(default_user.wallet, cid, oid, self.shell, chunk_node[0].storage_node.get_rpc_endpoint()) + file_one = grpc_client.object.get(cid, oid, chunk_node.storage_node.get_rpc_endpoint()) hash_file_one = get_file_hash(file_one) assert hash_file_one == hash_origin_file with reporter.step("Get request with not chunk node"): - file_two = get_object(default_user.wallet, cid, oid, self.shell, not_chunk_node[0].storage_node.get_rpc_endpoint()) + file_two = grpc_client.object.get(cid, oid, not_chunk_node.storage_node.get_rpc_endpoint()) hash_file_two = get_file_hash(file_two) assert hash_file_two == hash_file_one == hash_origin_file @allure.title("Request SEARCH with flags 'root' (size={object_size})") - def test_search_object_in_ec_cnr_root_flags( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - ) -> None: + def test_search_object_in_ec_cnr_root_flags(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container"): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Search operation with --root flags"): - search_output = frostfs_cli.object.search( - self.cluster.default_rpc_endpoint, cid, root=True, timeout=CLI_DEFAULT_TIMEOUT - ).stdout.split("\n")[1:] + search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, root=True) assert search_output[0] == oid @allure.title("Request SEARCH check valid chunk id (size={object_size})") - def test_search_object_in_ec_cnr_chunk_id( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - ) -> None: + def test_search_object_in_ec_cnr_chunk_id(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container"): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Search operation object"): - search_output = frostfs_cli.object.search(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT).stdout - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + search_output = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) for chunk in chunks: assert chunk.object_id in search_output @allure.title("Request SEARCH check no chunk index info (size={object_size})") - def test_search_object_in_ec_cnr( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - ) -> None: + def test_search_object_in_ec_cnr(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container"): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Search operation all chunk"): - chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) for chunk in chunks: - chunk_search = frostfs_cli.object.search( - self.cluster.default_rpc_endpoint, cid, oid=chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT - ).stdout + chunk_search = grpc_client.object.search(cid, self.cluster.default_rpc_endpoint, oid=chunk.object_id) assert "index" not in chunk_search @allure.title("Request DELETE (size={object_size})") @pytest.mark.failover def test_delete_object_in_ec_cnr( - self, - default_user: User, - frostfs_cli: FrostfsCli, - object_size: ObjectSize, - cluster_state_controller: ClusterStateController, + self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): - chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) replication_count = 3 if object_size.name == "simple" else 3 * 4 - assert len(chunks_object) == replication_count + assert len(chunks) == replication_count with reporter.step("Delete object"): - frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint) with reporter.step("Check that delete all chunks."): - for chunk in chunks_object: + for chunk in chunks: with pytest.raises(RuntimeError, match="object already removed"): - frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.head(cid, chunk.object_id, self.cluster.default_rpc_endpoint) with reporter.step("Put second object."): - oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid_second = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check second object chunks nodes."): - chunks_second_object = self.get_all_chunks_object(frostfs_cli, cid, oid_second, self.cluster.default_rpc_endpoint) + chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid_second) assert len(chunks_second_object) == replication_count with reporter.step("Stop nodes with chunk."): - chunk_node = self.get_chunk_node(frostfs_cli, chunks_second_object[0]) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_second_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Delete second object"): cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]}) - frostfs_cli.object.delete(cluster_nodes[0].storage_node.get_rpc_endpoint(), cid, oid_second, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, oid_second, cluster_nodes[0].storage_node.get_rpc_endpoint()) with reporter.step("Check that delete all chunk second object."): for chunk in chunks_second_object: - with pytest.raises(RuntimeError, match="object already removed"): - frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) + with pytest.raises(RuntimeError, match="object already removed|object not found"): + grpc_client.object.head(cid, chunk.object_id, cluster_nodes[0].storage_node.get_rpc_endpoint()) @allure.title("Request LOCK (size={object_size})") @pytest.mark.failover def test_lock_object_in_ec_cnr( self, + grpc_client: GrpcClientWrapper, frostfs_cli: FrostfsCli, object_size: ObjectSize, - default_user: User, + cluster_state_controller: ClusterStateController, + include_excluded_nodes: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): - chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Put LOCK in object."): + # TODO Rework for the grpc_client when the netmap methods are implemented epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip() - frostfs_cli.object.lock( - self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT, expire_at=(int(epoch) + 5) - ).stdout + grpc_client.object.lock(cid, oid, self.cluster.default_rpc_endpoint, expire_at=(int(epoch) + 5)) with reporter.step("Check don`t delete chunk"): for chunk in chunks_object: with pytest.raises(RuntimeError, match="Lock EC chunk failed"): - frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, chunk.object_id, self.cluster.default_rpc_endpoint) with reporter.step("Check enable LOCK object"): with pytest.raises(RuntimeError, match="object is locked"): - frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, oid, self.cluster.default_rpc_endpoint) with reporter.step("Remove node in netmap."): - chunk_node = self.get_chunk_node(frostfs_cli, chunks_object[0]) - alive_node = list(set(self.cluster.cluster_nodes) - {chunk_node[0]})[0] - remove_nodes_from_map_morph(self.shell, self.cluster, [chunk_node[0].storage_node], alive_node.storage_node) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks_object[0])[0] + alive_node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] + cluster_state_controller.remove_node_from_netmap([chunk_node.storage_node]) with reporter.step("Check don`t delete chunk."): for chunk in chunks_object: with pytest.raises(RuntimeError, match="Lock EC chunk failed|object not found"): - frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, chunk.object_id, alive_node.storage_node.get_rpc_endpoint()) with reporter.step("Check enable LOCK object"): with pytest.raises(RuntimeError, match="object is locked"): - frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) + grpc_client.object.delete(cid, oid, alive_node.storage_node.get_rpc_endpoint()) with reporter.step("Include node in netmap"): - storage_node_set_status(chunk_node[0].storage_node, "online") - self.include_node_in_map(alive_node.storage_node, chunk_node[0].storage_node) + cluster_state_controller.include_node_to_netmap(chunk_node.storage_node, alive_node.storage_node) @allure.title("Output MaxEC* params in frostf-scli (type={type_shards})") @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"]) def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None: with reporter.step("Get and check params"): + # TODO Rework for the grpc_client when the netmap methods are implemented net_info = frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout assert type_shards in net_info @allure.title("Change MaxEC*Count params") def test_change_max_data_shards_params( - self, - frostfs_remote_adm: FrostfsAdm, - frostfs_cli: FrostfsCli, - restore_network_config: None, + self, frostfs_remote_adm: FrostfsAdm, frostfs_cli: FrostfsCli, restore_network_config: None ) -> None: + # TODO Rework for the grpc_client when the netmap methods are implemented with reporter.step("Get now params MaxECDataCount and MaxECParityCount"): node_netinfo = NetmapParser.netinfo( frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout @@ -737,73 +586,50 @@ class TestECReplication(ClusterTestBase): ) @allure.title("Check maximum count data and parity shards") - def test_change_over_max_parity_shards_params( - self, - frostfs_remote_adm: FrostfsAdm, - ) -> None: + def test_change_over_max_parity_shards_params(self, frostfs_remote_adm: FrostfsAdm) -> None: with reporter.step("Change over maximum params shards count."): with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"') @allure.title("Create container with EC policy and SELECT (SELECT={select})") @pytest.mark.parametrize("select", [2, 4]) - def test_create_container_with_select( - self, - select: int, - frostfs_cli: FrostfsCli, - ) -> None: + def test_create_container_with_select(self, select: int, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container"): policy = f"EC 1.1 CBF 1 SELECT {select} FROM *" - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True) with reporter.step("Check container nodes decomposed"): - container_nodes = ( - frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT) - .stdout.strip() - .split("\n")[1:] - ) + container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster) + assert len(container_nodes) == select @allure.title("Create container with EC policy and CBF (CBF={cbf})") @pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)]) - def test_create_container_with_cbf( - self, - cbf: int, - expected_nodes: int, - frostfs_cli: FrostfsCli, - ) -> None: + def test_create_container_with_cbf(self, cbf: int, expected_nodes: int, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container."): policy = f"EC 1.1 CBF {cbf}" - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True) with reporter.step("Check expected container nodes."): - container_nodes = ( - frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT) - .stdout.strip() - .split("\n")[1:] - ) + container_nodes = grpc_client.container.nodes(self.cluster.default_rpc_endpoint, cid, self.cluster) assert len(container_nodes) == expected_nodes @allure.title("Create container with EC policy and FILTER") - def test_create_container_with_filter( - self, - default_user: User, - frostfs_cli: FrostfsCli, - simple_object_size: ObjectSize, - ) -> None: + def test_create_container_with_filter(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None: with reporter.step("Create Container."): policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU" - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Check object is decomposed exclusively on Russian nodes"): - data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) - parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) - node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk) - node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk) + data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) + parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid) + node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk) + node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk) + for node in [node_data_chunk[1], node_parity_chunk[1]]: assert "Russia" in node.country @@ -812,53 +638,47 @@ class TestECReplication(ClusterTestBase): def test_evacuation_data_shard( self, restore_nodes_shards_mode: None, - default_user: User, frostfs_cli: FrostfsCli, + grpc_client: GrpcClientWrapper, max_object_size: int, type: str, get_chunk, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 1.1 CBF 1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 1.1 CBF 1", await_mode=True) with reporter.step("Put object in container."): test_file = generate_file(max_object_size - 1000) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Get object chunks."): chunk = get_chunk(self, frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) - chunk_node = self.get_chunk_node(frostfs_cli, chunk) - frostfs_node_cli = self.get_node_cli( - chunk_node[0], - config=chunk_node[0].storage_node.get_remote_wallet_config_path(), - ) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk) + frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path()) with reporter.step("Search shards chunk"): - shard_id = self.get_shard_chunk(chunk_node[0], chunk) + time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME) * 2) + shard_id = grpc_client.object.chunks.get_shard_chunk(chunk_node[0], chunk) with reporter.step("Enable evacuation for shard"): frostfs_node_cli.shards.set_mode(chunk_node[0].storage_node.get_control_endpoint(), mode="read-only", id=shard_id) frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True) with reporter.step("Get object after evacuation shard"): - get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + grpc_client.object.get(cid, oid, self.cluster.default_rpc_endpoint) @allure.title("[NEGATIVE] Don`t create more 1 EC policy") - def test_more_one_ec_policy( - self, - frostfs_cli: FrostfsCli, - ) -> None: + def test_more_one_ec_policy(self, grpc_client: GrpcClientWrapper) -> None: with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"): with pytest.raises(RuntimeError, match="can't parse placement policy"): - self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *") + grpc_client.container.create( + self.cluster.default_rpc_endpoint, policy="EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *", await_mode=True + ) @allure.title("Create bucket with EC policy (s3_client={s3_client})") @pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True) def test_create_bucket_with_ec_location( - self, - s3_client: S3ClientWrapper, - bucket_container_resolver: BucketContainerResolver, - frostfs_cli: FrostfsCli, + self, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, grpc_client: GrpcClientWrapper ) -> None: with reporter.step("Create bucket with EC location constrain"): bucket = s3_client.create_bucket(location_constraint="ec3.1") @@ -867,9 +687,7 @@ class TestECReplication(ClusterTestBase): cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) with reporter.step("Validate container policy"): - container = frostfs_cli.container.get( - self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT - ).stdout + container = grpc_client.container.get(self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT) assert container @allure.title("Bucket object count chunks (s3_client={s3_client}, size={object_size})") @@ -878,7 +696,7 @@ class TestECReplication(ClusterTestBase): self, s3_client: S3ClientWrapper, bucket_container_resolver: BucketContainerResolver, - frostfs_cli: FrostfsCli, + grpc_client: GrpcClientWrapper, object_size: ObjectSize, ) -> None: with reporter.step("Create bucket with EC location constrain"): @@ -895,34 +713,28 @@ class TestECReplication(ClusterTestBase): with reporter.step("Watch replication count chunks"): cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) - chunks = self.get_all_chunks_object(frostfs_cli, cid, bucket_object, self.cluster.default_rpc_endpoint) + chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object) expect_chunks = 4 if object_size.name == "simple" else 16 assert len(chunks) == expect_chunks @allure.title("Replication chunk after drop (size={object_size})") - def test_drop_chunk_and_replication( - self, - frostfs_cli: FrostfsCli, - default_user: User, - object_size: ObjectSize, - rep_count: int, - ) -> None: + def test_drop_chunk_and_replication(self, grpc_client: GrpcClientWrapper, object_size: ObjectSize, rep_count: int) -> None: with reporter.step("Create container"): - cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1") + cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy="EC 2.1 CBF 1", await_mode=True) with reporter.step("Put object"): test_file = generate_file(object_size.value) - oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint) with reporter.step("Get all chunks"): - chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid) with reporter.step("Search chunk node"): - chunk_node = self.get_chunk_node(frostfs_cli, chunk) + chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk) shell_chunk_node = chunk_node[0].host.get_shell() with reporter.step("Get replication count"): - assert self.check_replication(rep_count, frostfs_cli, cid, oid) + assert self.check_replication(rep_count, grpc_client, cid, oid) with reporter.step("Delete chunk"): frostfs_node_cli = FrostfsCli( @@ -930,7 +742,7 @@ class TestECReplication(ClusterTestBase): frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(), ) - frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{chunk.object_id}") + frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{data_chunk.object_id}") with reporter.step("Wait replication count after drop one chunk"): - self.wait_replication(rep_count, frostfs_cli, cid, oid) + self.wait_replication(rep_count, grpc_client, cid, oid)