2024-11-18 12:17:52 +00:00
import itertools
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib . cli . frostfs_cli . cli import FrostfsCli
from frostfs_testlib . shell . interfaces import Shell
from frostfs_testlib . steps . cli . container import delete_container
from frostfs_testlib . steps . cli . object import delete_object , put_object_to_random_node
from frostfs_testlib . steps . node_management import get_netmap_snapshot
from frostfs_testlib . steps . storage_policy import get_nodes_with_object
from frostfs_testlib . storage . cluster import Cluster
from frostfs_testlib . storage . controllers . cluster_state_controller import ClusterStateController
from frostfs_testlib . storage . controllers . state_managers . config_state_manager import ConfigStateManager
from frostfs_testlib . storage . dataclasses . frostfs_services import StorageNode
from frostfs_testlib . storage . dataclasses . object_size import ObjectSize
from frostfs_testlib . storage . dataclasses . wallet import WalletInfo
from frostfs_testlib . testing import parallel
from frostfs_testlib . testing . cluster_test_base import ClusterTestBase
from frostfs_testlib . testing . test_control import wait_for_success
from frostfs_testlib . utils . cli_utils import parse_netmap_output
from frostfs_testlib . utils . file_utils import generate_file
from . . . helpers . container_creation import create_container_with_ape
from . . . helpers . container_request import PUBLIC_WITH_POLICY , ContainerRequest
from . . . helpers . policy_validation import get_netmap_param , validate_object_policy
@pytest.mark.weekly
@pytest.mark.policy
@pytest.mark.policy_price
class TestPolicyWithPrice ( ClusterTestBase ) :
@wait_for_success ( 1200 , 60 , title = " Wait for full field price on node " , expected_result = True )
def await_for_price_attribute_on_nodes ( self ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = self . cluster . storage_nodes [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
for node in self . cluster . storage_nodes :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
if netmap [ node_address ] [ " Price " ] is None :
return False
return True
@pytest.fixture ( scope = " module " )
def fill_field_price ( self , cluster : Cluster , cluster_state_controller_session : ClusterStateController ) :
prices = [ " 15 " , " 10 " , " 65 " , " 55 " ]
config_manager = cluster_state_controller_session . manager ( ConfigStateManager )
parallel (
config_manager . set_on_node ,
cluster . cluster_nodes ,
StorageNode ,
itertools . cycle ( [ { " node:attribute_5 " : f " Price: { price } " } for price in prices ] ) ,
)
cluster_state_controller_session . wait_after_storage_startup ( )
self . tick_epoch ( )
self . await_for_price_attribute_on_nodes ( )
yield
cluster_state_controller_session . manager ( ConfigStateManager ) . revert_all ( )
@pytest.fixture
def container (
self ,
default_wallet : WalletInfo ,
frostfs_cli : FrostfsCli ,
client_shell : Shell ,
cluster : Cluster ,
rpc_endpoint : str ,
container_request : ContainerRequest ,
# In these set of tests containers should be created after the fill_field_price fixture
fill_field_price ,
) - > str :
2024-11-20 14:11:04 +00:00
return create_container_with_ape ( container_request , frostfs_cli , default_wallet , client_shell , cluster , rpc_endpoint )
2024-11-18 12:17:52 +00:00
@allure.title ( " Policy with SELECT and FILTER results with 25 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[ PUBLIC_WITH_POLICY ( " REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10 " ) ] ,
indirect = True ,
)
def test_policy_with_select_and_filter_results_with_25_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and FILTER results with 25 % of available nodes .
"""
placement_params = { " Price " : 10 }
file_path = generate_file ( simple_object_size . value )
expected_copies = 1
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
node_address = resulting_copies [ 0 ] . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
with reporter . step ( f " Check the node is selected with price <= { placement_params [ ' Price ' ] } " ) :
assert (
int ( netmap [ node_address ] [ " Price " ] ) < = placement_params [ " Price " ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with select and complex filter results with 25 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10 "
)
] ,
indirect = True ,
)
def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and Complex FILTER results with 25 % of available nodes .
"""
placement_params = { " Price " : [ 10 , 0 ] }
file_path = generate_file ( simple_object_size . value )
expected_copies = 1
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check the node is selected with price between 1 and 10 " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
int ( netmap [ node_address ] [ " Price " ] ) > placement_params [ " Price " ] [ 1 ]
and int ( netmap [ node_address ] [ " Price " ] ) < = placement_params [ " Price " ] [ 0 ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with Multi SELECTs and FILTERs results with 25 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX "
)
] ,
indirect = True ,
)
def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with Multi SELECTs and FILTERs results with 25 % of available nodes .
"""
placement_params = { " Price " : [ 15 , 55 ] }
file_path = generate_file ( simple_object_size . value )
expected_copies = 2
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check two nodes are selected with max and min prices " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
int ( netmap [ node_address ] [ " Price " ] ) > placement_params [ " Price " ] [ 1 ]
or int ( netmap [ node_address ] [ " Price " ] ) < placement_params [ " Price " ] [ 0 ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with SELECT and FILTER results with 50 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[ PUBLIC_WITH_POLICY ( " REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15 " ) ] ,
indirect = True ,
)
def test_policy_with_select_and_filter_results_with_50_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and FILTER results with 50 % of available nodes .
"""
placement_params = { " Price " : 15 }
file_path = generate_file ( simple_object_size . value )
expected_copies = 2
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check two nodes are selected with price > { placement_params [ ' Price ' ] } " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
int ( netmap [ node_address ] [ " Price " ] ) > placement_params [ " Price " ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with SELECT and Complex FILTER results with 50 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 "
)
] ,
indirect = True ,
)
def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and Complex FILTER results with 50 % of available nodes .
"""
placement_params = { " Price " : 15 , " country_code " : " RU " }
file_path = generate_file ( simple_object_size . value )
expected_copies = 2
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check two nodes are selected not with country code ' { placement_params [ ' country_code ' ] } ' " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
not netmap [ node_address ] [ " country_code " ] == placement_params [ " country_code " ]
or not netmap [ node_address ] [ " country_code " ] == placement_params [ " country_code " ]
and int ( netmap [ node_address ] [ " Price " ] ) > = placement_params [ " Price " ]
) , f " The node is selected with the wrong price or country code. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with Multi SELECTs and FILTERs results with 50 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER ' UN-LOCODE ' EQ ' RU LED ' OR ' UN-LOCODE ' EQ ' RU MOW ' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55 "
)
] ,
indirect = True ,
)
def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with Multi SELECTs and FILTERs results with 50 % of available nodes .
"""
placement_params = { " un_locode " : [ " RU LED " , " RU MOW " ] , " Price " : [ 15 , 55 ] }
file_path = generate_file ( simple_object_size . value )
expected_copies = 3
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check all nodes are selected " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
netmap [ node_address ] [ " un_locode " ] in placement_params [ " un_locode " ]
or not netmap [ node_address ] [ " un_locode " ] == placement_params [ " un_locode " ] [ 1 ]
or (
not netmap [ node_address ] [ " un_locode " ] == placement_params [ " un_locode " ] [ 1 ]
and int ( netmap [ node_address ] [ " Price " ] ) > = placement_params [ " Price " ] [ 0 ]
)
or (
netmap [ node_address ] [ " un_locode " ] == placement_params [ " un_locode " ] [ 1 ]
and int ( netmap [ node_address ] [ " Price " ] ) < = placement_params [ " Price " ] [ 1 ]
)
) , f " The node is selected with the wrong price or un_locode. Expected { placement_params } and got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with SELECT and FILTER results with 75 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[ PUBLIC_WITH_POLICY ( " REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65 " ) ] ,
indirect = True ,
)
def test_policy_with_select_and_filter_results_with_75_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and FILTER results with 75 % of available nodes .
"""
placement_params = { " Price " : 65 }
file_path = generate_file ( simple_object_size . value )
expected_copies = 2
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check two nodes are selected with price < { placement_params [ ' Price ' ] } " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
int ( netmap [ node_address ] [ " Price " ] ) < placement_params [ " Price " ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with SELECT and Complex FILTER results with 75 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 "
)
] ,
indirect = True ,
)
def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and Complex FILTER results with 75 % of available nodes .
"""
placement_params = { " Price " : 65 , " continent " : " America " }
file_path = generate_file ( simple_object_size . value )
expected_copies = 2
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check three nodes are selected not from { placement_params [ ' continent ' ] } " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
int ( netmap [ node_address ] [ " Price " ] ) < placement_params [ " Price " ]
and not netmap [ node_address ] [ " continent " ] == placement_params [ " continent " ]
) or (
not netmap [ node_address ] [ " continent " ] == placement_params [ " continent " ]
) , f " The node is selected with the wrong price or continent. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with Multi SELECTs and FILTERs results with 75 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10 "
)
] ,
indirect = True ,
)
def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with Multi SELECTs and FILTERs results with 75 % of available nodes .
"""
placement_params = { " Price " : [ 65 , 10 ] , " continent " : " America " }
file_path = generate_file ( simple_object_size . value )
expected_copies = 4
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check all nodes are selected " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert (
(
int ( netmap [ node_address ] [ " Price " ] ) > placement_params [ " Price " ] [ 1 ]
and not netmap [ node_address ] [ " continent " ] == placement_params [ " continent " ]
)
or (
int ( netmap [ node_address ] [ " Price " ] ) < placement_params [ " Price " ] [ 0 ]
and not netmap [ node_address ] [ " continent " ] == placement_params [ " continent " ]
)
or not ( netmap [ node_address ] [ " continent " ] == placement_params [ " continent " ] )
) , f " The node is selected with the wrong price or continent. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with SELECT and FILTER results with 100 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " , [ PUBLIC_WITH_POLICY ( " REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes " ) ] , indirect = True
)
def test_policy_with_select_and_filter_results_with_100_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with SELECT and FILTER results with 100 % of available nodes .
"""
placement_params = { " Price " : 0 }
file_path = generate_file ( simple_object_size . value )
expected_copies = 1
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
node_address = resulting_copies [ 0 ] . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
with reporter . step ( f " Check the node is selected with price >= { placement_params [ ' Price ' ] } " ) :
assert (
int ( netmap [ node_address ] [ " Price " ] ) > = placement_params [ " Price " ]
) , f " The node is selected with the wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )
@allure.title ( " Policy with Multi SELECTs and FILTERs results with 100 % o f available nodes " )
@pytest.mark.parametrize (
" container_request " ,
[
PUBLIC_WITH_POLICY (
" REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero "
)
] ,
indirect = True ,
)
def test_policy_with_multi_selects_and_filters_results_with_100_of_available_nodes (
self ,
default_wallet : WalletInfo ,
rpc_endpoint : str ,
simple_object_size : ObjectSize ,
container : str ,
container_request : ContainerRequest ,
) :
"""
This test checks object ' s copies based on container ' s placement policy with Multi SELECTs and FILTERs results with 100 % of available nodes .
"""
placement_params = { " country " : [ " Russia " , " Sweden " , " Finland " ] , " Price " : 0 }
file_path = generate_file ( simple_object_size . value )
expected_copies = 4
with reporter . step ( f " Check container policy " ) :
validate_object_policy ( default_wallet , self . shell , container_request . policy , container , rpc_endpoint )
with reporter . step ( f " Put object in container " ) :
oid = put_object_to_random_node ( default_wallet , file_path , container , self . shell , self . cluster )
with reporter . step ( f " Check object expected copies " ) :
resulting_copies = get_nodes_with_object ( container , oid , shell = self . shell , nodes = self . cluster . storage_nodes )
assert len ( resulting_copies ) == expected_copies , f " Expected { expected_copies } copies, got { len ( resulting_copies ) } "
with reporter . step ( f " Check the object appearance " ) :
netmap = parse_netmap_output ( get_netmap_snapshot ( node = resulting_copies [ 0 ] , shell = self . shell ) )
netmap = get_netmap_param ( netmap )
with reporter . step ( f " Check all node are selected " ) :
for node in resulting_copies :
node_address = node . get_rpc_endpoint ( ) . split ( " : " ) [ 0 ]
assert ( netmap [ node_address ] [ " country " ] in placement_params [ " country " ] ) or (
int ( netmap [ node_address ] [ " Price " ] ) > = placement_params [ " Price " ]
) , f " The node is selected from the wrong country or with wrong price. Got { netmap [ node_address ] } "
with reporter . step ( f " Delete the object from the container " ) :
delete_object ( default_wallet , container , oid , self . shell , rpc_endpoint )
with reporter . step ( f " Delete the container " ) :
delete_container ( default_wallet , container , self . shell , rpc_endpoint , await_mode = False )