import json import time import allure import pytest from frostfs_testlib import reporter from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.wellknown_acl import PUBLIC_ACL from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.container import create_container, search_nodes_with_container from frostfs_testlib.steps.cli.object import put_object_to_random_node from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.dataclasses import ape from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.utils import datetime_utils from ...helpers.container_spec import ContainerSpec OBJECT_COUNT = 5 @pytest.fixture(scope="session") def ir_wallet(cluster: Cluster) -> WalletInfo: return WalletInfo.from_node(cluster.ir_nodes[0]) @pytest.fixture(scope="session") def storage_wallet(cluster: Cluster) -> WalletInfo: return WalletInfo.from_node(cluster.storage_nodes[0]) @pytest.fixture(scope="session") def role(request: pytest.FixtureRequest): return request.param @pytest.fixture(scope="session") def test_wallet(default_wallet: WalletInfo, other_wallet: WalletInfo, role: ape.Role): role_to_wallet_map = { ape.Role.OWNER: default_wallet, ape.Role.OTHERS: other_wallet, } assert role in role_to_wallet_map, "Missing wallet with role {role}" return role_to_wallet_map[role] @pytest.fixture def container( default_wallet: WalletInfo, frostfs_cli: FrostfsCli, client_shell: Shell, cluster: Cluster, request: pytest.FixtureRequest, rpc_endpoint: str, ) -> str: container_spec = _get_container_spec(request) cid = _create_container_by_spec(default_wallet, client_shell, cluster, rpc_endpoint, container_spec) if container_spec.allow_owner_via_ape: _allow_owner_via_ape(frostfs_cli, cluster, cid) return cid def _create_container_by_spec( default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster, rpc_endpoint: str, container_spec: ContainerSpec ) -> str: # TODO: add container spec to step message with reporter.step("Create container"): cid = create_container( default_wallet, client_shell, rpc_endpoint, basic_acl=container_spec.basic_acl, rule=container_spec.parsed_rule(cluster) ) with reporter.step("Search nodes holding the container"): container_holder_nodes = search_nodes_with_container(default_wallet, cid, client_shell, cluster.default_rpc_endpoint, cluster) report_data = {node.id: node.host_ip for node in container_holder_nodes} reporter.attach(json.dumps(report_data, indent=2), "container_nodes.json") return cid def _get_container_spec(request: pytest.FixtureRequest) -> ContainerSpec: container_marker = request.node.get_closest_marker("container") # let default container to be public at the moment container_spec = ContainerSpec(basic_acl=PUBLIC_ACL) if container_marker: if len(container_marker.args) != 1: raise RuntimeError(f"Something wrong with container marker: {container_marker}") container_spec = container_marker.args[0] if "param" in request.__dict__: container_spec = request.param if not container_spec: raise RuntimeError( f"""Container specification is empty. Either add @pytest.mark.container(ContainerSpec(...)) or @pytest.mark.parametrize(\"container\", [ContainerSpec(...)], indirect=True) decorator""" ) return container_spec def _allow_owner_via_ape(frostfs_cli: FrostfsCli, cluster: Cluster, container: str): with reporter.step("Create allow APE rule for container owner"): role_condition = ape.Condition.by_role(ape.Role.OWNER) deny_rule = ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL, role_condition) frostfs_cli.ape_manager.add( cluster.default_rpc_endpoint, deny_rule.chain_id, target_name=container, target_type="container", rule=deny_rule.as_string(), ) with reporter.step("Wait for one block"): time.sleep(datetime_utils.parse_time(MORPH_BLOCK_TIME)) @pytest.fixture def objects(container: str, default_wallet: WalletInfo, client_shell: Shell, cluster: Cluster, file_path: str): with reporter.step("Add test objects to container"): put_results = parallel( [put_object_to_random_node] * OBJECT_COUNT, wallet=default_wallet, path=file_path, cid=container, shell=client_shell, cluster=cluster, ) objects_oids = [put_result.result() for put_result in put_results] return objects_oids @pytest.fixture def container_nodes(default_wallet: WalletInfo, container: str, client_shell: Shell, cluster: Cluster) -> list[ClusterNode]: cid = container container_holder_nodes = search_nodes_with_container(default_wallet, cid, client_shell, cluster.default_rpc_endpoint, cluster) report_data = {node.id: node.host_ip for node in container_holder_nodes} reporter.attach(json.dumps(report_data, indent=2), "container_nodes.json") return container_holder_nodes @pytest.fixture def container_node_wallet(container_nodes: list[ClusterNode]) -> WalletInfo: return WalletInfo.from_node(container_nodes[0].storage_node)