[#357] Use interfaces when appropriate

Signed-off-by: a.berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2024-12-27 12:09:22 +03:00
parent 6174330f49
commit 67a42cae81
8 changed files with 158 additions and 92 deletions

View file

@ -15,8 +15,7 @@ def validate_object_policy(wallet: str, shell: Shell, placement_rule: str, cid:
def get_netmap_param(netmap_info: list[NodeNetmapInfo]) -> dict:
dict_external = dict()
for node in netmap_info:
external_adress = node.external_address[0].split("/")[2]
dict_external[external_adress] = {
dict_external[node.node] = {
"country": node.country,
"country_code": node.country_code,
"Price": node.price,

View file

@ -210,12 +210,35 @@ def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli:
return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path)
@pytest.fixture(scope="session")
@allure.title("Init Frostfs CLI remote")
def remote_frostfs_cli(cluster: Cluster) -> FrostfsCli:
node = cluster.cluster_nodes[0]
host = node.host
service_config = host.get_service_config(node.storage_node.name)
wallet_path = service_config.attributes["wallet_path"]
wallet_password = service_config.attributes["wallet_password"]
shell = host.get_shell()
wallet_config_path = f"/tmp/{node.storage_node.name}-config.yaml"
wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
return cli
@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with local Frostfs CLI")
def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
return CliClientWrapper(frostfs_cli)
@pytest.fixture(scope="session")
@allure.title("Init GrpcClientWrapper with remote Frostfs CLI")
def remote_grpc_client(remote_frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
return CliClientWrapper(remote_frostfs_cli)
# By default we want all tests to be executed with both storage policies.
# This can be overriden in choosen tests if needed.
@pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
@ -485,6 +508,7 @@ def user(user_tag: str) -> User:
@pytest.fixture(scope="session")
@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0])
return user.wallet

View file

@ -5,7 +5,9 @@ from frostfs_testlib.steps.cli.container import create_container, delete_contain
from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
from frostfs_testlib.steps.node_management import get_netmap_snapshot
from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils.cli_utils import parse_netmap_output
@ -169,7 +171,7 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
with reporter.step(f"Check the node is selected from {placement_params['country']}"):
assert (
placement_params["country"] == netmap[node_address]["country"]
@ -219,7 +221,7 @@ class TestPolicy(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
assert (
not (placement_params["country"][1] == netmap[node_address]["country"])
@ -273,8 +275,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from any country"):
for node in resulting_copies:
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
assert (placement_params["country"][1] == netmap[node_address]["country"]) or (
not (placement_params["country"][1] == netmap[node_address]["country"])
and (placement_params["country"][0] == netmap[node_address]["country"])
@ -326,8 +328,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
assert (
placement_params["country"] == netmap[node_address]["country"]
), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
@ -635,8 +637,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
assert (
(netmap[node_address]["country"] in placement_params["country"])
or (netmap[node_address]["country"] in placement_params["country"])
@ -756,8 +758,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
assert not (placement_params["country"][0] == netmap[node_address]["country"]) or (
not (placement_params["country"][0] == netmap[node_address]["country"])
and not (placement_params["country_code"] == netmap[node_address]["country_code"])
@ -809,8 +811,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
with reporter.step(f"Check three nodes are selected from any country"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
not (placement_params["country"][0] == netmap[node_address]["country"])
and (placement_params["country"][1] == netmap[node_address]["country"])
@ -864,8 +866,8 @@ class TestPolicy(ClusterTestBase):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
list_of_location = []
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
for storage_node in resulting_copies:
node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
list_of_location.append(netmap[node_address]["location"])
with reporter.step(f"Check two or three nodes are selected from Russia and from any other country"):

View file

@ -14,6 +14,7 @@ from frostfs_testlib.storage.controllers.cluster_state_controller import Cluster
from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
@ -34,8 +35,8 @@ class TestPolicyWithPrice(ClusterTestBase):
def await_for_price_attribute_on_nodes(self):
netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
netmap = get_netmap_param(netmap)
for node in self.cluster.storage_nodes:
node_address = node.get_rpc_endpoint().split(":")[0]
for node in self.cluster.cluster_nodes:
node_address = node.get_interface(Interfaces.MGMT)
if netmap[node_address]["Price"] is None:
return False
return True
@ -108,7 +109,7 @@ class TestPolicyWithPrice(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
assert (
int(netmap[node_address]["Price"]) <= placement_params["Price"]
@ -160,7 +161,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check the node is selected with price between 1 and 10"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
@ -212,7 +213,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with max and min prices"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
@ -260,7 +261,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
int(netmap[node_address]["Price"]) > placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@ -311,7 +312,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
not netmap[node_address]["country_code"] == placement_params["country_code"]
or not netmap[node_address]["country_code"] == placement_params["country_code"]
@ -364,7 +365,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all nodes are selected"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
netmap[node_address]["un_locode"] in placement_params["un_locode"]
or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
@ -420,7 +421,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
int(netmap[node_address]["Price"]) < placement_params["Price"]
), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@ -471,7 +472,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
int(netmap[node_address]["Price"]) < placement_params["Price"]
and not netmap[node_address]["continent"] == placement_params["continent"]
@ -525,7 +526,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all nodes are selected"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (
(
int(netmap[node_address]["Price"]) > placement_params["Price"][1]
@ -576,7 +577,7 @@ class TestPolicyWithPrice(ClusterTestBase):
with reporter.step(f"Check the object appearance"):
netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
netmap = get_netmap_param(netmap)
node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
assert (
int(netmap[node_address]["Price"]) >= placement_params["Price"]
@ -628,7 +629,7 @@ class TestPolicyWithPrice(ClusterTestBase):
netmap = get_netmap_param(netmap)
with reporter.step(f"Check all node are selected"):
for node in resulting_copies:
node_address = node.get_rpc_endpoint().split(":")[0]
node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
assert (netmap[node_address]["country"] in placement_params["country"]) or (
int(netmap[node_address]["Price"]) >= placement_params["Price"]
), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"

View file

@ -309,6 +309,11 @@ class TestFailoverNetwork(ClusterTestBase):
block_interface: Interfaces,
other_interface: Interfaces,
):
endpoint_id_map = {
Interfaces.DATA_O: 0,
Interfaces.DATA_1: 1,
}
endpoint_id = endpoint_id_map[other_interface]
cluster_nodes = self.cluster.cluster_nodes
with reporter.step(f"Block {block_interface.value} interfaces"):
cluster_state_controller.down_interface(cluster_nodes, block_interface.value)
@ -320,28 +325,27 @@ class TestFailoverNetwork(ClusterTestBase):
cid = create_container(
wallet=default_wallet,
shell=self.shell,
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
rule="REP 4 CBF 1",
)
with reporter.step("Put object"):
file_path = generate_file(simple_object_size.value)
oid = put_object(
wallet=default_wallet,
path=file_path,
cid=cid,
shell=self.shell,
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
)
with reporter.step("Get object"):
file_get_path = get_object(
get_object(
wallet=default_wallet,
cid=cid,
oid=oid,
shell=self.shell,
endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
)
with reporter.step("Restore interfaces all nodes"):

View file

@ -36,7 +36,7 @@ from frostfs_testlib.steps.storage_policy import get_nodes_with_object
from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
from frostfs_testlib.storage.controllers import ClusterStateController
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
from frostfs_testlib.utils import string_utils
@ -343,9 +343,11 @@ class TestMaintenanceMode(ClusterTestBase):
def check_node_status(self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str):
netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.get_interface(Interfaces.MGMT) == snapshot.node]
if expected_status == NodeStatus.OFFLINE and not node_snapshot:
assert node_under_test.host_ip not in netmap, f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
assert (
node_under_test.get_interface(Interfaces.MGMT) not in netmap
), f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
return
assert node_snapshot, f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"

View file

@ -733,9 +733,11 @@ class TestObjectApiPatch(ClusterTestBase):
@allure.title("[NEGATIVE] Patch cannot be applied to part of complex object (policy={placement_policy})")
@pytest.mark.parametrize("placement_policy", ["rep"], indirect=True)
@pytest.mark.parametrize("object_size", ["complex"], indirect=True)
def test_patch_part_of_complex_object_rep(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
def test_patch_part_of_complex_object_rep(
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
):
with reporter.step("Get parts of complex object"):
parts = grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
parts = remote_grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
assert parts, f"Expected list of OIDs of object parts: {parts}"
part_oid = parts[0]
@ -752,9 +754,11 @@ class TestObjectApiPatch(ClusterTestBase):
@allure.title("[NEGATIVE] Patch cannot be applied to EC chunk (object_size={object_size}, policy={placement_policy})")
@pytest.mark.parametrize("placement_policy", ["ec"], indirect=True)
def test_patch_ec_chunk(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
def test_patch_ec_chunk(
self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
):
with reporter.step("Get chunks of object"):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
assert chunks, f"Expected object chunks, but they are not there: {chunks}"
with reporter.step("Try patch chunk of object and catch exception"):

View file

@ -145,13 +145,13 @@ class TestECReplication(ClusterTestBase):
@allure.title("Create container with EC policy (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_create_container_with_ec_policy(
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, test_file: TestFile
self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, test_file: TestFile
) -> None:
with reporter.step("Put object in container."):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check replication chunks."):
assert self.check_replication(rep_count, grpc_client, container, oid)
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
@allure.title("Lose node with chunk data")
@pytest.mark.failover
@ -159,6 +159,7 @@ class TestECReplication(ClusterTestBase):
def test_lose_node_with_data_chunk(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@ -169,10 +170,10 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, grpc_client, container, oid)
assert self.check_replication(4, remote_grpc_client, container, oid)
with reporter.step("Search node data chunk"):
chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
with reporter.step("Stop node with data chunk."):
@ -184,7 +185,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node[0])
self.wait_replication(4, grpc_client, container, oid)
self.wait_replication(4, remote_grpc_client, container, oid)
@allure.title("Lose node with chunk parity")
@pytest.mark.failover
@ -192,6 +193,7 @@ class TestECReplication(ClusterTestBase):
def test_lose_node_with_parity_chunk(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@ -202,11 +204,11 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check chunk replication on 4 nodes."):
assert self.check_replication(4, grpc_client, container, oid)
assert self.check_replication(4, remote_grpc_client, container, oid)
with reporter.step("Search node with parity chunk"):
chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
with reporter.step("Stop node parity chunk."):
cluster_state_controller.stop_node_host(chunk_node, "hard")
@ -217,7 +219,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stoped node, and check replication chunks."):
cluster_state_controller.start_node_host(chunk_node)
self.wait_replication(4, grpc_client, container, oid)
self.wait_replication(4, remote_grpc_client, container, oid)
@allure.title("Lose nodes with chunk data and parity")
@pytest.mark.failover
@ -225,6 +227,7 @@ class TestECReplication(ClusterTestBase):
def test_lose_nodes_data_chunk_and_parity(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
container: str,
@ -235,13 +238,13 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks, expect 4."):
assert self.check_replication(4, grpc_client, container, oid)
assert self.check_replication(4, remote_grpc_client, container, oid)
with reporter.step("Search node data chunk and node parity chunk"):
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
with reporter.step("Stop node with data chunk."):
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@ -252,7 +255,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped host and check chunks."):
cluster_state_controller.start_node_host(data_chunk_node)
self.wait_replication(4, grpc_client, container, oid)
self.wait_replication(4, remote_grpc_client, container, oid)
with reporter.step("Stop node with parity chunk and one all node."):
cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@ -264,7 +267,7 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Start stopped nodes and check replication chunk."):
cluster_state_controller.start_stopped_hosts()
self.wait_replication(4, grpc_client, container, oid)
self.wait_replication(4, remote_grpc_client, container, oid)
@allure.title("Policer work with chunk")
@pytest.mark.failover
@ -273,6 +276,7 @@ class TestECReplication(ClusterTestBase):
self,
simple_object_size: ObjectSize,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
cluster_state_controller: ClusterStateController,
container: str,
include_excluded_nodes: None,
@ -282,12 +286,12 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check count chunks nodes on 3."):
assert self.check_replication(3, grpc_client, container, oid)
assert self.check_replication(3, remote_grpc_client, container, oid)
with reporter.step("Search node with chunk."):
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
first_all_chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Remove chunk node from network map"):
cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])
@ -300,10 +304,10 @@ class TestECReplication(ClusterTestBase):
node = grpc_client.object.chunks.search_node_without_chunks(
first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
)[0]
self.wait_replication(3, grpc_client, container, oid)
self.wait_replication(3, remote_grpc_client, container, oid)
with reporter.step("Get new chunks"):
second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
second_all_chunks = remote_grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
with reporter.step("Check that oid no change."):
assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
@ -311,11 +315,17 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Include node in netmap"):
cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)
self.wait_sync_count_chunks_nodes(grpc_client, container, oid, 3)
self.wait_sync_count_chunks_nodes(remote_grpc_client, container, oid, 3)
@allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
def test_create_container_with_difference_count_nodes(
self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
self,
frostfs_cli: FrostfsCli,
node_count: int,
ec_policy: str,
object_size: ObjectSize,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
) -> None:
with reporter.step("Create container."):
expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
@ -336,7 +346,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
with reporter.step("Check count object chunks."):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
assert len(chunks) == expected_chunks
with reporter.step("get object and check hash."):
@ -345,13 +355,15 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request PUT with copies_number flag")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_put_object_with_copies_number(self, container: str, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
def test_put_object_with_copies_number(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize
) -> None:
with reporter.step("Put object in container with copies number = 1"):
test_file = generate_file(simple_object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
with reporter.step("Check that count chunks > 1."):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
assert len(chunks) > 1
@allure.title("Request PUT and 1 node off")
@ -373,13 +385,15 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request PUT (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_put_object_with_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
def test_put_object_with_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get chunks object."):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Check header chunks object"):
for chunk in chunks:
@ -390,14 +404,16 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request GET (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
def test_get_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
def test_get_object_in_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
hash_origin_file = get_file_hash(test_file)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get id all chunks."):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
with reporter.step("Search chunk node and not chunks node."):
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
@ -428,26 +444,30 @@ class TestECReplication(ClusterTestBase):
@allure.title("Request SEARCH check valid chunk id (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_search_object_in_ec_cnr_chunk_id(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
def test_search_object_in_ec_cnr_chunk_id(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation object"):
search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
for chunk in chunks:
assert chunk.object_id in search_output
@allure.title("Request SEARCH check no chunk index info (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_search_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
def test_search_object_in_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
) -> None:
with reporter.step("Put object in container"):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Search operation all chunk"):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
for chunk in chunks:
chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
assert "index" not in chunk_search
@ -456,14 +476,19 @@ class TestECReplication(ClusterTestBase):
@pytest.mark.failover
@requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
def test_delete_object_in_ec_cnr(
self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
self,
container: str,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
) -> None:
with reporter.step("Put object in container."):
test_file = generate_file(object_size.value)
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks) == replication_count
@ -479,7 +504,7 @@ class TestECReplication(ClusterTestBase):
oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check second object chunks nodes."):
chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
chunks_second_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
assert len(chunks_second_object) == replication_count
with reporter.step("Stop nodes with chunk."):
@ -503,6 +528,7 @@ class TestECReplication(ClusterTestBase):
container: str,
test_file: TestFile,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
frostfs_cli: FrostfsCli,
object_size: ObjectSize,
cluster_state_controller: ClusterStateController,
@ -512,7 +538,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check object chunks nodes."):
chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
chunks_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
replication_count = 3 if object_size.name == "simple" else 3 * 4
assert len(chunks_object) == replication_count
@ -613,6 +639,7 @@ class TestECReplication(ClusterTestBase):
def test_create_container_with_filter(
self,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
simple_object_size: ObjectSize,
container: str,
) -> None:
@ -621,10 +648,10 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Check object is decomposed exclusively on Russian nodes"):
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
node_parity_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
for node in [node_data_chunk[1], node_parity_chunk[1]]:
assert "Russia" in node.country
@ -637,6 +664,7 @@ class TestECReplication(ClusterTestBase):
container: str,
restore_nodes_shards_mode: None,
frostfs_cli: FrostfsCli,
remote_frostfs_cli: FrostfsCli,
grpc_client: GrpcClientWrapper,
max_object_size: int,
type: str,
@ -647,7 +675,7 @@ class TestECReplication(ClusterTestBase):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get object chunks."):
chunk = get_chunk(self, frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
chunk = get_chunk(self, remote_frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())
@ -692,7 +720,7 @@ class TestECReplication(ClusterTestBase):
test_file: TestFile,
s3_client: S3ClientWrapper,
bucket_container_resolver: BucketContainerResolver,
grpc_client: GrpcClientWrapper,
remote_grpc_client: GrpcClientWrapper,
object_size: ObjectSize,
) -> None:
with reporter.step("Create bucket with EC location constrain"):
@ -708,25 +736,27 @@ class TestECReplication(ClusterTestBase):
with reporter.step("Watch replication count chunks"):
cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
expect_chunks = 4 if object_size.name == "simple" else 16
assert len(chunks) == expect_chunks
@allure.title("Replication chunk after drop (size={object_size})")
@requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
def test_drop_chunk_and_replication(self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, rep_count: int) -> None:
def test_drop_chunk_and_replication(
self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, rep_count: int
) -> None:
with reporter.step("Put object"):
oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
with reporter.step("Get all chunks"):
data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
with reporter.step("Search chunk node"):
chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
shell_chunk_node = chunk_node[0].host.get_shell()
with reporter.step("Get replication count"):
assert self.check_replication(rep_count, grpc_client, container, oid)
assert self.check_replication(rep_count, remote_grpc_client, container, oid)
with reporter.step("Delete chunk"):
frostfs_node_cli = FrostfsCli(
@ -737,4 +767,4 @@ class TestECReplication(ClusterTestBase):
frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}")
with reporter.step("Wait replication count after drop one chunk"):
self.wait_replication(rep_count, grpc_client, container, oid)
self.wait_replication(rep_count, remote_grpc_client, container, oid)