frostfs-testcases/pytest_tests/testsuites/container/test_policy.py

1550 lines
81 KiB
Python

import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import get_netmap_snapshot
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output
from frostfs_testlib.utils.file_utils import generate_file
from pytest_tests.helpers.utility import placement_policy_from_container
from pytest_tests.resources.policy_error_patterns import (
NOT_ENOUGH_TO_SELECT,
NOT_FOUND_FILTER,
NOT_FOUND_SELECTOR,
NOT_PARSE_POLICY,
)
@pytest.mark.container
@pytest.mark.policy
class TestPolicy(ClusterTestBase):
@wait_for_success(1200, 60, title="Wait for full field price on node", expected_result=True)
def await_for_price_attribute_on_nodes(self):
    """
    Report whether every storage node has a "Price" value in the netmap.

    A netmap snapshot is requested through the first storage node, converted with
    self.get_netmap_param and then looked up per node by the host part of its RPC endpoint.

    Returns:
        False if at least one node has "Price" equal to None, True otherwise.
    """
    # NOTE(review): wait_for_success presumably re-runs this check until it yields True - confirm in frostfs_testlib.
    snapshot = get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell)
    params_by_host = self.get_netmap_param(parse_netmap_output(snapshot))
    return all(
        params_by_host[storage_node.get_rpc_endpoint().split(":")[0]]["Price"] is not None
        for storage_node in self.cluster.storage_nodes
    )
@pytest.fixture(scope="module")
def fill_field_price(self, cluster_state_controller: ClusterStateController):
    """
    Module-scoped fixture that sets a "Price" attribute on cluster nodes.

    Setup writes `node:attribute_5 = Price:<value>` to the storage node config of each
    cluster node, ticks an epoch and waits until the attribute is visible on all nodes.
    Teardown calls revert_all() on the ConfigStateManager.

    The revert lives in a `finally` clause so that configuration already written is
    rolled back even when setup itself fails (e.g. the wait for prices times out).
    """
    prices = ["15", "10", "65", "55"]
    config_manager = cluster_state_controller.manager(ConfigStateManager)
    try:
        # zip stops at the shorter sequence, so at most the first four cluster nodes get a price.
        for cluster_node, price in zip(self.cluster.cluster_nodes, prices):
            config_manager.set_on_node(cluster_node, StorageNode, {"node:attribute_5": f"Price:{price}"})
        self.tick_epoch()
        self.await_for_price_attribute_on_nodes()
        yield
    finally:
        cluster_state_controller.manager(ConfigStateManager).revert_all()
@allure.title("[NEGATIVE] Placement policy: Can't parse placement policy")
def test_placement_policy_negative(self, default_wallet):
    """
    Negative test for placement policy: Can't parse placement policy.

    Creating a container with the rule below must raise an exception whose message
    matches NOT_PARSE_POLICY.
    """
    # NOTE(review): presumably the lowercase "in" in "REP 1 in MSK" is what makes the rule unparsable - confirm.
    placement_rule = "REP 1 IN SPB REP 1 in MSK CBF 1 SELECT 1 FROM SPBRU AS SPB SELECT 1 FROM MSKRU AS MSK FILTER SubDivCode EQ SPE AS SPBRU FILTER NOT (Country EQ Sweden OR CountryCode EQ FI) AND Location EQ Moskva AS MSKRU"
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        with pytest.raises(Exception, match=NOT_PARSE_POLICY):
            # The call is expected to raise, so its result is deliberately not bound to a name.
            create_container(
                wallet=default_wallet,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=endpoint,
            )
@allure.title("[NEGATIVE] Placement policy: Not enough nodes to SELECT")
def test_placement_policy_negative_not_enough_nodes_to_select(self, default_wallet):
    """
    Negative test for placement policy: Not enough nodes to SELECT.

    Creating a container with the rule below must raise an exception whose message
    matches NOT_ENOUGH_TO_SELECT.
    """
    placement_rule = "REP 2 IN RU SELECT 2 FROM RUS AS RU FILTER Country EQ Russia AND SubDivCode EQ SPE AS RUS"
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        with pytest.raises(Exception, match=NOT_ENOUGH_TO_SELECT):
            # The call is expected to raise, so its result is deliberately not bound to a name.
            create_container(
                wallet=default_wallet,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=endpoint,
            )
