[#151] Refactor load report

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-12-20 16:02:54 +03:00 committed by Andrey Berezin
parent 663c144709
commit 10a6efa333
4 changed files with 278 additions and 321 deletions

View file

@ -0,0 +1,93 @@
from dataclasses import dataclass, field
from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
@dataclass
class SummarizedErorrs:
    """Error counters for one operation type, aggregated across nodes."""

    total: int = field(default_factory=int)
    percent: float = field(default_factory=float)
    threshold: float = field(default_factory=float)
    by_node: dict[str, int] = field(default_factory=dict)

    def calc_stats(self, operations):
        """Fold per-node error counts into ``total`` and an error percentage."""
        self.total = self.total + sum(self.by_node.values())
        # Guard against division by zero when no operations were executed.
        if operations:
            self.percent = self.total / operations * 100
@dataclass
class SummarizedLatencies:
    """Cluster-wide latency summary built from per-node latency dicts."""

    avg: float = field(default_factory=float)
    min: float = field(default_factory=float)
    max: float = field(default_factory=float)
    by_node: dict[str, dict[str, int]] = field(default_factory=dict)

    def calc_stats(self):
        """Derive avg/min/max across all nodes; no-op when no node data exists."""
        if not self.by_node:
            return
        samples = list(self.by_node.values())
        self.avg = sum(entry["avg"] for entry in samples) / len(samples)
        self.min = min(entry["min"] for entry in samples)
        self.max = max(entry["max"] for entry in samples)
@dataclass
class SummarizedStats:
    """Aggregated load statistics for one operation type (Write/Read/Delete)."""

    threads: int = field(default_factory=int)
    requested_rate: int = field(default_factory=int)
    operations: int = field(default_factory=int)
    rate: float = field(default_factory=float)
    throughput: float = field(default_factory=float)
    latencies: SummarizedLatencies = field(default_factory=SummarizedLatencies)
    errors: SummarizedErorrs = field(default_factory=SummarizedErorrs)
    passed: bool = True

    def calc_stats(self):
        """Finalize error and latency aggregates, then evaluate pass/fail."""
        self.errors.calc_stats(self.operations)
        self.latencies.calc_stats()
        self.passed = self.errors.percent <= self.errors.threshold

    @staticmethod
    def collect(load_params: LoadParams, load_summaries: dict) -> dict[str, "SummarizedStats"]:
        """Build per-operation summarized stats from the k6 summaries of all nodes.

        Args:
            load_params: load run configuration (scenario, VU counts, rates, threshold).
            load_summaries: mapping of node key -> raw k6 summary for that node.

        Returns:
            Mapping of operation name ("Write"/"Read"/"Delete") to its SummarizedStats.
        """
        if load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
            # Constant-arrival-rate scenarios size VUs via preallocated/max, not fixed counts.
            delete_vus = max(load_params.preallocated_deleters or 0, load_params.max_deleters or 0)
            write_vus = max(load_params.preallocated_writers or 0, load_params.max_writers or 0)
            read_vus = max(load_params.preallocated_readers or 0, load_params.max_readers or 0)
        else:
            write_vus = load_params.writers
            read_vus = load_params.readers
            delete_vus = load_params.deleters

        summarized = {
            "Write": SummarizedStats(threads=write_vus, requested_rate=load_params.write_rate),
            "Read": SummarizedStats(threads=read_vus, requested_rate=load_params.read_rate),
            "Delete": SummarizedStats(threads=delete_vus, requested_rate=load_params.delete_rate),
        }

        for node_key, load_summary in load_summaries.items():
            metrics = get_metrics_object(load_params.scenario, load_summary)
            for operation in metrics.operations:
                target = summarized[operation._NAME]
                if not operation.total_iterations:
                    continue
                target.operations += operation.total_iterations
                target.rate += operation.rate
                target.latencies.by_node[node_key] = operation.latency
                target.throughput += operation.throughput
                # Fix: check the current operation's failures, not metrics.write's.
                # The old check mis-gated Read/Delete errors on write failures and
                # raised AttributeError for metrics objects without a write metric.
                if operation.failed_iterations:
                    target.errors.by_node[node_key] = operation.failed_iterations

        for target in summarized.values():
            # Propagate the configured threshold so calc_stats/pass-fail and the
            # report's "Threshold" row use the real limit instead of 0.
            target.errors.threshold = load_params.error_threshold
            target.calc_stats()

        return summarized

View file

@ -1,95 +1,43 @@
from abc import ABC from abc import ABC
from typing import Any from typing import Any, Optional
from frostfs_testlib.load.load_config import LoadScenario from frostfs_testlib.load.load_config import LoadScenario
class MetricsBase(ABC): class OperationMetric(ABC):
_WRITE_SUCCESS = "" _NAME = ""
_WRITE_ERRORS = "" _SUCCESS = ""
_WRITE_THROUGHPUT = "data_sent" _ERRORS = ""
_WRITE_LATENCY = "" _THROUGHPUT = ""
_LATENCY = ""
_READ_SUCCESS = ""
_READ_ERRORS = ""
_READ_LATENCY = ""
_READ_THROUGHPUT = "data_received"
_DELETE_SUCCESS = ""
_DELETE_LATENCY = ""
_DELETE_ERRORS = ""
def __init__(self, summary) -> None: def __init__(self, summary) -> None:
self.summary = summary self.summary = summary
self.metrics = summary["metrics"] self.metrics = summary["metrics"]
@property @property
def write_total_iterations(self) -> int: def total_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS) + self._get_metric(self._WRITE_ERRORS) return self._get_metric(self._SUCCESS) + self._get_metric(self._ERRORS)
@property @property
def write_success_iterations(self) -> int: def success_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS) return self._get_metric(self._SUCCESS)
@property @property
def write_latency(self) -> dict: def latency(self) -> dict:
return self._get_metric(self._WRITE_LATENCY) return self._get_metric(self._LATENCY)
@property @property
def write_rate(self) -> float: def rate(self) -> float:
return self._get_metric_rate(self._WRITE_SUCCESS) return self._get_metric_rate(self._SUCCESS)
@property @property
def write_failed_iterations(self) -> int: def failed_iterations(self) -> int:
return self._get_metric(self._WRITE_ERRORS) return self._get_metric(self._ERRORS)
@property @property
def write_throughput(self) -> float: def throughput(self) -> float:
return self._get_metric_rate(self._WRITE_THROUGHPUT) return self._get_metric_rate(self._THROUGHPUT)
@property
def read_total_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS) + self._get_metric(self._READ_ERRORS)
@property
def read_success_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS)
@property
def read_latency(self) -> dict:
return self._get_metric(self._READ_LATENCY)
@property
def read_rate(self) -> int:
return self._get_metric_rate(self._READ_SUCCESS)
@property
def read_failed_iterations(self) -> int:
return self._get_metric(self._READ_ERRORS)
@property
def read_throughput(self) -> float:
return self._get_metric_rate(self._READ_THROUGHPUT)
@property
def delete_total_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS) + self._get_metric(self._DELETE_ERRORS)
@property
def delete_success_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS)
@property
def delete_latency(self) -> dict:
return self._get_metric(self._DELETE_LATENCY)
@property
def delete_failed_iterations(self) -> int:
return self._get_metric(self._DELETE_ERRORS)
@property
def delete_rate(self) -> int:
return self._get_metric_rate(self._DELETE_SUCCESS)
def _get_metric(self, metric: str) -> int: def _get_metric(self, metric: str) -> int:
metrics_method_map = { metrics_method_map = {
@ -104,9 +52,7 @@ class MetricsBase(ABC):
metric = self.metrics[metric] metric = self.metrics[metric]
metric_type = metric["type"] metric_type = metric["type"]
if metric_type not in metrics_method_map: if metric_type not in metrics_method_map:
raise Exception( raise Exception(f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}")
f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}"
)
return metrics_method_map[metric_type](metric) return metrics_method_map[metric_type](metric)
@ -119,9 +65,7 @@ class MetricsBase(ABC):
metric = self.metrics[metric] metric = self.metrics[metric]
metric_type = metric["type"] metric_type = metric["type"]
if metric_type not in metrics_method_map: if metric_type not in metrics_method_map:
raise Exception( raise Exception(f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}")
f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}"
)
return metrics_method_map[metric_type](metric) return metrics_method_map[metric_type](metric)
@ -138,63 +82,145 @@ class MetricsBase(ABC):
return metric["values"] return metric["values"]
class WriteOperationMetric(OperationMetric):
    # Write-operation view over a k6 summary; throughput comes from "data_sent".
    # Subclasses fill in the scenario-specific _SUCCESS/_ERRORS/_LATENCY names.
    _NAME = "Write"
    _SUCCESS = ""
    _ERRORS = ""
    _THROUGHPUT = "data_sent"
    _LATENCY = ""
