#!/usr/bin/python3.10

# TODO: This file is deprecated and all code which uses these calls should be refactored to use shell classes

"""
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
"""
import csv
import json
import logging
import re
import sys
from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import Any, Optional, Union

import pexpect
from frostfs_testlib import reporter
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo

logger = logging.getLogger("NeoLogger")
COLOR_GREEN = "\033[92m"
COLOR_OFF = "\033[0m"


def _run_with_passwd(cmd: str) -> str:
    """
    Spawn `cmd` under pexpect, send a single carriage-return line and return the decoded output.

    NOTE(review): judging by the name, the carriage return is presumably the answer to a
    password prompt (i.e. an empty password) - confirm against callers.
    """
    child = pexpect.spawn(cmd)
    child.delaybeforesend = 1
    # ".*" matches anything (including nothing), so no specific prompt text is required here.
    child.expect(".*")
    child.sendline("\r")
    if sys.platform == "darwin":
        # On macOS the output is collected by waiting for EOF instead of wait() + read().
        child.expect(pexpect.EOF)
        raw_output = child.before
    else:
        child.wait()
        raw_output = child.read()
    return raw_output.decode()


def _configure_aws_cli(cmd: str, key_id: str, access_key: str, region: str, out_format: str = "json") -> str:
    """
    Run `cmd` under pexpect and answer its four interactive configuration prompts.

    Args:
        cmd: command line to spawn; it must print the "AWS Access Key ID", "AWS Secret Access Key",
            "Default region name" and "Default output format" prompts, in that order.
        key_id: value sent in response to the access key id prompt.
        access_key: value sent in response to the secret access key prompt.
        region: value sent in response to the default region prompt.
        out_format: value sent in response to the default output format prompt.

    Returns:
        Decoded output read from the process after it has finished.
    """
    child = pexpect.spawn(cmd)
    child.delaybeforesend = 1

    child.expect("AWS Access Key ID.*")
    child.sendline(key_id)

    child.expect("AWS Secret Access Key.*")
    child.sendline(access_key)

    child.expect("Default region name.*")
    # Send the caller-supplied region (previously the literal string "region" was sent).
    child.sendline(region)

    child.expect("Default output format.*")
    child.sendline(out_format)

    child.wait()
    raw_output = child.read()
    return raw_output.decode()


def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: datetime, end_time: datetime) -> None:
    """
    Attach a textual summary of an executed command to the test report.

    The attachment contains the command, its output, the return code and the start / end /
    elapsed times. It is added inside a report step whose title is the command shortened
    to 60 characters.
    """
    command_attachment = (
        f"COMMAND: '{cmd}'\n"
        f"OUTPUT:\n {output}\n"
        f"RC: {return_code}\n"
        f"Start / End / Elapsed\t {start_time} / {end_time} / {end_time - start_time}"
    )
    with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
        reporter.attach(command_attachment, "Command execution")


def log_command_execution(cmd: str, output: Union[str, dict], params: Optional[dict] = None, **kwargs) -> None:
    """
    Log a request and attach both its details and an equivalent AWS CLI command to the report.

    Args:
        cmd: name of the executed command.
        output: command result; it is serialized to JSON for the attachment.
        params: request parameters. A "Body" value longer than 1000 items is blanked out in the
            report. The caller's dict is never modified.
        **kwargs: extra options used to build the AWS CLI command; "endpoint" is required
            (KeyError is raised if it is missing).
    """
    logger.info(f"{cmd}: {output}")

    # Work on a shallow copy so that blanking "Body" below does not alter the caller's dict.
    params = dict(params) if params else {}

    if params.get("Body") and len(params.get("Body")) > 1000:
        params["Body"] = ""

    # Fall back to the raw dict if the parameters cannot be serialized to JSON.
    output_params = params
    try:
        json_params = json.dumps(params, indent=4, sort_keys=True, default=str)
    except TypeError as err:
        logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
    else:
        output_params = json_params

    output = json.dumps(output, indent=4, sort_keys=True, default=str)

    command_execution = (
        f"COMMAND: '{cmd}'\n"
        f"URL: {kwargs['endpoint']}\n"
        f"PARAMS:\n{output_params}\n"
        f"OUTPUT:\n{output}\n"
    )
    aws_command = _convert_request_to_aws_cli_command(cmd, params, **kwargs)

    reporter.attach(command_execution, "Command execution")
    reporter.attach(aws_command, "AWS CLI Command")


