Compare commits


No commits in common. "master" and "master" have entirely different histories.

9 changed files with 122 additions and 217 deletions

File 1 of 9

@@ -15,7 +15,8 @@ def validate_object_policy(wallet: str, shell: Shell, placement_rule: str, cid:
def get_netmap_param(netmap_info: list[NodeNetmapInfo]) -> dict:
dict_external = dict()
for node in netmap_info:
dict_external[node.node] = {
external_address = node.external_address[0].split("/")[2]
dict_external[external_address] = {
"country": node.country,
"country_code": node.country_code,
"Price": node.price,

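The reworked get_netmap_param above keys its result by the host part of the node's first external address instead of by the node identifier. A minimal sketch of that extraction, assuming the external address is a multiaddr-style string (the sample value below is illustrative):

    # An external address of the assumed form "/ip4/<host>/tcp/<port>":
    # index 2 after splitting on "/" is the host.
    external_address = "/ip4/10.78.70.1/tcp/8080"
    host = external_address.split("/")[2]
    assert host == "10.78.70.1"
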
File 2 of 9

@@ -210,35 +210,12 @@ def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli:
return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path)
@pytest.fixture(scope="session")
@allure.title("Init Frostfs CLI remote")
def remote_frostfs_cli(cluster: Cluster) -> FrostfsCli:
node = cluster.cluster_nodes[0]
host = node.host
service_config = host.get_service_config(node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
return cli
@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with local Frostfs CLI")
def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
return CliClientWrapper(frostfs_cli)
@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with remote Frostfs CLI")
def remote_grpc_client(remote_frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
return CliClientWrapper(remote_frostfs_cli)
# By default we want all tests to be executed with both storage policies.
# This can be overridden in chosen tests if needed.
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
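For context, the dual-policy parametrization above makes every dependent test run once per storage policy, and the rep/ec marks let a run be narrowed with -m. A self-contained sketch of the same mechanism (the fixture name and return value are illustrative):

    import pytest

    # Session-scoped fixture parametrized over both storage policies; every
    # test that depends on it runs once with "rep" and once with "ec".
    @pytest.fixture(
        scope="session",
        params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)],
    )
    def placement_policy(request) -> str:
        return request.param
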
@@ -508,7 +485,6 @@ def user(user_tag: str) -> User:
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0])
return user.wallet

File 3 of 9

@@ -5,9 +5,7 @@ from frostfs_testlib.steps.cli.container import create_container, delete_contain
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import get_netmap_snapshot
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.cli_utils import parse_netmap_output
@@ -171,7 +169,7 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected from {placement_params['country']}"):
assert (
placement_params["country"] == netmap[node_address]["country"]
@@ -221,7 +219,7 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
assert (
not (placement_params["country"][1] == netmap[node_address]["country"])
@@ -275,8 +273,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from any country"):
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (placement_params["country"][1] == netmap[node_address]["country"]) or (
not (placement_params["country"][1] == netmap[node_address]["country"])
and (placement_params["country"][0] == netmap[node_address]["country"])
@@ -328,8 +326,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
placement_params["country"] == netmap[node_address]["country"]
), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
@@ -637,8 +635,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"):
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
(netmap[node_address]["country"] in placement_params["country"])
or (netmap[node_address]["country"] in placement_params["country"])
@@ -758,8 +756,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"):
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert not (placement_params["country"][0] == netmap[node_address]["country"]) or (
not (placement_params["country"][0] == netmap[node_address]["country"])
and not (placement_params["country_code"] == netmap[node_address]["country_code"])
@@ -811,8 +809,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check three nodes are selected from any country"):
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
not (placement_params["country"][0] == netmap[node_address]["country"])
and (placement_params["country"][1] == netmap[node_address]["country"])
@@ -866,8 +864,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
list_of_location = []
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
list_of_location.append(netmap[node_address]["location"])
with reporter.step(f"Check two or three nodes are selected from Russia and from any other country"):
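Throughout this file the node address is now derived from the node's RPC endpoint rather than from the management interface, which matches the external-address keys that the reworked get_netmap_param produces. A minimal sketch of that step, assuming the endpoint has the form "host:port" (the sample value is illustrative):

    rpc_endpoint = "10.78.70.1:8080"  # stand-in for node.get_rpc_endpoint()
    node_address = rpc_endpoint.split(":")[0]
    assert node_address == "10.78.70.1"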

File 4 of 9

@@ -9,12 +9,11 @@ from frostfs_testlib.steps.cli.container import delete_container
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import get_netmap_snapshot
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeNetmapInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
@@ -35,36 +34,23 @@ class TestPolicyWithPrice(ClusterTestBase):
def await_for_price_attribute_on_nodes(self):
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
netmap = get_netmap_param(netmap)
for node in self.cluster.cluster_nodes:
node_address = node.get_interface(Interfaces.MGMT)
for node in self.cluster.storage_nodes:
node_address = node.get_rpc_endpoint().split(":")[0]
if netmap[node_address]["Price"] is None:
return False
return True
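await_for_price_attribute_on_nodes is a readiness probe: it returns False until every node's netmap entry carries a Price attribute. A sketch of how such a probe is typically polled; this retry loop is illustrative, not the testlib's own wait helper:

    import time

    def wait_until(probe, timeout: float = 60.0, interval: float = 5.0) -> None:
        # Poll the probe until it succeeds or the timeout elapses.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if probe():
                return
            time.sleep(interval)
        raise TimeoutError("probe never succeeded")
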
@reporter.step("Set Price field on {cluster_node}")
def set_price_on_node(
self, cluster_node: ClusterNode, locode_price_map: dict[str, str], netmap: list[NodeNetmapInfo], config_manager: ConfigStateManager
):
node_address = cluster_node.get_interface(Interfaces.MGMT)
node_netmap = [netmap_entry for netmap_entry in netmap if netmap_entry.node == node_address]
assert node_netmap, f"No node found with address {node_address}: \n{netmap}"
price = locode_price_map[node_netmap[0].un_locode]
config_manager.set_on_node(cluster_node, StorageNode, {"node:attribute_5": f"Price:{price}"})
@pytest.fixture(scope="module")
def fill_field_price(self, cluster: Cluster, cluster_state_controller_session: ClusterStateController):
locode_price_map = {
"RU MOW": "15",
"RU LED": "10",
"SE STO": "65",
"FI HEL": "55",
}
prices = ["15", "10", "65", "55"]
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
config_manager = cluster_state_controller_session.manager(ConfigStateManager)
parallel(self.set_price_on_node, cluster.cluster_nodes, locode_price_map, netmap, config_manager)
parallel(
config_manager.set_on_node,
cluster.cluster_nodes,
StorageNode,
itertools.cycle([{"node:attribute_5": f"Price:{price}"} for price in prices]),
)
cluster_state_controller_session.wait_after_storage_startup()
self.tick_epoch()
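The rewritten fill_field_price no longer maps prices by UN/LOCODE; it pairs each cluster node with the next entry of a cycled price list. A runnable sketch of that pairing (the node names are placeholders):

    import itertools

    prices = ["15", "10", "65", "55"]
    nodes = ["node1", "node2", "node3", "node4"]
    configs = itertools.cycle([{"node:attribute_5": f"Price:{price}"} for price in prices])
    # zip stops at the shorter sequence, so each node gets one config:
    # node1 -> Price:15, node2 -> Price:10, node3 -> Price:65, node4 -> Price:55
    for node, config in zip(nodes, configs):
        print(node, config)
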
@@ -122,7 +108,7 @@ class TestPolicyWithPrice(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
assert (
int(netmap[node_address]["Price"]) <= placement_params["Price"]
@@ -174,7 +160,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check the node is selected with price between 1 and 10"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
@@ -226,7 +212,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with max and min prices"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
@@ -274,7 +260,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@@ -325,7 +311,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
not netmap[node_address]["country_code"] == placement_params["country_code"]
or not netmap[node_address]["country_code"] == placement_params["country_code"]
@@ -378,7 +364,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all nodes are selected"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
netmap[node_address]["un_locode"] in placement_params["un_locode"]
or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
@@ -434,7 +420,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["Price"]) < placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@@ -485,7 +471,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
int(netmap[node_address]["Price"]) < placement_params["Price"]
and not netmap[node_address]["continent"] == placement_params["continent"]
@@ -539,7 +525,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all nodes are selected"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (
(
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
@@ -590,7 +576,7 @@ class TestPolicyWithPrice(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
assert (
int(netmap[node_address]["Price"]) >= placement_params["Price"]
@@ -642,7 +628,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all nodes are selected"):
for node in resulting_copies:
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
node_address = node.get_rpc_endpoint().split(":")[0]
assert (netmap[node_address]["country"] in placement_params["country"]) or (
int(netmap[node_address]["Price"]) >= placement_params["Price"]
), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"

File 5 of 9

@@ -309,11 +309,6 @@ class TestFailoverNetwork(ClusterTestBase):
block_interface: Interfaces,
other_interface: Interfaces,
):
endpoint_id_map = {
Interfaces.DATA_O: 0,
Interfaces.DATA_1: 1,
}
endpoint_id = endpoint_id_map[other_interface]
cluster_nodes = self.cluster.cluster_nodes
with reporter.step(f"Block {block_interface.value} interfaces"):
cluster_state_controller.down_interface(cluster_nodes, block_interface.value)
@@ -325,27 +320,28 @@
cid = create_container(
wallet=default_wallet,
shell=self.shell,
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
rule="REP 4 CBF 1",
)
with reporter.step("Put object"):
file_path = generate_file(simple_object_size.value)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
)
with reporter.step("Get object"):
get_object(
file_get_path = get_object(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
)
with reporter.step("Restore interfaces on all nodes"):
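Instead of indexing a fixed endpoint list through the removed endpoint_id_map, the test now builds the endpoint from whichever data interface is still up. A sketch of that construction, assuming get_data_interface returns a list of host addresses and the service listens on port 8080 as in the diff (the sample address is illustrative):

    interface_hosts = ["10.78.71.1"]  # stand-in for node.get_data_interface("data1")
    endpoint = f"{interface_hosts[0]}:8080"
    assert endpoint == "10.78.71.1:8080"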

File 6 of 9

@@ -36,7 +36,7 @@ from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import string_utils
@@ -343,11 +343,9 @@ class TestMaintenanceMode(ClusterTestBase):
def check_node_status(self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str):
netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.get_interface(Interfaces.MGMT) == snapshot.node]
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
if expected_status == NodeStatus.OFFLINE and not node_snapshot:
assert (
node_under_test.get_interface(Interfaces.MGMT) not in netmap
), f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
assert node_under_test.host_ip not in netmap, f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
return
assert node_snapshot, f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"
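check_node_status now matches the node in the netmap snapshot by its host IP rather than by the management interface. A reduced sketch of the matching logic; the dataclass stands in for the testlib's netmap snapshot entries, and the addresses are placeholders:

    from dataclasses import dataclass

    @dataclass
    class Snapshot:
        node: str    # address as reported in the netmap
        status: str

    all_snapshots = [Snapshot("10.78.70.1", "MAINTENANCE"), Snapshot("10.78.70.2", "ONLINE")]
    host_ip = "10.78.70.1"
    node_snapshot = [s for s in all_snapshots if s.node == host_ip]
    assert node_snapshot and node_snapshot[0].status == "MAINTENANCE"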

File 7 of 9

@@ -31,7 +31,6 @@ class TestGRPCMetrics(ClusterTestBase):
@allure.title("GRPC metrics container operations")
def test_grpc_metrics_container_operations(self, default_wallet: WalletInfo, cluster: Cluster):
operations_count = 10
placement_policy = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
with reporter.step("Select random node"):
@@ -57,14 +56,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_get = get_metrics_value(node, command="grpc_server_handled_total", service="ContainerService", method="Get")
with reporter.step(f"Get container"):
for _ in range(operations_count):
get_container(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
get_container(default_wallet, cid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
metrics_counter_get += operations_count
metrics_counter_get += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_get,
command="grpc_server_handled_total",
service="ContainerService",
@@ -75,14 +72,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_list = get_metrics_value(node, command="grpc_server_handled_total", service="ContainerService", method="List")
with reporter.step(f"Get container list"):
for _ in range(operations_count):
list_containers(default_wallet, self.shell, node.storage_node.get_rpc_endpoint())
list_containers(default_wallet, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=List, 'the counter should increase by 1'"):
metrics_counter_list += operations_count
metrics_counter_list += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_list,
command="grpc_server_handled_total",
service="ContainerService",
@@ -93,7 +88,6 @@ class TestGRPCMetrics(ClusterTestBase):
def test_grpc_metrics_object_operations(
self, simple_object_size: ObjectSize, default_wallet: WalletInfo, cluster: Cluster, container: str, disable_policer
):
operations_count = 10
file_path = generate_file(simple_object_size.value)
with reporter.step("Select random node"):
@@ -103,14 +97,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_put = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Put")
with reporter.step("Put object to selected node"):
for _ in range(operations_count):
oid = put_object(default_wallet, file_path, container, self.shell, node.storage_node.get_rpc_endpoint())
oid = put_object(default_wallet, file_path, container, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by {operations_count}'"):
metrics_counter_put += operations_count
with reporter.step(f"Check gRPC metrics method 'Put', 'the counter should increase by 1'"):
metrics_counter_put += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_put,
command="grpc_server_handled_total",
service="ObjectService",
@@ -121,14 +113,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_get = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Get")
with reporter.step(f"Get object"):
for _ in range(operations_count):
get_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
get_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by {operations_count}'"):
metrics_counter_get += operations_count
with reporter.step(f"Check gRPC metrics method=Get, 'the counter should increase by 1'"):
metrics_counter_get += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_get,
command="grpc_server_handled_total",
service="ObjectService",
@@ -139,14 +129,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_search = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Search")
with reporter.step(f"Search object"):
for _ in range(operations_count):
search_object(default_wallet, container, self.shell, node.storage_node.get_rpc_endpoint())
search_object(default_wallet, container, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by {operations_count}'"):
metrics_counter_search += operations_count
with reporter.step(f"Check gRPC metrics method=Search, 'the counter should increase by 1'"):
metrics_counter_search += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_search,
command="grpc_server_handled_total",
service="ObjectService",
@@ -157,14 +145,12 @@ class TestGRPCMetrics(ClusterTestBase):
metrics_counter_head = get_metrics_value(node, command="grpc_server_handled_total", service="ObjectService", method="Head")
with reporter.step(f"Head object"):
for _ in range(operations_count):
head_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
head_object(default_wallet, container, oid, self.shell, node.storage_node.get_rpc_endpoint())
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by {operations_count}'"):
metrics_counter_head += operations_count
with reporter.step(f"Check gRPC metrics method=Head, 'the counter should increase by 1'"):
metrics_counter_head += 1
check_metrics_counter(
[node],
operator=">=",
counter_exp=metrics_counter_head,
command="grpc_server_handled_total",
service="ObjectService",
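After this change every metrics check in the file follows the same shape: capture the counter, perform the operation once, then require the counter to have grown by at least one (the ">=" operator tolerates unrelated requests hitting the same service). A self-contained sketch of that pattern with stand-in helpers:

    calls = 0

    def get_counter() -> int:
        # Stand-in for get_metrics_value(..., command="grpc_server_handled_total", ...)
        return calls

    def do_operation() -> None:
        # Stand-in for a single get_container / put_object / etc. call
        global calls
        calls += 1

    before = get_counter()
    do_operation()
    assert get_counter() >= before + 1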

File 8 of 9

@@ -733,11 +733,9 @@ class TestObjectApiPatch(ClusterTestBase):
@allure.title("[NEGATIVE] Patch cannot be applied to part of complex object (policy={placement_policy})")
@pytest.mark.parametrize("placement_policy", ["rep"], indirect=True)
@pytest.mark.parametrize("object_size", ["complex"], indirect=True)
def test_patch_part_of_complex_object_rep(
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
):
def test_patch_part_of_complex_object_rep(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
with reporter.step("Get parts of complex object"):
parts = remote_grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
parts = grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
assert parts, f"Expected list of OIDs of object parts: {parts}"
part_oid = parts[0]
@@ -754,11 +752,9 @@
@allure.title("[NEGATIVE] Patch cannot be applied to EC chunk (object_size={object_size}, policy={placement_policy})")
@pytest.mark.parametrize("placement_policy", ["ec"], indirect=True)
def test_patch_ec_chunk(
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
):
def test_patch_ec_chunk(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
with reporter.step("Get chunks of object"):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
assert chunks, f"Expected object chunks, but they are not there: {chunks}"
with reporter.step("Try patch chunk of object and catch exception"):

File 9 of 9

@@ -145,13 +145,13 @@ class TestECReplication(ClusterTestBase):
@allure.title("Create container with EC policy (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_create_container_with_ec_policy(
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, test_file: TestFile
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, test_file: TestFile
) -> None:
with reporter.step("Put object in container."):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check replication chunks."):
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
assert self.check_replication(rep_count, grpc_client, container, oid)
@allure.title("Lose node with chunk data")
@pytest.mark.failover
@@ -159,7 +159,6 @@ class TestECReplication(ClusterTestBase):
def test_lose_node_with_data_chunk(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@@ -170,10 +169,10 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, remote_grpc_client, container, oid)
assert self.check_replication(4, grpc_client, container, oid)
with reporter.step("Search node data chunk"):
chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
with reporter.step("Stop node with data chunk."):
@@ -185,7 +184,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node[0])
self.wait_replication(4, remote_grpc_client, container, oid)
self.wait_replication(4, grpc_client, container, oid)
@allure.title("Lose node with chunk parity")
@pytest.mark.failover
@@ -193,7 +192,6 @@ class TestECReplication(ClusterTestBase):
def test_lose_node_with_parity_chunk(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@@ -204,11 +202,11 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, remote_grpc_client, container, oid)
assert self.check_replication(4, grpc_client, container, oid)
with reporter.step("Search node with parity chunk"):
chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
with reporter.step("Stop node parity chunk."):
cluster_state_controller.stop_node_host(chunk_node, "hard")
@@ -219,7 +217,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node)
self.wait_replication(4, remote_grpc_client, container, oid)
self.wait_replication(4, grpc_client, container, oid)
@allure.title("Lose nodes with chunk data and parity")
@pytest.mark.failover
@@ -227,7 +225,6 @@ class TestECReplication(ClusterTestBase):
def test_lose_nodes_data_chunk_and_parity(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@@ -238,13 +235,13 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks, expect 4."):
assert self.check_replication(4, remote_grpc_client, container, oid)
assert self.check_replication(4, grpc_client, container, oid)
with reporter.step("Search node data chunk and node parity chunk"):
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
with reporter.step("Stop node with data chunk."):
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@@ -255,7 +252,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped host and check chunks."):
cluster_state_controller.start_node_host(data_chunk_node)
self.wait_replication(4, remote_grpc_client, container, oid)
self.wait_replication(4, grpc_client, container, oid)
with reporter.step("Stop node with parity chunk and one more node."):
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@@ -267,7 +264,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped nodes and check replication chunk."):
cluster_state_controller.start_stopped_hosts()
self.wait_replication(4, remote_grpc_client, container, oid)
self.wait_replication(4, grpc_client, container, oid)
@allure.title("Policer work with chunk")
@pytest.mark.failover
@@ -276,7 +273,6 @@ class TestECReplication(ClusterTestBase):
self,
simple_object_size: ObjectSize,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
cluster_state_controller: ClusterStateController,
container: str,
include_excluded_nodes: None,
@@ -286,12 +282,12 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks nodes on 3."):
assert self.check_replication(3, remote_grpc_client, container, oid)
assert self.check_replication(3, grpc_client, container, oid)
with reporter.step("Search node with chunk."):
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
first_all_chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Remove chunk node from network map"):
cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])
@@ -304,10 +300,10 @@ class TestECReplication(ClusterTestBase):
node = grpc_client.object.chunks.search_node_without_chunks(
first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
)[0]
self.wait_replication(3, remote_grpc_client, container, oid)
self.wait_replication(3, grpc_client, container, oid)
with reporter.step("Get new chunks"):
second_all_chunks = remote_grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
with reporter.step("Check that oid no change."):
assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
@@ -315,17 +311,11 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Include node in netmap"):
cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)
self.wait_sync_count_chunks_nodes(remote_grpc_client, container, oid, 3)
self.wait_sync_count_chunks_nodes(grpc_client, container, oid, 3)
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
def test_create_container_with_difference_count_nodes(
self,
frostfs_cli: FrostfsCli,
node_count: int,
ec_policy: str,
object_size: ObjectSize,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
) -> None:
with reporter.step("Create container."):
expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
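The expected chunk count above comes straight from the policy string: an "EC X.Y" policy produces X data chunks plus Y parity chunks per object. A compact sketch of the same arithmetic:

    ec_policy = "EC 2.1"
    data, parity = map(int, ec_policy.split(" ")[1].split("."))
    expected_chunks = data + parity  # 2 data + 1 parity
    assert expected_chunks == 3
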
@@ -346,7 +336,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
with reporter.step("Check count object chunks."):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
assert len(chunks) == expected_chunks
with reporter.step("get object and check hash."):
@@ -355,15 +345,13 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request PUT with copies_number flag")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_put_object_with_copies_number(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize
) -> None:
def test_put_object_with_copies_number(self, container: str, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
with reporter.step("Put object in container with copies number = 1"):
test_file = generate_file(simple_object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
with reporter.step("Check that count chunks > 1."):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
assert len(chunks) > 1
@allure.title("Request PUT and 1 node off")
@@ -385,15 +373,13 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request PUT (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_put_object_with_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
def test_put_object_with_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get chunks object."):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Check header chunks object"):
for chunk in chunks:
@@ -404,16 +390,14 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request GET (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
def test_get_object_in_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
def test_get_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
hash_origin_file = get_file_hash(test_file)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get id all chunks."):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Search chunk node and not chunks node."):
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
@@ -444,30 +428,26 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request SEARCH check valid chunk id (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_search_object_in_ec_cnr_chunk_id(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
def test_search_object_in_ec_cnr_chunk_id(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation object"):
search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
for chunk in chunks:
assert chunk.object_id in search_output
@allure.title("Request SEARCH check no chunk index info (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_search_object_in_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
def test_search_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation all chunk"):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
for chunk in chunks:
chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
assert "index" not in chunk_search
@@ -476,19 +456,14 @@ class TestECReplication(ClusterTestBase):
@pytest.mark.failover
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_delete_object_in_ec_cnr(
self,
container: str,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
) -> None:
with reporter.step("Put object in container."):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks) == replication_count
@@ -504,7 +479,7 @@ class TestECReplication(ClusterTestBase):
oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check second object chunks nodes."):
chunks_second_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
assert len(chunks_second_object) == replication_count
with reporter.step("Stop nodes with chunk."):
@@ -528,7 +503,6 @@ class TestECReplication(ClusterTestBase):
container: str,
test_file: TestFile,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
@@ -538,7 +512,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks_object) == replication_count
@@ -635,23 +609,20 @@ class TestECReplication(ClusterTestBase):
assert len(container_nodes) == expected_nodes
@allure.title("Create container with EC policy and FILTER")
@requires_container(PUBLIC_WITH_POLICY("EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"))
def test_create_container_with_filter(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
container: str,
) -> None:
def test_create_container_with_filter(self, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
with reporter.step("Create Container."):
policy = "EC 1.1 IN RUS SELECT 2 FROM RU AS RUS FILTER Country EQ Russia AS RU"
cid = grpc_client.container.create(self.cluster.default_rpc_endpoint, policy=policy, await_mode=True)
with reporter.step("Put object in container."):
test_file = generate_file(simple_object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
with reporter.step("Check object is decomposed exclusively on Russian nodes"):
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
node_parity_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, cid, oid=oid)
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, cid, oid=oid)
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
for node in [node_data_chunk[1], node_parity_chunk[1]]:
assert "Russia" in node.country
@@ -664,7 +635,6 @@ class TestECReplication(ClusterTestBase):
container: str,
restore_nodes_shards_mode: None,
frostfs_cli: FrostfsCli,
remote_frostfs_cli: FrostfsCli,
grpc_client: GrpcClientWrapper,
max_object_size: int,
type: str,
@@ -675,7 +645,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get object chunks."):
chunk = get_chunk(self, remote_frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
chunk = get_chunk(self, frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())
@@ -720,7 +690,7 @@ class TestECReplication(ClusterTestBase):
test_file: TestFile,
s3_client: S3ClientWrapper,
bucket_container_resolver: BucketContainerResolver,
remote_grpc_client: GrpcClientWrapper,
grpc_client: GrpcClientWrapper,
object_size: ObjectSize,
) -> None:
with reporter.step("Create bucket with EC location constrain"):
@@ -736,27 +706,25 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Watch replication count chunks"):
cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
expect_chunks = 4 if object_size.name == "simple" else 16
assert len(chunks) == expect_chunks
@allure.title("Replication chunk after drop (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
def test_drop_chunk_and_replication(
self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, rep_count: int
) -> None:
def test_drop_chunk_and_replication(self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, rep_count: int) -> None:
with reporter.step("Put object"):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get all chunks"):
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
with reporter.step("Search chunk node"):
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
shell_chunk_node = chunk_node[0].host.get_shell()
with reporter.step("Get replication count"):
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
assert self.check_replication(rep_count, grpc_client, container, oid)
with reporter.step("Delete chunk"):
frostfs_node_cli = FrostfsCli(
@@ -767,4 +735,4 @@ class TestECReplication(ClusterTestBase):
frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}")
with reporter.step("Wait replication count after drop one chunk"):
self.wait_replication(rep_count, remote_grpc_client, container, oid)
self.wait_replication(rep_count, grpc_client, container, oid)
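With remote_grpc_client removed, every replication check in this file goes through the single session-scoped grpc_client. A reduced sketch of what such a check amounts to (the helper body is illustrative; the real check_replication and wait_replication live on TestECReplication):

    def check_replication(expected_count: int, client, endpoint: str, cid: str, oid: str) -> bool:
        # Count the object's chunks as reported through the client, mirroring
        # the grpc_client.object.chunks.get_all(...) calls in the diff.
        chunks = client.object.chunks.get_all(endpoint, cid, oid)
        return len(chunks) == expected_count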