import copy
import itertools
import math
import re
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields
from typing import Optional
from urllib.parse import urlparse

import yaml
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import (
    BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
    LOAD_NODE_SSH_USER,
    LOAD_NODES,
)
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.utils import FileKeeper, datetime_utils

reporter = get_reporter()


class RunnerBase(ScenarioRunner):
    """Common behaviour for runners that drive a list of K6 instances.

    Subclasses are expected to populate ``k6_instances`` (both runners in this
    module do so in ``init_k6_instances``) before calling any method here.
    """

    # K6 wrappers managed by this runner, one per k6 process.
    k6_instances: list[K6]

    @reporter.step_deco("Run preset on loaders")
    def preset(self):
        """Invoke ``preset`` on every K6 instance through ``parallel``."""
        parallel([k6.preset for k6 in self.k6_instances])

    @reporter.step_deco("Wait until load finish")
    def wait_until_finish(self):
        """Invoke ``wait_until_finished`` on every K6 instance through ``parallel``."""
        parallel([k6.wait_until_finished for k6 in self.k6_instances])

    @property
    def is_running(self):
        """Return True if at least one K6 instance reports that it is running."""
        futures = parallel([k6.is_running for k6 in self.k6_instances])
        # A list (not a generator) is built on purpose: every future's result() is
        # requested, so any() cannot short-circuit past a later future.
        return any([future.result() for future in futures])


