# Forked from TrueCloudLab/frostfs-testcases (83 lines, 3 KiB, Python).
from dataclasses import dataclass
|
|
from functools import partial
|
|
|
|
import pytest
|
|
from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.dataclasses import ape
|
|
|
|
# Rule set with a single unconditional ALLOW over the object-operations wildcard.
APE_EVERYONE_ALLOW_ALL = [
    ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL),
    # If container operations are needed as well, add:
    # ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL),
]

# Same ALLOW over the object-operations wildcard, but conditioned on the OWNER role.
APE_OWNER_ALLOW_ALL = [
    ape.Rule(
        ape.Verb.ALLOW,
        ape.ObjectOperations.WILDCARD_ALL,
        ape.Condition.by_role(ape.Role.OWNER),
    ),
    # If container operations are needed as well, add:
    # ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER)),
]
|
|
|
|
|
|
@dataclass
class ContainerRequest:
    """Describe a container requested by a test: placement policy plus APE rules.

    When ``short_name`` is given it is also exposed as ``__name__`` and used
    as the ``repr`` of the request, so parametrized tests do not need an
    explicit ``ids=[...]``.
    """

    # Placement policy text. May contain the %NODE_COUNT% placeholder, which
    # parsed_rule() expands. None means no policy.
    policy: str | None = None
    # APE rules for the container. None is normalised to [] in __post_init__.
    ape_rules: list[ape.Rule] | None = None

    # Optional label; copied to __name__ and returned by __repr__ when set.
    short_name: str | None = None

    def __post_init__(self):
        """Normalise ``ape_rules`` and publish ``short_name`` as ``__name__``."""
        # Keep a private copy of the rules: callers pass shared module-level
        # lists, and mutating one request must not leak into that list or
        # into other requests built from it.
        self.ape_rules = [] if self.ape_rules is None else list(self.ape_rules)

        # For pytest instead of ids=[...] everywhere
        self.__name__ = self.short_name

    def parsed_rule(self, cluster: Cluster) -> str | None:
        """Return ``policy`` with its placeholders expanded for *cluster*.

        Returns None when no policy is set. ``%NODE_COUNT%`` is replaced by
        the number of entries in ``cluster.cluster_nodes``.
        """
        if self.policy is None:
            return None

        substitutions = {"%NODE_COUNT%": str(len(cluster.cluster_nodes))}

        parsed_rule = self.policy
        for sub, replacement in substitutions.items():
            parsed_rule = parsed_rule.replace(sub, replacement)

        return parsed_rule

    def __repr__(self):
        """Return ``short_name`` if set, otherwise a summary of policy and rules."""
        if self.short_name:
            return self.short_name

        spec_info: list[str] = []

        # Only non-empty parts are listed, so a bare request renders as "()".
        if self.policy:
            spec_info.append(f"policy='{self.policy}'")
        if self.ape_rules:
            ape_rules_list = ", ".join(f"'{rule.as_string()}'" for rule in self.ape_rules)
            spec_info.append(f"ape_rules=[{ape_rules_list}]")

        return f"({', '.join(spec_info)})"
|
|
|
|
|
|
class MultipleContainersRequest(list[ContainerRequest]):
    """List of container requests that carries a combined ``__name__``.

    The name is computed once, in ``__init__``, and only when a non-empty
    iterable is supplied; later mutations of the list do not refresh it.
    """

    def __init__(self, iterable=None):
        """Override initializer which can accept iterable"""
        super().__init__()
        if iterable:
            self.extend(iterable)
            self.__set_name()

    def __set_name(self):
        """Set ``__name__`` to the items' names joined by ", "."""
        names = []
        for item in self:
            name = getattr(item, "__name__", None)
            # A request created without short_name has __name__ set to None,
            # which str.join rejects; fall back to the item's repr then.
            names.append(name if isinstance(name, str) else repr(item))
        self.__name__ = ", ".join(names)
|
|
|
|
|
|
# Factory: allow-all object rules; the caller supplies the remaining fields (e.g. policy).
PUBLIC_WITH_POLICY = partial(
    ContainerRequest,
    ape_rules=APE_EVERYONE_ALLOW_ALL,
    short_name="Custom_policy_with_allow_all_ape_rule",
)

# Default placement rule with the unconditional allow-all object rules.
EVERYONE_ALLOW_ALL = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=APE_EVERYONE_ALLOW_ALL,
    short_name="Everyone_Allow_All",
)

# Default placement rule with the owner-conditioned allow-all object rules.
OWNER_ALLOW_ALL = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=APE_OWNER_ALLOW_ALL,
    short_name="Owner_Allow_All",
)

# Default placement rule with an empty APE rule list.
PRIVATE = ContainerRequest(
    policy=DEFAULT_PLACEMENT_RULE,
    ape_rules=[],
    short_name="Private_No_APE",
)
|
|
|
|
|
|
def requires_container(container_request: None | ContainerRequest | list[ContainerRequest] = None) -> pytest.MarkDecorator:
    """Build a ``parametrize`` mark for the ``container_request`` argument.

    Args:
        container_request: A single request, a list of requests, or None.
            None selects ``EVERYONE_ALLOW_ALL``; anything that is not a list
            is wrapped into a one-element list.

    Returns:
        ``pytest.mark.parametrize("container_request", <requests>, indirect=True)``.
    """
    requests = EVERYONE_ALLOW_ALL if container_request is None else container_request

    # parametrize expects a list of values; a lone request becomes [request].
    if not isinstance(requests, list):
        requests = [requests]

    return pytest.mark.parametrize("container_request", requests, indirect=True)
|