@allure.title("[NEGATIVE] Placement policy: Filter not found")
def test_placement_policy_negative_not_enough_nodes_to_filter(self, default_wallet):
    """
    Negative test for placement policy: Filter not found.

    The rule references filter NOTRU (via @NOTRU) before the clause that declares it.
    Creating a container with it must raise an exception whose message matches NOT_FOUND_FILTER.
    """
    placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER @NOTRU AND Price GT 15 AS GT15 FILTER CountryCode NE RU AS NOTRU"
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        with pytest.raises(Exception, match=NOT_FOUND_FILTER):
            # The call is expected to raise, so its result is deliberately not bound to a name.
            create_container(
                wallet=default_wallet,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=endpoint,
            )
@allure.title("[NEGATIVE] Placement policy: SELECTOR not found")
def test_placement_policy_negative_not_enough_nodes_to_selector(self, default_wallet):
    """
    Negative test for placement policy: SELECTOR not found.

    The REP clause refers to selector HALFIC while the only SELECT clause is named HALF.
    Creating a container with it must raise an exception whose message matches NOT_FOUND_SELECTOR.
    """
    placement_rule = "REP 2 IN HALFIC CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15"
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        with pytest.raises(Exception, match=NOT_FOUND_SELECTOR):
            # The call is expected to raise, so its result is deliberately not bound to a name.
            create_container(
                wallet=default_wallet,
                rule=placement_rule,
                basic_acl=PUBLIC_ACL,
                shell=self.shell,
                endpoint=endpoint,
            )
@allure.title("Simple policy results with one node")
def test_simple_policy_results_with_one_node(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a simple placement policy that results in one node.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it, then delete the object and the container.
    """
    placement_rule = "REP 1 REP 1 CBF 1"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with one node")
def test_policy_with_select_results_with_one_node(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a placement policy with SELECT that results in one node.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it, then delete the object and the container.
    """
    placement_rule = "REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and FILTER results with one node")
def test_policy_with_select_and_filter_results_with_one_node(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a placement policy with SELECT and FILTER that results in one node.

    Besides the copy count, the netmap entry of the node holding the object is checked
    to have "country" equal to "Russia", as the FILTER clause of the rule requires.
    """
    placement_rule = "REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint
    wanted_country = "Russia"

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        # The snapshot is requested through the node that actually stores the object.
        holder = holders[0]
        snapshot = get_netmap_snapshot(node=holder, shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        holder_host = holder.get_rpc_endpoint().split(":")[0]
        with reporter.step(f"Check the node is selected from {wanted_country}"):
            assert (
                wanted_country == netmap[holder_host]["country"]
            ), f"The node is selected from the wrong country. Got {netmap[holder_host]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and Complex FILTER results with one node")
def test_policy_with_select_and_complex_filter_results_with_one_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with one node.

    The rule's final filter is `@NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia'`,
    so the node holding the object must satisfy all three conditions at once:
    country is not Sweden, country code is not FI, and country is Russia.
    """
    placement_rule = "REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU"
    # country[0] is the required country, country[1] and country_code are the excluded values.
    placement_params = {"country": ["Russia", "Sweden"], "country_code": "FI"}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 1
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
        node_info = netmap[node_address]
        with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
            # A plain conjunction mirrors the filter; mixing `or` in here would let
            # `and` bind tighter and make the remaining checks unreachable.
            assert (
                node_info["country"] != placement_params["country"][1]
                and node_info["country_code"] != placement_params["country_code"]
                and node_info["country"] == placement_params["country"][0]
            ), f"The node is selected from the wrong country or country code. Got {netmap[node_address]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with one node")
