[#91] Failover enhancements

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-10-03 15:18:29 +03:00
parent 9feb8135e3
commit 98ccd4c382
16 changed files with 200 additions and 56 deletions

View file

@ -50,7 +50,7 @@ class ScenarioRunner(ABC):
"""Returns True if load is running at the moment"""
@abstractmethod
def wait_until_finish(self, soft_timeout: int = 0):
    """Wait until load is finished.

    soft_timeout: optional cap in seconds; 0 means wait for the
    full computed load duration. Implementations forward this to
    each k6 instance so a failover test can stop waiting early.
    """
@abstractmethod

View file

@ -3,6 +3,7 @@ import logging
import math
import os
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Any
from urllib.parse import urlparse
@ -39,6 +40,7 @@ class LoadResults:
class K6:
_k6_process: RemoteProcess
_start_time: datetime
def __init__(
self,
@ -122,6 +124,7 @@ class K6:
with reporter.step(
f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"
):
self._start_time = int(datetime.utcnow().timestamp())
command = (
f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
@ -131,7 +134,7 @@ class K6:
command, self.shell, self.load_params.working_dir, user
)
def wait_until_finished(self) -> None:
def wait_until_finished(self, soft_timeout: int = 0) -> None:
with reporter.step(
f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"
):
@ -140,9 +143,36 @@ class K6:
else:
timeout = self.load_params.load_time or 0
timeout += int(K6_TEARDOWN_PERIOD)
current_time = int(datetime.utcnow().timestamp())
working_time = current_time - self._start_time
remaining_time = timeout - working_time
setup_teardown_time = (
int(K6_TEARDOWN_PERIOD)
+ self.load_params.get_init_time()
+ int(self.load_params.setup_timeout.replace("s", "").strip())
)
remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
timeout = remaining_time_including_setup_and_teardown
if soft_timeout:
timeout = min(timeout, soft_timeout)
original_timeout = timeout
timeouts = {
"K6 start time": self._start_time,
"Current time": current_time,
"K6 working time": working_time,
"Remaining time for load": remaining_time,
"Setup and teardown": setup_teardown_time,
"Remaining time including setup/teardown": remaining_time_including_setup_and_teardown,
"Soft timeout": soft_timeout,
"Selected timeout": original_timeout,
}
reporter.attach("\n".join([f"{k}: {v}" for k, v in timeouts.items()]), "timeouts.txt")
min_wait_interval = 10
wait_interval = min_wait_interval
if self._k6_process is None:
@ -162,7 +192,8 @@ class K6:
return
self.stop()
raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
if not soft_timeout:
raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
def get_results(self) -> Any:
with reporter.step(
@ -187,7 +218,7 @@ class K6:
def stop(self) -> None:
    """Stop the remote k6 process on this loader and wait for it to exit."""
    with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"):
        # is_running is a method on K6 (the diff changed the bare attribute
        # access to a call), so it must be invoked to get the boolean.
        if self.is_running():
            self._k6_process.stop()

        # Block until the remote process has actually terminated.
        self._wait_until_process_end()

View file

@ -1,3 +1,4 @@
import math
import os
from dataclasses import dataclass, field, fields, is_dataclass
from enum import Enum
@ -133,6 +134,12 @@ class Preset:
# S3 region (AKA placement policy for S3 buckets)
s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False)
# Delay between containers creation and object upload for preset
object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)
# Flag to control preset errors
ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)
@dataclass
class LoadParams:
@ -194,6 +201,12 @@ class LoadParams:
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
# Delay for read operations in case if we read from registry
read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", None, False)
# Initialization time for each VU for k6 load
vu_init_time: Optional[float] = None
# ------- CONSTANT VUS SCENARIO PARAMS -------
# Amount of Writers VU.
writers: Optional[int] = metadata_field(constant_vus_scenarios, None, "WRITERS", True, True)
@ -306,6 +319,16 @@ class LoadParams:
return command_args
def get_init_time(self) -> int:
    """Return the estimated k6 VU initialization time in seconds, rounded up.

    vu_init_time defaults to None when the caller did not configure a
    per-VU init time; treat that as 0 instead of raising TypeError on
    the multiplication, so timeout math in K6.wait_until_finished stays valid.
    """
    return math.ceil(self._get_total_vus() * (self.vu_init_time or 0))

def _get_total_vus(self) -> int:
    """Sum the configured writer VU counts; unset fields count as 0."""
    vu_fields = ["writers", "preallocated_writers"]
    data_fields = [
        getattr(self, field.name) or 0 for field in fields(self) if field.name in vu_fields
    ]
    return sum(data_fields)
