diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py
index 92da8e0..127a837 100644
--- a/src/frostfs_testlib/load/k6.py
+++ b/src/frostfs_testlib/load/k6.py
@@ -17,6 +17,8 @@ from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, K6_TEA
 from frostfs_testlib.shell import Shell
 from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
 from frostfs_testlib.testing.test_control import wait_for_success
+from threading import Event
+from concurrent.futures import ThreadPoolExecutor

 EXIT_RESULT_CODE = 0

@@ -58,9 +60,21 @@ class K6:
             self.load_params.working_dir,
             f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
         )
-        self._k6_dir: str = k6_dir

+    def _get_fill_percents(self):
+        fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep /srv/frostfs/").stdout.split("\n")
+        return [line.split() for line in fill_percents][:-1]
+
+    def check_fill_percent(self):
+        fill_percents = self._get_fill_percents()
+        percent_mean = 0
+        for line in fill_percents:
+            percent_mean += float(line[1].split('%')[0])
+        percent_mean = percent_mean / len(fill_percents)
+        logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}")
+        return percent_mean >= self.load_params.fill_percent
+
     @property
     def process_dir(self) -> str:
         return self._k6_process.process_dir
@@ -121,7 +135,7 @@ class K6:
         user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
         self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user)

-    def wait_until_finished(self, soft_timeout: int = 0) -> None:
+    def wait_until_finished(self, event: Event, soft_timeout: int = 0) -> None:
         with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
             if self.load_params.scenario == LoadScenario.VERIFY:
                 timeout = self.load_params.verify_time or 0
@@ -162,6 +176,7 @@ class K6:
             wait_interval = min_wait_interval
             if self._k6_process is None:
                 assert "No k6 instances were executed"
+
             while timeout > 0:
                 if not self._k6_process.running():
                     return
@@ -177,6 +192,14 @@ class K6:
                     min_wait_interval,
                 )

+                if self.load_params.fill_percent is not None:
+                    with reporter.step("Check the fill percentage of all data disks on the node"):
+                        if self.check_fill_percent():
+                            logger.info(f"Stopping load because disks are filled more than {self.load_params.fill_percent}%")
+                            self.stop()
+                            event.set()
+                            return
+
                 if not self._k6_process.running():
                     return

diff --git a/src/frostfs_testlib/load/load_config.py b/src/frostfs_testlib/load/load_config.py
index 735d8ec..1284519 100644
--- a/src/frostfs_testlib/load/load_config.py
+++ b/src/frostfs_testlib/load/load_config.py
@@ -185,6 +185,8 @@ class LoadParams:
         "NO_VERIFY_SSL",
         False,
     )
+    # Fill percentage of the data disks on the nodes at which the load is stopped
+    fill_percent: Optional[float] = None

     # ------- COMMON SCENARIO PARAMS -------
     # Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
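For reference, a minimal standalone sketch of the mean-fill calculation behind the new _get_fill_percents/check_fill_percent helpers; the df output below is a fabricated sample and the 80 threshold merely stands in for LoadParams.fill_percent:

    # Sketch of the calculation check_fill_percent() performs on each loader node.
    # The df output is a made-up sample; real nodes report their own devices.
    sample_df_output = (
        "/dev/sdb1  45%  /srv/frostfs/data0\n"
        "/dev/sdc1  55%  /srv/frostfs/data1\n"
    )

    def mean_fill_percent(df_output: str) -> float:
        # Each non-empty line is "<source> <pcent> <target>"; strip the trailing '%'.
        rows = [line.split() for line in df_output.split("\n") if line]
        return sum(float(row[1].rstrip("%")) for row in rows) / len(rows)

    mean = mean_fill_percent(sample_df_output)  # 50.0
    print(f"stop load: {mean >= 80}")  # compared against LoadParams.fill_percent in check_fill_percent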
diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py
index f5284d8..f5cf689 100644
--- a/src/frostfs_testlib/load/runners.py
+++ b/src/frostfs_testlib/load/runners.py
@@ -9,13 +9,13 @@ from urllib.parse import urlparse

 import yaml

-from frostfs_testlib import reporter
 from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
 from frostfs_testlib.load.interfaces.loader import Loader
 from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
 from frostfs_testlib.load.k6 import K6
 from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
 from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
+from frostfs_testlib.reporter import get_reporter
 from frostfs_testlib.resources import optionals
 from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
 from frostfs_testlib.resources.common import STORAGE_USER_NAME
@@ -30,19 +30,24 @@ from frostfs_testlib.testing import parallel, run_optionally
 from frostfs_testlib.testing.test_control import retry
 from frostfs_testlib.utils import datetime_utils
 from frostfs_testlib.utils.file_keeper import FileKeeper
+from threading import Event
+from concurrent.futures import ThreadPoolExecutor
+
+reporter = get_reporter()


 class RunnerBase(ScenarioRunner):
     k6_instances: list[K6]

-    @reporter.step("Run preset on loaders")
+    @reporter.step_deco("Run preset on loaders")
     def preset(self):
         parallel([k6.preset for k6 in self.k6_instances])

-    @reporter.step("Wait until load finish")
+    @reporter.step_deco("Wait until load finish")
     def wait_until_finish(self, soft_timeout: int = 0):
-        parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout)
-
+        event = Event()
+        parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout, event=event)
+
     @property
     def is_running(self):
         futures = parallel([k6.is_running for k6 in self.k6_instances])
@@ -68,7 +73,7 @@ class DefaultRunner(RunnerBase):
         self.loaders_wallet = loaders_wallet

     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
-    @reporter.step("Preparation steps")
+    @reporter.step_deco("Preparation steps")
     def prepare(
         self,
         load_params: LoadParams,
@@ -125,7 +130,7 @@ class DefaultRunner(RunnerBase):
         ]
         shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))

-    @reporter.step("Init k6 instances")
+    @reporter.step_deco("Init k6 instances")
     def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
         self.k6_instances = []
         cycled_loaders = itertools.cycle(self.loaders)
@@ -269,7 +274,7 @@ class LocalRunner(RunnerBase):
         self.nodes_under_load = nodes_under_load

     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
-    @reporter.step("Preparation steps")
+    @reporter.step_deco("Preparation steps")
     def prepare(
         self,
         load_params: LoadParams,
@@ -296,7 +301,7 @@ class LocalRunner(RunnerBase):

         return True

-    @reporter.step("Prepare node {cluster_node}")
+    @reporter.step_deco("Prepare node {cluster_node}")
     def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams):
         shell = cluster_node.host.get_shell()

@@ -321,7 +326,7 @@ class LocalRunner(RunnerBase):
         shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
         shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")

-    @reporter.step("Init k6 instances")
+    @reporter.step_deco("Init k6 instances")
     def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
         self.k6_instances = []
         futures = parallel(
@@ -367,12 +372,12 @@ class LocalRunner(RunnerBase):
         with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"):
             time.sleep(wait_after_start_time)

-    @reporter.step("Restore passwd on {cluster_node}")
+    @reporter.step_deco("Restore passwd on {cluster_node}")
     def restore_passwd_on_node(self, cluster_node: ClusterNode):
         shell = cluster_node.host.get_shell()
         shell.exec("sudo chattr -i /etc/passwd")

-    @reporter.step("Lock passwd on {cluster_node}")
+    @reporter.step_deco("Lock passwd on {cluster_node}")
     def lock_passwd_on_node(self, cluster_node: ClusterNode):
         shell = cluster_node.host.get_shell()
         shell.exec("sudo chattr +i /etc/passwd")
@@ -398,19 +403,19 @@ class S3LocalRunner(LocalRunner):
     endpoints: list[str]
     k6_dir: str

-    @reporter.step("Run preset on loaders")
+    @reporter.step_deco("Run preset on loaders")
     def preset(self):
         LocalRunner.preset(self)
         with reporter.step(f"Resolve containers in preset"):
             parallel(self._resolve_containers_in_preset, self.k6_instances)

-    @reporter.step("Resolve containers in preset")
+    @reporter.step_deco("Resolve containers in preset")
     def _resolve_containers_in_preset(self, k6_instance: K6):
         k6_instance.shell.exec(
             f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}"
         )

-    @reporter.step("Init k6 instances")
+    @reporter.step_deco("Init k6 instances")
     def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
         self.k6_instances = []
         futures = parallel(
@@ -446,7 +451,7 @@ class S3LocalRunner(LocalRunner):
         )

     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
-    @reporter.step("Preparation steps")
+    @reporter.step_deco("Preparation steps")
     def prepare(
         self,
         load_params: LoadParams,
@@ -462,7 +467,7 @@ class S3LocalRunner(LocalRunner):

         parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)

-    @reporter.step("Prepare node {cluster_node}")
+    @reporter.step_deco("Prepare node {cluster_node}")
     def prepare_node(
         self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
     ):
diff --git a/src/frostfs_testlib/resources/load_params.py b/src/frostfs_testlib/resources/load_params.py
index 97193cc..960c039 100644
--- a/src/frostfs_testlib/resources/load_params.py
+++ b/src/frostfs_testlib/resources/load_params.py
@@ -1,7 +1,7 @@
 import os

 # Background load node parameters
-LOAD_NODES = os.getenv("LOAD_NODES", "").split()
+LOAD_NODES = ["172.26.160.187"]  # Must hardcode for now
 LOAD_NODE_SSH_USER = os.getenv("LOAD_NODE_SSH_USER", "service")
 LOAD_NODE_SSH_PASSWORD = os.getenv("LOAD_NODE_SSH_PASSWORD")
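A minimal sketch of how the shared threading.Event ties the runner and the K6 wrappers together, assuming the testlib parallel helper simply fans wait_until_finished out over the k6 instances; the names instances and fill_reached are illustrative and not part of the change:

    from concurrent.futures import ThreadPoolExecutor
    from threading import Event

    def wait_until_finish(instances, soft_timeout: int = 0) -> bool:
        # One shared event for every waiter: the first instance whose node reaches
        # fill_percent stops its own load and sets the event for the others.
        fill_reached = Event()
        with ThreadPoolExecutor(max_workers=max(1, len(instances))) as executor:
            futures = [
                executor.submit(k6.wait_until_finished, fill_reached, soft_timeout)
                for k6 in instances
            ]
            for future in futures:
                future.result()
        # Report whether the load ended early because the data disks filled up.
        return fill_reached.is_set()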