def test_policy_with_multi_selects_and_filters_results_with_one_node(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with one nodes.

    The rule selects one node through filter RUS (Country EQ Russia) and one through
    filter EUR (NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI). Every node that
    holds the object is therefore checked to be in Russia, in Sweden, or to carry
    country code FI.
    """
    placement_rule = "REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR"
    placement_params = {"country": ["Sweden", "Russia"], "country_code": "FI"}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 2
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        with reporter.step("Check two nodes are selected from any country"):
            for node in resulting_copies:
                # Look up the node of the current iteration so that every copy is verified.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                # "Russia, or (not Russia and Sweden), or FI" reduces to this three-way `or`.
                assert (
                    node_info["country"] == placement_params["country"][1]
                    or node_info["country"] == placement_params["country"][0]
                    or node_info["country_code"] == placement_params["country_code"]
                ), f"The node is selected from the wrong country or country code. Got {netmap[node_address]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with SELECT and FILTER results with UNIQUE nodes")
def test_policy_with_select_and_filter_results_with_unique_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a UNIQUE placement policy with SELECT and FILTER.

    Two copies are expected, and the netmap entry of every node holding the object
    is checked to have "country" equal to "Russia".
    """
    placement_rule = "UNIQUE REP 1 IN MyRussianNodes REP 1 IN MyRussianNodes CBF 1 SELECT 1 FROM RussianNodes AS MyRussianNodes FILTER Country EQ 'Russia' AS RussianNodes"
    wanted_country = "Russia"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step(f"Check two nodes are selected from {wanted_country}"):
            for holder in holders:
                holder_host = holder.get_rpc_endpoint().split(":")[0]
                assert (
                    wanted_country == netmap[holder_host]["country"]
                ), f"The node is selected from the wrong country. Got {netmap[holder_host]['country']}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Simple policy results with 25% of available nodes")
def test_simple_policy_results_with_25_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a simple placement policy that results in 25% of available nodes.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it, then delete the object and the container.
    """
    placement_rule = "REP 1 CBF 1"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with 25% of available nodes")
def test_policy_with_select_results_with_25_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a placement policy with SELECT that results in 25% of available nodes.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it, then delete the object and the container.
    """
    placement_rule = "REP 1 IN One CBF 1 SELECT 1 FROM * AS One"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and FILTER results with 25% of available nodes")
def test_policy_with_select_and_filter_results_with_25_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a placement policy with SELECT and FILTER that results in 25% of available nodes.

    Requests the fill_field_price fixture so that nodes carry a "Price" attribute.
    Besides the copy count, the node holding the object is checked to have a price
    not greater than 10, as the FILTER clause (Price LE 10) requires.
    """
    placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10"
    max_price = 10
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        holder = holders[0]
        snapshot = get_netmap_snapshot(node=holder, shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        holder_host = holder.get_rpc_endpoint().split(":")[0]
        with reporter.step(f"Check the node is selected with price <= {max_price}"):
            # The netmap value is converted with int() before the numeric comparison.
            assert (
                int(netmap[holder_host]["Price"]) <= max_price
            ), f"The node is selected with the wrong price. Got {netmap[holder_host]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with select and complex filter results with 25% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a placement policy with SELECT and Complex FILTER that results in 25% of available nodes.

    Requests the fill_field_price fixture so that nodes carry a "Price" attribute.
    Besides the copy count, every node holding the object is checked to have a price
    greater than 0 and not greater than 10 (filters GT0 and LE10 combined with AND).
    """
    placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10"
    price_ceiling, price_floor = 10, 0
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step("Check the node is selected with price between 1 and 10"):
            for holder in holders:
                holder_host = holder.get_rpc_endpoint().split(":")[0]
                price = int(netmap[holder_host]["Price"])
                assert (
                    price_floor < price <= price_ceiling
                ), f"The node is selected with the wrong price. Got {netmap[holder_host]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with 25% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a placement policy with Multi SELECTs and FILTERs that results in 25% of available nodes.

    Requests the fill_field_price fixture so that nodes carry a "Price" attribute.
    Two copies are expected, and every node holding the object is checked to have a
    price below 15 or above 55 (filters LT15 and GT55 combined with OR).
    """
    placement_rule = "UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX"
    low_bound, high_bound = 15, 55
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step("Check two nodes are selected with max and min prices"):
            for holder in holders:
                holder_host = holder.get_rpc_endpoint().split(":")[0]
                price = int(netmap[holder_host]["Price"])
                # The price must fall outside the closed range [low_bound, high_bound].
                assert not (
                    low_bound <= price <= high_bound
                ), f"The node is selected with the wrong price. Got {netmap[holder_host]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Simple policy results with 50% of available nodes")
def test_simple_policy_results_with_50_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a simple placement policy that results in 50% of available nodes.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it (two expected), then delete the object and the container.
    """
    placement_rule = "REP 2 CBF 1"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with 50% of available nodes")
