import logging

from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
from frostfs_testlib.reporter import get_reporter

reporter = get_reporter()
logger = logging.getLogger("NeoLogger")


class LoadVerifier:
    """Checks load-run summaries against the expectations stored in ``LoadParams``."""

    def __init__(self, load_params: LoadParams) -> None:
        """Remember the load parameters used to interpret the summaries.

        Args:
            load_params: Parameters of the load run. The verifier reads the worker
                counts (``writers``/``readers``/``deleters`` and their
                ``preallocated_*`` fallbacks), ``scenario`` and ``error_threshold``.
        """
        self.load_params = load_params

    def verify_load_results(self, load_summaries: dict[str, dict]) -> None:
        """Verify that every enabled operation ran and stayed under the error threshold.

        Iteration counters are summed over all summaries. An operation type is only
        checked when its worker count in the load parameters is non-zero.

        Args:
            load_summaries: Mapping whose values are the raw load summaries; each
                value is parsed with ``get_metrics_object`` for the configured scenario.

        Raises:
            AssertionError: If an enabled operation performed no iterations, or its
                error rate (in percent) is greater than ``error_threshold``. All
                detected issues are joined with newlines in the message.
        """
        params = self.load_params
        writers = params.writers or params.preallocated_writers or 0
        readers = params.readers or params.preallocated_readers or 0
        deleters = params.deleters or params.preallocated_deleters or 0

        write_operations = write_errors = 0
        read_operations = read_errors = 0
        delete_operations = delete_errors = 0

        for load_summary in load_summaries.values():
            metrics = get_metrics_object(params.scenario, load_summary)

            if writers:
                write_operations += metrics.write_total_iterations
                write_errors += metrics.write_failed_iterations

            if readers:
                read_operations += metrics.read_total_iterations
                read_errors += metrics.read_failed_iterations

            if deleters:
                delete_operations += metrics.delete_total_iterations
                delete_errors += metrics.delete_failed_iterations

        # (operation name, configured workers, total iterations, failed iterations)
        stats = (
            ("write", writers, write_operations, write_errors),
            ("read", readers, read_operations, read_errors),
            ("delete", deleters, delete_operations, delete_errors),
        )

        # "Nothing was performed" issues are reported first, then error-rate issues,
        # each group in write/read/delete order.
        exceptions = [
            f"No any {name} operation was performed"
            for name, workers, total, _ in stats
            if workers and not total
        ]

        for name, workers, total, failed in stats:
            # A zero total is already reported above; skipping it also avoids a
            # division by zero when computing the rate.
            if not (total and workers):
                continue

            error_rate = failed / total * 100
            if error_rate > params.error_threshold:
                exceptions.append(
                    f"{name.capitalize()} error rate is greater than threshold: "
                    f"{error_rate} > {params.error_threshold}"
                )

        assert not exceptions, "\n".join(exceptions)

    def check_verify_results(self, load_summaries, verification_summaries) -> None:
        """Check the verify-scenario results for every entry of ``load_summaries``.

        Each entry is checked inside its own reporter step.

        Args:
            load_summaries: Mapping from node or endpoint to its load summary.
            verification_summaries: Mapping from the same keys to the matching
                verification summary; a key missing here raises ``KeyError``.

        Raises:
            AssertionError: If the check of any entry finds an issue.
        """
        for node_or_endpoint, load_summary in load_summaries.items():
            with reporter.step(f"Check verify scenario results for {node_or_endpoint}"):
                self._check_verify_result(load_summary, verification_summaries[node_or_endpoint])

    def _check_verify_result(self, load_summary, verification_summary) -> None:
        """Compare one load summary with its verification summary.

        Nothing is checked when ``verification_summary`` is empty or ``None``.

        Args:
            load_summary: Raw load summary, parsed with the configured scenario.
            verification_summary: Raw verification summary, parsed with
                ``LoadScenario.VERIFY``.

        Raises:
            AssertionError: If the verify run reported failed iterations, or the
                number of verified objects differs from the number of objects
                expected to remain by more than the writers count.
        """
        load_metrics = get_metrics_object(self.load_params.scenario, load_summary)

        writers = self.load_params.writers or self.load_params.preallocated_writers or 0
        deleters = self.load_params.deleters or self.load_params.preallocated_deleters or 0

        # Deletions are only taken into account when deleters were configured.
        delete_success = load_metrics.delete_success_iterations if deleters > 0 else 0

        if not verification_summary:
            return

        verify_metrics = get_metrics_object(LoadScenario.VERIFY, verification_summary)
        verified_objects = verify_metrics.read_success_iterations
        invalid_objects = verify_metrics.read_failed_iterations
        total_left_objects = load_metrics.write_success_iterations - delete_success

        exceptions = []

        # Failed iterations of the verify run were previously read but never
        # reported, so failed verifications went unnoticed.
        if invalid_objects > 0:
            exceptions.append(f"Verification failed for {invalid_objects} objects.")

        # Due to interruptions we may see total verified objects to be less than written on writers count
        if abs(total_left_objects - verified_objects) > writers:
            exceptions.append(
                f"Verified objects mismatch. Total: {total_left_objects}, "
                f"Verified: {verified_objects}. Writers: {writers}."
            )

        assert not exceptions, "\n".join(exceptions)