class ReadOperationMetric(OperationMetric):
    # Read-operation view over a k6 summary; throughput comes from "data_received".
    # Subclasses fill in the scenario-specific _SUCCESS/_ERRORS/_LATENCY names.
    _NAME = "Read"
    _SUCCESS = ""
    _ERRORS = ""
    _THROUGHPUT = "data_received"
    _LATENCY = ""
class DeleteOperationMetric(OperationMetric):
    # Delete-operation view over a k6 summary; deletes move no payload, so no
    # throughput metric is defined. Subclasses fill in the metric names.
    _NAME = "Delete"
    _SUCCESS = ""
    _ERRORS = ""
    _THROUGHPUT = ""
    _LATENCY = ""
class GrpcWriteOperationMetric(WriteOperationMetric):
    # k6 metric names emitted by the gRPC scenario for object put.
    _SUCCESS = "frostfs_obj_put_total"
    _ERRORS = "frostfs_obj_put_fails"
    _LATENCY = "frostfs_obj_put_duration"
class GrpcReadOperationMetric(ReadOperationMetric):
    # k6 metric names emitted by the gRPC scenario for object get.
    _SUCCESS = "frostfs_obj_get_total"
    _ERRORS = "frostfs_obj_get_fails"
    _LATENCY = "frostfs_obj_get_duration"
