import json
import logging
import os
from dataclasses import dataclass, fields
from time import sleep
from typing import Any, Optional

from frostfs_testlib.load.load_config import (
    K6ProcessAllocationStrategy,
    LoadParams,
    LoadScenario,
    LoadType,
)
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, LOAD_NODE_SSH_USER
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success

# Return code expected from a successful preset command.
EXIT_RESULT_CODE = 0

logger = logging.getLogger("NeoLogger")
reporter = get_reporter()


@dataclass
class LoadResults:
    """Container for numeric load counters; every field defaults to 0.0."""

    data_sent: float = 0.0
    data_received: float = 0.0
    read_ops: float = 0.0
    write_ops: float = 0.0
    total_ops: float = 0.0


class K6:
    """Drive a single K6 run through a ``Shell``: preset, start, wait, collect results, stop."""

    # Handle of the launched K6 process; stays None until start() has been called.
    _k6_process: Optional[RemoteProcess] = None

    def __init__(
        self,
        load_params: LoadParams,
        endpoints: list[str],
        k6_dir: str,
        shell: Shell,
        load_node: str,
        wallet: WalletInfo,
    ):
        """
        Store the run configuration and derive the summary file path.

        Args:
            load_params: load configuration; its ``scenario`` must be set.
            endpoints: endpoints passed to the preset script and to K6.
            k6_dir: directory containing the ``k6`` binary and ``scenarios``.
            shell: shell used to execute all commands.
            load_node: load node name, used to label reported results.
            wallet: wallet whose path and config path are passed to the gRPC preset.

        Raises:
            RuntimeError: if ``load_params.scenario`` is None.
        """
        if load_params.scenario is None:
            raise RuntimeError("Scenario should not be none")

        self.load_params: LoadParams = load_params
        self.endpoints = endpoints
        self.load_node: str = load_node
        self.shell: Shell = shell
        self.wallet = wallet
        self.scenario: LoadScenario = load_params.scenario
        self.summary_json: str = os.path.join(
            self.load_params.working_dir,
            f"{self.load_params.load_id}_{self.scenario.value}_summary.json",
        )
        self._k6_dir: str = k6_dir

    @property
    def process_dir(self) -> str:
        """Directory of the launched K6 process (only available after ``start()``)."""
        return self._k6_process.process_dir

    def _collect_field_values(self, params: Any, metadata_key: str) -> list[tuple[str, Any]]:
        """
        Collect ``(metadata[metadata_key], value)`` pairs from the dataclass ``params``.

        A field is included only when it has metadata, the current scenario is
        listed in its ``applicable_scenarios``, its ``metadata_key`` entry is
        truthy and its value is not None. Field declaration order is preserved.
        """
        return [
            (field.metadata[metadata_key], getattr(params, field.name))
            for field in fields(params)
            if field.metadata
            and self.scenario in field.metadata["applicable_scenarios"]
            and field.metadata[metadata_key]
            and getattr(params, field.name) is not None
        ]

    @reporter.step_deco("Preset containers and objects")
    def preset(self) -> str:
        """
        Run the preset script that matches the configured load type.

        Returns:
            Stdout of the preset command with leading/trailing newlines stripped.

        Raises:
            AssertionError: if the preset command exits with a non-zero code.
        """
        preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
        preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
        # HTTP load reuses the gRPC preset script.
        preset_map = {
            LoadType.gRPC: preset_grpc,
            LoadType.S3: preset_s3,
            LoadType.HTTP: preset_grpc,
        }

        base_args = {
            preset_grpc: [
                preset_grpc,
                f"--endpoint {self.endpoints[0]}",
                f"--wallet {self.wallet.path} ",
                f"--config {self.wallet.config_path} ",
            ],
            preset_s3: [
                preset_s3,
                f"--endpoint {self.endpoints[0]}",
            ],
        }

        preset_scenario = preset_map[self.load_params.load_type]
        command_args = base_args[preset_scenario].copy()

        # Arguments from the load params first, then from the nested preset params.
        sources = [self.load_params]
        if self.load_params.preset:
            sources.append(self.load_params.preset)
        for source in sources:
            command_args += [
                f"--{argument} '{value}'"
                for argument, value in self._collect_field_values(source, "preset_argument")
            ]

        command = " ".join(command_args)
        result = self.shell.exec(command)

        assert (
            result.return_code == EXIT_RESULT_CODE
        ), f"Return code of preset is not zero: {result.stdout}"
        return result.stdout.strip("\n")

    @reporter.step_deco("Generate K6 command")
    def _generate_env_variables(self) -> str:
        """
        Build the ``-e NAME='value'`` argument string for ``k6 run``.

        Variables come from the load params, are overridden by the nested
        preset params, and are completed with the ``<LOAD_TYPE>_ENDPOINTS`` and
        ``SUMMARY_JSON`` entries. The resulting set is attached to the report.
        """
        env_vars = dict(self._collect_field_values(self.load_params, "env_variable"))

        if self.load_params.preset:
            env_vars.update(self._collect_field_values(self.load_params.preset, "env_variable"))

        env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
        env_vars["SUMMARY_JSON"] = self.summary_json

        reporter.attach(
            "\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables"
        )
        return " ".join(
            [f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None]
        )

    @reporter.step_deco("Start K6 on initiator")
    def start(self) -> None:
        """Create the working directory, hand it to the SSH user and launch K6."""
        # Make working_dir directory
        self.shell.exec(f"sudo mkdir -p {self.load_params.working_dir}")
        self.shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {self.load_params.working_dir}")

        command = (
            f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
            f"{self._k6_dir}/scenarios/{self.scenario.value}.js"
        )
        self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir)

    @reporter.step_deco("Wait until K6 is finished")
    def wait_until_finished(self, timeout: int = 0, k6_should_be_running: bool = False) -> None:
        """
        Poll the K6 process until it finishes or the timeout is exhausted.

        Args:
            timeout: approximate number of seconds to wait, consumed in 10-second
                steps. With 0 the process state is checked without sleeping.
            k6_should_be_running: when True, assert up front that K6 is running.

        Raises:
            RuntimeError: if K6 has not been started.
            AssertionError: if ``k6_should_be_running`` is set but K6 is not running.
            TimeoutError: if K6 is still running once the timeout is used up;
                the process is stopped before raising.
        """
        wait_interval = 10
        if self._k6_process is None:
            # The original `assert "<text>"` could never fail; fail explicitly instead.
            raise RuntimeError("No k6 instances were executed")
        if k6_should_be_running:
            assert self._k6_process.running(), "k6 should be running."

        # Count down on a copy so the error message reports the requested timeout.
        remaining = timeout
        while remaining >= 0:
            if not self._k6_process.running():
                return
            logger.info("K6 is running. Waiting %s seconds...", wait_interval)
            if remaining > 0:
                sleep(wait_interval)
            remaining -= wait_interval

        # The process may have finished during the last sleep.
        if not self._k6_process.running():
            return

        self.stop()
        raise TimeoutError(f"Expected K6 finished in {timeout} sec.")

    def get_results(self) -> Any:
        """
        Attach K6 output to the report and return the parsed summary.

        Returns:
            The summary file content decoded from JSON, or None when no summary
            path is configured.
        """
        with reporter.step(f"K6 results from {self.load_node}"):
            self.__log_output()

            if not self.summary_json:
                return None

            summary_text = self.shell.exec(f"cat {self.summary_json}").stdout
            summary_json = json.loads(summary_text)

            # Attachment name depends on how K6 processes are allocated.
            allure_filenames = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.load_node}_{self.scenario.value}_summary.json",
                K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.load_node}_{self.scenario.value}_{self.endpoints[0]}_summary.json",
            }
            allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy]

            reporter.attach(summary_text, allure_filename)
            return summary_json

    @reporter.step_deco("Stop K6")
    def stop(self) -> None:
        """Stop the K6 process if it is running and wait for it to end."""
        if self.is_running:
            self._k6_process.stop()
            self._wait_until_process_end()

    @property
    def is_running(self) -> bool:
        """True when K6 has been started and its process reports running."""
        if not self._k6_process:
            return False
        return self._k6_process.running()

    @reporter.step_deco("Wait until process end")
    @wait_for_success(
        K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout"
    )
    def _wait_until_process_end(self):
        """
        Report whether the K6 process is still running.

        NOTE(review): presumably ``wait_for_success`` re-invokes this until it
        returns False or K6_STOP_SIGNAL_TIMEOUT elapses - confirm against the
        decorator's signature.
        """
        return self._k6_process.running()

    def __log_output(self) -> None:
        """Attach the full K6 stdout and the path of its stderr file to the report."""
        reporter.attach(self._k6_process.stdout(full=True), "K6 stdout")
        reporter.attach(f"{self._k6_process.process_dir}/stderr", "K6 stderr path")