diff --git a/pyproject.toml b/pyproject.toml
index 7506ab9a..57c1f3a9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,8 +1,8 @@
 [tool.isort]
 profile = "black"
 src_paths = ["pytest_tests"]
-line_length = 120
+line_length = 140
 
 [tool.black]
-line-length = 120
+line-length = 140
 target-version = ["py310"]
diff --git a/pytest.ini b/pytest.ini
index 067a4bac..5f358ee0 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -64,5 +64,6 @@ markers =
     write_cache_loss: tests for write cache loss
     time: time tests
     replication: replication tests
+    ec_replication: EC replication tests
     static_session_container: tests for a static session in a container
     shard: shard management tests
diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py
new file mode 100644
index 00000000..852ceb53
--- /dev/null
+++ b/pytest_tests/testsuites/replication/test_ec_replication.py
@@ -0,0 +1,578 @@
+import json
+from dataclasses import dataclass
+
+import allure
+import pytest
+import yaml
+from frostfs_testlib import reporter
+from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
+from frostfs_testlib.cli.netmap_parser import NetmapParser
+from frostfs_testlib.credentials.interfaces import User
+from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC
+from frostfs_testlib.shell import Shell
+from frostfs_testlib.steps.cli.object import get_object, put_object
+from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
+from frostfs_testlib.storage.controllers import ClusterStateController
+from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
+from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
+from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
+from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
+from frostfs_testlib.utils.cli_utils import parse_netmap_output
+from frostfs_testlib.utils.file_utils import generate_file, get_file_hash
+
+from pytest_tests.resources.common import HOSTING_CONFIG_FILE
+
+
+def pytest_generate_tests(metafunc: pytest.Metafunc) -> None:
+    if "ec_policy" not in metafunc.fixturenames:
+        return
+
+    with open(HOSTING_CONFIG_FILE, "r") as file:
+        hosting_config = yaml.full_load(file)
+
+    node_count = len(hosting_config["hosts"])
+
+    ec_map = {
+        4: ["EC 1.1", "EC 2.1", "EC 3.1"],
+        8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4"],
+        16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
+        100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"],
+    }
+
+    metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[node_count]))
+
+
+@dataclass
+class Chunk:
+    object_id: str
+    required_nodes: list
+    confirmed_nodes: list
+    ec_parent_object_id: str
+    ec_index: int
+
+    def __str__(self) -> str:
+        return self.object_id
+
+
+@allure.title("Initialized local FrostfsCli")
+@pytest.fixture()
+def frostfs_local_cli(client_shell: Shell, default_user: User) -> FrostfsCli:
+    return FrostfsCli(client_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_user.wallet.config_path)
+
+
+@allure.title("Initialized remote FrostfsAdm")
+@pytest.fixture
+def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm:
+    node = cluster.cluster_nodes[0]
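+    # frostfs-adm is run on the first cluster node, where FROSTFS_ADM_CONFIG_PATH is expected to be available.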
+    shell = node.host.get_shell()
+    return FrostfsAdm(shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH)
+
+
+@pytest.mark.replication
+@pytest.mark.ec_replication
+class TestECReplication(ClusterTestBase):
+    @allure.title("Restore MaxEC* network config params")
+    @pytest.fixture
+    def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None:
+        yield
+        frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=12" "MaxECParityCount=5"')
+
+    @reporter.step("Get object nodes output")
+    def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict:
+        if not endpoint:
+            endpoint = self.cluster.default_rpc_endpoint
+        return json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True).stdout)
+
+    @reporter.step("Get all object chunks")
+    def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]:
+        chunks = self.get_object_nodes(cli, cid, oid, endpoint)
+        return [Chunk(**chunk) for chunk in chunks["data_objects"]]
+
+    @reporter.step("Get parity chunk")
+    def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
+        chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
+        return Chunk(**chunks[-1])
+
+    @reporter.step("Get data chunk")
+    def get_data_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk:
+        chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"]
+        return Chunk(**chunks[0])
+
+    @reporter.step("Search node without chunks")
+    def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]:
+        if not endpoint:
+            endpoint = self.cluster.default_rpc_endpoint
+        netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint).stdout)
+        chunks_node_key = []
+        for chunk in chunks:
+            chunks_node_key.extend(chunk.confirmed_nodes)
+        for node_info in netmap.copy():
+            if node_info.node_id in chunks_node_key and node_info in netmap:
+                netmap.remove(node_info)
+        result = []
+        for node_info in netmap:
+            for cluster_node in self.cluster.cluster_nodes:
+                if node_info.node == cluster_node.host_ip:
+                    result.append(cluster_node)
+        return result
+
+    @reporter.step("Create container, policy={policy}")
+    def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str:
+        return user_cli.container.create(endpoint, policy=policy, await_mode=True).stdout.split(" ")[1].strip().split("\n")[0]
+
+    @reporter.step("Search node with chunk {chunk}")
+    def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]:
+        netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint).stdout)
+        for node_info in netmap:
+            if node_info.node_id in chunk.confirmed_nodes:
+                for cluster_node in self.cluster.cluster_nodes:
+                    if cluster_node.host_ip == node_info.node:
+                        return (cluster_node, node_info)
+
+    @reporter.step("Check that object has {total_chunks} chunks")
+    def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool:
+        object_nodes_info = local_cli.object.nodes(self.cluster.default_rpc_endpoint, cid, oid=oid, json=True).stdout
+        object_nodes_info = json.loads(object_nodes_info)
+        return len(object_nodes_info["data_objects"]) == total_chunks
+
+    @allure.title("Disable Policer on all nodes")
+    @pytest.fixture()
+    def disable_policer(
+        self,
+        cluster_state_controller: ClusterStateController,
+    ) -> None:
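+        # Policer is switched off so that chunk placement stays static while nodes are stopped in the failover tests below.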
with reporter.step(f"Disable policer for nodes"): + cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes( + service_type=StorageNode, values={"policer": {"unsafe_disable": True}} + ) + yield + with reporter.step(f"Enable policer for nodes"): + cluster_state_controller.start_stopped_hosts() + cluster_state_controller.manager(ConfigStateManager).revert_all() + + @allure.title("Create container with EC policy (size={object_size.value})") + def test_create_container_with_ec_policy( + self, + default_user: User, + frostfs_local_cli: FrostfsCli, + object_size: ObjectSize, + ) -> None: + test_file = generate_file(object_size.value) + rep_count = 3 + if object_size.name == "complex": + rep_count *= 4 + with reporter.step("Create container."): + cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container."): + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Check replication chunks."): + assert self.check_replication(rep_count, frostfs_local_cli, cid, oid) + + @allure.title("Lose node with chunk data") + @pytest.mark.failover + def test_lose_node_with_data_chunk( + self, + frostfs_local_cli: FrostfsCli, + default_user: User, + simple_object_size: ObjectSize, + cluster_state_controller: ClusterStateController, + disable_policer: None, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + + with reporter.step("Put object in container."): + test_file = generate_file(simple_object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Check chunk replication on 4 nodes."): + assert self.check_replication(4, frostfs_local_cli, cid, oid) + + with reporter.step("Search node data chunk"): + chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) + chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0] + + with reporter.step("Stop node with data chunk."): + cluster_state_controller.stop_node_host(chunk_node, "hard") + + with reporter.step("Get object"): + node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] + get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) + + with reporter.step("Start stopped node, and check replication chunks."): + cluster_state_controller.start_node_host(chunk_node) + assert self.check_replication(4, frostfs_local_cli, cid, oid) + + @allure.title("Lose node with chunk parity") + @pytest.mark.failover + def test_lose_node_with_parity_chunk( + self, + frostfs_local_cli: FrostfsCli, + default_user: User, + simple_object_size: ObjectSize, + cluster_state_controller: ClusterStateController, + disable_policer: None, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") + + with reporter.step("Put object in container."): + test_file = generate_file(simple_object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Check chunk replication on 4 nodes."): + assert self.check_replication(4, frostfs_local_cli, cid, oid) + + with reporter.step("Search node with parity chunk"): + chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid) + chunk_node = self.get_chunk_node(frostfs_local_cli, 
+            chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0]
+
+        with reporter.step("Stop node with parity chunk."):
+            cluster_state_controller.stop_node_host(chunk_node, "hard")
+
+        with reporter.step("Get object, expect success."):
+            node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0]
+            get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
+
+        with reporter.step("Start stopped node, and check replication chunks."):
+            cluster_state_controller.start_node_host(chunk_node)
+            assert self.check_replication(4, frostfs_local_cli, cid, oid)
+
+    @allure.title("Lose nodes with data and parity chunks")
+    @pytest.mark.failover
+    def test_lose_nodes_data_chunk_and_parity(
+        self,
+        frostfs_local_cli: FrostfsCli,
+        default_user: User,
+        simple_object_size: ObjectSize,
+        cluster_state_controller: ClusterStateController,
+        disable_policer: None,
+    ) -> None:
+        with reporter.step("Create container."):
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
+
+        with reporter.step("Put object in container."):
+            test_file = generate_file(simple_object_size.value)
+            oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check chunk count, expect 4."):
+            assert self.check_replication(4, frostfs_local_cli, cid, oid)
+
+        with reporter.step("Search nodes with data chunk and parity chunk"):
+            data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid)
+            node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0]
+            parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid)
+            node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)[0]
+
+        with reporter.step("Stop node with data chunk."):
+            cluster_state_controller.stop_node_host(node_data_chunk, "hard")
+
+        with reporter.step("Get object"):
+            node = list(set(self.cluster.cluster_nodes) - {node_data_chunk, node_parity_chunk})[0]
+            get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
+
+        with reporter.step("Start stopped host and check chunks."):
+            cluster_state_controller.start_node_host(node_data_chunk)
+            assert self.check_replication(4, frostfs_local_cli, cid, oid)
+
+        with reporter.step("Stop nodes with data chunk and parity chunk."):
+            cluster_state_controller.stop_node_host(node_data_chunk, "hard")
+            cluster_state_controller.stop_node_host(node_parity_chunk, "hard")
+
+        with reporter.step("Get object, expect error."):
+            with pytest.raises(RuntimeError):
+                get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint())
+
+        with reporter.step("Start stopped nodes and check replication chunks."):
+            cluster_state_controller.start_stopped_hosts()
+            assert self.check_replication(4, frostfs_local_cli, cid, oid)
+
+    @allure.title("Policer works with chunks")
+    @pytest.mark.failover
+    def test_work_policer_with_nodes(
+        self,
+        simple_object_size: ObjectSize,
+        frostfs_local_cli: FrostfsCli,
+        default_user: User,
+        cluster_state_controller: ClusterStateController,
+    ) -> None:
+        with reporter.step("Create container."):
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
+
+        with reporter.step("Put object in container."):
+            test_file = generate_file(simple_object_size.value)
+            oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check that object has 3 chunks."):
+            assert self.check_replication(3, frostfs_local_cli, cid, oid)
+
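+        # With the policer enabled, the chunk from the stopped node is expected to be rebuilt on another node.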
reporter.step("Stop node with chunk."): + data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) + first_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid) + node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0] + cluster_state_controller.stop_node_host(node_data_chunk, "hard") + + with reporter.step("Check replication chunk with different node."): + alive_endpoint = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0].storage_node.get_rpc_endpoint() + node = self.search_node_not_chunks(first_all_chunks, frostfs_local_cli, endpoint=alive_endpoint)[0] + second_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, node.storage_node.get_rpc_endpoint()) + + with reporter.step("Check that oid no change."): + oid_chunk_check = [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] + assert len(oid_chunk_check) > 0 + + with reporter.step("Start stopped host, and check delete 4 chunk."): + cluster_state_controller.start_node_host(node_data_chunk) + all_chunks_after_start_node = self.get_all_chunks_object(frostfs_local_cli, cid, oid) + assert len(all_chunks_after_start_node) == 3 + + @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size.name})") + def test_create_container_with_difference_count_nodes( + self, + node_count: int, + ec_policy: str, + object_size: ObjectSize, + default_user: User, + frostfs_local_cli: FrostfsCli, + ) -> None: + with reporter.step("Create container."): + expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1]) + if "complex" in object_size.name: + expected_chunks *= 4 + cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, ec_policy) + + with reporter.step("Put object in container."): + test_file = generate_file(object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) + + with reporter.step("Check count object chunks."): + chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + assert len(chunks) == expected_chunks + + with reporter.step("get object and check hash."): + file_with_node = get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) + assert get_file_hash(test_file) == get_file_hash(file_with_node) + + @allure.title("Request PUT with copies_number flag") + def test_put_object_with_copies_number( + self, + frostfs_local_cli: FrostfsCli, + default_user: User, + simple_object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") + + with reporter.step("Put object in container with copies number = 1"): + test_file = generate_file(simple_object_size.value) + oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) + + with reporter.step("Check that count chunks > 1."): + chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) + assert len(chunks) > 1 + + @allure.title("Request PUT and 1 node off") + @pytest.mark.failover + def test_put_object_with_off_cnr_node( + self, + frostfs_local_cli: FrostfsCli, + cluster_state_controller: ClusterStateController, + default_user: User, + simple_object_size: ObjectSize, + ) -> None: + with reporter.step("Create container."): + cid = 
+        with reporter.step("Create container."):
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1")
+
+        with reporter.step("Stop one of the container nodes"):
+            cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard")
+
+        with reporter.step("Put object in container, expect error."):
+            test_file = generate_file(simple_object_size.value)
+            with pytest.raises(RuntimeError, match="put single object on client"):
+                put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+    @allure.title("Request DELETE (size={object_size.name})")
+    @pytest.mark.failover
+    def test_delete_object_in_ec_cnr(
+        self,
+        default_user: User,
+        frostfs_local_cli: FrostfsCli,
+        object_size: ObjectSize,
+        cluster_state_controller: ClusterStateController,
+    ) -> None:
+        with reporter.step("Create container."):
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
+
+        with reporter.step("Put object in container."):
+            test_file = generate_file(object_size.value)
+            oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check object chunks count."):
+            chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
+            replication_count = 3 if object_size.name == "simple" else 3 * 4
+            assert len(chunks_object) == replication_count
+
+        with reporter.step("Delete object"):
+            frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid)
+
+        with reporter.step("Check that all chunks are deleted."):
+            for chunk in chunks_object:
+                with pytest.raises(RuntimeError, match="object already removed"):
+                    frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
+
+        with reporter.step("Put second object."):
+            oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check second object chunks count."):
+            chunks_second_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid_second, self.cluster.default_rpc_endpoint)
+            assert len(chunks_second_object) == replication_count
+
+        with reporter.step("Stop node with chunk."):
+            chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_second_object[0])
+            cluster_state_controller.stop_node_host(chunk_node[0], "hard")
+
+        with reporter.step("Delete second object"):
+            frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid_second)
+
+        with reporter.step("Check that all chunks of the second object are deleted."):
+            for chunk in chunks_second_object:
+                with pytest.raises(RuntimeError, match="object already removed"):
+                    frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id)
+
+    @allure.title("Request LOCK (size={object_size.name})")
+    @pytest.mark.failover
+    def test_lock_object_in_ec_cnr(
+        self,
+        frostfs_local_cli: FrostfsCli,
+        object_size: ObjectSize,
+        default_user: User,
+        cluster_state_controller: ClusterStateController,
+    ) -> None:
+        with reporter.step("Create container."):
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1")
+
+        with reporter.step("Put object in container."):
+            test_file = generate_file(object_size.value)
+            oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check object chunks count."):
+            chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
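+            # EC 2.1 produces 3 chunks per object; a complex object is stored as 4 parts, hence 3 * 4 chunks.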
+            replication_count = 3 if object_size.name == "simple" else 3 * 4
+            assert len(chunks_object) == replication_count
+
+        with reporter.step("Put LOCK on object."):
+            epoch = frostfs_local_cli.netmap.epoch(self.cluster.default_rpc_endpoint).stdout.strip()
+            frostfs_local_cli.object.lock(self.cluster.default_rpc_endpoint, cid, oid, expire_at=int(epoch) + 5)
+
+        with reporter.step("Check LOCK on object"):
+            chunks = frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, oid, raw=True).stdout.strip().split(" ")
+            oids_chunks = [chunk.strip() for chunk in chunks if len(chunk) > 35]
+            for chunk_id in oids_chunks:
+                with pytest.raises(RuntimeError, match="could not delete objects"):
+                    frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
+
+        with reporter.step("Stop chunk node."):
+            chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_object[0])
+            cluster_state_controller.stop_node_host(chunk_node[0], "hard")
+            cluster_state_controller.start_node_host(chunk_node[0])
+
+        with reporter.step("Check LOCK on object."):
+            chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
+            for chunk_id in oids_chunks:
+                with pytest.raises(RuntimeError, match="could not delete objects"):
+                    frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id)
+
+    @allure.title("Output MaxEC* params in frostfscli (type={type_shards})")
+    @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"])
+    def test_maxec_info_with_output_cli(self, frostfs_local_cli: FrostfsCli, type_shards: str) -> None:
+        with reporter.step("Get and check params"):
+            net_info = frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout
+            assert type_shards in net_info
+
+    @allure.title("Change MaxEC*Count params")
+    def test_change_max_data_shards_params(
+        self,
+        frostfs_remote_adm: FrostfsAdm,
+        frostfs_local_cli: FrostfsCli,
+        restore_network_config: None,
+    ) -> None:
+        with reporter.step("Get current MaxECDataCount and MaxECParityCount params"):
+            node_netinfo = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
+
+        with reporter.step("Change params"):
+            frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"')
+
+        with reporter.step("Get updated params"):
+            update_net_info = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout)
+
+        with reporter.step("Check old and new params difference"):
+            assert (
+                update_net_info.maximum_count_of_data_shards != node_netinfo.maximum_count_of_data_shards
+                and update_net_info.maximum_count_of_parity_shards != node_netinfo.maximum_count_of_parity_shards
+            )
+
+    @allure.title("Check maximum count of data and parity shards")
+    def test_change_over_max_parity_shards_params(
+        self,
+        frostfs_remote_adm: FrostfsAdm,
+    ) -> None:
+        with reporter.step("Set shards count over the maximum."):
+            with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"):
+                frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"')
+
+    @allure.title("Create container with EC policy and SELECT (SELECT={select})")
+    @pytest.mark.parametrize("select", [2, 4])
+    def test_create_container_with_select(
+        self,
+        select: int,
+        frostfs_local_cli: FrostfsCli,
+    ) -> None:
+        with reporter.step("Create container"):
+            policy = f"EC 1.1 CBF 1 SELECT {select} FROM *"
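+            # EC 1.1 always yields 2 chunks; with CBF 1 the SELECT value determines how many container nodes are selected.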
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
+
+        with reporter.step("Check container nodes count"):
+            container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
+            assert len(container_nodes) == select
+
+    @allure.title("Create container with EC policy and CBF (CBF={cbf})")
+    @pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)])
+    def test_create_container_with_cbf(
+        self,
+        cbf: int,
+        expected_nodes: int,
+        frostfs_local_cli: FrostfsCli,
+    ) -> None:
+        with reporter.step("Create container."):
+            policy = f"EC 1.1 CBF {cbf}"
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
+
+        with reporter.step("Check expected container nodes count."):
+            container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:]
+            assert len(container_nodes) == expected_nodes
+
+    @allure.title("Create container with EC policy and FILTER")
+    def test_create_container_with_filter(
+        self,
+        default_user: User,
+        frostfs_local_cli: FrostfsCli,
+        simple_object_size: ObjectSize,
+    ) -> None:
+        with reporter.step("Create container."):
+            policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
+            cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy)
+
+        with reporter.step("Put object in container."):
+            test_file = generate_file(simple_object_size.value)
+            oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint)
+
+        with reporter.step("Check object is decomposed exclusively on Russian nodes"):
+            data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
+            parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint)
+            node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)
+            node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)
+            for node in [node_data_chunk[1], node_parity_chunk[1]]:
+                assert "Russia" in node.country