class GrpcDeleteOperationMetric(DeleteOperationMetric):
    # k6 metric names emitted by the gRPC scenario for object delete.
    _SUCCESS = "frostfs_obj_delete_total"
    _ERRORS = "frostfs_obj_delete_fails"
    _LATENCY = "frostfs_obj_delete_duration"
class S3WriteOperationMetric(WriteOperationMetric):
    # k6 metric names emitted by the S3 scenario for object put.
    _SUCCESS = "aws_obj_put_total"
    _ERRORS = "aws_obj_put_fails"
    _LATENCY = "aws_obj_put_duration"
class S3ReadOperationMetric(ReadOperationMetric):
    # k6 metric names emitted by the S3 scenario for object get.
    _SUCCESS = "aws_obj_get_total"
    _ERRORS = "aws_obj_get_fails"
    _LATENCY = "aws_obj_get_duration"
class S3DeleteOperationMetric(DeleteOperationMetric):
    # k6 metric names emitted by the S3 scenario for object delete.
    _SUCCESS = "aws_obj_delete_total"
    _ERRORS = "aws_obj_delete_fails"
    _LATENCY = "aws_obj_delete_duration"
class S3LocalWriteOperationMetric(WriteOperationMetric):
    # k6 metric names emitted by the s3local scenario for object put.
    _SUCCESS = "s3local_obj_put_total"
    _ERRORS = "s3local_obj_put_fails"
    _LATENCY = "s3local_obj_put_duration"
