from dataclasses import dataclass
from functools import partial

import pytest
from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses import ape

# Single unconditional rule: ALLOW for all object operations.
APE_EVERYONE_ALLOW_ALL = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL)]
# If container operations are needed as well, extend the list above with:
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL)]

# Same ALLOW rule, but conditioned on the requester having the OWNER role.
APE_OWNER_ALLOW_ALL = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER))]
# If container operations are needed as well, extend the list above with:
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER))]


@dataclass
class ContainerRequest:
    """Description of a container a test asks for: placement policy plus APE rules."""

    # Placement policy string; may contain the %NODE_COUNT% placeholder (see parsed_rule).
    policy: str | None = None
    # APE rules for the container; None is normalized to an empty list in __post_init__.
    ape_rules: list[ape.Rule] | None = None
    # Human-readable name; also used as __name__ and as the repr when set.
    short_name: str | None = None
    # Not interpreted in this module; consumed by whoever handles the request.
    ns_name: str | None = None
    ns_zone: str | None = None

    def __post_init__(self):
        """Normalize ``ape_rules`` and expose ``short_name`` as ``__name__``."""
        if self.ape_rules is None:
            self.ape_rules = []

        # For pytest instead of ids=[...] everywhere
        self.__name__ = self.short_name

    def parsed_rule(self, cluster: Cluster) -> str | None:
        """Return the policy with placeholders substituted for the given cluster.

        ``%NODE_COUNT%`` is replaced with the number of ``cluster.cluster_nodes``.
        Returns ``None`` when no policy is set.
        """
        if self.policy is None:
            return None

        substitutions = {"%NODE_COUNT%": str(len(cluster.cluster_nodes))}

        parsed_rule = self.policy
        for sub, replacement in substitutions.items():
            parsed_rule = parsed_rule.replace(sub, replacement)

        return parsed_rule

    def __repr__(self):
        """Return ``short_name`` when set, otherwise a summary of policy and APE rules."""
        if self.short_name:
            return self.short_name

        spec_info: list[str] = []

        if self.policy:
            spec_info.append(f"policy='{self.policy}'")
        if self.ape_rules:
            ape_rules_list = ", ".join([f"'{rule.as_string()}'" for rule in self.ape_rules])
            spec_info.append(f"ape_rules=[{ape_rules_list}]")

        return f"({', '.join(spec_info)})"


class MultipleContainersRequest(list[ContainerRequest]):
    """List of container requests that carries a combined ``__name__``."""

    def __init__(self, iterable=None):
        """Override initializer which can accept iterable"""
        super().__init__()
        if iterable:
            self.extend(iterable)
        # Always set the name so the attribute exists even for an empty request.
        self.__set_name()

    def __set_name(self):
        """Build ``__name__`` by joining the names of the contained requests.

        A request created without ``short_name`` has ``__name__ = None``, which
        ``str.join`` rejects with TypeError, so fall back to the request's repr.
        """
        self.__name__ = ", ".join(getattr(request, "__name__", None) or repr(request) for request in self)
# Factory for requests that use the allow-all APE rules; callers pass the policy
# positionally and may override short_name.
PUBLIC_WITH_POLICY = partial(ContainerRequest, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Custom_policy_with_allow_all_ape_rule")

# Placement policies, named REP_<REP>_<CBF>_<SELECT>
REP_1_1_1 = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
REP_2_1_2 = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
REP_2_1_4 = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_2_2 = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
REP_2_2_4 = "REP 2 IN X CBF 2 SELECT 4 FROM * AS X"

# "Public" means the request carries the APE rule that allows everything for everyone
REP_1_1_1_PUBLIC = PUBLIC_WITH_POLICY(REP_1_1_1, short_name="REP 1 CBF 1 SELECT 1 (public)")
REP_2_1_2_PUBLIC = PUBLIC_WITH_POLICY(REP_2_1_2, short_name="REP 2 CBF 1 SELECT 2 (public)")
REP_2_1_4_PUBLIC = PUBLIC_WITH_POLICY(REP_2_1_4, short_name="REP 2 CBF 1 SELECT 4 (public)")
REP_2_2_2_PUBLIC = PUBLIC_WITH_POLICY(REP_2_2_2, short_name="REP 2 CBF 2 SELECT 2 (public)")
REP_2_2_4_PUBLIC = PUBLIC_WITH_POLICY(REP_2_2_4, short_name="REP 2 CBF 2 SELECT 4 (public)")

# Predefined requests that use the default placement rule
EVERYONE_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Everyone_Allow_All")
OWNER_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_OWNER_ALLOW_ALL, short_name="Owner_Allow_All")
PRIVATE = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=[], short_name="Private_No_APE")


def requires_container(container_request: None | ContainerRequest | list[ContainerRequest] = None) -> pytest.MarkDecorator:
    """Build a parametrize mark that passes request(s) indirectly as ``container_request``.

    Args:
        container_request: a single request, a list of requests, or ``None``
            to use ``EVERYONE_ALLOW_ALL``.

    Returns:
        ``pytest.mark.parametrize`` over the requests with ``indirect=True``.
    """
    requested = EVERYONE_ALLOW_ALL if container_request is None else container_request
    # A lone request is wrapped so that parametrize always receives a list.
    params = requested if isinstance(requested, list) else [requested]
    return pytest.mark.parametrize("container_request", params, indirect=True)