This commit is contained in:
Ekaterina Chernitsyna 2023-12-07 14:25:06 +03:00
parent dc6b0e407f
commit 02433bac42
4 changed files with 50 additions and 20 deletions

View file

@ -17,6 +17,8 @@ from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEA
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import wait_for_success
from threading import Event
from concurrent.futures import ThreadPoolExecutor
EXIT_RESULT_CODE = 0
@ -58,9 +60,21 @@ class K6:
self.load_params.working_dir,
f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
)
self._k6_dir: str = k6_dir
def _get_fill_percents(self):
fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep /srv/frostfs/").stdout.split("\n")
return [line.split() for line in fill_percents][:-1]
def check_fill_percent(self):
fill_percents = self._get_fill_percents()
percent_mean = 0
for line in fill_percents:
percent_mean += float(line[1].split('%')[0])
percent_mean = percent_mean / len(fill_percents)
logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}")
return percent_mean >= self.load_params.fill_percent
@property
def process_dir(self) -> str:
return self._k6_process.process_dir
@ -121,7 +135,7 @@ class K6:
user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user)
def wait_until_finished(self, soft_timeout: int = 0) -> None:
def wait_until_finished(self, event, soft_timeout: int = 0) -> None:
with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
if self.load_params.scenario == LoadScenario.VERIFY:
timeout = self.load_params.verify_time or 0
@ -162,6 +176,7 @@ class K6:
wait_interval = min_wait_interval
if self._k6_process is None:
assert "No k6 instances were executed"
while timeout > 0:
if not self._k6_process.running():
return
@ -177,6 +192,14 @@ class K6:
min_wait_interval,
)
if not self.load_params.fill_percent is None:
with reporter.step(f"Check the percentage of filling of all data disks on the node"):
if self.check_fill_percent():
logger.info(f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%")
self.stop()
event.set()
return
if not self._k6_process.running():
return

View file

@ -185,6 +185,8 @@ class LoadParams:
"NO_VERIFY_SSL",
False,
)
# Percentage of filling of all data disks on all nodes
fill_percent: Optional[float] = None
# ------- COMMON SCENARIO PARAMS -------
# Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.

View file

@ -9,13 +9,13 @@ from urllib.parse import urlparse
import yaml
from frostfs_testlib import reporter
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME
@ -30,19 +30,24 @@ from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
from threading import Event
from concurrent.futures import ThreadPoolExecutor
reporter = get_reporter()
class RunnerBase(ScenarioRunner):
k6_instances: list[K6]
@reporter.step("Run preset on loaders")
@reporter.step_deco("Run preset on loaders")
def preset(self):
parallel([k6.preset for k6 in self.k6_instances])
@reporter.step("Wait until load finish")
@reporter.step_deco("Wait until load finish")
def wait_until_finish(self, soft_timeout: int = 0):
parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout)
event = Event()
parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout, event=event)
@property
def is_running(self):
futures = parallel([k6.is_running for k6 in self.k6_instances])
@ -68,7 +73,7 @@ class DefaultRunner(RunnerBase):
self.loaders_wallet = loaders_wallet
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step("Preparation steps")
@reporter.step_deco("Preparation steps")
def prepare(
self,
load_params: LoadParams,
@ -125,7 +130,7 @@ class DefaultRunner(RunnerBase):
]
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
@reporter.step("Init k6 instances")
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
cycled_loaders = itertools.cycle(self.loaders)
@ -269,7 +274,7 @@ class LocalRunner(RunnerBase):
self.nodes_under_load = nodes_under_load
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step("Preparation steps")
@reporter.step_deco("Preparation steps")
def prepare(
self,
load_params: LoadParams,
@ -296,7 +301,7 @@ class LocalRunner(RunnerBase):
return True
@reporter.step("Prepare node {cluster_node}")
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
shell = cluster_node.host.get_shell()
@ -321,7 +326,7 @@ class LocalRunner(RunnerBase):
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
@reporter.step("Init k6 instances")
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
futures = parallel(
@ -367,12 +372,12 @@ class LocalRunner(RunnerBase):
with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
time.sleep(wait_after_start_time)
@reporter.step("Restore passwd on {cluster_node}")
@reporter.step_deco("Restore passwd on {cluster_node}")
def restore_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr -i /etc/passwd")
@reporter.step("Lock passwd on {cluster_node}")
@reporter.step_deco("Lock passwd on {cluster_node}")
def lock_passwd_on_node(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("sudo chattr +i /etc/passwd")
@ -398,19 +403,19 @@ class S3LocalRunner(LocalRunner):
endpoints: list[str]
k6_dir: str
@reporter.step("Run preset on loaders")
@reporter.step_deco("Run preset on loaders")
def preset(self):
LocalRunner.preset(self)
with reporter.step(f"Resolve containers in preset"):
parallel(self._resolve_containers_in_preset, self.k6_instances)
@reporter.step("Resolve containers in preset")
@reporter.step_deco("Resolve containers in preset")
def _resolve_containers_in_preset(self, k6_instance: K6):
k6_instance.shell.exec(
f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}"
)
@reporter.step("Init k6 instances")
@reporter.step_deco("Init k6 instances")
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
futures = parallel(
@ -446,7 +451,7 @@ class S3LocalRunner(LocalRunner):
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step("Preparation steps")
@reporter.step_deco("Preparation steps")
def prepare(
self,
load_params: LoadParams,
@ -462,7 +467,7 @@ class S3LocalRunner(LocalRunner):
parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)
@reporter.step("Prepare node {cluster_node}")
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(
self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
):

View file

@ -1,7 +1,7 @@
import os
# Background load node parameters
LOAD_NODES = os.getenv("LOAD_NODES", "").split()
LOAD_NODES = ["172.26.160.187"]
# Must hardcode for now
LOAD_NODE_SSH_USER = os.getenv("LOAD_NODE_SSH_USER", "service")
LOAD_NODE_SSH_PASSWORD = os.getenv("LOAD_NODE_SSH_PASSWORD")