57 lines
2.5 KiB
Python
57 lines
2.5 KiB
Python
import functools
|
|
from typing import Optional
|
|
|
|
from frostfs_testlib.shell import Shell
|
|
from frostfs_testlib.storage.cluster import Cluster
|
|
from frostfs_testlib.storage.dataclasses import ape
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
|
|
from pytest_tests.helpers.object_access import (
|
|
can_delete_object,
|
|
can_get_head_object,
|
|
can_get_object,
|
|
can_get_range_hash_of_object,
|
|
can_get_range_of_object,
|
|
can_put_object,
|
|
can_search_object,
|
|
)
|
|
|
|
# Operations covered by the access matrices below, as reported by
# ape.ObjectOperations.get_all() (presumably every object operation - confirm).
ALL_OBJECT_OPERATIONS = ape.ObjectOperations.get_all()

# Expectation matrices: operation -> whether it is expected to succeed.
FULL_ACCESS = dict.fromkeys(ALL_OBJECT_OPERATIONS, True)
NO_ACCESS = dict.fromkeys(ALL_OBJECT_OPERATIONS, False)
# Read-only: everything is expected to succeed except PUT and DELETE.
RO_ACCESS = {
    op: op not in (ape.ObjectOperations.PUT, ape.ObjectOperations.DELETE)
    for op in ALL_OBJECT_OPERATIONS
}
|
|
|
|
|
|
def assert_access_to_container(
    access_matrix: dict[ape.ObjectOperations, bool],
    wallet: WalletInfo,
    cid: str,
    oid: str,
    file_name: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
):
    """Probe every object operation and assert the outcomes match ``access_matrix``.

    Each ``can_*`` helper is called exactly once, in the order PUT, HEAD,
    GET_RANGE, GET_RANGE_HASH, SEARCH, GET, DELETE. Its result is then compared
    with the expectation stored in ``access_matrix`` for that operation.

    Args:
        access_matrix: Expected outcome per operation. It is indexed with every
            operation probed here, so a missing entry raises ``KeyError``.
        wallet: Wallet forwarded to every helper.
        cid: Container ID forwarded to every helper.
        oid: Object ID forwarded to all helpers except the PUT probe.
        file_name: File path forwarded to the PUT and GET probes.
        shell: Shell forwarded to every helper.
        cluster: Cluster; PUT and GET receive it directly, the other probes
            receive its ``default_rpc_endpoint``.
        bearer: Optional bearer token forwarded to every helper.
        xhdr: Optional extra headers forwarded to every helper.

    Raises:
        AssertionError: If any outcome differs from the expectation. The message
            lists all "allowed ... failed" entries first, then all
            "denied ... succeeded" entries, joined by ", ".
    """
    rpc_endpoint = cluster.default_rpc_endpoint
    ops = ape.ObjectOperations
    # Trailing positional arguments shared by every helper call.
    extra = (bearer, xhdr)

    # A dict literal evaluates its entries top to bottom, so the probes run in
    # this order. DELETE is last - presumably so the object still exists for
    # the read probes (confirm against the helpers).
    observed = {
        ops.PUT: can_put_object(wallet, cid, file_name, shell, cluster, *extra),
        ops.HEAD: can_get_head_object(wallet, cid, oid, shell, rpc_endpoint, *extra),
        ops.GET_RANGE: can_get_range_of_object(wallet, cid, oid, shell, rpc_endpoint, *extra),
        ops.GET_RANGE_HASH: can_get_range_hash_of_object(wallet, cid, oid, shell, rpc_endpoint, *extra),
        ops.SEARCH: can_search_object(wallet, cid, shell, rpc_endpoint, oid, *extra),
        ops.GET: can_get_object(wallet, cid, oid, file_name, shell, cluster, *extra),
        ops.DELETE: can_delete_object(wallet, cid, oid, shell, rpc_endpoint, *extra),
    }

    wrongly_denied = []
    wrongly_allowed = []
    for action, succeeded in observed.items():
        if access_matrix[action] != succeeded:
            if succeeded:
                wrongly_allowed.append(f"denied {action} succeeded")
            else:
                wrongly_denied.append(f"allowed {action} failed")

    # Unexpected failures are reported before unexpected successes.
    failed_checks = wrongly_denied + wrongly_allowed
    assert not failed_checks, ", ".join(failed_checks)
|
|
|
|
|
|
# assert_access_to_container with FULL_ACCESS pre-bound as its first positional
# argument (access_matrix): every probed operation is expected to succeed.
assert_full_access_to_container = functools.partial(assert_access_to_container, FULL_ACCESS)
|
|
# assert_access_to_container with NO_ACCESS pre-bound as its first positional
# argument (access_matrix): every probed operation is expected to fail.
assert_no_access_to_container = functools.partial(assert_access_to_container, NO_ACCESS)
|
|
# assert_access_to_container with RO_ACCESS pre-bound as its first positional
# argument (access_matrix): PUT and DELETE are expected to fail, the rest to succeed.
assert_read_only_container = functools.partial(assert_access_to_container, RO_ACCESS)
|