import json from dataclasses import dataclass import allure import pytest import yaml from frostfs_testlib import reporter from frostfs_testlib.cli import FrostfsAdm, FrostfsCli from frostfs_testlib.cli.netmap_parser import NetmapParser from frostfs_testlib.credentials.interfaces import User from frostfs_testlib.resources.cli import FROSTFS_ADM_CONFIG_PATH, FROSTFS_ADM_EXEC, FROSTFS_CLI_EXEC from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.object import get_object, put_object from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode from frostfs_testlib.storage.controllers import ClusterStateController from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager from frostfs_testlib.storage.dataclasses.object_size import ObjectSize from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase from frostfs_testlib.utils.cli_utils import parse_netmap_output from frostfs_testlib.utils.file_utils import generate_file, get_file_hash from pytest_tests.resources.common import HOSTING_CONFIG_FILE def pytest_generate_tests(metafunc: pytest.Metafunc) -> None: if "ec_policy" not in metafunc.fixturenames: return with open(HOSTING_CONFIG_FILE, "r") as file: hosting_config = yaml.full_load(file) node_count = len(hosting_config["hosts"]) ec_map = { 4: ["EC 1.1", "EC 2.1", "EC 3.1"], 8: ["EC 5.3", "EC 3.2", "EC 7.1", "EC 4.4"], 16: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], 100: ["EC 12.4", "EC 8.4", "EC 5.3", "EC 4.4"], } metafunc.parametrize("ec_policy, node_count", ((ec_policy, node_count) for ec_policy in ec_map[node_count])) @dataclass class Chunk: def __init__(self, object_id: str, required_nodes: list, confirmed_nodes: list, ec_parent_object_id: str, ec_index: int) -> None: self.object_id = object_id self.required_nodes = required_nodes self.confirmed_nodes = confirmed_nodes self.ec_parent_object_id = ec_parent_object_id self.ec_index = ec_index def __str__(self) -> str: return self.object_id @allure.title("Initialized local FrostfsCli") @pytest.fixture() def frostfs_local_cli(client_shell: Shell, default_user: User) -> FrostfsCli: return FrostfsCli(client_shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=default_user.wallet.config_path) @allure.title("Initialized remote FrostfsAdm") @pytest.fixture def frostfs_remote_adm(cluster: Cluster) -> FrostfsAdm: node = cluster.cluster_nodes[0] shell = node.host.get_shell() return FrostfsAdm(shell, frostfs_adm_exec_path=FROSTFS_ADM_EXEC, config_file=FROSTFS_ADM_CONFIG_PATH) @pytest.mark.replication @pytest.mark.ec_replication class TestECReplication(ClusterTestBase): @allure.title("Restore chunk maximum params in network params ") @pytest.fixture def restore_network_config(self, frostfs_remote_adm: FrostfsAdm) -> None: yield frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=12" "MaxECParityCount=5"') @reporter.step("Get object nodes output ") def get_object_nodes(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> dict: if not endpoint: endpoint = self.cluster.default_rpc_endpoint return json.loads(cli.object.nodes(endpoint, cid, oid=oid, json=True).stdout) @reporter.step("Get all chunks object ") def get_all_chunks_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> list[Chunk]: chunks = self.get_object_nodes(cli, cid, oid, endpoint) return [Chunk(**chunk) for chunk in chunks["data_objects"]] @reporter.step("Get parity chunk ") def get_parity_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk: chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] return Chunk(**chunks[-1]) @reporter.step("Get data chunk ") def get_data_chunk_object(self, cli: FrostfsCli, cid: str, oid: str, endpoint: str = None) -> Chunk: chunks = self.get_object_nodes(cli, cid, oid, endpoint)["data_objects"] return Chunk(**chunks[0]) @reporter.step("Search node without chunks ") def search_node_not_chunks(self, chunks: list[Chunk], local_cli: FrostfsCli, endpoint: str = None) -> list[ClusterNode]: if not endpoint: self.cluster.default_rpc_endpoint netmap = parse_netmap_output(local_cli.netmap.snapshot(endpoint).stdout) chunks_node_key = [] for chunk in chunks: chunks_node_key.extend(chunk.confirmed_nodes) for node_info in netmap.copy(): if node_info.node_id in chunks_node_key and node_info in netmap: netmap.remove(node_info) result = [] for node_info in netmap: for cluster_node in self.cluster.cluster_nodes: if node_info.node == cluster_node.host_ip: result.append(cluster_node) return result @reporter.step("Create container, policy={policy}") def create_container(self, user_cli: FrostfsCli, endpoint: str, policy: str) -> str: return user_cli.container.create(endpoint, policy=policy, await_mode=True).stdout.split(" ")[1].strip().split("\n")[0] @reporter.step("Search node chunk {chunk}") def get_chunk_node(self, frostfs_cli: FrostfsCli, chunk: Chunk) -> tuple[ClusterNode, NodeNetmapInfo]: netmap = parse_netmap_output(frostfs_cli.netmap.snapshot(self.cluster.default_rpc_endpoint).stdout) for node_info in netmap: if node_info.node_id in chunk.confirmed_nodes: for cluster_node in self.cluster.cluster_nodes: if cluster_node.host_ip == node_info.node: return (cluster_node, node_info) @reporter.step("Check replication chunks={total_chunks} chunks ") def check_replication(self, total_chunks: int, local_cli: FrostfsCli, cid: str, oid: str) -> bool: object_nodes_info = local_cli.object.nodes(self.cluster.default_rpc_endpoint, cid, oid=oid, json=True).stdout object_nodes_info = json.loads(object_nodes_info) return len(object_nodes_info["data_objects"]) == total_chunks @allure.title("Disable Policer on all nodes") @pytest.fixture() def disable_policer( self, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step(f"Disable policer for nodes"): cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes( service_type=StorageNode, values={"policer": {"unsafe_disable": True}} ) yield with reporter.step(f"Enable policer for nodes"): cluster_state_controller.start_stopped_hosts() cluster_state_controller.manager(ConfigStateManager).revert_all() @allure.title("Create container with EC policy (size={object_size.value})") def test_create_container_with_ec_policy( self, default_user: User, frostfs_local_cli: FrostfsCli, object_size: ObjectSize, ) -> None: test_file = generate_file(object_size.value) rep_count = 3 if object_size.name == "complex": rep_count *= 4 with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check replication chunks."): assert self.check_replication(rep_count, frostfs_local_cli, cid, oid) @allure.title("Lose node with chunk data") @pytest.mark.failover def test_lose_node_with_data_chunk( self, frostfs_local_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): assert self.check_replication(4, frostfs_local_cli, cid, oid) with reporter.step("Search node data chunk"): chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0] with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") with reporter.step("Get object"): node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) assert self.check_replication(4, frostfs_local_cli, cid, oid) @allure.title("Lose node with chunk parity") @pytest.mark.failover def test_lose_node_with_parity_chunk( self, frostfs_local_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check chunk replication on 4 nodes."): assert self.check_replication(4, frostfs_local_cli, cid, oid) with reporter.step("Search node with parity chunk"): chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid) chunk_node = self.get_chunk_node(frostfs_local_cli, chunk)[0] with reporter.step("Stop node parity chunk."): cluster_state_controller.stop_node_host(chunk_node, "hard") with reporter.step("Get object, expect success."): node = list(set(self.cluster.cluster_nodes) - {chunk_node})[0] get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stoped node, and check replication chunks."): cluster_state_controller.start_node_host(chunk_node) assert self.check_replication(4, frostfs_local_cli, cid, oid) @allure.title("Lose nodes with chunk data and parity") @pytest.mark.failover def test_lose_nodes_data_chunk_and_parity( self, frostfs_local_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, cluster_state_controller: ClusterStateController, disable_policer: None, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks, expect 4."): assert self.check_replication(4, frostfs_local_cli, cid, oid) with reporter.step("Search node data chunk and node parity chunk"): data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0] parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid) node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk)[0] with reporter.step("Stop node with data chunk."): cluster_state_controller.stop_node_host(node_data_chunk, "hard") with reporter.step("Get object"): node = list(set(self.cluster.cluster_nodes) - {node_data_chunk, node_parity_chunk})[0] get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped host and check chunks."): cluster_state_controller.start_node_host(node_data_chunk) assert self.check_replication(4, frostfs_local_cli, cid, oid) with reporter.step("Stop node with parity chunk and one all node."): cluster_state_controller.stop_node_host(node_data_chunk, "hard") cluster_state_controller.stop_node_host(node_parity_chunk, "hard") with reporter.step("Get object, expect error."): with pytest.raises(RuntimeError): get_object(default_user.wallet, cid, oid, self.shell, node.storage_node.get_rpc_endpoint()) with reporter.step("Start stopped nodes and check replication chunk."): cluster_state_controller.start_stopped_hosts() assert self.check_replication(4, frostfs_local_cli, cid, oid) @allure.title("Policer work with chunk") @pytest.mark.failover def test_work_policer_with_nodes( self, simple_object_size: ObjectSize, frostfs_local_cli: FrostfsCli, default_user: User, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object on container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count chunks nodes on 3."): assert self.check_replication(3, frostfs_local_cli, cid, oid) with reporter.step("Stop node with chunk."): data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid) first_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid) node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk)[0] cluster_state_controller.stop_node_host(node_data_chunk, "hard") with reporter.step("Check replication chunk with different node."): alive_endpoint = list(set(self.cluster.cluster_nodes) - {node_data_chunk})[0].storage_node.get_rpc_endpoint() node = self.search_node_not_chunks(first_all_chunks, frostfs_local_cli, endpoint=alive_endpoint)[0] second_all_chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, node.storage_node.get_rpc_endpoint()) with reporter.step("Check that oid no change."): oid_chunk_check = [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id] assert len(oid_chunk_check) > 0 with reporter.step("Start stopped host, and check delete 4 chunk."): cluster_state_controller.start_node_host(node_data_chunk) all_chunks_after_start_node = self.get_all_chunks_object(frostfs_local_cli, cid, oid) assert len(all_chunks_after_start_node) == 3 @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size.name})") def test_create_container_with_difference_count_nodes( self, node_count: int, ec_policy: str, object_size: ObjectSize, default_user: User, frostfs_local_cli: FrostfsCli, ) -> None: with reporter.step("Create container."): expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1]) if "complex" in object_size.name: expected_chunks *= 4 cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, ec_policy) with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check count object chunks."): chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) assert len(chunks) == expected_chunks with reporter.step("get object and check hash."): file_with_node = get_object(default_user.wallet, cid, oid, self.shell, self.cluster.default_rpc_endpoint) assert get_file_hash(test_file) == get_file_hash(file_with_node) @allure.title("Request PUT with copies_number flag") def test_put_object_with_copies_number( self, frostfs_local_cli: FrostfsCli, default_user: User, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container with copies number = 1"): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint, copies_number=1) with reporter.step("Check that count chunks > 1."): chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) assert len(chunks) > 1 @allure.title("Request PUT and 1 node off") @pytest.mark.failover def test_put_object_with_off_cnr_node( self, frostfs_local_cli: FrostfsCli, cluster_state_controller: ClusterStateController, default_user: User, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 3.1") with reporter.step("Stop one node in container nodes"): cluster_state_controller.stop_node_host(self.cluster.cluster_nodes[1], "hard") with reporter.step("Put object in container, expect error."): test_file = generate_file(simple_object_size.value) with pytest.raises(RuntimeError, match="put single object on client"): put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) @allure.title("Request DELETE (size={object_size.name})") @pytest.mark.failover def test_delete_object_in_ec_cnr( self, default_user: User, frostfs_local_cli: FrostfsCli, object_size: ObjectSize, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Delete object"): frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid) with reporter.step("Check that delete all chunks."): for chunk in chunks_object: with pytest.raises(RuntimeError, match="object already removed"): frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id) with reporter.step("Put second object."): oid_second = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check second object chunks nodes."): chunks_second_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid_second, self.cluster.default_rpc_endpoint) assert len(chunks_second_object) == replication_count with reporter.step("Stop nodes with chunk."): chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_second_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") with reporter.step("Delete second object"): frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, oid_second) with reporter.step("Check that delete all chunk second object."): for chunk in chunks_second_object: with pytest.raises(RuntimeError, match="object already removed"): frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, chunk.object_id) @allure.title("Request LOCK (size={object_size.name})") @pytest.mark.failover def test_lock_object_in_ec_cnr( self, frostfs_local_cli: FrostfsCli, object_size: ObjectSize, default_user: User, cluster_state_controller: ClusterStateController, ) -> None: with reporter.step("Create container."): cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, "EC 2.1") with reporter.step("Put object in container."): test_file = generate_file(object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object chunks nodes."): chunks_object = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) replication_count = 3 if object_size.name == "simple" else 3 * 4 assert len(chunks_object) == replication_count with reporter.step("Put LOCK in object."): epoch = frostfs_local_cli.netmap.epoch(self.cluster.default_rpc_endpoint).stdout.strip() frostfs_local_cli.object.lock(self.cluster.default_rpc_endpoint, cid, oid, expire_at=int(epoch) + 5).stdout with reporter.step("Check LOCK in object"): chunks = frostfs_local_cli.object.head(self.cluster.default_rpc_endpoint, cid, oid, raw=True).stdout.strip().split(" ") oids_chunks = [chunk.strip() for chunk in chunks if len(chunk) > 35] for chunk_id in oids_chunks: with pytest.raises(RuntimeError, match="could not delete objects"): frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id) with reporter.step("Stop chunk node."): chunk_node = self.get_chunk_node(frostfs_local_cli, chunks_object[0]) cluster_state_controller.stop_node_host(chunk_node[0], "hard") cluster_state_controller.start_node_host(chunk_node[0]) with reporter.step("Check LOCK in object."): chunks = self.get_all_chunks_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) for chunk_id in oids_chunks: with pytest.raises(RuntimeError, match="could not delete objects"): frostfs_local_cli.object.delete(self.cluster.default_rpc_endpoint, cid, chunk_id) @allure.title("Output MaxEC* params in frostfscli (type={type_shards})") @pytest.mark.parametrize("type_shards", ["Maximum count of data shards", "Maximum count of parity shards"]) def test_maxec_info_with_output_cli(self, frostfs_local_cli: FrostfsCli, type_shards: str) -> None: with reporter.step("Get and check params"): net_info = frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout assert type_shards in net_info @allure.title("Change MaxEC*Count params") def test_change_max_data_shards_params( self, frostfs_remote_adm: FrostfsAdm, frostfs_local_cli: FrostfsCli, restore_network_config: None, ) -> None: with reporter.step("Get now params MaxECDataCount and MaxECParityCount"): node_netinfo = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout) with reporter.step("Change params"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=5" "MaxECParityCount=3"') with reporter.step("Get update params"): update_net_info = NetmapParser.netinfo(frostfs_local_cli.netmap.netinfo(self.cluster.default_rpc_endpoint).stdout) with reporter.step("Check old and new params difference"): assert ( update_net_info.maximum_count_of_data_shards not in node_netinfo.maximum_count_of_data_shards and update_net_info.maximum_count_of_parity_shards not in node_netinfo.maximum_count_of_parity_shards ) @allure.title("Check maximum count data and parity shards") def test_change_over_max_parity_shards_params( self, frostfs_remote_adm: FrostfsAdm, ) -> None: with reporter.step("Change over maximum params shards count."): with pytest.raises(RuntimeError, match="MaxECDataCount and MaxECParityCount must be <= 256"): frostfs_remote_adm.morph.set_config(set_key_value='"MaxECDataCount=130" "MaxECParityCount=130"') @allure.title("Create container with EC policy and SELECT (SELECT={select})") @pytest.mark.parametrize("select", [2, 4]) def test_create_container_with_select( self, select: int, frostfs_local_cli: FrostfsCli, ) -> None: with reporter.step("Create container"): policy = f"EC 1.1 CBF 1 SELECT {select} FROM *" cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Check container nodes decomposed"): container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:] assert len(container_nodes) == select @allure.title("Create container with EC policy and CBF (CBF={cbf})") @pytest.mark.parametrize("cbf, expected_nodes", [(1, 2), (2, 4)]) def test_create_container_with_cbf( self, cbf: int, expected_nodes: int, frostfs_local_cli: FrostfsCli, ) -> None: with reporter.step("Create container."): policy = f"EC 1.1 CBF {cbf}" cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Check expected container nodes."): container_nodes = frostfs_local_cli.container.search_node(self.cluster.default_rpc_endpoint, cid).stdout.strip().split("\n")[1:] assert len(container_nodes) == expected_nodes @allure.title("Create container with EC policy and FILTER") def test_create_container_with_filter( self, default_user: User, frostfs_local_cli: FrostfsCli, simple_object_size: ObjectSize, ) -> None: with reporter.step("Create Container."): policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU" cid = self.create_container(frostfs_local_cli, self.cluster.default_rpc_endpoint, policy) with reporter.step("Put object in container."): test_file = generate_file(simple_object_size.value) oid = put_object(default_user.wallet, test_file, cid, self.shell, self.cluster.default_rpc_endpoint) with reporter.step("Check object is decomposed exclusively on Russian nodes"): data_chunk = self.get_data_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) parity_chunk = self.get_parity_chunk_object(frostfs_local_cli, cid, oid, self.cluster.default_rpc_endpoint) node_data_chunk = self.get_chunk_node(frostfs_local_cli, data_chunk) node_parity_chunk = self.get_chunk_node(frostfs_local_cli, parity_chunk) for node in [node_data_chunk[1], node_parity_chunk[1]]: assert "Russia" in node.country