import json
import logging
import math
import os
from dataclasses import dataclass
from datetime import datetime
from threading import Event
from time import sleep
from typing import Any
from urllib.parse import urlparse

from frostfs_testlib import reporter
from frostfs_testlib.credentials.interfaces import User
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEARDOWN_PERIOD
from frostfs_testlib.shell import Shell
from frostfs_testlib.testing.test_control import wait_for_success

EXIT_RESULT_CODE = 0

logger = logging.getLogger("NeoLogger")


@dataclass
class LoadResults:
    data_sent: float = 0.0
    data_received: float = 0.0
    read_ops: float = 0.0
    write_ops: float = 0.0
    total_ops: float = 0.0


class K6:
    """Wrapper that starts and controls a k6 process on a remote loader node."""

    _k6_process: RemoteProcess

    def __init__(
        self,
        load_params: LoadParams,
        endpoints: list[str],
        k6_dir: str,
        shell: Shell,
        loader: Loader,
        user: User,
    ):
        if load_params.scenario is None:
            raise RuntimeError("Scenario should not be None")

        self.load_params = load_params
        self.endpoints = endpoints
        self.loader = loader
        self.shell = shell
        self.user = user
        self.preset_output: str = ""
        self.summary_json: str = os.path.join(
            self.load_params.working_dir,
            f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
        )

        self._k6_dir: str = k6_dir
        command = (
            f"{self._generate_env_variables()}{self._k6_dir}/k6 run {self._generate_k6_variables()} "
            f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
        )
        remote_user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
        process_id = self.load_params.load_id if self.load_params.scenario != LoadScenario.VERIFY else f"{self.load_params.load_id}_verify"
        self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, remote_user, process_id)

    def _get_fill_percents(self):
        fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep frostfs | grep data").stdout.split("\n")
        return [line.split() for line in fill_percents][:-1]

    def check_fill_percent(self):
        fill_percents = self._get_fill_percents()
        percent_mean = 0
        for line in fill_percents:
            percent_mean += float(line[1].split("%")[0])
        percent_mean = percent_mean / len(fill_percents)
        logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}")
        return percent_mean >= self.load_params.fill_percent

    @property
    def process_dir(self) -> str:
        return self._k6_process.process_dir

    def preset(self) -> str:
        with reporter.step(f"Run preset on loader {self.loader.ip} for endpoints {self.endpoints}"):
            preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
            preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
            preset_map = {
                LoadType.gRPC: preset_grpc,
                LoadType.S3: preset_s3,
                LoadType.HTTP: preset_grpc,
            }

            base_args = {
                preset_grpc: [
                    preset_grpc,
                    f"--endpoint {','.join(self.endpoints)}",
                    f"--wallet {self.user.wallet.path} ",
                    f"--config {self.user.wallet.config_path} ",
                ],
                preset_s3: [
                    preset_s3,
                    f"--endpoint {','.join(self.endpoints)}",
                ],
            }

            preset_scenario = preset_map[self.load_params.load_type]
            command_args = base_args[preset_scenario].copy()
            command_args += self.load_params.get_preset_arguments()

            command = " ".join(command_args)
            result = self.shell.exec(command)

            assert result.return_code == EXIT_RESULT_CODE, f"Return code of preset is not zero: {result.stdout}"
            self.preset_output = result.stdout.strip("\n")
            return self.preset_output

    @reporter.step("Generate K6 variables")
    def _generate_k6_variables(self) -> str:
        env_vars = self.load_params.get_k6_vars()

        env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
        env_vars["SUMMARY_JSON"] = self.summary_json

        reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables")
        return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None])

    @reporter.step("Generate env variables")
    def _generate_env_variables(self) -> str:
        env_vars = self.load_params.get_env_vars()
        if not env_vars:
            return ""

        reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "ENV variables")
        return " ".join([f"{param}='{value}'" for param, value in env_vars.items() if value is not None]) + " "

    def get_start_time(self) -> datetime:
        return datetime.fromtimestamp(self._k6_process.start_time())

    def get_end_time(self) -> datetime:
        return datetime.fromtimestamp(self._k6_process.end_time())

    def start(self) -> None:
        with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self._k6_process.start()

    def wait_until_finished(self, event: Event, soft_timeout: int = 0) -> None:
        with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
            if self.load_params.scenario == LoadScenario.VERIFY:
                timeout = self.load_params.verify_time or 0
            else:
                timeout = self.load_params.load_time or 0

            start_time = int(self.get_start_time().timestamp())
            current_time = int(datetime.utcnow().timestamp())
            working_time = current_time - start_time
            remaining_time = timeout - working_time

            setup_teardown_time = (
                int(K6_TEARDOWN_PERIOD) + self.load_params.get_init_time() + int(self.load_params.setup_timeout.replace("s", "").strip())
            )
            remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
            timeout = remaining_time_including_setup_and_teardown

            if soft_timeout:
                timeout = min(timeout, soft_timeout)

            original_timeout = timeout

            timeouts = {
                "K6 start time": start_time,
                "Current time": current_time,
                "K6 working time": working_time,
                "Remaining time for load": remaining_time,
                "Setup and teardown": setup_teardown_time,
                "Remaining time including setup/teardown": remaining_time_including_setup_and_teardown,
                "Soft timeout": soft_timeout,
                "Selected timeout": original_timeout,
            }
            reporter.attach("\n".join([f"{k}: {v}" for k, v in timeouts.items()]), "timeouts.txt")

            min_wait_interval = 10
            wait_interval = min_wait_interval
            if self._k6_process is None:
                raise AssertionError("No k6 instances were executed")

            while timeout > 0:
                if self.load_params.fill_percent is not None:
                    with reporter.step("Check the fill percentage of all data disks on the node"):
                        if self.check_fill_percent():
                            logger.info(f"Stopping load because disks are filled more than {self.load_params.fill_percent}%")
                            event.set()
                            self.stop()
                            return

                if event.is_set():
                    self.stop()
                    return

                if not self._k6_process.running():
                    return

                remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else ""
                remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else ""
                logger.info(
                    f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds..."
                )
                sleep(wait_interval)
                timeout -= min(timeout, wait_interval)
                wait_interval = max(
                    min(timeout, int(math.log2(timeout + 1)) * 15) - min_wait_interval,
                    min_wait_interval,
                )

            if not self._k6_process.running():
                return

            self.stop()
            if not soft_timeout:
                raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")

    def get_results(self) -> Any:
        with reporter.step(f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self.__log_output()

            if not self.summary_json:
                return None

            summary_text = self.shell.exec(f"cat {self.summary_json}").stdout
            summary_json = json.loads(summary_text)
            endpoint = urlparse(self.endpoints[0]).netloc or self.endpoints[0]
            allure_filenames = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.load_params.scenario.value}_summary.json",
                K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.load_params.scenario.value}_{endpoint}_summary.json",
            }
            allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy]

            reporter.attach(summary_text, allure_filename)
            return summary_json

    def stop(self) -> None:
        with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            if self.is_running():
                self._k6_process.stop()

            self._wait_until_process_end()

    def is_running(self) -> bool:
        if self._k6_process:
            return self._k6_process.running()
        return False

    @reporter.step("Wait until K6 process end")
    @wait_for_success(K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Cannot stop K6 process within timeout")
    def _wait_until_process_end(self):
        return self._k6_process.running()

    def __log_output(self) -> None:
        reporter.attach(self._k6_process.stdout(full=True), "K6 stdout")
        reporter.attach(f"{self._k6_process.process_dir}/stderr", "K6 stderr path")
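

# Illustrative usage sketch (not part of the library API): a minimal example of
# driving the K6 wrapper defined above. It assumes the caller has already built
# valid LoadParams, Shell, Loader and User objects elsewhere in the test suite;
# the helper name `_example_k6_run` is hypothetical, and only methods defined in
# this module are called.
def _example_k6_run(
    load_params: LoadParams,
    endpoints: list[str],
    k6_dir: str,
    shell: Shell,
    loader: Loader,
    user: User,
) -> Any:
    k6 = K6(load_params, endpoints, k6_dir, shell, loader, user)
    k6.preset()  # run the scenario's preset script (typically pre-creates containers/buckets)
    k6.start()
    try:
        # A fresh Event is passed, so only the timeout or the disk-fill check stops the load early.
        k6.wait_until_finished(Event())
        return k6.get_results()
    finally:
        k6.stop()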