diff --git a/pyproject.toml b/pyproject.toml
index f85b883..778e2fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -44,6 +44,9 @@ allure = "frostfs_testlib.reporter.allure_handler:AllureHandler"
 [project.entry-points."frostfs.testlib.hosting"]
 docker = "frostfs_testlib.hosting.docker_host:DockerHost"
 
+[project.entry-points."frostfs.testlib.healthcheck"]
+basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
+
 [tool.isort]
 profile = "black"
 src_paths = ["src", "tests"]
diff --git a/src/frostfs_testlib/cli/frostfs_cli/cli.py b/src/frostfs_testlib/cli/frostfs_cli/cli.py
index 5d55f55..a78da8b 100644
--- a/src/frostfs_testlib/cli/frostfs_cli/cli.py
+++ b/src/frostfs_testlib/cli/frostfs_cli/cli.py
@@ -8,6 +8,7 @@ from frostfs_testlib.cli.frostfs_cli.object import FrostfsCliObject
 from frostfs_testlib.cli.frostfs_cli.session import FrostfsCliSession
 from frostfs_testlib.cli.frostfs_cli.shards import FrostfsCliShards
 from frostfs_testlib.cli.frostfs_cli.storagegroup import FrostfsCliStorageGroup
+from frostfs_testlib.cli.frostfs_cli.tree import FrostfsCliTree
 from frostfs_testlib.cli.frostfs_cli.util import FrostfsCliUtil
 from frostfs_testlib.cli.frostfs_cli.version import FrostfsCliVersion
 from frostfs_testlib.shell import Shell
@@ -36,3 +37,4 @@ class FrostfsCli:
         self.storagegroup = FrostfsCliStorageGroup(shell, frostfs_cli_exec_path, config=config_file)
         self.util = FrostfsCliUtil(shell, frostfs_cli_exec_path, config=config_file)
         self.version = FrostfsCliVersion(shell, frostfs_cli_exec_path, config=config_file)
+        self.tree = FrostfsCliTree(shell, frostfs_cli_exec_path, config=config_file)
diff --git a/src/frostfs_testlib/cli/frostfs_cli/tree.py b/src/frostfs_testlib/cli/frostfs_cli/tree.py
new file mode 100644
index 0000000..af330fe
--- /dev/null
+++ b/src/frostfs_testlib/cli/frostfs_cli/tree.py
@@ -0,0 +1,27 @@
+from typing import Optional
+
+from frostfs_testlib.cli.cli_command import CliCommand
+from frostfs_testlib.shell import CommandResult
+
+
+class FrostfsCliTree(CliCommand):
+    def healthcheck(
+        self,
+        wallet: Optional[str] = None,
+        rpc_endpoint: Optional[str] = None,
+        timeout: Optional[str] = None,
+    ) -> CommandResult:
+        """Check tree service health.
+
+        Args:
+            wallet: WIF (NEP-2) string or path to the wallet or binary key.
+            rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
+            timeout: Timeout for the operation.
+
+        Returns:
+            Command's result.
+        """
+        return self._execute(
+            "tree healthcheck",
+            **{param: value for param, value in locals().items() if param not in ["self"]},
+        )
diff --git a/src/frostfs_testlib/healthcheck/basic_healthcheck.py b/src/frostfs_testlib/healthcheck/basic_healthcheck.py
new file mode 100644
index 0000000..9ec8694
--- /dev/null
+++ b/src/frostfs_testlib/healthcheck/basic_healthcheck.py
@@ -0,0 +1,14 @@
+from frostfs_testlib.healthcheck.interfaces import Healthcheck
+from frostfs_testlib.reporter import get_reporter
+from frostfs_testlib.steps.node_management import storage_node_healthcheck
+from frostfs_testlib.storage.cluster import ClusterNode
+
+reporter = get_reporter()
+
+
+class BasicHealthcheck(Healthcheck):
+    @reporter.step_deco("Perform healthcheck for {cluster_node}")
+    def perform(self, cluster_node: ClusterNode):
+        health_check = storage_node_healthcheck(cluster_node.storage_node)
+        if health_check.health_status != "READY" or health_check.network_status != "ONLINE":
+            raise AssertionError(f"Node {cluster_node} is not healthy")
diff --git a/src/frostfs_testlib/healthcheck/interfaces.py b/src/frostfs_testlib/healthcheck/interfaces.py
new file mode 100644
index 0000000..0c77957
--- /dev/null
+++ b/src/frostfs_testlib/healthcheck/interfaces.py
@@ -0,0 +1,9 @@
+from abc import ABC, abstractmethod
+
+from frostfs_testlib.storage.cluster import ClusterNode
+
+
+class Healthcheck(ABC):
+    @abstractmethod
+    def perform(self, cluster_node: ClusterNode):
+        """Perform healthcheck on the target cluster node"""
diff --git a/src/frostfs_testlib/hosting/config.py b/src/frostfs_testlib/hosting/config.py
index 6679470..88fe3e7 100644
--- a/src/frostfs_testlib/hosting/config.py
+++ b/src/frostfs_testlib/hosting/config.py
@@ -52,6 +52,7 @@ class HostConfig:
 
     Attributes:
         plugin_name: Name of plugin that should be used to manage the host.
