adding k6 + remote_process helper
Why script file: We have script file for debug after test is finished We don't need too long strings for passing environment variables We can easy get PID https://serverfault.com/questions/420905/nohup-multiple-sequential-commands Signed-off-by: a.lipay <a.lipay@yadro.com>
This commit is contained in:
parent
02c859796f
commit
2452cccba0
2 changed files with 363 additions and 0 deletions
177
pytest_tests/helpers/k6.py
Normal file
177
pytest_tests/helpers/k6.py
Normal file
|
@ -0,0 +1,177 @@
|
|||
from dataclasses import dataclass
|
||||
import allure
|
||||
from contextlib import contextmanager
|
||||
from time import sleep
|
||||
from pytest_tests.helpers.ssh_helper import HostClient
|
||||
from pytest_tests.helpers.remote_process import RemoteProcess
|
||||
|
||||
EXIT_RESULT_CODE = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoadParams:
|
||||
obj_size: int
|
||||
containers_count: int
|
||||
out_file: str
|
||||
obj_count: int
|
||||
writers_percent: int
|
||||
load_time: int
|
||||
clients_count: int
|
||||
load_type: str
|
||||
endpoint: str
|
||||
|
||||
|
||||
class K6:
|
||||
def __init__(
|
||||
self,
|
||||
load_params: LoadParams,
|
||||
host_client: HostClient
|
||||
):
|
||||
|
||||
self.load_params = load_params
|
||||
self.host_client = host_client
|
||||
|
||||
self._k6_dir = None
|
||||
self._k6_result = None
|
||||
|
||||
self._k6_process = None
|
||||
self._k6_stop_attempts = 5
|
||||
self._k6_stop_timeout = 15
|
||||
|
||||
@property
|
||||
def process_dir(self) -> str:
|
||||
return self._k6_process.process_dir
|
||||
|
||||
@property
|
||||
def k6_dir(self) -> str:
|
||||
if not self._k6_dir:
|
||||
self._k6_dir = self.host_client.exec("locate -l 1 'k6'").stdout.strip("\n")
|
||||
return self._k6_dir
|
||||
|
||||
@allure.step("Prepare containers and objects")
|
||||
def prepare(self) -> str:
|
||||
self._k6_dir = self.k6_dir
|
||||
if (
|
||||
self.load_params.load_type == "http"
|
||||
or self.load_params.load_type == "grpc"
|
||||
):
|
||||
command = (
|
||||
f"{self.k6_dir}/scenarios/preset/preset_grpc.py "
|
||||
f"--size {self.load_params.obj_size} "
|
||||
f"--containers {self.load_params.containers_count} "
|
||||
f"--out {self.load_params.load_type}_{self.load_params.out_file} "
|
||||
f"--endpoint {self.load_params.endpoint} "
|
||||
f"--preload_obj {self.load_params.obj_count} "
|
||||
)
|
||||
terminal = self.host_client.exec(command)
|
||||
return terminal.stdout.strip("\n")
|
||||
elif self.load_params.load_type == "s3":
|
||||
command = (
|
||||
f"{self.k6_dir}/scenarios/preset/preset_s3.py --size {self.load_params.obj_size} "
|
||||
f"--buckets {self.load_params.containers_count} "
|
||||
f"--out {self.load_params.load_type}_{self.load_params.out_file} "
|
||||
f"--endpoint {self.load_params.endpoint} "
|
||||
f"--preload_obj {self.load_params.obj_count} "
|
||||
f"--location load-1-1"
|
||||
)
|
||||
terminal = self.host_client.exec(command)
|
||||
return terminal.stdout.strip("\n")
|
||||
raise AssertionError("Wrong K6 load type")
|
||||
|
||||
@allure.step("Start K6 on initiator")
|
||||
def start(self) -> None:
|
||||
|
||||
self._k6_dir = self.k6_dir
|
||||
command = (
|
||||
f"{self.k6_dir}/k6 run -e "
|
||||
f"PROFILE={self.load_params.writers_percent}:{self.load_params.load_time} "
|
||||
f"-e WRITE_OBJ_SIZE={self.load_params.obj_size} "
|
||||
f"-e CLIENTS={self.load_params.clients_count} -e NODES={self.load_params.endpoint} "
|
||||
f"-e PREGEN_JSON={self.k6_dir}/{self.load_params.load_type}_{self.load_params.out_file} "
|
||||
f"{self.k6_dir}/scenarios/{self.load_params.load_type}.js"
|
||||
)
|
||||
self._k6_process = RemoteProcess.create(command, self.host_client)
|
||||
|
||||
@allure.step("Wait until K6 is finished")
|
||||
def wait_until_finished(self, timeout: int = 0, k6_should_be_running: bool = False) -> None:
|
||||
if self._k6_process is None:
|
||||
assert "No k6 instances were executed"
|
||||
if k6_should_be_running:
|
||||
assert self._k6_process.running(), "k6 should be running."
|
||||
for __attempt in reversed(range(5)) if timeout else [0]:
|
||||
if not self._k6_process.running():
|
||||
return
|
||||
if __attempt: # no sleep in last iteration
|
||||
sleep(int(timeout / 5))
|
||||
self._stop_k6()
|
||||
raise TimeoutError(f"Expected K6 finished in {timeout} sec.")
|
||||
|
||||
@contextmanager
|
||||
def start_context(
|
||||
self, warm_up_time: int = 0, expected_finish: bool = False, expected_fail: bool = False
|
||||
) -> None:
|
||||
self.start()
|
||||
sleep(warm_up_time)
|
||||
try:
|
||||
yield self
|
||||
except Exception as err:
|
||||
if self._k6_process.running():
|
||||
self._kill_k6()
|
||||
raise
|
||||
|
||||
if expected_fail:
|
||||
self._kill_k6()
|
||||
elif expected_finish:
|
||||
if self._k6_process.running():
|
||||
self._kill_k6()
|
||||
raise AssertionError("K6 has not finished in expected time")
|
||||
else:
|
||||
self._k6_should_be_finished()
|
||||
else:
|
||||
self._stop_k6()
|
||||
|
||||
@allure.step("Get K6 results")
|
||||
def get_k6_results(self) -> None:
|
||||
self.__log_k6_output()
|
||||
|
||||
@allure.step("Assert K6 should be finished")
|
||||
def _k6_should_be_finished(self) -> None:
|
||||
k6_rc = self._k6_process.rc()
|
||||
assert k6_rc == 0, f"K6 unexpectedly finished with RC {k6_rc}"
|
||||
|
||||
@allure.step("Terminate K6 on initiator")
|
||||
def stop(self) -> None:
|
||||
if not self._k6_process.running():
|
||||
raise AssertionError("K6 unexpectedly finished")
|
||||
|
||||
self._stop_k6()
|
||||
|
||||
k6_rc = self._k6_process.rc()
|
||||
assert k6_rc == EXIT_RESULT_CODE, f"Return code of K6 job should be 0, but {k6_rc}"
|
||||
|
||||
def check_k6_is_running(self) -> bool:
|
||||
if self._k6_process:
|
||||
return self._k6_process.running()
|
||||
return False
|
||||
|
||||
@property
|
||||
def is_finished(self) -> bool:
|
||||
return not self._k6_process.running()
|
||||
|
||||
@allure.step("Try to stop K6 with SIGTERM")
|
||||
def _stop_k6(self) -> None:
|
||||
for __attempt in range(self._k6_stop_attempts):
|
||||
if not self._k6_process.running():
|
||||
break
|
||||
|
||||
self._k6_process.stop()
|
||||
sleep(self._k6_stop_timeout)
|
||||
else:
|
||||
raise AssertionError("Can not stop K6 process within timeout")
|
||||
|
||||
def _kill_k6(self) -> None:
|
||||
self._k6_process.kill()
|
||||
|
||||
@allure.step("Log K6 output")
|
||||
def __log_k6_output(self) -> None:
|
||||
allure.attach(self._k6_process.stdout(full=True), "K6 output", allure.attachment_type.TEXT)
|
186
pytest_tests/helpers/remote_process.py
Normal file
186
pytest_tests/helpers/remote_process.py
Normal file
|
@ -0,0 +1,186 @@
|
|||
from __future__ import annotations
|
||||
import uuid
|
||||
import allure
|
||||
from tenacity import retry, stop_after_attempt, wait_fixed
|
||||
from typing import Optional
|
||||
from pytest_tests.helpers.ssh_helper import HostClient
|
||||
|
||||
|
||||
class RemoteProcess:
|
||||
def __init__(self, cmd: str, process_dir: str, host_client: HostClient):
|
||||
self.process_dir = process_dir
|
||||
self.cmd = cmd
|
||||
self.stdout_last_line_number = 0
|
||||
self.stderr_last_line_number = 0
|
||||
self.pid: Optional[str] = None
|
||||
self.proc_rc: Optional[int] = None
|
||||
self.saved_stdout: Optional[str] = None
|
||||
self.saved_stderr: Optional[str] = None
|
||||
self.host_client = host_client
|
||||
|
||||
@classmethod
|
||||
@allure.step("Create remote process")
|
||||
def create(cls, command: str, host_client: HostClient) -> RemoteProcess:
|
||||
"""
|
||||
Create a process on a remote host.
|
||||
|
||||
Created dir for process with following files:
|
||||
command.sh: script to execute
|
||||
pid: contains process id
|
||||
rc: contains script return code
|
||||
stderr: contains script errors
|
||||
stdout: contains script output
|
||||
|
||||
Args:
|
||||
host_client: Host client instance
|
||||
command: command to be run on a remote host
|
||||
|
||||
Returns:
|
||||
RemoteProcess instance for further examination
|
||||
"""
|
||||
remote_process = cls(
|
||||
cmd=command, process_dir=f"/tmp/proc_{uuid.uuid4()}", host_client=host_client
|
||||
)
|
||||
remote_process._create_process_dir()
|
||||
remote_process._generate_command_script(command)
|
||||
remote_process._start_process()
|
||||
remote_process.pid = remote_process._get_pid()
|
||||
return remote_process
|
||||
|
||||
@allure.step("Get process stdout")
|
||||
def stdout(self, full: bool = False) -> str:
|
||||
"""
|
||||
Method to get process stdout, either fresh info or full.
|
||||
|
||||
Args:
|
||||
full: returns full stdout that we have to this moment
|
||||
|
||||
Returns:
|
||||
Fresh stdout. By means of stdout_last_line_number only new stdout lines are returned.
|
||||
If process is finished (proc_rc is not None) saved stdout is returned
|
||||
"""
|
||||
if self.saved_stdout is not None:
|
||||
cur_stdout = self.saved_stdout
|
||||
else:
|
||||
terminal = self.host_client.exec(f"cat {self.process_dir}/stdout")
|
||||
if self.proc_rc is not None:
|
||||
self.saved_stdout = terminal.stdout
|
||||
cur_stdout = terminal.stdout
|
||||
|
||||
if full:
|
||||
return cur_stdout
|
||||
whole_stdout = cur_stdout.split("\n")
|
||||
if len(whole_stdout) > self.stdout_last_line_number:
|
||||
resulted_stdout = "\n".join(whole_stdout[self.stdout_last_line_number :])
|
||||
self.stdout_last_line_number = len(whole_stdout)
|
||||
return resulted_stdout
|
||||
return ""
|
||||
|
||||
@allure.step("Get process stderr")
|
||||
def stderr(self, full: bool = False) -> str:
|
||||
"""
|
||||
Method to get process stderr, either fresh info or full.
|
||||
|
||||
Args:
|
||||
full: returns full stderr that we have to this moment
|
||||
|
||||
Returns:
|
||||
Fresh stderr. By means of stderr_last_line_number only new stderr lines are returned.
|
||||
If process is finished (proc_rc is not None) saved stderr is returned
|
||||
"""
|
||||
if self.saved_stderr is not None:
|
||||
cur_stderr = self.saved_stderr
|
||||
else:
|
||||
terminal = self.host_client.exec(f"cat {self.process_dir}/stderr")
|
||||
if self.proc_rc is not None:
|
||||
self.saved_stderr = terminal.stdout
|
||||
cur_stderr = terminal.stdout
|
||||
if full:
|
||||
return cur_stderr
|
||||
whole_stderr = cur_stderr.split("\n")
|
||||
if len(whole_stderr) > self.stderr_last_line_number:
|
||||
resulted_stderr = "\n".join(whole_stderr[self.stderr_last_line_number :])
|
||||
self.stderr_last_line_number = len(whole_stderr)
|
||||
return resulted_stderr
|
||||
return ""
|
||||
|
||||
@allure.step("Get process rc")
|
||||
def rc(self) -> Optional[int]:
|
||||
if self.proc_rc is not None:
|
||||
return self.proc_rc
|
||||
|
||||
terminal = self.host_client.exec(f"cat {self.process_dir}/rc", verify=False)
|
||||
if "No such file or directory" in terminal.stderr:
|
||||
return None
|
||||
elif terminal.stderr or terminal.rc != 0:
|
||||
raise AssertionError(f"cat process rc was not successfull: {terminal.stderr}")
|
||||
|
||||
self.proc_rc = int(terminal.stdout)
|
||||
return self.proc_rc
|
||||
|
||||
@allure.step("Check if process is running")
|
||||
def running(self) -> bool:
|
||||
return self.rc() is None
|
||||
|
||||
@allure.step("Send signal to process")
|
||||
def send_signal(self, signal: int) -> None:
|
||||
kill_res = self.host_client.exec(f"kill -{signal} {self.pid}", verify=False)
|
||||
if "No such process" in kill_res.stderr:
|
||||
return
|
||||
if kill_res.rc:
|
||||
raise AssertionError(f"Signal {signal} not sent. Return code of kill: {kill_res.rc}")
|
||||
|
||||
@allure.step("Stop process")
|
||||
def stop(self) -> None:
|
||||
self.send_signal(15)
|
||||
|
||||
@allure.step("Kill process")
|
||||
def kill(self) -> None:
|
||||
self.send_signal(9)
|
||||
|
||||
@allure.step("Clear process directory")
|
||||
def clear(self) -> None:
|
||||
if self.process_dir == "/":
|
||||
raise AssertionError(f"Invalid path to delete: {self.process_dir}")
|
||||
self.host_client.exec(f"rm -rf {self.process_dir}")
|
||||
|
||||
@allure.step("Start remote process")
|
||||
def _start_process(self) -> None:
|
||||
self.host_client.exec(
|
||||
f"nohup {self.process_dir}/command.sh </dev/null "
|
||||
f">{self.process_dir}/stdout "
|
||||
f"2>{self.process_dir}/stderr &"
|
||||
)
|
||||
|
||||
@allure.step("Create process directory")
|
||||
def _create_process_dir(self) -> None:
|
||||
self.host_client.exec(f"mkdir {self.process_dir}; chmod 777 {self.process_dir}")
|
||||
terminal = self.host_client.exec(f"realpath {self.process_dir}")
|
||||
self.process_dir = terminal.stdout.strip()
|
||||
|
||||
@allure.step("Get pid")
|
||||
@retry(wait=wait_fixed(10), stop=stop_after_attempt(5), reraise=True)
|
||||
def _get_pid(self) -> str:
|
||||
terminal = self.host_client.exec(f"cat {self.process_dir}/pid")
|
||||
assert terminal.stdout, f"invalid pid: {terminal.stdout}"
|
||||
return terminal.stdout.strip()
|
||||
|
||||
@allure.step("Generate command script")
|
||||
def _generate_command_script(self, command: str) -> None:
|
||||
command = command.replace('"', '\\"').replace("\\", "\\\\")
|
||||
script = (
|
||||
f"#!/bin/bash\n"
|
||||
f"cd {self.process_dir}\n"
|
||||
f"{command} &\n"
|
||||
f"pid=\$!\n"
|
||||
f"cd {self.process_dir}\n"
|
||||
f"echo \$pid > {self.process_dir}/pid\n"
|
||||
f"wait \$pid\n"
|
||||
f"echo $? > {self.process_dir}/rc"
|
||||
)
|
||||
|
||||
self.host_client.exec(
|
||||
f'echo "{script}" > {self.process_dir}/command.sh'
|
||||
)
|
||||
self.host_client.exec(f"cat {self.process_dir}/command.sh")
|
||||
self.host_client.exec(f"chmod +x {self.process_dir}/command.sh")
|
Loading…
Reference in a new issue