"""Helpers that sum metric values reported by cluster nodes and assert on the total."""

import operator as op
import re
from typing import Optional

from frostfs_testlib import reporter
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.testing.test_control import wait_for_success

# Comparison symbols accepted by ``check_metrics_counter``. A lookup table is
# used instead of ``eval`` so that only plain comparisons can ever be executed.
# The module is imported as ``op`` because ``check_metrics_counter`` has a
# parameter named ``operator`` that would shadow it.
_COMPARISONS = {
    "==": op.eq,
    "!=": op.ne,
    "<": op.lt,
    "<=": op.le,
    ">": op.gt,
    ">=": op.ge,
}


@reporter.step("Check metrics result")
@wait_for_success(interval=10)
def check_metrics_counter(
    cluster_nodes: list[ClusterNode],
    operator: str = "==",
    counter_exp: int = 0,
    parse_from_command: bool = False,
    **metrics_greps: str,
) -> None:
    """Assert that the metric total over ``cluster_nodes`` compares as expected.

    The value from every node is obtained with ``get_metrics_value`` and the
    values are summed; the sum is then checked as
    ``<sum> <operator> <counter_exp>``.

    NOTE(review): the function is wrapped in ``wait_for_success(interval=10)``,
    which presumably re-runs it until it stops raising -- confirm against
    frostfs_testlib.

    Args:
        cluster_nodes: Nodes whose metric values are added together.
        operator: Comparison symbol, one of ``==``, ``!=``, ``<``, ``<=``,
            ``>``, ``>=``. Surrounding whitespace is ignored.
        counter_exp: Value the summed counter is compared against.
        parse_from_command: Forwarded unchanged to ``get_metrics_value``.
        **metrics_greps: Forwarded unchanged to ``get_metrics_value``.

    Raises:
        ValueError: If ``operator`` is not a supported comparison symbol.
        AssertionError: If the comparison does not hold.
    """
    compare = _COMPARISONS.get(operator.strip())
    if compare is None:
        raise ValueError(
            f"Unsupported comparison operator {operator!r}; expected one of {sorted(_COMPARISONS)}"
        )

    counter_act = sum(
        get_metrics_value(cluster_node, parse_from_command, **metrics_greps)
        for cluster_node in cluster_nodes
    )

    assert compare(
        counter_act, counter_exp
    ), f"Expected: {counter_exp} {operator} Actual: {counter_act} in nodes: {cluster_nodes}"


@reporter.step("Get metrics value from node: {node}")
def get_metrics_value(node: ClusterNode, parse_from_command: bool = False, **metrics_greps: str) -> int:
    """Return the summed metric value found on ``node`` for the given greps.

    Args:
        node: Node whose ``metrics.storage.get_metrics_search_by_greps`` is
            called with ``metrics_greps``.
        parse_from_command: When true, ``metrics_greps`` is also forwarded to
            ``calc_metrics_count_from_stdout``. That function only accepts a
            ``command`` keyword, so in this mode ``metrics_greps`` must hold
            exactly that key; any other key raises ``TypeError``.
        **metrics_greps: Keyword arguments for the metrics search.

    Returns:
        The integer sum parsed from the command's stdout, or 0 when a
        ``RuntimeError`` is raised while fetching or parsing.
    """
    try:
        command_result = node.metrics.storage.get_metrics_search_by_greps(**metrics_greps)
        if parse_from_command:
            metrics_counter = calc_metrics_count_from_stdout(command_result.stdout, **metrics_greps)
        else:
            metrics_counter = calc_metrics_count_from_stdout(command_result.stdout)
    except RuntimeError:
        # Deliberate best-effort: a failed lookup counts as zero for this node.
        # NOTE(review): presumably the search raises RuntimeError when nothing
        # matches -- confirm against frostfs_testlib.
        metrics_counter = 0

    return metrics_counter


@reporter.step("Parse metrics count and calc sum of result")
def calc_metrics_count_from_stdout(metric_result_stdout: str, command: Optional[str] = None) -> int:
    """Extract numeric values from metrics output and return their integer sum.

    Args:
        metric_result_stdout: Text to scan for metric values.
        command: When non-empty, a value is taken from every place where this
            text is followed by optional whitespace and a numeric token. The
            text is interpolated into the regular expression as-is (it is not
            escaped), so regex metacharacters in it keep their regex meaning.
            When empty or ``None``, values are instead taken after each
            closing ``}``.

    Returns:
        The sum of all matched values, each converted with ``int(float(x))``
        (so the fractional part is dropped per value before summing). Returns
        0 when nothing matches.

    Raises:
        ValueError: If a matched token is not a valid float. The character
            class ``[\\d.e+-]`` also admits tokens such as ``e`` or ``-``.
    """
    if command:
        result = re.findall(rf"{command}\s*([\d.e+-]+)", metric_result_stdout)
    else:
        result = re.findall(r"}\s*([\d.e+-]+)", metric_result_stdout)
    return sum(int(float(value)) for value in result)