Move shared code to testlib

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-05-14 13:43:59 +03:00
parent d97a02d1d3
commit 997e768e92
69 changed files with 9213 additions and 64 deletions

View file

@ -0,0 +1,103 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from frostfs_testlib.utils import wallet_utils
logger = logging.getLogger("NeoLogger")
EACL_LIFETIME = 100500
FROSTFS_CONTRACT_CACHE_TIMEOUT = 30
class EACLOperation(Enum):
PUT = "put"
GET = "get"
HEAD = "head"
GET_RANGE = "getrange"
GET_RANGE_HASH = "getrangehash"
SEARCH = "search"
DELETE = "delete"
class EACLAccess(Enum):
ALLOW = "allow"
DENY = "deny"
class EACLRole(Enum):
OTHERS = "others"
USER = "user"
SYSTEM = "system"
class EACLHeaderType(Enum):
REQUEST = "req" # Filter request headers
OBJECT = "obj" # Filter object headers
SERVICE = "SERVICE" # Filter service headers. These are not processed by FrostFS nodes and exist for service use only
class EACLMatchType(Enum):
STRING_EQUAL = "=" # Return true if strings are equal
STRING_NOT_EQUAL = "!=" # Return true if strings are different
@dataclass
class EACLFilter:
header_type: EACLHeaderType = EACLHeaderType.REQUEST
match_type: EACLMatchType = EACLMatchType.STRING_EQUAL
key: Optional[str] = None
value: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"headerType": self.header_type,
"matchType": self.match_type,
"key": self.key,
"value": self.value,
}
@dataclass
class EACLFilters:
filters: Optional[List[EACLFilter]] = None
def __str__(self):
return ",".join(
[
f"{filter.header_type.value}:"
f"{filter.key}{filter.match_type.value}{filter.value}"
for filter in self.filters
]
if self.filters
else []
)
@dataclass
class EACLPubKey:
keys: Optional[List[str]] = None
@dataclass
class EACLRule:
operation: Optional[EACLOperation] = None
access: Optional[EACLAccess] = None
role: Optional[Union[EACLRole, str]] = None
filters: Optional[EACLFilters] = None
def to_dict(self) -> Dict[str, Any]:
return {
"Operation": self.operation,
"Access": self.access,
"Role": self.role,
"Filters": self.filters or [],
}
def __str__(self):
role = (
self.role.value
if isinstance(self.role, EACLRole)
else f'pubkey:{wallet_utils.get_wallet_public_key(self.role, "")}'
)
return f'{self.access.value} {self.operation.value} {self.filters or ""} {role}'