diff --git a/src/frostfs_testlib/cli/frostfs_authmate/authmate.py b/src/frostfs_testlib/cli/frostfs_authmate/authmate.py index ba3a3b02..7912dae2 100644 --- a/src/frostfs_testlib/cli/frostfs_authmate/authmate.py +++ b/src/frostfs_testlib/cli/frostfs_authmate/authmate.py @@ -6,8 +6,8 @@ from frostfs_testlib.shell import Shell class FrostfsAuthmate: - secret: Optional[FrostfsAuthmateSecret] = None - version: Optional[FrostfsAuthmateVersion] = None + secret: FrostfsAuthmateSecret + version: FrostfsAuthmateVersion def __init__(self, shell: Shell, frostfs_authmate_exec_path: str): self.secret = FrostfsAuthmateSecret(shell, frostfs_authmate_exec_path) diff --git a/src/frostfs_testlib/load/__init__.py b/src/frostfs_testlib/load/__init__.py new file mode 100644 index 00000000..e8ed75e4 --- /dev/null +++ b/src/frostfs_testlib/load/__init__.py @@ -0,0 +1,13 @@ +from frostfs_testlib.load.interfaces import Loader, ScenarioRunner +from frostfs_testlib.load.load_config import ( + EndpointSelectionStrategy, + K6ProcessAllocationStrategy, + LoadParams, + LoadScenario, + LoadType, + NodesSelectionStrategy, + Preset, +) +from frostfs_testlib.load.load_report import LoadReport +from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader +from frostfs_testlib.load.runners import DefaultRunner, LocalRunner diff --git a/src/frostfs_testlib/load/interfaces.py b/src/frostfs_testlib/load/interfaces.py new file mode 100644 index 00000000..fbbc20b6 --- /dev/null +++ b/src/frostfs_testlib/load/interfaces.py @@ -0,0 +1,53 @@ +from abc import ABC, abstractmethod + +from frostfs_testlib.load.load_config import LoadParams +from frostfs_testlib.shell.interfaces import Shell +from frostfs_testlib.storage.cluster import ClusterNode +from frostfs_testlib.storage.dataclasses.wallet import WalletInfo + + +class Loader(ABC): + @abstractmethod + def get_shell(self) -> Shell: + """Get shell for the loader""" + + @property + @abstractmethod + def ip(self): + """Get address of the loader""" + + +class ScenarioRunner(ABC): + @abstractmethod + def prepare( + self, + load_params: LoadParams, + nodes_under_load: list[ClusterNode], + k6_dir: str, + ): + """Preparation steps before running the load""" + + @abstractmethod + def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): + """Init K6 instances""" + + @abstractmethod + def start(self): + """Start K6 instances""" + + @abstractmethod + def stop(self): + """Stop K6 instances""" + + @property + @abstractmethod + def is_running(self) -> bool: + """Returns True if load is running at the moment""" + + @abstractmethod + def wait_until_finish(self): + """Wait until load is finished""" + + @abstractmethod + def get_results(self) -> dict: + """Get results from K6 run""" diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index 2fa2c000..ca3f696b 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -1,10 +1,12 @@ import json import logging +import math import os from dataclasses import dataclass, fields from time import sleep from typing import Any +from frostfs_testlib.load.interfaces import Loader from frostfs_testlib.load.load_config import ( K6ProcessAllocationStrategy, LoadParams, @@ -13,7 +15,12 @@ from frostfs_testlib.load.load_config import ( ) from frostfs_testlib.processes.remote_process import RemoteProcess from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.resources.load_params import K6_STOP_SIGNAL_TIMEOUT, LOAD_NODE_SSH_USER +from frostfs_testlib.resources.common import STORAGE_USER_NAME +from frostfs_testlib.resources.load_params import ( + K6_STOP_SIGNAL_TIMEOUT, + K6_TEARDOWN_PERIOD, + LOAD_NODE_SSH_USER, +) from frostfs_testlib.shell import Shell from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.test_control import wait_for_success @@ -42,7 +49,7 @@ class K6: endpoints: list[str], k6_dir: str, shell: Shell, - load_node: str, + loader: Loader, wallet: WalletInfo, ): if load_params.scenario is None: @@ -50,7 +57,7 @@ class K6: self.load_params: LoadParams = load_params self.endpoints = endpoints - self.load_node: str = load_node + self.loader: Loader = loader self.shell: Shell = shell self.wallet = wallet self.scenario: LoadScenario = load_params.scenario @@ -151,32 +158,56 @@ class K6: [f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None] ) - @reporter.step_deco("Start K6 on initiator") def start(self) -> None: - command = ( - f"{self._k6_dir}/k6 run {self._generate_env_variables()} " - f"{self._k6_dir}/scenarios/{self.scenario.value}.js" - ) - self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir) + with reporter.step( + f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}" + ): + command = ( + f"{self._k6_dir}/k6 run {self._generate_env_variables()} " + f"{self._k6_dir}/scenarios/{self.scenario.value}.js" + ) + user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None + self._k6_process = RemoteProcess.create( + command, self.shell, self.load_params.working_dir, user + ) + + def wait_until_finished(self) -> None: + with reporter.step( + f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}" + ): + if self.load_params.scenario == LoadScenario.VERIFY: + timeout = self.load_params.verify_time or 0 + else: + timeout = self.load_params.load_time or 0 + + timeout += int(K6_TEARDOWN_PERIOD) + original_timeout = timeout + + min_wait_interval = 10 + wait_interval = min_wait_interval + if self._k6_process is None: + assert "No k6 instances were executed" + while timeout > 0: + if not self._k6_process.running(): + return + logger.info(f"K6 is running. Waiting {wait_interval} seconds...") + sleep(wait_interval) + timeout -= min(timeout, wait_interval) + wait_interval = max( + min(timeout, int(math.log2(timeout + 1)) * 15) - min_wait_interval, + min_wait_interval, + ) - @reporter.step_deco("Wait until K6 is finished") - def wait_until_finished(self, timeout: int = 0, k6_should_be_running: bool = False) -> None: - wait_interval = 10 - if self._k6_process is None: - assert "No k6 instances were executed" - if k6_should_be_running: - assert self._k6_process.running(), "k6 should be running." - while timeout > 0: if not self._k6_process.running(): return - logger.info(f"K6 is running. Waiting {wait_interval} seconds...") - sleep(wait_interval) - timeout -= wait_interval - self.stop() - raise TimeoutError(f"Expected K6 finished in {timeout} sec.") + + self.stop() + raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.") def get_results(self) -> Any: - with reporter.step(f"K6 results from {self.load_node}"): + with reporter.step( + f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}" + ): self.__log_output() if not self.summary_json: @@ -186,20 +217,20 @@ class K6: summary_json = json.loads(summary_text) allure_filenames = { - K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.load_node}_{self.scenario.value}_summary.json", - K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.load_node}_{self.scenario.value}_{self.endpoints[0]}_summary.json", + K6ProcessAllocationStrategy.PER_LOAD_NODE: f"{self.loader.ip}_{self.scenario.value}_summary.json", + K6ProcessAllocationStrategy.PER_ENDPOINT: f"{self.loader.ip}_{self.scenario.value}_{self.endpoints[0]}_summary.json", } allure_filename = allure_filenames[self.load_params.k6_process_allocation_strategy] reporter.attach(summary_text, allure_filename) return summary_json - @reporter.step_deco("Stop K6") def stop(self) -> None: - if self.is_running: - self._k6_process.stop() + with reporter.step(f"Stop load from loader {self.loader.ip} on endpoints {self.endpoints}"): + if self.is_running: + self._k6_process.stop() - self._wait_until_process_end() + self._wait_until_process_end() @property def is_running(self) -> bool: @@ -207,7 +238,7 @@ class K6: return self._k6_process.running() return False - @reporter.step_deco("Wait until process end") + @reporter.step_deco("Wait until K6 process end") @wait_for_success( K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout" ) diff --git a/src/frostfs_testlib/load/load_config.py b/src/frostfs_testlib/load/load_config.py index 4e673215..c337d7cc 100644 --- a/src/frostfs_testlib/load/load_config.py +++ b/src/frostfs_testlib/load/load_config.py @@ -17,6 +17,7 @@ class LoadScenario(Enum): S3_CAR = "s3_car" HTTP = "http" VERIFY = "verify" + LOCAL = "local" all_load_scenarios = [ @@ -25,13 +26,19 @@ all_load_scenarios = [ LoadScenario.HTTP, LoadScenario.S3_CAR, LoadScenario.gRPC_CAR, + LoadScenario.LOCAL, ] all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY] -constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP] +constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL] constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR] -grpc_preset_scenarios = [LoadScenario.gRPC, LoadScenario.HTTP, LoadScenario.gRPC_CAR] +grpc_preset_scenarios = [ + LoadScenario.gRPC, + LoadScenario.HTTP, + LoadScenario.gRPC_CAR, + LoadScenario.LOCAL, +] s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR] @@ -129,6 +136,8 @@ class LoadParams: working_dir: Optional[str] = None # Preset for the k6 run preset: Optional[Preset] = None + # K6 download url + k6_url: Optional[str] = None # ------- COMMON SCENARIO PARAMS ------- # Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value. @@ -207,6 +216,10 @@ class LoadParams: # Amount of Verification VU. verify_clients: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "CLIENTS", True) + # ------- LOCAL SCENARIO PARAMS ------- + # Config file location (filled automatically) + config_file: Optional[str] = metadata_field([LoadScenario.LOCAL], None, "CONFIG_FILE") + def set_id(self, load_id): self.load_id = load_id self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt") diff --git a/src/frostfs_testlib/load/load_metrics.py b/src/frostfs_testlib/load/load_metrics.py index 50d7b389..0b4e28e1 100644 --- a/src/frostfs_testlib/load/load_metrics.py +++ b/src/frostfs_testlib/load/load_metrics.py @@ -138,6 +138,17 @@ class S3Metrics(MetricsBase): _DELETE_ERRORS = "aws_obj_delete_fails" +class LocalMetrics(MetricsBase): + _WRITE_SUCCESS = "local_obj_put_total" + _WRITE_ERRORS = "local_obj_put_fails" + + _READ_SUCCESS = "local_obj_get_total" + _READ_ERRORS = "local_obj_get_fails" + + _DELETE_SUCCESS = "local_obj_delete_total" + _DELETE_ERRORS = "local_obj_delete_fails" + + class VerifyMetrics(MetricsBase): _WRITE_SUCCESS = "N/A" _WRITE_ERRORS = "N/A" @@ -157,6 +168,7 @@ def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> Metr LoadScenario.S3: S3Metrics, LoadScenario.S3_CAR: S3Metrics, LoadScenario.VERIFY: VerifyMetrics, + LoadScenario.LOCAL: LocalMetrics, } return class_map[load_type](summary) diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py index 5f225159..7f912e4f 100644 --- a/src/frostfs_testlib/load/load_report.py +++ b/src/frostfs_testlib/load/load_report.py @@ -103,6 +103,7 @@ class LoadReport: LoadScenario.HTTP: "closed model", LoadScenario.gRPC_CAR: "open model", LoadScenario.S3_CAR: "open model", + LoadScenario.LOCAL: "local fill", } return model_map[self.load_params.scenario] diff --git a/src/frostfs_testlib/load/load_steps.py b/src/frostfs_testlib/load/load_steps.py deleted file mode 100644 index b55ff224..00000000 --- a/src/frostfs_testlib/load/load_steps.py +++ /dev/null @@ -1,191 +0,0 @@ -import copy -import itertools -import math -import re -from dataclasses import fields - -from frostfs_testlib.cli import FrostfsAuthmate -from frostfs_testlib.load.k6 import K6 -from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams -from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC -from frostfs_testlib.resources.load_params import ( - BACKGROUND_LOAD_VUS_COUNT_DIVISOR, - LOAD_NODE_SSH_USER, -) -from frostfs_testlib.shell import CommandOptions, SSHShell -from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials -from frostfs_testlib.storage.cluster import ClusterNode -from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode -from frostfs_testlib.storage.dataclasses.wallet import WalletInfo - -reporter = get_reporter() - -STOPPED_HOSTS = [] - - -@reporter.step_deco("Init s3 client on load nodes") -def init_s3_client( - load_nodes: list[str], - load_params: LoadParams, - k6_directory: str, - ssh_credentials: SshCredentials, - nodes_under_load: list[ClusterNode], - wallet: WalletInfo, -): - storage_node = nodes_under_load[0].service(StorageNode) - s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in nodes_under_load] - grpc_peer = storage_node.get_rpc_endpoint() - - for load_node in load_nodes: - ssh_client = _get_shell(ssh_credentials, load_node) - frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(ssh_client, FROSTFS_AUTHMATE_EXEC) - issue_secret_output = frostfs_authmate_exec.secret.issue( - wallet=wallet.path, - peer=grpc_peer, - bearer_rules=f"{k6_directory}/scenarios/files/rules.json", - gate_public_key=s3_public_keys, - container_placement_policy=load_params.preset.container_placement_policy, - container_policy=f"{k6_directory}/scenarios/files/policy.json", - wallet_password=wallet.password, - ).stdout - aws_access_key_id = str( - re.search(r"access_key_id.*:\s.(?P\w*)", issue_secret_output).group( - "aws_access_key_id" - ) - ) - aws_secret_access_key = str( - re.search( - r"secret_access_key.*:\s.(?P\w*)", issue_secret_output - ).group("aws_secret_access_key") - ) - # prompt_pattern doesn't work at the moment - configure_input = [ - InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id), - InteractiveInput( - prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key - ), - InteractiveInput(prompt_pattern=r".*", input=""), - InteractiveInput(prompt_pattern=r".*", input=""), - ] - ssh_client.exec("aws configure", CommandOptions(interactive_inputs=configure_input)) - - -@reporter.step_deco("Prepare K6 instances and objects") -def prepare_k6_instances( - load_nodes: list[str], - ssh_credentials: SshCredentials, - k6_dir: str, - load_params: LoadParams, - endpoints: list[str], - loaders_wallet: WalletInfo, -) -> list[K6]: - k6_load_objects: list[K6] = [] - nodes = itertools.cycle(load_nodes) - - k6_distribution_count = { - K6ProcessAllocationStrategy.PER_LOAD_NODE: len(load_nodes), - K6ProcessAllocationStrategy.PER_ENDPOINT: len(endpoints), - } - endpoints_generators = { - K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]), - K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle( - [[endpoint] for endpoint in endpoints] - ), - } - k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy] - endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy] - - distributed_load_params_list = _get_distributed_load_params_list( - load_params, k6_processes_count - ) - - for distributed_load_params in distributed_load_params_list: - load_node = next(nodes) - shell = _get_shell(ssh_credentials, load_node) - # Make working_dir directory - shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}") - shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}") - - k6_load_object = K6( - distributed_load_params, - next(endpoints_gen), - k6_dir, - shell, - load_node, - loaders_wallet, - ) - k6_load_objects.append(k6_load_object) - if load_params.preset: - k6_load_object.preset() - - return k6_load_objects - - -def _get_shell(ssh_credentials: SshCredentials, load_node: str) -> SSHShell: - ssh_client = SSHShell( - host=load_node, - login=ssh_credentials.ssh_login, - password=ssh_credentials.ssh_password, - private_key_path=ssh_credentials.ssh_key_path, - private_key_passphrase=ssh_credentials.ssh_key_passphrase, - ) - - return ssh_client - - -def _get_distributed_load_params_list( - original_load_params: LoadParams, workers_count: int -) -> list[LoadParams]: - divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR) - distributed_load_params: list[LoadParams] = [] - - for i in range(workers_count): - load_params = copy.deepcopy(original_load_params) - # Append #i here in case if multiple k6 processes goes into same load node - load_params.set_id(f"{load_params.load_id}_{i}") - distributed_load_params.append(load_params) - - load_fields = fields(original_load_params) - - for field in load_fields: - if ( - field.metadata - and original_load_params.scenario in field.metadata["applicable_scenarios"] - and field.metadata["distributed"] - and getattr(original_load_params, field.name) is not None - ): - original_value = getattr(original_load_params, field.name) - distribution = _get_distribution(math.ceil(original_value / divisor), workers_count) - for i in range(workers_count): - setattr(distributed_load_params[i], field.name, distribution[i]) - - return distributed_load_params - - -def _get_distribution(clients_count: int, workers_count: int) -> list[int]: - """ - This function will distribute evenly as possible X clients to Y workers. - For example if we have 150 readers (clients) and we want to spread it over 4 load nodes (workers) - this will return [38, 38, 37, 37]. - - Args: - clients_count: amount of things needs to be distributed. - workers_count: amount of workers. - - Returns: - list of distribution. - """ - if workers_count < 1: - raise Exception("Workers cannot be less then 1") - - # Amount of guaranteed payload on one worker - clients_per_worker = clients_count // workers_count - # Remainder of clients left to be distributed - remainder = clients_count - clients_per_worker * workers_count - - distribution = [ - clients_per_worker + 1 if i < remainder else clients_per_worker - for i in range(workers_count) - ] - return distribution diff --git a/src/frostfs_testlib/load/loaders.py b/src/frostfs_testlib/load/loaders.py new file mode 100644 index 00000000..9e92155c --- /dev/null +++ b/src/frostfs_testlib/load/loaders.py @@ -0,0 +1,60 @@ +from frostfs_testlib.load.interfaces import Loader +from frostfs_testlib.resources.load_params import ( + LOAD_NODE_SSH_PASSWORD, + LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, + LOAD_NODE_SSH_PRIVATE_KEY_PATH, + LOAD_NODE_SSH_USER, +) +from frostfs_testlib.shell.interfaces import Shell, SshCredentials +from frostfs_testlib.shell.ssh_shell import SSHShell +from frostfs_testlib.storage.cluster import ClusterNode + + +class RemoteLoader(Loader): + def __init__(self, ssh_credentials: SshCredentials, ip: str) -> None: + self.ssh_credentials = ssh_credentials + self._ip = ip + + @property + def ip(self): + return self._ip + + def get_shell(self) -> Shell: + ssh_client = SSHShell( + host=self.ip, + login=self.ssh_credentials.ssh_login, + password=self.ssh_credentials.ssh_password, + private_key_path=self.ssh_credentials.ssh_key_path, + private_key_passphrase=self.ssh_credentials.ssh_key_passphrase, + ) + + return ssh_client + + @classmethod + def from_ip_list(cls, ip_list: list[str]) -> list[Loader]: + loaders: list[Loader] = [] + ssh_credentials = SshCredentials( + LOAD_NODE_SSH_USER, + LOAD_NODE_SSH_PASSWORD, + LOAD_NODE_SSH_PRIVATE_KEY_PATH, + LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, + ) + + for ip in ip_list: + loaders.append(RemoteLoader(ssh_credentials, ip)) + + return loaders + + +class NodeLoader(Loader): + """When ClusterNode is the loader for itself (for Local scenario only).""" + + def __init__(self, cluster_node: ClusterNode) -> None: + self.cluster_node = cluster_node + + def get_shell(self) -> Shell: + return self.cluster_node.host.get_shell() + + @property + def ip(self): + return self.cluster_node.host_ip diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py new file mode 100644 index 00000000..6f9d0465 --- /dev/null +++ b/src/frostfs_testlib/load/runners.py @@ -0,0 +1,398 @@ +import copy +import itertools +import math +import re +import time +from concurrent.futures import ThreadPoolExecutor +from dataclasses import fields +from typing import Optional + +import yaml + +from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate +from frostfs_testlib.load.interfaces import Loader, ScenarioRunner +from frostfs_testlib.load.k6 import K6 +from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType +from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader +from frostfs_testlib.reporter import get_reporter +from frostfs_testlib.resources import optionals +from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC +from frostfs_testlib.resources.common import STORAGE_USER_NAME +from frostfs_testlib.resources.load_params import ( + BACKGROUND_LOAD_VUS_COUNT_DIVISOR, + LOAD_NODE_SSH_USER, + LOAD_NODES, +) +from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput +from frostfs_testlib.storage.cluster import ClusterNode +from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController +from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode +from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.testing.test_control import run_optionally +from frostfs_testlib.utils import datetime_utils +from frostfs_testlib.utils.file_keeper import FileKeeper + +reporter = get_reporter() + + +class DefaultRunner(ScenarioRunner): + k6_instances: list[K6] + loaders: list[Loader] + loaders_wallet: WalletInfo + + def __init__( + self, + loaders_wallet: WalletInfo, + load_ip_list: Optional[list[str]] = None, + ) -> None: + if load_ip_list is None: + load_ip_list = LOAD_NODES + self.loaders = RemoteLoader.from_ip_list(load_ip_list) + self.loaders_wallet = loaders_wallet + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Prepare load instances") + def prepare( + self, + load_params: LoadParams, + nodes_under_load: list[ClusterNode], + k6_dir: str, + ): + if load_params.load_type != LoadType.S3: + return + + with reporter.step("Init s3 client on loaders"): + storage_node = nodes_under_load[0].service(StorageNode) + s3_public_keys = [ + node.service(S3Gate).get_wallet_public_key() for node in nodes_under_load + ] + grpc_peer = storage_node.get_rpc_endpoint() + + for loader in self.loaders: + with reporter.step(f"Init s3 client on {loader.ip}"): + shell = loader.get_shell() + frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate( + shell, FROSTFS_AUTHMATE_EXEC + ) + issue_secret_output = frostfs_authmate_exec.secret.issue( + wallet=self.loaders_wallet.path, + peer=grpc_peer, + bearer_rules=f"{k6_dir}/scenarios/files/rules.json", + gate_public_key=s3_public_keys, + container_placement_policy=load_params.preset.container_placement_policy, + container_policy=f"{k6_dir}/scenarios/files/policy.json", + wallet_password=self.loaders_wallet.password, + ).stdout + aws_access_key_id = str( + re.search( + r"access_key_id.*:\s.(?P\w*)", issue_secret_output + ).group("aws_access_key_id") + ) + aws_secret_access_key = str( + re.search( + r"secret_access_key.*:\s.(?P\w*)", + issue_secret_output, + ).group("aws_secret_access_key") + ) + + configure_input = [ + InteractiveInput( + prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id + ), + InteractiveInput( + prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key + ), + InteractiveInput(prompt_pattern=r".*", input=""), + InteractiveInput(prompt_pattern=r".*", input=""), + ] + shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input)) + + def wait_until_finish(self): + for k6_instance in self.k6_instances: + k6_instance.wait_until_finished() + + def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): + self.k6_instances = [] + cycled_loaders = itertools.cycle(self.loaders) + + k6_distribution_count = { + K6ProcessAllocationStrategy.PER_LOAD_NODE: len(self.loaders), + K6ProcessAllocationStrategy.PER_ENDPOINT: len(endpoints), + } + endpoints_generators = { + K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]), + K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle( + [[endpoint] for endpoint in endpoints] + ), + } + k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy] + endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy] + + distributed_load_params_list = self._get_distributed_load_params_list( + load_params, k6_processes_count + ) + + for distributed_load_params in distributed_load_params_list: + loader = next(cycled_loaders) + shell = loader.get_shell() + with reporter.step( + f"Init K6 instances on {loader.ip} for load id {distributed_load_params.load_id}" + ): + with reporter.step(f"Make working directory"): + shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}") + shell.exec( + f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}" + ) + + k6_instance = K6( + distributed_load_params, + next(endpoints_gen), + k6_dir, + shell, + loader, + self.loaders_wallet, + ) + self.k6_instances.append(k6_instance) + if load_params.preset: + k6_instance.preset() + + def _get_distributed_load_params_list( + self, original_load_params: LoadParams, workers_count: int + ) -> list[LoadParams]: + divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR) + distributed_load_params: list[LoadParams] = [] + + for i in range(workers_count): + load_params = copy.deepcopy(original_load_params) + # Append #i here in case if multiple k6 processes goes into same load node + load_params.set_id(f"{load_params.load_id}_{i}") + distributed_load_params.append(load_params) + + load_fields = fields(original_load_params) + + for field in load_fields: + if ( + field.metadata + and original_load_params.scenario in field.metadata["applicable_scenarios"] + and field.metadata["distributed"] + and getattr(original_load_params, field.name) is not None + ): + original_value = getattr(original_load_params, field.name) + distribution = self._get_distribution( + math.ceil(original_value / divisor), workers_count + ) + for i in range(workers_count): + setattr(distributed_load_params[i], field.name, distribution[i]) + + return distributed_load_params + + def _get_distribution(self, clients_count: int, workers_count: int) -> list[int]: + """ + This function will distribute evenly as possible X clients to Y workers. + For example if we have 150 readers (clients) and we want to spread it over 4 load nodes (workers) + this will return [38, 38, 37, 37]. + + Args: + clients_count: amount of things needs to be distributed. + workers_count: amount of workers. + + Returns: + list of distribution. + """ + if workers_count < 1: + raise Exception("Workers cannot be less then 1") + + # Amount of guaranteed payload on one worker + clients_per_worker = clients_count // workers_count + # Remainder of clients left to be distributed + remainder = clients_count - clients_per_worker * workers_count + + distribution = [ + clients_per_worker + 1 if i < remainder else clients_per_worker + for i in range(workers_count) + ] + return distribution + + def start(self): + load_params = self.k6_instances[0].load_params + + with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor: + futures = [executor.submit(k6.start) for k6 in self.k6_instances] + + # Check for exceptions + exceptions = [future.exception() for future in futures if future.exception()] + if exceptions: + raise RuntimeError( + f"The following exceptions occured during start of k6: {exceptions}" + ) + + wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 + with reporter.step( + f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" + ): + time.sleep(wait_after_start_time) + + def stop(self): + for k6_instance in self.k6_instances: + k6_instance.stop() + + def get_results(self) -> dict: + results = {} + for k6_instance in self.k6_instances: + if k6_instance.load_params.k6_process_allocation_strategy is None: + raise RuntimeError("k6_process_allocation_strategy should not be none") + + result = k6_instance.get_results() + keys_map = { + K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.loader.ip, + K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0], + } + key = keys_map[k6_instance.load_params.k6_process_allocation_strategy] + results[key] = result + + return results + + @property + def is_running(self): + for k6_instance in self.k6_instances: + if not k6_instance.is_running: + return False + + return True + + +class LocalRunner(ScenarioRunner): + k6_instances: list[K6] + loaders: list[Loader] + cluster_state_controller: ClusterStateController + file_keeper: FileKeeper + wallet: WalletInfo + + def __init__( + self, + cluster_state_controller: ClusterStateController, + file_keeper: FileKeeper, + nodes_under_load: list[ClusterNode], + ) -> None: + self.cluster_state_controller = cluster_state_controller + self.file_keeper = file_keeper + self.loaders = [NodeLoader(node) for node in nodes_under_load] + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Prepare load instances") + def prepare( + self, + load_params: LoadParams, + nodes_under_load: list[ClusterNode], + k6_dir: str, + ): + @reporter.step_deco("Prepare node {cluster_node}") + def prepare_node(cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() + + with reporter.step("Allow storage user to login into system"): + shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") + + with reporter.step("Update limits.conf"): + limits_path = "/etc/security/limits.conf" + self.file_keeper.add(cluster_node.storage_node, limits_path) + content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" + shell.exec(f"echo '{content}' | sudo tee {limits_path}") + + with reporter.step("Download K6"): + shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}") + shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}") + shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}") + shell.exec(f"sudo chmod -R 777 {k6_dir}") + + with reporter.step("Create empty_passwd"): + self.wallet = WalletInfo( + f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" + ) + content = yaml.dump({"password": ""}) + shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') + shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") + + with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor: + result = executor.map(prepare_node, nodes_under_load) + + # Check for exceptions + for _ in result: + pass + + def wait_until_finish(self): + for k6_instance in self.k6_instances: + k6_instance.wait_until_finished() + + def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): + self.k6_instances = [] + for loader in self.loaders: + shell = loader.get_shell() + with reporter.step(f"Init K6 instances on {loader.ip}"): + with reporter.step(f"Make working directory"): + shell.exec(f"sudo mkdir -p {load_params.working_dir}") + # If we chmod /home/ folder we can no longer ssh to the node + # !! IMPORTANT !! + if ( + load_params.working_dir + and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}" + and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/" + ): + shell.exec(f"sudo chmod -R 777 {load_params.working_dir}") + + k6_instance = K6( + load_params, + ["localhost:8080"], + k6_dir, + shell, + loader, + self.wallet, + ) + self.k6_instances.append(k6_instance) + if load_params.preset: + k6_instance.preset() + + def start(self): + load_params = self.k6_instances[0].load_params + + self.cluster_state_controller.stop_all_s3_gates() + self.cluster_state_controller.stop_all_storage_services() + + with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor: + futures = [executor.submit(k6.start) for k6 in self.k6_instances] + + # Check for exceptions + exceptions = [future.exception() for future in futures if future.exception()] + if exceptions: + raise RuntimeError( + f"The following exceptions occured during start of k6: {exceptions}" + ) + + wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 + with reporter.step( + f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" + ): + time.sleep(wait_after_start_time) + + def stop(self): + for k6_instance in self.k6_instances: + k6_instance.stop() + + self.cluster_state_controller.start_stopped_storage_services() + self.cluster_state_controller.start_stopped_s3_gates() + + def get_results(self) -> dict: + results = {} + for k6_instance in self.k6_instances: + result = k6_instance.get_results() + results[k6_instance.loader.ip] = result + + return results + + @property + def is_running(self): + for k6_instance in self.k6_instances: + if not k6_instance.is_running: + return False + + return True diff --git a/src/frostfs_testlib/processes/remote_process.py b/src/frostfs_testlib/processes/remote_process.py index 7f490005..d92d77a7 100644 --- a/src/frostfs_testlib/processes/remote_process.py +++ b/src/frostfs_testlib/processes/remote_process.py @@ -10,13 +10,16 @@ from tenacity.wait import wait_fixed from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import Shell -from frostfs_testlib.shell.interfaces import CommandOptions +from frostfs_testlib.shell.command_inspectors import SuInspector +from frostfs_testlib.shell.interfaces import CommandInspector, CommandOptions reporter = get_reporter() class RemoteProcess: - def __init__(self, cmd: str, process_dir: str, shell: Shell): + def __init__( + self, cmd: str, process_dir: str, shell: Shell, cmd_inspector: Optional[CommandInspector] + ): self.process_dir = process_dir self.cmd = cmd self.stdout_last_line_number = 0 @@ -26,10 +29,13 @@ class RemoteProcess: self.saved_stdout: Optional[str] = None self.saved_stderr: Optional[str] = None self.shell = shell + self.cmd_inspectors: list[CommandInspector] = [cmd_inspector] if cmd_inspector else [] @classmethod @reporter.step_deco("Create remote process") - def create(cls, command: str, shell: Shell, working_dir: str = "/tmp") -> RemoteProcess: + def create( + cls, command: str, shell: Shell, working_dir: str = "/tmp", user: Optional[str] = None + ) -> RemoteProcess: """ Create a process on a remote host. @@ -39,6 +45,7 @@ class RemoteProcess: rc: contains script return code stderr: contains script errors stdout: contains script output + user: user on behalf whom command will be executed Args: shell: Shell instance @@ -48,8 +55,12 @@ class RemoteProcess: Returns: RemoteProcess instance for further examination """ + cmd_inspector = SuInspector(user) if user else None remote_process = cls( - cmd=command, process_dir=os.path.join(working_dir, f"proc_{uuid.uuid4()}"), shell=shell + cmd=command, + process_dir=os.path.join(working_dir, f"proc_{uuid.uuid4()}"), + shell=shell, + cmd_inspector=cmd_inspector, ) remote_process._create_process_dir() remote_process._generate_command_script(command) @@ -73,7 +84,8 @@ class RemoteProcess: cur_stdout = self.saved_stdout else: terminal = self.shell.exec( - f"cat {self.process_dir}/stdout", options=CommandOptions(no_log=True) + f"cat {self.process_dir}/stdout", + options=CommandOptions(no_log=True, extra_inspectors=self.cmd_inspectors), ) if self.proc_rc is not None: self.saved_stdout = terminal.stdout @@ -104,7 +116,8 @@ class RemoteProcess: cur_stderr = self.saved_stderr else: terminal = self.shell.exec( - f"cat {self.process_dir}/stderr", options=CommandOptions(no_log=True) + f"cat {self.process_dir}/stderr", + options=CommandOptions(no_log=True, extra_inspectors=self.cmd_inspectors), ) if self.proc_rc is not None: self.saved_stderr = terminal.stdout @@ -123,7 +136,10 @@ class RemoteProcess: if self.proc_rc is not None: return self.proc_rc - terminal = self.shell.exec(f"cat {self.process_dir}/rc", CommandOptions(check=False)) + terminal = self.shell.exec( + f"cat {self.process_dir}/rc", + CommandOptions(check=False, extra_inspectors=self.cmd_inspectors, no_log=True), + ) if "No such file or directory" in terminal.stderr: return None elif terminal.stderr or terminal.return_code != 0: @@ -138,7 +154,10 @@ class RemoteProcess: @reporter.step_deco("Send signal to process") def send_signal(self, signal: int) -> None: - kill_res = self.shell.exec(f"kill -{signal} {self.pid}", CommandOptions(check=False)) + kill_res = self.shell.exec( + f"kill -{signal} {self.pid}", + CommandOptions(check=False, extra_inspectors=self.cmd_inspectors), + ) if "No such process" in kill_res.stderr: return if kill_res.return_code: @@ -158,27 +177,38 @@ class RemoteProcess: def clear(self) -> None: if self.process_dir == "/": raise AssertionError(f"Invalid path to delete: {self.process_dir}") - self.shell.exec(f"rm -rf {self.process_dir}") + self.shell.exec( + f"rm -rf {self.process_dir}", CommandOptions(extra_inspectors=self.cmd_inspectors) + ) @reporter.step_deco("Start remote process") def _start_process(self) -> None: self.shell.exec( f"nohup {self.process_dir}/command.sh {self.process_dir}/stdout " - f"2>{self.process_dir}/stderr &" + f"2>{self.process_dir}/stderr &", + CommandOptions(extra_inspectors=self.cmd_inspectors), ) @reporter.step_deco("Create process directory") def _create_process_dir(self) -> None: - self.shell.exec(f"mkdir {self.process_dir}") - self.shell.exec(f"chmod 777 {self.process_dir}") - terminal = self.shell.exec(f"realpath {self.process_dir}") + self.shell.exec( + f"mkdir -p {self.process_dir}", CommandOptions(extra_inspectors=self.cmd_inspectors) + ) + self.shell.exec( + f"chmod 777 {self.process_dir}", CommandOptions(extra_inspectors=self.cmd_inspectors) + ) + terminal = self.shell.exec( + f"realpath {self.process_dir}", CommandOptions(extra_inspectors=self.cmd_inspectors) + ) self.process_dir = terminal.stdout.strip() @reporter.step_deco("Get pid") @retry(wait=wait_fixed(10), stop=stop_after_attempt(5), reraise=True) def _get_pid(self) -> str: - terminal = self.shell.exec(f"cat {self.process_dir}/pid") + terminal = self.shell.exec( + f"cat {self.process_dir}/pid", CommandOptions(extra_inspectors=self.cmd_inspectors) + ) assert terminal.stdout, f"invalid pid: {terminal.stdout}" return terminal.stdout.strip() @@ -196,6 +226,15 @@ class RemoteProcess: f"echo $? > {self.process_dir}/rc" ) - self.shell.exec(f'echo "{script}" > {self.process_dir}/command.sh') - self.shell.exec(f"cat {self.process_dir}/command.sh") - self.shell.exec(f"chmod +x {self.process_dir}/command.sh") + self.shell.exec( + f'echo "{script}" > {self.process_dir}/command.sh', + CommandOptions(extra_inspectors=self.cmd_inspectors), + ) + self.shell.exec( + f"cat {self.process_dir}/command.sh", + CommandOptions(extra_inspectors=self.cmd_inspectors), + ) + self.shell.exec( + f"chmod +x {self.process_dir}/command.sh", + CommandOptions(extra_inspectors=self.cmd_inspectors), + ) diff --git a/src/frostfs_testlib/reporter/allure_handler.py b/src/frostfs_testlib/reporter/allure_handler.py index 8e00b26e..fef815d5 100644 --- a/src/frostfs_testlib/reporter/allure_handler.py +++ b/src/frostfs_testlib/reporter/allure_handler.py @@ -13,7 +13,7 @@ class AllureHandler(ReporterHandler): """Handler that stores test artifacts in Allure report.""" def step(self, name: str) -> AbstractContextManager: - name = shorten(name, width=70, placeholder="...") + name = shorten(name, width=140, placeholder="...") return allure.step(name) def step_decorator(self, name: str) -> Callable: diff --git a/src/frostfs_testlib/resources/common.py b/src/frostfs_testlib/resources/common.py index dfbb3a17..131bf8a4 100644 --- a/src/frostfs_testlib/resources/common.py +++ b/src/frostfs_testlib/resources/common.py @@ -10,6 +10,8 @@ COMPLEX_OBJECT_TAIL_SIZE = os.getenv("COMPLEX_OBJECT_TAIL_SIZE", "1000") SERVICE_MAX_STARTUP_TIME = os.getenv("SERVICE_MAX_STARTUP_TIME", "5m") +STORAGE_USER_NAME = "frostfs-storage" + MORPH_TIMEOUT = os.getenv("MORPH_BLOCK_TIME", "8s") MORPH_BLOCK_TIME = os.getenv("MORPH_BLOCK_TIME", "8s") FROSTFS_CONTRACT_CACHE_TIMEOUT = os.getenv("FROSTFS_CONTRACT_CACHE_TIMEOUT", "30s") diff --git a/src/frostfs_testlib/shell/command_inspectors.py b/src/frostfs_testlib/shell/command_inspectors.py index 8486f43c..8fe2f34e 100644 --- a/src/frostfs_testlib/shell/command_inspectors.py +++ b/src/frostfs_testlib/shell/command_inspectors.py @@ -7,7 +7,23 @@ class SudoInspector(CommandInspector): If command is already prepended with sudo, then has no effect. """ - def inspect(self, command: str) -> str: + def inspect(self, original_command: str, command: str) -> str: if not command.startswith("sudo"): return f"sudo {command}" return command + + +class SuInspector(CommandInspector): + """Allows to run command as another user via sudo su call + + If command is already prepended with sudo su, then has no effect. + """ + + def __init__(self, user: str) -> None: + self.user = user + + def inspect(self, original_command: str, command: str) -> str: + if not original_command.startswith("sudo su"): + cmd = original_command.replace('"', '\\"').replace("\$", "\\\\\\$") + return f'sudo su - {self.user} -c "{cmd}"' + return original_command diff --git a/src/frostfs_testlib/shell/interfaces.py b/src/frostfs_testlib/shell/interfaces.py index 219bc7c4..a8d33250 100644 --- a/src/frostfs_testlib/shell/interfaces.py +++ b/src/frostfs_testlib/shell/interfaces.py @@ -22,11 +22,12 @@ class CommandInspector(ABC): """Interface of inspector that processes command text before execution.""" @abstractmethod - def inspect(self, command: str) -> str: + def inspect(self, original_command: str, command: str) -> str: """Transforms command text and returns modified command. Args: command: Command to transform with this inspector. + original_command: Untransformed command to transform with this inspector. Depending on type of the inspector it might be required to modify original command Returns: Transformed command text. @@ -47,6 +48,7 @@ class CommandOptions: check: Controls whether to check return code of the command. Set to False to ignore non-zero return codes. no_log: Do not print output to logger if True. + extra_inspectors: Exctra command inspectors to process command """ interactive_inputs: Optional[list[InteractiveInput]] = None @@ -54,6 +56,7 @@ class CommandOptions: timeout: Optional[int] = None check: bool = True no_log: bool = False + extra_inspectors: Optional[list[CommandInspector]] = None def __post_init__(self): if self.timeout is None: diff --git a/src/frostfs_testlib/shell/local_shell.py b/src/frostfs_testlib/shell/local_shell.py index 12f450af..56d19b23 100644 --- a/src/frostfs_testlib/shell/local_shell.py +++ b/src/frostfs_testlib/shell/local_shell.py @@ -24,8 +24,10 @@ class LocalShell(Shell): # If no options were provided, use default options options = options or CommandOptions() - for inspector in self.command_inspectors: - command = inspector.inspect(command) + original_command = command + extra_inspectors = options.extra_inspectors if options.extra_inspectors else [] + for inspector in [*self.command_inspectors, *extra_inspectors]: + command = inspector.inspect(original_command, command) logger.info(f"Executing command: {command}") if options.interactive_inputs: diff --git a/src/frostfs_testlib/shell/ssh_shell.py b/src/frostfs_testlib/shell/ssh_shell.py index 6ef3dfb0..5771274d 100644 --- a/src/frostfs_testlib/shell/ssh_shell.py +++ b/src/frostfs_testlib/shell/ssh_shell.py @@ -126,8 +126,10 @@ class SSHShell(Shell): def exec(self, command: str, options: Optional[CommandOptions] = None) -> CommandResult: options = options or CommandOptions() - for inspector in self.command_inspectors: - command = inspector.inspect(command) + original_command = command + extra_inspectors = options.extra_inspectors if options.extra_inspectors else [] + for inspector in [*self.command_inspectors, *extra_inspectors]: + command = inspector.inspect(original_command, command) if options.interactive_inputs: result = self._exec_interactive(command, options) diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py index a2336be1..6cedd0f2 100644 --- a/src/frostfs_testlib/storage/controllers/background_load_controller.py +++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py @@ -1,50 +1,37 @@ import copy -import time +from typing import Optional import frostfs_testlib.resources.optionals as optionals -from frostfs_testlib.load.k6 import K6 +from frostfs_testlib.load.interfaces import ScenarioRunner from frostfs_testlib.load.load_config import ( EndpointSelectionStrategy, - K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType, ) from frostfs_testlib.load.load_report import LoadReport -from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances from frostfs_testlib.load.load_verifiers import LoadVerifier from frostfs_testlib.reporter import get_reporter -from frostfs_testlib.resources.load_params import ( - K6_TEARDOWN_PERIOD, - LOAD_NODE_SSH_PASSWORD, - LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, - LOAD_NODE_SSH_PRIVATE_KEY_PATH, - LOAD_NODE_SSH_USER, - LOAD_NODES, -) -from frostfs_testlib.shell.interfaces import SshCredentials from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing.test_control import run_optionally -from frostfs_testlib.utils import datetime_utils reporter = get_reporter() class BackgroundLoadController: - k6_instances: list[K6] k6_dir: str load_params: LoadParams original_load_params: LoadParams - load_nodes: list[str] verification_params: LoadParams nodes_under_load: list[ClusterNode] load_counter: int - ssh_credentials: SshCredentials loaders_wallet: WalletInfo load_summaries: dict endpoints: list[str] + runner: ScenarioRunner + started: bool def __init__( self, @@ -52,15 +39,16 @@ class BackgroundLoadController: load_params: LoadParams, loaders_wallet: WalletInfo, nodes_under_load: list[ClusterNode], + runner: ScenarioRunner, ) -> None: self.k6_dir = k6_dir self.original_load_params = load_params self.load_params = copy.deepcopy(self.original_load_params) self.nodes_under_load = nodes_under_load self.load_counter = 1 - self.load_nodes = LOAD_NODES self.loaders_wallet = loaders_wallet - + self.runner = runner + self.started = False if load_params.endpoint_selection_strategy is None: raise RuntimeError("endpoint_selection_strategy should not be None") @@ -68,13 +56,6 @@ class BackgroundLoadController: load_params.load_type, load_params.endpoint_selection_strategy ) - self.ssh_credentials = SshCredentials( - LOAD_NODE_SSH_USER, - LOAD_NODE_SSH_PASSWORD, - LOAD_NODE_SSH_PRIVATE_KEY_PATH, - LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE, - ) - @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, []) def _get_endpoints( self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy @@ -116,69 +97,28 @@ class BackgroundLoadController: return all_endpoints[load_type][endpoint_selection_strategy] @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Prepare background load instances") + @reporter.step_deco("Prepare load instances") def prepare(self): - if self.load_params.load_type == LoadType.S3: - init_s3_client( - self.load_nodes, - self.load_params, - self.k6_dir, - self.ssh_credentials, - self.nodes_under_load, - self.loaders_wallet, - ) - - self._prepare(self.load_params) - - def _prepare(self, load_params: LoadParams): - self.k6_instances = prepare_k6_instances( - load_nodes=LOAD_NODES, - ssh_credentials=self.ssh_credentials, - k6_dir=self.k6_dir, - load_params=load_params, - endpoints=self.endpoints, - loaders_wallet=self.loaders_wallet, - ) + self.runner.prepare(self.load_params, self.nodes_under_load, self.k6_dir) + self.runner.init_k6_instances(self.load_params, self.endpoints, self.k6_dir) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Start background load") def start(self): - if self.load_params.preset is None: - raise RuntimeError("Preset should not be none at the moment of start") - - with reporter.step( - f"Start background load on nodes {self.nodes_under_load}: " - f"writers = {self.load_params.writers}, " - f"obj_size = {self.load_params.object_size}, " - f"load_time = {self.load_params.load_time}, " - f"prepare_json = {self.load_params.preset.pregen_json}, " - f"endpoints = {self.endpoints}" - ): - for k6_load_instance in self.k6_instances: - k6_load_instance.start() - - wait_after_start_time = datetime_utils.parse_time(self.load_params.setup_timeout) + 5 - with reporter.step( - f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" - ): - time.sleep(wait_after_start_time) + with reporter.step(f"Start load on nodes {self.nodes_under_load}"): + self.runner.start() + self.started = True @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Stop background load") + @reporter.step_deco("Stop load") def stop(self): - for k6_load_instance in self.k6_instances: - k6_load_instance.stop() + self.runner.stop() @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, True) - def is_running(self): - for k6_load_instance in self.k6_instances: - if not k6_load_instance.is_running: - return False - - return True + def is_running(self) -> bool: + return self.runner.is_running @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Reset background load") + @reporter.step_deco("Reset load") def _reset_for_consequent_load(self): """This method is required if we want to run multiple loads during test run. Raise load counter by 1 and append it to load_id @@ -188,25 +128,25 @@ class BackgroundLoadController: self.load_params.set_id(f"{self.load_params.load_id}_{self.load_counter}") @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Startup background load") + @reporter.step_deco("Startup load") def startup(self): self.prepare() self.start() @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Stop and get results of background load") - def teardown(self, load_report: LoadReport = None): - if not self.k6_instances: + @reporter.step_deco("Stop and get results of load") + def teardown(self, load_report: Optional[LoadReport] = None): + if not self.started: return self.stop() - self.load_summaries = self.get_results() - self.k6_instances = [] + self.load_summaries = self._get_results() + self.started = False if load_report: load_report.add_summaries(self.load_summaries) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Verify results of background load") + @reporter.step_deco("Verify results of load") def verify(self): try: if self.load_params.verify: @@ -220,9 +160,10 @@ class BackgroundLoadController: working_dir=self.load_params.working_dir, endpoint_selection_strategy=self.load_params.endpoint_selection_strategy, k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy, + setup_timeout="1s", ) self._run_verify_scenario() - verification_summaries = self.get_results() + verification_summaries = self._get_results() self.verify_summaries(self.load_summaries, verification_summaries) finally: self._reset_for_consequent_load() @@ -239,38 +180,20 @@ class BackgroundLoadController: @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) def wait_until_finish(self): - if self.load_params.load_time is None: - raise RuntimeError("LoadTime should not be none") - - for k6_instance in self.k6_instances: - k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD)) + self.runner.wait_until_finish() @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("Run verify scenario for background load") + @reporter.step_deco("Run verify scenario") def _run_verify_scenario(self): if self.verification_params.verify_time is None: raise RuntimeError("verify_time should not be none") - self._prepare(self.verification_params) - with reporter.step("Run verify background load data"): - for k6_verify_instance in self.k6_instances: - k6_verify_instance.start() - k6_verify_instance.wait_until_finished(self.verification_params.verify_time) + self.runner.init_k6_instances(self.verification_params, self.endpoints, self.k6_dir) + with reporter.step("Run verify load data"): + self.runner.start() + self.runner.wait_until_finish() @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - @reporter.step_deco("K6 run results") - def get_results(self) -> dict: - results = {} - for k6_instance in self.k6_instances: - if k6_instance.load_params.k6_process_allocation_strategy is None: - raise RuntimeError("k6_process_allocation_strategy should not be none") - - result = k6_instance.get_results() - keys_map = { - K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.load_node, - K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0], - } - key = keys_map[k6_instance.load_params.k6_process_allocation_strategy] - results[key] = result - - return results + @reporter.step_deco("Get load results") + def _get_results(self) -> dict: + return self.runner.get_results() diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 10845527..6126f9d7 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -23,7 +23,7 @@ class ClusterStateController: self.stopped_nodes: list[ClusterNode] = [] self.detached_disks: dict[str, DiskController] = {} self.stopped_storage_nodes: list[ClusterNode] = [] - self.stopped_s3_gate: list[ClusterNode] = [] + self.stopped_s3_gates: list[ClusterNode] = [] self.cluster = cluster self.shell = shell self.suspended_services: dict[str, list[ClusterNode]] = {} @@ -60,6 +60,16 @@ class ClusterStateController: for node in nodes: self.stop_storage_service(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Stop all S3 gates on cluster") + def stop_all_s3_gates(self, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + + for node in nodes: + self.stop_s3_gate(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): @@ -72,10 +82,18 @@ class ClusterStateController: @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped hosts") def start_stopped_hosts(self, reversed_order: bool = False): + if not self.stopped_nodes: + return + nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes for node in nodes: with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() + if node in self.stopped_storage_nodes: + self.stopped_storage_nodes.remove(node) + + if node in self.stopped_s3_gates: + self.stopped_s3_gates.remove(node) self.stopped_nodes = [] wait_all_storage_nodes_returned(self.shell, self.cluster) @@ -115,44 +133,51 @@ class ClusterStateController: @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped storage services") def start_stopped_storage_services(self): - if self.stopped_storage_nodes: - # In case if we stopped couple services, for example (s01-s04): - # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. - # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. - # So in order to make sure that services are at least attempted to be started, using threads here. - with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor: - start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes) + if not self.stopped_storage_nodes: + return - # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, - # But will be thrown here. - # Not ideal solution, but okay for now - for _ in start_result: - pass + # In case if we stopped couple services, for example (s01-s04): + # After starting only s01, it may require connections to s02-s04, which is still down, and fail to start. + # Also, if something goes wrong here, we might skip s02-s04 start at all, and cluster will be left in a bad state. + # So in order to make sure that services are at least attempted to be started, using threads here. + with ThreadPoolExecutor(max_workers=len(self.stopped_storage_nodes)) as executor: + start_result = executor.map(self.start_storage_service, self.stopped_storage_nodes) + + # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, + # But will be thrown here. + # Not ideal solution, but okay for now + for _ in start_result: + pass wait_all_storage_nodes_returned(self.shell, self.cluster) self.stopped_storage_nodes = [] - @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop s3 gate on {node}") def stop_s3_gate(self, node: ClusterNode): node.s3_gate.stop_service() - self.stopped_s3_gate.append(node) + self.stopped_s3_gates.append(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start s3 gate on {node}") def start_s3_gate(self, node: ClusterNode): node.s3_gate.start_service() - self.stopped_s3_gate.remove(node) - + self.stopped_s3_gates.remove(node) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped S3 gates") - def start_stopped_s3_gate(self): - # not sure if we need here to use threads like in start_stopped_storage_services - for s3_gate in self.stopped_s3_gate: - s3_gate.start_service() - self.stopped_s3_gate = [] + def start_stopped_s3_gates(self): + if not self.stopped_s3_gates: + return + + with ThreadPoolExecutor(max_workers=len(self.stopped_s3_gates)) as executor: + start_result = executor.map(self.start_s3_gate, self.stopped_s3_gates) + + # Looks tricky, but if exception is raised in any thread, it will be "eaten" by ThreadPoolExecutor, + # But will be thrown here. + # Not ideal solution, but okay for now + for _ in start_result: + pass @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Suspend {process_name} service in {node}")