def test_policy_with_select_results_with_50_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check the number of object copies for a placement policy with SELECT that results in 50% of available nodes.

    Steps: create a container with the rule, run self.validate_object_policy on it, put an
    object, count the nodes that hold it (two expected), then delete the object and the container.
    """
    placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM * AS HALF"
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and FILTER results with 50% of available nodes")
def test_policy_with_select_and_filter_results_with_50_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a placement policy with SELECT and FILTER that results in 50% of available nodes.

    Requests the fill_field_price fixture so that nodes carry a "Price" attribute.
    Two copies are expected, and every node holding the object is checked to have a
    price greater than 15, as the FILTER clause (Price GT 15) requires.
    """
    placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15"
    price_threshold = 15
    payload_path = generate_file(simple_object_size.value)
    copies_wanted = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, payload_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_wanted, f"Expected {copies_wanted} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step(f"Check two nodes are selected with price > {price_threshold}"):
            for holder in holders:
                holder_host = holder.get_rpc_endpoint().split(":")[0]
                assert (
                    int(netmap[holder_host]["Price"]) > price_threshold
                ), f"The node is selected with the wrong price. Got {netmap[holder_host]}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and Complex FILTER results with 50% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a SELECT + complex FILTER policy (scenario: 50% of available nodes).

    The policy selects two nodes from the filter GE15, which is defined as
    "CountryCode NE RU" AND "Price GE 15". The test puts a generated object,
    expects exactly two holders, and verifies that every holder satisfies
    both parts of the filter. The object and the container are deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15"
    placement_params = {"Price": 15, "country_code": "RU"}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 2
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        with reporter.step(
            f"Check two nodes are selected not with country code '{placement_params['country_code']}'"
        ):
            for node in resulting_copies:
                # The netmap is keyed by the host part of the RPC endpoint.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                # Both filter conditions must hold. The previous expression
                # ("not A or not A and B") collapsed to "not A" and never
                # verified the price.
                assert (
                    node_info["country_code"] != placement_params["country_code"]
                    and int(node_info["Price"]) >= placement_params["Price"]
                ), f"The node is selected with the wrong price or country code. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with 50% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a policy with two SELECTs and several FILTERs
    (scenario: 50% of available nodes).

    The policy places two replicas in selector FH (filter LE55: UN-LOCODE is
    'RU LED' or 'RU MOW' AND Price LE 55) and one replica in selector SH
    (filter GE15: UN-LOCODE is neither of those AND Price GE 15). The test
    puts a generated object, expects exactly three holders, and verifies that
    every holder satisfies one of the two selector filters. The object and the
    container are deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ 'RU LED' OR 'UN-LOCODE' EQ 'RU MOW' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55"
    placement_params = {"un_locode": ["RU LED", "RU MOW"], "Price": [15, 55]}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 3
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        min_price, max_price = placement_params["Price"]
        with reporter.step("Check all nodes are selected"):
            for node in resulting_copies:
                # The netmap is keyed by the host part of the RPC endpoint.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                in_ru = node_info["un_locode"] in placement_params["un_locode"]
                price = int(node_info["Price"])
                # A holder must match LE55 (RU and price <= 55) or GE15
                # (not RU and price >= 15). The previous expression was a
                # tautology and could never fail.
                assert (in_ru and price <= max_price) or (
                    not in_ru and price >= min_price
                ), f"The node is selected with the wrong price or un_locode. Expected {placement_params} and got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Simple policy results with 75% of available nodes")
def test_simple_policy_results_with_75_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for the simple policy "REP 1" (scenario: 75% of available nodes).

    Steps: create a container with the policy, validate the container policy,
    put a generated object through a random node, expect exactly one storage
    node to hold it, then delete the object and the container.
    """
    placement_rule = "REP 1"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with 75% of available nodes")
def test_policy_with_select_results_with_75_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a SELECT policy (scenario: 75% of available nodes).

    The policy keeps two replicas in a selector of three nodes with distinct
    Country values. Steps: create the container, validate the container
    policy, put a generated object through a random node, expect exactly two
    storage nodes to hold it, then delete the object and the container.
    """
    placement_rule = "REP 2 IN DS CBF 1 SELECT 3 IN DISTINCT Country FROM * AS DS"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and FILTER results with 75% of available nodes")
