diff --git a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py index 21a820f..f7d51db 100644 --- a/src/frostfs_testlib/storage/dataclasses/storage_object_info.py +++ b/src/frostfs_testlib/storage/dataclasses/storage_object_info.py @@ -29,14 +29,15 @@ class StorageObjectInfo(ObjectRef): class NodeNetmapInfo: node_id: str = None node_status: str = None - node_data_ip: str = None + node_data_ips: list[str] = None cluster_name: str = None continent: str = None country: str = None country_code: str = None - external_address: str = None + external_address: list[str] = None location: str = None node: str = None + price: int = None sub_div: str = None sub_div_code: int = None un_locode: str = None diff --git a/src/frostfs_testlib/utils/cli_utils.py b/src/frostfs_testlib/utils/cli_utils.py index 5bd4695..0fa6cde 100644 --- a/src/frostfs_testlib/utils/cli_utils.py +++ b/src/frostfs_testlib/utils/cli_utils.py @@ -8,6 +8,7 @@ Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs. import csv import json import logging +import re import subprocess import sys from contextlib import suppress @@ -138,32 +139,47 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None: def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: """ - The cli command will return something like. - - Epoch: 240 - Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080 - Continent: Europe - Country: Russia - CountryCode: RU - ExternalAddr: /ip4/10.10.11.18/tcp/8080 - Location: Moskva - Node: 10.10.10.12 - Price: 5 - SubDiv: Moskva - SubDivCode: MOW - UN-LOCODE: RU MOW - role: alphabet - The code will parse each line and return each node as dataclass. """ - netmap_list = output.split("Node ")[1:] - dataclass_list = [] - for node in netmap_list: - node = node.replace("\t", "").split("\n") - node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]] - dataclass_list.append(NodeNetmapInfo(*node)) + netmap_nodes = output.split("Node ")[1:] + dataclasses_netmap = [] + result_netmap = {} - return dataclass_list + regexes = { + "node_id": r"\d+: (?P\w+)", + "node_data_ips": r"(?P/ip4/.+?)$", + "node_status": r"(?PONLINE|OFFLINE)", + "cluster_name": r"ClusterName: (?P\w+)", + "continent": r"Continent: (?P\w+)", + "country": r"Country: (?P\w+)", + "country_code": r"CountryCode: (?P\w+)", + "external_address": r"ExternalAddr: (?P/ip[4].+?)$", + "location": r"Location: (?P\w+.*)", + "node": r"Node: (?P\d+\.\d+\.\d+\.\d+)", + "price": r"Price: (?P\d+)", + "sub_div": r"SubDiv: (?P.*)", + "sub_div_code": r"SubDivCode: (?P\w+)", + "un_locode": r"UN-LOCODE: (?P\w+.*)", + "role": r"role: (?P\w+)", + } + + for node in netmap_nodes: + for key, regex in regexes.items(): + search_result = re.search(regex, node, flags=re.MULTILINE) + if key == "node_data_ips": + result_netmap[key] = search_result[key].strip().split(" ") + continue + if key == "external_address": + result_netmap[key] = search_result[key].strip().split(",") + continue + if search_result == None: + result_netmap[key] = None + continue + result_netmap[key] = search_result[key].strip() + + dataclasses_netmap.append(NodeNetmapInfo(**result_netmap)) + + return dataclasses_netmap def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]: