diff --git a/pytest_tests/helpers/container_request.py b/pytest_tests/helpers/container_request.py index 9905112..72740b3 100644 --- a/pytest_tests/helpers/container_request.py +++ b/pytest_tests/helpers/container_request.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from functools import partial from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE from frostfs_testlib.storage.cluster import Cluster @@ -53,6 +54,7 @@ class ContainerRequest: return f"({', '.join(spec_info)})" +PUBLIC_WITH_POLICY = partial(ContainerRequest, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Custom_policy_with_allow_all_ape_rule") EVERYONE_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Everyone_Allow_All") OWNER_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_OWNER_ALLOW_ALL, short_name="Owner_Allow_All") PRIVATE = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=[], short_name="Private_No_APE") diff --git a/pytest_tests/helpers/policy_validation.py b/pytest_tests/helpers/policy_validation.py new file mode 100644 index 0000000..ab18ab5 --- /dev/null +++ b/pytest_tests/helpers/policy_validation.py @@ -0,0 +1,26 @@ +from frostfs_testlib.shell.interfaces import Shell +from frostfs_testlib.steps.cli.container import get_container +from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo +from workspace.frostfs_testcases.pytest_tests.helpers.utility import placement_policy_from_container + + +def validate_object_policy(wallet: str, shell: Shell, placement_rule: str, cid: str, endpoint: str): + got_policy = placement_policy_from_container(get_container(wallet, cid, shell, endpoint, False)) + assert got_policy.replace("'", "") == placement_rule.replace( + "'", "" + ), f"Expected \n{placement_rule} and got policy \n{got_policy} are the same" + + +def get_netmap_param(netmap_info: list[NodeNetmapInfo]) -> dict: + dict_external = dict() + for node in netmap_info: + external_adress = node.external_address[0].split("/")[2] + dict_external[external_adress] = { + "country": node.country, + "country_code": node.country_code, + "Price": node.price, + "continent": node.continent, + "un_locode": node.un_locode, + "location": node.location, + } + return dict_external diff --git a/pytest_tests/testsuites/container/test_policy.py b/pytest_tests/testsuites/container/test_policy.py index 6847904..1779242 100644 --- a/pytest_tests/testsuites/container/test_policy.py +++ b/pytest_tests/testsuites/container/test_policy.py @@ -1,221 +1,174 @@ import allure import pytest from frostfs_testlib import reporter -from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL -from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container +from frostfs_testlib.steps.cli.container import create_container, delete_container from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node from frostfs_testlib.steps.node_management import get_netmap_snapshot from frostfs_testlib.steps.storage_policy import get_nodes_with_object -from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController -from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager -from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode from frostfs_testlib.storage.dataclasses.object_size import ObjectSize -from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo +from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.cluster_test_base import ClusterTestBase -from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils.cli_utils import parse_netmap_output from frostfs_testlib.utils.file_utils import generate_file -from ...helpers.utility import placement_policy_from_container +from ...helpers.container_request import PUBLIC_WITH_POLICY, ContainerRequest +from ...helpers.policy_validation import get_netmap_param, validate_object_policy from ...resources.policy_error_patterns import NOT_ENOUGH_TO_SELECT, NOT_FOUND_FILTER, NOT_FOUND_SELECTOR, NOT_PARSE_POLICY @pytest.mark.nightly @pytest.mark.policy class TestPolicy(ClusterTestBase): - @wait_for_success(1200, 60, title="Wait for full field price on node", expected_result=True) - def await_for_price_attribute_on_nodes(self): - netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - for node in self.cluster.storage_nodes: - node_address = node.get_rpc_endpoint().split(":")[0] - if netmap[node_address]["Price"] is None: - return False - return True - - @pytest.fixture(scope="module") - def fill_field_price(self, cluster_state_controller_session: ClusterStateController): - prices = ["15", "10", "65", "55"] - - config_manager = cluster_state_controller_session.manager(ConfigStateManager) - for i in zip(self.cluster.cluster_nodes, prices): - config_manager.set_on_node(i[0], StorageNode, {"node:attribute_5": f"Price:{i[1]}"}) - - self.tick_epoch() - self.await_for_price_attribute_on_nodes() - - yield - - cluster_state_controller_session.manager(ConfigStateManager).revert_all() - @allure.title("[NEGATIVE] Placement policy: Can't parse placement policy") - def test_placement_policy_negative(self, default_wallet): + def test_placement_policy_negative(self, default_wallet: WalletInfo, rpc_endpoint: str): """ Negative test for placement policy: Can't parse placement policy. """ placement_rule = "REP 1 IN SPB REP 1 in MSK CBF 1 SELECT 1 FROM SPBRU AS SPB SELECT 1 FROM MSKRU AS MSK FILTER SubDivCode EQ SPE AS SPBRU FILTER NOT (Country EQ Sweden OR CountryCode EQ FI) AND Location EQ Moskva AS MSKRU" - endpoint = self.cluster.default_rpc_endpoint with reporter.step(f"Create container with policy {placement_rule}"): with pytest.raises(Exception, match=NOT_PARSE_POLICY): - cid = create_container( - wallet=default_wallet, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=endpoint, - ) + create_container(default_wallet, self.shell, rpc_endpoint, placement_rule) @allure.title("[NEGATIVE] Placement policy: Not enough nodes to SELECT") - def test_placement_policy_negative_not_enough_nodes_to_select(self, default_wallet): + def test_placement_policy_negative_not_enough_nodes_to_select(self, default_wallet: WalletInfo, rpc_endpoint: str): """ Negative test for placement policy: Not enough nodes to SELECT. """ placement_rule = "REP 2 IN RU SELECT 2 FROM RUS AS RU FILTER Country EQ Russia AND SubDivCode EQ SPE AS RUS" - endpoint = self.cluster.default_rpc_endpoint + with reporter.step(f"Create container with policy {placement_rule}"): with pytest.raises(Exception, match=NOT_ENOUGH_TO_SELECT): - cid = create_container( - wallet=default_wallet, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=endpoint, - ) + create_container(default_wallet, self.shell, rpc_endpoint, placement_rule) @allure.title("[NEGATIVE] Placement policy: Filter not found") - def test_placement_policy_negative_not_enough_nodes_to_filter(self, default_wallet): + def test_placement_policy_negative_not_enough_nodes_to_filter(self, default_wallet: WalletInfo, rpc_endpoint: str): """ Negative test for placement policy: Filter not found. """ placement_rule = ( "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER @NOTRU AND Price GT 15 AS GT15 FILTER CountryCode NE RU AS NOTRU" ) - endpoint = self.cluster.default_rpc_endpoint + with reporter.step(f"Create container with policy {placement_rule}"): with pytest.raises(Exception, match=NOT_FOUND_FILTER): - cid = create_container( - wallet=default_wallet, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=endpoint, - ) + create_container(default_wallet, self.shell, rpc_endpoint, placement_rule) @allure.title("[NEGATIVE] Placement policy: SELECTOR not found") - def test_placement_policy_negative_not_enough_nodes_to_selector(self, default_wallet): + def test_placement_policy_negative_not_enough_nodes_to_selector(self, default_wallet: WalletInfo, rpc_endpoint: str): """ Negative test for placement policy: Filter not found. """ placement_rule = "REP 2 IN HALFIC CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15" - endpoint = self.cluster.default_rpc_endpoint + with reporter.step(f"Create container with policy {placement_rule}"): with pytest.raises(Exception, match=NOT_FOUND_SELECTOR): - cid = create_container( - wallet=default_wallet, - rule=placement_rule, - basic_acl=PUBLIC_ACL, - shell=self.shell, - endpoint=endpoint, - ) + create_container(default_wallet, self.shell, rpc_endpoint, placement_rule) @allure.title("Simple policy results with one node") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 1 REP 1 CBF 1")], indirect=True) def test_simple_policy_results_with_one_node( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement simple policy results with one node. """ - placement_rule = "REP 1 REP 1 CBF 1" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with one node") + @pytest.mark.parametrize( + "container_request", [PUBLIC_WITH_POLICY("REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode")], indirect=True + ) def test_policy_with_select_results_with_one_node( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with one node. """ - placement_rule = "REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT and FILTER results with one node") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes" + ) + ], + indirect=True, + ) def test_policy_with_select_and_filter_results_with_one_node( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT and FILTER results with one node. """ - placement_rule = "REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint placement_params = {"country": "Russia"} - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] with reporter.step(f"Check the node is selected from {placement_params['country']}"): assert ( @@ -223,42 +176,49 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT and Complex FILTER results with one node") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU" + ) + ], + indirect=True, + ) def test_policy_with_select_and_complex_filter_results_with_one_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with one node. """ - placement_rule = "REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU" placement_params = {"country": ["Russia", "Sweden"], "country_code": "FI"} file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"): assert ( @@ -269,42 +229,49 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country or country code. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with Multi SELECTs and FILTERs results with one node") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR" + ) + ], + indirect=True, + ) def test_policy_with_multi_selects_and_filters_results_with_one_node( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with one nodes. """ - placement_rule = "REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR" placement_params = {"country": ["Sweden", "Russia"], "country_code": "FI"} file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) with reporter.step(f"Check two nodes are selected from any country"): for node in resulting_copies: node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] @@ -315,42 +282,49 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country or country code. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT and FILTER results with UNIQUE nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "UNIQUE REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes" + ) + ], + indirect=True, + ) def test_policy_with_select_and_filter_results_with_unique_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT and FILTER results with UNIQUE nodes. """ - placement_rule = "UNIQUE REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes" placement_params = {"country": "Russia"} file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) with reporter.step(f"Check two nodes are selected from {placement_params['country']}"): for node in resulting_copies: node_address = node.get_rpc_endpoint().split(":")[0] @@ -359,782 +333,307 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Simple policy results with 25% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 1 CBF 1")], indirect=True) def test_simple_policy_results_with_25_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy results with 25% of available nodes. """ - placement_rule = "REP 1 CBF 1" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with 25% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 1 IN One CBF 1 SELECT 1 FROM * AS One")], indirect=True) def test_policy_with_select_results_with_25_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with 25% of available nodes. """ - placement_rule = "REP 1 IN One CBF 1 SELECT 1 FROM * AS One" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and FILTER results with 25% of available nodes") - def test_policy_with_select_and_filter_results_with_25_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and FILTER results with 25% of available nodes. - """ - placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10" - placement_params = {"Price": 10} - file_path = generate_file(simple_object_size.value) - expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] - with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"): - assert ( - int(netmap[node_address]["Price"]) <= placement_params["Price"] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with select and complex filter results with 25% of available nodes") - def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 25% of available nodes. - """ - placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10" - placement_params = {"Price": [10, 0]} - file_path = generate_file(simple_object_size.value) - expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check the node is selected with price between 1 and 10"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - int(netmap[node_address]["Price"]) > placement_params["Price"][1] - and int(netmap[node_address]["Price"]) <= placement_params["Price"][0] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with Multi SELECTs and FILTERs results with 25% of available nodes") - def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 25% of available nodes. - """ - placement_rule = "UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX" - placement_params = {"Price": [15, 55]} - file_path = generate_file(simple_object_size.value) - expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check two nodes are selected with max and min prices"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - int(netmap[node_address]["Price"]) > placement_params["Price"][1] - or int(netmap[node_address]["Price"]) < placement_params["Price"][0] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Simple policy results with 50% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 2 CBF 1")], indirect=True) def test_simple_policy_results_with_50_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement simple policy results with 50% of available nodes. """ - placement_rule = "REP 2 CBF 1" file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with 50% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 2 IN HALF CBF 1 SELECT 2 FROM * AS HALF")], indirect=True) def test_policy_with_select_results_with_50_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with 50% of available nodes. """ - placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM * AS HALF" file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and FILTER results with 50% of available nodes") - def test_policy_with_select_and_filter_results_with_50_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and FILTER results with 50% of available nodes. - """ - placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15" - placement_params = {"Price": 15} - file_path = generate_file(simple_object_size.value) - expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - int(netmap[node_address]["Price"]) > placement_params["Price"] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and Complex FILTER results with 50% of available nodes") - def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 50% of available nodes. - """ - placement_rule = ( - "REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15" - ) - placement_params = {"Price": 15, "country_code": "RU"} - file_path = generate_file(simple_object_size.value) - expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - not netmap[node_address]["country_code"] == placement_params["country_code"] - or not netmap[node_address]["country_code"] == placement_params["country_code"] - and int(netmap[node_address]["Price"]) >= placement_params["Price"] - ), f"The node is selected with the wrong price or country code. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with Multi SELECTs and FILTERs results with 50% of available nodes") - def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 50% of available nodes. - """ - placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ 'RU LED' OR 'UN-LOCODE' EQ 'RU MOW' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55" - placement_params = {"un_locode": ["RU LED", "RU MOW"], "Price": [15, 55]} - file_path = generate_file(simple_object_size.value) - expected_copies = 3 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check all nodes are selected"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - netmap[node_address]["un_locode"] in placement_params["un_locode"] - or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1] - or ( - not netmap[node_address]["un_locode"] == placement_params["un_locode"][1] - and int(netmap[node_address]["Price"]) >= placement_params["Price"][0] - ) - or ( - netmap[node_address]["un_locode"] == placement_params["un_locode"][1] - and int(netmap[node_address]["Price"]) <= placement_params["Price"][1] - ) - ), f"The node is selected with the wrong price or un_locode. Expected {placement_params} and got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Simple policy results with 75% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 1")], indirect=True) def test_simple_policy_results_with_75_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement simple policy results with 75% of available nodes. """ - placement_rule = "REP 1" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with 75% of available nodes") + @pytest.mark.parametrize( + "container_request", [PUBLIC_WITH_POLICY("REP 2 IN DS CBF 1 SELECT 3 IN DISTINCT Country FROM * AS DS")], indirect=True + ) def test_policy_with_select_results_with_75_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with 75% of available nodes. """ - placement_rule = "REP 2 IN DS CBF 1 SELECT 3 IN DISTINCT Country FROM * AS DS" file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and FILTER results with 75% of available nodes") - def test_policy_with_select_and_filter_results_with_75_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and FILTER results with 75% of available nodes. - """ - placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65" - placement_params = {"Price": 65} - file_path = generate_file(simple_object_size.value) - expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - int(netmap[node_address]["Price"]) < placement_params["Price"] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and Complex FILTER results with 75% of available nodes") - def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price: None, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 75% of available nodes. - """ - placement_rule = ( - "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65" - ) - placement_params = {"Price": 65, "continent": "America"} - file_path = generate_file(simple_object_size.value) - expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - int(netmap[node_address]["Price"]) < placement_params["Price"] - and not netmap[node_address]["continent"] == placement_params["continent"] - ) or ( - not netmap[node_address]["continent"] == placement_params["continent"] - ), f"The node is selected with the wrong price or continent. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with Multi SELECTs and FILTERs results with 75% of available nodes") - def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 75% of available nodes. - """ - placement_rule = "REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10" - placement_params = {"Price": [65, 10], "continent": "America"} - file_path = generate_file(simple_object_size.value) - expected_copies = 4 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check all nodes are selected"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert ( - ( - int(netmap[node_address]["Price"]) > placement_params["Price"][1] - and not netmap[node_address]["continent"] == placement_params["continent"] - ) - or ( - int(netmap[node_address]["Price"]) < placement_params["Price"][0] - and not netmap[node_address]["continent"] == placement_params["continent"] - ) - or not (netmap[node_address]["continent"] == placement_params["continent"]) - ), f"The node is selected with the wrong price or continent. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Simple policy results with 100% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 4")], indirect=True) def test_simple_policy_results_with_100_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement simple policy results with 100% of available nodes. """ - placement_rule = "REP 4" file_path = generate_file(simple_object_size.value) expected_copies = 4 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with 100% of available nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("REP 1 IN All SELECT 4 FROM * AS All")], indirect=True) def test_policy_with_select_results_with_100_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with 100% of available nodes. """ - placement_rule = "REP 1 IN All SELECT 4 FROM * AS All" file_path = generate_file(simple_object_size.value) expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with SELECT and FILTER results with 100% of available nodes") - def test_policy_with_select_and_filter_results_with_100_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with SELECT and FILTER results with 100% of available nodes. - """ - placement_rule = "REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes" - placement_params = {"Price": 0} - file_path = generate_file(simple_object_size.value) - expected_copies = 1 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] - with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"): - assert ( - int(netmap[node_address]["Price"]) >= placement_params["Price"] - ), f"The node is selected with the wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT and Complex FILTER results with 100% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 2 IN All SELECT 4 FROM AllNodes AS All FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER @AllCountries AND Continent EQ Europe AS AllNodes" + ) + ], + indirect=True, + ) def test_policy_with_select_and_complex_filter_results_with_100_of_available_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 100% of available nodes. """ - placement_rule = "REP 2 IN All SELECT 4 FROM AllNodes AS All FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER @AllCountries AND Continent EQ Europe AS AllNodes" placement_params = {"country": ["Russia", "Sweden", "Finland"], "continent": "Europe"} file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"): for node in resulting_copies: node_address = node.get_rpc_endpoint().split(":")[0] @@ -1145,153 +644,117 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country or continent. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - @allure.title("Policy with Multi SELECTs and FILTERs results with 100% of available nodes") - def test_policy_with_multi_selects_and_filters_results_with_100_of_available_nodes( - self, - default_wallet, - simple_object_size: ObjectSize, - fill_field_price, - ): - """ - This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 100% of available nodes. - """ - placement_rule = "REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero" - placement_params = {"country": ["Russia", "Sweden", "Finland"], "Price": 0} - file_path = generate_file(simple_object_size.value) - expected_copies = 4 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) - - with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) - - with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) - assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" - - with reporter.step(f"Check the object appearance"): - netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) - with reporter.step(f"Check all node are selected"): - for node in resulting_copies: - node_address = node.get_rpc_endpoint().split(":")[0] - assert (netmap[node_address]["country"] in placement_params["country"]) or ( - int(netmap[node_address]["Price"]) >= placement_params["Price"] - ), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}" - - with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) - - with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Simple policy results with UNIQUE nodes") + @pytest.mark.parametrize("container_request", [PUBLIC_WITH_POLICY("UNIQUE REP 1 REP 1 CBF 1")], indirect=True) def test_simple_policy_results_with_unique_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement simple policy results with UNIQUE nodes. """ - placement_rule = "UNIQUE REP 1 REP 1 CBF 1" file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT results with UNIQUE nodes") + @pytest.mark.parametrize( + "container_request", + [PUBLIC_WITH_POLICY("UNIQUE REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode")], + indirect=True, + ) def test_policy_with_select_results_with_unique_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT results with UNIQUE nodes. """ - placement_rule = "UNIQUE REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode" file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with SELECT and Complex FILTER results with UNIQUE nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "UNIQUE REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU" + ) + ], + indirect=True, + ) def test_policy_with_select_and_complex_filter_results_with_unique_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with UNIQUE nodes. """ - placement_rule = "UNIQUE REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU" placement_params = {"country_code": "FI", "country": ["Sweden", "Russia"]} file_path = generate_file(simple_object_size.value) expected_copies = 2 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"): for node in resulting_copies: node_address = node.get_rpc_endpoint().split(":")[0] @@ -1302,42 +765,49 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country or with wrong country code. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy with Multi SELECTs and FILTERs results with UNIQUE nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "UNIQUE REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR" + ) + ], + indirect=True, + ) def test_policy_with_multi_selects_and_filters_results_with_unique_nodes( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with UNIQUE nodes. """ - placement_rule = "UNIQUE REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR" placement_params = {"country": ["Russia", "Sweden"], "country_code": "FI"} file_path = generate_file(simple_object_size.value) expected_copies = 3 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) with reporter.step(f"Check three nodes are selected from any country"): for node in resulting_copies: node_address = node.get_rpc_endpoint().split(":")[0] @@ -1348,44 +818,51 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong country or with wrong country code. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) @allure.title("Policy: REP 1 IN SPB REP 1 IN MSK REP 3") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 1 IN SPB REP 1 IN MSK REP 3 CBF 1 SELECT 2 FROM LED AS SPB SELECT 2 FROM MOW AS MSK FILTER Location EQ 'Saint Petersburg (ex Leningrad)' AS LED FILTER Location EQ Moskva AS MOW" + ) + ], + indirect=True, + ) def test_policy_rep_1_in_spb_rep_1_in_msk_rep_3( self, - default_wallet, + default_wallet: WalletInfo, + rpc_endpoint: str, simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, ): """ This test checks object's copies based on container's placement policy: REP 1 IN SPB REP 1 IN MSK REP 3. """ - placement_rule = "REP 1 IN SPB REP 1 IN MSK REP 3 CBF 1 SELECT 2 FROM LED AS SPB SELECT 2 FROM MOW AS MSK FILTER Location EQ 'Saint Petersburg (ex Leningrad)' AS LED FILTER Location EQ Moskva AS MOW" placement_params = {"location": ["Saint Petersburg (ex Leningrad)", "Moskva"]} file_path = generate_file(simple_object_size.value) expected_copies = 3 - endpoint = self.cluster.default_rpc_endpoint - - with reporter.step(f"Create container with policy {placement_rule}"): - cid = create_container(wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint) with reporter.step(f"Check container policy"): - self.validate_object_policy(default_wallet, placement_rule, cid, endpoint) + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) with reporter.step(f"Put object in container"): - oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster) + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) with reporter.step(f"Check object expected copies"): - resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + resulting_copies = get_nodes_with_object(container, oid, self.shell, self.cluster.storage_nodes) assert len(resulting_copies) >= expected_copies and len(resulting_copies) <= len( self.cluster.storage_nodes ), f"Expected {expected_copies} or {expected_copies + 1} copies, got {len(resulting_copies)}" with reporter.step(f"Check the object appearance"): netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) - netmap = self.get_netmap_param(netmap) + netmap = get_netmap_param(netmap) list_of_location = [] for node in resulting_copies: node_address = node.get_rpc_endpoint().split(":")[0] @@ -1399,27 +876,7 @@ class TestPolicy(ClusterTestBase): ), f"The node is selected from the wrong location. Got {netmap[node_address]}" with reporter.step(f"Delete the object from the container"): - delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint) + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) with reporter.step(f"Delete the container"): - delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint) - - def validate_object_policy(self, wallet: str, placement_rule: str, cid: str, endpoint: str): - got_policy = placement_policy_from_container(get_container(wallet, cid, json_mode=False, shell=self.shell, endpoint=endpoint)) - assert got_policy.replace("'", "") == placement_rule.replace( - "'", "" - ), f"Expected \n{placement_rule} and got policy \n{got_policy} are the same" - - def get_netmap_param(self, netmap_info: list[NodeNetmapInfo]) -> dict: - dict_external = dict() - for node in netmap_info: - external_adress = node.external_address[0].split("/")[2] - dict_external[external_adress] = { - "country": node.country, - "country_code": node.country_code, - "Price": node.price, - "continent": node.continent, - "un_locode": node.un_locode, - "location": node.location, - } - return dict_external + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) diff --git a/pytest_tests/testsuites/container/test_policy_with_price.py b/pytest_tests/testsuites/container/test_policy_with_price.py new file mode 100644 index 0000000..b1734c0 --- /dev/null +++ b/pytest_tests/testsuites/container/test_policy_with_price.py @@ -0,0 +1,640 @@ +import itertools + +import allure +import pytest +from frostfs_testlib import reporter +from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli +from frostfs_testlib.shell.interfaces import Shell +from frostfs_testlib.steps.cli.container import delete_container +from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node +from frostfs_testlib.steps.node_management import get_netmap_snapshot +from frostfs_testlib.steps.storage_policy import get_nodes_with_object +from frostfs_testlib.storage.cluster import Cluster +from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController +from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager +from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode +from frostfs_testlib.storage.dataclasses.object_size import ObjectSize +from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.testing import parallel +from frostfs_testlib.testing.cluster_test_base import ClusterTestBase +from frostfs_testlib.testing.test_control import wait_for_success +from frostfs_testlib.utils.cli_utils import parse_netmap_output +from frostfs_testlib.utils.file_utils import generate_file + +from ...helpers.container_creation import create_container_with_ape +from ...helpers.container_request import PUBLIC_WITH_POLICY, ContainerRequest +from ...helpers.policy_validation import get_netmap_param, validate_object_policy + + +@pytest.mark.weekly +@pytest.mark.policy +@pytest.mark.policy_price +class TestPolicyWithPrice(ClusterTestBase): + @wait_for_success(1200, 60, title="Wait for full field price on node", expected_result=True) + def await_for_price_attribute_on_nodes(self): + netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + for node in self.cluster.storage_nodes: + node_address = node.get_rpc_endpoint().split(":")[0] + if netmap[node_address]["Price"] is None: + return False + return True + + @pytest.fixture(scope="module") + def fill_field_price(self, cluster: Cluster, cluster_state_controller_session: ClusterStateController): + prices = ["15", "10", "65", "55"] + + config_manager = cluster_state_controller_session.manager(ConfigStateManager) + parallel( + config_manager.set_on_node, + cluster.cluster_nodes, + StorageNode, + itertools.cycle([{"node:attribute_5": f"Price:{price}"} for price in prices]), + ) + cluster_state_controller_session.wait_after_storage_startup() + + self.tick_epoch() + self.await_for_price_attribute_on_nodes() + + yield + + cluster_state_controller_session.manager(ConfigStateManager).revert_all() + + @pytest.fixture + def container( + self, + default_wallet: WalletInfo, + frostfs_cli: FrostfsCli, + client_shell: Shell, + cluster: Cluster, + rpc_endpoint: str, + container_request: ContainerRequest, + # In these set of tests containers should be created after the fill_field_price fixture + fill_field_price, + ) -> str: + return create_container_with_ape(frostfs_cli, default_wallet, client_shell, cluster, rpc_endpoint, container_request) + + @allure.title("Policy with SELECT and FILTER results with 25% of available nodes") + @pytest.mark.parametrize( + "container_request", + [PUBLIC_WITH_POLICY("REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10")], + indirect=True, + ) + def test_policy_with_select_and_filter_results_with_25_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and FILTER results with 25% of available nodes. + """ + placement_params = {"Price": 10} + file_path = generate_file(simple_object_size.value) + expected_copies = 1 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] + with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"): + assert ( + int(netmap[node_address]["Price"]) <= placement_params["Price"] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with select and complex filter results with 25% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10" + ) + ], + indirect=True, + ) + def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 25% of available nodes. + """ + placement_params = {"Price": [10, 0]} + file_path = generate_file(simple_object_size.value) + expected_copies = 1 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check the node is selected with price between 1 and 10"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + int(netmap[node_address]["Price"]) > placement_params["Price"][1] + and int(netmap[node_address]["Price"]) <= placement_params["Price"][0] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with Multi SELECTs and FILTERs results with 25% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX" + ) + ], + indirect=True, + ) + def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 25% of available nodes. + """ + placement_params = {"Price": [15, 55]} + file_path = generate_file(simple_object_size.value) + expected_copies = 2 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check two nodes are selected with max and min prices"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + int(netmap[node_address]["Price"]) > placement_params["Price"][1] + or int(netmap[node_address]["Price"]) < placement_params["Price"][0] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with SELECT and FILTER results with 50% of available nodes") + @pytest.mark.parametrize( + "container_request", + [PUBLIC_WITH_POLICY("REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15")], + indirect=True, + ) + def test_policy_with_select_and_filter_results_with_50_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and FILTER results with 50% of available nodes. + """ + placement_params = {"Price": 15} + file_path = generate_file(simple_object_size.value) + expected_copies = 2 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + int(netmap[node_address]["Price"]) > placement_params["Price"] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with SELECT and Complex FILTER results with 50% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15" + ) + ], + indirect=True, + ) + def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 50% of available nodes. + """ + placement_params = {"Price": 15, "country_code": "RU"} + file_path = generate_file(simple_object_size.value) + expected_copies = 2 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + not netmap[node_address]["country_code"] == placement_params["country_code"] + or not netmap[node_address]["country_code"] == placement_params["country_code"] + and int(netmap[node_address]["Price"]) >= placement_params["Price"] + ), f"The node is selected with the wrong price or country code. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with Multi SELECTs and FILTERs results with 50% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ 'RU LED' OR 'UN-LOCODE' EQ 'RU MOW' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55" + ) + ], + indirect=True, + ) + def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 50% of available nodes. + """ + placement_params = {"un_locode": ["RU LED", "RU MOW"], "Price": [15, 55]} + file_path = generate_file(simple_object_size.value) + expected_copies = 3 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check all nodes are selected"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + netmap[node_address]["un_locode"] in placement_params["un_locode"] + or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1] + or ( + not netmap[node_address]["un_locode"] == placement_params["un_locode"][1] + and int(netmap[node_address]["Price"]) >= placement_params["Price"][0] + ) + or ( + netmap[node_address]["un_locode"] == placement_params["un_locode"][1] + and int(netmap[node_address]["Price"]) <= placement_params["Price"][1] + ) + ), f"The node is selected with the wrong price or un_locode. Expected {placement_params} and got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with SELECT and FILTER results with 75% of available nodes") + @pytest.mark.parametrize( + "container_request", + [PUBLIC_WITH_POLICY("REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65")], + indirect=True, + ) + def test_policy_with_select_and_filter_results_with_75_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and FILTER results with 75% of available nodes. + """ + placement_params = {"Price": 65} + file_path = generate_file(simple_object_size.value) + expected_copies = 2 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + int(netmap[node_address]["Price"]) < placement_params["Price"] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with SELECT and Complex FILTER results with 75% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65" + ) + ], + indirect=True, + ) + def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 75% of available nodes. + """ + placement_params = {"Price": 65, "continent": "America"} + file_path = generate_file(simple_object_size.value) + expected_copies = 2 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + int(netmap[node_address]["Price"]) < placement_params["Price"] + and not netmap[node_address]["continent"] == placement_params["continent"] + ) or ( + not netmap[node_address]["continent"] == placement_params["continent"] + ), f"The node is selected with the wrong price or continent. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with Multi SELECTs and FILTERs results with 75% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10" + ) + ], + indirect=True, + ) + def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 75% of available nodes. + """ + placement_params = {"Price": [65, 10], "continent": "America"} + file_path = generate_file(simple_object_size.value) + expected_copies = 4 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check all nodes are selected"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert ( + ( + int(netmap[node_address]["Price"]) > placement_params["Price"][1] + and not netmap[node_address]["continent"] == placement_params["continent"] + ) + or ( + int(netmap[node_address]["Price"]) < placement_params["Price"][0] + and not netmap[node_address]["continent"] == placement_params["continent"] + ) + or not (netmap[node_address]["continent"] == placement_params["continent"]) + ), f"The node is selected with the wrong price or continent. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with SELECT and FILTER results with 100% of available nodes") + @pytest.mark.parametrize( + "container_request", [PUBLIC_WITH_POLICY("REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes")], indirect=True + ) + def test_policy_with_select_and_filter_results_with_100_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with SELECT and FILTER results with 100% of available nodes. + """ + placement_params = {"Price": 0} + file_path = generate_file(simple_object_size.value) + expected_copies = 1 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0] + with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"): + assert ( + int(netmap[node_address]["Price"]) >= placement_params["Price"] + ), f"The node is selected with the wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False) + + @allure.title("Policy with Multi SELECTs and FILTERs results with 100% of available nodes") + @pytest.mark.parametrize( + "container_request", + [ + PUBLIC_WITH_POLICY( + "REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero" + ) + ], + indirect=True, + ) + def test_policy_with_multi_selects_and_filters_results_with_100_of_available_nodes( + self, + default_wallet: WalletInfo, + rpc_endpoint: str, + simple_object_size: ObjectSize, + container: str, + container_request: ContainerRequest, + ): + """ + This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 100% of available nodes. + """ + placement_params = {"country": ["Russia", "Sweden", "Finland"], "Price": 0} + file_path = generate_file(simple_object_size.value) + expected_copies = 4 + + with reporter.step(f"Check container policy"): + validate_object_policy(default_wallet, self.shell, container_request.policy, container, rpc_endpoint) + + with reporter.step(f"Put object in container"): + oid = put_object_to_random_node(default_wallet, file_path, container, self.shell, self.cluster) + + with reporter.step(f"Check object expected copies"): + resulting_copies = get_nodes_with_object(container, oid, shell=self.shell, nodes=self.cluster.storage_nodes) + assert len(resulting_copies) == expected_copies, f"Expected {expected_copies} copies, got {len(resulting_copies)}" + + with reporter.step(f"Check the object appearance"): + netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell)) + netmap = get_netmap_param(netmap) + with reporter.step(f"Check all node are selected"): + for node in resulting_copies: + node_address = node.get_rpc_endpoint().split(":")[0] + assert (netmap[node_address]["country"] in placement_params["country"]) or ( + int(netmap[node_address]["Price"]) >= placement_params["Price"] + ), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}" + + with reporter.step(f"Delete the object from the container"): + delete_object(default_wallet, container, oid, self.shell, rpc_endpoint) + + with reporter.step(f"Delete the container"): + delete_container(default_wallet, container, self.shell, rpc_endpoint, await_mode=False)