From 0c881c6fc87b719df0977df3162c41ddab59db02 Mon Sep 17 00:00:00 2001 From: Dmitriy Zayakin Date: Tue, 6 Aug 2024 11:54:04 +0300 Subject: [PATCH] [#281] Added tests EC policy Signed-off-by: Dmitriy Zayakin --- pytest_tests/resources/files/policy.json | 3 +- .../metrics/test_container_metrics.py | 111 ++-- .../replication/test_ec_replication.py | 576 ++++++++++++++---- 3 files changed, 537 insertions(+), 153 deletions(-) diff --git a/pytest_tests/resources/files/policy.json b/pytest_tests/resources/files/policy.json index e73996a1..35e13c68 100644 --- a/pytest_tests/resources/files/policy.json +++ b/pytest_tests/resources/files/policy.json @@ -1,5 +1,6 @@ { "rep-3": "REP 3", "rep-1": "REP 1", - "complex": "REP 1 IN X CBF 1 SELECT 1 FROM * AS X" + "complex": "REP 1 IN X CBF 1 SELECT 1 FROM * AS X", + "ec3.1": "EC 3.1 CBF 1 SELECT 4 FROM *" } diff --git a/pytest_tests/testsuites/metrics/test_container_metrics.py b/pytest_tests/testsuites/metrics/test_container_metrics.py index 61d1233f..c271ff44 100644 --- a/pytest_tests/testsuites/metrics/test_container_metrics.py +++ b/pytest_tests/testsuites/metrics/test_container_metrics.py @@ -1,11 +1,12 @@ import math +import time import allure import pytest from frostfs_testlib import reporter -from frostfs_testlib.steps.cli.container import create_container -from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node -from frostfs_testlib.steps.metrics import check_metrics_counter +from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container +from frostfs_testlib.steps.cli.object import delete_object, head_object, put_object_to_random_node +from frostfs_testlib.steps.metrics import check_metrics_counter, get_metrics_value from frostfs_testlib.steps.storage_policy import get_nodes_with_object from frostfs_testlib.storage.cluster import Cluster from frostfs_testlib.storage.dataclasses.object_size import ObjectSize @@ -16,23 +17,29 @@ from frostfs_testlib.utils.file_utils import generate_file @pytest.mark.container class TestContainerMetrics(ClusterTestBase): - @allure.title("Container metrics (obj_size={object_size})") + @allure.title("Container metrics (obj_size={object_size},policy={policy})") + @pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")]) def test_container_metrics( - self, object_size: ObjectSize, max_object_size: int, default_wallet: WalletInfo, cluster: Cluster + self, + object_size: ObjectSize, + max_object_size: int, + default_wallet: WalletInfo, + cluster: Cluster, + placement_policy: str, + policy: str, ): file_path = generate_file(object_size.value) - placement_policy = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X" - copies = 2 - object_chunks = 0 - head_object = 1 + copies = 2 if policy == "REP" else 1 + object_chunks = 1 link_object = 0 - if object_size.value > max_object_size: - object_chunks = math.ceil(object_size.value / max_object_size) - link_object = 1 with reporter.step(f"Create container with policy {placement_policy}"): cid = create_container(default_wallet, self.shell, cluster.default_rpc_endpoint, placement_policy) + if object_size.value > max_object_size: + object_chunks = math.ceil(object_size.value / max_object_size) + link_object = len(search_nodes_with_container(default_wallet, cid, self.shell, cluster.default_rpc_endpoint, cluster)) + with reporter.step("Put object to random node"): oid = put_object_to_random_node( wallet=default_wallet, @@ -44,44 +51,72 @@ class TestContainerMetrics(ClusterTestBase): with reporter.step("Get object nodes"): object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, cluster.storage_nodes) - object_nodes = [ - cluster_node - for cluster_node in cluster.cluster_nodes - if cluster_node.storage_node in object_storage_nodes - ] + object_nodes = [cluster_node for cluster_node in cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes] with reporter.step("Check metric appears in node where the object is located"): - count_metrics = (object_chunks + head_object + link_object) * copies - check_metrics_counter( - object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy" - ) - check_metrics_counter( - object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic" - ) - check_metrics_counter( - object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user" - ) + count_metrics = (object_chunks * copies) + link_object + if policy == "EC": + count_metrics = (object_chunks * 2) + link_object + check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="phy") + check_metrics_counter(object_nodes, counter_exp=count_metrics, command="container_objects_total", cid=cid, type="logic") + check_metrics_counter(object_nodes, counter_exp=copies, command="container_objects_total", cid=cid, type="user") with reporter.step("Delete file, wait until gc remove object"): delete_object(default_wallet, cid, oid, self.shell, cluster.default_rpc_endpoint) with reporter.step(f"Check container metrics 'the counter should equal {len(object_nodes)}' in object nodes"): - check_metrics_counter( - object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy" - ) - check_metrics_counter( - object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic" - ) + check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="phy") + check_metrics_counter(object_nodes, counter_exp=len(object_nodes), command="container_objects_total", cid=cid, type="logic") check_metrics_counter(object_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user") with reporter.step("Check metrics(Phy, Logic, User) in each nodes"): # Phy and Logic metrics are 4, because in rule 'CBF 2 SELECT 2 FROM', cbf2*sel2=4 + expect_metrics = 4 if policy == "REP" else 2 + check_metrics_counter(cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="phy") check_metrics_counter( - cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="phy" + cluster.cluster_nodes, counter_exp=expect_metrics, command="container_objects_total", cid=cid, type="logic" ) - check_metrics_counter( - cluster.cluster_nodes, counter_exp=4, command="container_objects_total", cid=cid, type="logic" + check_metrics_counter(cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user") + + @allure.title("Container size metrics (obj_size={object_size},policy={policy})") + @pytest.mark.parametrize("placement_policy, policy", [("REP 2 IN X CBF 2 SELECT 2 FROM * AS X", "REP"), ("EC 1.1 CBF 1", "EC")]) + def test_container_size_metrics( + self, + object_size: ObjectSize, + default_wallet: WalletInfo, + placement_policy: str, + policy: str, + ): + file_path = generate_file(object_size.value) + + with reporter.step(f"Create container with policy {policy}"): + cid = create_container(default_wallet, self.shell, self.cluster.default_rpc_endpoint, placement_policy) + + with reporter.step("Put object to random node"): + oid = put_object_to_random_node( + wallet=default_wallet, + path=file_path, + cid=cid, + shell=self.shell, + cluster=self.cluster, ) - check_metrics_counter( - cluster.cluster_nodes, counter_exp=0, command="container_objects_total", cid=cid, type="user" + + with reporter.step("Get object nodes"): + object_storage_nodes = get_nodes_with_object(cid, oid, self.shell, self.cluster.storage_nodes) + object_nodes = [ + cluster_node for cluster_node in self.cluster.cluster_nodes if cluster_node.storage_node in object_storage_nodes + ] + + with reporter.step("Check metric appears in all node where the object is located"): + act_metric = sum( + [get_metrics_value(node, command="frostfs_node_engine_container_size_bytes", cid=cid) for node in object_nodes] ) + assert (act_metric // 2) == object_size.value + + with reporter.step("Delete file, wait until gc remove object"): + id_tombstone = delete_object(default_wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + tombstone = head_object(default_wallet, cid, id_tombstone, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step(f"Check container size metrics"): + act_metric = get_metrics_value(object_nodes[0], command="frostfs_node_engine_container_size_bytes", cid=cid) + assert act_metric == int(tombstone["header"]["payloadLength"]) diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py index dfa8d785..fc723932 100644 --- a/pytest_tests/testsuites/replication/test_ec_replication.py +++ b/pytest_tests/testsuites/replication/test_ec_replication.py @@ -4,20 +4,28 @@ from dataclasses import dataclass import allure import pytest import yaml -from frostfs_testlib import reporter +from frostfs_testlib import plugins, reporter from frostfs_testlib.cli import FrostfsAdm, FrostfsCli from frostfs_testlib.cli.netmap_parser import NetmapParser from frostfs_testlib.credentials.interfaces import User -from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC -from frostfs_testlib.resources.common import HOSTING_CONFIG_FILE -from frostfs_testlib.shell import Shell +from frostfs_testlib.resources.cli import ( + CLI_DEFAULT_TIMEOUT, + FROSTFS_ADM_CONFIG_PATH, + FROSTFS_ADM_EXEC, + FROSTFS_CLI_EXEC, + HOSTING_CONFIG_FILE, +) +from frostfs_testlib.resources.common import COMPLEX_OBJECT_CHUNKS_COUNT, COMPLEX_OBJECT_TAIL_SIZE +from frostfs_testlib.s3 import AwsCliClient, S3ClientWrapper +from frostfs_testlib.s3.interfaces import BucketContainerResolver, VersioningStatus from frostfs_testlib.steps.cli.object import get_object, put_object from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode -from frostfs_testlib.storage.controllers import ClusterStateController +from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils.cli_utils import parse_netmap_output from frostfs_testlib.utils.file_utils import generate_file, get_file_hash @@ -32,8 +40,8 @@ def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: node_count = len(hosting_config["hosts"]) ec_map = { - 4: ["EC 1.1", "EC 2.1", "EC 3.1"], - 8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4"], + 4: ["EC 1.1", "EC 2.1", "EC 3.1", "EC 2.2"], + 8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4", "EC 3.1"], 16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], 100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], } @@ -54,10 +62,12 @@ class Chunk: return self.object_id -@allure.title("Initialized local FrostfsCli") +@allure.title("Init bucket container resolver") @pytest.fixture() -def frostfs_local_cli(client_shell: Shell, default_user: User) -> FrostfsCli: - return FrostfsCli(client_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_user.wallet.config_path) +def bucket_container_resolver(node_under_test: ClusterNode) -> BucketContainerResolver: + resolver_cls = plugins.load_plugin("frostfs.testlib.bucket_cid_resolver", node_under_test.host.config.product) + resolver: BucketContainerResolver = resolver_cls() + return resolver @allure.title("Initialized remote FrostfsAdm") @@ -71,6 +81,40 @@ def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm: @pytest.mark.replication @pytest.mark.ec_replication class TestECReplication(ClusterTestBase): + @pytest.fixture() + def rep_count(self, object_size: ObjectSize) -> int: + rep_count = 3 + if object_size.name == "complex": + rep_count *= int(COMPLEX_OBJECT_CHUNKS_COUNT) + 1 if COMPLEX_OBJECT_TAIL_SIZE else int(COMPLEX_OBJECT_CHUNKS_COUNT) + return rep_count + + @wait_for_success(120, 5) + def wait_for_nodes_appears_in_map(self, frostfs_cli: FrostfsCli, alive_node: ClusterNode, desired_nodes_count: int) -> bool: + self.tick_epoch(alive_node, 2) + netmap = parse_netmap_output( + frostfs_cli.netmap.snapshot(alive_node.storage_node.get_rpc_endpoint(), timeout=CLI_DEFAULT_TIMEOUT).stdout + ) + assert len(netmap) == desired_nodes_count + + @wait_for_success(120, 5) + def wait_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str, success: bool = True) -> None: + if not success: + assert not self.check_replication(total_chunks, local_cli, cid, oid) + else: + assert self.check_replication(total_chunks, local_cli, cid, oid) + + @allure.title("Search shard") + def get_shard_chunk(self, node: ClusterNode, chunk: Chunk) -> str: + oid_path = f"{chunk.object_id[0]}/{chunk.object_id[1]}/{chunk.object_id[2]}/{chunk.object_id[3]}" + node_shell = node.storage_node.host.get_shell() + shards_watcher = ShardsWatcher(node) + + with reporter.step("Search object file"): + for shard_id, shard_info in shards_watcher.shards_snapshots[-1].items(): + check_dir = node_shell.exec(f" [ -d {shard_info['blobstor'][1]['path']}/{oid_path} ] && echo 1 || echo 0").stdout + if "1" in check_dir.strip(): + return shard_id + @allure.title("Restore chunk maximum params in network params ") @pytest.fixture def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None: @@ -81,7 +125,10 @@ class TestECReplication(ClusterTestBase): def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict: if not endpoint: endpoint = self.cluster.default_rpc_endpoint - return json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True).stdout) + object_nodes = json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT).stdout) + if object_nodes.get("errors"): + raise object_nodes["errors"] + return object_nodes @reporter.step("Get all chunks object ") def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]: @@ -102,7 +149,7 @@ class TestECReplication(ClusterTestBase): def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]: if not endpoint: self.cluster.default_rpc_endpoint - netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint).stdout) + netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) chunks_node_key = [] for chunk in chunks: chunks_node_key.extend(chunk.confirmed_nodes) @@ -118,11 +165,16 @@ class TestECReplication(ClusterTestBase): @reporter.step("Create container, policy={policy}") def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str: - return user_cli.container.create(endpoint, policy=policy, await_mode=True).stdout.split(" ")[1].strip().split("\n")[0] + return ( + user_cli.container.create(endpoint, policy=policy, await_mode=True, timeout=CLI_DEFAULT_TIMEOUT) + .stdout.split(" ")[1] + .strip() + .split("\n")[0] + ) @reporter.step("Search node chunk {chunk}") def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]: - netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint).stdout) + netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout) for node_info in netmap: if node_info.node_id in chunk.confirmed_nodes: for cluster_node in self.cluster.cluster_nodes: @@ -131,7 +183,9 @@ class TestECReplication(ClusterTestBase): @reporter.step("Check replication chunks={total_chunks} chunks ") def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool: - object_nodes_info = local_cli.object.nodes(self.cluster.default_rpc_endpoint, cid, oid=oid, json=True).stdout + object_nodes_info = local_cli.object.nodes( + self.cluster.default_rpc_endpoint, cid, oid=oid, json=True, timeout=CLI_DEFAULT_TIMEOUT + ).stdout object_nodes_info = json.loads(object_nodes_info) return len(object_nodes_info["data_objects"]) == total_chunks @@ -150,49 +204,48 @@ class TestECReplication(ClusterTestBase): cluster_state_controller.start_stopped_hosts() cluster_state_controller.manager(ConfigStateManager).revert_all() - @allure.title("Create container with EC policy (size={object_size.value})") + @allure.title("Create container with EC policy (size={object_size})") def test_create_container_with_ec_policy( self, default_user: User, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, object_size: ObjectSize, + rep_count: int, ) -> None: test_file = generate_file(object_size.value) - rep_count = 3 - if object_size.name == "complex": - rep_count *= 4 + with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check replication chunks."): - assert self.check_replication(rep_count, frostfs_local_cli, cid, oid) + assert self.check_replication(rep_count, frostfs_cli, cid, oid) @allure.title("Lose node with chunk data") @pytest.mark.failover def test_lose_node_with_data_chunk( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) with reporter.step("Search node data chunk"): - chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) - chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0] + chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) + chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0] with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") @@ -203,31 +256,31 @@ class TestECReplication(ClusterTestBase): with reporter.step("Start stopped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) @allure.title("Lose node with chunk parity") @pytest.mark.failover def test_lose_node_with_parity_chunk( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) with reporter.step("Search node with parity chunk"): - chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid) - chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0] + chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid) + chunk_node = self.get_chunk_node(frostfs_cli, chunk)[0] with reporter.step("Stop node parity chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") @@ -238,33 +291,33 @@ class TestECReplication(ClusterTestBase): with reporter.step("Start stoped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) @allure.title("Lose nodes with chunk data and parity") @pytest.mark.failover def test_lose_nodes_data_chunk_and_parity( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks, expect 4."): - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) with reporter.step("Search node data chunk and node parity chunk"): - data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) - node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0] - parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid) - node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)[0] + data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) + node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0] + parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid) + node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk)[0] with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(node_data_chunk, "hard") @@ -275,7 +328,7 @@ class TestECReplication(ClusterTestBase): with reporter.step("Start stopped host and check chunks."): cluster_state_controller.start_node_host(node_data_chunk) - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) with reporter.step("Stop node with parity chunk and one all node."): cluster_state_controller.stop_node_host(node_data_chunk, "hard") @@ -287,68 +340,73 @@ class TestECReplication(ClusterTestBase): with reporter.step("Start stopped nodes and check replication chunk."): cluster_state_controller.start_stopped_hosts() - assert self.check_replication(4, frostfs_local_cli, cid, oid) + assert self.check_replication(4, frostfs_cli, cid, oid) @allure.title("Policer work with chunk") @pytest.mark.failover def test_work_policer_with_nodes( self, simple_object_size: ObjectSize, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, default_user: User, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object on container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks nodes on 3."): - assert self.check_replication(3, frostfs_local_cli, cid, oid) + assert self.check_replication(3, frostfs_cli, cid, oid) with reporter.step("Stop node with chunk."): - data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) - first_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid) - node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0] + data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid) + first_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid) + node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk)[0] cluster_state_controller.stop_node_host(node_data_chunk, "hard") - with reporter.step("Check replication chunk with different node."): - alive_endpoint = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0].storage_node.get_rpc_endpoint() - node = self.search_node_not_chunks(first_all_chunks, frostfs_local_cli, endpoint=alive_endpoint)[0] - second_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, node.storage_node.get_rpc_endpoint()) + with reporter.step("Tick epoch and wait update network map."): + alive_node = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0] + self.wait_for_nodes_appears_in_map(frostfs_cli, alive_node, 3) + + with reporter.step("Wait replication chunk with different node."): + node = self.search_node_not_chunks(first_all_chunks, frostfs_cli, endpoint=alive_node.storage_node.get_rpc_endpoint())[0] + self.wait_replication(3, frostfs_cli, cid, oid) + + with reporter.step("Get new chunks"): + second_all_chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Check that oid no change."): - oid_chunk_check = [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] - assert len(oid_chunk_check) > 0 + assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] with reporter.step("Start stopped host, and check delete 4 chunk."): cluster_state_controller.start_node_host(node_data_chunk) - all_chunks_after_start_node = self.get_all_chunks_object(frostfs_local_cli, cid, oid) + all_chunks_after_start_node = self.get_all_chunks_object(frostfs_cli, cid, oid) assert len(all_chunks_after_start_node) == 3 - @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size.name})") + @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})") def test_create_container_with_difference_count_nodes( self, node_count: int, ec_policy: str, object_size: ObjectSize, default_user: User, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, ) -> None: with reporter.step("Create container."): expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1]) if "complex" in object_size.name: expected_chunks *= 4 - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, ec_policy) + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, ec_policy) with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count object chunks."): - chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) assert len(chunks) == expected_chunks with reporter.step("get object and check hash."): @@ -358,154 +416,299 @@ class TestECReplication(ClusterTestBase): @allure.title("Request PUT with copies_number flag") def test_put_object_with_copies_number( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container with copies number = 1"): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) with reporter.step("Check that count chunks > 1."): - chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) assert len(chunks) > 1 @allure.title("Request PUT and 1 node off") @pytest.mark.failover def test_put_object_with_off_cnr_node( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, cluster_state_controller: ClusterStateController, default_user: User, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Stop one node in container nodes"): cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard") - with reporter.step("Put object in container, expect error."): + with reporter.step("Put object in container, expect success for EC container."): test_file = generate_file(simple_object_size.value) - with pytest.raises(RuntimeError, match="put single object on client"): - put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) - @allure.title("Request DELETE (size={object_size.name})") + @allure.title("Request PUT (size={object_size})") + def test_put_object_with_ec_cnr( + self, + frostfs_cli: FrostfsCli, + default_user: User, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container with --prepare-locally."): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Get chunks object."): + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + + with reporter.step("Check header chunks object"): + for chunk in chunks: + chunk_head = frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, raw=True).stdout + assert "EC header:" in chunk_head + + @allure.title("Request PUT with copies number") + def test_put_object_with_copies_number( + self, + frostfs_cli: FrostfsCli, + default_user: User, + simple_object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container with --copies-number=1."): + test_file = generate_file(simple_object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) + + with reporter.step("Check len chunks object."): + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + assert len(chunks) > 1 + + @allure.title("Request GET (size={object_size})") + def test_get_object_in_ec_cnr( + self, + default_user: User, + frostfs_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1") + + with reporter.step("Put object in container"): + test_file = generate_file(object_size.value) + hash_origin_file = get_file_hash(test_file) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Get id all chunks."): + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + + with reporter.step("Search chunk node and not chunks node."): + chunk_node = self.get_chunk_node(frostfs_cli, chunks[0]) + not_chunk_node = self.search_node_not_chunks(chunks, frostfs_cli, self.cluster.default_rpc_endpoint) + + with reporter.step("GET request with chunk node, expect success"): + file_one = get_object(default_user.wallet, cid, oid, self.shell, chunk_node[0].storage_node.get_rpc_endpoint()) + hash_file_one = get_file_hash(file_one) + assert hash_file_one == hash_origin_file + + with reporter.step("Get request with not chunk node"): + file_two = get_object(default_user.wallet, cid, oid, self.shell, not_chunk_node[0].storage_node.get_rpc_endpoint()) + hash_file_two = get_file_hash(file_two) + assert hash_file_two == hash_file_one == hash_origin_file + + @allure.title("Request SEARCH with flags 'root' (size={object_size})") + def test_search_object_in_ec_cnr_root_flags( + self, + default_user: User, + frostfs_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container"): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Search operation with --root flags"): + search_output = frostfs_cli.object.search( + self.cluster.default_rpc_endpoint, cid, root=True, timeout=CLI_DEFAULT_TIMEOUT + ).stdout.split("\n")[1:] + assert search_output[0] == oid + + @allure.title("Request SEARCH check valid chunk id (size={object_size})") + def test_search_object_in_ec_cnr_chunk_id( + self, + default_user: User, + frostfs_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container"): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Search operation object"): + search_output = frostfs_cli.object.search(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT).stdout + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + for chunk in chunks: + assert chunk.object_id in search_output + + @allure.title("Request SEARCH check no chunk index info (size={object_size})") + def test_search_object_in_ec_cnr( + self, + default_user: User, + frostfs_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container"): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Search operation all chunk"): + chunks = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + for chunk in chunks: + chunk_search = frostfs_cli.object.search( + self.cluster.default_rpc_endpoint, cid, oid=chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT + ).stdout + assert "index" not in chunk_search + + @allure.title("Request DELETE (size={object_size})") @pytest.mark.failover def test_delete_object_in_ec_cnr( self, default_user: User, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, object_size: ObjectSize, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): - chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Delete object"): - frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid) + frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) with reporter.step("Check that delete all chunks."): for chunk in chunks_object: with pytest.raises(RuntimeError, match="object already removed"): - frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id) + frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) with reporter.step("Put second object."): oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check second object chunks nodes."): - chunks_second_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid_second, self.cluster.default_rpc_endpoint) + chunks_second_object = self.get_all_chunks_object(frostfs_cli, cid, oid_second, self.cluster.default_rpc_endpoint) assert len(chunks_second_object) == replication_count with reporter.step("Stop nodes with chunk."): - chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_second_object[0]) + chunk_node = self.get_chunk_node(frostfs_cli, chunks_second_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Delete second object"): - frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid_second) + cluster_nodes = list(set(self.cluster.cluster_nodes) - {chunk_node[0]}) + frostfs_cli.object.delete(cluster_nodes[0].storage_node.get_rpc_endpoint(), cid, oid_second, timeout=CLI_DEFAULT_TIMEOUT) with reporter.step("Check that delete all chunk second object."): for chunk in chunks_second_object: with pytest.raises(RuntimeError, match="object already removed"): - frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id) + frostfs_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) - @allure.title("Request LOCK (size={object_size.name})") + @allure.title("Request LOCK (size={object_size})") @pytest.mark.failover def test_lock_object_in_ec_cnr( self, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, object_size: ObjectSize, default_user: User, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): - chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunks_object = self.get_all_chunks_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Put LOCK in object."): - epoch = frostfs_local_cli.netmap.epoch(self.cluster.default_rpc_endpoint).stdout.strip() - frostfs_local_cli.object.lock(self.cluster.default_rpc_endpoint, cid, oid, expire_at=int(epoch) + 5).stdout + epoch = frostfs_cli.netmap.epoch(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout.strip() + frostfs_cli.object.lock( + self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT, expire_at=(int(epoch) + 5) + ).stdout - with reporter.step("Check LOCK in object"): - chunks = frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, oid, raw=True).stdout.strip().split(" ") - oids_chunks = [chunk.strip() for chunk in chunks if len(chunk) > 35] - for chunk_id in oids_chunks: - with pytest.raises(RuntimeError, match="could not delete objects"): - frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id) + with reporter.step("Check don`t delete chunk"): + for chunk in chunks_object: + with pytest.raises(RuntimeError, match="Lock EC chunk failed"): + frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) + + with reporter.step("Check enable LOCK object"): + with pytest.raises(RuntimeError, match="object is locked"): + frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) with reporter.step("Stop chunk node."): - chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_object[0]) + chunk_node = self.get_chunk_node(frostfs_cli, chunks_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") cluster_state_controller.start_node_host(chunk_node[0]) - with reporter.step("Check LOCK in object."): - chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) - for chunk_id in oids_chunks: - with pytest.raises(RuntimeError, match="could not delete objects"): - frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id) + with reporter.step("Check don`t delete chunk."): + for chunk in chunks_object: + with pytest.raises(RuntimeError, match="Lock EC chunk failed"): + frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk.object_id, timeout=CLI_DEFAULT_TIMEOUT) - @allure.title("Output MaxEC* params in frostfscli (type={type_shards})") + with reporter.step("Check enable LOCK object"): + with pytest.raises(RuntimeError, match="object is locked"): + frostfs_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid, timeout=CLI_DEFAULT_TIMEOUT) + + @allure.title("Output MaxEC* params in frostf-scli (type={type_shards})") @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"]) - def test_maxec_info_with_output_cli(self, frostfs_local_cli: FrostfsCli, type_shards: str) -> None: + def test_maxec_info_with_output_cli(self, frostfs_cli: FrostfsCli, type_shards: str) -> None: with reporter.step("Get and check params"): - net_info = frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout + net_info = frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout assert type_shards in net_info @allure.title("Change MaxEC*Count params") def test_change_max_data_shards_params( self, frostfs_remote_adm: FrostfsAdm, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, restore_network_config: None, ) -> None: with reporter.step("Get now params MaxECDataCount and MaxECParityCount"): - node_netinfo = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout) + node_netinfo = NetmapParser.netinfo( + frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout + ) with reporter.step("Change params"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"') with reporter.step("Get update params"): - update_net_info = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout) + update_net_info = NetmapParser.netinfo( + frostfs_cli.netmap.netinfo(self.cluster.default_rpc_endpoint, timeout=CLI_DEFAULT_TIMEOUT).stdout + ) with reporter.step("Check old and new params difference"): assert ( @@ -527,14 +730,18 @@ class TestECReplication(ClusterTestBase): def test_create_container_with_select( self, select: int, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, ) -> None: with reporter.step("Create container"): policy = f"EC 1.1 CBF 1 SELECT {select} FROM *" - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Check container nodes decomposed"): - container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:] + container_nodes = ( + frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT) + .stdout.strip() + .split("\n")[1:] + ) assert len(container_nodes) == select @allure.title("Create container with EC policy and CBF (CBF={cbf})") @@ -543,35 +750,176 @@ class TestECReplication(ClusterTestBase): self, cbf: int, expected_nodes: int, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, ) -> None: with reporter.step("Create container."): policy = f"EC 1.1 CBF {cbf}" - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Check expected container nodes."): - container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:] + container_nodes = ( + frostfs_cli.container.search_node(self.cluster.default_rpc_endpoint, cid, timeout=CLI_DEFAULT_TIMEOUT) + .stdout.strip() + .split("\n")[1:] + ) assert len(container_nodes) == expected_nodes @allure.title("Create container with EC policy and FILTER") def test_create_container_with_filter( self, default_user: User, - frostfs_local_cli: FrostfsCli, + frostfs_cli: FrostfsCli, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create Container."): policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU" - cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object is decomposed exclusively on Russian nodes"): - data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) - parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) - node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk) - node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk) + data_chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + parity_chunk = self.get_parity_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + node_data_chunk = self.get_chunk_node(frostfs_cli, data_chunk) + node_parity_chunk = self.get_chunk_node(frostfs_cli, parity_chunk) for node in [node_data_chunk[1], node_parity_chunk[1]]: assert "Russia" in node.country + + @allure.title("Evacuation shard with chunk (type={type})") + @pytest.mark.parametrize("type, get_chunk", [("data", get_data_chunk_object), ("parity", get_parity_chunk_object)]) + def test_evacuation_data_shard( + self, + default_user: User, + frostfs_cli: FrostfsCli, + max_object_size: int, + type: str, + get_chunk, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 1.1 CBF 1") + + with reporter.step("Put object in container."): + test_file = generate_file(max_object_size - 1000) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Get object chunks."): + chunk = get_chunk(self, frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + chunk_node = self.get_chunk_node(frostfs_cli, chunk) + frostfs_node_cli = FrostfsCli( + chunk_node[0].host.get_shell(), + frostfs_cli_exec_path=FROSTFS_CLI_EXEC, + config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(), + ) + + with reporter.step("Search shards chunk"): + shard_id = self.get_shard_chunk(chunk_node[0], chunk) + + with reporter.step("Enable evacuation for shard"): + frostfs_node_cli.shards.set_mode(chunk_node[0].storage_node.get_control_endpoint(), mode="read-only", id=shard_id) + frostfs_node_cli.shards.evacuation_start(chunk_node[0].storage_node.get_control_endpoint(), shard_id, await_mode=True) + + with reporter.step("Get object after evacuation shard"): + get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + + @allure.title("[NEGATIVE] Don`t create more 1 EC policy") + def test_more_one_ec_policy( + self, + frostfs_cli: FrostfsCli, + ) -> None: + with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"): + with pytest.raises(RuntimeError, match="can't parse placement policy"): + self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *") + + @allure.title("Create bucket with EC policy (s3_client={s3_client})") + @pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True) + def test_create_bucket_with_ec_location( + self, + s3_client: S3ClientWrapper, + bucket_container_resolver: BucketContainerResolver, + frostfs_cli: FrostfsCli, + ) -> None: + with reporter.step("Create bucket with EC location constrain"): + bucket = s3_client.create_bucket(location_constraint="ec3.1") + + with reporter.step("Resolve container bucket"): + cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) + + with reporter.step("Validate container policy"): + container = frostfs_cli.container.get( + self.cluster.default_rpc_endpoint, cid, json_mode=True, timeout=CLI_DEFAULT_TIMEOUT + ).stdout + assert container + + @allure.title("[NEGATIVE] Don`t create more 1 EC policy") + def test_more_one_ec_policy( + self, + frostfs_cli: FrostfsCli, + ) -> None: + with reporter.step("Create container with policy - 'EC 2.1 EC 1.1'"): + with pytest.raises(RuntimeError, match="can't parse placement policy"): + self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 EC 1.1 CBF 1 SELECT 4 FROM *") + + @allure.title("Bucket object count chunks (s3_client={s3_client}, size={object_size})") + @pytest.mark.parametrize("s3_policy, s3_client", [("pytest_tests/resources/files/policy.json", AwsCliClient)], indirect=True) + def test_count_chunks_bucket_with_ec_location( + self, + s3_client: S3ClientWrapper, + bucket_container_resolver: BucketContainerResolver, + frostfs_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + with reporter.step("Create bucket with EC location constrain"): + bucket = s3_client.create_bucket(location_constraint="ec3.1") + + with reporter.step("Enable versioning object"): + s3_client.put_bucket_versioning(bucket, VersioningStatus.ENABLED) + bucket_status = s3_client.get_bucket_versioning_status(bucket) + assert bucket_status == VersioningStatus.ENABLED.value + + with reporter.step("Put object in bucket"): + test_file = generate_file(object_size.value) + bucket_object = s3_client.put_object(bucket, test_file) + + with reporter.step("Watch replication count chunks"): + cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket) + chunks = self.get_all_chunks_object(frostfs_cli, cid, bucket_object, self.cluster.default_rpc_endpoint) + expect_chunks = 4 if object_size.name == "simple" else 16 + assert len(chunks) == expect_chunks + + @allure.title("Replication chunk after drop (size={object_size})") + def test_drop_chunk_and_replication( + self, + frostfs_cli: FrostfsCli, + default_user: User, + object_size: ObjectSize, + rep_count: int, + ) -> None: + with reporter.step("Create container"): + cid = self.create_container(frostfs_cli, self.cluster.default_rpc_endpoint, "EC 2.1 CBF 1") + + with reporter.step("Put object"): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Get all chunks"): + chunk = self.get_data_chunk_object(frostfs_cli, cid, oid, self.cluster.default_rpc_endpoint) + + with reporter.step("Search chunk node"): + chunk_node = self.get_chunk_node(frostfs_cli, chunk) + shell_chunk_node = chunk_node[0].host.get_shell() + + with reporter.step("Get replication count"): + assert self.check_replication(rep_count, frostfs_cli, cid, oid) + + with reporter.step("Delete chunk"): + frostfs_node_cli = FrostfsCli( + shell_chunk_node, + frostfs_cli_exec_path=FROSTFS_CLI_EXEC, + config_file=chunk_node[0].storage_node.get_remote_wallet_config_path(), + ) + frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{cid}/{chunk.object_id}") + + with reporter.step("Wait replication count after drop one chunk"): + self.wait_replication(rep_count, frostfs_cli, cid, oid)