import logging from typing import Optional from frostfs_testlib import reporter from frostfs_testlib.cli.frostfs_cli.cli import FrostfsCli from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT from frostfs_testlib.storage.constants import PlacementRule from frostfs_testlib.storage.grps_operations import interfaces logger = logging.getLogger("NeoLogger") class ContainerOperations(interfaces.ContainerInterface): def __init__(self, cli: FrostfsCli) -> None: self.cli = cli @reporter.step("Create Container") def create( self, endpoint: str, rule: str = PlacementRule.DEFAULT_PLACEMENT_RULE, basic_acl: str = "", attributes: Optional[dict] = None, session_token: str = "", name: Optional[str] = None, options: Optional[dict] = None, await_mode: bool = True, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT, ) -> str: """ A wrapper for `frostfs-cli container create` call. Args: wallet (WalletInfo): a wallet on whose behalf a container is created rule (optional, str): placement rule for container basic_acl (optional, str): an ACL for container, will be appended to `--basic-acl` key attributes (optional, dict): container attributes , will be appended to `--attributes` key session_token (optional, str): a path to session token file session_wallet(optional, str): a path to the wallet which signed the session token; this parameter makes sense when paired with `session_token` shell: executor for cli command endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key options (optional, dict): any other options to pass to the call name (optional, str): container name attribute await_mode (bool): block execution until container is persisted wait_for_creation (): Wait for container shows in container list timeout: Timeout for the operation. Returns: (str): CID of the created container """ result = self.cli.container.create( rpc_endpoint=endpoint, policy=rule, basic_acl=basic_acl, attributes=attributes, name=name, session=session_token, await_mode=await_mode, timeout=timeout, **options or {}, ) cid = self._parse_cid(result.stdout) logger.info("Container created; waiting until it is persisted in the sidechain") return cid @reporter.step("List Containers") def list(self, endpoint: str, timeout: Optional[str] = CLI_DEFAULT_TIMEOUT) -> list[str]: """ A wrapper for `frostfs-cli container list` call. It returns all the available containers for the given wallet. Args: wallet (WalletInfo): a wallet on whose behalf we list the containers shell: executor for cli command endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key timeout: Timeout for the operation. Returns: (list): list of containers """ result = self.cli.container.list(rpc_endpoint=endpoint, timeout=timeout) return result.stdout.split() def _parse_cid(self, output: str) -> str: """ Parses container ID from a given CLI output. The input string we expect: container ID: 2tz86kVTDpJxWHrhw3h6PbKMwkLtBEwoqhHQCKTre1FN awaiting... container has been persisted on sidechain We want to take 'container ID' value from the string. Args: output (str): CLI output to parse Returns: (str): extracted CID """ try: # taking first line from command's output first_line = output.split("\n")[0] except Exception: first_line = "" logger.error(f"Got empty output: {output}") splitted = first_line.split(": ") if len(splitted) != 2: raise ValueError(f"no CID was parsed from command output: \t{first_line}") return splitted[1]