import logging from dataclasses import dataclass from enum import Enum from typing import Optional from frostfs_testlib.testing.readable import HumanReadableEnum from frostfs_testlib.utils import string_utils logger = logging.getLogger("NeoLogger") EACL_LIFETIME = 100500 FROSTFS_CONTRACT_CACHE_TIMEOUT = 30 class ObjectOperations(HumanReadableEnum): PUT = "object.put" PATCH = "object.patch" GET = "object.get" HEAD = "object.head" GET_RANGE = "object.range" GET_RANGE_HASH = "object.hash" SEARCH = "object.search" DELETE = "object.delete" WILDCARD_ALL = "object.*" @staticmethod def get_all(): return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL] class ContainerOperations(HumanReadableEnum): PUT = "container.put" GET = "container.get" LIST = "container.list" DELETE = "container.delete" WILDCARD_ALL = "container.*" @staticmethod def get_all(): return [op for op in ObjectOperations if op != ObjectOperations.WILDCARD_ALL] @dataclass class Operations: GET_CONTAINER = "GetContainer" PUT_CONTAINER = "PutContainer" DELETE_CONTAINER = "DeleteContainer" LIST_CONTAINER = "ListContainers" GET_OBJECT = "GetObject" DELETE_OBJECT = "DeleteObject" HASH_OBJECT = "HashObject" RANGE_OBJECT = "RangeObject" SEARCH_OBJECT = "SearchObject" HEAD_OBJECT = "HeadObject" PUT_OBJECT = "PutObject" class Verb(HumanReadableEnum): ALLOW = "allow" DENY = "deny" class Role(HumanReadableEnum): OWNER = "owner" IR = "ir" CONTAINER = "container" OTHERS = "others" class ConditionType(HumanReadableEnum): RESOURCE = "ResourceCondition" REQUEST = "RequestCondition" # See https://git.frostfs.info/TrueCloudLab/policy-engine/src/branch/master/schema/native/consts.go#L40-L53 class ConditionKey(HumanReadableEnum): ROLE = '"\\$Actor:role"' PUBLIC_KEY = '"\\$Actor:publicKey"' OBJECT_TYPE = '"\\$Object:objectType"' OBJECT_ID = '"\\$Object:objectID"' class MatchType(HumanReadableEnum): EQUAL = "=" NOT_EQUAL = "!=" @dataclass class Condition: condition_key: ConditionKey | str condition_value: str condition_type: ConditionType = ConditionType.REQUEST match_type: MatchType = MatchType.EQUAL def as_string(self): key = self.condition_key.value if isinstance(self.condition_key, ConditionKey) else self.condition_key value = self.condition_value.value if isinstance(self.condition_value, Enum) else self.condition_value return f"{self.condition_type.value}:{key}{self.match_type.value}{value}" @staticmethod def by_role(*args, **kwargs) -> "Condition": return Condition(ConditionKey.ROLE, *args, **kwargs) @staticmethod def by_key(*args, **kwargs) -> "Condition": return Condition(ConditionKey.PUBLIC_KEY, *args, **kwargs) @staticmethod def by_object_type(*args, **kwargs) -> "Condition": return Condition(ConditionKey.OBJECT_TYPE, *args, **kwargs) @staticmethod def by_object_id(*args, **kwargs) -> "Condition": return Condition(ConditionKey.OBJECT_ID, *args, **kwargs) class Rule: def __init__( self, access: Verb, operations: list[ObjectOperations] | ObjectOperations, conditions: list[Condition] | Condition = None, chain_id: Optional[str] = None, ) -> None: self.access = access self.operations = operations if not conditions: self.conditions = [] elif isinstance(conditions, Condition): self.conditions = [conditions] else: self.conditions = conditions if not isinstance(self.conditions, list): raise RuntimeError("Conditions must be a list") if not operations: self.operations = [] elif isinstance(operations, (ObjectOperations, ContainerOperations)): self.operations = [operations] else: self.operations = operations if not isinstance(self.operations, list): raise RuntimeError("Operations must be a list") self.chain_id = chain_id if chain_id else string_utils.unique_name("chain-id-") def as_string(self): conditions = " ".join([cond.as_string() for cond in self.conditions]) operations = " ".join([op.value for op in self.operations]) return f"{self.access.value} {operations} {conditions} *"