frostfs-testcases/pytest_tests/helpers/container_request.py

108 lines
3.9 KiB
Python
Raw Permalink Normal View History

from dataclasses import dataclass
from functools import partial
import pytest
from frostfs_testlib.steps.cli.container import DEFAULT_PLACEMENT_RULE
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses import ape
APE_EVERYONE_ALLOW_ALL = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL)]
# In case if we need container operations
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL)]
APE_OWNER_ALLOW_ALL = [ape.Rule(ape.Verb.ALLOW, ape.ObjectOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER))]
# In case if we need container operations
# ape.Rule(ape.Verb.ALLOW, ape.ContainerOperations.WILDCARD_ALL, ape.Condition.by_role(ape.Role.OWNER))]
@dataclass
class ContainerRequest:
policy: str = None
ape_rules: list[ape.Rule] = None
short_name: str | None = None
ns_name: str | None = None
ns_zone: str | None = None
def __post_init__(self):
if self.ape_rules is None:
self.ape_rules = []
# For pytest instead of ids=[...] everywhere
self.__name__ = self.short_name
def parsed_rule(self, cluster: Cluster):
if self.policy is None:
return None
substitutions = {"%NODE_COUNT%": str(len(cluster.cluster_nodes))}
parsed_rule = self.policy
for sub, replacement in substitutions.items():
parsed_rule = parsed_rule.replace(sub, replacement)
return parsed_rule
def __repr__(self):
if self.short_name:
return self.short_name
spec_info: list[str] = []
if self.policy:
spec_info.append(f"policy='{self.policy}'")
if self.ape_rules:
ape_rules_list = ", ".join([f"'{rule.as_string()}'" for rule in self.ape_rules])
spec_info.append(f"ape_rules=[{ape_rules_list}]")
return f"({', '.join(spec_info)})"
class MultipleContainersRequest(list[ContainerRequest]):
def __init__(self, iterable=None):
"""Override initializer which can accept iterable"""
super(MultipleContainersRequest, self).__init__()
if iterable:
self.extend(iterable)
self.__set_name()
def __set_name(self):
self.__name__ = ", ".join([s.__name__ for s in self])
PUBLIC_WITH_POLICY = partial(ContainerRequest, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Custom_policy_with_allow_all_ape_rule")
# REPS
REP_1_1_1 = "REP 1 IN X CBF 1 SELECT 1 FROM * AS X"
REP_2_1_2 = "REP 2 IN X CBF 1 SELECT 2 FROM * AS X"
REP_2_1_4 = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_2_2 = "REP 2 IN X CBF 2 SELECT 2 FROM * AS X"
REP_2_2_4 = "REP 2 IN X CBF 2 SELECT 4 FROM * AS X"
#
# Public means it has APE rule which allows everything for everyone
REP_1_1_1_PUBLIC = PUBLIC_WITH_POLICY(REP_1_1_1, short_name="REP 1 CBF 1 SELECT 1 (public)")
REP_2_1_2_PUBLIC = PUBLIC_WITH_POLICY(REP_2_1_2, short_name="REP 2 CBF 1 SELECT 2 (public)")
REP_2_1_4_PUBLIC = PUBLIC_WITH_POLICY(REP_2_1_4, short_name="REP 2 CBF 1 SELECT 4 (public)")
REP_2_2_2_PUBLIC = PUBLIC_WITH_POLICY(REP_2_2_2, short_name="REP 2 CBF 2 SELECT 2 (public)")
REP_2_2_4_PUBLIC = PUBLIC_WITH_POLICY(REP_2_2_4, short_name="REP 2 CBF 2 SELECT 4 (public)")
#
EVERYONE_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_EVERYONE_ALLOW_ALL, short_name="Everyone_Allow_All")
OWNER_ALLOW_ALL = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=APE_OWNER_ALLOW_ALL, short_name="Owner_Allow_All")
PRIVATE = ContainerRequest(policy=DEFAULT_PLACEMENT_RULE, ape_rules=[], short_name="Private_No_APE")
def requires_container(container_request: None | ContainerRequest | list[ContainerRequest] = None) -> pytest.MarkDecorator:
if container_request is None:
container_request = EVERYONE_ALLOW_ALL
if not isinstance(container_request, list):
container_request = [container_request]
return pytest.mark.parametrize("container_request", container_request, indirect=True)