Compare commits
10 commits
6ee9b70d50
...
46e57870d0
Author | SHA1 | Date | |
---|---|---|---|
46e57870d0 | |||
0d7befe9a6 | |||
f49d68a6e7 | |||
241d3d0585 | |||
3a380755b4 | |||
5bc170e6f9 | |||
522b8e576d | |||
5ddc31cca6 | |||
9e89dba03d | |||
d327e8149b |
11 changed files with 281 additions and 265 deletions
|
@ -3,13 +3,17 @@ hosts:
|
|||
attributes:
|
||||
sudo_shell: false
|
||||
plugin_name: docker
|
||||
healthcheck_plugin_name: basic
|
||||
attributes:
|
||||
skip_readiness_check: True
|
||||
force_transactions: True
|
||||
services:
|
||||
- name: s01
|
||||
attributes:
|
||||
container_name: s01
|
||||
config_path: ../frostfs-dev-env/services/storage/.storage.env
|
||||
wallet_path: ../frostfs-dev-env/services/storage/wallet01.json
|
||||
local_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/storage/wallet01.json
|
||||
wallet_password: ""
|
||||
volume_name: storage_storage_s01
|
||||
|
@ -23,7 +27,7 @@ hosts:
|
|||
container_name: s02
|
||||
config_path: ../frostfs-dev-env/services/storage/.storage.env
|
||||
wallet_path: ../frostfs-dev-env/services/storage/wallet02.json
|
||||
local_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/storage/wallet02.json
|
||||
wallet_password: ""
|
||||
volume_name: storage_storage_s02
|
||||
|
@ -37,7 +41,7 @@ hosts:
|
|||
container_name: s03
|
||||
config_path: ../frostfs-dev-env/services/storage/.storage.env
|
||||
wallet_path: ../frostfs-dev-env/services/storage/wallet03.json
|
||||
local_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/storage/wallet03.json
|
||||
wallet_password: ""
|
||||
volume_name: storage_storage_s03
|
||||
|
@ -51,7 +55,7 @@ hosts:
|
|||
container_name: s04
|
||||
config_path: ../frostfs-dev-env/services/storage/.storage.env
|
||||
wallet_path: ../frostfs-dev-env/services/storage/wallet04.json
|
||||
local_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_config_path: ./TemporaryDir/empty-password.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/storage/wallet04.json
|
||||
wallet_password: ""
|
||||
volume_name: storage_storage_s04
|
||||
|
@ -65,7 +69,7 @@ hosts:
|
|||
container_name: s3_gate
|
||||
config_path: ../frostfs-dev-env/services/s3_gate/.s3.env
|
||||
wallet_path: ../frostfs-dev-env/services/s3_gate/wallet.json
|
||||
local_config_path: ./TemporaryDir/password-s3.yml
|
||||
local_wallet_config_path: ./TemporaryDir/password-s3.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/s3_gate/wallet.json
|
||||
wallet_password: "s3"
|
||||
endpoint_data0: https://s3.frostfs.devenv:8080
|
||||
|
@ -74,7 +78,7 @@ hosts:
|
|||
container_name: http_gate
|
||||
config_path: ../frostfs-dev-env/services/http_gate/.http.env
|
||||
wallet_path: ../frostfs-dev-env/services/http_gate/wallet.json
|
||||
local_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/http_gate/wallet.json
|
||||
wallet_password: "one"
|
||||
endpoint_data0: http://http.frostfs.devenv
|
||||
|
@ -83,7 +87,7 @@ hosts:
|
|||
container_name: ir01
|
||||
config_path: ../frostfs-dev-env/services/ir/.ir.env
|
||||
wallet_path: ../frostfs-dev-env/services/ir/az.json
|
||||
local_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/ir/az.json
|
||||
wallet_password: "one"
|
||||
- name: morph-chain01
|
||||
|
@ -91,7 +95,7 @@ hosts:
|
|||
container_name: morph_chain
|
||||
config_path: ../frostfs-dev-env/services/morph_chain/protocol.privnet.yml
|
||||
wallet_path: ../frostfs-dev-env/services/morph_chain/node-wallet.json
|
||||
local_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/morph_chain/node-wallet.json
|
||||
wallet_password: "one"
|
||||
endpoint_internal0: http://morph-chain.frostfs.devenv:30333
|
||||
|
@ -100,7 +104,7 @@ hosts:
|
|||
container_name: main_chain
|
||||
config_path: ../frostfs-dev-env/services/chain/protocol.privnet.yml
|
||||
wallet_path: ../frostfs-dev-env/services/chain/node-wallet.json
|
||||
local_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_config_path: ./TemporaryDir/password-other.yml
|
||||
local_wallet_path: ../frostfs-dev-env/services/chain/node-wallet.json
|
||||
wallet_password: "one"
|
||||
endpoint_internal0: http://main-chain.frostfs.devenv:30333
|
||||
|
|
|
@ -41,6 +41,8 @@ from pytest_tests.resources.common import HOSTING_CONFIG_FILE, TEST_CYCLES_COUNT
|
|||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
||||
SERVICE_ACTIVE_TIME = 20
|
||||
|
||||
# Add logs check test even if it's not fit to mark selectors
|
||||
def pytest_configure(config: pytest.Config):
|
||||
markers = config.option.markexpr
|
||||
|
@ -367,11 +369,14 @@ def after_deploy_healthcheck(cluster: Cluster):
|
|||
parallel(readiness_on_node, cluster.cluster_nodes)
|
||||
|
||||
|
||||
SERVICE_ACTIVE_TIME = 20
|
||||
|
||||
|
||||
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
|
||||
def readiness_on_node(cluster_node: ClusterNode):
|
||||
if (
|
||||
"skip_readiness_check" in cluster_node.host.config.attributes
|
||||
and cluster_node.host.config.attributes["skip_readiness_check"]
|
||||
):
|
||||
return
|
||||
|
||||
# TODO: Move to healtcheck classes
|
||||
svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
|
||||
with reporter.step(f"Check service {svc_name} is active"):
|
||||
|
|
|
@ -86,34 +86,35 @@ class TestContainer(ClusterTestBase):
|
|||
containers_count = 3
|
||||
wallet = default_wallet
|
||||
placement_rule = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
|
||||
iteration_count = 10
|
||||
|
||||
cids: list[str] = []
|
||||
with reporter.step(f"Create {containers_count} containers"):
|
||||
for _ in range(containers_count):
|
||||
cids.append(
|
||||
create_container(
|
||||
for iteration in range(iteration_count):
|
||||
cids: list[str] = []
|
||||
with reporter.step(f"Create {containers_count} containers"):
|
||||
for _ in range(containers_count):
|
||||
cids.append(
|
||||
create_container(
|
||||
wallet,
|
||||
rule=placement_rule,
|
||||
await_mode=False,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
wait_for_creation=False,
|
||||
)
|
||||
)
|
||||
|
||||
with reporter.step("Wait for containers occur in container list"):
|
||||
for cid in cids:
|
||||
wait_for_container_creation(
|
||||
wallet,
|
||||
rule=placement_rule,
|
||||
await_mode=False,
|
||||
cid,
|
||||
sleep_interval=containers_count,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
wait_for_creation=False,
|
||||
)
|
||||
)
|
||||
|
||||
with reporter.step("Wait for containers occur in container list"):
|
||||
for cid in cids:
|
||||
wait_for_container_creation(
|
||||
wallet,
|
||||
cid,
|
||||
sleep_interval=containers_count,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
|
||||
with reporter.step("Delete containers and check they were deleted"):
|
||||
for cid in cids:
|
||||
delete_container(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
self.tick_epoch()
|
||||
for cid in cids:
|
||||
wait_for_container_deletion(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
with reporter.step("Delete containers and check they were deleted"):
|
||||
for cid in cids:
|
||||
delete_container(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, await_mode=True)
|
||||
containers_list = list_containers(wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
assert cid not in containers_list, "Container not deleted"
|
||||
|
|
|
@ -1,15 +1,18 @@
|
|||
import pytest
|
||||
import allure
|
||||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.node_management import get_netmap_snapshot
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object, get_nodes_without_object
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.testing.test_control import wait_for_success
|
||||
from frostfs_testlib.utils.cli_utils import parse_netmap_output
|
||||
from frostfs_testlib.utils.file_utils import generate_file
|
||||
|
||||
|
@ -25,6 +28,31 @@ from pytest_tests.resources.policy_error_patterns import (
|
|||
@pytest.mark.container
|
||||
@pytest.mark.policy
|
||||
class TestPolicy(ClusterTestBase):
|
||||
@wait_for_success(1200, 60, title="Wait for full field price on node", expected_result=True)
|
||||
def await_for_price_attribute_on_nodes(self):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
for node in self.cluster.storage_nodes:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
if netmap[node_address]["Price"] is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def fill_field_price(self, cluster_state_controller: ClusterStateController):
|
||||
prices = ["15", "10", "65", "55"]
|
||||
|
||||
config_manager = cluster_state_controller.manager(ConfigStateManager)
|
||||
for i in zip(self.cluster.cluster_nodes, prices):
|
||||
config_manager.set_on_node(i[0], StorageNode, {"node:attribute_5": f"Price:{i[1]}"})
|
||||
|
||||
self.tick_epoch()
|
||||
self.await_for_price_attribute_on_nodes()
|
||||
|
||||
yield
|
||||
|
||||
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||
|
||||
@allure.title("[NEGATIVE] Placement policy: Can't parse placement policy")
|
||||
def test_placement_policy_negative(self, default_wallet):
|
||||
"""
|
||||
|
@ -32,6 +60,7 @@ class TestPolicy(ClusterTestBase):
|
|||
"""
|
||||
placement_rule = "REP 1 IN SPB REP 1 in MSK CBF 1 SELECT 1 FROM SPBRU AS SPB SELECT 1 FROM MSKRU AS MSK FILTER SubDivCode EQ SPE AS SPBRU FILTER NOT (Country EQ Sweden OR CountryCode EQ FI) AND Location EQ Moskva AS MSKRU"
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_rule}"):
|
||||
with pytest.raises(Exception, match=NOT_PARSE_POLICY):
|
||||
cid = create_container(
|
||||
|
@ -93,7 +122,6 @@ class TestPolicy(ClusterTestBase):
|
|||
endpoint=endpoint,
|
||||
)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Simple policy results with one node")
|
||||
def test_simple_policy_results_with_one_node(
|
||||
self,
|
||||
|
@ -168,7 +196,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Policy with SELECT and FILTER results with one node")
|
||||
def test_policy_with_select_and_filter_results_with_one_node(
|
||||
self,
|
||||
|
@ -252,9 +279,7 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
with reporter.step(
|
||||
f"Check the node is selected from {placement_params['country'][0]}"
|
||||
):
|
||||
with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
|
||||
assert (
|
||||
not (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
or not (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
|
@ -268,7 +293,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Policy with Multi SELECTs and FILTERs results with one node")
|
||||
def test_policy_with_multi_selects_and_filters_results_with_one_node(
|
||||
self,
|
||||
|
@ -319,7 +343,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Policy with SELECT and FILTER results with UNIQUE nodes")
|
||||
def test_policy_with_select_and_filter_results_with_unique_nodes(
|
||||
self,
|
||||
|
@ -334,7 +357,7 @@ class TestPolicy(ClusterTestBase):
|
|||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_rule}"):
|
||||
cid = create_container(
|
||||
wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
|
||||
|
@ -357,7 +380,7 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
placement_params["country"] == netmap[node_address]["country"]
|
||||
), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
|
||||
|
@ -442,18 +465,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and FILTER results with 25% of available nodes")
|
||||
def test_policy_with_select_and_filter_results_with_25_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 25% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10"
|
||||
placement_params = {"price": 10}
|
||||
placement_params = {"Price": 10}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 1
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -479,9 +502,9 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
with reporter.step(f"Check the node is selected with price <= {placement_params['price']}"):
|
||||
with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) <= placement_params["price"]
|
||||
int(netmap[node_address]["Price"]) <= placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -490,18 +513,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with select and complex filter results with 25% of available nodes")
|
||||
def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 25% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10"
|
||||
placement_params = {"price": [10, 0]}
|
||||
placement_params = {"Price": [10, 0]}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 1
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -528,10 +551,10 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check the node is selected with price between 1 and 10"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) > placement_params["price"][1]
|
||||
and int(netmap[node_address]["price"]) <= placement_params["price"][0]
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -540,18 +563,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with Multi SELECTs and FILTERs results with 25% of available nodes")
|
||||
def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 25% of available nodes.
|
||||
"""
|
||||
placement_rule = "UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX"
|
||||
placement_params = {"price": [15, 55]}
|
||||
placement_params = {"Price": [15, 55]}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -580,8 +603,8 @@ class TestPolicy(ClusterTestBase):
|
|||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) > placement_params["price"][1]
|
||||
or int(netmap[node_address]["price"]) < placement_params["price"][0]
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -664,18 +687,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and FILTER results with 50% of available nodes")
|
||||
def test_policy_with_select_and_filter_results_with_50_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 50% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15"
|
||||
placement_params = {"price": 15}
|
||||
placement_params = {"Price": 15}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -700,11 +723,11 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected with price > {placement_params['price']}"):
|
||||
with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) > placement_params["price"]
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -713,18 +736,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and Complex FILTER results with 50% of available nodes")
|
||||
def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 50% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15"
|
||||
placement_params = {"price": 15, "country_code": "RU"}
|
||||
placement_params = {"Price": 15, "country_code": "RU"}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -749,12 +772,15 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
|
||||
with reporter.step(
|
||||
f"Check two nodes are selected not with country code '{placement_params['country_code']}'"
|
||||
):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (not netmap[node_address]["country_code"] == placement_params["country_code"]
|
||||
assert (
|
||||
not netmap[node_address]["country_code"] == placement_params["country_code"]
|
||||
or not netmap[node_address]["country_code"] == placement_params["country_code"]
|
||||
and int(netmap[node_address]["price"]) >= placement_params["price"]
|
||||
and int(netmap[node_address]["Price"]) >= placement_params["Price"]
|
||||
), f"The node is selected with the wrong price or country code. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -763,20 +789,20 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with Multi SELECTs and FILTERs results with 50% of available nodes")
|
||||
def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 50% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ RU_LED OR 'UN-LOCODE' EQ RU_MOW AS RU FILTER NOT(@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55"
|
||||
placement_params = {"un_locode": ["RU_LED", "RU_MOW"], "price": [15, 55]}
|
||||
placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ 'RU LED' OR 'UN-LOCODE' EQ 'RU MOW' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55"
|
||||
placement_params = {"un_locode": ["RU LED", "RU MOW"], "Price": [15, 55]}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 4
|
||||
expected_copies = 3
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_rule}"):
|
||||
|
@ -807,11 +833,11 @@ class TestPolicy(ClusterTestBase):
|
|||
or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
|
||||
or (
|
||||
not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
|
||||
and int(netmap[node_address]["price"]) >= placement_params["price"][0]
|
||||
and int(netmap[node_address]["Price"]) >= placement_params["Price"][0]
|
||||
)
|
||||
or (
|
||||
netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
|
||||
and int(netmap[node_address]["price"]) <= placement_params["price"][1]
|
||||
and int(netmap[node_address]["Price"]) <= placement_params["Price"][1]
|
||||
)
|
||||
), f"The node is selected with the wrong price or un_locode. Expected {placement_params} and got {netmap[node_address]}"
|
||||
|
||||
|
@ -821,7 +847,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Simple policy results with 75% of available nodes")
|
||||
def test_simple_policy_results_with_75_of_available_nodes(
|
||||
self,
|
||||
|
@ -896,18 +921,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and FILTER results with 75% of available nodes")
|
||||
def test_policy_with_select_and_filter_results_with_75_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 75% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65"
|
||||
placement_params = {"price": 65}
|
||||
placement_params = {"Price": 65}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -932,11 +957,11 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected with price < {placement_params['price']}"):
|
||||
with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) < placement_params["price"]
|
||||
int(netmap[node_address]["Price"]) < placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -945,20 +970,20 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and Complex FILTER results with 75% of available nodes")
|
||||
def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price: None,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 75% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65"
|
||||
placement_params = {"price": 65, "continent": "America"}
|
||||
placement_params = {"Price": 65, "continent": "America"}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 3
|
||||
expected_copies = 2
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
||||
with reporter.step(f"Create container with policy {placement_rule}"):
|
||||
|
@ -981,12 +1006,11 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"
|
||||
):
|
||||
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
int(netmap[node_address]["price"]) < placement_params["price"]
|
||||
int(netmap[node_address]["Price"]) < placement_params["Price"]
|
||||
and not netmap[node_address]["continent"] == placement_params["continent"]
|
||||
) or (
|
||||
not netmap[node_address]["continent"] == placement_params["continent"]
|
||||
|
@ -998,18 +1022,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with Multi SELECTs and FILTERs results with 75% of available nodes")
|
||||
def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 75% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 2 IN EXPNSV REP 2 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT(Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10"
|
||||
placement_params = {"price": [65, 10], "continent": "America"}
|
||||
placement_rule = "REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10"
|
||||
placement_params = {"Price": [65, 10], "continent": "America"}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 4
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -1039,11 +1063,11 @@ class TestPolicy(ClusterTestBase):
|
|||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (
|
||||
(
|
||||
int(netmap[node_address]["price"]) > placement_params["price"][1]
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
and not netmap[node_address]["continent"] == placement_params["continent"]
|
||||
)
|
||||
or (
|
||||
int(netmap[node_address]["price"]) < placement_params["price"][0]
|
||||
int(netmap[node_address]["Price"]) < placement_params["Price"][0]
|
||||
and not netmap[node_address]["continent"] == placement_params["continent"]
|
||||
)
|
||||
or not (netmap[node_address]["continent"] == placement_params["continent"])
|
||||
|
@ -1055,7 +1079,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Simple policy results with 100% of available nodes")
|
||||
def test_simple_policy_results_with_100_of_available_nodes(
|
||||
self,
|
||||
|
@ -1130,18 +1153,18 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.skip(reason="price")
|
||||
@allure.title("Policy with SELECT and FILTER results with 100% of available nodes")
|
||||
def test_policy_with_select_and_filter_results_with_100_of_available_nodes(
|
||||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 100% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes"
|
||||
placement_params = {"price": 0}
|
||||
placement_params = {"Price": 0}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 1
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -1167,9 +1190,9 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
with reporter.step(f"Check the node is selected with price >= {placement_params['price']}"):
|
||||
with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
|
||||
assert (
|
||||
netmap[node_address]["price"] >= int(placement_params["price"])
|
||||
int(netmap[node_address]["Price"]) >= placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -1233,12 +1256,13 @@ class TestPolicy(ClusterTestBase):
|
|||
self,
|
||||
default_wallet,
|
||||
simple_object_size: ObjectSize,
|
||||
fill_field_price,
|
||||
):
|
||||
"""
|
||||
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 100% of available nodes.
|
||||
"""
|
||||
placement_rule = "REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero"
|
||||
placement_params = {"country": ["Russia", "Sweden", "Finland"], "price": 0}
|
||||
placement_params = {"country": ["Russia", "Sweden", "Finland"], "Price": 0}
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
expected_copies = 4
|
||||
endpoint = self.cluster.default_rpc_endpoint
|
||||
|
@ -1265,9 +1289,9 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(f"Check all node are selected"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (netmap[node_address]["country"] in placement_params["country"]) or (
|
||||
int(netmap[node_address]["price"]) >= placement_params["price"]
|
||||
int(netmap[node_address]["Price"]) >= placement_params["Price"]
|
||||
), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"
|
||||
|
||||
with reporter.step(f"Delete the object from the container"):
|
||||
|
@ -1276,7 +1300,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Simple policy results with UNIQUE nodes")
|
||||
def test_simple_policy_results_with_unique_nodes(
|
||||
self,
|
||||
|
@ -1351,7 +1374,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Policy with SELECT and Complex FILTER results with UNIQUE nodes")
|
||||
def test_policy_with_select_and_complex_filter_results_with_unique_nodes(
|
||||
self,
|
||||
|
@ -1402,7 +1424,6 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Delete the container"):
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
|
||||
|
||||
@pytest.mark.sanity
|
||||
@allure.title("Policy with Multi SELECTs and FILTERs results with UNIQUE nodes")
|
||||
def test_policy_with_multi_selects_and_filters_results_with_unique_nodes(
|
||||
self,
|
||||
|
@ -1438,11 +1459,9 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = self.get_netmap_param(netmap)
|
||||
with reporter.step(
|
||||
f"Check three nodes are selected from any country"
|
||||
):
|
||||
with reporter.step(f"Check three nodes are selected from any country"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
|
||||
not (placement_params["country"][0] == netmap[node_address]["country"])
|
||||
and (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
|
@ -1523,7 +1542,7 @@ class TestPolicy(ClusterTestBase):
|
|||
dict_external[external_adress] = {
|
||||
"country": node.country,
|
||||
"country_code": node.country_code,
|
||||
"price": node.price,
|
||||
"Price": node.price,
|
||||
"continent": node.continent,
|
||||
"un_locode": node.un_locode,
|
||||
"location": node.location,
|
||||
|
|
|
@ -7,7 +7,7 @@ import allure
|
|||
import pytest
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.healthcheck.interfaces import Healthcheck
|
||||
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
|
||||
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE, PUBLIC_ACL
|
||||
from frostfs_testlib.steps.cli.container import create_container
|
||||
from frostfs_testlib.steps.cli.object import (
|
||||
get_object,
|
||||
|
@ -88,6 +88,7 @@ class TestFailoverNetwork(ClusterTestBase):
|
|||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
rule=placement_rule,
|
||||
await_mode=True,
|
||||
basic_acl=EACL_PUBLIC_READ_WRITE,
|
||||
)
|
||||
|
||||
storage_objects = []
|
||||
|
@ -210,11 +211,9 @@ class TestFailoverNetwork(ClusterTestBase):
|
|||
with reporter.step("Search nodes with object"):
|
||||
nodes_with_object = get_object_nodes(
|
||||
cluster=self.cluster,
|
||||
wallet=default_wallet,
|
||||
cid=storage_object.cid,
|
||||
oid=storage_object.oid,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
alive_node=self.cluster.cluster_nodes[0],
|
||||
)
|
||||
|
||||
with reporter.step("Get data interface to node"):
|
||||
|
@ -270,11 +269,9 @@ class TestFailoverNetwork(ClusterTestBase):
|
|||
with reporter.step("Search nodes with object"):
|
||||
nodes_with_object = get_object_nodes(
|
||||
cluster=self.cluster,
|
||||
wallet=default_wallet,
|
||||
cid=storage_object.cid,
|
||||
oid=storage_object.oid,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
alive_node=self.cluster.cluster_nodes[0],
|
||||
)
|
||||
|
||||
with reporter.step("Get internal interface to node"):
|
||||
|
|
|
@ -156,12 +156,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
) -> tuple[StorageObjectInfo, list[ClusterNode]]:
|
||||
object_info = container.generate_object(simple_object_size.value)
|
||||
object_nodes = get_object_nodes(
|
||||
cluster=self.cluster,
|
||||
wallet=default_wallet,
|
||||
cid=object_info.cid,
|
||||
oid=object_info.oid,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
|
||||
)
|
||||
return object_info, object_nodes
|
||||
|
||||
|
@ -313,12 +308,7 @@ class TestFailoverServer(ClusterTestBase):
|
|||
)
|
||||
with reporter.step("Search nodes with object"):
|
||||
object_nodes = get_object_nodes(
|
||||
cluster=self.cluster,
|
||||
wallet=default_wallet,
|
||||
cid=cid_1,
|
||||
oid=oid,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
cluster=self.cluster, cid=cid_1, oid=oid, alive_node=self.cluster.cluster_nodes[0]
|
||||
)
|
||||
with reporter.step("Turn off random node with object"):
|
||||
cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")
|
||||
|
|
|
@ -425,7 +425,8 @@ class TestEmptyMap(ClusterTestBase):
|
|||
|
||||
with reporter.step("Delete pilorama.db from all nodes"):
|
||||
for node in self.cluster.storage_nodes:
|
||||
node.delete_pilorama()
|
||||
for shard in node.get_shards():
|
||||
node.delete_file(shard.pilorama)
|
||||
|
||||
with reporter.step("Start all storage nodes"):
|
||||
cluster_state_controller.start_all_stopped_services()
|
||||
|
@ -445,17 +446,6 @@ class TestEmptyMap(ClusterTestBase):
|
|||
@pytest.mark.failover
|
||||
@pytest.mark.failover_data_loss
|
||||
class TestStorageDataLoss(ClusterTestBase):
|
||||
@reporter.step("Get list of all piloramas on node")
|
||||
def get_piloramas_list(self, node: StorageNode) -> list:
|
||||
data_directory_path = node.get_data_directory()
|
||||
|
||||
cmd = f"sudo ls -1 {data_directory_path}/meta*/pilorama*"
|
||||
shell = node.host.get_shell()
|
||||
stdout = shell.exec(cmd).stdout
|
||||
|
||||
piloramas = stdout.split("\n")
|
||||
return piloramas
|
||||
|
||||
@allure.title(
|
||||
"After metabase loss on all nodes operations on objects and buckets should be still available via S3 (s3_client={s3_client})"
|
||||
)
|
||||
|
@ -663,15 +653,22 @@ class TestStorageDataLoss(ClusterTestBase):
|
|||
object_versions.append(put_object)
|
||||
|
||||
node_to_check = self.cluster.storage_nodes[0]
|
||||
piloramas_list_before_removing = {}
|
||||
with reporter.step("Get list of all pilorama.db"):
|
||||
piloramas_list_before_removing = self.get_piloramas_list(node_to_check)
|
||||
|
||||
piloramas_list_before_removing = []
|
||||
with reporter.step("Get list of all pilorama.db on shards"):
|
||||
for shard in node_to_check.get_shards():
|
||||
piloramas_list_before_removing.append(shard.pilorama)
|
||||
|
||||
with reporter.step("Check that all pilorama.db files exist on node"):
|
||||
for pilorama in piloramas_list_before_removing:
|
||||
assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"
|
||||
|
||||
with reporter.step("Stop all storage nodes"):
|
||||
cluster_state_controller.stop_services_of_type(StorageNode)
|
||||
|
||||
with reporter.step("Delete pilorama.db from one node"):
|
||||
node_to_check.delete_pilorama()
|
||||
for pilorama in piloramas_list_before_removing:
|
||||
node_to_check.delete_file(pilorama)
|
||||
|
||||
with reporter.step("Start all storage nodes"):
|
||||
cluster_state_controller.start_all_stopped_services()
|
||||
|
@ -680,10 +677,9 @@ class TestStorageDataLoss(ClusterTestBase):
|
|||
self.tick_epochs(1)
|
||||
sleep(120)
|
||||
|
||||
piloramas_list_afrer_removing = {}
|
||||
with reporter.step("Get list of all pilorama.db after sync"):
|
||||
piloramas_list_afrer_removing = self.get_piloramas_list(node_to_check)
|
||||
assert piloramas_list_afrer_removing == piloramas_list_before_removing, "List of pilorama.db is different"
|
||||
for pilorama in piloramas_list_before_removing:
|
||||
assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"
|
||||
|
||||
with reporter.step("Check bucket versioning"):
|
||||
bucket_versioning = s3_client.get_bucket_versioning_status(bucket)
|
||||
|
|
|
@ -66,9 +66,10 @@ class TestS3GateMultipart(ClusterTestBase):
|
|||
upload_key = "multipart_abort"
|
||||
|
||||
with reporter.step(f"Get related container_id for bucket '{bucket}'"):
|
||||
container_id = search_container_by_name(
|
||||
default_wallet, bucket, self.shell, self.cluster.default_rpc_endpoint
|
||||
)
|
||||
for cluster_node in self.cluster.cluster_nodes:
|
||||
container_id = search_container_by_name(bucket, cluster_node)
|
||||
if container_id:
|
||||
break
|
||||
|
||||
with reporter.step("Create multipart upload"):
|
||||
upload_id = s3_client.create_multipart_upload(bucket, upload_key)
|
||||
|
|
|
@ -51,12 +51,10 @@ class TestS3GatePolicy(ClusterTestBase):
|
|||
assert bucket_loc_2 == "rep-3"
|
||||
|
||||
with reporter.step("Check object policy"):
|
||||
cid_1 = search_container_by_name(
|
||||
default_wallet,
|
||||
bucket_1,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
for cluster_node in self.cluster.cluster_nodes:
|
||||
cid_1 = search_container_by_name(name=bucket_1, node=cluster_node)
|
||||
if cid_1:
|
||||
break
|
||||
copies_1 = get_simple_object_copies(
|
||||
wallet=default_wallet,
|
||||
cid=cid_1,
|
||||
|
@ -65,12 +63,10 @@ class TestS3GatePolicy(ClusterTestBase):
|
|||
nodes=self.cluster.storage_nodes,
|
||||
)
|
||||
assert copies_1 == 1
|
||||
cid_2 = search_container_by_name(
|
||||
default_wallet,
|
||||
bucket_2,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
)
|
||||
for cluster_node in self.cluster.cluster_nodes:
|
||||
cid_2 = search_container_by_name(name=bucket_1, node=cluster_node)
|
||||
if cid_2:
|
||||
break
|
||||
copies_2 = get_simple_object_copies(
|
||||
wallet=default_wallet,
|
||||
cid=cid_2,
|
||||
|
|
|
@ -1,125 +1,92 @@
|
|||
import json
|
||||
import pathlib
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from io import StringIO
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
import yaml
|
||||
from configobj import ConfigObj
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.cli import FrostfsCli
|
||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
||||
from frostfs_testlib.storage.cluster import Cluster, StorageNode
|
||||
|
||||
SHARD_PREFIX = "FROSTFS_STORAGE_SHARD_"
|
||||
BLOBSTOR_PREFIX = "_BLOBSTOR_"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Blobstor:
|
||||
path: str
|
||||
path_type: str
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, self.__class__):
|
||||
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
|
||||
return self.path == other.path and self.path_type == other.path_type
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.path, self.path_type))
|
||||
|
||||
@staticmethod
|
||||
def from_config_object(section: ConfigObj, shard_id: str, blobstor_id: str):
|
||||
var_prefix = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}"
|
||||
return Blobstor(section.get(f"{var_prefix}_PATH"), section.get(f"{var_prefix}_TYPE"))
|
||||
|
||||
|
||||
@dataclass
|
||||
class Shard:
|
||||
blobstor: list[Blobstor]
|
||||
metabase: str
|
||||
writecache: str
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, self.__class__):
|
||||
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
|
||||
return (
|
||||
set(self.blobstor) == set(other.blobstor)
|
||||
and self.metabase == other.metabase
|
||||
and self.writecache == other.writecache
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.metabase, self.writecache))
|
||||
|
||||
@staticmethod
|
||||
def _get_blobstor_count_from_section(config_object: ConfigObj, shard_id: int):
|
||||
pattern = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}"
|
||||
blobstors = {key[: len(pattern) + 2] for key in config_object.keys() if pattern in key}
|
||||
return len(blobstors)
|
||||
|
||||
@staticmethod
|
||||
def from_config_object(config_object: ConfigObj, shard_id: int):
|
||||
var_prefix = f"{SHARD_PREFIX}{shard_id}"
|
||||
|
||||
blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
|
||||
blobstors = [
|
||||
Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)
|
||||
]
|
||||
|
||||
write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")
|
||||
|
||||
return Shard(
|
||||
blobstors,
|
||||
config_object.get(f"{var_prefix}_METABASE_PATH"),
|
||||
config_object.get(f"{var_prefix}_WRITECACHE_PATH") if write_cache_enabled else "",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_object(shard):
|
||||
metabase = shard["metabase"]["path"] if "path" in shard["metabase"] else shard["metabase"]
|
||||
writecache = shard["writecache"]["path"] if "path" in shard["writecache"] else shard["writecache"]
|
||||
|
||||
return Shard(
|
||||
blobstor=[Blobstor(path=blobstor["path"], path_type=blobstor["type"]) for blobstor in shard["blobstor"]],
|
||||
metabase=metabase,
|
||||
writecache=writecache,
|
||||
)
|
||||
|
||||
|
||||
def shards_from_yaml(contents: str) -> list[Shard]:
|
||||
config = yaml.safe_load(contents)
|
||||
config["storage"]["shard"].pop("default")
|
||||
|
||||
return [Shard.from_object(shard) for shard in config["storage"]["shard"].values()]
|
||||
|
||||
|
||||
def shards_from_env(contents: str) -> list[Shard]:
|
||||
configObj = ConfigObj(StringIO(contents))
|
||||
|
||||
pattern = f"{SHARD_PREFIX}\d*"
|
||||
num_shards = len(set(re.findall(pattern, contents)))
|
||||
|
||||
return [Shard.from_config_object(configObj, shard_id) for shard_id in range(num_shards)]
|
||||
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
|
||||
from frostfs_testlib.steps.cli.container import create_container, delete_container
|
||||
from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
from frostfs_testlib.storage.dataclasses.shard import Shard
|
||||
from frostfs_testlib.testing import parallel
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.utils.file_utils import generate_file
|
||||
|
||||
|
||||
@pytest.mark.shard
|
||||
class TestControlShard:
|
||||
class TestControlShard(ClusterTestBase):
|
||||
@staticmethod
|
||||
def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]:
|
||||
oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}"
|
||||
with reporter.step("Search object file"):
|
||||
node_shell = node.storage_node.host.get_shell()
|
||||
data_path = node.storage_node.get_data_directory()
|
||||
all_datas = node_shell.exec(f"ls -la {data_path} | awk '{{ print $9 }}'").stdout.strip()
|
||||
for data in all_datas.replace(".", "").strip().split("\n"):
|
||||
check_dir = node_shell.exec(f" [ -d {data_path}/{data}/{oid_path} ] && echo 1 || echo 0").stdout
|
||||
if "1" in check_dir:
|
||||
object_path = f"{data_path}/{data}/{oid_path}"
|
||||
object_name = f"{oid[4:]}.{cid}"
|
||||
break
|
||||
return object_path, object_name
|
||||
|
||||
def set_shard_rw_mode(self, node: ClusterNode):
|
||||
watcher = ShardsWatcher(node)
|
||||
shards = watcher.get_shards()
|
||||
for shard in shards:
|
||||
watcher.set_shard_mode(shard["shard_id"], mode="read-write")
|
||||
watcher.await_for_all_shards_status(status="read-write")
|
||||
|
||||
@pytest.fixture()
|
||||
@allure.title("Revert all shards mode")
|
||||
def revert_all_shards_mode(self) -> None:
|
||||
yield
|
||||
parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes)
|
||||
|
||||
@pytest.fixture()
|
||||
def oid_cid_node(self, default_wallet: str) -> tuple[str, str, ClusterNode]:
|
||||
with reporter.step("Create container, and put object"):
|
||||
cid = create_container(
|
||||
wallet=default_wallet,
|
||||
shell=self.shell,
|
||||
endpoint=self.cluster.default_rpc_endpoint,
|
||||
rule="REP 1 CBF 1",
|
||||
basic_acl=EACL_PUBLIC_READ_WRITE,
|
||||
)
|
||||
file = generate_file(5242880)
|
||||
oid = put_object(
|
||||
wallet=default_wallet, path=file, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
|
||||
)
|
||||
with reporter.step("Search node with object"):
|
||||
nodes = get_object_nodes(cluster=self.cluster, cid=cid, oid=oid, alive_node=self.cluster.cluster_nodes[0])
|
||||
|
||||
yield oid, cid, nodes[0]
|
||||
|
||||
object_path, object_name = self.get_object_path_and_name_file(oid, cid, nodes[0])
|
||||
nodes[0].host.get_shell().exec(f"chmod +r {object_path}/{object_name}")
|
||||
delete_object(
|
||||
wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
|
||||
)
|
||||
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
|
||||
|
||||
@staticmethod
|
||||
def get_shards_from_config(node: StorageNode) -> list[Shard]:
|
||||
config_file = node.get_shard_config_path()
|
||||
file_type = pathlib.Path(config_file).suffix
|
||||
contents = node.host.get_shell().exec(f"cat {config_file}").stdout
|
||||
|
||||
parser_method = {
|
||||
".env": shards_from_env,
|
||||
".yaml": shards_from_yaml,
|
||||
".yml": shards_from_yaml,
|
||||
".env": node.get_shards_from_env,
|
||||
".yaml": node.get_shards,
|
||||
".yml": node.get_shards,
|
||||
}
|
||||
|
||||
shards = parser_method[file_type](contents)
|
||||
shards = parser_method[file_type]()
|
||||
return shards
|
||||
|
||||
@staticmethod
|
||||
|
@ -140,9 +107,49 @@ class TestControlShard:
|
|||
)
|
||||
return [Shard.from_object(shard) for shard in json.loads(result.stdout.split(">", 1)[1])]
|
||||
|
||||
@pytest.fixture()
|
||||
def change_config_storage(self, cluster_state_controller: ClusterStateController):
|
||||
with reporter.step("Change threshold error shards"):
|
||||
cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
|
||||
service_type=StorageNode, values={"storage:shard_ro_error_threshold": "5"}
|
||||
)
|
||||
yield
|
||||
with reporter.step("Restore threshold error shards"):
|
||||
cluster_state_controller.manager(ConfigStateManager).revert_all()
|
||||
|
||||
@allure.title("All shards are available")
|
||||
def test_control_shard(self, cluster: Cluster):
|
||||
for storage_node in cluster.storage_nodes:
|
||||
shards_from_config = self.get_shards_from_config(storage_node)
|
||||
shards_from_cli = self.get_shards_from_cli(storage_node)
|
||||
|
||||
assert set(shards_from_config) == set(shards_from_cli)
|
||||
|
||||
@pytest.mark.failover
|
||||
def test_shard_errors(
|
||||
self,
|
||||
default_wallet: str,
|
||||
oid_cid_node: tuple[str, str, ClusterNode],
|
||||
change_config_storage: None,
|
||||
revert_all_shards_mode: None,
|
||||
):
|
||||
oid, cid, node = oid_cid_node
|
||||
object_path, object_name = self.get_object_path_and_name_file(*oid_cid_node)
|
||||
with reporter.step("Block read file"):
|
||||
node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")
|
||||
with reporter.step("Get object, expect 6 errors"):
|
||||
for _ in range(6):
|
||||
with pytest.raises(RuntimeError):
|
||||
get_object(
|
||||
wallet=default_wallet,
|
||||
cid=cid,
|
||||
oid=oid,
|
||||
shell=self.shell,
|
||||
endpoint=node.storage_node.get_rpc_endpoint(),
|
||||
)
|
||||
with reporter.step("Check shard status"):
|
||||
for shard in ShardsWatcher(node).get_shards():
|
||||
if shard["blobstor"][1]["path"] in object_path:
|
||||
with reporter.step(f"Shard - {shard['shard_id']} to {node.host_ip}, mode - {shard['mode']}"):
|
||||
assert shard["mode"] == "read-only"
|
||||
break
|
||||
|
|
|
@ -36,7 +36,7 @@ class TestLogs:
|
|||
logs_dir = os.path.join(temp_directory, "logs")
|
||||
os.makedirs(logs_dir)
|
||||
# Using \b here because 'oom' and 'panic' can sometimes be found in OID or CID
|
||||
issues_regex = r"\bpanic\b|\boom\b|too many|insufficient funds|insufficient amount of gas|wallet passwd|secret key|access key|cannot assign requested address"
|
||||
issues_regex = r"\bpanic\b|\boom\b|too many|insufficient funds|insufficient amount of gas|wallet passwd|secret \bkey\b|access key|cannot assign requested address"
|
||||
exclude_filter = r"too many requests"
|
||||
|
||||
time.sleep(2)
|
||||
|
|
Loading…
Reference in a new issue