import json import logging import os import re import uuid from typing import Any, Optional from frostfs_testlib import reporter, utils from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.resources.common import ASSETS_DIR from frostfs_testlib.shell.interfaces import CommandResult from frostfs_testlib.storage.cluster import Cluster, ClusterNode from frostfs_testlib.storage.grpc_operations import interfaces from frostfs_testlib.storage.grpc_operations.implementations.chunks import ChunksOperations from frostfs_testlib.testing.test_control import wait_for_success from frostfs_testlib.utils import cli_utils, file_utils logger = logging.getLogger("NeoLogger") class ObjectOperations(interfaces.ObjectInterface): def __init__(self, cli: FrostfsCli) -> None: self.cli = cli self.chunks: interfaces.ChunksInterface = ChunksOperations(self.cli) @reporter.step("Delete object") def delete( self, cid: str, oid: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ DELETE an Object. Args: cid: ID of Container where we get the Object from oid: ID of Object we are going to delete bearer: path to Bearer Token file, appends to `--bearer` key endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key xhdr: Request X-Headers in form of Key=Value session: path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: (str): Tombstone ID """ result = self.cli.object.delete( rpc_endpoint=endpoint, cid=cid, oid=oid, bearer=bearer, xhdr=xhdr, session=session, timeout=timeout, ) id_str = result.stdout.split("\n")[1] tombstone = id_str.split(":")[1] return tombstone.strip() @reporter.step("Get object") def get( self, cid: str, oid: str, endpoint: str, bearer: Optional[str] = None, write_object: Optional[str] = None, xhdr: Optional[dict] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> file_utils.TestFile: """ GET from FrostFS. Args: cid (str): ID of Container where we get the Object from oid (str): Object ID bearer: path to Bearer Token file, appends to `--bearer` key write_object: path to downloaded file, appends to `--file` key endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key no_progress(optional, bool): do not show progress bar xhdr (optional, dict): Request X-Headers in form of Key=Value session (optional, dict): path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: (str): path to downloaded file """ if not write_object: write_object = str(uuid.uuid4()) test_file = file_utils.TestFile(os.path.join(ASSETS_DIR, write_object)) self.cli.object.get( rpc_endpoint=endpoint, cid=cid, oid=oid, file=test_file, bearer=bearer, no_progress=no_progress, xhdr=xhdr, session=session, timeout=timeout, ) return test_file @reporter.step("Get object from random node") def get_from_random_node( self, cid: str, oid: str, cluster: Cluster, bearer: Optional[str] = None, write_object: Optional[str] = None, xhdr: Optional[dict] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ GET from FrostFS random storage node Args: cid: ID of Container where we get the Object from oid: Object ID cluster: cluster object bearer (optional, str): path to Bearer Token file, appends to `--bearer` key write_object (optional, str): path to downloaded file, appends to `--file` key no_progress(optional, bool): do not show progress bar xhdr (optional, dict): Request X-Headers in form of Key=Value session (optional, dict): path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: (str): path to downloaded file """ endpoint = cluster.get_random_storage_rpc_endpoint() return self.get( cid, oid, endpoint, bearer, write_object, xhdr, no_progress, session, timeout, ) @reporter.step("Get hash object") def hash( self, rpc_endpoint: str, cid: str, oid: str, address: Optional[str] = None, bearer: Optional[str] = None, generate_key: Optional[bool] = None, range: Optional[str] = None, salt: Optional[str] = None, ttl: Optional[int] = None, session: Optional[str] = None, hash_type: Optional[str] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ Get object hash. Args: address: Address of wallet account. bearer: File with signed JSON or binary encoded bearer token. cid: Container ID. generate_key: Generate new private key. oid: Object ID. range: Range to take hash from in the form offset1:length1,... rpc_endpoint: Remote node address (as 'multiaddr' or ':'). salt: Salt in hex format. ttl: TTL value in request meta header (default 2). session: Filepath to a JSON- or binary-encoded token of the object RANGEHASH session. hash_type: Hash type. Either 'sha256' or 'tz' (default "sha256"). wallet: WIF (NEP-2) string or path to the wallet or binary key. xhdr: Dict with request X-Headers. timeout: Timeout for the operation (default 15s). Returns: Command's result. """ result = self.cli.object.hash( rpc_endpoint=rpc_endpoint, cid=cid, oid=oid, address=address, bearer=bearer, generate_key=generate_key, range=range, salt=salt, ttl=ttl, xhdr=xhdr, session=session, hash_type=hash_type, timeout=timeout, ) return result.stdout @reporter.step("Head object") def head( self, cid: str, oid: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, json_output: bool = True, is_raw: bool = False, is_direct: bool = False, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> CommandResult | Any: """ HEAD an Object. Args: cid (str): ID of Container where we get the Object from oid (str): ObjectID to HEAD bearer (optional, str): path to Bearer Token file, appends to `--bearer` key endpoint(optional, str): FrostFS endpoint to send request to json_output(optional, bool): return response in JSON format or not; this flag turns into `--json` key is_raw(optional, bool): send "raw" request or not; this flag turns into `--raw` key is_direct(optional, bool): send request directly to the node or not; this flag turns into `--ttl 1` key xhdr (optional, dict): Request X-Headers in form of Key=Value session (optional, dict): path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: depending on the `json_output` parameter value, the function returns (dict): HEAD response in JSON format or (str): HEAD response as a plain text """ result = self.cli.object.head( rpc_endpoint=endpoint, cid=cid, oid=oid, bearer=bearer, json_mode=json_output, raw=is_raw, ttl=1 if is_direct else None, xhdr=xhdr, session=session, timeout=timeout, ) if not json_output: return result try: decoded = json.loads(result.stdout) except Exception as exc: # If we failed to parse output as JSON, the cause might be # the plain text string in the beginning of the output. # Here we cut off first string and try to parse again. logger.info(f"failed to parse output: {exc}") logger.info("parsing output in another way") fst_line_idx = result.stdout.find("\n") decoded = json.loads(result.stdout[fst_line_idx:]) # if response if "chunks" in decoded.keys(): logger.info("decoding ec chunks") return decoded["chunks"] # If response is Complex Object header, it has `splitId` key if "splitId" in decoded.keys(): logger.info("decoding split header") return utils.json_utils.decode_split_header(decoded) # If response is Last or Linking Object header, # it has `header` dictionary and non-null `split` dictionary if "split" in decoded["header"].keys(): if decoded["header"]["split"]: logger.info("decoding linking object") return utils.json_utils.decode_linking_object(decoded) if decoded["header"]["objectType"] == "STORAGE_GROUP": logger.info("decoding storage group") return utils.json_utils.decode_storage_group(decoded) if decoded["header"]["objectType"] == "TOMBSTONE": logger.info("decoding tombstone") return utils.json_utils.decode_tombstone(decoded) logger.info("decoding simple header") return utils.json_utils.decode_simple_header(decoded) @reporter.step("Lock Object") def lock( self, cid: str, oid: str, endpoint: str, lifetime: Optional[int] = None, expire_at: Optional[int] = None, address: Optional[str] = None, bearer: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ Locks object in container. Args: address: Address of wallet account. bearer: File with signed JSON or binary encoded bearer token. cid: Container ID. oid: Object ID. lifetime: Lock lifetime. expire_at: Lock expiration epoch. shell: executor for cli command endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key session: Path to a JSON-encoded container session token. ttl: TTL value in request meta header (default 2). wallet: WIF (NEP-2) string or path to the wallet or binary key. xhdr: Dict with request X-Headers. timeout: Timeout for the operation. Returns: Lock object ID """ result = self.cli.object.lock( rpc_endpoint=endpoint, lifetime=lifetime, expire_at=expire_at, address=address, cid=cid, oid=oid, bearer=bearer, xhdr=xhdr, session=session, ttl=ttl, timeout=timeout, ) # Splitting CLI output to separate lines and taking the penultimate line id_str = result.stdout.strip().split("\n")[0] oid = id_str.split(":")[1] return oid.strip() @reporter.step("Put object") def put( self, path: str, cid: str, endpoint: str, bearer: Optional[str] = None, copies_number: Optional[int] = None, attributes: Optional[dict] = None, xhdr: Optional[dict] = None, expire_at: Optional[int] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ PUT of given file. Args: path: path to file to be PUT cid: ID of Container where we get the Object from bearer: path to Bearer Token file, appends to `--bearer` key copies_number: Number of copies of the object to store within the RPC call attributes: User attributes in form of Key1=Value1,Key2=Value2 endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key no_progress: do not show progress bar expire_at: Last epoch in the life of the object xhdr: Request X-Headers in form of Key=Value session: path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: (str): ID of uploaded Object """ result = self.cli.object.put( rpc_endpoint=endpoint, file=path, cid=cid, attributes=attributes, bearer=bearer, copies_number=copies_number, expire_at=expire_at, no_progress=no_progress, xhdr=xhdr, session=session, timeout=timeout, ) # Splitting CLI output to separate lines and taking the penultimate line id_str = result.stdout.strip().split("\n")[-2] oid = id_str.split(":")[1] return oid.strip() @reporter.step("Put object to random node") def put_to_random_node( self, path: str, cid: str, cluster: Cluster, bearer: Optional[str] = None, copies_number: Optional[int] = None, attributes: Optional[dict] = None, xhdr: Optional[dict] = None, expire_at: Optional[int] = None, no_progress: bool = True, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ PUT of given file to a random storage node. Args: path: path to file to be PUT cid: ID of Container where we get the Object from cluster: cluster under test bearer: path to Bearer Token file, appends to `--bearer` key copies_number: Number of copies of the object to store within the RPC call attributes: User attributes in form of Key1=Value1,Key2=Value2 cluster: cluster under test no_progress: do not show progress bar expire_at: Last epoch in the life of the object xhdr: Request X-Headers in form of Key=Value session: path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: ID of uploaded Object """ endpoint = cluster.get_random_storage_rpc_endpoint() return self.put( path, cid, endpoint, bearer, copies_number, attributes, xhdr, expire_at, no_progress, session, timeout=timeout, ) @reporter.step("Get Range") def range( self, cid: str, oid: str, range_cut: str, endpoint: str, bearer: str = "", xhdr: Optional[dict] = None, session: Optional[str] = None, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> tuple[file_utils.TestFile, bytes]: """ GETRANGE an Object. Args: wallet: wallet on whose behalf GETRANGE is done cid: ID of Container where we get the Object from oid: ID of Object we are going to request range_cut: range to take data from in the form offset:length shell: executor for cli command endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key bearer: path to Bearer Token file, appends to `--bearer` key xhdr: Request X-Headers in form of Key=Value session: path to a JSON-encoded container session token timeout: Timeout for the operation. Returns: (str, bytes) - path to the file with range content and content of this file as bytes """ test_file = file_utils.TestFile(os.path.join(ASSETS_DIR, str(uuid.uuid4()))) self.cli.object.range( rpc_endpoint=endpoint, cid=cid, oid=oid, range=range_cut, file=test_file, bearer=bearer, xhdr=xhdr, session=session, timeout=timeout, ) with open(test_file, "rb") as file: content = file.read() return test_file, content @reporter.step("Search object") def search( self, cid: str, endpoint: str, bearer: str = "", oid: Optional[str] = None, filters: Optional[dict] = None, expected_objects_list: Optional[list] = None, xhdr: Optional[dict] = None, session: Optional[str] = None, phy: bool = False, root: bool = False, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, address: Optional[str] = None, generate_key: Optional[bool] = None, ttl: Optional[int] = None, ) -> list: """ SEARCH an Object. Args: wallet: wallet on whose behalf SEARCH is done cid: ID of Container where we get the Object from shell: executor for cli command bearer: path to Bearer Token file, appends to `--bearer` key endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key filters: key=value pairs to filter Objects expected_objects_list: a list of ObjectIDs to compare found Objects with xhdr: Request X-Headers in form of Key=Value session: path to a JSON-encoded container session token phy: Search physically stored objects. root: Search for user objects. timeout: Timeout for the operation. Returns: list of found ObjectIDs """ result = self.cli.object.search( rpc_endpoint=endpoint, cid=cid, bearer=bearer, oid=oid, xhdr=xhdr, filters=[f"{filter_key} EQ {filter_val}" for filter_key, filter_val in filters.items()] if filters else None, session=session, phy=phy, root=root, address=address, generate_key=generate_key, ttl=ttl, timeout=timeout, ) found_objects = re.findall(r"(\w{43,44})", result.stdout) if expected_objects_list: if sorted(found_objects) == sorted(expected_objects_list): logger.info(f"Found objects list '{found_objects}' " f"is equal for expected list '{expected_objects_list}'") else: logger.warning(f"Found object list {found_objects} " f"is not equal to expected list '{expected_objects_list}'") return found_objects @wait_for_success() @reporter.step("Search object nodes") def nodes( self, cluster: Cluster, cid: str, oid: str, alive_node: ClusterNode, bearer: str = "", xhdr: Optional[dict] = None, is_direct: bool = False, verify_presence_all: bool = False, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> list[ClusterNode]: endpoint = alive_node.storage_node.get_rpc_endpoint() response = self.cli.object.nodes( rpc_endpoint=endpoint, cid=cid, oid=oid, bearer=bearer, ttl=1 if is_direct else None, json=True, xhdr=xhdr, timeout=timeout, verify_presence_all=verify_presence_all, ) response_json = json.loads(response.stdout) # Currently, the command will show expected and confirmed nodes. # And we (currently) count only nodes which are both expected and confirmed object_nodes_id = { required_node for data_object in response_json["data_objects"] for required_node in data_object["required_nodes"] if required_node in data_object["confirmed_nodes"] } netmap_nodes_list = cli_utils.parse_netmap_output( self.cli.netmap.snapshot( rpc_endpoint=endpoint, ).stdout ) netmap_nodes = [ netmap_node for object_node in object_nodes_id for netmap_node in netmap_nodes_list if object_node == netmap_node.node_id ] object_nodes = [ cluster_node for netmap_node in netmap_nodes for cluster_node in cluster.cluster_nodes if netmap_node.node == cluster_node.host_ip ] return object_nodes