+        healthcheck_plugin_name: Name of the plugin for healthcheck operations.
        address: Address of the machine (IP or DNS name).
        services: List of services hosted on the machine.
        clis: List of CLI tools available on the machine.
@@ -60,6 +61,7 @@ class HostConfig:
     """
 
     plugin_name: str
+    healthcheck_plugin_name: str
     address: str
     services: list[ServiceConfig] = field(default_factory=list)
     clis: list[CLIConfig] = field(default_factory=list)
diff --git a/src/frostfs_testlib/load/interfaces.py b/src/frostfs_testlib/load/interfaces.py
index 98c9d62..394fff7 100644
--- a/src/frostfs_testlib/load/interfaces.py
+++ b/src/frostfs_testlib/load/interfaces.py
@@ -50,7 +50,7 @@ class ScenarioRunner(ABC):
         """Returns True if load is running at the moment"""
 
     @abstractmethod
-    def wait_until_finish(self):
+    def wait_until_finish(self, soft_timeout: int = 0):
         """Wait until load is finished"""
 
     @abstractmethod
diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py
index cb3576e..e7a2b39 100644
--- a/src/frostfs_testlib/load/k6.py
+++ b/src/frostfs_testlib/load/k6.py
@@ -3,6 +3,7 @@ import logging
 import math
 import os
 from dataclasses import dataclass
+from datetime import datetime
 from time import sleep
 from typing import Any
 from urllib.parse import urlparse
@@ -39,6 +40,7 @@ class LoadResults:
 
 class K6:
     _k6_process: RemoteProcess
+    _start_time: int
 
     def __init__(
         self,
@@ -122,6 +124,7 @@ class K6:
         with reporter.step(
             f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"
         ):
+            self._start_time = int(datetime.utcnow().timestamp())
             command = (
                 f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
                 f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
@@ -131,7 +134,7 @@ class K6:
                 command, self.shell, self.load_params.working_dir, user
             )
 
-    def wait_until_finished(self) -> None:
+    def wait_until_finished(self, soft_timeout: int = 0) -> None:
         with reporter.step(
             f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"
         ):
@@ -140,9 +143,36 @@ class K6:
             else:
                 timeout = self.load_params.load_time or 0
 
-            timeout += int(K6_TEARDOWN_PERIOD)
+            current_time = int(datetime.utcnow().timestamp())
+            working_time = current_time - self._start_time
+            remaining_time = timeout - working_time
+
+            setup_teardown_time = (
+                int(K6_TEARDOWN_PERIOD)
+                + self.load_params.get_init_time()
+                + int(self.load_params.setup_timeout.replace("s", "").strip())
+            )
+            remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
+            timeout = remaining_time_including_setup_and_teardown
+
+            if soft_timeout:
+                timeout = min(timeout, soft_timeout)
+
             original_timeout = timeout
+
+            timeouts = {
+                "K6 start time": self._start_time,
+                "Current time": current_time,
+                "K6 working time": working_time,
+                "Remaining time for load": remaining_time,
+                "Setup and teardown": setup_teardown_time,
+                "Remaining time including setup/teardown": remaining_time_including_setup_and_teardown,
+                "Soft timeout": soft_timeout,
+                "Selected timeout": original_timeout,
+            }
+
+            reporter.attach("\n".join([f"{k}: {v}" for k, v in timeouts.items()]), "timeouts.txt")
 
             min_wait_interval = 10
             wait_interval = min_wait_interval
             if self._k6_process is None:
@@ -162,7 +192,8 @@ class K6:
                 return
 
             self.stop()
-            raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
+            if not soft_timeout:
+                raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
 
     def get_results(self) -> Any:
         with reporter.step(
@@ -187,7 +218,7 @@ class K6:
 
     def stop(self) -> None:
         with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"):
-            if self.is_running:
+            if self.is_running():
                 self._k6_process.stop()
                 self._wait_until_process_end()
diff --git a/src/frostfs_testlib/load/load_config.py b/src/frostfs_testlib/load/load_config.py
index 97f5dd6..678fc38 100644
--- a/src/frostfs_testlib/load/load_config.py
+++ b/src/frostfs_testlib/load/load_config.py
@@ -1,3 +1,4 @@
+import math
 import os
 from dataclasses import dataclass, field, fields, is_dataclass
 from enum import Enum
@@ -133,6 +134,12 @@ class Preset:
     # S3 region (AKA placement policy for S3 buckets)
     s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False)
 
+    # Delay between container creation and object upload for preset
+    object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)
+
+    # Flag to control preset errors
+    ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)
+
 
 @dataclass
 class LoadParams:
@@ -194,6 +201,12 @@ class LoadParams:
     # https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
     setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
 
+    # Delay for read operations when reading from the registry
+    read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", None, False)
+
+    # Initialization time for each VU for k6 load
+    vu_init_time: Optional[float] = None
+
     # ------- CONSTANT VUS SCENARIO PARAMS -------
     # Amount of Writers VU.
     writers: Optional[int] = metadata_field(constant_vus_scenarios, None, "WRITERS", True, True)
@@ -306,6 +319,16 @@ class LoadParams:
 
         return command_args
 
+    def get_init_time(self) -> int:
+        return math.ceil(self._get_total_vus() * self.vu_init_time)
+
+    def _get_total_vus(self) -> int:
+        vu_fields = ["writers", "preallocated_writers"]
+        data_fields = [
+            getattr(self, field.name) or 0 for field in fields(self) if field.name in vu_fields
+        ]
+        return sum(data_fields)
+
     def _get_applicable_fields(self):
         applicable_fields = [
             meta_field
diff --git a/src/frostfs_testlib/load/load_metrics.py b/src/frostfs_testlib/load/load_metrics.py
index 5cec8ea..6c201ec 100644
--- a/src/frostfs_testlib/load/load_metrics.py
+++ b/src/frostfs_testlib/load/load_metrics.py
@@ -30,7 +30,7 @@ class MetricsBase(ABC):
     @property
     def write_success_iterations(self) -> int:
         return self._get_metric(self._WRITE_SUCCESS)
-    
+
     @property
     def write_latency(self) -> dict:
         return self._get_metric(self._WRITE_LATENCY)
@@ -54,7 +54,7 @@ class MetricsBase(ABC):
     @property
     def read_success_iterations(self) -> int:
         return self._get_metric(self._READ_SUCCESS)
-    
+
     @property
     def read_latency(self) -> dict:
         return self._get_metric(self._READ_LATENCY)
@@ -78,7 +78,7 @@ class MetricsBase(ABC):
     @property
     def delete_success_iterations(self) -> int:
         return self._get_metric(self._DELETE_SUCCESS)
-    
+
     @property
     def delete_latency(self) -> dict:
         return self._get_metric(self._DELETE_LATENCY)
@@ -92,7 +92,11 @@ class MetricsBase(ABC):
         return self._get_metric_rate(self._DELETE_SUCCESS)
 
     def _get_metric(self, metric: str) -> int:
-        metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric, "trend" : self._get_trend_metrics}
+        metrics_method_map = {
+            "counter": self._get_counter_metric,
+            "gauge": self._get_gauge_metric,
+            "trend": self._get_trend_metrics,
+        }
 
         if metric not in self.metrics:
             return 0
@@ -129,8 +133,8 @@ class MetricsBase(ABC):
 
     def _get_gauge_metric(self, metric: str) -> int:
         return metric["values"]["value"]
-    
-    def _get_trend_metrics(self, metric: str) -> int: 
+
+    def _get_trend_metrics(self, metric: str) -> int:
         return metric["values"]
 
diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py
index 26ab542..ec6d539 100644
--- a/src/frostfs_testlib/load/load_report.py
+++ b/src/frostfs_testlib/load/load_report.py
@@ -2,7 +2,6 @@ from datetime import datetime
 from typing import Optional
 
 import yaml
-import os
 
 from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
 from frostfs_testlib.load.load_metrics import get_metrics_object
@@ -110,7 +109,7 @@ class LoadReport:
         total_rate: float,
         throughput: float,
         errors: dict[str, int],
-        latency: dict[str, dict], 
+        latency: dict[str, dict],
     ):
         throughput_html = ""
         if throughput > 0:
@@ -131,12 +130,16 @@ class LoadReport:
 
         latency_html = ""
         if latency:
-            for node_key, param_dict in latency.items():
-                latency_values = ""
-                for param_name, param_val in param_dict.items():
-                    latency_values += f"{param_name}={param_val:.2f}ms "
+            for node_key, latency_dict in latency.items():
+                latency_values = "N/A"
+                if latency_dict:
+                    latency_values = ""
+                    for param_name, param_val in latency_dict.items():
+                        latency_values += f"{param_name}={param_val:.2f}ms "
 
-                latency_html += self._row(f"Put latency {node_key.split(':')[0]}", latency_values)
+                latency_html += self._row(
+                    f"{operation_type} latency {node_key.split(':')[0]}", latency_values
+                )
 
         object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
         duration = self._seconds_to_formatted_duration(self.load_params.load_time)
@@ -145,8 +148,8 @@ class LoadReport:
         short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
         errors_percent = 0
         if total_operations:
-            errors_percent = total_errors/total_operations*100.0
-        
+            errors_percent = total_errors / total_operations * 100.0
+
         html = f"""