def test_policy_with_select_and_filter_results_with_75_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a SELECT + FILTER policy (scenario: 75% of available nodes).

    The policy keeps two replicas on nodes selected from the filter
    "Price LT 65". The test puts a generated object, expects exactly two
    holders, and checks in the netmap snapshot that every holder has a Price
    below the threshold. The object and the container are deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65"
    price_ceiling = 65
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step(f"Check two nodes are selected with price < {price_ceiling}"):
            for holder in holders:
                # The netmap is keyed by the host part of the RPC endpoint.
                host = holder.get_rpc_endpoint().split(":")[0]
                node_info = netmap[host]
                assert (
                    int(node_info["Price"]) < price_ceiling
                ), f"The node is selected with the wrong price. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and Complex FILTER results with 75% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price: None,
):
    """
    Check object copies for a SELECT + complex FILTER policy (scenario: 75% of available nodes).

    The policy selects two nodes from the filter LT65, which is defined as
    "Continent NE America" AND "Price LT 65". The test puts a generated
    object, expects exactly two holders, and verifies that every holder
    satisfies both parts of the filter. The object and the container are
    deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65"
    placement_params = {"Price": 65, "continent": "America"}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 2
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        # The title previously said "three nodes" although two copies are expected.
        with reporter.step(f"Check two nodes are selected not from {placement_params['continent']}"):
            for node in resulting_copies:
                # The netmap is keyed by the host part of the RPC endpoint.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                # Both filter conditions must hold. The previous expression
                # ("(P and C) or C") collapsed to "C" and never verified the price.
                assert (
                    node_info["continent"] != placement_params["continent"]
                    and int(node_info["Price"]) < placement_params["Price"]
                ), f"The node is selected with the wrong price or continent. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with 75% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a policy with two SELECTs and several FILTERs
    (scenario: 75% of available nodes).

    Both selectors (EXPNSV from GT10, CHEAP from LT65) are built on the NOAM
    filter, i.e. nodes whose Continent is not America. The test puts a
    generated object, expects exactly four holders, and checks every holder
    against the netmap snapshot. The object and the container are deleted at
    the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10"
    placement_params = {"Price": [65, 10], "continent": "America"}
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 4
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        upper_price, lower_price = placement_params["Price"]
        with reporter.step("Check all nodes are selected"):
            for holder in holders:
                # The netmap is keyed by the host part of the RPC endpoint.
                host = holder.get_rpc_endpoint().split(":")[0]
                node_info = netmap[host]
                price = int(node_info["Price"])
                outside_continent = node_info["continent"] != placement_params["continent"]
                matches_expensive = price > lower_price and outside_continent
                matches_cheap = price < upper_price and outside_continent
                # NOTE(review): the last operand makes the whole condition
                # equivalent to "outside_continent"; kept as in the original.
                assert (
                    matches_expensive or matches_cheap or outside_continent
                ), f"The node is selected with the wrong price or continent. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Simple policy results with 100% of available nodes")
def test_simple_policy_results_with_100_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for the simple policy "REP 4" (scenario: 100% of available nodes).

    Steps: create a container with the policy, validate the container policy,
    put a generated object through a random node, expect exactly four storage
    nodes to hold it, then delete the object and the container.
    """
    placement_rule = "REP 4"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 4
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with 100% of available nodes")
def test_policy_with_select_results_with_100_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a SELECT policy (scenario: 100% of available nodes).

    The policy keeps one replica in a selector of four nodes taken from all
    nodes. Steps: create the container, validate the container policy, put a
    generated object through a random node, expect exactly one storage node
    to hold it, then delete the object and the container.
    """
    placement_rule = "REP 1 IN All SELECT 4 FROM * AS All"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and FILTER results with 100% of available nodes")
def test_policy_with_select_and_filter_results_with_100_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a SELECT + FILTER policy (scenario: 100% of available nodes).

    The policy keeps one replica on a node selected from the filter
    "Price GE 0". The test puts a generated object, expects exactly one
    holder, and checks in the netmap snapshot that the holder's Price is not
    below the threshold. The object and the container are deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes"
    price_floor = 0
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 1
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Check the object appearance"):
        only_holder = holders[0]
        snapshot = get_netmap_snapshot(node=only_holder, shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        # The netmap is keyed by the host part of the RPC endpoint.
        host = only_holder.get_rpc_endpoint().split(":")[0]
        with reporter.step(f"Check the node is selected with price >= {price_floor}"):
            node_info = netmap[host]
            assert (
                int(node_info["Price"]) >= price_floor
            ), f"The node is selected with the wrong price. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and Complex FILTER results with 100% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_100_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a SELECT + complex FILTER policy (scenario: 100% of available nodes).

    The policy selects nodes from the filter AllNodes, which is defined as
    (Country is Russia, Sweden or Finland) AND "Continent EQ Europe", and
    keeps two replicas. The test puts a generated object, expects exactly two
    holders, and verifies that every holder satisfies both parts of the
    filter. The object and the container are deleted at the end.
    """
    placement_rule = "REP 2 IN All SELECT 4 FROM AllNodes AS All FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER @AllCountries AND Continent EQ Europe AS AllNodes"
    placement_params = {"country": ["Russia", "Sweden", "Finland"], "continent": "Europe"}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 2
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"):
            for node in resulting_copies:
                # The netmap is keyed by the host part of the RPC endpoint.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                # Both filter conditions must hold. The previous expression
                # ("A or A and B") collapsed to "A" and never verified the continent.
                assert (
                    node_info["country"] in placement_params["country"]
                    and node_info["continent"] == placement_params["continent"]
                ), f"The node is selected from the wrong country or continent. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with 100% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_100_of_available_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
    fill_field_price,
):
    """
    Check object copies for a policy with two SELECTs and two FILTERs
    (scenario: 100% of available nodes).

    Selector AllOne draws from the filter "Price GE 0"; selector AllTwo draws
    from nodes whose Country is Russia, Sweden or Finland. The test puts a
    generated object, expects exactly four holders, and checks that every
    holder matches at least one of the two filters. The object and the
    container are deleted at the end.

    fill_field_price is requested only as a fixture; it is not used in the body
    (presumably it populates the Price attribute on nodes - defined elsewhere).
    """
    placement_rule = "REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero"
    placement_params = {"country": ["Russia", "Sweden", "Finland"], "Price": 0}
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 4
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Check the object appearance"):
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap = self.get_netmap_param(parse_netmap_output(snapshot))
        with reporter.step("Check all node are selected"):
            for holder in holders:
                # The netmap is keyed by the host part of the RPC endpoint.
                host = holder.get_rpc_endpoint().split(":")[0]
                node_info = netmap[host]
                from_listed_country = node_info["country"] in placement_params["country"]
                # The price is only consulted when the country check did not match.
                assert (
                    from_listed_country or int(node_info["Price"]) >= placement_params["Price"]
                ), f"The node is selected from the wrong country or with wrong price. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Simple policy results with UNIQUE nodes")