class S3LocalReadOperationMetric(ReadOperationMetric):
    # k6 metric names emitted by the s3local scenario for object get.
    _SUCCESS = "s3local_obj_get_total"
    _ERRORS = "s3local_obj_get_fails"
    _LATENCY = "s3local_obj_get_duration"
class LocalWriteOperationMetric(WriteOperationMetric):
    # k6 metric names emitted by the local scenario for object put.
    _SUCCESS = "local_obj_put_total"
    _ERRORS = "local_obj_put_fails"
    _LATENCY = "local_obj_put_duration"
class LocalReadOperationMetric(ReadOperationMetric):
    # k6 metric names emitted by the local scenario for object get.
    # NOTE(review): no _LATENCY override here, unlike the other read metrics —
    # presumably the local scenario emits no get-duration metric; confirm.
    _SUCCESS = "local_obj_get_total"
    _ERRORS = "local_obj_get_fails"
class LocalDeleteOperationMetric(DeleteOperationMetric):
    # k6 metric names emitted by the local scenario for object delete.
    _SUCCESS = "local_obj_delete_total"
    _ERRORS = "local_obj_delete_fails"
class VerifyReadOperationMetric(ReadOperationMetric):
    # Verify scenario counts checked objects: success = verified, error = invalid.
    _SUCCESS = "verified_obj"
    _ERRORS = "invalid_obj"
class MetricsBase(ABC):
    """Holds per-operation metric views; scenario subclasses assign the slots."""

    def __init__(self) -> None:
        self.write: Optional[WriteOperationMetric] = None
        self.read: Optional[ReadOperationMetric] = None
        self.delete: Optional[DeleteOperationMetric] = None

    @property
    def operations(self) -> list[OperationMetric]:
        # Report only the operations the concrete scenario actually configured.
        configured = []
        for candidate in (self.write, self.read, self.delete):
            if candidate is not None:
                configured.append(candidate)
        return configured
class GrpcMetrics(MetricsBase): class GrpcMetrics(MetricsBase):
_WRITE_SUCCESS = "frostfs_obj_put_total" def __init__(self, summary) -> None:
_WRITE_ERRORS = "frostfs_obj_put_fails" super().__init__()
_WRITE_LATENCY = "frostfs_obj_put_duration" self.write = GrpcWriteOperationMetric(summary)
self.read = GrpcReadOperationMetric(summary)
_READ_SUCCESS = "frostfs_obj_get_total" self.delete = GrpcDeleteOperationMetric(summary)
_READ_ERRORS = "frostfs_obj_get_fails"
_READ_LATENCY = "frostfs_obj_get_duration"
_DELETE_SUCCESS = "frostfs_obj_delete_total"
_DELETE_ERRORS = "frostfs_obj_delete_fails"
_DELETE_LATENCY = "frostfs_obj_delete_duration"
class S3Metrics(MetricsBase): class S3Metrics(MetricsBase):
_WRITE_SUCCESS = "aws_obj_put_total" def __init__(self, summary) -> None:
_WRITE_ERRORS = "aws_obj_put_fails" super().__init__()
_WRITE_LATENCY = "aws_obj_put_duration" self.write = S3WriteOperationMetric(summary)
self.read = S3ReadOperationMetric(summary)
self.delete = S3DeleteOperationMetric(summary)
_READ_SUCCESS = "aws_obj_get_total"
_READ_ERRORS = "aws_obj_get_fails"
_READ_LATENCY = "aws_obj_get_duration"
_DELETE_SUCCESS = "aws_obj_delete_total"
_DELETE_ERRORS = "aws_obj_delete_fails"
_DELETE_LATENCY = "aws_obj_delete_duration"
class S3LocalMetrics(MetricsBase): class S3LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "s3local_obj_put_total" def __init__(self, summary) -> None:
_WRITE_ERRORS = "s3local_obj_put_fails" super().__init__()
_WRITE_LATENCY = "s3local_obj_put_duration" self.write = S3LocalWriteOperationMetric(summary)
self.read = S3LocalReadOperationMetric(summary)
_READ_SUCCESS = "s3local_obj_get_total"
_READ_ERRORS = "s3local_obj_get_fails"
_READ_LATENCY = "s3local_obj_get_duration"
class LocalMetrics(MetricsBase): class LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "local_obj_put_total" def __init__(self, summary) -> None:
_WRITE_ERRORS = "local_obj_put_fails" super().__init__()
_WRITE_LATENCY = "local_obj_put_duration" self.write = LocalWriteOperationMetric(summary)
self.read = LocalReadOperationMetric(summary)
_READ_SUCCESS = "local_obj_get_total" self.delete = LocalDeleteOperationMetric(summary)
_READ_ERRORS = "local_obj_get_fails"
_DELETE_SUCCESS = "local_obj_delete_total"
_DELETE_ERRORS = "local_obj_delete_fails"
class VerifyMetrics(MetricsBase): class VerifyMetrics(MetricsBase):
_WRITE_SUCCESS = "N/A" def __init__(self, summary) -> None:
_WRITE_ERRORS = "N/A" super().__init__()
self.read = VerifyReadOperationMetric(summary)
_READ_SUCCESS = "verified_obj"
_READ_ERRORS = "invalid_obj"
_DELETE_SUCCESS = "N/A"
_DELETE_ERRORS = "N/A"
def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> MetricsBase: def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> MetricsBase:

