forked from TrueCloudLab/frostfs-testcases
Compare commits
4 commits
Author | SHA1 | Date | |
---|---|---|---|
44bb446847 | |||
67a42cae81 | |||
6174330f49 | |||
ee1f898849 |
9 changed files with 217 additions and 122 deletions
|
@ -15,8 +15,7 @@ def validate_object_policy(wallet: str, shell: Shell, placement_rule: str, cid:
|
|||
def get_netmap_param(netmap_info: list[NodeNetmapInfo]) -> dict:
|
||||
dict_external = dict()
|
||||
for node in netmap_info:
|
||||
external_adress = node.external_address[0].split("/")[2]
|
||||
dict_external[external_adress] = {
|
||||
dict_external[node.node] = {
|
||||
"country": node.country,
|
||||
"country_code": node.country_code,
|
||||
"Price": node.price,
|
||||
|
|
|
@ -210,12 +210,35 @@ def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli:
|
|||
return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@allure.title("Init Frostfs CLI remote")
|
||||
def remote_frostfs_cli(cluster: Cluster) -> FrostfsCli:
|
||||
node = cluster.cluster_nodes[0]
|
||||
host = node.host
|
||||
service_config = host.get_service_config(node.storage_node.name)
|
||||
wallet_path = service_config.attributes["wallet_path"]
|
||||
wallet_password = service_config.attributes["wallet_password"]
|
||||
|
||||
shell = host.get_shell()
|
||||
wallet_config_path = f"/tmp/{node.storage_node.name}-config.yaml"
|
||||
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
|
||||
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
|
||||
cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
|
||||
return cli
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@allure.title("Init GrpcClientWrapper with local Frostfs CLI")
|
||||
def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
|
||||
return CliClientWrapper(frostfs_cli)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@allure.title("Init GrpcClientWrapper with remote Frostfs CLI")
|
||||
def remote_grpc_client(remote_frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
|
||||
return CliClientWrapper(remote_frostfs_cli)
|
||||
|
||||
|
||||
# By default we want all tests to be executed with both storage policies.
|
||||
# This can be overriden in choosen tests if needed.
|
||||
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
|
||||
|
@ -485,6 +508,7 @@ def user(user_tag: str) -> User:
|
|||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
|
||||
def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
|
||||
credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0])
|
||||
return user.wallet
|
||||
|
|
|
@ -5,7 +5,9 @@ from frostfs_testlib.steps.cli.container import create_container, delete_contain
|
|||
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.node_management import get_netmap_snapshot
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.utils.cli_utils import parse_netmap_output
|
||||
|
@ -169,7 +171,7 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
|
||||
with reporter.step(f"Check the node is selected from {placement_params['country']}"):
|
||||
assert (
|
||||
placement_params["country"] == netmap[node_address]["country"]
|
||||
|
@ -219,7 +221,7 @@ class TestPolicy(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
|
||||
with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
|
||||
assert (
|
||||
not (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
|
@ -273,8 +275,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected from any country"):
|
||||
for node in resulting_copies:
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
assert (placement_params["country"][1] == netmap[node_address]["country"]) or (
|
||||
not (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
and (placement_params["country"][0] == netmap[node_address]["country"])
|
||||
|
@ -326,8 +328,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
placement_params["country"] == netmap[node_address]["country"]
|
||||
), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
|
||||
|
@ -635,8 +637,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
(netmap[node_address]["country"] in placement_params["country"])
|
||||
or (netmap[node_address]["country"] in placement_params["country"])
|
||||
|
@ -756,8 +758,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
assert not (placement_params["country"][0] == netmap[node_address]["country"]) or (
|
||||
not (placement_params["country"][0] == netmap[node_address]["country"])
|
||||
and not (placement_params["country_code"] == netmap[node_address]["country_code"])
|
||||
|
@ -809,8 +811,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check three nodes are selected from any country"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
|
||||
not (placement_params["country"][0] == netmap[node_address]["country"])
|
||||
and (placement_params["country"][1] == netmap[node_address]["country"])
|
||||
|
@ -864,8 +866,8 @@ class TestPolicy(ClusterTestBase):
|
|||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
list_of_location = []
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for storage_node in resulting_copies:
|
||||
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
|
||||
list_of_location.append(netmap[node_address]["location"])
|
||||
|
||||
with reporter.step(f"Check two or three nodes are selected from Russia and from any other country"):
|
||||
|
|
|
@ -9,11 +9,12 @@ from frostfs_testlib.steps.cli.container import delete_container
|
|||
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
|
||||
from frostfs_testlib.steps.node_management import get_netmap_snapshot
|
||||
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
|
||||
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeNetmapInfo
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing import parallel
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
|
@ -34,23 +35,36 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
def await_for_price_attribute_on_nodes(self):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
for node in self.cluster.storage_nodes:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
for node in self.cluster.cluster_nodes:
|
||||
node_address = node.get_interface(Interfaces.MGMT)
|
||||
if netmap[node_address]["Price"] is None:
|
||||
return False
|
||||
return True
|
||||
|
||||
@reporter.step("Set Pirce field on {cluster_node}")
|
||||
def set_price_on_node(
|
||||
self, cluster_node: ClusterNode, locode_price_map: dict[str, str], netmap: list[NodeNetmapInfo], config_manager: ConfigStateManager
|
||||
):
|
||||
node_address = cluster_node.get_interface(Interfaces.MGMT)
|
||||
node_netmap = [netmap_entry for netmap_entry in netmap if netmap_entry.node == node_address]
|
||||
|
||||
assert node_netmap, f"No node found with address {node_address}: \n{netmap}"
|
||||
|
||||
price = locode_price_map[node_netmap[0].un_locode]
|
||||
config_manager.set_on_node(cluster_node, StorageNode, {"node:attribute_5": f"Price:{price}"})
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def fill_field_price(self, cluster: Cluster, cluster_state_controller_session: ClusterStateController):
|
||||
prices = ["15", "10", "65", "55"]
|
||||
locode_price_map = {
|
||||
"RU MOW": "15",
|
||||
"RU LED": "10",
|
||||
"SE STO": "65",
|
||||
"FI HEL": "55",
|
||||
}
|
||||
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
|
||||
config_manager = cluster_state_controller_session.manager(ConfigStateManager)
|
||||
parallel(
|
||||
config_manager.set_on_node,
|
||||
cluster.cluster_nodes,
|
||||
StorageNode,
|
||||
itertools.cycle([{"node:attribute_5": f"Price:{price}"} for price in prices]),
|
||||
)
|
||||
parallel(self.set_price_on_node, cluster.cluster_nodes, locode_price_map, netmap, config_manager)
|
||||
cluster_state_controller_session.wait_after_storage_startup()
|
||||
|
||||
self.tick_epoch()
|
||||
|
@ -108,7 +122,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
|
||||
with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) <= placement_params["Price"]
|
||||
|
@ -160,7 +174,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check the node is selected with price between 1 and 10"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
|
||||
|
@ -212,7 +226,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected with max and min prices"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
|
||||
|
@ -260,7 +274,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
@ -311,7 +325,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
not netmap[node_address]["country_code"] == placement_params["country_code"]
|
||||
or not netmap[node_address]["country_code"] == placement_params["country_code"]
|
||||
|
@ -364,7 +378,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check all nodes are selected"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
netmap[node_address]["un_locode"] in placement_params["un_locode"]
|
||||
or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
|
||||
|
@ -420,7 +434,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) < placement_params["Price"]
|
||||
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
|
||||
|
@ -471,7 +485,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) < placement_params["Price"]
|
||||
and not netmap[node_address]["continent"] == placement_params["continent"]
|
||||
|
@ -525,7 +539,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check all nodes are selected"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (
|
||||
(
|
||||
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
|
||||
|
@ -576,7 +590,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
with reporter.step(f"Check the object appearance"):
|
||||
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
|
||||
netmap = get_netmap_param(netmap)
|
||||
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
|
||||
with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
|
||||
assert (
|
||||
int(netmap[node_address]["Price"]) >= placement_params["Price"]
|
||||
|
@ -628,7 +642,7 @@ class TestPolicyWithPrice(ClusterTestBase):
|
|||
netmap = get_netmap_param(netmap)
|
||||
with reporter.step(f"Check all node are selected"):
|
||||
for node in resulting_copies:
|
||||
node_address = node.get_rpc_endpoint().split(":")[0]
|
||||
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
|
||||
assert (netmap[node_address]["country"] in placement_params["country"]) or (
|
||||
int(netmap[node_address]["Price"]) >= placement_params["Price"]
|
||||
), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"
|
||||
|
|
|
@ -309,6 +309,11 @@ class TestFailoverNetwork(ClusterTestBase):
|
|||
block_interface: Interfaces,
|
||||
other_interface: Interfaces,
|
||||
):
|
||||
endpoint_id_map = {
|
||||
Interfaces.DATA_O: 0,
|
||||
Interfaces.DATA_1: 1,
|
||||
}
|
||||
endpoint_id = endpoint_id_map[other_interface]
|
||||
cluster_nodes = self.cluster.cluster_nodes
|
||||
with reporter.step(f"Block {block_interface.value} interfaces"):
|
||||
cluster_state_controller.down_interface(cluster_nodes, block_interface.value)
|
||||
|
@ -320,28 +325,27 @@ class TestFailoverNetwork(ClusterTestBase):
|
|||
cid = create_container(
|
||||
wallet=default_wallet,
|
||||
shell=self.shell,
|
||||
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
|
||||
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
|
||||
rule="REP 4 CBF 1",
|
||||
)
|
||||
|
||||
with reporter.step("Put object"):
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
|
||||
oid = put_object(
|
||||
wallet=default_wallet,
|
||||
path=file_path,
|
||||
cid=cid,
|
||||
shell=self.shell,
|
||||
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
|
||||
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
|
||||
)
|
||||
|
||||
with reporter.step("Get object"):
|
||||
file_get_path = get_object(
|
||||
get_object(
|
||||
wallet=default_wallet,
|
||||
cid=cid,
|
||||
oid=oid,
|
||||
shell=self.shell,
|
||||
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
|
||||
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
|
||||
)
|
||||
|
||||
with reporter.step("Restore interfaces all nodes"):
|
||||
|
|
|
@ -36,7 +36,7 @@ from frostfs_testlib.steps.storage_policy import get_nodes_with_object
|
|||
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers import ClusterStateController
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus
|
||||
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
||||
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
|
||||
from frostfs_testlib.utils import string_utils
|
||||
|
@ -343,9 +343,11 @@ class TestMaintenanceMode(ClusterTestBase):
|
|||
def check_node_status(self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str):
|
||||
netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
|
||||
all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
|
||||
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
|
||||
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.get_interface(Interfaces.MGMT) == snapshot.node]
|
||||
if expected_status == NodeStatus.OFFLINE and not node_snapshot:
|
||||
assert node_under_test.host_ip not in netmap, f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
|
||||
assert (
|
||||
node_under_test.get_interface(Interfaces.MGMT) not in netmap
|
||||
), f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
|
||||
return
|
||||
|
||||
assert node_snapshot, f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"
|
||||
|
|
|
@ -31,6 +31,7 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
|
||||
@allure.title("GRPC metrics container operations")
|
||||
def test_grpc_metrics_container_operations(self, default_wallet: WalletInfo, cluster: Cluster):
|
||||
operations_count = 10
|
||||
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
|
||||
|
||||
with reporter.step("Select random node"):
|
||||
|
@ -56,12 +57,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_get = get_metrics_value(node, command="grpc_server_handled_total", service="ContainerService", method="Get")
|
||||
|
||||
with reporter.step(f"Get container"):
|
||||
get_container(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
get_container(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
|
||||
metrics_counter_get += 1
|
||||
metrics_counter_get += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_get,
|
||||
command="grpc_server_handled_total",
|
||||
service="ContainerService",
|
||||
|
@ -72,12 +75,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_list = get_metrics_value(node, command="grpc_server_handled_total", service="ContainerService", method="List")
|
||||
|
||||
with reporter.step(f"Get container list"):
|
||||
list_containers(default_wallet, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
list_containers(default_wallet, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method=List, 'the counter should increase by 1'"):
|
||||
metrics_counter_list += 1
|
||||
metrics_counter_list += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_list,
|
||||
command="grpc_server_handled_total",
|
||||
service="ContainerService",
|
||||
|
@ -88,6 +93,7 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
def test_grpc_metrics_object_operations(
|
||||
self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, container: str, disable_policer
|
||||
):
|
||||
operations_count = 10
|
||||
file_path = generate_file(simple_object_size.value)
|
||||
|
||||
with reporter.step("Select random node"):
|
||||
|
@ -97,12 +103,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_put = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Put")
|
||||
|
||||
with reporter.step("Put object to selected node"):
|
||||
oid = put_object(default_wallet, file_path, container, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
oid = put_object(default_wallet, file_path, container, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
|
||||
metrics_counter_put += 1
|
||||
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by {operations_count}'"):
|
||||
metrics_counter_put += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_put,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
|
@ -113,12 +121,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_get = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Get")
|
||||
|
||||
with reporter.step(f"Get object"):
|
||||
get_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
get_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
|
||||
metrics_counter_get += 1
|
||||
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by {operations_count}'"):
|
||||
metrics_counter_get += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_get,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
|
@ -129,12 +139,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_search = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Search")
|
||||
|
||||
with reporter.step(f"Search object"):
|
||||
search_object(default_wallet, container, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
search_object(default_wallet, container, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by 1'"):
|
||||
metrics_counter_search += 1
|
||||
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by {operations_count}'"):
|
||||
metrics_counter_search += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_search,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
|
@ -145,12 +157,14 @@ class TestGRPCMetrics(ClusterTestBase):
|
|||
metrics_counter_head = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Head")
|
||||
|
||||
with reporter.step(f"Head object"):
|
||||
head_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
for _ in range(operations_count):
|
||||
head_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
|
||||
|
||||
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by 1'"):
|
||||
metrics_counter_head += 1
|
||||
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by {operations_count}'"):
|
||||
metrics_counter_head += operations_count
|
||||
check_metrics_counter(
|
||||
[node],
|
||||
operator=">=",
|
||||
counter_exp=metrics_counter_head,
|
||||
command="grpc_server_handled_total",
|
||||
service="ObjectService",
|
||||
|
|
|
@ -733,9 +733,11 @@ class TestObjectApiPatch(ClusterTestBase):
|
|||
@allure.title("[NEGATIVE] Patch cannot be applied to part of complex object (policy={placement_policy})")
|
||||
@pytest.mark.parametrize("placement_policy", ["rep"], indirect=True)
|
||||
@pytest.mark.parametrize("object_size", ["complex"], indirect=True)
|
||||
def test_patch_part_of_complex_object_rep(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
|
||||
def test_patch_part_of_complex_object_rep(
|
||||
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
|
||||
):
|
||||
with reporter.step("Get parts of complex object"):
|
||||
parts = grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
|
||||
parts = remote_grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
|
||||
assert parts, f"Expected list of OIDs of object parts: {parts}"
|
||||
|
||||
part_oid = parts[0]
|
||||
|
@ -752,9 +754,11 @@ class TestObjectApiPatch(ClusterTestBase):
|
|||
|
||||
@allure.title("[NEGATIVE] Patch cannot be applied to EC chunk (object_size={object_size}, policy={placement_policy})")
|
||||
@pytest.mark.parametrize("placement_policy", ["ec"], indirect=True)
|
||||
def test_patch_ec_chunk(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
|
||||
def test_patch_ec_chunk(
|
||||
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
|
||||
):
|
||||
with reporter.step("Get chunks of object"):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
|
||||
assert chunks, f"Expected object chunks, but they are not there: {chunks}"
|
||||
|
||||
with reporter.step("Try patch chunk of object and catch exception"):
|
||||
|
|
|
@ -145,13 +145,13 @@ class TestECReplication(ClusterTestBase):
|
|||
@allure.title("Create container with EC policy (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_create_container_with_ec_policy(
|
||||
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, test_file: TestFile
|
||||
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, test_file: TestFile
|
||||
) -> None:
|
||||
with reporter.step("Put object in container."):
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check replication chunks."):
|
||||
assert self.check_replication(rep_count, grpc_client, container, oid)
|
||||
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
|
||||
|
||||
@allure.title("Lose node with chunk data")
|
||||
@pytest.mark.failover
|
||||
|
@ -159,6 +159,7 @@ class TestECReplication(ClusterTestBase):
|
|||
def test_lose_node_with_data_chunk(
|
||||
self,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
container: str,
|
||||
|
@ -169,10 +170,10 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check chunk replication on 4 nodes."):
|
||||
assert self.check_replication(4, grpc_client, container, oid)
|
||||
assert self.check_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Search node data chunk"):
|
||||
chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
|
||||
|
||||
with reporter.step("Stop node with data chunk."):
|
||||
|
@ -184,7 +185,7 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
with reporter.step("Start stopped node, and check replication chunks."):
|
||||
cluster_state_controller.start_node_host(chunk_node[0])
|
||||
self.wait_replication(4, grpc_client, container, oid)
|
||||
self.wait_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
@allure.title("Lose node with chunk parity")
|
||||
@pytest.mark.failover
|
||||
|
@ -192,6 +193,7 @@ class TestECReplication(ClusterTestBase):
|
|||
def test_lose_node_with_parity_chunk(
|
||||
self,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
container: str,
|
||||
|
@ -202,11 +204,11 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check chunk replication on 4 nodes."):
|
||||
assert self.check_replication(4, grpc_client, container, oid)
|
||||
assert self.check_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Search node with parity chunk"):
|
||||
chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
|
||||
chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
|
||||
|
||||
with reporter.step("Stop node parity chunk."):
|
||||
cluster_state_controller.stop_node_host(chunk_node, "hard")
|
||||
|
@ -217,7 +219,7 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
with reporter.step("Start stoped node, and check replication chunks."):
|
||||
cluster_state_controller.start_node_host(chunk_node)
|
||||
self.wait_replication(4, grpc_client, container, oid)
|
||||
self.wait_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
@allure.title("Lose nodes with chunk data and parity")
|
||||
@pytest.mark.failover
|
||||
|
@ -225,6 +227,7 @@ class TestECReplication(ClusterTestBase):
|
|||
def test_lose_nodes_data_chunk_and_parity(
|
||||
self,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
simple_object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
container: str,
|
||||
|
@ -235,13 +238,13 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count chunks, expect 4."):
|
||||
assert self.check_replication(4, grpc_client, container, oid)
|
||||
assert self.check_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Search node data chunk and node parity chunk"):
|
||||
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
|
||||
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
|
||||
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
data_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
|
||||
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
parity_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
|
||||
|
||||
with reporter.step("Stop node with data chunk."):
|
||||
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
|
||||
|
@ -252,7 +255,7 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
with reporter.step("Start stopped host and check chunks."):
|
||||
cluster_state_controller.start_node_host(data_chunk_node)
|
||||
self.wait_replication(4, grpc_client, container, oid)
|
||||
self.wait_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Stop node with parity chunk and one all node."):
|
||||
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
|
||||
|
@ -264,7 +267,7 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
with reporter.step("Start stopped nodes and check replication chunk."):
|
||||
cluster_state_controller.start_stopped_hosts()
|
||||
self.wait_replication(4, grpc_client, container, oid)
|
||||
self.wait_replication(4, remote_grpc_client, container, oid)
|
||||
|
||||
@allure.title("Policer work with chunk")
|
||||
@pytest.mark.failover
|
||||
|
@ -273,6 +276,7 @@ class TestECReplication(ClusterTestBase):
|
|||
self,
|
||||
simple_object_size: ObjectSize,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
container: str,
|
||||
include_excluded_nodes: None,
|
||||
|
@ -282,12 +286,12 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count chunks nodes on 3."):
|
||||
assert self.check_replication(3, grpc_client, container, oid)
|
||||
assert self.check_replication(3, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Search node with chunk."):
|
||||
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
|
||||
first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
|
||||
first_all_chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
|
||||
with reporter.step("Remove chunk node from network map"):
|
||||
cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])
|
||||
|
@ -300,10 +304,10 @@ class TestECReplication(ClusterTestBase):
|
|||
node = grpc_client.object.chunks.search_node_without_chunks(
|
||||
first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
|
||||
)[0]
|
||||
self.wait_replication(3, grpc_client, container, oid)
|
||||
self.wait_replication(3, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Get new chunks"):
|
||||
second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
|
||||
second_all_chunks = remote_grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
|
||||
|
||||
with reporter.step("Check that oid no change."):
|
||||
assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
|
||||
|
@ -311,11 +315,17 @@ class TestECReplication(ClusterTestBase):
|
|||
with reporter.step("Include node in netmap"):
|
||||
cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)
|
||||
|
||||
self.wait_sync_count_chunks_nodes(grpc_client, container, oid, 3)
|
||||
self.wait_sync_count_chunks_nodes(remote_grpc_client, container, oid, 3)
|
||||
|
||||
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
|
||||
def test_create_container_with_difference_count_nodes(
|
||||
self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
|
||||
self,
|
||||
frostfs_cli: FrostfsCli,
|
||||
node_count: int,
|
||||
ec_policy: str,
|
||||
object_size: ObjectSize,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
) -> None:
|
||||
with reporter.step("Create container."):
|
||||
expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
|
||||
|
@ -336,7 +346,7 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check count object chunks."):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
|
||||
assert len(chunks) == expected_chunks
|
||||
|
||||
with reporter.step("get object and check hash."):
|
||||
|
@ -345,13 +355,15 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
@allure.title("Request PUT with copies_number flag")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_put_object_with_copies_number(self, container: str, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
|
||||
def test_put_object_with_copies_number(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize
|
||||
) -> None:
|
||||
with reporter.step("Put object in container with copies number = 1"):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
|
||||
|
||||
with reporter.step("Check that count chunks > 1."):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
assert len(chunks) > 1
|
||||
|
||||
@allure.title("Request PUT and 1 node off")
|
||||
|
@ -373,13 +385,15 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
@allure.title("Request PUT (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_put_object_with_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
|
||||
def test_put_object_with_ec_cnr(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
|
||||
) -> None:
|
||||
with reporter.step("Put object in container"):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Get chunks object."):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
|
||||
with reporter.step("Check header chunks object"):
|
||||
for chunk in chunks:
|
||||
|
@ -390,14 +404,16 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
@allure.title("Request GET (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
|
||||
def test_get_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
|
||||
def test_get_object_in_ec_cnr(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
|
||||
) -> None:
|
||||
with reporter.step("Put object in container"):
|
||||
test_file = generate_file(object_size.value)
|
||||
hash_origin_file = get_file_hash(test_file)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Get id all chunks."):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
|
||||
with reporter.step("Search chunk node and not chunks node."):
|
||||
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
|
||||
|
@ -428,26 +444,30 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
@allure.title("Request SEARCH check valid chunk id (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_search_object_in_ec_cnr_chunk_id(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
|
||||
def test_search_object_in_ec_cnr_chunk_id(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
|
||||
) -> None:
|
||||
with reporter.step("Put object in container"):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Search operation object"):
|
||||
search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint)
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
for chunk in chunks:
|
||||
assert chunk.object_id in search_output
|
||||
|
||||
@allure.title("Request SEARCH check no chunk index info (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_search_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
|
||||
def test_search_object_in_ec_cnr(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
|
||||
) -> None:
|
||||
with reporter.step("Put object in container"):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Search operation all chunk"):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
for chunk in chunks:
|
||||
chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
|
||||
assert "index" not in chunk_search
|
||||
|
@ -456,14 +476,19 @@ class TestECReplication(ClusterTestBase):
|
|||
@pytest.mark.failover
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
|
||||
def test_delete_object_in_ec_cnr(
|
||||
self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
|
||||
self,
|
||||
container: str,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
) -> None:
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(object_size.value)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object chunks nodes."):
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
replication_count = 3 if object_size.name == "simple" else 3 * 4
|
||||
assert len(chunks) == replication_count
|
||||
|
||||
|
@ -479,7 +504,7 @@ class TestECReplication(ClusterTestBase):
|
|||
oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check second object chunks nodes."):
|
||||
chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
|
||||
chunks_second_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
|
||||
assert len(chunks_second_object) == replication_count
|
||||
|
||||
with reporter.step("Stop nodes with chunk."):
|
||||
|
@ -503,6 +528,7 @@ class TestECReplication(ClusterTestBase):
|
|||
container: str,
|
||||
test_file: TestFile,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
frostfs_cli: FrostfsCli,
|
||||
object_size: ObjectSize,
|
||||
cluster_state_controller: ClusterStateController,
|
||||
|
@ -512,7 +538,7 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object chunks nodes."):
|
||||
chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
chunks_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
|
||||
replication_count = 3 if object_size.name == "simple" else 3 * 4
|
||||
assert len(chunks_object) == replication_count
|
||||
|
||||
|
@ -609,20 +635,23 @@ class TestECReplication(ClusterTestBase):
|
|||
assert len(container_nodes) == expected_nodes
|
||||
|
||||
@allure.title("Create container with EC policy and FILTER")
|
||||
def test_create_container_with_filter(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
|
||||
with reporter.step("Create Container."):
|
||||
policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
|
||||
cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True)
|
||||
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"))
|
||||
def test_create_container_with_filter(
|
||||
self,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
simple_object_size: ObjectSize,
|
||||
container: str,
|
||||
) -> None:
|
||||
with reporter.step("Put object in container."):
|
||||
test_file = generate_file(simple_object_size.value)
|
||||
oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Check object is decomposed exclusively on Russian nodes"):
|
||||
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
|
||||
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
|
||||
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
|
||||
node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
|
||||
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
|
||||
node_parity_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
|
||||
|
||||
for node in [node_data_chunk[1], node_parity_chunk[1]]:
|
||||
assert "Russia" in node.country
|
||||
|
@ -635,6 +664,7 @@ class TestECReplication(ClusterTestBase):
|
|||
container: str,
|
||||
restore_nodes_shards_mode: None,
|
||||
frostfs_cli: FrostfsCli,
|
||||
remote_frostfs_cli: FrostfsCli,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
max_object_size: int,
|
||||
type: str,
|
||||
|
@ -645,7 +675,7 @@ class TestECReplication(ClusterTestBase):
|
|||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Get object chunks."):
|
||||
chunk = get_chunk(self, frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
|
||||
chunk = get_chunk(self, remote_frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
|
||||
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
|
||||
frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())
|
||||
|
||||
|
@ -690,7 +720,7 @@ class TestECReplication(ClusterTestBase):
|
|||
test_file: TestFile,
|
||||
s3_client: S3ClientWrapper,
|
||||
bucket_container_resolver: BucketContainerResolver,
|
||||
grpc_client: GrpcClientWrapper,
|
||||
remote_grpc_client: GrpcClientWrapper,
|
||||
object_size: ObjectSize,
|
||||
) -> None:
|
||||
with reporter.step("Create bucket with EC location constrain"):
|
||||
|
@ -706,25 +736,27 @@ class TestECReplication(ClusterTestBase):
|
|||
|
||||
with reporter.step("Watch replication count chunks"):
|
||||
cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
|
||||
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
|
||||
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
|
||||
expect_chunks = 4 if object_size.name == "simple" else 16
|
||||
assert len(chunks) == expect_chunks
|
||||
|
||||
@allure.title("Replication chunk after drop (size={object_size})")
|
||||
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
|
||||
def test_drop_chunk_and_replication(self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, rep_count: int) -> None:
|
||||
def test_drop_chunk_and_replication(
|
||||
self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, rep_count: int
|
||||
) -> None:
|
||||
with reporter.step("Put object"):
|
||||
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
|
||||
|
||||
with reporter.step("Get all chunks"):
|
||||
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
|
||||
|
||||
with reporter.step("Search chunk node"):
|
||||
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
|
||||
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
|
||||
shell_chunk_node = chunk_node[0].host.get_shell()
|
||||
|
||||
with reporter.step("Get replication count"):
|
||||
assert self.check_replication(rep_count, grpc_client, container, oid)
|
||||
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
|
||||
|
||||
with reporter.step("Delete chunk"):
|
||||
frostfs_node_cli = FrostfsCli(
|
||||
|
@ -735,4 +767,4 @@ class TestECReplication(ClusterTestBase):
|
|||
frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}")
|
||||
|
||||
with reporter.step("Wait replication count after drop one chunk"):
|
||||
self.wait_replication(rep_count, grpc_client, container, oid)
|
||||
self.wait_replication(rep_count, remote_grpc_client, container, oid)
|
||||
|
|
Loading…
Reference in a new issue