frostfs-testcases/pytest_tests/helpers/k6.py

216 lines
7.8 KiB
Python

import re
from contextlib import contextmanager
from dataclasses import dataclass
from time import sleep
import allure
from neofs_testlib.shell import Shell
from pytest_tests.helpers.remote_process import RemoteProcess
EXIT_RESULT_CODE = 0
@dataclass
class LoadParams:
obj_size: int
containers_count: int
out_file: str
obj_count: int
writers: int
readers: int
deleters: int
load_time: int
load_type: str
endpoint: str
@dataclass
class LoadResults:
latency_write_min: float = 0.0
latency_write_max: float = 0.0
latency_write_med: float = 0.0
latency_write_avg: float = 0.0
latency_read_min: float = 0.0
latency_read_max: float = 0.0
latency_read_med: float = 0.0
latency_read_avg: float = 0.0
read_ops: float = 0.0
write_ops: float = 0.0
class K6:
def __init__(self, load_params: LoadParams, shell: Shell):
self.load_params = load_params
self.shell = shell
self._k6_dir = None
self._k6_result = None
self._k6_process = None
self._k6_stop_attempts = 5
self._k6_stop_timeout = 15
@property
def process_dir(self) -> str:
return self._k6_process.process_dir
@property
def k6_dir(self) -> str:
if not self._k6_dir:
self._k6_dir = self.shell.exec("locate -l 1 'k6'").stdout.strip("\n")
return self._k6_dir
@allure.step("Prepare containers and objects")
def prepare(self) -> str:
self._k6_dir = self.k6_dir
if self.load_params.load_type == "http" or self.load_params.load_type == "grpc":
command = (
f"{self.k6_dir}/scenarios/preset/preset_grpc.py "
f"--size {self.load_params.obj_size} "
f"--containers {self.load_params.containers_count} "
f"--out {self.k6_dir}/{self.load_params.load_type}_{self.load_params.out_file} "
f"--endpoint {self.load_params.endpoint.split(',')[0]} "
f"--preload_obj {self.load_params.obj_count} "
)
terminal = self.shell.exec(command)
return terminal.stdout.strip("\n")
elif self.load_params.load_type == "s3":
command = (
f"{self.k6_dir}/scenarios/preset/preset_s3.py --size {self.load_params.obj_size} "
f"--buckets {self.load_params.containers_count} "
f"--out {self.k6_dir}/{self.load_params.load_type}_{self.load_params.out_file} "
f"--endpoint {self.load_params.endpoint.split(',')[0]} "
f"--preload_obj {self.load_params.obj_count} "
f"--location load-1-1"
)
terminal = self.shell.exec(command)
return terminal.stdout.strip("\n")
else:
raise AssertionError("Wrong K6 load type")
@allure.step("Start K6 on initiator")
def start(self) -> None:
self._k6_dir = self.k6_dir
command = (
f"{self.k6_dir}/k6 run "
f"-e DURATION={self.load_params.load_time} "
f"-e WRITE_OBJ_SIZE={self.load_params.obj_size} "
f"-e WRITERS={self.load_params.writers} -e READERS={self.load_params.readers} "
f"-e DELETERS={self.load_params.deleters} "
f"-e {self.load_params.load_type.upper()}_ENDPOINTS={self.load_params.endpoint} "
f"-e PREGEN_JSON={self.k6_dir}/"
f"{self.load_params.load_type}_{self.load_params.out_file} "
f"{self.k6_dir}/scenarios/{self.load_params.load_type}.js"
)
self._k6_process = RemoteProcess.create(command, self.shell)
@allure.step("Wait until K6 is finished")
def wait_until_finished(self, timeout: int = 0, k6_should_be_running: bool = False) -> None:
if self._k6_process is None:
assert "No k6 instances were executed"
if k6_should_be_running:
assert self._k6_process.running(), "k6 should be running."
for __attempt in reversed(range(5)) if timeout else [0]:
if not self._k6_process.running():
return
if __attempt: # no sleep in last iteration
sleep(int(timeout / 5))
self._stop_k6()
raise TimeoutError(f"Expected K6 finished in {timeout} sec.")
@contextmanager
def start_context(
self, warm_up_time: int = 0, expected_finish: bool = False, expected_fail: bool = False
) -> None:
self.start()
sleep(warm_up_time)
try:
yield self
except Exception as err:
if self._k6_process.running():
self._kill_k6()
raise
if expected_fail:
self._kill_k6()
elif expected_finish:
if self._k6_process.running():
self._kill_k6()
raise AssertionError("K6 has not finished in expected time")
else:
self._k6_should_be_finished()
else:
self._stop_k6()
@allure.step("Get K6 results")
def get_k6_results(self) -> None:
self.__log_k6_output()
@allure.step("Assert K6 should be finished")
def _k6_should_be_finished(self) -> None:
k6_rc = self._k6_process.rc()
assert k6_rc == 0, f"K6 unexpectedly finished with RC {k6_rc}"
@allure.step("Terminate K6 on initiator")
def stop(self) -> None:
if not self._k6_process.running():
raise AssertionError("K6 unexpectedly finished")
self._stop_k6()
k6_rc = self._k6_process.rc()
assert k6_rc == EXIT_RESULT_CODE, f"Return code of K6 job should be 0, but {k6_rc}"
def check_k6_is_running(self) -> bool:
if self._k6_process:
return self._k6_process.running()
return False
@property
def is_finished(self) -> bool:
return not self._k6_process.running()
def parsing_results(self) -> LoadResults:
output = self._k6_process.stdout(full=True).replace("\n", "")
metric_regex_map = {
"latency_write_min": r"neofs_obj_put_duration.*?min=(?P<latency_write_min>\d*\.\d*)",
"latency_write_max": r"neofs_obj_put_duration.*?max=(?P<latency_write_max>\d*\.\d*)",
"latency_write_med": r"neofs_obj_put_duration.*?med=(?P<latency_write_med>\d*\.\d*)",
"latency_write_avg": r"neofs_obj_put_duration.*?avg=(?P<latency_write_avg>\d*\.\d*)",
"write_ops": r"neofs_obj_put_total\W*\d*\W*(?P<write_ops>\d*\.\d*)",
"latency_read_min": r"neofs_obj_get_duration.*?min=(?P<latency_read_min>\d*\.\d*)",
"latency_read_max": r"neofs_obj_get_duration.*?max=(?P<latency_read_max>\d*\.\d*)",
"latency_read_med": r"neofs_obj_get_duration.*?med=(?P<latency_read_med>\d*\.\d*)",
"latency_read_avg": r"neofs_obj_get_duration.*?avg=(?P<latency_read_avg>\d*\.\d*)",
"read_ops": r"neofs_obj_get_total\W*\d*\W*(?P<read_ops>\d*\.\d*)",
}
metric_values = {}
for metric_name, metric_regex in metric_regex_map.items():
match = re.search(metric_regex, output)
if match:
metric_values[metric_name] = float(match.group(metric_name))
continue
metric_values[metric_name] = 0.0
load_result = LoadResults(**metric_values)
return load_result
@allure.step("Try to stop K6 with SIGTERM")
def _stop_k6(self) -> None:
for __attempt in range(self._k6_stop_attempts):
if not self._k6_process.running():
break
self._k6_process.stop()
sleep(self._k6_stop_timeout)
else:
raise AssertionError("Can not stop K6 process within timeout")
def _kill_k6(self) -> None:
self._k6_process.kill()
@allure.step("Log K6 output")
def __log_k6_output(self) -> None:
allure.attach(self._k6_process.stdout(full=True), "K6 output", allure.attachment_type.TEXT)