frostfs-testlib/src/frostfs_testlib/cli/netmap_parser.py
Kirill Sosnovskikh dcde9e15b1
All checks were successful
DCO action / DCO (pull_request) Successful in 22s
[#365] Change type hint for NetmapOperations.nodeinfo
Signed-off-by: Kirill Sosnovskikh <k.sosnovskikh@yadro.com>
2025-03-18 14:56:12 +00:00

102 lines
4.5 KiB
Python

import re
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces, NodeInfo, NodeNetInfo, NodeNetmapInfo, NodeStatus
class NetmapParser:
    """Static helpers that convert netmap CLI output into the corresponding dataclasses."""

    # Field name -> pattern. Every pattern exposes a named group equal to its field name,
    # which is how the parsers below pull the value out of the match.
    _NETINFO_REGEXES = {
        "epoch": r"Epoch: (?P<epoch>\d+)",
        "network_magic": r"Network magic: (?P<network_magic>.*$)",
        "time_per_block": r"Time per block: (?P<time_per_block>\d+\w+)",
        "container_fee": r"Container fee: (?P<container_fee>\d+)",
        "epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
        "inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
        "maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
        "maximum_count_of_data_shards": r"Maximum count of data shards: (?P<maximum_count_of_data_shards>\d+)",
        "maximum_count_of_parity_shards": r"Maximum count of parity shards: (?P<maximum_count_of_parity_shards>\d+)",
        "withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
        "homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
        "maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",
    }

    _SNAPSHOT_REGEXES = {
        "node_id": r"\d+: (?P<node_id>\w+)",
        "node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
        "node_status": r"(?P<node_status>ONLINE|MAINTENANCE|OFFLINE)",
        "cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
        "continent": r"Continent: (?P<continent>\w+)",
        "country": r"Country: (?P<country>\w+)",
        "country_code": r"CountryCode: (?P<country_code>\w+)",
        "external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
        "location": r"Location: (?P<location>\w+.*)",
        "node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
        "price": r"Price: (?P<price>\d+)",
        "sub_div": r"SubDiv: (?P<sub_div>.*)",
        "sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
        "un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
        "role": r"role: (?P<role>\w+)",
    }

    @staticmethod
    def netinfo(output: str) -> NodeNetInfo:
        """Parse network settings text into a ``NodeNetInfo``.

        Args:
            output: Text containing ``<Label>: <value>`` lines such as ``Epoch: 12``.

        Returns:
            ``NodeNetInfo`` built from the stripped string values; a field whose
            label is not found in ``output`` is passed as ``None``.
        """
        parse_result = {}
        for key, regex in NetmapParser._NETINFO_REGEXES.items():
            search_result = re.search(regex, output, flags=re.MULTILINE)
            parse_result[key] = None if search_result is None else search_result[key].strip()
        return NodeNetInfo(**parse_result)

    @staticmethod
    def snapshot_all_nodes(output: str) -> list[NodeNetmapInfo]:
        """Parse a netmap snapshot into one ``NodeNetmapInfo`` per node.

        The text is split on the ``"Node "`` marker; everything before the first
        marker is discarded and each remaining chunk is parsed independently.

        Args:
            output: Snapshot text containing zero or more ``Node ...`` sections.

        Returns:
            Parsed nodes in the order they appear. Fields that are not found in
            a node section are passed as ``None``. ``node_data_ips`` is split on
            spaces, ``external_address`` on commas, and ``node_status`` is
            converted to ``NodeStatus`` from its lower-cased text.
        """
        netmap_nodes = output.split("Node ")[1:]
        dataclasses_netmap = []
        for node in netmap_nodes:
            # Fresh dict per node so values can never carry over between nodes.
            result_netmap = {}
            for key, regex in NetmapParser._SNAPSHOT_REGEXES.items():
                search_result = re.search(regex, node, flags=re.MULTILINE)
                if search_result is None:
                    result_netmap[key] = None
                    continue

                value = search_result[key].strip()
                if key == "node_data_ips":
                    value = value.split(" ")
                elif key == "external_address":
                    value = value.split(",")
                elif key == "node_status":
                    value = NodeStatus(value.lower())
                result_netmap[key] = value

            dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
        return dataclasses_netmap

    @staticmethod
    def snapshot_one_node(output: str, rpc_endpoint: str) -> NodeNetmapInfo | None:
        """Find the snapshot entry whose external address contains the endpoint host.

        Args:
            output: Snapshot text accepted by ``snapshot_all_nodes``.
            rpc_endpoint: Endpoint string; only the part before the first ``":"`` is used.

        Returns:
            The first node with an external address containing that host part,
            or ``None`` when no node matches.
        """
        host = rpc_endpoint.split(":")[0]
        for snapshot in NetmapParser.snapshot_all_nodes(output=output):
            # snapshot_all_nodes passes external_address=None when the node section has no
            # ExternalAddr match; treat that as "no addresses" instead of failing on iteration.
            for endpoint in snapshot.external_address or []:
                # NOTE(review): substring match, so host "10.0.0.1" also matches an address
                # containing "10.0.0.10" - confirm whether an exact comparison is wanted.
                if host in endpoint:
                    return snapshot
        return None

    @staticmethod
    def node_info(output: dict) -> NodeInfo:
        """Build a ``NodeInfo`` from a node info mapping.

        Args:
            output: Mapping of ``NodeInfo`` fields. The optional ``"attributes"``
                entry is a list of ``{"key": ..., "value": ...}`` items.

        Returns:
            ``NodeInfo`` where ``attributes`` is flattened to a ``{key: value}``
            dict (empty when ``"attributes"`` is missing or empty) and every
            other entry is passed through unchanged.
        """
        data_dict = {key: value for key, value in output.items() if key != "attributes"}
        data_dict["attributes"] = {
            attribute["key"]: attribute["value"] for attribute in output.get("attributes") or []
        }
        return NodeInfo(**data_dict)