class DefaultRunner(RunnerBase):
    """Runner that starts k6 on dedicated load nodes addressed by IP."""

    # Loaders built from the list of load node IPs.
    loaders: list[Loader]
    # Wallet passed to frostfs-authmate and to every K6 instance.
    loaders_wallet: WalletInfo

    def __init__(
        self,
        loaders_wallet: WalletInfo,
        load_ip_list: Optional[list[str]] = None,
    ) -> None:
        """Create loaders for the given IPs.

        Args:
            loaders_wallet: wallet used on the loaders.
            load_ip_list: IPs of the load nodes; ``LOAD_NODES`` is used when None.
        """
        if load_ip_list is None:
            load_ip_list = LOAD_NODES
        self.loaders = RemoteLoader.from_ip_list(load_ip_list)
        self.loaders_wallet = loaders_wallet

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Preparation steps")
    def prepare(
        self,
        load_params: LoadParams,
        nodes_under_load: list[ClusterNode],
        k6_dir: str,
    ):
        """Prepare loaders for an S3 load; does nothing for other load types.

        Args:
            load_params: load parameters; only ``load_type`` and the preset
                placement policy are read here and in ``_prepare_loader``.
            nodes_under_load: cluster nodes under load. The first node supplies the
                gRPC peer, and every node supplies its S3 gate wallet public key.
            k6_dir: k6 directory on the loader, used to locate the policy file.
        """
        if load_params.load_type != LoadType.S3:
            return

        with reporter.step("Init s3 client on loaders"):
            storage_node = nodes_under_load[0].service(StorageNode)
            s3_public_keys = [
                node.service(S3Gate).get_wallet_public_key() for node in nodes_under_load
            ]
            grpc_peer = storage_node.get_rpc_endpoint()

        parallel(
            self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir
        )

    @staticmethod
    def _extract_issued_credential(pattern: str, group_name: str, issue_secret_output: str) -> str:
        """Return the named group ``group_name`` of ``pattern`` found in the output.

        Raises:
            RuntimeError: if ``pattern`` does not match. The output itself is not
                included in the message because it carries the issued secret.
        """
        match = re.search(pattern, issue_secret_output)
        if match is None:
            raise RuntimeError(
                f"Could not find {group_name} in frostfs-authmate issue-secret output"
            )
        return str(match.group(group_name))

    def _prepare_loader(
        self,
        loader: Loader,
        load_params: LoadParams,
        grpc_peer: str,
        s3_public_keys: list[str],
        k6_dir: str,
    ):
        """Issue S3 credentials with frostfs-authmate and feed them to ``aws configure``.

        Args:
            loader: loader whose shell is used for all commands.
            load_params: supplies the container placement policy of the preset.
            grpc_peer: endpoint passed to authmate as ``peer``.
            s3_public_keys: S3 gate public keys passed as ``gate_public_key``.
            k6_dir: k6 directory; the container policy is read from
                ``{k6_dir}/scenarios/files/policy.json``.

        Raises:
            RuntimeError: if the access key id or the secret access key cannot be
                found in the authmate output.
        """
        with reporter.step(f"Init s3 client on {loader.ip}"):
            shell = loader.get_shell()
            frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)
            issue_secret_output = frostfs_authmate_exec.secret.issue(
                wallet=self.loaders_wallet.path,
                peer=grpc_peer,
                gate_public_key=s3_public_keys,
                container_placement_policy=load_params.preset.container_placement_policy,
                container_policy=f"{k6_dir}/scenarios/files/policy.json",
                wallet_password=self.loaders_wallet.password,
            ).stdout

            # The lone "." after "\s" skips exactly one character before the value
            # (presumably the opening quote of the value - confirm against real output).
            aws_access_key_id = self._extract_issued_credential(
                r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)",
                "aws_access_key_id",
                issue_secret_output,
            )
            aws_secret_access_key = self._extract_issued_credential(
                r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)",
                "aws_secret_access_key",
                issue_secret_output,
            )

            # The two trailing catch-all prompts are answered with an empty input.
            configure_input = [
                InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
                InteractiveInput(
                    prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
                ),
                InteractiveInput(prompt_pattern=r".*", input=""),
                InteractiveInput(prompt_pattern=r".*", input=""),
            ]
            shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))

    @reporter.step_deco("Init k6 instances")
    def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """Create K6 instances according to ``load_params.k6_process_allocation_strategy``.

        With ``PER_LOAD_NODE`` there is one k6 process per loader and each process
        receives the full endpoint list. With ``PER_ENDPOINT`` there is one process
        per endpoint, each receiving a single-endpoint list, and loaders are reused
        cyclically.

        Args:
            load_params: original load parameters, split between the processes.
            endpoints: endpoints to put under load.
            k6_dir: k6 directory on the loader.
        """
        self.k6_instances = []
        cycled_loaders = itertools.cycle(self.loaders)

        strategy = load_params.k6_process_allocation_strategy
        k6_distribution_count = {
            K6ProcessAllocationStrategy.PER_LOAD_NODE: len(self.loaders),
            K6ProcessAllocationStrategy.PER_ENDPOINT: len(endpoints),
        }
        endpoints_generators = {
            K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
            K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle(
                [[endpoint] for endpoint in endpoints]
            ),
        }
        k6_processes_count = k6_distribution_count[strategy]
        endpoints_gen = endpoints_generators[strategy]

        distributed_load_params_list = self._get_distributed_load_params_list(
            load_params, k6_processes_count
        )

        futures = parallel(
            self._init_k6_instance,
            distributed_load_params_list,
            loader=cycled_loaders,
            endpoints=endpoints_gen,
            k6_dir=k6_dir,
        )

        self.k6_instances = [future.result() for future in futures]

    def _init_k6_instance(
        self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str
    ):
        """Create the working directory on the loader and return a K6 wrapper for it.

        Args:
            load_params_for_loader: load parameters of this particular k6 process.
            loader: loader the process runs on.
            endpoints: endpoints assigned to this process.
            k6_dir: k6 directory on the loader.

        Returns:
            The constructed ``K6`` instance.
        """
        shell = loader.get_shell()
        with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"):
            with reporter.step("Make working directory"):
                shell.exec(f"sudo mkdir -p {load_params_for_loader.working_dir}")
                shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {load_params_for_loader.working_dir}")

            return K6(
                load_params_for_loader,
                endpoints,
                k6_dir,
                shell,
                loader,
                self.loaders_wallet,
            )

    def _get_distributed_load_params_list(
        self, original_load_params: LoadParams, workers_count: int
    ) -> list[LoadParams]:
        """Build one ``LoadParams`` copy per worker and split distributed fields.

        A field is split when it has metadata, the current scenario is listed in
        its ``applicable_scenarios`` metadata, its ``distributed`` metadata is
        truthy and its value is not None. The value is first divided by
        ``BACKGROUND_LOAD_VUS_COUNT_DIVISOR`` (rounded up) and then spread over
        the workers with ``_get_distribution``.

        Args:
            original_load_params: parameters to copy; not modified.
            workers_count: number of k6 processes.

        Returns:
            List of ``workers_count`` load parameter objects.
        """
        divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR)
        distributed_load_params: list[LoadParams] = []

        for worker_index in range(workers_count):
            load_params = copy.deepcopy(original_load_params)
            # Append the worker index in case several k6 processes go to the same load node
            load_params.set_id(f"{load_params.load_id}_{worker_index}")
            distributed_load_params.append(load_params)

        for field in fields(original_load_params):
            if (
                field.metadata
                and original_load_params.scenario in field.metadata["applicable_scenarios"]
                and field.metadata["distributed"]
                and getattr(original_load_params, field.name) is not None
            ):
                original_value = getattr(original_load_params, field.name)
                distribution = self._get_distribution(
                    math.ceil(original_value / divisor), workers_count
                )
                # Both lists hold exactly workers_count items, so zip pairs them 1:1.
                for worker_params, share in zip(distributed_load_params, distribution):
                    setattr(worker_params, field.name, share)

        return distributed_load_params

    def _get_distribution(self, clients_count: int, workers_count: int) -> list[int]:
        """
        Distribute X clients over Y workers as evenly as possible.

        For example, 150 readers (clients) spread over 4 load nodes (workers)
        give [38, 38, 37, 37].

        Args:
            clients_count: amount of things that need to be distributed.
            workers_count: amount of workers.

        Returns:
            list of distribution.

        Raises:
            Exception: if ``workers_count`` is less than 1.
        """
        if workers_count < 1:
            raise Exception("Workers cannot be less then 1")

        # Guaranteed payload per worker, and the leftover that goes one-by-one
        # to the first `remainder` workers.
        clients_per_worker, remainder = divmod(clients_count, workers_count)

        return [
            clients_per_worker + 1 if i < remainder else clients_per_worker
            for i in range(workers_count)
        ]

    def start(self):
        """Start every K6 instance, then sleep for the setup timeout plus 5 seconds.

        The timeout is taken from the first K6 instance's load parameters.
        """
        load_params = self.k6_instances[0].load_params

        parallel([k6.start for k6 in self.k6_instances])

        wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
        with reporter.step(
            f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
        ):
            time.sleep(wait_after_start_time)

    def stop(self):
        """Stop the K6 instances one after another."""
        for k6_instance in self.k6_instances:
            k6_instance.stop()

    def get_results(self) -> dict:
        """Collect results from every K6 instance.

        Returns:
            Dict keyed by loader IP for ``PER_LOAD_NODE`` or by the first endpoint
            of the instance (its netloc when it parses as a URL) for
            ``PER_ENDPOINT``.

        Raises:
            RuntimeError: if an instance has no process allocation strategy set.
        """
        results = {}
        for k6_instance in self.k6_instances:
            strategy = k6_instance.load_params.k6_process_allocation_strategy
            if strategy is None:
                raise RuntimeError("k6_process_allocation_strategy should not be none")

            result = k6_instance.get_results()
            first_endpoint = k6_instance.endpoints[0]
            # urlparse yields an empty netloc for a schemeless "host:port" string,
            # in which case the raw endpoint is used as the key.
            endpoint = urlparse(first_endpoint).netloc or first_endpoint
            keys_map = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.loader.ip,
                K6ProcessAllocationStrategy.PER_ENDPOINT: endpoint,
            }
            results[keys_map[strategy]] = result

        return results


