diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index 2ce7c757..38167d24 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -72,6 +72,19 @@ class K6: ) self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user, process_id) + def _get_fill_percents(self): + fill_percents = self.shell.exec("df -H --output=source,pcent,target | grep frostfs").stdout.split("\n") + return [line.split() for line in fill_percents][:-1] + + def check_fill_percent(self): + fill_percents = self._get_fill_percents() + percent_mean = 0 + for line in fill_percents: + percent_mean += float(line[1].split('%')[0]) + percent_mean = percent_mean / len(fill_percents) + logger.info(f"{self.loader.ip} mean fill percent is {percent_mean}") + return percent_mean >= self.load_params.fill_percent + @property def process_dir(self) -> str: return self._k6_process.process_dir @@ -132,7 +145,7 @@ class K6: with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"): self._k6_process.start() - def wait_until_finished(self, soft_timeout: int = 0) -> None: + def wait_until_finished(self, event, soft_timeout: int = 0) -> None: with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"): if self.load_params.scenario == LoadScenario.VERIFY: timeout = self.load_params.verify_time or 0 @@ -175,9 +188,23 @@ class K6: wait_interval = min_wait_interval if self._k6_process is None: assert "No k6 instances were executed" + while timeout > 0: + if not self.load_params.fill_percent is None: + with reporter.step(f"Check the percentage of filling of all data disks on the node"): + if self.check_fill_percent(): + logger.info(f"Stopping load on because disks is filled more then {self.load_params.fill_percent}%") + event.set() + self.stop() + return + + if event.is_set(): + self.stop() + return + if not self._k6_process.running(): return + remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else "" remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else "" logger.info( diff --git a/src/frostfs_testlib/load/load_config.py b/src/frostfs_testlib/load/load_config.py index c1c98feb..df465214 100644 --- a/src/frostfs_testlib/load/load_config.py +++ b/src/frostfs_testlib/load/load_config.py @@ -195,6 +195,8 @@ class LoadParams: "NO_VERIFY_SSL", False, ) + # Percentage of filling of all data disks on all nodes + fill_percent: Optional[float] = None # ------- COMMON SCENARIO PARAMS ------- # Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value. diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index f5284d82..dd6d50e4 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -30,6 +30,7 @@ from frostfs_testlib.testing import parallel, run_optionally from frostfs_testlib.testing.test_control import retry from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils.file_keeper import FileKeeper +from threading import Event class RunnerBase(ScenarioRunner): @@ -41,7 +42,8 @@ class RunnerBase(ScenarioRunner): @reporter.step("Wait until load finish") def wait_until_finish(self, soft_timeout: int = 0): - parallel([k6.wait_until_finished for k6 in self.k6_instances], soft_timeout=soft_timeout) + event = Event() + parallel([k6.wait_until_finished for k6 in self.k6_instances], event=event, soft_timeout=soft_timeout) @property def is_running(self):