"""Predefined container request specifications: placement policy plus APE rules."""

from dataclasses import dataclass

from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses import ape

# Allow every object operation, with no condition attached.
# If container operations are needed as well, add:
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL)
APE_EVERYONE_ALLOW_ALL = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL)]

# Allow every object operation, conditioned on the OWNER role.
# If container operations are needed as well, add:
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER))
APE_OWNER_ALLOW_ALL = [
    ape.Rule(
        ape.Verb.ALLOW,
        ape.ObjectOperations.WILDCARD_ALL,
        ape.Condition.by_role(ape.Role.OWNER),
    )
]


@dataclass
class ContainerRequest:
    """Description of a container to request: placement policy and APE rules.

    Attributes:
        policy: Placement policy string. May contain the ``%NODE_COUNT%``
            placeholder, which ``parsed_rule`` substitutes. ``None`` means
            no policy is specified.
        ape_rules: APE rules attached to the request. ``None`` is normalized
            to an empty list in ``__post_init__``.
        short_name: Optional short label. When set, it is used as the
            ``repr`` of the instance and stored in ``__name__``.
    """

    policy: str | None = None
    ape_rules: list[ape.Rule] | None = None
    short_name: str | None = None

    def __post_init__(self) -> None:
        """Normalize ``ape_rules`` and expose ``short_name`` as ``__name__``."""
        if self.ape_rules is None:
            self.ape_rules = []

        # For pytest instead of ids=[...] everywhere
        self.__name__ = self.short_name

    def parsed_rule(self, cluster: Cluster) -> str | None:
        """Return the policy with placeholders resolved against ``cluster``.

        Args:
            cluster: Cluster whose ``cluster_nodes`` length replaces the
                ``%NODE_COUNT%`` placeholder.

        Returns:
            The policy string with every known placeholder substituted, or
            ``None`` when no policy is set.
        """
        if self.policy is None:
            return None

        substitutions = {"%NODE_COUNT%": str(len(cluster.cluster_nodes))}

        # Use a distinct local name so the method name is not shadowed.
        resolved_policy = self.policy
        for placeholder, replacement in substitutions.items():
            resolved_policy = resolved_policy.replace(placeholder, replacement)

        return resolved_policy

    def __repr__(self) -> str:
        """Return ``short_name`` if set, else a summary of policy and APE rules."""
        if self.short_name:
            return self.short_name

        spec_info: list[str] = []

        if self.policy:
            spec_info.append(f"policy='{self.policy}'")
        if self.ape_rules:
            ape_rules_list = ", ".join(f"'{rule.as_string()}'" for rule in self.ape_rules)
            spec_info.append(f"ape_rules=[{ape_rules_list}]")

        # Yields "()" when neither a policy nor any rule is present.
        return f"({', '.join(spec_info)})"


EVERYONE_ALLOW_ALL = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=APE_EVERYONE_ALLOW_ALL,
    short_name="Everyone_Allow_All",
)
OWNER_ALLOW_ALL = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=APE_OWNER_ALLOW_ALL,
    short_name="Owner_Allow_All",
)
PRIVATE = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=[],
    short_name="Private_No_APE",
)