from typing import Optional from frostfs_testlib import reporter from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.error_patterns import OBJECT_ACCESS_DENIED, OBJECT_NOT_FOUND from frostfs_testlib.shell import Shell from frostfs_testlib.steps.cli.object import ( delete_object, get_object_from_random_node, get_range, get_range_hash, head_object, put_object_to_random_node, search_object, ) from frostfs_testlib.storage.cluster import Cluster from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.utils import string_utils from frostfs_testlib.utils.file_utils import get_file_hash OPERATION_ERROR_TYPE = RuntimeError # TODO: Revert to just OBJECT_ACCESS_DENIED when the issue is fixed # https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/1297 OBJECT_NO_ACCESS = rf"(?:{OBJECT_NOT_FOUND}|{OBJECT_ACCESS_DENIED})" def can_get_object( wallet: WalletInfo, cid: str, oid: str, file_name: str, shell: Shell, cluster: Cluster, bearer: Optional[str] = None, xhdr: Optional[dict] = None, ) -> bool: with reporter.step("Try get object from container"): try: got_file_path = get_object_from_random_node( wallet, cid, oid, bearer=bearer, xhdr=xhdr, shell=shell, cluster=cluster, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False assert get_file_hash(file_name) == get_file_hash(got_file_path) return True def can_put_object( wallet: WalletInfo, cid: str, file_name: str, shell: Shell, cluster: Cluster, bearer: Optional[str] = None, xhdr: Optional[dict] = None, attributes: Optional[dict] = None, ) -> bool: with reporter.step("Try put object to container"): try: put_object_to_random_node( wallet, file_name, cid, bearer=bearer, xhdr=xhdr, attributes=attributes, shell=shell, cluster=cluster, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_ACCESS_DENIED), f"Expected {err} to match {OBJECT_ACCESS_DENIED}" return False return True def can_delete_object( wallet: WalletInfo, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, xhdr: Optional[dict] = None, ) -> bool: with reporter.step("Try delete object from container"): try: delete_object( wallet, cid, oid, bearer=bearer, xhdr=xhdr, shell=shell, endpoint=endpoint, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False return True def can_get_head_object( wallet: WalletInfo, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get head of object"): try: head_object( wallet, cid, oid, bearer=bearer, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False return True def can_get_range_of_object( wallet: WalletInfo, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get range of object"): try: get_range( wallet, cid, oid, bearer=bearer, range_cut="0:10", xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False return True def can_get_range_hash_of_object( wallet: WalletInfo, cid: str, oid: str, shell: Shell, endpoint: str, bearer: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try get range hash of object"): try: get_range_hash( wallet, cid, oid, bearer=bearer, range_cut="0:10", xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False return True def can_search_object( wallet: WalletInfo, cid: str, shell: Shell, endpoint: str, oid: Optional[str] = None, bearer: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> bool: with reporter.step("Try search object in container"): try: oids = search_object( wallet, cid, bearer=bearer, xhdr=xhdr, shell=shell, endpoint=endpoint, timeout=timeout, ) except OPERATION_ERROR_TYPE as err: assert string_utils.is_str_match_pattern(err, OBJECT_NO_ACCESS), f"Expected {err} to match {OBJECT_NO_ACCESS}" return False if oid: return oid in oids return True