import functools
from typing import Optional

from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.dataclasses import ape
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo

from pytest_tests.helpers.object_access import (
    can_delete_object,
    can_get_head_object,
    can_get_object,
    can_get_range_hash_of_object,
    can_get_range_of_object,
    can_put_object,
    can_search_object,
)

# Every object operation reported by the APE dataclasses module.
ALL_OBJECT_OPERATIONS = ape.ObjectOperations.get_all()

# Expected-outcome matrices: operation -> whether it is expected to succeed.
FULL_ACCESS = dict.fromkeys(ALL_OBJECT_OPERATIONS, True)
NO_ACCESS = dict.fromkeys(ALL_OBJECT_OPERATIONS, False)
# Read-only: everything is expected to succeed except PUT and DELETE.
RO_ACCESS = {
    op: op not in (ape.ObjectOperations.PUT, ape.ObjectOperations.DELETE)
    for op in ALL_OBJECT_OPERATIONS
}


def assert_access_to_container(
    access_matrix: dict[ape.ObjectOperations, bool],
    wallet: WalletInfo,
    cid: str,
    oid: str,
    file_name: str,
    shell: Shell,
    cluster: Cluster,
    bearer: Optional[str] = None,
    xhdr: Optional[dict] = None,
):
    """Run every object operation check and compare outcomes with ``access_matrix``.

    Each ``can_*`` helper is invoked once, in a fixed order, and its result is
    recorded under the matching ``ape.ObjectOperations`` member. Any operation
    whose recorded result differs from ``access_matrix`` is reported.

    Args:
        access_matrix: Expected outcome per operation; it is indexed by every
            operation checked here (PUT, HEAD, GET_RANGE, GET_RANGE_HASH,
            SEARCH, GET, DELETE).
        wallet: Wallet forwarded to every helper.
        cid: Container identifier forwarded to every helper.
        oid: Object identifier forwarded to every helper except the PUT check.
        file_name: File forwarded to the PUT and GET helpers.
        shell: Shell forwarded to every helper.
        cluster: Cluster forwarded to the PUT and GET helpers; its
            ``default_rpc_endpoint`` is forwarded to the remaining helpers.
        bearer: Optional bearer value forwarded unchanged to every helper.
        xhdr: Optional extra headers forwarded unchanged to every helper.

    Raises:
        AssertionError: If at least one outcome differs from ``access_matrix``.
            The message lists all "allowed ... failed" entries first, followed
            by all "denied ... succeeded" entries, joined with ", ".
    """
    rpc_endpoint = cluster.default_rpc_endpoint

    # A dict literal evaluates its entries top to bottom, so the helpers run in
    # exactly this order. DELETE is deliberately kept last (presumably so the
    # object is still present while the read checks run - confirm with helpers).
    results = {
        ape.ObjectOperations.PUT: can_put_object(wallet, cid, file_name, shell, cluster, bearer, xhdr),
        ape.ObjectOperations.HEAD: can_get_head_object(wallet, cid, oid, shell, rpc_endpoint, bearer, xhdr),
        ape.ObjectOperations.GET_RANGE: can_get_range_of_object(wallet, cid, oid, shell, rpc_endpoint, bearer, xhdr),
        ape.ObjectOperations.GET_RANGE_HASH: can_get_range_hash_of_object(
            wallet, cid, oid, shell, rpc_endpoint, bearer, xhdr
        ),
        ape.ObjectOperations.SEARCH: can_search_object(wallet, cid, shell, rpc_endpoint, oid, bearer, xhdr),
        ape.ObjectOperations.GET: can_get_object(wallet, cid, oid, file_name, shell, cluster, bearer, xhdr),
        ape.ObjectOperations.DELETE: can_delete_object(wallet, cid, oid, shell, rpc_endpoint, bearer, xhdr),
    }

    # Collect mismatches into two buckets so the final message keeps the
    # "failed" entries ahead of the "succeeded" entries.
    unexpected_failures = []
    unexpected_successes = []
    for operation, outcome in results.items():
        if access_matrix[operation] != outcome:
            if outcome:
                unexpected_successes.append(f"denied {operation} succeeded")
            else:
                unexpected_failures.append(f"allowed {operation} failed")

    failed_checks = unexpected_failures + unexpected_successes
    assert not failed_checks, ", ".join(failed_checks)


# Convenience wrappers with the expected-outcome matrix pre-bound as the first argument.
assert_full_access_to_container = functools.partial(assert_access_to_container, FULL_ACCESS)
assert_no_access_to_container = functools.partial(assert_access_to_container, NO_ACCESS)
assert_read_only_container = functools.partial(assert_access_to_container, RO_ACCESS)