def test_simple_policy_results_with_unique_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for the simple policy "UNIQUE REP 1 REP 1 CBF 1".

    Steps: create a container with the policy, validate the container policy,
    put a generated object through a random node, expect exactly two storage
    nodes to hold it, then delete the object and the container.
    """
    placement_rule = "UNIQUE REP 1 REP 1 CBF 1"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT results with UNIQUE nodes")
def test_policy_with_select_results_with_unique_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a UNIQUE policy with a single SELECT.

    The policy declares two "REP 1" rules over the same selector AnyNode
    (one node from all nodes). Steps: create the container, validate the
    container policy, put a generated object through a random node, expect
    exactly two storage nodes to hold it, then delete the object and the
    container.
    """
    placement_rule = "UNIQUE REP 1 IN AnyNode REP 1 IN AnyNode CBF 1 SELECT 1 FROM * AS AnyNode"
    payload_path = generate_file(simple_object_size.value)
    expected_copies = 2
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        container_id = create_container(
            wallet=default_wallet,
            rule=placement_rule,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, container_id, rpc_endpoint)

    with reporter.step("Put object in container"):
        object_id = put_object_to_random_node(
            default_wallet, payload_path, container_id, shell=self.shell, cluster=self.cluster
        )

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(
            container_id, object_id, shell=self.shell, nodes=self.cluster.storage_nodes
        )
        holder_count = len(holders)
        assert holder_count == expected_copies, f"Expected {expected_copies} copies, got {holder_count}"

    with reporter.step("Delete the object from the container"):
        delete_object(
            wallet=default_wallet, cid=container_id, oid=object_id, shell=self.shell, endpoint=rpc_endpoint
        )

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=container_id, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy with SELECT and Complex FILTER results with UNIQUE nodes")
def test_policy_with_select_and_complex_filter_results_with_unique_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    Check object copies for a UNIQUE policy with a SELECT and a complex FILTER.

    The policy selects nodes from the filter RU, which is defined as
    "Country NE Sweden" AND NOT "CountryCode EQ FI" AND "Country EQ 'Russia'",
    and declares two "REP 1" rules over it. The test puts a generated object,
    expects exactly two holders, and verifies that every holder satisfies all
    three parts of the filter. The object and the container are deleted at
    the end.
    """
    placement_rule = "UNIQUE REP 1 IN RUS REP 1 IN RUS CBF 1 SELECT 1 FROM RU AS RUS FILTER Country NE Sweden AS NotSE FILTER @NotSE AND NOT (CountryCode EQ FI) AND Country EQ 'Russia' AS RU"
    placement_params = {"country_code": "FI", "country": ["Sweden", "Russia"]}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 2
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        assert (
            len(resulting_copies) == expected_copies
        ), f"Expected {expected_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        excluded_country, required_country = placement_params["country"]
        with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"):
            for node in resulting_copies:
                # The netmap is keyed by the host part of the RPC endpoint.
                node_address = node.get_rpc_endpoint().split(":")[0]
                node_info = netmap[node_address]
                # All three filter conditions must hold. The previous
                # expression ("not S or (not S and ...)") collapsed to
                # "not S" and only verified that the node is not in Sweden.
                assert (
                    node_info["country"] != excluded_country
                    and node_info["country_code"] != placement_params["country_code"]
                    and node_info["country"] == required_country
                ), f"The node is selected from the wrong country or with wrong country code. Got {node_info}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@allure.title("Policy with Multi SELECTs and FILTERs results with UNIQUE nodes")
def test_policy_with_multi_selects_and_filters_results_with_unique_nodes(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with UNIQUE nodes.

    Flow: create a public container with the policy, verify the stored policy,
    put a generated object through a random node, require exactly three nodes
    to hold it, check every holder against the policy's FILTER clauses, then
    delete the object and the container.
    """
    policy = "UNIQUE REP 1 IN RU REP 1 IN EU REP 1 IN RU CBF 1 SELECT 1 FROM RUS AS RU SELECT 1 FROM EUR AS EU FILTER Country EQ Russia AS RUS FILTER NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI AS EUR"
    rus_country, eur_country = "Russia", "Sweden"
    eur_country_code = "FI"
    object_path = generate_file(simple_object_size.value)
    copies_required = 3
    rpc_endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {policy}"):
        cid = create_container(
            wallet=default_wallet,
            rule=policy,
            basic_acl=PUBLIC_ACL,
            shell=self.shell,
            endpoint=rpc_endpoint,
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, policy, cid, rpc_endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, object_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        holders = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        copies_found = len(holders)
        assert copies_found == copies_required, f"Expected {copies_required} copies, got {copies_found}"

    with reporter.step("Check the object appearance"):
        # The netmap snapshot is taken from the first node that holds the object.
        snapshot = get_netmap_snapshot(node=holders[0], shell=self.shell)
        netmap_by_address = self.get_netmap_param(parse_netmap_output(snapshot))

        with reporter.step("Check three nodes are selected from any country"):
            for holder in holders:
                # Netmap entries are keyed by the host part of the RPC endpoint.
                host = holder.get_rpc_endpoint().split(":")[0]
                attrs = netmap_by_address[host]

                # Mirrors the policy filters:
                #   RUS = Country EQ Russia
                #   EUR = NOT (@RUS) AND Country EQ Sweden OR CountryCode EQ FI
                # (`and` binds tighter than `or`, exactly as in the original expression).
                in_rus = attrs["country"] == rus_country
                in_eur = (not in_rus and attrs["country"] == eur_country) or (
                    attrs["country_code"] == eur_country_code
                )
                assert (
                    in_rus or in_eur
                ), f"The node is selected from the wrong country or with wrong country code. Got {attrs}"

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=rpc_endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=rpc_endpoint)
@allure.title("Policy: REP 1 IN SPB REP 1 IN MSK REP 3")
def test_policy_rep_1_in_spb_rep_1_in_msk_rep_3(
    self,
    default_wallet,
    simple_object_size: ObjectSize,
):
    """
    This test checks object's copies based on container's placement policy: REP 1 IN SPB REP 1 IN MSK REP 3.

    Flow: create a public container with the policy, verify the stored policy,
    put a generated object through a random node, require the number of nodes
    holding it to be between 3 and the cluster size, check that both filtered
    locations are present among the holders, then delete the object and the
    container.
    """
    placement_rule = "REP 1 IN SPB REP 1 IN MSK REP 3 CBF 1 SELECT 2 FROM LED AS SPB SELECT 2 FROM MOW AS MSK FILTER Location EQ 'Saint Petersburg (ex Leningrad)' AS LED FILTER Location EQ Moskva AS MOW"
    placement_params = {"location": ["Saint Petersburg (ex Leningrad)", "Moskva"]}
    file_path = generate_file(simple_object_size.value)
    expected_copies = 3
    endpoint = self.cluster.default_rpc_endpoint

    with reporter.step(f"Create container with policy {placement_rule}"):
        cid = create_container(
            wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
        )

    with reporter.step("Check container policy"):
        self.validate_object_policy(default_wallet, placement_rule, cid, endpoint)

    with reporter.step("Put object in container"):
        oid = put_object_to_random_node(default_wallet, file_path, cid, shell=self.shell, cluster=self.cluster)

    with reporter.step("Check object expected copies"):
        resulting_copies = get_nodes_with_object(cid, oid, shell=self.shell, nodes=self.cluster.storage_nodes)
        max_copies = len(self.cluster.storage_nodes)
        # The failure message now states the same bounds the condition enforces.
        assert (
            expected_copies <= len(resulting_copies) <= max_copies
        ), f"Expected from {expected_copies} to {max_copies} copies, got {len(resulting_copies)}"

    with reporter.step("Check the object appearance"):
        # The netmap snapshot is taken from the first node that holds the object.
        netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
        netmap = self.get_netmap_param(netmap)
        # Netmap entries are keyed by the host part of each node's RPC endpoint.
        list_of_location = [
            netmap[node.get_rpc_endpoint().split(":")[0]]["location"] for node in resulting_copies
        ]

        # NOTE(review): the step title below looks copy-pasted from a country-based
        # test; it is kept unchanged so existing reports keep matching.
        with reporter.step("Check two or three nodes are selected from Russia and from any other country"):
            # Both filtered locations must be present among the holders. The minimum
            # number of copies is already enforced by the assertion above.
            missing_locations = [
                location for location in placement_params["location"] if location not in list_of_location
            ]
            assert not missing_locations, (
                f"No object copy found in locations {missing_locations}. "
                f"Got copies in locations {list_of_location}"
            )

    with reporter.step("Delete the object from the container"):
        delete_object(wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=endpoint)

    with reporter.step("Delete the container"):
        delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