def _convert_request_to_aws_cli_command(command: str, params: dict, **kwargs) -> str:
    """
    Build an `aws ...` command line string from a command name and its parameters.

    Options given in `kwargs` take precedence: a key of `params` which maps to the same CLI
    option name as a key of `kwargs` is skipped. The API section is "s3api" when the
    "endpoint" keyword argument contains "s3" and "iam" otherwise.

    Raises:
        KeyError: if "endpoint" is not present in `kwargs`.
    """
    overriden_names = {_convert_json_name_to_aws_cli(name) for name in kwargs}
    command = command.replace("_", "-")
    options = []

    for name, value in params.items():
        name = _convert_json_name_to_aws_cli(name)

        # To override parameters for AWS CLI
        if name in overriden_names:
            continue

        if option := _create_option(name, value):
            options.append(option)

    for name, value in kwargs.items():
        name = _convert_json_name_to_aws_cli(name)
        if option := _create_option(name, value):
            options.append(option)

    options = " ".join(options)
    api = "s3api" if "s3" in kwargs["endpoint"] else "iam"
    return f"aws --no-verify-ssl --no-paginate {api} {command} {options}"


def _convert_json_name_to_aws_cli(name: str) -> str:
    """
    Convert a parameter name (CamelCase or snake_case) to a dash-separated lower-case option name.

    Names listed in `specific_names` are mapped explicitly because the generic rule, which only
    splits at a lower-case letter followed by an upper-case one, does not produce the wanted
    result for them.
    """
    specific_names = {"CORSConfiguration": "cors-configuration"}

    if aws_cli_name := specific_names.get(name):
        return aws_cli_name
    return re.sub(r"([a-z])([A-Z])", r"\1 \2", name).lower().replace(" ", "-").replace("_", "-")


def _create_option(name: str, value: Any) -> str | None:
    """
    Render one command line option.

    Returns:
        `--name` for a True flag, `--name '<json>'` for a dict, `--name value` for any other
        truthy value, and None for falsy values (including False), which are omitted.
    """
    if isinstance(value, bool) and value:
        return f"--{name}"

    if isinstance(value, dict):
        value = json.dumps(value, indent=4, sort_keys=True, default=str)
        return f"--{name} '{value}'"

    if value:
        return f"--{name} {value}"

    return None


def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
    """
    The code will parse each node section of the output and return each node as dataclass.

    The output is split on the "Node " marker and every section is searched for the attributes
    listed in `regexes`. An attribute which is not found is set to None. `node_data_ips` is
    split on spaces and `external_address` on commas, so both become lists.
    """
    # Each pattern captures its value in a named group which has the same name as the dict key,
    # because the value is read back from the match object by that key.
    regexes = {
        "node_id": r"\d+: (?P<node_id>\w+)",
        "node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
        "node_status": r"(?P<node_status>ONLINE|OFFLINE)",
        "cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
        "continent": r"Continent: (?P<continent>\w+)",
        "country": r"Country: (?P<country>\w+)",
        "country_code": r"CountryCode: (?P<country_code>\w+)",
        "external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
        "location": r"Location: (?P<location>\w+.*)",
        "node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
        "price": r"Price: (?P<price>\d+)",
        "sub_div": r"SubDiv: (?P<sub_div>.*)",
        "sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
        "un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
        "role": r"role: (?P<role>\w+)",
    }
    compiled_regexes = {key: re.compile(regex, flags=re.MULTILINE) for key, regex in regexes.items()}
    # Attributes which hold several values and the separator used between them.
    list_separators = {"node_data_ips": " ", "external_address": ","}

    dataclasses_netmap = []
    # Text before the first "Node " marker is not a node section, so it is dropped.
    for node in output.split("Node ")[1:]:
        result_netmap = {}
        for key, pattern in compiled_regexes.items():
            search_result = pattern.search(node)
            # Check for a missing attribute first so that list attributes cannot fail on None.
            if search_result is None:
                result_netmap[key] = None
                continue

            value = search_result[key].strip()
            separator = list_separators.get(key)
            result_netmap[key] = value.split(separator) if separator else value

        dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))

    return dataclasses_netmap


def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
    """
    Parse a delimiter-separated table into a list of dicts, one per data row.

    The first row is the header: its cells are stripped, lower-cased and spaces are replaced
    with underscores to form the keys. Columns with an empty header are skipped and cell values
    are stripped and lower-cased. Cells which have no matching header column are ignored.

    Returns:
        List of rows; an empty list if the output contains no header row.
    """
    reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)

    header_row = next(reader, None)
    if header_row is None:
        return []
    headers = [header.strip().lower().replace(" ", "_") for header in header_row]

    parsing_output = []
    for row in reader:
        table = {header: value.strip().lower() for header, value in zip(headers, row) if header}
        parsing_output.append(table)

    return parsing_output