diff --git a/pytest_tests/helpers/policy_validation.py b/pytest_tests/helpers/policy_validation.py
index e1c9fda7..68341048 100644
--- a/pytest_tests/helpers/policy_validation.py
+++ b/pytest_tests/helpers/policy_validation.py
@@ -15,8 +15,7 @@ def validate_object_policy(wallet: str, shell: Shell, placement_rule: str, cid:
 def get_netmap_param(netmap_info: list[NodeNetmapInfo]) -> dict:
     dict_external = dict()
     for node in netmap_info:
-        external_adress = node.external_address[0].split("/")[2]
-        dict_external[external_adress] = {
+        dict_external[node.node] = {
             "country": node.country,
             "country_code": node.country_code,
             "Price": node.price,
diff --git a/pytest_tests/testsuites/conftest.py b/pytest_tests/testsuites/conftest.py
index 942d0ce3..0290b870 100644
--- a/pytest_tests/testsuites/conftest.py
+++ b/pytest_tests/testsuites/conftest.py
@@ -210,12 +210,35 @@ def frostfs_cli(client_shell: Shell, wallet: WalletInfo) -> FrostfsCli:
     return FrostfsCli(client_shell, FROSTFS_CLI_EXEC, wallet.config_path)
 
 
+@pytest.fixture(scope="session")
+@allure.title("Init Frostfs CLI remote")
+def remote_frostfs_cli(cluster: Cluster) -> FrostfsCli:
+    node = cluster.cluster_nodes[0]
+    host = node.host
+    service_config = host.get_service_config(node.storage_node.name)
+    wallet_path = service_config.attributes["wallet_path"]
+    wallet_password = service_config.attributes["wallet_password"]
+
+    shell = host.get_shell()
+    wallet_config_path = f"/tmp/{node.storage_node.name}-config.yaml"
+    wallet_config = f'wallet: {wallet_path}\npassword: "{wallet_password}"'
+    shell.exec(f"echo '{wallet_config}' > {wallet_config_path}")
+    cli = FrostfsCli(shell=shell, frostfs_cli_exec_path=FROSTFS_CLI_EXEC, config_file=wallet_config_path)
+    return cli
+
+
 @pytest.fixture(scope="session")
 @allure.title("Init GrpcClientWrapper with local Frostfs CLI")
 def grpc_client(frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
     return CliClientWrapper(frostfs_cli)
 
 
+@pytest.fixture(scope="session")
+@allure.title("Init GrpcClientWrapper with remote Frostfs CLI")
+def remote_grpc_client(remote_frostfs_cli: FrostfsCli) -> GrpcClientWrapper:
+    return CliClientWrapper(remote_frostfs_cli)
+
+
 # By default we want all tests to be executed with both storage policies.
 # This can be overridden in chosen tests if needed.
 @pytest.fixture(scope="session", params=[pytest.param("rep", marks=pytest.mark.rep), pytest.param("ec", marks=pytest.mark.ec)])
@@ -485,6 +508,7 @@ def user(user_tag: str) -> User:
 
 
 @pytest.fixture(scope="session")
+@cached_fixture(optionals.OPTIONAL_CACHE_FIXTURES)
 def wallet(user: User, credentials_provider: CredentialsProvider, cluster: Cluster) -> WalletInfo:
     credentials_provider.GRPC.provide(user, cluster.cluster_nodes[0])
     return user.wallet
diff --git a/pytest_tests/testsuites/container/test_policy.py b/pytest_tests/testsuites/container/test_policy.py
index 1779242e..14f56e0b 100644
--- a/pytest_tests/testsuites/container/test_policy.py
+++ b/pytest_tests/testsuites/container/test_policy.py
@@ -5,7 +5,9 @@ from frostfs_testlib.steps.cli.container import create_container, delete_contain
 from frostfs_testlib.steps.cli.object import delete_object, put_object_to_random_node
 from frostfs_testlib.steps.node_management import get_netmap_snapshot
 from frostfs_testlib.steps.storage_policy import get_nodes_with_object
+from frostfs_testlib.storage.cluster import Cluster
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
+from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.utils.cli_utils import parse_netmap_output
@@ -169,7 +171,7 @@ class TestPolicy(ClusterTestBase):
         with reporter.step(f"Check the object appearance"):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
-            node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
+            node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
         with reporter.step(f"Check the node is selected from {placement_params['country']}"):
             assert (
                 placement_params["country"] == netmap[node_address]["country"]
@@ -219,7 +221,7 @@ class TestPolicy(ClusterTestBase):
         with reporter.step(f"Check the object appearance"):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
-            node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
+            node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
         with reporter.step(f"Check the node is selected from {placement_params['country'][0]}"):
             assert (
                 not (placement_params["country"][1] == netmap[node_address]["country"])
@@ -273,8 +275,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected from any country"):
-            for node in resulting_copies:
-                node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 assert (placement_params["country"][1] == netmap[node_address]["country"]) or (
                     not (placement_params["country"][1] == netmap[node_address]["country"])
                     and (placement_params["country"][0] == netmap[node_address]["country"])
@@ -326,8 +328,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected from {placement_params['country']}"):
-            for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 assert (
                     placement_params["country"] == netmap[node_address]["country"]
                 ), f"The node is selected from the wrong country. Got {netmap[node_address]['country']}"
@@ -635,8 +637,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected from {' or '.join(placement_params['country'])}"):
-            for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 assert (
                     (netmap[node_address]["country"] in placement_params["country"])
                     or (netmap[node_address]["country"] in placement_params["country"])
@@ -756,8 +758,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected not from {placement_params['country'][0]}"):
-            for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 assert not (placement_params["country"][0] == netmap[node_address]["country"]) or (
                     not (placement_params["country"][0] == netmap[node_address]["country"])
                     and not (placement_params["country_code"] == netmap[node_address]["country_code"])
@@ -809,8 +811,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check three nodes are selected from any country"):
-            for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 assert (placement_params["country"][0] == netmap[node_address]["country"]) or (
                     not (placement_params["country"][0] == netmap[node_address]["country"])
                     and (placement_params["country"][1] == netmap[node_address]["country"])
@@ -864,8 +866,8 @@ class TestPolicy(ClusterTestBase):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
             list_of_location = []
-            for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+            for storage_node in resulting_copies:
+                node_address = self.cluster.node(storage_node).get_interface(Interfaces.MGMT)
                 list_of_location.append(netmap[node_address]["location"])
 
         with reporter.step(f"Check two or three nodes are selected from Russia and from any other country"):
diff --git a/pytest_tests/testsuites/container/test_policy_with_price.py b/pytest_tests/testsuites/container/test_policy_with_price.py
index a431dfd8..6d1dfecd 100644
--- a/pytest_tests/testsuites/container/test_policy_with_price.py
+++ b/pytest_tests/testsuites/container/test_policy_with_price.py
@@ -14,6 +14,7 @@ from frostfs_testlib.storage.controllers.cluster_state_controller import Cluster
 from frostfs_testlib.storage.controllers.state_managers.config_state_manager import ConfigStateManager
 from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
+from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing import parallel
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
@@ -34,8 +35,8 @@ class TestPolicyWithPrice(ClusterTestBase):
     def await_for_price_attribute_on_nodes(self):
         netmap = parse_netmap_output(get_netmap_snapshot(node=self.cluster.storage_nodes[0], shell=self.shell))
         netmap = get_netmap_param(netmap)
-        for node in self.cluster.storage_nodes:
-            node_address = node.get_rpc_endpoint().split(":")[0]
+        for node in self.cluster.cluster_nodes:
+            node_address = node.get_interface(Interfaces.MGMT)
             if netmap[node_address]["Price"] is None:
                 return False
         return True
@@ -108,7 +109,7 @@ class TestPolicyWithPrice(ClusterTestBase):
         with reporter.step(f"Check the object appearance"):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
-            node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
+            node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
         with reporter.step(f"Check the node is selected with price <= {placement_params['Price']}"):
             assert (
                 int(netmap[node_address]["Price"]) <= placement_params["Price"]
@@ -160,7 +161,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check the node is selected with price between 1 and 10"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     int(netmap[node_address]["Price"]) > placement_params["Price"][1]
                     and int(netmap[node_address]["Price"]) <= placement_params["Price"][0]
@@ -212,7 +213,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected with max and min prices"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     int(netmap[node_address]["Price"]) > placement_params["Price"][1]
                     or int(netmap[node_address]["Price"]) < placement_params["Price"][0]
@@ -260,7 +261,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected with price > {placement_params['Price']}"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     int(netmap[node_address]["Price"]) > placement_params["Price"]
                 ), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@@ -311,7 +312,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected not with country code '{placement_params['country_code']}'"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     not netmap[node_address]["country_code"] == placement_params["country_code"]
                     or not netmap[node_address]["country_code"] == placement_params["country_code"]
@@ -364,7 +365,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check all nodes are selected"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     netmap[node_address]["un_locode"] in placement_params["un_locode"]
                     or not netmap[node_address]["un_locode"] == placement_params["un_locode"][1]
@@ -420,7 +421,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check two nodes are selected with price < {placement_params['Price']}"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     int(netmap[node_address]["Price"]) < placement_params["Price"]
                 ), f"The node is selected with the wrong price. Got {netmap[node_address]}"
@@ -471,7 +472,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check three nodes are selected not from {placement_params['continent']}"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     int(netmap[node_address]["Price"]) < placement_params["Price"]
                     and not netmap[node_address]["continent"] == placement_params["continent"]
@@ -525,7 +526,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check all nodes are selected"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (
                     (
                         int(netmap[node_address]["Price"]) > placement_params["Price"][1]
@@ -576,7 +577,7 @@ class TestPolicyWithPrice(ClusterTestBase):
         with reporter.step(f"Check the object appearance"):
             netmap = parse_netmap_output(get_netmap_snapshot(node=resulting_copies[0], shell=self.shell))
             netmap = get_netmap_param(netmap)
-            node_address = resulting_copies[0].get_rpc_endpoint().split(":")[0]
+            node_address = self.cluster.node(resulting_copies[0]).get_interface(Interfaces.MGMT)
         with reporter.step(f"Check the node is selected with price >= {placement_params['Price']}"):
             assert (
                 int(netmap[node_address]["Price"]) >= placement_params["Price"]
@@ -628,7 +629,7 @@ class TestPolicyWithPrice(ClusterTestBase):
             netmap = get_netmap_param(netmap)
         with reporter.step(f"Check all nodes are selected"):
             for node in resulting_copies:
-                node_address = node.get_rpc_endpoint().split(":")[0]
+                node_address = self.cluster.node(node).get_interface(Interfaces.MGMT)
                 assert (netmap[node_address]["country"] in placement_params["country"]) or (
                     int(netmap[node_address]["Price"]) >= placement_params["Price"]
                 ), f"The node is selected from the wrong country or with wrong price. Got {netmap[node_address]}"
diff --git a/pytest_tests/testsuites/failovers/test_frostfs_failover_network.py b/pytest_tests/testsuites/failovers/test_frostfs_failover_network.py
index f6e982eb..234d88b2 100644
--- a/pytest_tests/testsuites/failovers/test_frostfs_failover_network.py
+++ b/pytest_tests/testsuites/failovers/test_frostfs_failover_network.py
@@ -309,6 +309,11 @@ class TestFailoverNetwork(ClusterTestBase):
         block_interface: Interfaces,
         other_interface: Interfaces,
     ):
+        endpoint_id_map = {
+            Interfaces.DATA_O: 0,
+            Interfaces.DATA_1: 1,
+        }
+        endpoint_id = endpoint_id_map[other_interface]
         cluster_nodes = self.cluster.cluster_nodes
         with reporter.step(f"Block {block_interface.value} interfaces"):
             cluster_state_controller.down_interface(cluster_nodes, block_interface.value)
@@ -320,28 +325,27 @@ class TestFailoverNetwork(ClusterTestBase):
             cid = create_container(
                 wallet=default_wallet,
                 shell=self.shell,
-                endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
+                endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
                 rule="REP 4 CBF 1",
             )
 
         with reporter.step("Put object"):
             file_path = generate_file(simple_object_size.value)
-
             oid = put_object(
                 wallet=default_wallet,
                 path=file_path,
                 cid=cid,
                 shell=self.shell,
-                endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
+                endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
             )
 
         with reporter.step("Get object"):
-            file_get_path = get_object(
+            get_object(
                 wallet=default_wallet,
                 cid=cid,
                 oid=oid,
                 shell=self.shell,
-                endpoint=f"{cluster_nodes[0].get_data_interface(other_interface.value)[0]}:8080",
+                endpoint=cluster_nodes[0].storage_node.get_all_rpc_endpoint()[endpoint_id],
             )
 
         with reporter.step("Restore interfaces on all nodes"):
diff --git a/pytest_tests/testsuites/management/test_node_management.py b/pytest_tests/testsuites/management/test_node_management.py
index 66fe9b38..5eae5406 100644
--- a/pytest_tests/testsuites/management/test_node_management.py
+++ b/pytest_tests/testsuites/management/test_node_management.py
@@ -36,7 +36,7 @@ from frostfs_testlib.steps.storage_policy import get_nodes_with_object
 from frostfs_testlib.storage.cluster import ClusterNode, StorageNode
 from frostfs_testlib.storage.controllers import ClusterStateController
 from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
-from frostfs_testlib.storage.dataclasses.storage_object_info import NodeStatus
+from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeStatus
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.cluster_test_base import ClusterTestBase
 from frostfs_testlib.utils import string_utils
@@ -343,9 +343,11 @@ class TestMaintenanceMode(ClusterTestBase):
     def check_node_status(self, expected_status: NodeStatus, node_under_test: ClusterNode, frostfs_cli: FrostfsCli, rpc_endpoint: str):
         netmap = frostfs_cli.netmap.snapshot(rpc_endpoint).stdout
         all_snapshots = NetmapParser.snapshot_all_nodes(netmap)
-        node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.host_ip == snapshot.node]
+        node_snapshot = [snapshot for snapshot in all_snapshots if node_under_test.get_interface(Interfaces.MGMT) == snapshot.node]
         if expected_status == NodeStatus.OFFLINE and not node_snapshot:
-            assert node_under_test.host_ip not in netmap, f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
+            assert (
+                node_under_test.get_interface(Interfaces.MGMT) not in netmap
+            ), f"{node_under_test} status should be {expected_status}. See netmap:\n{netmap}"
             return
 
         assert node_snapshot, f"{node_under_test} status should be {expected_status}, but was not in netmap. See netmap:\n{netmap}"
diff --git a/pytest_tests/testsuites/object/test_object_api_patch.py b/pytest_tests/testsuites/object/test_object_api_patch.py
index a85a61cb..f74e1aa1 100644
--- a/pytest_tests/testsuites/object/test_object_api_patch.py
+++ b/pytest_tests/testsuites/object/test_object_api_patch.py
@@ -733,9 +733,11 @@ class TestObjectApiPatch(ClusterTestBase):
     @allure.title("[NEGATIVE] Patch cannot be applied to part of complex object (policy={placement_policy})")
     @pytest.mark.parametrize("placement_policy", ["rep"], indirect=True)
     @pytest.mark.parametrize("object_size", ["complex"], indirect=True)
-    def test_patch_part_of_complex_object_rep(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
+    def test_patch_part_of_complex_object_rep(
+        self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
+    ):
         with reporter.step("Get parts of complex object"):
-            parts = grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
+            parts = remote_grpc_client.object.parts(container, original_object, self.cluster.cluster_nodes[0])
             assert parts, f"Expected list of OIDs of object parts: {parts}"
             part_oid = parts[0]
@@ -752,9 +754,11 @@ class TestObjectApiPatch(ClusterTestBase):
 
     @allure.title("[NEGATIVE] Patch cannot be applied to EC chunk (object_size={object_size}, policy={placement_policy})")
     @pytest.mark.parametrize("placement_policy", ["ec"], indirect=True)
-    def test_patch_ec_chunk(self, grpc_client: GrpcClientWrapper, container: str, original_object: str):
+    def test_patch_ec_chunk(
+        self, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, container: str, original_object: str
+    ):
         with reporter.step("Get chunks of object"):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, original_object)
             assert chunks, f"Expected object chunks, but they are not there: {chunks}"
 
         with reporter.step("Try patch chunk of object and catch exception"):
diff --git a/pytest_tests/testsuites/replication/test_ec_replication.py b/pytest_tests/testsuites/replication/test_ec_replication.py
index e990c8f0..155f8f24 100644
--- a/pytest_tests/testsuites/replication/test_ec_replication.py
+++ b/pytest_tests/testsuites/replication/test_ec_replication.py
@@ -145,13 +145,13 @@ class TestECReplication(ClusterTestBase):
     @allure.title("Create container with EC policy (size={object_size})")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
     def test_create_container_with_ec_policy(
-        self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, test_file: TestFile
+        self, container: str, rep_count: int, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, test_file: TestFile
     ) -> None:
         with reporter.step("Put object in container."):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check replication chunks."):
-            assert self.check_replication(rep_count, grpc_client, container, oid)
+            assert self.check_replication(rep_count, remote_grpc_client, container, oid)
 
     @allure.title("Lose node with chunk data")
     @pytest.mark.failover
@@ -159,6 +159,7 @@ class TestECReplication(ClusterTestBase):
     def test_lose_node_with_data_chunk(
         self,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
         container: str,
@@ -169,10 +170,10 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check chunk replication on 4 nodes."):
-            assert self.check_replication(4, grpc_client, container, oid)
+            assert self.check_replication(4, remote_grpc_client, container, oid)
 
         with reporter.step("Search node data chunk"):
-            chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
+            chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
 
         with reporter.step("Stop node with data chunk."):
@@ -184,7 +185,7 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Start stopped node, and check replication chunks."):
             cluster_state_controller.start_node_host(chunk_node[0])
-            self.wait_replication(4, grpc_client, container, oid)
+            self.wait_replication(4, remote_grpc_client, container, oid)
 
     @allure.title("Lose node with chunk parity")
     @pytest.mark.failover
@@ -192,6 +193,7 @@ class TestECReplication(ClusterTestBase):
     def test_lose_node_with_parity_chunk(
         self,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
         container: str,
@@ -202,11 +204,11 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check chunk replication on 4 nodes."):
-            assert self.check_replication(4, grpc_client, container, oid)
+            assert self.check_replication(4, remote_grpc_client, container, oid)
 
         with reporter.step("Search node with parity chunk"):
-            chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
-            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
+            chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
+            chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)[0]
 
         with reporter.step("Stop node parity chunk."):
             cluster_state_controller.stop_node_host(chunk_node, "hard")
@@ -217,7 +219,7 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Start stopped node, and check replication chunks."):
             cluster_state_controller.start_node_host(chunk_node)
-            self.wait_replication(4, grpc_client, container, oid)
+            self.wait_replication(4, remote_grpc_client, container, oid)
 
     @allure.title("Lose nodes with chunk data and parity")
     @pytest.mark.failover
@@ -225,6 +227,7 @@ class TestECReplication(ClusterTestBase):
     def test_lose_nodes_data_chunk_and_parity(
         self,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
         container: str,
@@ -235,13 +238,13 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check count chunks, expect 4."):
-            assert self.check_replication(4, grpc_client, container, oid)
+            assert self.check_replication(4, remote_grpc_client, container, oid)
 
         with reporter.step("Search node data chunk and node parity chunk"):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
-            data_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
-            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
-            parity_chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
+            data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
+            data_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
+            parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
+            parity_chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)[0]
 
         with reporter.step("Stop node with data chunk."):
             cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@@ -252,7 +255,7 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Start stopped host and check chunks."):
             cluster_state_controller.start_node_host(data_chunk_node)
-            self.wait_replication(4, grpc_client, container, oid)
+            self.wait_replication(4, remote_grpc_client, container, oid)
 
         with reporter.step("Stop node with parity chunk and one all node."):
             cluster_state_controller.stop_node_host(data_chunk_node, "hard")
@@ -264,7 +267,7 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Start stopped nodes and check replication chunk."):
             cluster_state_controller.start_stopped_hosts()
-            self.wait_replication(4, grpc_client, container, oid)
+            self.wait_replication(4, remote_grpc_client, container, oid)
 
     @allure.title("Policer work with chunk")
     @pytest.mark.failover
@@ -273,6 +276,7 @@
         self,
         simple_object_size: ObjectSize,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         cluster_state_controller: ClusterStateController,
         container: str,
         include_excluded_nodes: None,
@@ -282,12 +286,12 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check count chunks nodes on 3."):
-            assert self.check_replication(3, grpc_client, container, oid)
+            assert self.check_replication(3, remote_grpc_client, container, oid)
 
         with reporter.step("Search node with chunk."):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
-            node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
-            first_all_chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
+            node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)[0]
+            first_all_chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Remove chunk node from network map"):
             cluster_state_controller.remove_node_from_netmap([node_data_chunk.storage_node])
@@ -300,10 +304,10 @@ class TestECReplication(ClusterTestBase):
             node = grpc_client.object.chunks.search_node_without_chunks(
                 first_all_chunks, self.cluster, alive_node.storage_node.get_rpc_endpoint()
             )[0]
-            self.wait_replication(3, grpc_client, container, oid)
+            self.wait_replication(3, remote_grpc_client, container, oid)
 
         with reporter.step("Get new chunks"):
-            second_all_chunks = grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
+            second_all_chunks = remote_grpc_client.object.chunks.get_all(node.storage_node.get_rpc_endpoint(), container, oid)
 
         with reporter.step("Check that oid did not change."):
             assert [chunk for chunk in second_all_chunks if data_chunk.object_id == chunk.object_id]
@@ -311,11 +315,17 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Include node in netmap"):
             cluster_state_controller.include_node_to_netmap(node_data_chunk.storage_node, alive_node.storage_node)
-            self.wait_sync_count_chunks_nodes(grpc_client, container, oid, 3)
+            self.wait_sync_count_chunks_nodes(remote_grpc_client, container, oid, 3)
 
     @allure.title("EC X.Y combinations (nodes={node_count},policy={ec_policy},size={object_size})")
     def test_create_container_with_difference_count_nodes(
-        self, frostfs_cli: FrostfsCli, node_count: int, ec_policy: str, object_size: ObjectSize, grpc_client: GrpcClientWrapper
+        self,
+        frostfs_cli: FrostfsCli,
+        node_count: int,
+        ec_policy: str,
+        object_size: ObjectSize,
+        grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
     ) -> None:
         with reporter.step("Create container."):
             expected_chunks = int(ec_policy.split(" ")[1].split(".")[0]) + int(ec_policy.split(" ")[1].split(".")[1])
@@ -336,7 +346,7 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, cid, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check count object chunks."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, oid)
             assert len(chunks) == expected_chunks
 
         with reporter.step("get object and check hash."):
@@ -345,13 +355,15 @@ class TestECReplication(ClusterTestBase):
 
     @allure.title("Request PUT with copies_number flag")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
-    def test_put_object_with_copies_number(self, container: str, grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize) -> None:
+    def test_put_object_with_copies_number(
+        self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, simple_object_size: ObjectSize
+    ) -> None:
         with reporter.step("Put object in container with copies number = 1"):
             test_file = generate_file(simple_object_size.value)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint, copies_number=1)
 
         with reporter.step("Check that count chunks > 1."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             assert len(chunks) > 1
 
     @allure.title("Request PUT and 1 node off")
@@ -373,13 +385,15 @@ class TestECReplication(ClusterTestBase):
 
     @allure.title("Request PUT (size={object_size})")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
-    def test_put_object_with_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
+    def test_put_object_with_ec_cnr(
+        self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
+    ) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get chunks object."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Check header chunks object"):
             for chunk in chunks:
@@ -390,14 +404,16 @@ class TestECReplication(ClusterTestBase):
 
     @allure.title("Request GET (size={object_size})")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
-    def test_get_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
+    def test_get_object_in_ec_cnr(
+        self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
+    ) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
             hash_origin_file = get_file_hash(test_file)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get id all chunks."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
 
         with reporter.step("Search chunk node and not chunks node."):
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunks[0])[0]
@@ -428,26 +444,30 @@ class TestECReplication(ClusterTestBase):
 
     @allure.title("Request SEARCH check valid chunk id (size={object_size})")
    @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
-    def test_search_object_in_ec_cnr_chunk_id(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
+    def test_search_object_in_ec_cnr_chunk_id(
+        self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
+    ) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Search operation object"):
             search_output = grpc_client.object.search(container, self.cluster.default_rpc_endpoint)
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             for chunk in chunks:
                 assert chunk.object_id in search_output
 
     @allure.title("Request SEARCH check no chunk index info (size={object_size})")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
-    def test_search_object_in_ec_cnr(self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize) -> None:
+    def test_search_object_in_ec_cnr(
+        self, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, object_size: ObjectSize
+    ) -> None:
         with reporter.step("Put object in container"):
             test_file = generate_file(object_size.value)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Search operation all chunk"):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             for chunk in chunks:
                 chunk_search = grpc_client.object.search(container, self.cluster.default_rpc_endpoint, oid=chunk.object_id)
                 assert "index" not in chunk_search
@@ -456,14 +476,19 @@
     @pytest.mark.failover
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1"))
     def test_delete_object_in_ec_cnr(
-        self, container: str, grpc_client: GrpcClientWrapper, object_size: ObjectSize, cluster_state_controller: ClusterStateController
+        self,
+        container: str,
+        grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
+        object_size: ObjectSize,
+        cluster_state_controller: ClusterStateController,
     ) -> None:
         with reporter.step("Put object in container."):
             test_file = generate_file(object_size.value)
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check object chunks nodes."):
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             replication_count = 3 if object_size.name == "simple" else 3 * 4
             assert len(chunks) == replication_count
@@ -479,7 +504,7 @@ class TestECReplication(ClusterTestBase):
             oid_second = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check second object chunks nodes."):
-            chunks_second_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
+            chunks_second_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid_second)
             assert len(chunks_second_object) == replication_count
 
         with reporter.step("Stop nodes with chunk."):
@@ -503,6 +528,7 @@ class TestECReplication(ClusterTestBase):
         container: str,
         test_file: TestFile,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         frostfs_cli: FrostfsCli,
         object_size: ObjectSize,
         cluster_state_controller: ClusterStateController,
@@ -512,7 +538,7 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check object chunks nodes."):
-            chunks_object = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
+            chunks_object = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, container, oid)
             replication_count = 3 if object_size.name == "simple" else 3 * 4
             assert len(chunks_object) == replication_count
@@ -613,6 +639,7 @@ class TestECReplication(ClusterTestBase):
     def test_create_container_with_filter(
         self,
         grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         simple_object_size: ObjectSize,
         container: str,
     ) -> None:
@@ -621,10 +648,10 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Check object is decomposed exclusively on Russian nodes"):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
-            parity_chunk = grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
-            node_data_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
-            node_parity_chunk = grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
+            data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
+            parity_chunk = remote_grpc_client.object.chunks.get_parity(self.cluster.default_rpc_endpoint, container, oid=oid)
+            node_data_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
+            node_parity_chunk = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, parity_chunk)
 
             for node in [node_data_chunk[1], node_parity_chunk[1]]:
                 assert "Russia" in node.country
@@ -637,6 +664,7 @@ class TestECReplication(ClusterTestBase):
         container: str,
         restore_nodes_shards_mode: None,
         frostfs_cli: FrostfsCli,
+        remote_frostfs_cli: FrostfsCli,
         grpc_client: GrpcClientWrapper,
         max_object_size: int,
         type: str,
@@ -647,7 +675,7 @@ class TestECReplication(ClusterTestBase):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get object chunks."):
-            chunk = get_chunk(self, frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
+            chunk = get_chunk(self, remote_frostfs_cli, container, oid, self.cluster.default_rpc_endpoint)
             chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, chunk)
             frostfs_node_cli = self.get_node_cli(chunk_node[0], config=chunk_node[0].storage_node.get_remote_wallet_config_path())
@@ -692,7 +720,7 @@ class TestECReplication(ClusterTestBase):
         test_file: TestFile,
         s3_client: S3ClientWrapper,
         bucket_container_resolver: BucketContainerResolver,
-        grpc_client: GrpcClientWrapper,
+        remote_grpc_client: GrpcClientWrapper,
         object_size: ObjectSize,
     ) -> None:
         with reporter.step("Create bucket with EC location constraint"):
@@ -708,25 +736,27 @@ class TestECReplication(ClusterTestBase):
 
         with reporter.step("Watch replication count chunks"):
             cid = bucket_container_resolver.resolve(self.cluster.cluster_nodes[0], bucket)
-            chunks = grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
+            chunks = remote_grpc_client.object.chunks.get_all(self.cluster.default_rpc_endpoint, cid, bucket_object)
             expect_chunks = 4 if object_size.name == "simple" else 16
             assert len(chunks) == expect_chunks
 
     @allure.title("Replication chunk after drop (size={object_size})")
     @requires_container(PUBLIC_WITH_POLICY("EC 2.1 CBF 1"))
-    def test_drop_chunk_and_replication(self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, rep_count: int) -> None:
+    def test_drop_chunk_and_replication(
+        self, test_file: TestFile, container: str, grpc_client: GrpcClientWrapper, remote_grpc_client: GrpcClientWrapper, rep_count: int
+    ) -> None:
         with reporter.step("Put object"):
             oid = grpc_client.object.put(test_file, container, self.cluster.default_rpc_endpoint)
 
         with reporter.step("Get all chunks"):
-            data_chunk = grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
+            data_chunk = remote_grpc_client.object.chunks.get_first_data(self.cluster.default_rpc_endpoint, container, oid=oid)
 
         with reporter.step("Search chunk node"):
-            chunk_node = grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
+            chunk_node = remote_grpc_client.object.chunks.get_chunk_node(self.cluster, data_chunk)
             shell_chunk_node = chunk_node[0].host.get_shell()
 
         with reporter.step("Get replication count"):
-            assert self.check_replication(rep_count, grpc_client, container, oid)
+            assert self.check_replication(rep_count, remote_grpc_client, container, oid)
 
         with reporter.step("Delete chunk"):
             frostfs_node_cli = FrostfsCli(
@@ -737,4 +767,4 @@ class TestECReplication(ClusterTestBase):
             frostfs_node_cli.control.drop_objects(chunk_node[0].storage_node.get_control_endpoint(), f"{container}/{data_chunk.object_id}")
 
         with reporter.step("Wait replication count after drop one chunk"):
-            self.wait_replication(rep_count, grpc_client, container, oid)
+            self.wait_replication(rep_count, remote_grpc_client, container, oid)