import re

from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetInfo, NodeNetmapInfo, NodeStatus


class NetmapParser:
    """Regex-based parsers that turn textual netmap output into dataclasses.

    Every pattern captures its value in a named group whose name is identical
    to the dictionary key it is stored under; the parsers rely on this by
    indexing the match object with that key.
    """

    @staticmethod
    def netinfo(output: str) -> NodeNetInfo:
        """Parse network-info text into a ``NodeNetInfo``.

        Args:
            output: Text containing ``<Label>: <value>`` entries such as
                ``Epoch: 12`` or ``Maintenance mode allowed: true``.

        Returns:
            A ``NodeNetInfo`` built from the captured values. Values are kept
            as stripped strings; a field whose pattern is not found in
            ``output`` is passed as ``None``.
        """
        regexes = {
            "epoch": r"Epoch: (?P<epoch>\d+)",
            "network_magic": r"Network magic: (?P<network_magic>.*$)",
            "time_per_block": r"Time per block: (?P<time_per_block>\d+\w+)",
            "container_fee": r"Container fee: (?P<container_fee>\d+)",
            "epoch_duration": r"Epoch duration: (?P<epoch_duration>\d+)",
            "inner_ring_candidate_fee": r"Inner Ring candidate fee: (?P<inner_ring_candidate_fee>\d+)",
            "maximum_object_size": r"Maximum object size: (?P<maximum_object_size>\d+)",
            "maximum_count_of_data_shards": r"Maximum count of data shards: (?P<maximum_count_of_data_shards>\d+)",
            "maximum_count_of_parity_shards": r"Maximum count of parity shards: (?P<maximum_count_of_parity_shards>\d+)",
            "withdrawal_fee": r"Withdrawal fee: (?P<withdrawal_fee>\d+)",
            "homomorphic_hashing_disabled": r"Homomorphic hashing disabled: (?P<homomorphic_hashing_disabled>true|false)",
            "maintenance_mode_allowed": r"Maintenance mode allowed: (?P<maintenance_mode_allowed>true|false)",
            "eigen_trust_alpha": r"EigenTrustAlpha: (?P<eigen_trust_alpha>\d+\w+$)",
            "eigen_trust_iterations": r"EigenTrustIterations: (?P<eigen_trust_iterations>\d+)",
        }
        parse_result = {}

        for key, regex in regexes.items():
            # MULTILINE lets the `$` anchors match at the end of each line.
            search_result = re.search(regex, output, flags=re.MULTILINE)
            if search_result is None:
                parse_result[key] = None
                continue
            parse_result[key] = search_result[key].strip()

        return NodeNetInfo(**parse_result)

    @staticmethod
    def snapshot_all_nodes(output: str) -> list[NodeNetmapInfo]:
        """Parse a netmap snapshot and return every node as a dataclass.

        The text is split on the literal ``"Node "``; whatever precedes the
        first occurrence is discarded and each remaining chunk is parsed as
        one node.

        Args:
            output: Netmap snapshot text.

        Returns:
            One ``NodeNetmapInfo`` per chunk, in order of appearance. Missing
            fields are ``None``; ``node_data_ips`` is split on spaces,
            ``external_address`` is split on commas and ``node_status`` is
            converted to ``NodeStatus`` from its lower-cased text. All other
            fields are stripped strings.
        """
        netmap_nodes = output.split("Node ")[1:]
        dataclasses_netmap = []
        regexes = {
            "node_id": r"\d+: (?P<node_id>\w+)",
            "node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
            "node_status": r"(?P<node_status>ONLINE|MAINTENANCE|OFFLINE)",
            "cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
            "continent": r"Continent: (?P<continent>\w+)",
            "country": r"Country: (?P<country>\w+)",
            "country_code": r"CountryCode: (?P<country_code>\w+)",
            "external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
            "location": r"Location: (?P<location>\w+.*)",
            "node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
            "price": r"Price: (?P<price>\d+)",
            "sub_div": r"SubDiv: (?P<sub_div>.*)",
            "sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
            "un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
            "role": r"role: (?P<role>\w+)",
        }

        for node in netmap_nodes:
            # A fresh dict per node so no state is shared between nodes.
            result_netmap = {}
            for key, regex in regexes.items():
                search_result = re.search(regex, node, flags=re.MULTILINE)
                if search_result is None:
                    result_netmap[key] = None
                    continue

                value = search_result[key].strip()
                if key == "node_data_ips":
                    result_netmap[key] = value.split(" ")
                elif key == "external_address":
                    result_netmap[key] = value.split(",")
                elif key == "node_status":
                    result_netmap[key] = NodeStatus(value.lower())
                else:
                    result_netmap[key] = value

            dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))

        return dataclasses_netmap

    @staticmethod
    def snapshot_one_node(output: str, cluster_node: ClusterNode) -> NodeNetmapInfo | None:
        """Return the snapshot entry that belongs to ``cluster_node``.

        Args:
            output: Netmap snapshot text, parsed with ``snapshot_all_nodes``.
            cluster_node: Node to look up; its ``host_ip`` is compared with
                the ``node`` field of every parsed entry.

        Returns:
            The first entry whose ``node`` equals ``cluster_node.host_ip``,
            or ``None`` when no entry matches.
        """
        snapshot_nodes = NetmapParser.snapshot_all_nodes(output=output)
        return next((node for node in snapshot_nodes if node.node == cluster_node.host_ip), None)