import json
import logging
import re
from dataclasses import dataclass
from time import sleep
from typing import Optional, Union

from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.plugins import load_plugin
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
from frostfs_testlib.s3.interfaces import BucketContainerResolver
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.cli.object import put_object, put_object_to_random_node
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import StorageObjectInfo
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.utils import json_utils
from frostfs_testlib.utils.file_utils import generate_file, get_file_hash

logger = logging.getLogger("NeoLogger")


@dataclass
class StorageContainerInfo:
    # Container ID (CID).
    id: str
    # Wallet used for operations on this container.
    wallet: WalletInfo


class StorageContainer:
    """Pairs a container's info with the shell and cluster used to put objects into it."""

    def __init__(
        self,
        storage_container_info: StorageContainerInfo,
        shell: Shell,
        cluster: Cluster,
    ) -> None:
        self.shell = shell
        self.storage_container_info = storage_container_info
        self.cluster = cluster

    def get_id(self) -> str:
        """Return the container ID stored in the container info."""
        return self.storage_container_info.id

    def get_wallet(self) -> WalletInfo:
        """Return the wallet stored in the container info."""
        return self.storage_container_info.wallet

    @reporter.step("Generate new object and put in container")
    def generate_object(
        self,
        size: int,
        expire_at: Optional[int] = None,
        bearer_token: Optional[str] = None,
        endpoint: Optional[str] = None,
    ) -> StorageObjectInfo:
        """
        Generate a file of the given size and put it into this container.

        Args:
            size: size passed to `generate_file`
            expire_at (optional, int): forwarded as `expire_at` to the put call
            bearer_token (optional, str): forwarded as `bearer` to the put call
            endpoint (optional, str): when set, the object is put via this endpoint;
                otherwise it is put to a random node of the cluster
        Returns:
            (StorageObjectInfo): info about the stored object, including file path and hash
        """
        with reporter.step(f"Generate object with size {size}"):
            file_path = generate_file(size)
            file_hash = get_file_hash(file_path)

        container_id = self.get_id()
        wallet = self.get_wallet()
        with reporter.step(f"Put object with size {size} to container {container_id}"):
            if endpoint:
                object_id = put_object(
                    wallet=wallet,
                    path=file_path,
                    cid=container_id,
                    expire_at=expire_at,
                    shell=self.shell,
                    endpoint=endpoint,
                    bearer=bearer_token,
                )
            else:
                object_id = put_object_to_random_node(
                    wallet=wallet,
                    path=file_path,
                    cid=container_id,
                    expire_at=expire_at,
                    shell=self.shell,
                    cluster=self.cluster,
                    bearer=bearer_token,
                )

            storage_object = StorageObjectInfo(
                container_id,
                object_id,
                size=size,
                wallet=wallet,
                file_path=file_path,
                file_hash=file_hash,
            )

            return storage_object


DEFAULT_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 4 FROM * AS X"
SINGLE_PLACEMENT_RULE = "REP 1 IN X CBF 1 SELECT 4 FROM * AS X"
REP_2_FOR_3_NODES_PLACEMENT_RULE = "REP 2 IN X CBF 1 SELECT 3 FROM * AS X"
DEFAULT_EC_PLACEMENT_RULE = "EC 3.1"