def _get_applicable_fields(self):
applicable_fields = [
meta_field

View file

@ -30,7 +30,7 @@ class MetricsBase(ABC):
@property
def write_success_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS)
@property
def write_latency(self) -> dict:
return self._get_metric(self._WRITE_LATENCY)
@ -54,7 +54,7 @@ class MetricsBase(ABC):
@property
def read_success_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS)
@property
def read_latency(self) -> dict:
return self._get_metric(self._READ_LATENCY)
@ -78,7 +78,7 @@ class MetricsBase(ABC):
@property
def delete_success_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS)
@property
def delete_latency(self) -> dict:
return self._get_metric(self._DELETE_LATENCY)
@ -92,7 +92,11 @@ class MetricsBase(ABC):
return self._get_metric_rate(self._DELETE_SUCCESS)
def _get_metric(self, metric: str) -> int:
metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric, "trend" : self._get_trend_metrics}
metrics_method_map = {
"counter": self._get_counter_metric,
"gauge": self._get_gauge_metric,
"trend": self._get_trend_metrics,
}
if metric not in self.metrics:
return 0
@ -129,8 +133,8 @@ class MetricsBase(ABC):
def _get_gauge_metric(self, metric: str) -> int:
return metric["values"]["value"]
def _get_trend_metrics(self, metric: str) -> int:
def _get_trend_metrics(self, metric: str) -> int:
return metric["values"]

View file

@ -2,7 +2,6 @@ from datetime import datetime
from typing import Optional
import yaml
import os
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
@ -110,7 +109,7 @@ class LoadReport:
total_rate: float,
throughput: float,
errors: dict[str, int],
latency: dict[str, dict],
latency: dict[str, dict],
):
throughput_html = ""
if throughput > 0:
@ -131,12 +130,16 @@ class LoadReport:
latency_html = ""
if latency:
for node_key, param_dict in latency.items():
latency_values = ""
for param_name, param_val in param_dict.items():
latency_values += f"{param_name}={param_val:.2f}ms "
for node_key, latency_dict in latency.items():
latency_values = "N/A"
if latency_dict:
latency_values = ""
for param_name, param_val in latency_dict.items():
latency_values += f"{param_name}={param_val:.2f}ms "
latency_html += self._row(f"Put latency {node_key.split(':')[0]}", latency_values)
latency_html += self._row(
f"{operation_type} latency {node_key.split(':')[0]}", latency_values
)
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
@ -145,8 +148,8 @@ class LoadReport:
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
errors_percent = 0
if total_operations:
errors_percent = total_errors/total_operations*100.0
errors_percent = total_errors / total_operations * 100.0
html = f"""
<table border="1" cellpadding="5px"><tbody>
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>

View file

@ -12,7 +12,7 @@ class LoadVerifier:
def __init__(self, load_params: LoadParams) -> None:
self.load_params = load_params
def verify_load_results(self, load_summaries: dict[str, dict]):
def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]:
write_operations = 0
write_errors = 0
@ -41,38 +41,58 @@ class LoadVerifier:
delete_operations += metrics.delete_total_iterations
delete_errors += metrics.delete_failed_iterations
exceptions = []
issues = []
if writers and not write_operations:
exceptions.append(f"No any write operation was performed")
issues.append(f"No any write operation was performed")
if readers and not read_operations:
exceptions.append(f"No any read operation was performed")
issues.append(f"No any read operation was performed")
if deleters and not delete_operations:
exceptions.append(f"No any delete operation was performed")
issues.append(f"No any delete operation was performed")
if write_operations and writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
exceptions.append(
if (
write_operations
and writers
and write_errors / write_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}"
)
if read_operations and readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
exceptions.append(
if (
read_operations
and readers
and read_errors / read_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
)
if delete_operations and deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
exceptions.append(
if (
delete_operations
and deleters
and delete_errors / delete_operations * 100 > self.load_params.error_threshold
):
issues.append(
f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
)
assert not exceptions, "\n".join(exceptions)
return issues
def check_verify_results(self, load_summaries, verification_summaries) -> None:
for node_or_endpoint in load_summaries:
with reporter.step(f"Check verify scenario results for {node_or_endpoint}"):
self._check_verify_result(
load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint]
def collect_verify_issues(self, load_summaries, verification_summaries) -> list[str]:
    """Collect verification issues from every k6 process into a single list."""
    issues: list[str] = []
    for k6_process_label in load_summaries:
        with reporter.step(f"Check verify scenario results for {k6_process_label}"):
            process_issues = self._collect_verify_issues_on_process(
                k6_process_label,
                load_summaries[k6_process_label],
                verification_summaries[k6_process_label],
            )
            issues.extend(process_issues)
    return issues
def _check_verify_result(self, load_summary, verification_summary) -> None:
exceptions = []
def _collect_verify_issues_on_process(
self, label, load_summary, verification_summary
) -> list[str]:
issues = []
load_metrics = get_metrics_object(self.load_params.scenario, load_summary)
@ -92,8 +112,8 @@ class LoadVerifier:
# Due to interruptions we may see total verified objects to be less than written on writers count
if abs(total_left_objects - verified_objects) > writers:
exceptions.append(
f"Verified objects mismatch. Total: {total_left_objects}, Verified: {verified_objects}. Writers: {writers}."
issues.append(
f"Verified objects mismatch for {label}. Total: {total_left_objects}, Verified: {verified_objects}. Writers: {writers}."
)
assert not exceptions, "\n".join(exceptions)
return issues

View file

@ -43,8 +43,8 @@ class RunnerBase(ScenarioRunner):
parallel([k6.preset for k6 in self.k6_instances])
@reporter.step_deco("Wait until load finish")
def wait_until_finish(self, soft_timeout: int = 0):
    """Wait, in parallel, for every k6 instance to finish.

    soft_timeout: when non-zero, an earlier cap in seconds forwarded to
    each K6.wait_until_finished call (0 keeps the full load duration).
    """
    parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout)
@property
def is_running(self):