diff --git a/src/frostfs_testlib/load/load_verifiers.py b/src/frostfs_testlib/load/load_verifiers.py
index 80c3962..b691b02 100644
--- a/src/frostfs_testlib/load/load_verifiers.py
+++ b/src/frostfs_testlib/load/load_verifiers.py
@@ -12,7 +12,7 @@ class LoadVerifier:
     def __init__(self, load_params: LoadParams) -> None:
         self.load_params = load_params
 
-    def verify_load_results(self, load_summaries: dict[str, dict]):
+    def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]:
         write_operations = 0
         write_errors = 0
 
@@ -41,38 +41,58 @@ class LoadVerifier:
             delete_operations += metrics.delete_total_iterations
             delete_errors += metrics.delete_failed_iterations
 
-        exceptions = []
+        issues = []
         if writers and not write_operations:
-            exceptions.append(f"No any write operation was performed")
+            issues.append("No write operations were performed")
         if readers and not read_operations:
-            exceptions.append(f"No any read operation was performed")
+            issues.append("No read operations were performed")
         if deleters and not delete_operations:
-            exceptions.append(f"No any delete operation was performed")
+            issues.append("No delete operations were performed")
 
-        if write_operations and writers and write_errors / write_operations * 100 > self.load_params.error_threshold:
-            exceptions.append(
+        if (
+            write_operations
+            and writers
+            and write_errors / write_operations * 100 > self.load_params.error_threshold
+        ):
+            issues.append(
                 f"Write error rate is greater than threshold: {write_errors / write_operations * 100} > {self.load_params.error_threshold}"
             )
-        if read_operations and readers and read_errors / read_operations * 100 > self.load_params.error_threshold:
-            exceptions.append(
+        if (
+            read_operations
+            and readers
+            and read_errors / read_operations * 100 > self.load_params.error_threshold
+        ):
+            issues.append(
                 f"Read error rate is greater than threshold: {read_errors / read_operations * 100} > {self.load_params.error_threshold}"
             )
-        if delete_operations and deleters and delete_errors / delete_operations * 100 > self.load_params.error_threshold:
-            exceptions.append(
+        if (
+            delete_operations
+            and deleters
+            and delete_errors / delete_operations * 100 > self.load_params.error_threshold
+        ):
+            issues.append(
                 f"Delete error rate is greater than threshold: {delete_errors / delete_operations * 100} > {self.load_params.error_threshold}"
             )
 
-        assert not exceptions, "\n".join(exceptions)
+        return issues
 
-    def check_verify_results(self, load_summaries, verification_summaries) -> None:
-        for node_or_endpoint in load_summaries:
-            with reporter.step(f"Check verify scenario results for {node_or_endpoint}"):
-                self._check_verify_result(
-                    load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint]
+    def collect_verify_issues(self, load_summaries, verification_summaries) -> list[str]:
+        verify_issues: list[str] = []
+        for k6_process_label in load_summaries:
+            with reporter.step(f"Check verify scenario results for {k6_process_label}"):
+                verify_issues.extend(
+                    self._collect_verify_issues_on_process(
+                        k6_process_label,
+                        load_summaries[k6_process_label],
+                        verification_summaries[k6_process_label],
+                    )
                 )
+        return verify_issues
 
-    def _check_verify_result(self, load_summary, verification_summary) -> None:
-        exceptions = []
+    def _collect_verify_issues_on_process(
+        self, label, load_summary, verification_summary
+    ) -> list[str]:
+        issues = []
 
         load_metrics = get_metrics_object(self.load_params.scenario, load_summary)
 
@@ -92,8 +112,8 @@ class LoadVerifier:
 
         # Due to interruptions we may see total verified objects to be less than written on writers count
         if abs(total_left_objects - verified_objects) > writers:
-            exceptions.append(
-                f"Verified objects mismatch. Total: {total_left_objects}, Verified: {verified_objects}. Writers: {writers}."
+            issues.append(
+                f"Verified objects mismatch for {label}. Total: {total_left_objects}, Verified: {verified_objects}. Writers: {writers}."
             )
 
-        assert not exceptions, "\n".join(exceptions)
+        return issues
diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py
index a7fa787..489ddcd 100644
--- a/src/frostfs_testlib/load/runners.py
+++ b/src/frostfs_testlib/load/runners.py
@@ -43,8 +43,8 @@ class RunnerBase(ScenarioRunner):
         parallel([k6.preset for k6 in self.k6_instances])
 
     @reporter.step_deco("Wait until load finish")
-    def wait_until_finish(self):
-        parallel([k6.wait_until_finished for k6 in self.k6_instances])
+    def wait_until_finish(self, soft_timeout: int = 0):
+        parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout)
 
     @property
     def is_running(self):
diff --git a/src/frostfs_testlib/resources/load_params.py b/src/frostfs_testlib/resources/load_params.py
index bd99859..97193cc 100644
--- a/src/frostfs_testlib/resources/load_params.py
+++ b/src/frostfs_testlib/resources/load_params.py
@@ -13,6 +13,7 @@ BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0)
 BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0)
 BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1800)
 BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32)
+BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME = float(os.getenv("BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME", 0.8))
 BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s")
 
 # This will decrease load params for some weak environments
@@ -26,7 +27,7 @@ BACKGROUND_LOAD_CONTAINER_PLACEMENT_POLICY = os.getenv(
 BACKGROUND_LOAD_S3_LOCATION = os.getenv("BACKGROUND_LOAD_S3_LOCATION", "node-off")
 PRESET_CONTAINERS_COUNT = os.getenv("CONTAINERS_COUNT", "40")
 # TODO: At lease one object is required due to bug in xk6 (buckets with no objects produce millions exceptions in read)
-PRESET_OBJECTS_COUNT = os.getenv("OBJ_COUNT", "10")
+PRESET_OBJECTS_COUNT = os.getenv("OBJ_COUNT", "1")
 K6_DIRECTORY = os.getenv("K6_DIRECTORY", "/etc/k6")
 K6_TEARDOWN_PERIOD = os.getenv("K6_TEARDOWN_PERIOD", "30")
 K6_STOP_SIGNAL_TIMEOUT = int(os.getenv("K6_STOP_SIGNAL_TIMEOUT", 300))
diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py
index 38cdf0f..a18a603 100644
--- a/src/frostfs_testlib/storage/controllers/background_load_controller.py
+++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py
@@ -158,25 +158,27 @@ class BackgroundLoadController:
     @reporter.step_deco("Run post-load verification")
     def verify(self):
         try:
-            self._verify_load_results()
+            load_issues = self._collect_load_issues()
             if self.load_params.verify:
-                self._run_verify_scenario()
+                load_issues.extend(self._run_verify_scenario())
+
+            assert not load_issues, "\n".join(load_issues)
         finally:
             self._reset_for_consequent_load()
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
-    @reporter.step_deco("Verify load results")
-    def _verify_load_results(self):
+    @reporter.step_deco("Collect load issues")
+    def _collect_load_issues(self):
         verifier = LoadVerifier(self.load_params)
-        verifier.verify_load_results(self.load_summaries)
+        return verifier.collect_load_issues(self.load_summaries)
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
-    def wait_until_finish(self):
-        self.runner.wait_until_finish()
+    def wait_until_finish(self, soft_timeout: int = 0):
+        self.runner.wait_until_finish(soft_timeout)
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
     @reporter.step_deco("Verify loaded objects")
-    def _run_verify_scenario(self):
+    def _run_verify_scenario(self) -> list[str]:
         self.verification_params = LoadParams(
             verify_clients=self.load_params.verify_clients,
             scenario=LoadScenario.VERIFY,
@@ -185,6 +187,7 @@ class BackgroundLoadController:
             verify_time=self.load_params.verify_time,
             load_type=self.load_params.load_type,
             load_id=self.load_params.load_id,
+            vu_init_time=0,
             working_dir=self.load_params.working_dir,
             endpoint_selection_strategy=self.load_params.endpoint_selection_strategy,
             k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy,
@@ -199,10 +202,10 @@ class BackgroundLoadController:
         self.runner.start()
         self.runner.wait_until_finish()
 
-        with reporter.step("Check verify results"):
+        with reporter.step("Collect verify issues"):
             verification_summaries = self._get_results()
             verifier = LoadVerifier(self.load_params)
-            verifier.check_verify_results(self.load_summaries, verification_summaries)
+            return verifier.collect_verify_issues(self.load_summaries, verification_summaries)
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
     def _get_results(self) -> dict:
diff --git a/src/frostfs_testlib/testing/parallel.py b/src/frostfs_testlib/testing/parallel.py
index 7f4ee26..ebddd38 100644
--- a/src/frostfs_testlib/testing/parallel.py
+++ b/src/frostfs_testlib/testing/parallel.py
@@ -42,7 +42,7 @@ def parallel(
 
     exceptions = [future.exception() for future in futures if future.exception()]
     if exceptions:
         message = "\n".join([str(e) for e in exceptions])
-        raise RuntimeError(f"The following exceptions occured during parallel run: {message}")
+        raise RuntimeError(f"The following exceptions occurred during parallel run:\n {message}")
     return futures
 