class LocalRunner(RunnerBase):
    """Runner that starts k6 directly on the cluster nodes under load."""

    # One NodeLoader per node under load.
    loaders: list[Loader]
    # Used to stop services before the load and start them again afterwards.
    cluster_state_controller: ClusterStateController
    # Receives limits.conf before it is overwritten
    # (presumably so it can be restored later - confirm in FileKeeper).
    file_keeper: FileKeeper
    # Wallet handed to K6 instances; assigned in prepare().
    wallet: WalletInfo

    def __init__(
        self,
        cluster_state_controller: ClusterStateController,
        file_keeper: FileKeeper,
        nodes_under_load: list[ClusterNode],
    ) -> None:
        """Store the collaborators and wrap every node under load in a ``NodeLoader``."""
        self.cluster_state_controller = cluster_state_controller
        self.file_keeper = file_keeper
        self.loaders = [NodeLoader(node) for node in nodes_under_load]

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Preparation steps")
    def prepare(
        self,
        load_params: LoadParams,
        nodes_under_load: list[ClusterNode],
        k6_dir: str,
    ):
        """Prepare every node under load for running k6 locally.

        On each node, in its own thread: give the storage user a bash login shell,
        overwrite limits.conf with nofile limits for that user, download and unpack
        k6 into ``k6_dir``, and write an empty-password wallet config file.

        Args:
            load_params: supplies ``k6_url``, the archive to download.
            nodes_under_load: nodes to prepare; one worker thread per node.
            k6_dir: directory k6 is unpacked into (removed and recreated).
        """
        # The wallet description depends only on k6_dir, so it is built once here
        # instead of being reassigned from every worker thread.
        self.wallet = WalletInfo(
            f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
        )

        @reporter.step_deco("Prepare node {cluster_node}")
        def prepare_node(cluster_node: ClusterNode):
            shell = cluster_node.host.get_shell()

            with reporter.step("Allow storage user to login into system"):
                shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")

            with reporter.step("Update limits.conf"):
                limits_path = "/etc/security/limits.conf"
                # Register the file with the keeper before tee overwrites it.
                self.file_keeper.add(cluster_node.storage_node, limits_path)
                content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
                shell.exec(f"echo '{content}' | sudo tee {limits_path}")

            with reporter.step("Download K6"):
                shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
                shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
                shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
                shell.exec(f"sudo chmod -R 777 {k6_dir}")

            with reporter.step("Create empty_passwd"):
                content = yaml.dump({"password": ""})
                shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
                shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")

        with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor:
            result = executor.map(prepare_node, nodes_under_load)

            # Iterating the map results re-raises any exception from a worker.
            for _ in result:
                pass

    @reporter.step_deco("Init k6 instances")
    def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
        """Create one K6 instance per loader.

        Args:
            load_params: load parameters shared by all instances.
            endpoints: not used by this runner; every instance targets
                ``localhost:8080`` (see ``_init_k6_instance``).
            k6_dir: k6 directory on the node.
        """
        self.k6_instances = []
        futures = parallel(
            self._init_k6_instance,
            self.loaders,
            load_params,
            k6_dir,
        )
        self.k6_instances = [future.result() for future in futures]

    def _init_k6_instance(self, loader: Loader, load_params: LoadParams, k6_dir: str):
        """Create the working directory on the node and return a K6 wrapper for it.

        Args:
            loader: loader (node) the k6 process runs on.
            load_params: load parameters; ``working_dir`` is created and opened up.
            k6_dir: k6 directory on the node.

        Returns:
            ``K6`` instance targeting ``localhost:8080`` with the runner's wallet.
        """
        shell = loader.get_shell()
        with reporter.step(f"Init K6 instance on {loader.ip}"):
            with reporter.step("Make working directory"):
                shell.exec(f"sudo mkdir -p {load_params.working_dir}")
                # !! IMPORTANT !!
                # If we chmod the SSH user's home folder we can no longer ssh to the node,
                # so the home folder (with or without a trailing slash) is never chmod-ed.
                ssh_user_home = f"/home/{LOAD_NODE_SSH_USER}"
                if load_params.working_dir and load_params.working_dir not in (
                    ssh_user_home,
                    f"{ssh_user_home}/",
                ):
                    shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")

            return K6(
                load_params,
                ["localhost:8080"],
                k6_dir,
                shell,
                loader,
                self.wallet,
            )

    def start(self):
        """Stop S3 gates and storage services, start k6, then wait for setup.

        The wait is the setup timeout of the first K6 instance plus 5 seconds.
        """
        load_params = self.k6_instances[0].load_params

        self.cluster_state_controller.stop_all_s3_gates()
        self.cluster_state_controller.stop_all_storage_services()

        parallel([k6.start for k6 in self.k6_instances])

        wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
        with reporter.step(
            f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
        ):
            time.sleep(wait_after_start_time)

    def stop(self):
        """Stop every K6 instance, then start the stopped storage services and S3 gates."""
        for k6_instance in self.k6_instances:
            k6_instance.stop()

        self.cluster_state_controller.start_stopped_storage_services()
        self.cluster_state_controller.start_stopped_s3_gates()

    def get_results(self) -> dict:
        """Return the results of every K6 instance keyed by its loader IP."""
        results = {}
        for k6_instance in self.k6_instances:
            result = k6_instance.get_results()
            results[k6_instance.loader.ip] = result

        return results