diff --git a/src/frostfs_testlib/load/interfaces.py b/src/frostfs_testlib/load/interfaces.py index fbbc20b6..6f298684 100644 --- a/src/frostfs_testlib/load/interfaces.py +++ b/src/frostfs_testlib/load/interfaces.py @@ -39,6 +39,10 @@ class ScenarioRunner(ABC): def stop(self): """Stop K6 instances""" + @abstractmethod + def preset(self): + """Run preset for load""" + @property @abstractmethod def is_running(self) -> bool: diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index ca3f696b..7ec3c216 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -72,58 +72,58 @@ class K6: def process_dir(self) -> str: return self._k6_process.process_dir - @reporter.step_deco("Preset containers and objects") def preset(self) -> str: - preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py" - preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py" - preset_map = { - LoadType.gRPC: preset_grpc, - LoadType.S3: preset_s3, - LoadType.HTTP: preset_grpc, - } + with reporter.step(f"Run preset on loader {self.loader.ip} for endpoints {self.endpoints}"): + preset_grpc = f"{self._k6_dir}/scenarios/preset/preset_grpc.py" + preset_s3 = f"{self._k6_dir}/scenarios/preset/preset_s3.py" + preset_map = { + LoadType.gRPC: preset_grpc, + LoadType.S3: preset_s3, + LoadType.HTTP: preset_grpc, + } - base_args = { - preset_grpc: [ - preset_grpc, - f"--endpoint {self.endpoints[0]}", - f"--wallet {self.wallet.path} ", - f"--config {self.wallet.config_path} ", - ], - preset_s3: [ - preset_s3, - f"--endpoint {self.endpoints[0]}", - ], - } + base_args = { + preset_grpc: [ + preset_grpc, + f"--endpoint {','.join(self.endpoints)}", + f"--wallet {self.wallet.path} ", + f"--config {self.wallet.config_path} ", + ], + preset_s3: [ + preset_s3, + f"--endpoint {','.join(self.endpoints)}", + ], + } - preset_scenario = preset_map[self.load_params.load_type] - command_args = base_args[preset_scenario].copy() + preset_scenario = preset_map[self.load_params.load_type] + command_args = base_args[preset_scenario].copy() - command_args += [ - f"--{field.metadata['preset_argument']} '{getattr(self.load_params, field.name)}'" - for field in fields(self.load_params) - if field.metadata - and self.scenario in field.metadata["applicable_scenarios"] - and field.metadata["preset_argument"] - and getattr(self.load_params, field.name) is not None - ] - - if self.load_params.preset: command_args += [ - f"--{field.metadata['preset_argument']} '{getattr(self.load_params.preset, field.name)}'" - for field in fields(self.load_params.preset) + f"--{field.metadata['preset_argument']} '{getattr(self.load_params, field.name)}'" + for field in fields(self.load_params) if field.metadata and self.scenario in field.metadata["applicable_scenarios"] and field.metadata["preset_argument"] - and getattr(self.load_params.preset, field.name) is not None + and getattr(self.load_params, field.name) is not None ] - command = " ".join(command_args) - result = self.shell.exec(command) + if self.load_params.preset: + command_args += [ + f"--{field.metadata['preset_argument']} '{getattr(self.load_params.preset, field.name)}'" + for field in fields(self.load_params.preset) + if field.metadata + and self.scenario in field.metadata["applicable_scenarios"] + and field.metadata["preset_argument"] + and getattr(self.load_params.preset, field.name) is not None + ] - assert ( - result.return_code == EXIT_RESULT_CODE - ), f"Return code of preset is not zero: {result.stdout}" - return result.stdout.strip("\n") + command = " ".join(command_args) + result = self.shell.exec(command) + + assert ( + result.return_code == EXIT_RESULT_CODE + ), f"Return code of preset is not zero: {result.stdout}" + return result.stdout.strip("\n") @reporter.step_deco("Generate K6 command") def _generate_env_variables(self) -> str: @@ -232,7 +232,6 @@ class K6: self._wait_until_process_end() - @property def is_running(self) -> bool: if self._k6_process: return self._k6_process.running() diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py index 7f912e4f..dcd81b49 100644 --- a/src/frostfs_testlib/load/load_report.py +++ b/src/frostfs_testlib/load/load_report.py @@ -43,8 +43,10 @@ class LoadReport: return html def _get_load_params_section_html(self) -> str: - params: str = yaml.safe_dump(self.load_test, sort_keys=False) - params = params.replace("\n", "
") + params: str = yaml.safe_dump( + [self.load_test], sort_keys=False, indent=2, explicit_start=True + ) + params = params.replace("\n", "
").replace(" ", " ") section_html = f"""

Scenario params

{params}
@@ -139,7 +141,7 @@ class LoadReport: duration = self._seconds_to_formatted_duration(self.load_params.load_time) model = self._get_model_string() # write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s - short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit} {total_rate:.2f}/s" + short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s" html = f""" diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index d8758f66..d6cf2ae9 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -28,15 +28,31 @@ from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo -from frostfs_testlib.testing.test_control import run_optionally -from frostfs_testlib.utils import datetime_utils -from frostfs_testlib.utils.file_keeper import FileKeeper +from frostfs_testlib.testing import parallel, run_optionally +from frostfs_testlib.utils import FileKeeper, datetime_utils reporter = get_reporter() -class DefaultRunner(ScenarioRunner): +class RunnerBase(ScenarioRunner): k6_instances: list[K6] + + @reporter.step_deco("Run preset on loaders") + def preset(self): + parallel([k6.preset for k6 in self.k6_instances]) + + @reporter.step_deco("Wait until load finish") + def wait_until_finish(self): + parallel([k6.wait_until_finished for k6 in self.k6_instances]) + + @property + def is_running(self): + futures = parallel([k6.is_running for k6 in self.k6_instances]) + + return any([future.result() for future in futures]) + + +class DefaultRunner(RunnerBase): loaders: list[Loader] loaders_wallet: WalletInfo @@ -51,7 +67,7 @@ class DefaultRunner(ScenarioRunner): self.loaders_wallet = loaders_wallet @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Prepare load instances") + @reporter.step_deco("Preparation steps") def prepare( self, load_params: LoadParams, @@ -68,48 +84,52 @@ class DefaultRunner(ScenarioRunner): ] grpc_peer = storage_node.get_rpc_endpoint() - for loader in self.loaders: - with reporter.step(f"Init s3 client on {loader.ip}"): - shell = loader.get_shell() - frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate( - shell, FROSTFS_AUTHMATE_EXEC - ) - issue_secret_output = frostfs_authmate_exec.secret.issue( - wallet=self.loaders_wallet.path, - peer=grpc_peer, - gate_public_key=s3_public_keys, - container_placement_policy=load_params.preset.container_placement_policy, - container_policy=f"{k6_dir}/scenarios/files/policy.json", - wallet_password=self.loaders_wallet.password, - ).stdout - aws_access_key_id = str( - re.search( - r"access_key_id.*:\s.(?P\w*)", issue_secret_output - ).group("aws_access_key_id") - ) - aws_secret_access_key = str( - re.search( - r"secret_access_key.*:\s.(?P\w*)", - issue_secret_output, - ).group("aws_secret_access_key") - ) + parallel( + self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir + ) - configure_input = [ - InteractiveInput( - prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id - ), - InteractiveInput( - prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key - ), - InteractiveInput(prompt_pattern=r".*", input=""), - InteractiveInput(prompt_pattern=r".*", input=""), - ] - shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input)) + def _prepare_loader( + self, + loader: Loader, + load_params: LoadParams, + grpc_peer: str, + s3_public_keys: list[str], + k6_dir: str, + ): + with reporter.step(f"Init s3 client on {loader.ip}"): + shell = loader.get_shell() + frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC) + issue_secret_output = frostfs_authmate_exec.secret.issue( + wallet=self.loaders_wallet.path, + peer=grpc_peer, + gate_public_key=s3_public_keys, + container_placement_policy=load_params.preset.container_placement_policy, + container_policy=f"{k6_dir}/scenarios/files/policy.json", + wallet_password=self.loaders_wallet.password, + ).stdout + aws_access_key_id = str( + re.search( + r"access_key_id.*:\s.(?P\w*)", issue_secret_output + ).group("aws_access_key_id") + ) + aws_secret_access_key = str( + re.search( + r"secret_access_key.*:\s.(?P\w*)", + issue_secret_output, + ).group("aws_secret_access_key") + ) - def wait_until_finish(self): - for k6_instance in self.k6_instances: - k6_instance.wait_until_finished() + configure_input = [ + InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id), + InteractiveInput( + prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key + ), + InteractiveInput(prompt_pattern=r".*", input=""), + InteractiveInput(prompt_pattern=r".*", input=""), + ] + shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input)) + @reporter.step_deco("Init k6 instances") def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): self.k6_instances = [] cycled_loaders = itertools.cycle(self.loaders) @@ -131,29 +151,32 @@ class DefaultRunner(ScenarioRunner): load_params, k6_processes_count ) - for distributed_load_params in distributed_load_params_list: - loader = next(cycled_loaders) - shell = loader.get_shell() - with reporter.step( - f"Init K6 instances on {loader.ip} for load id {distributed_load_params.load_id}" - ): - with reporter.step(f"Make working directory"): - shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}") - shell.exec( - f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}" - ) + futures = parallel( + self._init_k6_instance, + distributed_load_params_list, + loader=cycled_loaders, + endpoints=endpoints_gen, + k6_dir=k6_dir, + ) + self.k6_instances = [future.result() for future in futures] - k6_instance = K6( - distributed_load_params, - next(endpoints_gen), - k6_dir, - shell, - loader, - self.loaders_wallet, - ) - self.k6_instances.append(k6_instance) - if load_params.preset: - k6_instance.preset() + def _init_k6_instance( + self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str + ): + shell = loader.get_shell() + with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"): + with reporter.step(f"Make working directory"): + shell.exec(f"sudo mkdir -p {load_params_for_loader.working_dir}") + shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {load_params_for_loader.working_dir}") + + return K6( + load_params_for_loader, + endpoints, + k6_dir, + shell, + loader, + self.loaders_wallet, + ) def _get_distributed_load_params_list( self, original_load_params: LoadParams, workers_count: int @@ -215,15 +238,7 @@ class DefaultRunner(ScenarioRunner): def start(self): load_params = self.k6_instances[0].load_params - with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor: - futures = [executor.submit(k6.start) for k6 in self.k6_instances] - - # Check for exceptions - exceptions = [future.exception() for future in futures if future.exception()] - if exceptions: - raise RuntimeError( - f"The following exceptions occured during start of k6: {exceptions}" - ) + parallel([k6.start for k6 in self.k6_instances]) wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 with reporter.step( @@ -251,17 +266,8 @@ class DefaultRunner(ScenarioRunner): return results - @property - def is_running(self): - for k6_instance in self.k6_instances: - if not k6_instance.is_running: - return False - return True - - -class LocalRunner(ScenarioRunner): - k6_instances: list[K6] +class LocalRunner(RunnerBase): loaders: list[Loader] cluster_state_controller: ClusterStateController file_keeper: FileKeeper @@ -278,7 +284,7 @@ class LocalRunner(ScenarioRunner): self.loaders = [NodeLoader(node) for node in nodes_under_load] @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Prepare load instances") + @reporter.step_deco("Preparation steps") def prepare( self, load_params: LoadParams, @@ -319,37 +325,39 @@ class LocalRunner(ScenarioRunner): for _ in result: pass - def wait_until_finish(self): - for k6_instance in self.k6_instances: - k6_instance.wait_until_finished() - + @reporter.step_deco("Init k6 instances") def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): self.k6_instances = [] - for loader in self.loaders: - shell = loader.get_shell() - with reporter.step(f"Init K6 instances on {loader.ip}"): - with reporter.step(f"Make working directory"): - shell.exec(f"sudo mkdir -p {load_params.working_dir}") - # If we chmod /home/ folder we can no longer ssh to the node - # !! IMPORTANT !! - if ( - load_params.working_dir - and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}" - and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/" - ): - shell.exec(f"sudo chmod -R 777 {load_params.working_dir}") + futures = parallel( + self._init_k6_instance, + self.loaders, + load_params, + k6_dir, + ) + self.k6_instances = [future.result() for future in futures] - k6_instance = K6( - load_params, - ["localhost:8080"], - k6_dir, - shell, - loader, - self.wallet, - ) - self.k6_instances.append(k6_instance) - if load_params.preset: - k6_instance.preset() + def _init_k6_instance(self, loader: Loader, load_params: LoadParams, k6_dir: str): + shell = loader.get_shell() + with reporter.step(f"Init K6 instance on {loader.ip}"): + with reporter.step(f"Make working directory"): + shell.exec(f"sudo mkdir -p {load_params.working_dir}") + # If we chmod /home/ folder we can no longer ssh to the node + # !! IMPORTANT !! + if ( + load_params.working_dir + and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}" + and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/" + ): + shell.exec(f"sudo chmod -R 777 {load_params.working_dir}") + + return K6( + load_params, + ["localhost:8080"], + k6_dir, + shell, + loader, + self.wallet, + ) def start(self): load_params = self.k6_instances[0].load_params @@ -357,15 +365,7 @@ class LocalRunner(ScenarioRunner): self.cluster_state_controller.stop_all_s3_gates() self.cluster_state_controller.stop_all_storage_services() - with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor: - futures = [executor.submit(k6.start) for k6 in self.k6_instances] - - # Check for exceptions - exceptions = [future.exception() for future in futures if future.exception()] - if exceptions: - raise RuntimeError( - f"The following exceptions occured during start of k6: {exceptions}" - ) + parallel([k6.start for k6 in self.k6_instances]) wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 with reporter.step( @@ -387,11 +387,3 @@ class LocalRunner(ScenarioRunner): results[k6_instance.loader.ip] = result return results - - @property - def is_running(self): - for k6_instance in self.k6_instances: - if not k6_instance.is_running: - return False - - return True diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py index 6cedd0f2..ac3a9201 100644 --- a/src/frostfs_testlib/storage/controllers/background_load_controller.py +++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py @@ -80,14 +80,17 @@ class BackgroundLoadController: LoadType.S3: { EndpointSelectionStrategy.ALL: list( set( - endpoint.replace("http://", "") + endpoint.replace("http://", "").replace("https://", "") for node_under_load in self.nodes_under_load for endpoint in node_under_load.service(S3Gate).get_all_endpoints() ) ), EndpointSelectionStrategy.FIRST: list( set( - node_under_load.service(S3Gate).get_endpoint().replace("http://", "") + node_under_load.service(S3Gate) + .get_endpoint() + .replace("http://", "") + .replace("https://", "") for node_under_load in self.nodes_under_load ) ), @@ -131,8 +134,13 @@ class BackgroundLoadController: @reporter.step_deco("Startup load") def startup(self): self.prepare() + self.preset() self.start() + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + def preset(self): + self.runner.preset() + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @reporter.step_deco("Stop and get results of load") def teardown(self, load_report: Optional[LoadReport] = None): diff --git a/src/frostfs_testlib/testing/__init__.py b/src/frostfs_testlib/testing/__init__.py new file mode 100644 index 00000000..34839721 --- /dev/null +++ b/src/frostfs_testlib/testing/__init__.py @@ -0,0 +1,2 @@ +from frostfs_testlib.testing.parallel import parallel +from frostfs_testlib.testing.test_control import expect_not_raises, run_optionally, wait_for_success diff --git a/src/frostfs_testlib/testing/parallel.py b/src/frostfs_testlib/testing/parallel.py new file mode 100644 index 00000000..7f4ee264 --- /dev/null +++ b/src/frostfs_testlib/testing/parallel.py @@ -0,0 +1,98 @@ +import itertools +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Callable, Collection, Optional, Union + + +def parallel( + fn: Union[Callable, list[Callable]], + parallel_items: Optional[Collection] = None, + *args, + **kwargs, +) -> list[Future]: + """Parallel execution of selected function or list of function using ThreadPoolExecutor. + Also checks the exceptions of each thread. + + Args: + fn: function(s) to run. Can work in 2 modes: + 1. If you have dedicated function with some items to process in parallel, + like you do with executor.map(fn, parallel_items), pass this function as fn. + 2. If you need to process each item with it's own method, like you do + with executor.submit(fn, args, kwargs), pass list of methods here. + See examples in runners.py in this repo. + parallel_items: items to iterate on (should be None in case of 2nd mode). + args: any other args required in target function(s). + if any arg is itertool.cycle, it will be iterated before passing to new thread. + kwargs: any other kwargs required in target function(s) + if any kwarg is itertool.cycle, it will be iterated before passing to new thread. + + Returns: + list of futures. + """ + + if callable(fn): + if not parallel_items: + raise RuntimeError("Parallel items should not be none when fn is callable.") + futures = _run_by_items(fn, parallel_items, *args, **kwargs) + elif isinstance(fn, list): + futures = _run_by_fn_list(fn, *args, **kwargs) + else: + raise RuntimeError("Nothing to run. fn should be either callable or list of callables.") + + # Check for exceptions + exceptions = [future.exception() for future in futures if future.exception()] + if exceptions: + message = "\n".join([str(e) for e in exceptions]) + raise RuntimeError(f"The following exceptions occured during parallel run: {message}") + return futures + + +def _run_by_fn_list(fn_list: list[Callable], *args, **kwargs) -> list[Future]: + if not len(fn_list): + return [] + if not all([callable(f) for f in fn_list]): + raise RuntimeError("fn_list should contain only callables") + + futures: list[Future] = [] + + with ThreadPoolExecutor(max_workers=len(fn_list)) as executor: + for fn in fn_list: + task_args = _get_args(*args) + task_kwargs = _get_kwargs(**kwargs) + + futures.append(executor.submit(fn, *task_args, **task_kwargs)) + + return futures + + +def _run_by_items(fn: Callable, parallel_items: Collection, *args, **kwargs) -> list[Future]: + futures: list[Future] = [] + + with ThreadPoolExecutor(max_workers=len(parallel_items)) as executor: + for item in parallel_items: + task_args = _get_args(*args) + task_kwargs = _get_kwargs(**kwargs) + task_args.insert(0, item) + + futures.append(executor.submit(fn, *task_args, **task_kwargs)) + + return futures + + +def _get_kwargs(**kwargs): + actkwargs = {} + for key, arg in kwargs.items(): + if isinstance(arg, itertools.cycle): + actkwargs[key] = next(arg) + else: + actkwargs[key] = arg + return actkwargs + + +def _get_args(*args): + actargs = [] + for arg in args: + if isinstance(arg, itertools.cycle): + actargs.append(next(arg)) + else: + actargs.append(arg) + return actargs diff --git a/src/frostfs_testlib/utils/__init__.py b/src/frostfs_testlib/utils/__init__.py index fbc4a8f7..01cf462e 100644 --- a/src/frostfs_testlib/utils/__init__.py +++ b/src/frostfs_testlib/utils/__init__.py @@ -3,3 +3,4 @@ import frostfs_testlib.utils.datetime_utils import frostfs_testlib.utils.json_utils import frostfs_testlib.utils.string_utils import frostfs_testlib.utils.wallet_utils +from frostfs_testlib.utils.file_keeper import FileKeeper