forked from TrueCloudLab/frostfs-testlib
146 lines
4.7 KiB
Python
146 lines
4.7 KiB
Python
#!/usr/bin/python3.10
|
|
|
|
# TODO: This file is deprecated and all code which uses these calls should be refactored to use shell classes
|
|
|
|
"""
|
|
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
|
|
"""
|
|
import csv
|
|
import json
|
|
import logging
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from contextlib import suppress
|
|
from datetime import datetime
|
|
from io import StringIO
|
|
from textwrap import shorten
|
|
from typing import Dict, List, TypedDict, Union
|
|
|
|
import pexpect
|
|
|
|
from frostfs_testlib import reporter
|
|
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
|
|
|
|
logger = logging.getLogger("NeoLogger")
|
|
COLOR_GREEN = "\033[92m"
|
|
COLOR_OFF = "\033[0m"
|
|
|
|
|
|
def _run_with_passwd(cmd: str) -> str:
|
|
child = pexpect.spawn(cmd)
|
|
child.delaybeforesend = 1
|
|
child.expect(".*")
|
|
child.sendline("\r")
|
|
if sys.platform == "darwin":
|
|
child.expect(pexpect.EOF)
|
|
cmd = child.before
|
|
else:
|
|
child.wait()
|
|
cmd = child.read()
|
|
return cmd.decode()
|
|
|
|
|
|
def _configure_aws_cli(cmd: str, key_id: str, access_key: str, out_format: str = "json") -> str:
|
|
child = pexpect.spawn(cmd)
|
|
child.delaybeforesend = 1
|
|
|
|
child.expect("AWS Access Key ID.*")
|
|
child.sendline(key_id)
|
|
|
|
child.expect("AWS Secret Access Key.*")
|
|
child.sendline(access_key)
|
|
|
|
child.expect("Default region name.*")
|
|
child.sendline("")
|
|
|
|
child.expect("Default output format.*")
|
|
child.sendline(out_format)
|
|
|
|
child.wait()
|
|
cmd = child.read()
|
|
# child.expect(pexpect.EOF)
|
|
# cmd = child.before
|
|
return cmd.decode()
|
|
|
|
|
|
def _attach_allure_log(cmd: str, output: str, return_code: int, start_time: datetime, end_time: datetime) -> None:
|
|
command_attachment = (
|
|
f"COMMAND: '{cmd}'\n"
|
|
f"OUTPUT:\n {output}\n"
|
|
f"RC: {return_code}\n"
|
|
f"Start / End / Elapsed\t {start_time.time()} / {end_time.time()} / {end_time - start_time}"
|
|
)
|
|
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
|
|
reporter.attach(command_attachment, "Command execution")
|
|
|
|
|
|
def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
|
|
logger.info(f"{cmd}: {output}")
|
|
with suppress(Exception):
|
|
json_output = json.dumps(output, indent=4, sort_keys=True)
|
|
output = json_output
|
|
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
|
|
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
|
|
reporter.attach(command_attachment, "Command execution")
|
|
|
|
|
|
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
|
|
"""
|
|
The code will parse each line and return each node as dataclass.
|
|
"""
|
|
netmap_nodes = output.split("Node ")[1:]
|
|
dataclasses_netmap = []
|
|
result_netmap = {}
|
|
|
|
regexes = {
|
|
"node_id": r"\d+: (?P<node_id>\w+)",
|
|
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
|
|
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
|
|
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
|
|
"continent": r"Continent: (?P<continent>\w+)",
|
|
"country": r"Country: (?P<country>\w+)",
|
|
"country_code": r"CountryCode: (?P<country_code>\w+)",
|
|
"external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
|
|
"location": r"Location: (?P<location>\w+.*)",
|
|
"node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
|
|
"price": r"Price: (?P<price>\d+)",
|
|
"sub_div": r"SubDiv: (?P<sub_div>.*)",
|
|
"sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
|
|
"un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
|
|
"role": r"role: (?P<role>\w+)",
|
|
}
|
|
|
|
for node in netmap_nodes:
|
|
for key, regex in regexes.items():
|
|
search_result = re.search(regex, node, flags=re.MULTILINE)
|
|
if key == "node_data_ips":
|
|
result_netmap[key] = search_result[key].strip().split(" ")
|
|
continue
|
|
if key == "external_address":
|
|
result_netmap[key] = search_result[key].strip().split(",")
|
|
continue
|
|
if search_result == None:
|
|
result_netmap[key] = None
|
|
continue
|
|
result_netmap[key] = search_result[key].strip()
|
|
|
|
dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
|
|
|
|
return dataclasses_netmap
|
|
|
|
|
|
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
|
|
parsing_output = []
|
|
reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)
|
|
iter_reader = iter(reader)
|
|
header_row = next(iter_reader)
|
|
for row in iter_reader:
|
|
table = {}
|
|
for i in range(len(row)):
|
|
header = header_row[i].strip().lower().replace(" ", "_")
|
|
value = row[i].strip().lower()
|
|
if header:
|
|
table[header] = value
|
|
parsing_output.append(table)
|
|
return parsing_output
|