@reporter.step("Create Container")
def create_container(
    wallet: WalletInfo,
    shell: Shell,
    endpoint: str,
    rule: str = DEFAULT_PLACEMENT_RULE,
    basic_acl: str = "",
    attributes: Optional[dict] = None,
    session_token: str = "",
    name: Optional[str] = None,
    options: Optional[dict] = None,
    await_mode: bool = True,
    wait_for_creation: bool = True,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> str:
    """
    A wrapper for `frostfs-cli container create` call.

    Args:
        wallet (WalletInfo): a wallet on whose behalf a container is created
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        rule (optional, str): placement rule for container
        basic_acl (optional, str): an ACL for container, will be
                                appended to `--basic-acl` key
        attributes (optional, dict): container attributes, will be
                                appended to `--attributes` key
        session_token (optional, str): a path to session token file
        name (optional, str): container name attribute
        options (optional, dict): any other options to pass to the call
        await_mode (bool): block execution until container is persisted
        wait_for_creation (bool): wait until the container shows up in the container list
        timeout: Timeout for the operation.
    Returns:
        (str): CID of the created container
    Raises:
        ValueError: if no CID could be parsed from the CLI output
        RuntimeError: if `wait_for_creation` is set and the container never appears in the list
    """

    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    result = cli.container.create(
        rpc_endpoint=endpoint,
        policy=rule,
        basic_acl=basic_acl,
        attributes=attributes,
        name=name,
        session=session_token,
        await_mode=await_mode,
        timeout=timeout,
        **options or {},
    )

    cid = _parse_cid(result.stdout)

    if wait_for_creation:
        # Only announce the wait when we actually wait.
        logger.info("Container created; waiting until it is persisted in the sidechain")
        wait_for_container_creation(wallet, cid, shell, endpoint)

    return cid


def wait_for_container_creation(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    attempts: int = 15,
    sleep_interval: int = 1,
) -> None:
    """
    Poll the container list until the given CID appears in it.

    Args:
        wallet (WalletInfo): a wallet on whose behalf we list the containers
        cid (str): ID of the container to wait for
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to
        attempts (int): maximum number of list calls
        sleep_interval (int): seconds to sleep after each unsuccessful attempt
    Raises:
        RuntimeError: if the CID is not listed after all attempts
    """
    for _ in range(attempts):
        containers = list_containers(wallet, shell, endpoint)
        if cid in containers:
            return
        logger.info(f"There is no {cid} in {containers} yet; sleep {sleep_interval} and continue")
        sleep(sleep_interval)
    raise RuntimeError(f"After {attempts * sleep_interval} seconds container {cid} hasn't been persisted; exiting")


def wait_for_container_deletion(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    attempts: int = 30,
    sleep_interval: int = 1,
) -> None:
    """
    Poll `get_container` until it fails with a "container not found" error.

    Args:
        wallet (WalletInfo): a wallet on whose behalf we get the container
        cid (str): ID of the container expected to be deleted
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to
        attempts (int): maximum number of get calls
        sleep_interval (int): seconds to sleep after each attempt where the container still exists
    Raises:
        AssertionError: if `get_container` fails with any other error, or the container
            is still retrievable after all attempts
    """
    for _ in range(attempts):
        try:
            get_container(wallet, cid, shell=shell, endpoint=endpoint)
        except Exception as err:
            if "container not found" not in str(err):
                raise AssertionError(f'Expected "container not found" in error, got\n{err}') from err
            return
        # Container is still retrievable: wait and try again.
        sleep(sleep_interval)

    raise AssertionError(f"Expected container deleted during {attempts * sleep_interval} sec.")


@reporter.step("List Containers")
def list_containers(wallet: WalletInfo, shell: Shell, endpoint: str, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT) -> list[str]:
    """
    A wrapper for `frostfs-cli container list` call. It returns all the
    available containers for the given wallet.
    Args:
        wallet (WalletInfo): a wallet on whose behalf we list the containers
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        timeout: Timeout for the operation.
    Returns:
        (list): list of containers (whitespace-separated tokens of the CLI stdout)
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    result = cli.container.list(rpc_endpoint=endpoint, timeout=timeout)
    return result.stdout.split()


@reporter.step("List Objects in container")
def list_objects(
    wallet: WalletInfo,
    shell: Shell,
    container_id: str,
    endpoint: str,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[str]:
    """
    A wrapper for `frostfs-cli container list-objects` call. It returns all the
    available objects in container.
    Args:
        wallet (WalletInfo): a wallet on whose behalf we list the containers objects
        shell: executor for cli command
        container_id: cid of container
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        timeout: Timeout for the operation.
    Returns:
        (list): list of objects (whitespace-separated tokens of the CLI stdout)
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    result = cli.container.list_objects(rpc_endpoint=endpoint, cid=container_id, timeout=timeout)
    logger.info(f"Container objects: \n{result}")
    return result.stdout.split()


@reporter.step("Get Container")
def get_container(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    json_mode: bool = True,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> Union[dict, str]:
    """
    A wrapper for `frostfs-cli container get` call. It extracts container's
    attributes and rearranges them into a more compact view.
    Args:
        wallet (WalletInfo): a wallet on whose behalf we get the container
        cid (str): ID of the container to get
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        json_mode (bool): return container in JSON format
        timeout: Timeout for the operation.
    Returns:
        (dict, str): raw CLI stdout when `json_mode` is False; otherwise the parsed
            container dict with `attributes` flattened to a key/value mapping and
            `ownerID` re-encoded
    """

    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    result = cli.container.get(rpc_endpoint=endpoint, cid=cid, json_mode=json_mode, timeout=timeout)

    if not json_mode:
        return result.stdout

    container_info = json.loads(result.stdout)
    # Flatten [{"key": k, "value": v}, ...] into {k: v, ...}.
    container_info["attributes"] = {attr["key"]: attr["value"] for attr in container_info["attributes"]}
    container_info["ownerID"] = json_utils.json_reencode(container_info["ownerID"]["value"])
    return container_info


@reporter.step("Delete Container")
# TODO: make the error message about a non-found container more user-friendly
def delete_container(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    force: bool = False,
    session_token: Optional[str] = None,
    await_mode: bool = False,
) -> None:
    """
    A wrapper for `frostfs-cli container delete` call.
    Args:
        wallet (WalletInfo): a wallet on whose behalf we delete the container
        cid (str): ID of the container to delete
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
        force (bool): do not check whether container contains locks and remove immediately
        session_token: a path to session token file
        await_mode: Block execution until container is removed.
    This function doesn't return anything.
    """

    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    cli.container.delete(
        cid=cid,
        rpc_endpoint=endpoint,
        force=force,
        session=session_token,
        await_mode=await_mode,
    )


def _parse_cid(output: str) -> str:
    """
    Parses container ID from a given CLI output. The input string we expect:
            container ID: 2tz86kVTDpJxWHrhw3h6PbKMwkLtBEwoqhHQCKTre1FN
            awaiting...
            container has been persisted on sidechain
    We want to take 'container ID' value from the string.

    Args:
        output (str): CLI output to parse

    Returns:
        (str): extracted CID

    Raises:
        ValueError: if the first line of the output is not of the form `<label>: <value>`
    """
    if not isinstance(output, str) or not output:
        # Nothing to parse; fall through to the ValueError below with an empty line.
        first_line = ""
        logger.error(f"Got empty output: {output}")
    else:
        # Take the first line only; strip so a trailing "\r" or spaces do not leak into the CID.
        first_line = output.split("\n")[0].strip()

    splitted = first_line.split(": ")
    if len(splitted) != 2:
        raise ValueError(f"no CID was parsed from command output: \t{first_line}")
    return splitted[1]


@reporter.step("Search container by name")
def search_container_by_name(name: str, node: ClusterNode):
    """
    Resolve a container by its name on the given node.

    The resolver class is loaded from the `frostfs.testlib.bucket_cid_resolver` plugin
    group using the product configured on the node's host.

    Args:
        name (str): container name to resolve
        node (ClusterNode): node passed to the resolver
    Returns:
        whatever the resolver's `resolve(node, name)` returns (presumably the CID — verify
        against the resolver implementation)
    """
    resolver_cls = load_plugin("frostfs.testlib.bucket_cid_resolver", node.host.config.product)
    resolver: BucketContainerResolver = resolver_cls()
    return resolver.resolve(node, name)


@reporter.step("Search for nodes with a container")
def search_nodes_with_container(
    wallet: WalletInfo,
    cid: str,
    shell: Shell,
    endpoint: str,
    cluster: Cluster,
    timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
    """
    Find the cluster nodes reported by the CLI `container.search_node` call for a container.

    Args:
        wallet (WalletInfo): a wallet on whose behalf the search is done
        cid (str): ID of the container to search nodes for
        shell: executor for cli command
        endpoint: FrostFS endpoint to send request to
        cluster (Cluster): cluster used to map IP addresses to nodes
        timeout: Timeout for the operation.
    Returns:
        (list): cluster nodes matching the IPs found in the CLI output
    """
    cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet.config_path)
    result = cli.container.search_node(rpc_endpoint=endpoint, cid=cid, timeout=timeout)

    # Collect every dotted-quad (IPv4-looking) token from the output, de-duplicated.
    pattern = r"[0-9]+(?:\.[0-9]+){3}"
    nodes_ip = list(set(re.findall(pattern, result.stdout)))

    with reporter.step(f"nodes ips = {nodes_ip}"):
        nodes_list = cluster.get_nodes_by_ip(nodes_ip)

    with reporter.step(f"Return nodes - {nodes_list}"):
        return nodes_list