diff --git a/pytest_tests/helpers/k6.py b/pytest_tests/helpers/k6.py new file mode 100644 index 00000000..4afa9dd4 --- /dev/null +++ b/pytest_tests/helpers/k6.py @@ -0,0 +1,177 @@ +from dataclasses import dataclass +import allure +from contextlib import contextmanager +from time import sleep +from pytest_tests.helpers.ssh_helper import HostClient +from pytest_tests.helpers.remote_process import RemoteProcess + +EXIT_RESULT_CODE = 0 + + +@dataclass +class LoadParams: + obj_size: int + containers_count: int + out_file: str + obj_count: int + writers_percent: int + load_time: int + clients_count: int + load_type: str + endpoint: str + + +class K6: + def __init__( + self, + load_params: LoadParams, + host_client: HostClient + ): + + self.load_params = load_params + self.host_client = host_client + + self._k6_dir = None + self._k6_result = None + + self._k6_process = None + self._k6_stop_attempts = 5 + self._k6_stop_timeout = 15 + + @property + def process_dir(self) -> str: + return self._k6_process.process_dir + + @property + def k6_dir(self) -> str: + if not self._k6_dir: + self._k6_dir = self.host_client.exec("locate -l 1 'k6'").stdout.strip("\n") + return self._k6_dir + + @allure.step("Prepare containers and objects") + def prepare(self) -> str: + self._k6_dir = self.k6_dir + if ( + self.load_params.load_type == "http" + or self.load_params.load_type == "grpc" + ): + command = ( + f"{self.k6_dir}/scenarios/preset/preset_grpc.py " + f"--size {self.load_params.obj_size} " + f"--containers {self.load_params.containers_count} " + f"--out {self.load_params.load_type}_{self.load_params.out_file} " + f"--endpoint {self.load_params.endpoint} " + f"--preload_obj {self.load_params.obj_count} " + ) + terminal = self.host_client.exec(command) + return terminal.stdout.strip("\n") + elif self.load_params.load_type == "s3": + command = ( + f"{self.k6_dir}/scenarios/preset/preset_s3.py --size {self.load_params.obj_size} " + f"--buckets {self.load_params.containers_count} " + f"--out {self.load_params.load_type}_{self.load_params.out_file} " + f"--endpoint {self.load_params.endpoint} " + f"--preload_obj {self.load_params.obj_count} " + f"--location load-1-1" + ) + terminal = self.host_client.exec(command) + return terminal.stdout.strip("\n") + raise AssertionError("Wrong K6 load type") + + @allure.step("Start K6 on initiator") + def start(self) -> None: + + self._k6_dir = self.k6_dir + command = ( + f"{self.k6_dir}/k6 run -e " + f"PROFILE={self.load_params.writers_percent}:{self.load_params.load_time} " + f"-e WRITE_OBJ_SIZE={self.load_params.obj_size} " + f"-e CLIENTS={self.load_params.clients_count} -e NODES={self.load_params.endpoint} " + f"-e PREGEN_JSON={self.k6_dir}/{self.load_params.load_type}_{self.load_params.out_file} " + f"{self.k6_dir}/scenarios/{self.load_params.load_type}.js" + ) + self._k6_process = RemoteProcess.create(command, self.host_client) + + @allure.step("Wait until K6 is finished") + def wait_until_finished(self, timeout: int = 0, k6_should_be_running: bool = False) -> None: + if self._k6_process is None: + assert "No k6 instances were executed" + if k6_should_be_running: + assert self._k6_process.running(), "k6 should be running." + for __attempt in reversed(range(5)) if timeout else [0]: + if not self._k6_process.running(): + return + if __attempt: # no sleep in last iteration + sleep(int(timeout / 5)) + self._stop_k6() + raise TimeoutError(f"Expected K6 finished in {timeout} sec.") + + @contextmanager + def start_context( + self, warm_up_time: int = 0, expected_finish: bool = False, expected_fail: bool = False + ) -> None: + self.start() + sleep(warm_up_time) + try: + yield self + except Exception as err: + if self._k6_process.running(): + self._kill_k6() + raise + + if expected_fail: + self._kill_k6() + elif expected_finish: + if self._k6_process.running(): + self._kill_k6() + raise AssertionError("K6 has not finished in expected time") + else: + self._k6_should_be_finished() + else: + self._stop_k6() + + @allure.step("Get K6 results") + def get_k6_results(self) -> None: + self.__log_k6_output() + + @allure.step("Assert K6 should be finished") + def _k6_should_be_finished(self) -> None: + k6_rc = self._k6_process.rc() + assert k6_rc == 0, f"K6 unexpectedly finished with RC {k6_rc}" + + @allure.step("Terminate K6 on initiator") + def stop(self) -> None: + if not self._k6_process.running(): + raise AssertionError("K6 unexpectedly finished") + + self._stop_k6() + + k6_rc = self._k6_process.rc() + assert k6_rc == EXIT_RESULT_CODE, f"Return code of K6 job should be 0, but {k6_rc}" + + def check_k6_is_running(self) -> bool: + if self._k6_process: + return self._k6_process.running() + return False + + @property + def is_finished(self) -> bool: + return not self._k6_process.running() + + @allure.step("Try to stop K6 with SIGTERM") + def _stop_k6(self) -> None: + for __attempt in range(self._k6_stop_attempts): + if not self._k6_process.running(): + break + + self._k6_process.stop() + sleep(self._k6_stop_timeout) + else: + raise AssertionError("Can not stop K6 process within timeout") + + def _kill_k6(self) -> None: + self._k6_process.kill() + + @allure.step("Log K6 output") + def __log_k6_output(self) -> None: + allure.attach(self._k6_process.stdout(full=True), "K6 output", allure.attachment_type.TEXT) diff --git a/pytest_tests/helpers/remote_process.py b/pytest_tests/helpers/remote_process.py new file mode 100644 index 00000000..3e6e6245 --- /dev/null +++ b/pytest_tests/helpers/remote_process.py @@ -0,0 +1,186 @@ +from __future__ import annotations +import uuid +import allure +from tenacity import retry, stop_after_attempt, wait_fixed +from typing import Optional +from pytest_tests.helpers.ssh_helper import HostClient + + +class RemoteProcess: + def __init__(self, cmd: str, process_dir: str, host_client: HostClient): + self.process_dir = process_dir + self.cmd = cmd + self.stdout_last_line_number = 0 + self.stderr_last_line_number = 0 + self.pid: Optional[str] = None + self.proc_rc: Optional[int] = None + self.saved_stdout: Optional[str] = None + self.saved_stderr: Optional[str] = None + self.host_client = host_client + + @classmethod + @allure.step("Create remote process") + def create(cls, command: str, host_client: HostClient) -> RemoteProcess: + """ + Create a process on a remote host. + + Created dir for process with following files: + command.sh: script to execute + pid: contains process id + rc: contains script return code + stderr: contains script errors + stdout: contains script output + + Args: + host_client: Host client instance + command: command to be run on a remote host + + Returns: + RemoteProcess instance for further examination + """ + remote_process = cls( + cmd=command, process_dir=f"/tmp/proc_{uuid.uuid4()}", host_client=host_client + ) + remote_process._create_process_dir() + remote_process._generate_command_script(command) + remote_process._start_process() + remote_process.pid = remote_process._get_pid() + return remote_process + + @allure.step("Get process stdout") + def stdout(self, full: bool = False) -> str: + """ + Method to get process stdout, either fresh info or full. + + Args: + full: returns full stdout that we have to this moment + + Returns: + Fresh stdout. By means of stdout_last_line_number only new stdout lines are returned. + If process is finished (proc_rc is not None) saved stdout is returned + """ + if self.saved_stdout is not None: + cur_stdout = self.saved_stdout + else: + terminal = self.host_client.exec(f"cat {self.process_dir}/stdout") + if self.proc_rc is not None: + self.saved_stdout = terminal.stdout + cur_stdout = terminal.stdout + + if full: + return cur_stdout + whole_stdout = cur_stdout.split("\n") + if len(whole_stdout) > self.stdout_last_line_number: + resulted_stdout = "\n".join(whole_stdout[self.stdout_last_line_number :]) + self.stdout_last_line_number = len(whole_stdout) + return resulted_stdout + return "" + + @allure.step("Get process stderr") + def stderr(self, full: bool = False) -> str: + """ + Method to get process stderr, either fresh info or full. + + Args: + full: returns full stderr that we have to this moment + + Returns: + Fresh stderr. By means of stderr_last_line_number only new stderr lines are returned. + If process is finished (proc_rc is not None) saved stderr is returned + """ + if self.saved_stderr is not None: + cur_stderr = self.saved_stderr + else: + terminal = self.host_client.exec(f"cat {self.process_dir}/stderr") + if self.proc_rc is not None: + self.saved_stderr = terminal.stdout + cur_stderr = terminal.stdout + if full: + return cur_stderr + whole_stderr = cur_stderr.split("\n") + if len(whole_stderr) > self.stderr_last_line_number: + resulted_stderr = "\n".join(whole_stderr[self.stderr_last_line_number :]) + self.stderr_last_line_number = len(whole_stderr) + return resulted_stderr + return "" + + @allure.step("Get process rc") + def rc(self) -> Optional[int]: + if self.proc_rc is not None: + return self.proc_rc + + terminal = self.host_client.exec(f"cat {self.process_dir}/rc", verify=False) + if "No such file or directory" in terminal.stderr: + return None + elif terminal.stderr or terminal.rc != 0: + raise AssertionError(f"cat process rc was not successfull: {terminal.stderr}") + + self.proc_rc = int(terminal.stdout) + return self.proc_rc + + @allure.step("Check if process is running") + def running(self) -> bool: + return self.rc() is None + + @allure.step("Send signal to process") + def send_signal(self, signal: int) -> None: + kill_res = self.host_client.exec(f"kill -{signal} {self.pid}", verify=False) + if "No such process" in kill_res.stderr: + return + if kill_res.rc: + raise AssertionError(f"Signal {signal} not sent. Return code of kill: {kill_res.rc}") + + @allure.step("Stop process") + def stop(self) -> None: + self.send_signal(15) + + @allure.step("Kill process") + def kill(self) -> None: + self.send_signal(9) + + @allure.step("Clear process directory") + def clear(self) -> None: + if self.process_dir == "/": + raise AssertionError(f"Invalid path to delete: {self.process_dir}") + self.host_client.exec(f"rm -rf {self.process_dir}") + + @allure.step("Start remote process") + def _start_process(self) -> None: + self.host_client.exec( + f"nohup {self.process_dir}/command.sh {self.process_dir}/stdout " + f"2>{self.process_dir}/stderr &" + ) + + @allure.step("Create process directory") + def _create_process_dir(self) -> None: + self.host_client.exec(f"mkdir {self.process_dir}; chmod 777 {self.process_dir}") + terminal = self.host_client.exec(f"realpath {self.process_dir}") + self.process_dir = terminal.stdout.strip() + + @allure.step("Get pid") + @retry(wait=wait_fixed(10), stop=stop_after_attempt(5), reraise=True) + def _get_pid(self) -> str: + terminal = self.host_client.exec(f"cat {self.process_dir}/pid") + assert terminal.stdout, f"invalid pid: {terminal.stdout}" + return terminal.stdout.strip() + + @allure.step("Generate command script") + def _generate_command_script(self, command: str) -> None: + command = command.replace('"', '\\"').replace("\\", "\\\\") + script = ( + f"#!/bin/bash\n" + f"cd {self.process_dir}\n" + f"{command} &\n" + f"pid=\$!\n" + f"cd {self.process_dir}\n" + f"echo \$pid > {self.process_dir}/pid\n" + f"wait \$pid\n" + f"echo $? > {self.process_dir}/rc" + ) + + self.host_client.exec( + f'echo "{script}" > {self.process_dir}/command.sh' + ) + self.host_client.exec(f"cat {self.process_dir}/command.sh") + self.host_client.exec(f"chmod +x {self.process_dir}/command.sh")