Compare commits

...

10 commits

Author SHA1 Message Date
46e57870d0 Change logs pattern
Signed-off-by: anikeev-yadro <a.anikeev@yadro.com>
2024-01-31 10:31:38 +03:00
0d7befe9a6 [#186] Change call object nodes func
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-22 14:29:49 +03:00
f49d68a6e7 rename local_config_path 2024-01-16 11:16:53 +00:00
241d3d0585 [#184] Small fix using function search container
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-16 08:14:49 +03:00
3a380755b4 [#182] Updates for dev-env
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-01-12 16:42:01 +00:00
5bc170e6f9 update policy with field price 2024-01-12 16:38:19 +00:00
522b8e576d [#180] Add test shard mode
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-12 08:40:40 +00:00
5ddc31cca6 [#179] Change argument func
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-10 13:43:52 +03:00
9e89dba03d Update pilorama loss and shards test cases 2023-12-22 17:10:30 +03:00
d327e8149b [OBJECT-5670] test_container_creation_deletion_parallel extended
test_container_creation_deletion_parallel extended

Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2023-12-20 15:00:53 +03:00
11 changed files with 281 additions and 265 deletions

View file

@ -3,13 +3,17 @@ hosts:
attributes:
sudo_shell: false
plugin_name: docker
healthcheck_plugin_name: basic
attributes:
skip_readiness_check: True
force_transactions: True
services:
- name: s01
attributes:
container_name: s01
config_path: ../frostfs-dev-env/services/storage/.storage.env
wallet_path: ../frostfs-dev-env/services/storage/wallet01.json
local_config_path: ./TemporaryDir/empty-password.yml
local_wallet_config_path: ./TemporaryDir/empty-password.yml
local_wallet_path: ../frostfs-dev-env/services/storage/wallet01.json
wallet_password: ""
volume_name: storage_storage_s01
@ -23,7 +27,7 @@ hosts:
container_name: s02
config_path: ../frostfs-dev-env/services/storage/.storage.env
wallet_path: ../frostfs-dev-env/services/storage/wallet02.json
local_config_path: ./TemporaryDir/empty-password.yml
local_wallet_config_path: ./TemporaryDir/empty-password.yml
local_wallet_path: ../frostfs-dev-env/services/storage/wallet02.json
wallet_password: ""
volume_name: storage_storage_s02
@ -37,7 +41,7 @@ hosts:
container_name: s03
config_path: ../frostfs-dev-env/services/storage/.storage.env
wallet_path: ../frostfs-dev-env/services/storage/wallet03.json
local_config_path: ./TemporaryDir/empty-password.yml
local_wallet_config_path: ./TemporaryDir/empty-password.yml
local_wallet_path: ../frostfs-dev-env/services/storage/wallet03.json
wallet_password: ""
volume_name: storage_storage_s03
@ -51,7 +55,7 @@ hosts:
container_name: s04
config_path: ../frostfs-dev-env/services/storage/.storage.env
wallet_path: ../frostfs-dev-env/services/storage/wallet04.json
local_config_path: ./TemporaryDir/empty-password.yml
local_wallet_config_path: ./TemporaryDir/empty-password.yml
local_wallet_path: ../frostfs-dev-env/services/storage/wallet04.json
wallet_password: ""
volume_name: storage_storage_s04
@ -65,7 +69,7 @@ hosts:
container_name: s3_gate
config_path: ../frostfs-dev-env/services/s3_gate/.s3.env
wallet_path: ../frostfs-dev-env/services/s3_gate/wallet.json
local_config_path: ./TemporaryDir/password-s3.yml
local_wallet_config_path: ./TemporaryDir/password-s3.yml
local_wallet_path: ../frostfs-dev-env/services/s3_gate/wallet.json
wallet_password: "s3"
endpoint_data0: https://s3.frostfs.devenv:8080
@ -74,7 +78,7 @@ hosts:
container_name: http_gate
config_path: ../frostfs-dev-env/services/http_gate/.http.env
wallet_path: ../frostfs-dev-env/services/http_gate/wallet.json
local_config_path: ./TemporaryDir/password-other.yml
local_wallet_config_path: ./TemporaryDir/password-other.yml
local_wallet_path: ../frostfs-dev-env/services/http_gate/wallet.json
wallet_password: "one"
endpoint_data0: http://http.frostfs.devenv
@ -83,7 +87,7 @@ hosts:
container_name: ir01
config_path: ../frostfs-dev-env/services/ir/.ir.env
wallet_path: ../frostfs-dev-env/services/ir/az.json
local_config_path: ./TemporaryDir/password-other.yml
local_wallet_config_path: ./TemporaryDir/password-other.yml
local_wallet_path: ../frostfs-dev-env/services/ir/az.json
wallet_password: "one"
- name: morph-chain01
@ -91,7 +95,7 @@ hosts:
container_name: morph_chain
config_path: ../frostfs-dev-env/services/morph_chain/protocol.privnet.yml
wallet_path: ../frostfs-dev-env/services/morph_chain/node-wallet.json
local_config_path: ./TemporaryDir/password-other.yml
local_wallet_config_path: ./TemporaryDir/password-other.yml
local_wallet_path: ../frostfs-dev-env/services/morph_chain/node-wallet.json
wallet_password: "one"
endpoint_internal0: http://morph-chain.frostfs.devenv:30333
@ -100,7 +104,7 @@ hosts:
container_name: main_chain
config_path: ../frostfs-dev-env/services/chain/protocol.privnet.yml
wallet_path: ../frostfs-dev-env/services/chain/node-wallet.json
local_config_path: ./TemporaryDir/password-other.yml
local_wallet_config_path: ./TemporaryDir/password-other.yml
local_wallet_path: ../frostfs-dev-env/services/chain/node-wallet.json
wallet_password: "one"
endpoint_internal0: http://main-chain.frostfs.devenv:30333

View file

@ -41,6 +41,8 @@ from pytest_tests.resources.common import HOSTING_CONFIG_FILE, TEST_CYCLES_COUNT
logger = logging.getLogger("NeoLogger")
SERVICE_ACTIVE_TIME = 20
# Add logs check test even if it's not fit to mark selectors
def pytest_configure(config: pytest.Config):
markers = config.option.markexpr
@ -367,11 +369,14 @@ def after_deploy_healthcheck(cluster: Cluster):
parallel(readiness_on_node, cluster.cluster_nodes)
SERVICE_ACTIVE_TIME = 20
@wait_for_success(60 * SERVICE_ACTIVE_TIME * 3, 60, title="Wait for {cluster_node} readiness")
def readiness_on_node(cluster_node: ClusterNode):
if (
"skip_readiness_check" in cluster_node.host.config.attributes
and cluster_node.host.config.attributes["skip_readiness_check"]
):
return
# TODO: Move to healtcheck classes
svc_name = cluster_node.service(StorageNode).get_service_systemctl_name()
with reporter.step(f"Check service {svc_name} is active"):

View file

@ -86,34 +86,35 @@ class TestContainer(ClusterTestBase):
containers_count = 3
wallet = default_wallet
placement_rule = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
iteration_count = 10
cids: list[str] = []
with reporter.step(f"Create {containers_count} containers"):
for _ in range(containers_count):
cids.append(
create_container(
for iteration in range(iteration_count):
cids: list[str] = []
with reporter.step(f"Create {containers_count} containers"):
for _ in range(containers_count):
cids.append(
create_container(
wallet,
rule=placement_rule,
await_mode=False,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
wait_for_creation=False,
)
)
with reporter.step("Wait for containers occur in container list"):
for cid in cids:
wait_for_container_creation(
wallet,
rule=placement_rule,
await_mode=False,
cid,
sleep_interval=containers_count,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
wait_for_creation=False,
)
)
with reporter.step("Wait for containers occur in container list"):
for cid in cids:
wait_for_container_creation(
wallet,
cid,
sleep_interval=containers_count,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
with reporter.step("Delete containers and check they were deleted"):
for cid in cids:
delete_container(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
self.tick_epoch()
for cid in cids:
wait_for_container_deletion(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
with reporter.step("Delete containers and check they were deleted"):
for cid in cids:
delete_container(wallet, cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint, await_mode=True)
containers_list = list_containers(wallet, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
assert cid not in containers_list, "Container not deleted"

View file

@ -1,15 +1,18 @@
import pytest
import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container, delete_container, get_container
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import get_netmap_snapshot
from frostfs_testlib.steps.storage_policy import get_nodes_with_object, get_nodes_without_object
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.testing.test_control import wait_for_success
from frostfs_testlib.utils.cli_utils import parse_netmap_output
from frostfs_testlib.utils.file_utils import generate_file
@ -25,6 +28,31 @@ from pytest_tests.resources.policy_error_patterns import (
@pytest.mark.container
@pytest.mark.policy
class TestPolicy(ClusterTestBase):
@wait_for_success(1200, 60, title="Wait for full field price on node", expected_result=True)
def await_for_price_attribute_on_nodes(self):
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
for node in self.cluster.storage_nodes:
node_address = node.get_rpc_endpoint().split(":")[0]
if netmap[node_address]["Price"] is None:
return False
return True
@pytest.fixture(scope="module")
def fill_field_price(self, cluster_state_controller: ClusterStateController):
prices = ["15", "10", "65", "55"]
config_manager = cluster_state_controller.manager(ConfigStateManager)
for i in zip(self.cluster.cluster_nodes, prices):
config_manager.set_on_node(i[0], StorageNode, {"node:attribute_5": f"Price:{i[1]}"})
self.tick_epoch()
self.await_for_price_attribute_on_nodes()
yield
cluster_state_controller.manager(ConfigStateManager).revert_all()
@allure.title("[NEGATIVE] Placement policy: Can't parse placement policy")
def test_placement_policy_negative(self, default_wallet):
"""
@ -32,6 +60,7 @@ class TestPolicy(ClusterTestBase):
"""
placement_rule = "REP 1 IN SPB REP 1 in MSK CBF 1 SELECT 1 FROM SPBRU AS SPB SELECT 1 FROM MSKRU AS MSK FILTER SubDivCode EQ SPE AS SPBRU FILTER NOT (Country EQ Sweden OR CountryCode EQ FI) AND Location EQ Moskva AS MSKRU"
endpoint = self.cluster.default_rpc_endpoint
with reporter.step(f"Create container with policy {placement_rule}"):
with pytest.raises(Exception, match=NOT_PARSE_POLICY):
cid = create_container(
@ -93,7 +122,6 @@ class TestPolicy(ClusterTestBase):
endpoint=endpoint,
)
@pytest.mark.sanity
@allure.title("Simple policy results with one node")
def test_simple_policy_results_with_one_node(
self,
@ -168,7 +196,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Policy with SELECT and FILTER results with one node")
def test_policy_with_select_and_filter_results_with_one_node(
self,
@ -252,9 +279,7 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(
f"Check the node is selected from {placement_params['country'][0]}"
):
with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
assert (
not (placement_params["country"][1] == netmap[node_address]["country"])
or not (placement_params["country"][1] == netmap[node_address]["country"])
@ -268,7 +293,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Policy with Multi SELECTs and FILTERs results with one node")
def test_policy_with_multi_selects_and_filters_results_with_one_node(
self,
@ -319,7 +343,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Policy with SELECT and FILTER results with UNIQUE nodes")
def test_policy_with_select_and_filter_results_with_unique_nodes(
self,
@ -334,7 +357,7 @@ class TestPolicy(ClusterTestBase):
file_path = generate_file(simple_object_size.value)
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
with reporter.step(f"Create container with policy {placement_rule}"):
cid = create_container(
wallet=default_wallet, rule=placement_rule, basic_acl=PUBLIC_ACL, shell=self.shell, endpoint=endpoint
@ -357,7 +380,7 @@ class TestPolicy(ClusterTestBase):
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
placement_params["country"] == netmap[node_address]["country"]
), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
@ -442,18 +465,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and FILTER results with 25% of available nodes")
def test_policy_with_select_and_filter_results_with_25_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 25% of available nodes.
"""
placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM LE10 AS Nodes25 FILTER Price LE 10 AS LE10"
placement_params = {"price": 10}
placement_params = {"Price": 10}
file_path = generate_file(simple_object_size.value)
expected_copies = 1
endpoint = self.cluster.default_rpc_endpoint
@ -479,9 +502,9 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected with price <= {placement_params['price']}"):
with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
assert (
int(netmap[node_address]["price"]) <= placement_params["price"]
int(netmap[node_address]["Price"]) <= placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -490,18 +513,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with select and complex filter results with 25% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_25_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 25% of available nodes.
"""
placement_rule = "REP 1 IN Nodes25 SELECT 1 FROM BET0AND10 AS Nodes25 FILTER Price LE 10 AS LE10 FILTER Price GT 0 AS GT0 FILTER @LE10 AND @GT0 AS BET0AND10"
placement_params = {"price": [10, 0]}
placement_params = {"Price": [10, 0]}
file_path = generate_file(simple_object_size.value)
expected_copies = 1
endpoint = self.cluster.default_rpc_endpoint
@ -528,10 +551,10 @@ class TestPolicy(ClusterTestBase):
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check the node is selected with price between 1 and 10"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["price"]) > placement_params["price"][1]
and int(netmap[node_address]["price"]) <= placement_params["price"][0]
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -540,18 +563,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with Multi SELECTs and FILTERs results with 25% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_25_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 25% of available nodes.
"""
placement_rule = "UNIQUE REP 1 IN One REP 1 IN One CBF 1 SELECT 1 FROM MINMAX AS One FILTER Price LT 15 AS LT15 FILTER Price GT 55 AS GT55 FILTER @LT15 OR @GT55 AS MINMAX"
placement_params = {"price": [15, 55]}
placement_params = {"Price": [15, 55]}
file_path = generate_file(simple_object_size.value)
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
@ -580,8 +603,8 @@ class TestPolicy(ClusterTestBase):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["price"]) > placement_params["price"][1]
or int(netmap[node_address]["price"]) < placement_params["price"][0]
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -664,18 +687,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and FILTER results with 50% of available nodes")
def test_policy_with_select_and_filter_results_with_50_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 50% of available nodes.
"""
placement_rule = "REP 2 IN HALF CBF 1 SELECT 2 FROM GT15 AS HALF FILTER Price GT 15 AS GT15"
placement_params = {"price": 15}
placement_params = {"Price": 15}
file_path = generate_file(simple_object_size.value)
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
@ -700,11 +723,11 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price > {placement_params['price']}"):
with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["price"]) > placement_params["price"]
int(netmap[node_address]["Price"]) > placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -713,18 +736,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and Complex FILTER results with 50% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_50_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 50% of available nodes.
"""
placement_rule = "REP 2 IN HALF CBF 2 SELECT 2 FROM GE15 AS HALF FILTER CountryCode NE RU AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15"
placement_params = {"price": 15, "country_code": "RU"}
placement_params = {"Price": 15, "country_code": "RU"}
file_path = generate_file(simple_object_size.value)
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
@ -749,12 +772,15 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
with reporter.step(
f"Check two nodes are selected not with country code '{placement_params['country_code']}'"
):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (not netmap[node_address]["country_code"] == placement_params["country_code"]
assert (
not netmap[node_address]["country_code"] == placement_params["country_code"]
or not netmap[node_address]["country_code"] == placement_params["country_code"]
and int(netmap[node_address]["price"]) >= placement_params["price"]
and int(netmap[node_address]["Price"]) >= placement_params["Price"]
), f"The node is selected with the wrong price or country code. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -763,20 +789,20 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with Multi SELECTs and FILTERs results with 50% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_50_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 50% of available nodes.
"""
placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ RU_LED OR 'UN-LOCODE' EQ RU_MOW AS RU FILTER NOT(@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55"
placement_params = {"un_locode": ["RU_LED", "RU_MOW"], "price": [15, 55]}
placement_rule = "REP 2 IN FH REP 1 IN SH CBF 2 SELECT 2 FROM LE55 AS FH SELECT 2 FROM GE15 AS SH FILTER 'UN-LOCODE' EQ 'RU LED' OR 'UN-LOCODE' EQ 'RU MOW' AS RU FILTER NOT (@RU) AS NOTRU FILTER @NOTRU AND Price GE 15 AS GE15 FILTER @RU AND Price LE 55 AS LE55"
placement_params = {"un_locode": ["RU LED", "RU MOW"], "Price": [15, 55]}
file_path = generate_file(simple_object_size.value)
expected_copies = 4
expected_copies = 3
endpoint = self.cluster.default_rpc_endpoint
with reporter.step(f"Create container with policy {placement_rule}"):
@ -807,11 +833,11 @@ class TestPolicy(ClusterTestBase):
or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
or (
not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
and int(netmap[node_address]["price"]) >= placement_params["price"][0]
and int(netmap[node_address]["Price"]) >= placement_params["Price"][0]
)
or (
netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
and int(netmap[node_address]["price"]) <= placement_params["price"][1]
and int(netmap[node_address]["Price"]) <= placement_params["Price"][1]
)
), f"The node is selected with the wrong price or un_locode. Expected {placement_params} and got {netmap[node_address]}"
@ -821,7 +847,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Simple policy results with 75% of available nodes")
def test_simple_policy_results_with_75_of_available_nodes(
self,
@ -896,18 +921,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and FILTER results with 75% of available nodes")
def test_policy_with_select_and_filter_results_with_75_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 75% of available nodes.
"""
placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Price LT 65 AS LT65"
placement_params = {"price": 65}
placement_params = {"Price": 65}
file_path = generate_file(simple_object_size.value)
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
@ -932,11 +957,11 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price < {placement_params['price']}"):
with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["price"]) < placement_params["price"]
int(netmap[node_address]["Price"]) < placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -945,20 +970,20 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and Complex FILTER results with 75% of available nodes")
def test_policy_with_select_and_complex_filter_results_with_75_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price: None,
):
"""
This test checks object's copies based on container's placement policy with SELECT and Complex FILTER results with 75% of available nodes.
"""
placement_rule = "REP 2 IN NODES75 SELECT 2 FROM LT65 AS NODES75 FILTER Continent NE America AS NOAM FILTER @NOAM AND Price LT 65 AS LT65"
placement_params = {"price": 65, "continent": "America"}
placement_params = {"Price": 65, "continent": "America"}
file_path = generate_file(simple_object_size.value)
expected_copies = 3
expected_copies = 2
endpoint = self.cluster.default_rpc_endpoint
with reporter.step(f"Create container with policy {placement_rule}"):
@ -981,12 +1006,11 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"
):
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["price"]) < placement_params["price"]
int(netmap[node_address]["Price"]) < placement_params["Price"]
and not netmap[node_address]["continent"] == placement_params["continent"]
) or (
not netmap[node_address]["continent"] == placement_params["continent"]
@ -998,18 +1022,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with Multi SELECTs and FILTERs results with 75% of available nodes")
def test_policy_with_multi_selects_and_filters_results_with_75_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 75% of available nodes.
"""
placement_rule = "REP 2 IN EXPNSV REP 2 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT(Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10"
placement_params = {"price": [65, 10], "continent": "America"}
placement_rule = "REP 3 IN EXPNSV REP 3 IN CHEAP SELECT 3 FROM GT10 AS EXPNSV SELECT 3 FROM LT65 AS CHEAP FILTER NOT (Continent EQ America) AS NOAM FILTER @NOAM AND Price LT 65 AS LT65 FILTER @NOAM AND Price GT 10 AS GT10"
placement_params = {"Price": [65, 10], "continent": "America"}
file_path = generate_file(simple_object_size.value)
expected_copies = 4
endpoint = self.cluster.default_rpc_endpoint
@ -1039,11 +1063,11 @@ class TestPolicy(ClusterTestBase):
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
(
int(netmap[node_address]["price"]) > placement_params["price"][1]
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
and not netmap[node_address]["continent"] == placement_params["continent"]
)
or (
int(netmap[node_address]["price"]) < placement_params["price"][0]
int(netmap[node_address]["Price"]) < placement_params["Price"][0]
and not netmap[node_address]["continent"] == placement_params["continent"]
)
or not (netmap[node_address]["continent"] == placement_params["continent"])
@ -1055,7 +1079,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Simple policy results with 100% of available nodes")
def test_simple_policy_results_with_100_of_available_nodes(
self,
@ -1130,18 +1153,18 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.skip(reason="price")
@allure.title("Policy with SELECT and FILTER results with 100% of available nodes")
def test_policy_with_select_and_filter_results_with_100_of_available_nodes(
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with SELECT and FILTER results with 100% of available nodes.
"""
placement_rule = "REP 1 IN All SELECT 4 FROM AllNodes AS All FILTER Price GE 0 AS AllNodes"
placement_params = {"price": 0}
placement_params = {"Price": 0}
file_path = generate_file(simple_object_size.value)
expected_copies = 1
endpoint = self.cluster.default_rpc_endpoint
@ -1167,9 +1190,9 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected with price >= {placement_params['price']}"):
with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
assert (
netmap[node_address]["price"] >= int(placement_params["price"])
int(netmap[node_address]["Price"]) >= placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -1233,12 +1256,13 @@ class TestPolicy(ClusterTestBase):
self,
default_wallet,
simple_object_size: ObjectSize,
fill_field_price,
):
"""
This test checks object's copies based on container's placement policy with Multi SELECTs and FILTERs results with 100% of available nodes.
"""
placement_rule = "REP 4 IN AllOne REP 4 IN AllTwo CBF 4 SELECT 2 FROM GEZero AS AllOne SELECT 2 FROM AllCountries AS AllTwo FILTER Country EQ Russia OR Country EQ Sweden OR Country EQ Finland AS AllCountries FILTER Price GE 0 AS GEZero"
placement_params = {"country": ["Russia", "Sweden", "Finland"], "price": 0}
placement_params = {"country": ["Russia", "Sweden", "Finland"], "Price": 0}
file_path = generate_file(simple_object_size.value)
expected_copies = 4
endpoint = self.cluster.default_rpc_endpoint
@ -1265,9 +1289,9 @@ class TestPolicy(ClusterTestBase):
netmap = self.get_netmap_param(netmap)
with reporter.step(f"Check all node are selected"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = node.get_rpc_endpoint().split(":")[0]
assert (netmap[node_address]["country"] in placement_params["country"]) or (
int(netmap[node_address]["price"]) >= placement_params["price"]
int(netmap[node_address]["Price"]) >= placement_params["Price"]
), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"
with reporter.step(f"Delete the object from the container"):
@ -1276,7 +1300,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Simple policy results with UNIQUE nodes")
def test_simple_policy_results_with_unique_nodes(
self,
@ -1351,7 +1374,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Policy with SELECT and Complex FILTER results with UNIQUE nodes")
def test_policy_with_select_and_complex_filter_results_with_unique_nodes(
self,
@ -1402,7 +1424,6 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Delete the container"):
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=endpoint)
@pytest.mark.sanity
@allure.title("Policy with Multi SELECTs and FILTERs results with UNIQUE nodes")
def test_policy_with_multi_selects_and_filters_results_with_unique_nodes(
self,
@ -1438,11 +1459,9 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = self.get_netmap_param(netmap)
with reporter.step(
f"Check three nodes are selected from any country"
):
with reporter.step(f"Check three nodes are selected from any country"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = node.get_rpc_endpoint().split(":")[0]
assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
not (placement_params["country"][0] == netmap[node_address]["country"])
and (placement_params["country"][1] == netmap[node_address]["country"])
@ -1523,7 +1542,7 @@ class TestPolicy(ClusterTestBase):
dict_external[external_adress] = {
"country": node.country,
"country_code": node.country_code,
"price": node.price,
"Price": node.price,
"continent": node.continent,
"un_locode": node.un_locode,
"location": node.location,

View file

@ -7,7 +7,7 @@ import allure
import pytest
from frostfs_testlib import reporter
from frostfs_testlib.healthcheck.interfaces import Healthcheck
from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE, PUBLIC_ACL
from frostfs_testlib.steps.cli.container import create_container
from frostfs_testlib.steps.cli.object import (
get_object,
@ -88,6 +88,7 @@ class TestFailoverNetwork(ClusterTestBase):
endpoint=self.cluster.default_rpc_endpoint,
rule=placement_rule,
await_mode=True,
basic_acl=EACL_PUBLIC_READ_WRITE,
)
storage_objects = []
@ -210,11 +211,9 @@ class TestFailoverNetwork(ClusterTestBase):
with reporter.step("Search nodes with object"):
nodes_with_object = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=storage_object.cid,
oid=storage_object.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
alive_node=self.cluster.cluster_nodes[0],
)
with reporter.step("Get data interface to node"):
@ -270,11 +269,9 @@ class TestFailoverNetwork(ClusterTestBase):
with reporter.step("Search nodes with object"):
nodes_with_object = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=storage_object.cid,
oid=storage_object.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
alive_node=self.cluster.cluster_nodes[0],
)
with reporter.step("Get internal interface to node"):

View file

@ -156,12 +156,7 @@ class TestFailoverServer(ClusterTestBase):
) -> tuple[StorageObjectInfo, list[ClusterNode]]:
object_info = container.generate_object(simple_object_size.value)
object_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=object_info.cid,
oid=object_info.oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster, cid=object_info.cid, oid=object_info.oid, alive_node=self.cluster.cluster_nodes[0]
)
return object_info, object_nodes
@ -313,12 +308,7 @@ class TestFailoverServer(ClusterTestBase):
)
with reporter.step("Search nodes with object"):
object_nodes = get_object_nodes(
cluster=self.cluster,
wallet=default_wallet,
cid=cid_1,
oid=oid,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
cluster=self.cluster, cid=cid_1, oid=oid, alive_node=self.cluster.cluster_nodes[0]
)
with reporter.step("Turn off random node with object"):
cluster_state_controller.stop_node_host(node=random.choice(object_nodes[1:]), mode="hard")

View file

@ -425,7 +425,8 @@ class TestEmptyMap(ClusterTestBase):
with reporter.step("Delete pilorama.db from all nodes"):
for node in self.cluster.storage_nodes:
node.delete_pilorama()
for shard in node.get_shards():
node.delete_file(shard.pilorama)
with reporter.step("Start all storage nodes"):
cluster_state_controller.start_all_stopped_services()
@ -445,17 +446,6 @@ class TestEmptyMap(ClusterTestBase):
@pytest.mark.failover
@pytest.mark.failover_data_loss
class TestStorageDataLoss(ClusterTestBase):
@reporter.step("Get list of all piloramas on node")
def get_piloramas_list(self, node: StorageNode) -> list:
data_directory_path = node.get_data_directory()
cmd = f"sudo ls -1 {data_directory_path}/meta*/pilorama*"
shell = node.host.get_shell()
stdout = shell.exec(cmd).stdout
piloramas = stdout.split("\n")
return piloramas
@allure.title(
"After metabase loss on all nodes operations on objects and buckets should be still available via S3 (s3_client={s3_client})"
)
@ -663,15 +653,22 @@ class TestStorageDataLoss(ClusterTestBase):
object_versions.append(put_object)
node_to_check = self.cluster.storage_nodes[0]
piloramas_list_before_removing = {}
with reporter.step("Get list of all pilorama.db"):
piloramas_list_before_removing = self.get_piloramas_list(node_to_check)
piloramas_list_before_removing = []
with reporter.step("Get list of all pilorama.db on shards"):
for shard in node_to_check.get_shards():
piloramas_list_before_removing.append(shard.pilorama)
with reporter.step("Check that all pilorama.db files exist on node"):
for pilorama in piloramas_list_before_removing:
assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"
with reporter.step("Stop all storage nodes"):
cluster_state_controller.stop_services_of_type(StorageNode)
with reporter.step("Delete pilorama.db from one node"):
node_to_check.delete_pilorama()
for pilorama in piloramas_list_before_removing:
node_to_check.delete_file(pilorama)
with reporter.step("Start all storage nodes"):
cluster_state_controller.start_all_stopped_services()
@ -680,10 +677,9 @@ class TestStorageDataLoss(ClusterTestBase):
self.tick_epochs(1)
sleep(120)
piloramas_list_afrer_removing = {}
with reporter.step("Get list of all pilorama.db after sync"):
piloramas_list_afrer_removing = self.get_piloramas_list(node_to_check)
assert piloramas_list_afrer_removing == piloramas_list_before_removing, "List of pilorama.db is different"
for pilorama in piloramas_list_before_removing:
assert node_to_check.is_file_exist(pilorama), f"File {pilorama} does not exist"
with reporter.step("Check bucket versioning"):
bucket_versioning = s3_client.get_bucket_versioning_status(bucket)

View file

@ -66,9 +66,10 @@ class TestS3GateMultipart(ClusterTestBase):
upload_key = "multipart_abort"
with reporter.step(f"Get related container_id for bucket '{bucket}'"):
container_id = search_container_by_name(
default_wallet, bucket, self.shell, self.cluster.default_rpc_endpoint
)
for cluster_node in self.cluster.cluster_nodes:
container_id = search_container_by_name(bucket, cluster_node)
if container_id:
break
with reporter.step("Create multipart upload"):
upload_id = s3_client.create_multipart_upload(bucket, upload_key)

View file

@ -51,12 +51,10 @@ class TestS3GatePolicy(ClusterTestBase):
assert bucket_loc_2 == "rep-3"
with reporter.step("Check object policy"):
cid_1 = search_container_by_name(
default_wallet,
bucket_1,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
for cluster_node in self.cluster.cluster_nodes:
cid_1 = search_container_by_name(name=bucket_1, node=cluster_node)
if cid_1:
break
copies_1 = get_simple_object_copies(
wallet=default_wallet,
cid=cid_1,
@ -65,12 +63,10 @@ class TestS3GatePolicy(ClusterTestBase):
nodes=self.cluster.storage_nodes,
)
assert copies_1 == 1
cid_2 = search_container_by_name(
default_wallet,
bucket_2,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)
for cluster_node in self.cluster.cluster_nodes:
cid_2 = search_container_by_name(name=bucket_1, node=cluster_node)
if cid_2:
break
copies_2 = get_simple_object_copies(
wallet=default_wallet,
cid=cid_2,

View file

@ -1,125 +1,92 @@
import json
import pathlib
import re
from dataclasses import dataclass
from io import StringIO
import allure
import pytest
import yaml
from configobj import ConfigObj
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
from frostfs_testlib.storage.cluster import Cluster, StorageNode
SHARD_PREFIX = "FROSTFS_STORAGE_SHARD_"
BLOBSTOR_PREFIX = "_BLOBSTOR_"
@dataclass
class Blobstor:
path: str
path_type: str
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
return self.path == other.path and self.path_type == other.path_type
def __hash__(self):
return hash((self.path, self.path_type))
@staticmethod
def from_config_object(section: ConfigObj, shard_id: str, blobstor_id: str):
var_prefix = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}"
return Blobstor(section.get(f"{var_prefix}_PATH"), section.get(f"{var_prefix}_TYPE"))
@dataclass
class Shard:
blobstor: list[Blobstor]
metabase: str
writecache: str
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
return (
set(self.blobstor) == set(other.blobstor)
and self.metabase == other.metabase
and self.writecache == other.writecache
)
def __hash__(self):
return hash((self.metabase, self.writecache))
@staticmethod
def _get_blobstor_count_from_section(config_object: ConfigObj, shard_id: int):
pattern = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}"
blobstors = {key[: len(pattern) + 2] for key in config_object.keys() if pattern in key}
return len(blobstors)
@staticmethod
def from_config_object(config_object: ConfigObj, shard_id: int):
var_prefix = f"{SHARD_PREFIX}{shard_id}"
blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
blobstors = [
Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)
]
write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")
return Shard(
blobstors,
config_object.get(f"{var_prefix}_METABASE_PATH"),
config_object.get(f"{var_prefix}_WRITECACHE_PATH") if write_cache_enabled else "",
)
@staticmethod
def from_object(shard):
metabase = shard["metabase"]["path"] if "path" in shard["metabase"] else shard["metabase"]
writecache = shard["writecache"]["path"] if "path" in shard["writecache"] else shard["writecache"]
return Shard(
blobstor=[Blobstor(path=blobstor["path"], path_type=blobstor["type"]) for blobstor in shard["blobstor"]],
metabase=metabase,
writecache=writecache,
)
def shards_from_yaml(contents: str) -> list[Shard]:
config = yaml.safe_load(contents)
config["storage"]["shard"].pop("default")
return [Shard.from_object(shard) for shard in config["storage"]["shard"].values()]
def shards_from_env(contents: str) -> list[Shard]:
configObj = ConfigObj(StringIO(contents))
pattern = f"{SHARD_PREFIX}\d*"
num_shards = len(set(re.findall(pattern, contents)))
return [Shard.from_config_object(configObj, shard_id) for shard_id in range(num_shards)]
from frostfs_testlib.resources.wellknown_acl import EACL_PUBLIC_READ_WRITE
from frostfs_testlib.steps.cli.container import create_container, delete_container
from frostfs_testlib.steps.cli.object import delete_object, get_object, get_object_nodes, put_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController, ShardsWatcher
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.shard import Shard
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.file_utils import generate_file
@pytest.mark.shard
class TestControlShard:
class TestControlShard(ClusterTestBase):
@staticmethod
def get_object_path_and_name_file(oid: str, cid: str, node: ClusterNode) -> tuple[str, str]:
oid_path = f"{oid[0]}/{oid[1]}/{oid[2]}/{oid[3]}"
with reporter.step("Search object file"):
node_shell = node.storage_node.host.get_shell()
data_path = node.storage_node.get_data_directory()
all_datas = node_shell.exec(f"ls -la {data_path} | awk '{{ print $9 }}'").stdout.strip()
for data in all_datas.replace(".", "").strip().split("\n"):
check_dir = node_shell.exec(f" [ -d {data_path}/{data}/{oid_path} ] && echo 1 || echo 0").stdout
if "1" in check_dir:
object_path = f"{data_path}/{data}/{oid_path}"
object_name = f"{oid[4:]}.{cid}"
break
return object_path, object_name
def set_shard_rw_mode(self, node: ClusterNode):
watcher = ShardsWatcher(node)
shards = watcher.get_shards()
for shard in shards:
watcher.set_shard_mode(shard["shard_id"], mode="read-write")
watcher.await_for_all_shards_status(status="read-write")
@pytest.fixture()
@allure.title("Revert all shards mode")
def revert_all_shards_mode(self) -> None:
yield
parallel(self.set_shard_rw_mode, self.cluster.cluster_nodes)
@pytest.fixture()
def oid_cid_node(self, default_wallet: str) -> tuple[str, str, ClusterNode]:
with reporter.step("Create container, and put object"):
cid = create_container(
wallet=default_wallet,
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
rule="REP 1 CBF 1",
basic_acl=EACL_PUBLIC_READ_WRITE,
)
file = generate_file(5242880)
oid = put_object(
wallet=default_wallet, path=file, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
)
with reporter.step("Search node with object"):
nodes = get_object_nodes(cluster=self.cluster, cid=cid, oid=oid, alive_node=self.cluster.cluster_nodes[0])
yield oid, cid, nodes[0]
object_path, object_name = self.get_object_path_and_name_file(oid, cid, nodes[0])
nodes[0].host.get_shell().exec(f"chmod +r {object_path}/{object_name}")
delete_object(
wallet=default_wallet, cid=cid, oid=oid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint
)
delete_container(wallet=default_wallet, cid=cid, shell=self.shell, endpoint=self.cluster.default_rpc_endpoint)
@staticmethod
def get_shards_from_config(node: StorageNode) -> list[Shard]:
config_file = node.get_shard_config_path()
file_type = pathlib.Path(config_file).suffix
contents = node.host.get_shell().exec(f"cat {config_file}").stdout
parser_method = {
".env": shards_from_env,
".yaml": shards_from_yaml,
".yml": shards_from_yaml,
".env": node.get_shards_from_env,
".yaml": node.get_shards,
".yml": node.get_shards,
}
shards = parser_method[file_type](contents)
shards = parser_method[file_type]()
return shards
@staticmethod
@ -140,9 +107,49 @@ class TestControlShard:
)
return [Shard.from_object(shard) for shard in json.loads(result.stdout.split(">", 1)[1])]
@pytest.fixture()
def change_config_storage(self, cluster_state_controller: ClusterStateController):
with reporter.step("Change threshold error shards"):
cluster_state_controller.manager(ConfigStateManager).set_on_all_nodes(
service_type=StorageNode, values={"storage:shard_ro_error_threshold": "5"}
)
yield
with reporter.step("Restore threshold error shards"):
cluster_state_controller.manager(ConfigStateManager).revert_all()
@allure.title("All shards are available")
def test_control_shard(self, cluster: Cluster):
for storage_node in cluster.storage_nodes:
shards_from_config = self.get_shards_from_config(storage_node)
shards_from_cli = self.get_shards_from_cli(storage_node)
assert set(shards_from_config) == set(shards_from_cli)
@pytest.mark.failover
def test_shard_errors(
self,
default_wallet: str,
oid_cid_node: tuple[str, str, ClusterNode],
change_config_storage: None,
revert_all_shards_mode: None,
):
oid, cid, node = oid_cid_node
object_path, object_name = self.get_object_path_and_name_file(*oid_cid_node)
with reporter.step("Block read file"):
node.host.get_shell().exec(f"chmod a-r {object_path}/{object_name}")
with reporter.step("Get object, expect 6 errors"):
for _ in range(6):
with pytest.raises(RuntimeError):
get_object(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=node.storage_node.get_rpc_endpoint(),
)
with reporter.step("Check shard status"):
for shard in ShardsWatcher(node).get_shards():
if shard["blobstor"][1]["path"] in object_path:
with reporter.step(f"Shard - {shard['shard_id']} to {node.host_ip}, mode - {shard['mode']}"):
assert shard["mode"] == "read-only"
break

View file

@ -36,7 +36,7 @@ class TestLogs:
logs_dir = os.path.join(temp_directory, "logs")
os.makedirs(logs_dir)
# Using \b here because 'oom' and 'panic' can sometimes be found in OID or CID
issues_regex = r"\bpanic\b|\boom\b|too many|insufficient funds|insufficient amount of gas|wallet passwd|secret key|access key|cannot assign requested address"
issues_regex = r"\bpanic\b|\boom\b|too many|insufficient funds|insufficient amount of gas|wallet passwd|secret \bkey\b|access key|cannot assign requested address"
exclude_filter = r"too many requests"
time.sleep(2)