def validate_object_policy(self, wallet: str, placement_rule: str, cid: str, endpoint: str):
    """
    Assert that the placement policy stored for a container matches the requested rule.

    The container is fetched with `get_container` (non-JSON mode) and its policy is
    extracted with `placement_policy_from_container`. Single quotes are removed from
    both strings before they are compared.

    Args:
        wallet: wallet passed through to `get_container`.
        placement_rule: policy string the container was created with.
        cid: container ID.
        endpoint: RPC endpoint passed through to `get_container`.

    Raises:
        AssertionError: if the two policies differ after quote removal.
    """
    container_info = get_container(wallet, cid, json_mode=False, shell=self.shell, endpoint=endpoint)
    actual_policy = placement_policy_from_container(container_info)

    actual_unquoted = actual_policy.replace("'", "")
    expected_unquoted = placement_rule.replace("'", "")

    assert (
        actual_unquoted == expected_unquoted
    ), f"Expected \n{placement_rule} and got policy \n{actual_policy} are the same"
def get_netmap_param(self, netmap_info: list[NodeNetmapInfo]) -> dict:
    """
    Index netmap entries by address and keep only the attributes used by the policy checks.

    The key is the third "/"-separated component of the entry's first external
    address (presumably the host of an address such as /dns4/<host>/tcp/<port> -
    confirm against real netmap output). If several entries share a key, the
    last one wins.

    Args:
        netmap_info: parsed netmap entries.

    Returns:
        Mapping of address to a dict with the keys "country", "country_code",
        "Price", "continent", "un_locode" and "location".
    """
    return {
        entry.external_address[0].split("/")[2]: {
            "country": entry.country,
            "country_code": entry.country_code,
            "Price": entry.price,
            "continent": entry.continent,
            "un_locode": entry.un_locode,
            "location": entry.location,
        }
        for entry in netmap_info
    }