frostfs-testlib/src/frostfs_testlib/load/k6.py

265 lines
10 KiB
Python

import json
import logging
import math
import os
from dataclasses import dataclass
from datetime import datetime
from time import sleep
from typing import Any
from urllib.parse import urlparse
from frostfs_testlib import reporter
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
from frostfs_testlib.processes.remote_process import RemoteProcess
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEARDOWN_PERIOD
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success
# Return code the preset command must exit with; K6.preset() asserts equality with it.
EXIT_RESULT_CODE = 0
# Module-wide logger, registered under the "NeoLogger" name.
logger = logging.getLogger("NeoLogger")
@dataclass
class LoadResults:
    """Plain container of numeric load-result counters.

    Every field is a float that defaults to ``0.0``, so an instance can be
    created empty and filled in later. Units are not defined in this module.
    """

    # Traffic counters.
    data_sent: float = 0.0
    data_received: float = 0.0

    # Operation counters.
    read_ops: float = 0.0
    write_ops: float = 0.0
    total_ops: float = 0.0
class K6:
    """Wrapper around a single k6 process launched through a ``Shell``.

    The constructor builds the ``k6 run`` command line for the configured
    scenario and registers it as a ``RemoteProcess``; the remaining methods
    run the preset script, start/stop the process, wait for it to finish and
    fetch the summary JSON it is told to write.
    """

    _k6_process: RemoteProcess

    def __init__(
        self,
        load_params: LoadParams,
        endpoints: list[str],
        k6_dir: str,
        shell: Shell,
        loader: Loader,
        wallet: WalletInfo,
    ):
        """Prepare the k6 command and create (but do not start) the remote process.

        Args:
            load_params: Load configuration; its ``scenario`` must be set.
            endpoints: Endpoints passed to k6 and to the preset script.
            k6_dir: Directory that contains the ``k6`` binary and ``scenarios/``.
            shell: Shell used to execute commands for this instance.
            loader: Loader this instance is associated with (its ``ip`` is used in reports/logs).
            wallet: Wallet whose ``path`` and ``config_path`` are passed to the gRPC preset.

        Raises:
            RuntimeError: If ``load_params.scenario`` is ``None``.
        """
        if load_params.scenario is None:
            raise RuntimeError("Scenario should not be none")

        self.load_params: LoadParams = load_params
        self.endpoints = endpoints
        self.loader: Loader = loader
        self.shell: Shell = shell
        self.wallet = wallet
        self.preset_output: str = ""
        # k6 receives this path through the SUMMARY_JSON env variable; get_results() reads it back.
        self.summary_json: str = os.path.join(
            self.load_params.working_dir,
            f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
        )

        self._k6_dir: str = k6_dir

        # NOTE: _generate_env_variables() relies on the attributes assigned above.
        command = (
            f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
            f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
        )
        # Only the LOCAL scenario is launched under a dedicated user.
        user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
        # VERIFY runs get a distinct process id so they do not reuse the load run's id.
        process_id = (
            self.load_params.load_id
            if self.load_params.scenario != LoadScenario.VERIFY
            else f"{self.load_params.load_id}_verify"
        )
        self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user, process_id)

    def _get_fill_percents(self):
        """Return ``df`` rows (split into columns) for filesystems whose line mentions ``frostfs``.

        Each row is ``[source, pcent, target]`` as requested via ``--output``.
        Blank lines are skipped, so the result is correct whether or not the
        command output ends with a trailing newline.
        """
        output = self.shell.exec("df -H --output=source,pcent,target | grep frostfs").stdout
        return [line.split() for line in output.splitlines() if line.strip()]

    def check_fill_percent(self):
        """Tell whether the mean disk usage reached ``load_params.fill_percent``.

        Returns:
            ``True`` when the mean of the reported usage percentages is greater
            than or equal to the configured threshold; ``False`` otherwise,
            including when no matching filesystem rows were reported.
        """
        fill_percents = self._get_fill_percents()
        if not fill_percents:
            # Nothing to average; avoid ZeroDivisionError and report "not filled".
            logger.warning(f"{self.loader.ip} reported no frostfs filesystems, fill percent can not be calculated")
            return False

        # Column 1 is the "pcent" column, e.g. "42%".
        percent_mean = sum(float(line[1].split("%")[0]) for line in fill_percents) / len(fill_percents)
        logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}")
        return percent_mean >= self.load_params.fill_percent

    @property
    def process_dir(self) -> str:
        """Directory of the underlying remote process."""
        return self._k6_process.process_dir

    def preset(self) -> str:
        """Run the preset script matching the load type and return its stdout.

        gRPC and HTTP load types use ``preset_grpc.py`` (with wallet arguments),
        S3 uses ``preset_s3.py``. Extra arguments come from
        ``load_params.get_preset_arguments()``.

        Returns:
            The preset stdout with leading/trailing newlines stripped; it is
            also stored in ``self.preset_output``.

        Raises:
            AssertionError: If the preset command return code is not ``EXIT_RESULT_CODE``.
        """
        with reporter.step(f"Run preset on loader {self.loader.ip} for endpoints {self.endpoints}"):
            preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py"
            preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py"
            preset_map = {
                LoadType.gRPC: preset_grpc,
                LoadType.S3: preset_s3,
                LoadType.HTTP: preset_grpc,
            }

            base_args = {
                preset_grpc: [
                    preset_grpc,
                    f"--endpoint {','.join(self.endpoints)}",
                    f"--wallet {self.wallet.path} ",
                    f"--config {self.wallet.config_path} ",
                ],
                preset_s3: [
                    preset_s3,
                    f"--endpoint {','.join(self.endpoints)}",
                ],
            }

            preset_scenario = preset_map[self.load_params.load_type]
            # Copy so that extending the argument list never touches base_args.
            command_args = base_args[preset_scenario].copy()
            command_args += self.load_params.get_preset_arguments()

            command = " ".join(command_args)
            result = self.shell.exec(command)

            assert result.return_code == EXIT_RESULT_CODE, f"Return code of preset is not zero: {result.stdout}"

            self.preset_output = result.stdout.strip("\n")
            return self.preset_output

    @reporter.step("Generate K6 command")
    def _generate_env_variables(self) -> str:
        """Build the ``-e NAME='value'`` arguments passed to ``k6 run``.

        The variables come from ``load_params.get_env_vars()`` plus the
        ``<LOAD_TYPE>_ENDPOINTS`` and ``SUMMARY_JSON`` entries added here. All
        variables are attached to the report; variables whose value is ``None``
        are left out of the returned command-line fragment.
        """
        env_vars = self.load_params.get_env_vars()

        env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
        env_vars["SUMMARY_JSON"] = self.summary_json

        reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables")
        return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None])

    def get_start_time(self) -> datetime:
        """Return the process start time as a naive local ``datetime``."""
        return datetime.fromtimestamp(self._k6_process.start_time())

    def get_end_time(self) -> datetime:
        """Return the process end time as a naive local ``datetime``."""
        return datetime.fromtimestamp(self._k6_process.end_time())

    def start(self) -> None:
        """Start the prepared k6 process."""
        with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self._k6_process.start()

    def wait_until_finished(self, event, soft_timeout: int = 0) -> None:
        """Block until k6 exits, the event is set, disks fill up or the timeout expires.

        The timeout is the scenario duration (``verify_time`` for VERIFY,
        ``load_time`` otherwise) minus the time k6 has already been running,
        plus setup/teardown allowance; a non-zero ``soft_timeout`` caps it.

        Args:
            event: Object exposing ``set()`` / ``is_set()`` (presumably a
                ``threading.Event`` shared between loaders - confirm with callers).
                When it is set, this instance stops its process and returns.
            soft_timeout: Optional cap, in seconds, on the waiting time. When
                non-zero, expiry stops k6 without raising.

        Raises:
            RuntimeError: If no k6 process exists for this instance.
            TimeoutError: If the timeout expires while k6 is still running and
                ``soft_timeout`` is not set.
        """
        with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
            # Fail early and explicitly; everything below dereferences the process.
            if self._k6_process is None:
                raise RuntimeError("No k6 instances were executed")

            if self.load_params.scenario == LoadScenario.VERIFY:
                timeout = self.load_params.verify_time or 0
            else:
                timeout = self.load_params.load_time or 0

            start_time = int(self.get_start_time().timestamp())

            # datetime.now() (not utcnow()): .timestamp() treats naive datetimes as local
            # time, so utcnow() would shift the result by the host UTC offset.
            current_time = int(datetime.now().timestamp())
            working_time = current_time - start_time
            remaining_time = timeout - working_time

            setup_teardown_time = (
                int(K6_TEARDOWN_PERIOD)
                + self.load_params.get_init_time()
                # setup_timeout is a string such as "5s"; strip the unit before converting.
                + int(self.load_params.setup_timeout.replace("s", "").strip())
            )
            remaining_time_including_setup_and_teardown = remaining_time + setup_teardown_time
            timeout = remaining_time_including_setup_and_teardown

            if soft_timeout:
                timeout = min(timeout, soft_timeout)

            original_timeout = timeout

            timeouts = {
                "K6 start time": start_time,
                "Current time": current_time,
                "K6 working time": working_time,
                "Remaining time for load": remaining_time,
                "Setup and teardown": setup_teardown_time,
                "Remaining time including setup/teardown": remaining_time_including_setup_and_teardown,
                "Soft timeout": soft_timeout,
                "Selected timeout": original_timeout,
            }

            reporter.attach("\n".join([f"{k}: {v}" for k, v in timeouts.items()]), "timeouts.txt")

            min_wait_interval = 10
            wait_interval = min_wait_interval

            while timeout > 0:
                if self.load_params.fill_percent is not None:
                    with reporter.step("Check the percentage of filling of all data disks on the node"):
                        if self.check_fill_percent():
                            logger.info(f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%")
                            # Signal the other holders of the event before stopping ourselves.
                            event.set()
                            self.stop()
                            return

                if event.is_set():
                    self.stop()
                    return

                if not self._k6_process.running():
                    return

                remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else ""
                remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else ""
                logger.info(
                    f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds..."
                )
                sleep(wait_interval)
                timeout -= min(timeout, wait_interval)
                # Poll less often while a lot of time remains, never more often than min_wait_interval.
                wait_interval = max(
                    min(timeout, int(math.log2(timeout + 1)) * 15) - min_wait_interval,
                    min_wait_interval,
                )

            if not self._k6_process.running():
                return

            self.stop()
            if not soft_timeout:
                raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")

    def get_results(self) -> Any:
        """Attach k6 output to the report and return the parsed summary JSON.

        Returns:
            The object parsed from the summary file, or ``None`` when
            ``self.summary_json`` is empty.
        """
        with reporter.step(f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"):
            self.__log_output()

            if not self.summary_json:
                return None

            summary_text = self.shell.exec(f"cat {self.summary_json}").stdout
            summary_json = json.loads(summary_text)
            # Fall back to the raw endpoint when it has no URL scheme (netloc is empty then).
            endpoint = urlparse(self.endpoints[0]).netloc or self.endpoints[0]
            allure_filenames = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.load_params.scenario.value}_summary.json",
                K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.load_params.scenario.value}_{endpoint}_summary.json",
            }
            allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy]

            reporter.attach(summary_text, allure_filename)
            return summary_json

    def stop(self) -> None:
        """Stop the k6 process if it is running, then wait for it to end."""
        with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"):
            if self.is_running():
                self._k6_process.stop()

            self._wait_until_process_end()

    def is_running(self) -> bool:
        """Return whether a k6 process exists and reports itself as running."""
        if self._k6_process:
            return self._k6_process.running()
        return False

    @reporter.step("Wait until K6 process end")
    @wait_for_success(K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout")
    def _wait_until_process_end(self):
        """Report whether the process is still running.

        NOTE(review): the ``wait_for_success`` decorator presumably re-invokes
        this until it returns ``False`` or ``K6_STOP_SIGNAL_TIMEOUT`` elapses -
        confirm against the decorator's signature.
        """
        return self._k6_process.running()

    def __log_output(self) -> None:
        """Attach the full k6 stdout and the path of its stderr file to the report."""
        reporter.attach(self._k6_process.stdout(full=True), "K6 stdout")
        reporter.attach(f"{self._k6_process.process_dir}/stderr", "K6 stderr path")