View file

@ -3,8 +3,8 @@ from typing import Optional
import yaml import yaml
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
from frostfs_testlib.utils.converting_utils import calc_unit from frostfs_testlib.utils.converting_utils import calc_unit
@ -114,38 +114,23 @@ class LoadReport:
return model_map[self.load_params.scenario] return model_map[self.load_params.scenario]
def _get_operations_sub_section_html( def _get_operations_sub_section_html(self, operation_type: str, stats: SummarizedStats):
self,
operation_type: str,
total_operations: int,
requested_rate_str: str,
vus_str: str,
total_rate: float,
throughput: float,
errors: dict[str, int],
latency: dict[str, dict],
):
throughput_html = "" throughput_html = ""
if throughput > 0: if stats.throughput > 0:
throughput, unit = calc_unit(throughput) throughput, unit = calc_unit(stats.throughput)
throughput_html = self._row("Throughput", f"{throughput:.2f} {unit}/sec") throughput_html = self._row("Throughput", f"{throughput:.2f} {unit}/sec")
per_node_errors_html = "" per_node_errors_html = ""
total_errors = 0 for node_key, errors in stats.errors.by_node.items():
if errors:
total_errors: int = 0
for node_key, errors in errors.items():
total_errors += errors
if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT: if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT:
per_node_errors_html += self._row(f"At {node_key}", errors) per_node_errors_html += self._row(f"At {node_key}", errors)
latency_html = "" latency_html = ""
if latency: for node_key, latencies in stats.latencies.by_node.items():
for node_key, latency_dict in latency.items():
latency_values = "N/A" latency_values = "N/A"
if latency_dict: if latencies:
latency_values = "" latency_values = ""
for param_name, param_val in latency_dict.items(): for param_name, param_val in latencies.items():
latency_values += f"{param_name}={param_val:.2f}ms " latency_values += f"{param_name}={param_val:.2f}ms "
latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values) latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values)
@ -153,24 +138,22 @@ class LoadReport:
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1) object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
duration = self._seconds_to_formatted_duration(self.load_params.load_time) duration = self._seconds_to_formatted_duration(self.load_params.load_time)
model = self._get_model_string() model = self._get_model_string()
requested_rate_str = f"{stats.requested_rate}op/sec" if stats.requested_rate else ""
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s # write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s" short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {stats.threads}th {model} - {throughput:.2f}{unit}/s {stats.rate:.2f}/s"
errors_percent = 0
if total_operations:
errors_percent = total_errors / total_operations * 100.0
html = f""" html = f"""
<table border="1" cellpadding="5px"><tbody> <table border="1" cellpadding="5px"><tbody>
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr> <tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
<tr><th colspan="2" bgcolor="gainsboro">Metrics</th></tr> <tr><th colspan="2" bgcolor="gainsboro">Metrics</th></tr>
{self._row("Total operations", total_operations)} {self._row("Total operations", stats.operations)}
{self._row("OP/sec", f"{total_rate:.2f}")} {self._row("OP/sec", f"{stats.rate:.2f}")}
{throughput_html} {throughput_html}
{latency_html} {latency_html}
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr> <tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
{per_node_errors_html} {per_node_errors_html}
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")} {self._row("Total", f"{stats.errors.total} ({stats.errors.percent:.2f}%)")}
{self._row("Threshold", f"{self.load_params.error_threshold:.2f}%")} {self._row("Threshold", f"{stats.errors.threshold:.2f}%")}
</tbody></table><br><hr> </tbody></table><br><hr>
""" """
@ -178,111 +161,12 @@ class LoadReport:
def _get_totals_section_html(self): def _get_totals_section_html(self):
html = "" html = ""
for i, load_summaries in enumerate(self.load_summaries_list, 1): for i in range(len(self.load_summaries_list)):
html += f"<h3>Load Results for load #{i}</h3>" html += f"<h3>Load Results for load #{i+1}</h3>"
write_operations = 0 summarized = SummarizedStats.collect(self.load_params, self.load_summaries_list[i])
write_op_sec = 0 for operation_type, stats in summarized.items():
write_throughput = 0 if stats.operations:
write_latency = {} html += self._get_operations_sub_section_html(operation_type, stats)
write_errors = {}
requested_write_rate = self.load_params.write_rate
requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else ""
read_operations = 0
read_op_sec = 0
read_throughput = 0
read_latency = {}
read_errors = {}
requested_read_rate = self.load_params.read_rate
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
delete_operations = 0
delete_op_sec = 0
delete_latency = {}
delete_errors = {}
requested_delete_rate = self.load_params.delete_rate
requested_delete_rate_str = f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
delete_vus = max(self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0)
write_vus = max(self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0)
read_vus = max(self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0)
else:
write_vus = self.load_params.writers
read_vus = self.load_params.readers
delete_vus = self.load_params.deleters
write_vus_str = f"{write_vus}th"
read_vus_str = f"{read_vus}th"
delete_vus_str = f"{delete_vus}th"
write_section_required = False
read_section_required = False
delete_section_required = False
for node_key, load_summary in load_summaries.items():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
write_operations += metrics.write_total_iterations
if write_operations:
write_section_required = True
write_op_sec += metrics.write_rate
write_latency[node_key] = metrics.write_latency
write_throughput += metrics.write_throughput
if metrics.write_failed_iterations:
write_errors[node_key] = metrics.write_failed_iterations
read_operations += metrics.read_total_iterations
if read_operations:
read_section_required = True
read_op_sec += metrics.read_rate
read_throughput += metrics.read_throughput
read_latency[node_key] = metrics.read_latency
if metrics.read_failed_iterations:
read_errors[node_key] = metrics.read_failed_iterations
delete_operations += metrics.delete_total_iterations
if delete_operations:
delete_section_required = True
delete_op_sec += metrics.delete_rate
delete_latency[node_key] = metrics.delete_latency
if metrics.delete_failed_iterations:
delete_errors[node_key] = metrics.delete_failed_iterations
if write_section_required:
html += self._get_operations_sub_section_html(
"Write",
write_operations,
requested_write_rate_str,
write_vus_str,
write_op_sec,
write_throughput,
write_errors,
write_latency,
)
if read_section_required:
html += self._get_operations_sub_section_html(
"Read",
read_operations,
requested_read_rate_str,
read_vus_str,
read_op_sec,
read_throughput,
read_errors,
read_latency,
)
if delete_section_required:
html += self._get_operations_sub_section_html(
"Delete",
delete_operations,
requested_delete_rate_str,
delete_vus_str,
delete_op_sec,
0,
delete_errors,
delete_latency,
)
return html return html

