from dataclasses import dataclass, field

from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object


@dataclass
class SummarizedErorrs:
    """Failed-iteration counters for one operation type, aggregated over nodes.

    NOTE: the class name is misspelled ("Erorrs"); it is kept as-is because it
    is part of the module's public interface.
    """

    total: int = field(default_factory=int)
    percent: float = field(default_factory=float)
    threshold: float = field(default_factory=float)
    # Failed iteration count keyed by node.
    by_node: dict[str, int] = field(default_factory=dict)

    def calc_stats(self, operations: int) -> None:
        """Fold ``by_node`` into ``total`` and derive ``percent``.

        Args:
            operations: Total number of iterations the error count is
                compared against. When falsy (zero/None), ``percent`` is left
                untouched to avoid a division by zero.
        """
        # Adds to the existing total, so calling this more than once on the
        # same instance counts the per-node errors repeatedly.
        self.total += sum(self.by_node.values())

        if not operations:
            return

        self.percent = self.total / operations * 100


@dataclass
class SummarizedLatencies:
    """Latency figures for one operation type, aggregated over nodes."""

    avg: float = field(default_factory=float)
    min: float = field(default_factory=float)
    max: float = field(default_factory=float)
    # Per-node latency mapping; calc_stats reads its "avg", "min" and "max" keys.
    by_node: dict[str, dict[str, int]] = field(default_factory=dict)

    def calc_stats(self) -> None:
        """Derive ``avg``, ``min`` and ``max`` from the per-node latencies.

        ``avg`` is the unweighted mean of the per-node averages; ``min`` and
        ``max`` are the extremes across nodes. Does nothing when ``by_node``
        is empty.
        """
        if not self.by_node:
            return

        per_node = list(self.by_node.values())
        self.avg = sum(latency["avg"] for latency in per_node) / len(per_node)
        self.min = min(latency["min"] for latency in per_node)
        self.max = max(latency["max"] for latency in per_node)


@dataclass
class SummarizedStats:
    """Load statistics for one operation type, summed across all nodes."""

    threads: int = field(default_factory=int)
    requested_rate: int = field(default_factory=int)
    operations: int = field(default_factory=int)
    rate: float = field(default_factory=float)
    throughput: float = field(default_factory=float)
    latencies: SummarizedLatencies = field(default_factory=SummarizedLatencies)
    errors: SummarizedErorrs = field(default_factory=SummarizedErorrs)
    total_bytes: int = field(default_factory=int)
    passed: bool = True

    def calc_stats(self) -> None:
        """Finalize error and latency stats and set the ``passed`` verdict.

        ``passed`` is True when the error percentage does not exceed the
        configured error threshold.
        """
        self.errors.calc_stats(self.operations)
        self.latencies.calc_stats()
        self.passed = self.errors.percent <= self.errors.threshold

    @staticmethod
    def collect(load_params: LoadParams, load_summaries: dict) -> dict[str, "SummarizedStats"]:
        """Build per-operation summaries from per-node load summaries.

        Args:
            load_params: Load parameters; supplies the scenario, thread (VU)
                counts, requested rates and the error threshold.
            load_summaries: Mapping of node key to that node's load summary,
                as accepted by ``get_metrics_object``.

        Returns:
            Mapping with the keys "Write", "Read" and "Delete", each holding
            the finalized ``SummarizedStats`` for that operation type.

        Raises:
            KeyError: If a metrics operation reports a ``_NAME`` other than
                "Write", "Read" or "Delete".
        """
        if load_params.scenario in (LoadScenario.gRPC_CAR, LoadScenario.S3_CAR):
            # For the CAR scenarios the thread count is the larger of the
            # preallocated and maximum VU settings; unset values count as 0.
            delete_vus = max(load_params.preallocated_deleters or 0, load_params.max_deleters or 0)
            write_vus = max(load_params.preallocated_writers or 0, load_params.max_writers or 0)
            read_vus = max(load_params.preallocated_readers or 0, load_params.max_readers or 0)
        else:
            write_vus = load_params.writers
            read_vus = load_params.readers
            delete_vus = load_params.deleters

        summarized = {
            "Write": SummarizedStats(threads=write_vus, requested_rate=load_params.write_rate),
            "Read": SummarizedStats(threads=read_vus, requested_rate=load_params.read_rate),
            "Delete": SummarizedStats(threads=delete_vus, requested_rate=load_params.delete_rate),
        }

        for node_key, load_summary in load_summaries.items():
            metrics = get_metrics_object(load_params.scenario, load_summary)
            for operation in metrics.operations:
                target = summarized[operation._NAME]
                # Operations that did not run on this node contribute nothing.
                if not operation.total_iterations:
                    continue

                target.operations += operation.total_iterations
                target.rate += operation.rate
                target.latencies.by_node[node_key] = operation.latency
                target.throughput += operation.throughput
                target.errors.threshold = load_params.error_threshold
                # Accumulate like the other counters: plain assignment kept
                # only the bytes of the last node processed.
                target.total_bytes += operation.total_bytes
                if operation.failed_iterations:
                    target.errors.by_node[node_key] = operation.failed_iterations

        for stats in summarized.values():
            stats.calc_stats()

        return summarized