diff --git a/src/frostfs_testlib/load/__init__.py b/src/frostfs_testlib/load/__init__.py
index ca2f120..8477ee4 100644
--- a/src/frostfs_testlib/load/__init__.py
+++ b/src/frostfs_testlib/load/__init__.py
@@ -1,4 +1,5 @@
-from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
+from frostfs_testlib.load.interfaces.loader import Loader
+from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
 from frostfs_testlib.load.load_config import (
     EndpointSelectionStrategy,
     K6ProcessAllocationStrategy,
diff --git a/src/frostfs_testlib/load/interfaces/loader.py b/src/frostfs_testlib/load/interfaces/loader.py
new file mode 100644
index 0000000..2c818d9
--- /dev/null
+++ b/src/frostfs_testlib/load/interfaces/loader.py
@@ -0,0 +1,14 @@
+from abc import ABC, abstractmethod
+
+from frostfs_testlib.shell.interfaces import Shell
+
+
+class Loader(ABC):
+    @abstractmethod
+    def get_shell(self) -> Shell:
+        """Get shell for the loader"""
+
+    @property
+    @abstractmethod
+    def ip(self):
+        """Get address of the loader"""
diff --git a/src/frostfs_testlib/load/interfaces.py b/src/frostfs_testlib/load/interfaces/scenario_runner.py
similarity index 79%
rename from src/frostfs_testlib/load/interfaces.py
rename to src/frostfs_testlib/load/interfaces/scenario_runner.py
index 394fff7..45c1317 100644
--- a/src/frostfs_testlib/load/interfaces.py
+++ b/src/frostfs_testlib/load/interfaces/scenario_runner.py
@@ -1,20 +1,8 @@
 from abc import ABC, abstractmethod
 
+from frostfs_testlib.load.k6 import K6
 from frostfs_testlib.load.load_config import LoadParams
-from frostfs_testlib.shell.interfaces import Shell
 from frostfs_testlib.storage.cluster import ClusterNode
-from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
-
-
-class Loader(ABC):
-    @abstractmethod
-    def get_shell(self) -> Shell:
-        """Get shell for the loader"""
-
-    @property
-    @abstractmethod
-    def ip(self):
-        """Get address of the loader"""
 
 
 class ScenarioRunner(ABC):
@@ -32,6 +20,10 @@ class ScenarioRunner(ABC):
     def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
         """Init K6 instances"""
 
+    @abstractmethod
+    def get_k6_instances(self) -> list[K6]:
+        """Get K6 instances"""
+
     @abstractmethod
     def start(self):
         """Start K6 instances"""
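
Reviewer note: with `Loader` split out of the combined `interfaces.py`, a concrete loader only has to provide a shell and an address. A minimal sketch of an implementation against the new module path; `DummyLoader` and its constructor are illustrative, not part of this change:

```python
from frostfs_testlib.load.interfaces.loader import Loader
from frostfs_testlib.shell.interfaces import Shell


class DummyLoader(Loader):
    """Hypothetical loader used only to illustrate the contract."""

    def __init__(self, shell: Shell, address: str) -> None:
        self._shell = shell
        self._address = address

    def get_shell(self) -> Shell:
        # The shell through which load commands are executed.
        return self._shell

    @property
    def ip(self):
        # Address reported in reporter steps ("Start load from loader {ip} ...").
        return self._address
```
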
diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py
index e46221e..3dedd53 100644
--- a/src/frostfs_testlib/load/k6.py
+++ b/src/frostfs_testlib/load/k6.py
@@ -8,13 +8,8 @@ from time import sleep
 from typing import Any
 from urllib.parse import urlparse
 
-from frostfs_testlib.load.interfaces import Loader
-from frostfs_testlib.load.load_config import (
-    K6ProcessAllocationStrategy,
-    LoadParams,
-    LoadScenario,
-    LoadType,
-)
+from frostfs_testlib.load.interfaces.loader import Loader
+from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario, LoadType
 from frostfs_testlib.processes.remote_process import RemoteProcess
 from frostfs_testlib.reporter import get_reporter
 from frostfs_testlib.resources.common import STORAGE_USER_NAME
@@ -59,6 +54,7 @@ class K6:
         self.loader: Loader = loader
         self.shell: Shell = shell
         self.wallet = wallet
+        self.preset_output: str = ""
         self.summary_json: str = os.path.join(
             self.load_params.working_dir,
             f"{self.load_params.load_id}_{self.load_params.scenario.value}_summary.json",
@@ -101,10 +97,10 @@ class K6:
         command = " ".join(command_args)
         result = self.shell.exec(command)
 
-        assert (
-            result.return_code == EXIT_RESULT_CODE
-        ), f"Return code of preset is not zero: {result.stdout}"
-        return result.stdout.strip("\n")
+        assert result.return_code == EXIT_RESULT_CODE, f"Return code of preset is not zero: {result.stdout}"
+
+        self.preset_output = result.stdout.strip("\n")
+        return self.preset_output
 
     @reporter.step_deco("Generate K6 command")
     def _generate_env_variables(self) -> str:
@@ -113,31 +109,21 @@ class K6:
         env_vars[f"{self.load_params.load_type.value.upper()}_ENDPOINTS"] = ",".join(self.endpoints)
         env_vars["SUMMARY_JSON"] = self.summary_json
 
-        reporter.attach(
-            "\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables"
-        )
-        return " ".join(
-            [f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None]
-        )
+        reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables")
+        return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None])
 
     def start(self) -> None:
-        with reporter.step(
-            f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"
-        ):
+        with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"):
             self._start_time = int(datetime.utcnow().timestamp())
             command = (
                 f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
                 f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js"
             )
             user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None
-            self._k6_process = RemoteProcess.create(
-                command, self.shell, self.load_params.working_dir, user
-            )
+            self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user)
 
     def wait_until_finished(self, soft_timeout: int = 0) -> None:
-        with reporter.step(
-            f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"
-        ):
+        with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"):
             if self.load_params.scenario == LoadScenario.VERIFY:
                 timeout = self.load_params.verify_time or 0
             else:
@@ -180,9 +166,11 @@ class K6:
             while timeout > 0:
                 if not self._k6_process.running():
                     return
-                remaining_time_hours = f"{timeout//3600}h" if timeout//3600 != 0 else ""
-                remaining_time_minutes = f"{timeout//60%60}m" if timeout//60%60 != 0 else ""
-                logger.info(f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds...")
+                remaining_time_hours = f"{timeout//3600}h" if timeout // 3600 != 0 else ""
+                remaining_time_minutes = f"{timeout//60%60}m" if timeout // 60 % 60 != 0 else ""
+                logger.info(
+                    f"K6 is running. Remaining time {remaining_time_hours}{remaining_time_minutes}{timeout%60}s. Next check after {wait_interval} seconds..."
+                )
                 sleep(wait_interval)
                 timeout -= min(timeout, wait_interval)
                 wait_interval = max(
@@ -198,9 +186,7 @@ class K6:
             raise TimeoutError(f"Expected K6 to finish after {original_timeout} sec.")
 
     def get_results(self) -> Any:
-        with reporter.step(
-            f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"
-        ):
+        with reporter.step(f"Get load results from loader {self.loader.ip} on endpoints {self.endpoints}"):
             self.__log_output()
 
             if not self.summary_json:
@@ -231,9 +217,7 @@ class K6:
         return False
 
     @reporter.step_deco("Wait until K6 process end")
-    @wait_for_success(
-        K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout"
-    )
+    @wait_for_success(K6_STOP_SIGNAL_TIMEOUT, 15, False, False, "Can not stop K6 process within timeout")
    def _wait_until_process_end(self):
        return self._k6_process.running()
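
Reviewer note: the reformatted countdown log builds its `1h2m5s`-style string from plain integer arithmetic. A standalone sketch of the same decomposition (the helper name is hypothetical), just to show the rounding behaviour:

```python
def format_remaining(timeout: int) -> str:
    # Same arithmetic as wait_until_finished: whole hours, leftover minutes,
    # leftover seconds; zero-valued hour/minute parts are dropped.
    hours = f"{timeout // 3600}h" if timeout // 3600 != 0 else ""
    minutes = f"{timeout // 60 % 60}m" if timeout // 60 % 60 != 0 else ""
    return f"{hours}{minutes}{timeout % 60}s"


assert format_remaining(3725) == "1h2m5s"
assert format_remaining(59) == "59s"
```
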
diff --git a/src/frostfs_testlib/load/loaders.py b/src/frostfs_testlib/load/loaders.py
index 9e92155..1e0e97f 100644
--- a/src/frostfs_testlib/load/loaders.py
+++ b/src/frostfs_testlib/load/loaders.py
@@ -1,4 +1,4 @@
-from frostfs_testlib.load.interfaces import Loader
+from frostfs_testlib.load.interfaces.loader import Loader
 from frostfs_testlib.resources.load_params import (
     LOAD_NODE_SSH_PASSWORD,
     LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py
index 982cfcc..ea5a374 100644
--- a/src/frostfs_testlib/load/runners.py
+++ b/src/frostfs_testlib/load/runners.py
@@ -10,7 +10,8 @@ from urllib.parse import urlparse
 import yaml
 
 from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
-from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
+from frostfs_testlib.load.interfaces.loader import Loader
+from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
 from frostfs_testlib.load.k6 import K6
 from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
 from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
@@ -50,6 +51,9 @@ class RunnerBase(ScenarioRunner):
 
         return any([future.result() for future in futures])
 
+    def get_k6_instances(self):
+        return self.k6_instances
+
 
 class DefaultRunner(RunnerBase):
     loaders: list[Loader]
@@ -391,6 +395,7 @@ class LocalRunner(RunnerBase):
 
         return results
 
+
 class S3LocalRunner(LocalRunner):
     endpoints: list[str]
     k6_dir: str
@@ -404,7 +409,8 @@ class S3LocalRunner(LocalRunner):
     @reporter.step_deco("Resolve containers in preset")
     def _resolve_containers_in_preset(self, k6_instance: K6):
         k6_instance.shell.exec(
-            f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}")
+            f"sudo {self.k6_dir}/scenarios/preset/resolve_containers_in_preset.py --endpoint {k6_instance.endpoints[0]} --preset_file {k6_instance.load_params.preset.pregen_json}"
+        )
 
     @reporter.step_deco("Init k6 instances")
     def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
@@ -426,9 +432,9 @@ class S3LocalRunner(LocalRunner):
         # If we chmod /home/<user_name> folder we can no longer ssh to the node
         # !! IMPORTANT !!
         if (
-                load_params.working_dir
-                and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
-                and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
+            load_params.working_dir
+            and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
+            and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
         ):
             shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
 
@@ -444,30 +450,25 @@ class S3LocalRunner(LocalRunner):
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
     @reporter.step_deco("Preparation steps")
     def prepare(
-            self,
-            load_params: LoadParams,
-            cluster_nodes: list[ClusterNode],
-            nodes_under_load: list[ClusterNode],
-            k6_dir: str,
+        self,
+        load_params: LoadParams,
+        cluster_nodes: list[ClusterNode],
+        nodes_under_load: list[ClusterNode],
+        k6_dir: str,
     ):
         self.k6_dir = k6_dir
         with reporter.step("Init s3 client on loaders"):
             storage_node = nodes_under_load[0].service(StorageNode)
-            s3_public_keys = [
-                node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes
-            ]
+            s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes]
             grpc_peer = storage_node.get_rpc_endpoint()
 
         parallel(self.prepare_node, nodes_under_load, k6_dir, load_params, s3_public_keys, grpc_peer)
 
     @reporter.step_deco("Prepare node {cluster_node}")
-    def prepare_node(self,
-        cluster_node: ClusterNode,
-        k6_dir: str,
-        load_params: LoadParams,
-        s3_public_keys: list[str],
-        grpc_peer: str):
-        LocalRunner.prepare_node(self,cluster_node, k6_dir, load_params)
+    def prepare_node(
+        self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams, s3_public_keys: list[str], grpc_peer: str
+    ):
+        LocalRunner.prepare_node(self, cluster_node, k6_dir, load_params)
         self.endpoints = cluster_node.s3_gate.get_all_endpoints()
         shell = cluster_node.host.get_shell()
@@ -497,9 +498,9 @@ class S3LocalRunner(LocalRunner):
             wallet_password=self.wallet.password,
         ).stdout
         aws_access_key_id = str(
-            re.search(
-                r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
-            ).group("aws_access_key_id")
+            re.search(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output).group(
+                "aws_access_key_id"
+            )
         )
         aws_secret_access_key = str(
             re.search(
@@ -509,10 +510,8 @@ class S3LocalRunner(LocalRunner):
             )
         )
         configure_input = [
             InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id),
-            InteractiveInput(
-                prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
-            ),
+            InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key),
             InteractiveInput(prompt_pattern=r".*", input=""),
             InteractiveInput(prompt_pattern=r".*", input=""),
         ]
-        shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
\ No newline at end of file
+        shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
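
Reviewer note: `RunnerBase.get_k6_instances()` satisfies the new abstract method for every runner subclass, and pairs with the `preset_output` field now cached on `K6`. A hedged usage sketch (the helper itself is hypothetical, not part of this change):

```python
from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner


def collect_preset_outputs(runner: ScenarioRunner) -> list[str]:
    # Each K6 instance now keeps the stdout of its preset run, so callers can
    # inspect preset results after the fact instead of re-parsing logs.
    return [k6.preset_output for k6 in runner.get_k6_instances()]
```
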
diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py
index a18a603..8ecada8 100644
--- a/src/frostfs_testlib/storage/controllers/background_load_controller.py
+++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py
@@ -2,13 +2,8 @@ import copy
 from typing import Optional
 
 import frostfs_testlib.resources.optionals as optionals
-from frostfs_testlib.load.interfaces import ScenarioRunner
-from frostfs_testlib.load.load_config import (
-    EndpointSelectionStrategy,
-    LoadParams,
-    LoadScenario,
-    LoadType,
-)
+from frostfs_testlib.load.interfaces.scenario_runner import ScenarioRunner
+from frostfs_testlib.load.load_config import EndpointSelectionStrategy, LoadParams, LoadScenario, LoadType
 from frostfs_testlib.load.load_report import LoadReport
 from frostfs_testlib.load.load_verifiers import LoadVerifier
 from frostfs_testlib.reporter import get_reporter
@@ -56,9 +51,7 @@ class BackgroundLoadController:
             raise RuntimeError("endpoint_selection_strategy should not be None")
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, [])
-    def _get_endpoints(
-        self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy
-    ):
+    def _get_endpoints(self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy):
         all_endpoints = {
             LoadType.gRPC: {
                 EndpointSelectionStrategy.ALL: list(
@@ -85,10 +78,7 @@ class BackgroundLoadController:
                     )
                 ),
                 EndpointSelectionStrategy.FIRST: list(
-                    set(
-                        node_under_load.service(S3Gate).get_endpoint()
-                        for node_under_load in self.nodes_under_load
-                    )
+                    set(node_under_load.service(S3Gate).get_endpoint() for node_under_load in self.nodes_under_load)
                 ),
             },
         }
@@ -98,12 +88,8 @@ class BackgroundLoadController:
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
     @reporter.step_deco("Prepare load instances")
     def prepare(self):
-        self.endpoints = self._get_endpoints(
-            self.load_params.load_type, self.load_params.endpoint_selection_strategy
-        )
-        self.runner.prepare(
-            self.load_params, self.cluster_nodes, self.nodes_under_load, self.k6_dir
-        )
+        self.endpoints = self._get_endpoints(self.load_params.load_type, self.load_params.endpoint_selection_strategy)
+        self.runner.prepare(self.load_params, self.cluster_nodes, self.nodes_under_load, self.k6_dir)
         self.runner.init_k6_instances(self.load_params, self.endpoints, self.k6_dir)
 
     @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
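
Reviewer note: `_get_endpoints()` dispatches through a nested dict keyed first by load type, then by selection strategy. A self-contained sketch of that lookup with dummy endpoint lists; the real controller derives them from cluster services, and the `LoadType.S3` member name is assumed from the `S3Gate` branch:

```python
from frostfs_testlib.load.load_config import EndpointSelectionStrategy, LoadType

all_endpoints = {
    LoadType.gRPC: {
        EndpointSelectionStrategy.ALL: ["node1:8080", "node2:8080"],
        EndpointSelectionStrategy.FIRST: ["node1:8080"],
    },
    LoadType.S3: {
        EndpointSelectionStrategy.ALL: ["node1:8084", "node2:8084"],
        EndpointSelectionStrategy.FIRST: ["node1:8084"],
    },
}

# One lookup replaces an if/elif ladder over both dimensions.
assert all_endpoints[LoadType.gRPC][EndpointSelectionStrategy.FIRST] == ["node1:8080"]
```
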
diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
index 825f2ac..000bdd8 100644
--- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
+++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
@@ -109,12 +109,14 @@ class ClusterStateController:
 
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Start host of node {node}")
-    def start_node_host(self, node: ClusterNode):
+    def start_node_host(self, node: ClusterNode, tree_healthcheck: bool = True):
         with reporter.step(f"Start host {node.host.config.address}"):
             node.host.start_host()
             wait_for_host_online(self.shell, node.storage_node)
+            self.stopped_nodes.remove(node)
             wait_for_node_online(node.storage_node)
-            self.stopped_nodes.remove(node)
+            if tree_healthcheck:
+                self.wait_tree_healthcheck()
 
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Start stopped hosts")
@@ -364,7 +366,7 @@ class ClusterStateController:
 
     @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
     @reporter.step_deco("Hard reboot host {node} via magic SysRq option")
-    def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
+    def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True, tree_healthcheck: bool = True):
         shell = node.host.get_shell()
         shell.exec('sudo sh -c "echo 1 > /proc/sys/kernel/sysrq"')
 
@@ -381,6 +383,8 @@ class ClusterStateController:
             time.sleep(10)
             wait_for_host_online(self.shell, node.storage_node)
             wait_for_node_online(node.storage_node)
+            if tree_healthcheck:
+                self.wait_tree_healthcheck()
 
     @reporter.step_deco("Down {interface} to {nodes}")
     def down_interface(self, nodes: list[ClusterNode], interface: str):
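
Reviewer note: both recovery paths now run a tree healthcheck by default; the keyword exists so tests that deliberately leave the tree degraded can opt out. Hypothetical call sites (the wrapper functions are illustrative only):

```python
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController


def recover_host(controller: ClusterStateController, node: ClusterNode) -> None:
    # Default path: wait for host, node, and tree health.
    controller.start_node_host(node)


def recover_host_skip_tree(controller: ClusterStateController, node: ClusterNode) -> None:
    # Opt out when the test asserts tree state itself.
    controller.panic_reboot_host(node, wait_for_return=True, tree_healthcheck=False)
```
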
diff --git a/src/frostfs_testlib/utils/failover_utils.py b/src/frostfs_testlib/utils/failover_utils.py
index 8c6062f..27cd181 100644
--- a/src/frostfs_testlib/utils/failover_utils.py
+++ b/src/frostfs_testlib/utils/failover_utils.py
@@ -12,6 +12,7 @@ from frostfs_testlib.steps.node_management import storage_node_healthcheck
 from frostfs_testlib.steps.storage_policy import get_nodes_with_object
 from frostfs_testlib.storage.cluster import Cluster, ClusterNode, NodeBase, StorageNode
 from frostfs_testlib.storage.dataclasses.frostfs_services import MorphChain
+from frostfs_testlib.testing.parallel import parallel
 from frostfs_testlib.testing.test_control import retry, wait_for_success
 from frostfs_testlib.utils.datetime_utils import parse_time
 
@@ -26,12 +27,17 @@ def ping_host(shell: Shell, host: Host):
         return shell.exec(f"ping {host.config.address} -c 1", options).return_code
 
 
+# TODO: Move to ClusterStateController
 @reporter.step_deco("Wait for storage nodes returned to cluster")
 def wait_all_storage_nodes_returned(shell: Shell, cluster: Cluster) -> None:
-    for node in cluster.services(StorageNode):
-        with reporter.step(f"Run health check for storage at '{node}'"):
-            wait_for_host_online(shell, node)
-            wait_for_node_online(node)
+    nodes = cluster.services(StorageNode)
+    parallel(_wait_for_storage_node, nodes, shell=shell)
+
+
+@reporter.step_deco("Run health check for storage at '{node}'")
+def _wait_for_storage_node(node: StorageNode, shell: Shell) -> None:
+    wait_for_host_online(shell, node)
+    wait_for_node_online(node)
 
 
 @retry(max_attempts=60, sleep_interval=5, expected_result=0)
@@ -64,10 +70,17 @@ def wait_for_node_online(node: StorageNode):
     except Exception as err:
         logger.warning(f"Node healthcheck fails with error {err}")
         return False
+    finally:
+        gather_socket_info(node)
 
     return health_check.health_status == "READY" and health_check.network_status == "ONLINE"
 
 
+@reporter.step_deco("Gather socket info for {node}")
+def gather_socket_info(node: StorageNode):
+    node.host.get_shell().exec("ss -tuln | grep 8080", CommandOptions(check=False))
+
+
 @reporter.step_deco("Check and return status of given service")
 def service_status(service: str, shell: Shell) -> str:
     return shell.exec(f"sudo systemctl is-active {service}").stdout.rstrip()
@@ -139,9 +152,7 @@ def multiple_restart(
     service_name = node.service(service_type).name
     for _ in range(count):
         node.host.restart_service(service_name)
-        logger.info(
-            f"Restart {service_systemctl_name}; sleep {sleep_interval} seconds and continue"
-        )
+        logger.info(f"Restart {service_systemctl_name}; sleep {sleep_interval} seconds and continue")
         sleep(sleep_interval)
 
 
@@ -164,9 +175,7 @@ def check_services_status(service_list: list[str], expected_status: str, shell:
 
 @reporter.step_deco("Wait for active status of passed service")
 @wait_for_success(60, 5)
-def wait_service_in_desired_state(
-    service: str, shell: Shell, expected_status: Optional[str] = "active"
-):
+def wait_service_in_desired_state(service: str, shell: Shell, expected_status: Optional[str] = "active"):
     real_status = service_status(service=service, shell=shell)
     assert (
         expected_status == real_status
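
Reviewer note: the `wait_all_storage_nodes_returned()` rewrite fans one `_wait_for_storage_node` worker out per node through `parallel()`. A self-contained sketch of the same fan-out using only the standard library, since `parallel()`'s exact semantics live in `frostfs_testlib.testing.parallel`:

```python
from concurrent.futures import ThreadPoolExecutor


def check_node(node: str) -> str:
    # Stand-in for _wait_for_storage_node(node, shell=shell).
    return f"{node}: healthy"


nodes = ["storage01", "storage02", "storage03"]
with ThreadPoolExecutor() as executor:
    # All health checks run concurrently; total wait is roughly the slowest
    # node, not the sum over nodes as in the old sequential loop.
    results = list(executor.map(check_node, nodes))

print(results)
```
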