import json
import logging
import math
import os
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Any
from urllib.parse import urlparse

from frostfs_testlib import reporter
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEARDOWN_PERIOD
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success

# Return code the preset script must exit with to be considered successful.
EXIT_RESULT_CODE = 0

logger = logging.getLogger("NeoLogger")


@dataclass
class LoadResults:
    """Plain container for aggregated load counters; every field defaults to 0.0."""

    data_sent: float = 0.0
    data_received: float = 0.0
    read_ops: float = 0.0
    write_ops: float = 0.0
    total_ops: float = 0.0


class K6:
    """Wrapper around a single k6 run executed through a ``RemoteProcess``.

    The instance builds the ``k6 run`` command line from ``LoadParams`` on
    construction, and then exposes helpers to run the preset script, start and
    stop the process, wait for completion and collect the summary JSON.
    """

    _k6_process: RemoteProcess

    def __init__(
        self,
        load_params: LoadParams,
        endpoints: list[str],
        k6_dir: str,
        shell: Shell,
        loader: Loader,
        wallet: WalletInfo,
    ):
        """Prepare the k6 command and create (but do not start) the remote process.

        Args:
            load_params: Load configuration; its ``scenario`` must be set.
            endpoints: Endpoints the load is directed at.
            k6_dir: Directory that contains the ``k6`` binary and ``scenarios``.
            shell: Shell used to execute commands.
            loader: Loader this k6 instance belongs to (its ``ip`` is used in reports).
            wallet: Wallet whose ``path`` and ``config_path`` are passed to the gRPC preset.

        Raises:
            RuntimeError: If ``load_params.scenario`` is ``None``.
        """
        if load_params.scenario is None:
            raise RuntimeError("Scenario should not be none")

        self.load_params: LoadParams = load_params
        self.endpoints = endpoints
        self.loader: Loader = loader
        self.shell: Shell = shell
        self.wallet = wallet
        self.preset_output: str = ""
        self.summary_json: str = os.path.join(
            self.load_params.working_dir,
            f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
        )

        self._k6_dir: str = k6_dir

        # NOTE: _generate_env_variables() reads self.summary_json, so it must be called after it is set.
        command = (
            f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
            f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
        )
        # Only the LOCAL scenario is run under the storage user; others use the default (None).
        user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
        # The VERIFY scenario gets a distinct process id so it does not share one with the load run.
        process_id = (
            self.load_params.load_id
            if self.load_params.scenario != LoadScenario.VERIFY
            else f"{self.load_params.load_id}_verify"
        )
        self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user, process_id)

    def _get_fill_percents(self) -> list[list[str]]:
        """Return ``df`` rows (source, pcent, target) for filesystems matching ``frostfs``.

        Each row is the whitespace-split line of the command output. Blank lines
        (such as the one produced by a trailing newline) are skipped.
        """
        output = self.shell.exec("df -H --output=source,pcent,target | grep frostfs").stdout
        # Filter empty lines instead of blindly dropping the last element, so a real
        # row is not lost when the output has no trailing newline.
        return [line.split() for line in output.split("\n") if line.strip()]

    def check_fill_percent(self) -> bool:
        """Tell whether the mean disk fill percentage reached ``load_params.fill_percent``.

        Returns:
            ``True`` when the mean of the reported percentages is greater than or
            equal to the configured threshold; ``False`` otherwise, including the
            case when no matching filesystems were reported.
        """
        rows = self._get_fill_percents()
        if not rows:
            # Nothing to average: avoid ZeroDivisionError and treat it as "not filled".
            logger.warning(f"{self.loader.ip} reported no frostfs filesystems, fill percent can not be calculated")
            return False

        # Second column looks like "42%"; keep only the numeric part.
        percent_mean = sum(float(row[1].split("%")[0]) for row in rows) / len(rows)
        logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}")
        return percent_mean >= self.load_params.fill_percent

    @property
    def process_dir(self) -> str:
        """Directory of the underlying remote k6 process."""
        return self._k6_process.process_dir

    def preset(self) -> str:
        """Run the preset script matching the load type and return its stdout.

        The gRPC preset script is used for both gRPC and HTTP load types; S3 has
        its own script. The output (with surrounding newlines stripped) is also
        stored in ``self.preset_output``.

        Raises:
            AssertionError: If the preset command returns a non-zero code.
        """
        with reporter.step(f"Run preset on loader {self.loader.ip} for endpoints {self.endpoints}"):
            preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
            preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
            preset_map = {
                LoadType.gRPC: preset_grpc,
                LoadType.S3: preset_s3,
                LoadType.HTTP: preset_grpc,
            }

            base_args = {
                preset_grpc: [
                    preset_grpc,
                    f"--endpoint {','.join(self.endpoints)}",
                    f"--wallet {self.wallet.path} ",
                    f"--config {self.wallet.config_path} ",
                ],
                preset_s3: [
                    preset_s3,
                    f"--endpoint {','.join(self.endpoints)}",
                ],
            }

            preset_scenario = preset_map[self.load_params.load_type]
            # Copy so the per-script base argument list is not mutated by the extension below.
            command_args = base_args[preset_scenario].copy()
            command_args += self.load_params.get_preset_arguments()

            command = " ".join(command_args)
            result = self.shell.exec(command)

            assert result.return_code == EXIT_RESULT_CODE, f"Return code of preset is not zero: {result.stdout}"

            self.preset_output = result.stdout.strip("\n")
            return self.preset_output

    @reporter.step("Generate K6 command")
    def _generate_env_variables(self) -> str:
        """Build the ``-e NAME='value'`` part of the k6 command line.

        Environment variables come from ``load_params.get_env_vars()`` plus the
        ``<LOAD_TYPE>_ENDPOINTS`` and ``SUMMARY_JSON`` entries added here. All
        variables are attached to the report; those whose value is ``None`` are
        omitted from the returned command-line fragment.
        """
        env_vars = self.load_params.get_env_vars()

        env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
        env_vars["SUMMARY_JSON"] = self.summary_json

        reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables")
        return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None])

    def get_start_time(self) -> datetime:
        """Return the process start time as a naive local ``datetime``."""
        return datetime.fromtimestamp(self._k6_process.start_time())

    def get_end_time(self) -> datetime:
        """Return the process end time as a naive local ``datetime``."""
        return datetime.fromtimestamp(self._k6_process.end_time())

    def start(self) -> None:
        """Start the prepared k6 remote process."""
        with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self._k6_process.start()

    def wait_until_finished(self, event, soft_timeout: int = 0) -> None:
        """Block until k6 finishes, the event is set, disks fill up, or the timeout expires.

        The timeout is the configured load (or verify) time minus the time k6 has
        already been working, plus setup/teardown allowance, optionally capped by
        ``soft_timeout``. While waiting, the process is polled with an interval
        that starts at 10 seconds and is recomputed after every check.

        Args:
            event: Object exposing ``is_set()`` and ``set()`` (presumably a
                ``threading.Event`` shared between loaders - confirm with callers).
                When set, the load is stopped; it is set here when the fill
                percent threshold is reached.
            soft_timeout: Optional cap for the wait, in seconds. When non-zero,
                reaching the timeout stops k6 without raising.

        Raises:
            RuntimeError: If no k6 process is attached to this instance.
            TimeoutError: If k6 is still running when the timeout expires and no
                ``soft_timeout`` was given.
        """
        with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
            # Fail loudly before the process is first dereferenced below.
            if self._k6_process is None:
                raise RuntimeError("No k6 instances were executed")

            if self.load_params.scenario == LoadScenario.VERIFY:
                timeout = self.load_params.verify_time or 0
            else:
                timeout = self.load_params.load_time or 0

            start_time = int(self.get_start_time().timestamp())

            # datetime.now().timestamp() yields real epoch seconds; a naive utcnow()
            # would be interpreted as local time and be skewed by the UTC offset.
            current_time = int(datetime.now().timestamp())
            working_time = current_time - start_time
            remaining_time = timeout - working_time

            setup_teardown_time = (
                int(K6_TEARDOWN_PERIOD)
                + self.load_params.get_init_time()
                + int(self.load_params.setup_timeout.replace("s", "").strip())
            )
            remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
            timeout = remaining_time_including_setup_and_teardown

            if soft_timeout:
                timeout = min(timeout, soft_timeout)

            original_timeout = timeout

            timeouts = {
                "K6 start time": start_time,
                "Current time": current_time,
                "K6 working time": working_time,
                "Remaining time for load": remaining_time,
                "Setup and teardown": setup_teardown_time,
                "Remaining time including setup/teardown": remaining_time_including_setup_and_teardown,
                "Soft timeout": soft_timeout,
                "Selected timeout": original_timeout,
            }

            reporter.attach("\n".join([f"{k}: {v}" for k, v in timeouts.items()]), "timeouts.txt")

            min_wait_interval = 10
            wait_interval = min_wait_interval

            while timeout > 0:
                if self.load_params.fill_percent is not None:
                    with reporter.step("Check the percentage of filling of all data disks on the node"):
                        if self.check_fill_percent():
                            logger.info(f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%")
                            event.set()
                            self.stop()
                            return

                if event.is_set():
                    self.stop()
                    return

                if not self._k6_process.running():
                    return

                remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else ""
                remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else ""
                logger.info(
                    f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds..."
                )
                sleep(wait_interval)
                timeout -= min(timeout, wait_interval)
                # Next interval grows with log2 of the remaining time, never exceeds the
                # remaining time minus the minimum, and never drops below the minimum.
                wait_interval = max(
                    min(timeout, int(math.log2(timeout + 1)) * 15) - min_wait_interval,
                    min_wait_interval,
                )

            if not self._k6_process.running():
                return

            self.stop()
            if not soft_timeout:
                raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")

    def get_results(self) -> Any:
        """Attach k6 output to the report and return the parsed summary JSON.

        Returns:
            The deserialized content of the summary file, or ``None`` when
            ``self.summary_json`` is empty.
        """
        with reporter.step(f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self.__log_output()

            if not self.summary_json:
                return None

            summary_text = self.shell.exec(f"cat {self.summary_json}").stdout
            summary_json = json.loads(summary_text)
            # Fall back to the raw endpoint when it has no URL scheme (netloc is empty then).
            endpoint = urlparse(self.endpoints[0]).netloc or self.endpoints[0]
            allure_filenames = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.load_params.scenario.value}_summary.json",
                K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.load_params.scenario.value}_{endpoint}_summary.json",
            }
            allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy]

            reporter.attach(summary_text, allure_filename)
            return summary_json

    def stop(self) -> None:
        """Stop the k6 process if it is running and wait until it has ended."""
        with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            if self.is_running():
                self._k6_process.stop()

            self._wait_until_process_end()

    def is_running(self) -> bool:
        """Return whether the k6 process exists and reports itself as running."""
        if self._k6_process:
            return self._k6_process.running()
        return False

    @reporter.step("Wait until K6 process end")
    @wait_for_success(K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout")
    def _wait_until_process_end(self):
        """Return the current running state of the k6 process.

        NOTE(review): the ``wait_for_success`` decorator presumably re-invokes this
        until it returns ``False`` or ``K6_STOP_SIGNAL_TIMEOUT`` elapses - confirm
        against the decorator's signature.
        """
        return self._k6_process.running()

    def __log_output(self) -> None:
        """Attach the full k6 stdout and the path to its stderr file to the report."""
        reporter.attach(self._k6_process.stdout(full=True), "K6 stdout")
        reporter.attach(f"{self._k6_process.process_dir}/stderr", "K6 stderr path")