from dataclasses import dataclass
from typing import Optional

from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses import ape

# Single wildcard ALLOW rule over all object operations.
APE_PUBLIC_READ_WRITE = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL)]


@dataclass
class ContainerSpec:
    """Description of a container: placement rule plus optional APE rules.

    ``ape_rules`` may be omitted or passed as ``None``; after construction it
    is always a list owned by this instance (a copy of what was passed in).
    """

    rule: str = DEFAULT_PLACEMENT_RULE
    # NOTE(review): the two TODO comments below were reconstructed from a
    # whitespace-collapsed source; confirm which fields they refer to.
    # TODO: Deprecated
    basic_acl: Optional[str] = None
    # TODO: Deprecated
    allow_owner_via_ape: bool = False
    ape_rules: Optional[list[ape.Rule]] = None

    def __post_init__(self):
        """Normalize ``ape_rules`` to a private list.

        The copy prevents aliasing: without it, a spec built from a shared
        list (such as ``APE_PUBLIC_READ_WRITE``) would mutate that shared list
        whenever its own ``ape_rules`` is modified in place.
        """
        self.ape_rules = [] if self.ape_rules is None else list(self.ape_rules)

    def parsed_rule(self, cluster: Cluster) -> Optional[str]:
        """Return ``rule`` with placeholders substituted for *cluster*.

        Args:
            cluster: Cluster whose ``cluster_nodes`` length replaces
                ``%NODE_COUNT%``.

        Returns:
            The rule string with placeholders replaced, or ``None`` when
            ``rule`` is ``None``.
        """
        if self.rule is None:
            return None

        substitutions = {"%NODE_COUNT%": str(len(cluster.cluster_nodes))}

        parsed_rule = self.rule
        for placeholder, replacement in substitutions.items():
            parsed_rule = parsed_rule.replace(placeholder, replacement)

        return parsed_rule

    def __repr__(self):
        """Return a compact representation listing only the non-empty ``rule`` and ``ape_rules``."""
        spec_info: list[str] = []

        if self.rule:
            spec_info.append(f"rule='{self.rule}'")
        if self.ape_rules:
            ape_rules_list = ", ".join(f"'{rule.as_string()}'" for rule in self.ape_rules)
            spec_info.append(f"ape_rules=[{ape_rules_list}]")

        return f"ContainerSpec({', '.join(spec_info)})"


class ContainerSpecs:
    """Namespace of predefined ``ContainerSpec`` instances."""

    # Default placement rule combined with the APE_PUBLIC_READ_WRITE rules.
    PublicReadWrite = ContainerSpec(ape_rules=APE_PUBLIC_READ_WRITE)