from typing import Optional from frostfs_testlib import reporter from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.object import ( delete_object, get_object_from_random_node, get_range, get_range_hash, head_object, put_object_to_random_node, search_object, ) from frostfs_testlib.storage.cluster import Cluster from frostfs_testlib.utils import string_utils from frostfs_testlib.utils.file_utils import get_file_hash OPERATION_ERROR_TYPE = RuntimeError def can_get_object( wallet: str, cid: str, oid: str, file_name: str, shell: Shell, cluster: Cluster, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, ) -> bool: with reporter.step("Try get object from container"): try: got_file_path = get_object_from_random_node( wallet, cid, oid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell, cluster=cluster, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False assert get_file_hash(file_name) == get_file_hash(got_file_path) return True def can_put_object( wallet: str, cid: str, file_name: str, shell: Shell, cluster: Cluster, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, attributes: Optional[dict] = None, ) -> bool: with reporter.step("Try put object to container"): try: put_object_to_random_node( wallet, file_name, cid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, attributes=attributes, shell=shell, cluster=cluster, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_delete_object( wallet: str, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, ) -> bool: with reporter.step("Try delete object from container"): try: delete_object( wallet, cid, oid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell, endpoint=endpoint, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_get_head_object( wallet: str, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get head of object"): try: head_object( wallet, cid, oid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_get_range_of_object( wallet: str, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get range of object"): try: get_range( wallet, cid, oid, bearer=bearer, range_cut="0:10", wallet_config=wallet_config, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_get_range_hash_of_object( wallet: str, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get range hash of object"): try: get_range_hash( wallet, cid, oid, bearer=bearer, range_cut="0:10", wallet_config=wallet_config, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_search_object( wallet: str, cid: str, shell: Shell, endpoint: str, oid: Optional[str] = None, bearer: Optional[str] = None, wallet_config: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try search object in container"): try: oids = search_object( wallet, cid, bearer=bearer, wallet_config=wallet_config, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern( err, OBJECT_ACCESS_DENIED ), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False if oid: return oid in oids return True