#!/usr/bin/python3.10

# TODO: This file is deprecated and all code which uses these calls should be refactored to use shell classes

"""
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
"""
import csv
import json
import logging
import re
import subprocess
import sys
from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import Dict, List, Optional, TypedDict, Union

import pexpect

from frostfs_testlib import reporter
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo

logger = logging.getLogger("NeoLogger")
COLOR_GREEN = "\033[92m"
COLOR_OFF = "\033[0m"

# Patterns used by `parse_netmap_output`. Every pattern exposes exactly one
# named group whose name equals its dictionary key, so a match can be read back
# as `match[key]`. Compiled once with MULTILINE so `$` anchors at line ends.
_NETMAP_PATTERNS = {
    key: re.compile(pattern, flags=re.MULTILINE)
    for key, pattern in {
        "node_id": r"\d+: (?P<node_id>\w+)",
        "node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
        "node_status": r"(?P<node_status>ONLINE|OFFLINE)",
        "cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
        "continent": r"Continent: (?P<continent>\w+)",
        "country": r"Country: (?P<country>\w+)",
        "country_code": r"CountryCode: (?P<country_code>\w+)",
        "external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
        "location": r"Location: (?P<location>\w+.*)",
        "node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
        "price": r"Price: (?P<price>\d+)",
        "sub_div": r"SubDiv: (?P<sub_div>.*)",
        "sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
        "un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
        "role": r"role: (?P<role>\w+)",
    }.items()
}

# Keys whose matched text is split into a list, mapped to the separator used.
_NETMAP_LIST_SEPARATORS = {
    "node_data_ips": " ",
    "external_address": ",",
}


def _run_with_passwd(cmd: str) -> str:
    """
    Spawn `cmd` under pexpect, answer its first output with a carriage return
    and return everything the process printed afterwards.

    Args:
        cmd: command line to spawn.

    Returns:
        Decoded output of the spawned process.
    """
    child = pexpect.spawn(cmd)
    child.delaybeforesend = 1
    # ".*" matches any output (including none), so this does not wait for a
    # specific prompt before the carriage return is sent.
    child.expect(".*")
    child.sendline("\r")
    if sys.platform == "darwin":
        # On macOS the output is collected by waiting for EOF instead of wait()/read().
        child.expect(pexpect.EOF)
        raw_output = child.before
    else:
        child.wait()
        raw_output = child.read()
    return raw_output.decode()


def _configure_aws_cli(cmd: str, key_id: str, access_key: str, region: str, out_format: str = "json") -> str:
    """
    Drive an interactive AWS CLI configuration dialogue through pexpect.

    The function waits for each of the four prompts in turn and answers it
    with the corresponding argument.

    Args:
        cmd: command line to spawn (expected to print the AWS configure prompts).
        key_id: answer for the "AWS Access Key ID" prompt.
        access_key: answer for the "AWS Secret Access Key" prompt.
        region: answer for the "Default region name" prompt.
        out_format: answer for the "Default output format" prompt.

    Returns:
        Decoded output read from the process after it has finished.
    """
    child = pexpect.spawn(cmd)
    child.delaybeforesend = 1

    child.expect("AWS Access Key ID.*")
    child.sendline(key_id)

    child.expect("AWS Secret Access Key.*")
    child.sendline(access_key)

    child.expect("Default region name.*")
    # Send the caller-supplied region; previously the literal string "region"
    # was sent and the `region` argument was silently ignored.
    child.sendline(region)

    child.expect("Default output format.*")
    child.sendline(out_format)

    child.wait()
    raw_output = child.read()
    return raw_output.decode()


def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: datetime, end_time: datetime) -> None:
    """
    Attach a command execution summary to the report.

    Args:
        cmd: executed command line.
        output: output produced by the command.
        return_code: exit code of the command.
        start_time: moment the command was started.
        end_time: moment the command finished.
    """
    command_attachment = (
        f"COMMAND: '{cmd}'\n"
        f"OUTPUT:\n {output}\n"
        f"RC: {return_code}\n"
        f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {end_time - start_time}"
    )
    # The step title carries a shortened command; the attachment carries the full text.
    with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
        reporter.attach(command_attachment, "Command execution")


def log_command_execution(url: str, cmd: str, output: Union[str, dict], params: Optional[dict] = None) -> None:
    """
    Log a command together with its output and attach the details to the report.

    Both `output` and `params` are pretty-printed as JSON when they can be
    serialized; otherwise they are attached as they are.

    Args:
        url: URL the command was sent to.
        cmd: command name or text.
        output: command output.
        params: request parameters of the command, if any.
    """
    logger.info(f"{cmd}: {output}")

    # Best effort: output that cannot be serialized is kept unchanged.
    with suppress(Exception):
        json_output = json.dumps(output, indent=4, sort_keys=True)
        output = json_output

    try:
        json_params = json.dumps(params, indent=4, sort_keys=True)
    except TypeError as err:
        logger.warning(f"Failed to serialize '{cmd}' request parameters:\n{params}\nException: {err}")
    else:
        params = json_params

    command_attachment = f"COMMAND: '{cmd}'\n" f"URL: {url}\n" f"PARAMS:\n{params}\n" f"OUTPUT:\n{output}\n"
    reporter.attach(command_attachment, "Command execution")


def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
    """
    Parse a netmap listing and return each node as a `NodeNetmapInfo` dataclass.

    The text is split on the "Node " marker; anything before the first marker
    is discarded. Each remaining chunk is searched with every pattern from
    `_NETMAP_PATTERNS`. A field whose pattern does not match is set to None.
    `node_data_ips` is split on spaces and `external_address` on commas, so
    both are lists when present.

    Args:
        output: raw text of the netmap listing.

    Returns:
        One `NodeNetmapInfo` per node chunk, in order of appearance.
    """
    netmap_nodes = output.split("Node ")[1:]
    dataclasses_netmap = []

    for node in netmap_nodes:
        # Fresh dict per node so no state is shared between iterations.
        node_fields: dict[str, Union[str, list[str], None]] = {}
        for key, pattern in _NETMAP_PATTERNS.items():
            match = pattern.search(node)
            # Check for a miss first: the list-valued keys used to subscript
            # the match before this check and crashed on a missing field.
            if match is None:
                node_fields[key] = None
                continue

            value = match[key].strip()
            separator = _NETMAP_LIST_SEPARATORS.get(key)
            node_fields[key] = value if separator is None else value.split(separator)

        dataclasses_netmap.append(NodeNetmapInfo(**node_fields))

    return dataclasses_netmap


def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
    """
    Parse a delimiter-separated table printed by a CLI into a list of dicts.

    The first row is the header. Header names are stripped, lower-cased and
    have spaces replaced with underscores; cell values are stripped and
    lower-cased. Columns with an empty header are dropped. Cells beyond the
    header width are ignored.

    Args:
        output: raw table text.
        delimiter: single character separating the columns.

    Returns:
        One dict per data row mapping normalized header to value; an empty
        list when `output` contains no rows at all.
    """
    reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)

    header_row = next(reader, None)
    if header_row is None:
        # Empty input: no header and therefore no rows.
        return []

    # Normalize the header once instead of once per data row.
    headers = [column.strip().lower().replace(" ", "_") for column in header_row]

    parsing_output = []
    for row in reader:
        # zip() stops at the shorter sequence, so a row wider than the header
        # no longer raises IndexError.
        table = {header: cell.strip().lower() for header, cell in zip(headers, row) if header}
        parsing_output.append(table)
    return parsing_output