View file

@ -1,4 +1,5 @@
from frostfs_testlib import reporter from frostfs_testlib import reporter
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
from frostfs_testlib.load.load_config import LoadParams, LoadScenario from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object from frostfs_testlib.load.load_metrics import get_metrics_object
@ -8,56 +9,16 @@ class LoadVerifier:
self.load_params = load_params self.load_params = load_params
def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]: def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]:
write_operations = 0 summarized = SummarizedStats.collect(self.load_params, load_summaries)
write_errors = 0
read_operations = 0
read_errors = 0
delete_operations = 0
delete_errors = 0
writers = self.load_params.writers or self.load_params.preallocated_writers or 0
readers = self.load_params.readers or self.load_params.preallocated_readers or 0
deleters = self.load_params.deleters or self.load_params.preallocated_deleters or 0
for load_summary in load_summaries.values():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
if writers:
write_operations += metrics.write_total_iterations
write_errors += metrics.write_failed_iterations
if readers:
read_operations += metrics.read_total_iterations
read_errors += metrics.read_failed_iterations
if deleters:
delete_operations += metrics.delete_total_iterations
delete_errors += metrics.delete_failed_iterations
issues = [] issues = []
if writers and not write_operations:
issues.append(f"No any write operation was performed")
if readers and not read_operations:
issues.append(f"No any read operation was performed")
if deleters and not delete_operations:
issues.append(f"No any delete operation was performed")
error_rate = self._get_error_rate(writers, write_operations, write_errors) for operation_type, stats in summarized.items():
if error_rate > self.load_params.error_threshold: if stats.threads and not stats.operations:
rate_str = self._get_rate_str(error_rate) issues.append(f"No any {operation_type.lower()} operation was performed")
issues.append(f"Write errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
error_rate = self._get_error_rate(readers, read_operations, read_errors) if stats.errors.percent > stats.errors.threshold:
if error_rate > self.load_params.error_threshold: rate_str = self._get_rate_str(stats.errors.percent)
rate_str = self._get_rate_str(error_rate) issues.append(f"{operation_type} errors exceeded threshold: {rate_str} > {stats.errors.threshold}%")
issues.append(f"Read errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
error_rate = self._get_error_rate(deleters, delete_operations, delete_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Delete errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
return issues return issues
@ -74,13 +35,6 @@ class LoadVerifier:
) )
return verify_issues return verify_issues
def _get_error_rate(self, vus: int, operations: int, errors: int) -> float:
if not operations or not vus:
return 0
error_rate = errors / operations * 100
return error_rate
def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str: def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str:
return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%" return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%"
@ -95,13 +49,13 @@ class LoadVerifier:
delete_success = 0 delete_success = 0
if deleters > 0: if deleters > 0:
delete_success = load_metrics.delete_success_iterations delete_success = load_metrics.delete.success_iterations
if verification_summary: if verification_summary:
verify_metrics = get_metrics_object(LoadScenario.VERIFY, verification_summary) verify_metrics = get_metrics_object(LoadScenario.VERIFY, verification_summary)
verified_objects = verify_metrics.read_success_iterations verified_objects = verify_metrics.read.success_iterations
invalid_objects = verify_metrics.read_failed_iterations invalid_objects = verify_metrics.read.failed_iterations
total_left_objects = load_metrics.write_success_iterations - delete_success total_left_objects = load_metrics.write.success_iterations - delete_success
# Due to interruptions we may see total verified objects to be less than written on writers count # Due to interruptions we may see total verified objects to be less than written on writers count
if abs(total_left_objects - verified_objects) > writers: if abs(total_left_objects - verified_objects) > writers: