from abc import ABC from typing import Any from frostfs_testlib.load.load_config import LoadScenario class MetricsBase(ABC): _WRITE_SUCCESS = "" _WRITE_ERRORS = "" _WRITE_THROUGHPUT = "data_sent" _WRITE_LATENCY = "" _READ_SUCCESS = "" _READ_ERRORS = "" _READ_LATENCY = "" _READ_THROUGHPUT = "data_received" _DELETE_SUCCESS = "" _DELETE_LATENCY = "" _DELETE_ERRORS = "" def __init__(self, summary) -> None: self.summary = summary self.metrics = summary["metrics"] @property def write_total_iterations(self) -> int: return self._get_metric(self._WRITE_SUCCESS) + self._get_metric(self._WRITE_ERRORS) @property def write_success_iterations(self) -> int: return self._get_metric(self._WRITE_SUCCESS) @property def write_latency(self) -> dict: return self._get_metric(self._WRITE_LATENCY) @property def write_rate(self) -> float: return self._get_metric_rate(self._WRITE_SUCCESS) @property def write_failed_iterations(self) -> int: return self._get_metric(self._WRITE_ERRORS) @property def write_throughput(self) -> float: return self._get_metric_rate(self._WRITE_THROUGHPUT) @property def read_total_iterations(self) -> int: return self._get_metric(self._READ_SUCCESS) + self._get_metric(self._READ_ERRORS) @property def read_success_iterations(self) -> int: return self._get_metric(self._READ_SUCCESS) @property def read_latency(self) -> dict: return self._get_metric(self._READ_LATENCY) @property def read_rate(self) -> int: return self._get_metric_rate(self._READ_SUCCESS) @property def read_failed_iterations(self) -> int: return self._get_metric(self._READ_ERRORS) @property def read_throughput(self) -> float: return self._get_metric_rate(self._READ_THROUGHPUT) @property def delete_total_iterations(self) -> int: return self._get_metric(self._DELETE_SUCCESS) + self._get_metric(self._DELETE_ERRORS) @property def delete_success_iterations(self) -> int: return self._get_metric(self._DELETE_SUCCESS) @property def delete_latency(self) -> dict: return self._get_metric(self._DELETE_LATENCY) @property def delete_failed_iterations(self) -> int: return self._get_metric(self._DELETE_ERRORS) @property def delete_rate(self) -> int: return self._get_metric_rate(self._DELETE_SUCCESS) def _get_metric(self, metric: str) -> int: metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric, "trend" : self._get_trend_metrics} if metric not in self.metrics: return 0 metric = self.metrics[metric] metric_type = metric["type"] if metric_type not in metrics_method_map: raise Exception( f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}" ) return metrics_method_map[metric_type](metric) def _get_metric_rate(self, metric: str) -> int: metrics_method_map = {"counter": self._get_counter_metric_rate} if metric not in self.metrics: return 0 metric = self.metrics[metric] metric_type = metric["type"] if metric_type not in metrics_method_map: raise Exception( f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}" ) return metrics_method_map[metric_type](metric) def _get_counter_metric_rate(self, metric: str) -> int: return metric["values"]["rate"] def _get_counter_metric(self, metric: str) -> int: return metric["values"]["count"] def _get_gauge_metric(self, metric: str) -> int: return metric["values"]["value"] def _get_trend_metrics(self, metric: str) -> int: return metric["values"] class GrpcMetrics(MetricsBase): _WRITE_SUCCESS = "frostfs_obj_put_total" _WRITE_ERRORS = "frostfs_obj_put_fails" _WRITE_LATENCY = "frostfs_obj_put_duration" _READ_SUCCESS = "frostfs_obj_get_total" _READ_ERRORS = "frostfs_obj_get_fails" _READ_LATENCY = "frostfs_obj_get_duration" _DELETE_SUCCESS = "frostfs_obj_delete_total" _DELETE_ERRORS = "frostfs_obj_delete_fails" _DELETE_LATENCY = "frostfs_obj_delete_duration" class S3Metrics(MetricsBase): _WRITE_SUCCESS = "aws_obj_put_total" _WRITE_ERRORS = "aws_obj_put_fails" _WRITE_LATENCY = "aws_obj_put_duration" _READ_SUCCESS = "aws_obj_get_total" _READ_ERRORS = "aws_obj_get_fails" _READ_LATENCY = "aws_obj_get_duration" _DELETE_SUCCESS = "aws_obj_delete_total" _DELETE_ERRORS = "aws_obj_delete_fails" _DELETE_LATENCY = "aws_obj_delete_duration" class LocalMetrics(MetricsBase): _WRITE_SUCCESS = "local_obj_put_total" _WRITE_ERRORS = "local_obj_put_fails" _READ_SUCCESS = "local_obj_get_total" _READ_ERRORS = "local_obj_get_fails" _DELETE_SUCCESS = "local_obj_delete_total" _DELETE_ERRORS = "local_obj_delete_fails" class VerifyMetrics(MetricsBase): _WRITE_SUCCESS = "N/A" _WRITE_ERRORS = "N/A" _READ_SUCCESS = "verified_obj" _READ_ERRORS = "invalid_obj" _DELETE_SUCCESS = "N/A" _DELETE_ERRORS = "N/A" def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> MetricsBase: class_map = { LoadScenario.gRPC: GrpcMetrics, LoadScenario.gRPC_CAR: GrpcMetrics, LoadScenario.HTTP: GrpcMetrics, LoadScenario.S3: S3Metrics, LoadScenario.S3_CAR: S3Metrics, LoadScenario.VERIFY: VerifyMetrics, LoadScenario.LOCAL: LocalMetrics, } return class_map[load_type](summary)