diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index b65f1295..635247e3 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -3,7 +3,6 @@ import itertools import math import re import time -from concurrent.futures import ThreadPoolExecutor from dataclasses import fields from typing import Optional from urllib.parse import urlparse @@ -24,12 +23,14 @@ from frostfs_testlib.resources.load_params import ( LOAD_NODE_SSH_USER, LOAD_NODES, ) +from frostfs_testlib.shell.command_inspectors import SuInspector from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo from frostfs_testlib.testing import parallel, run_optionally +from frostfs_testlib.testing.test_control import retry from frostfs_testlib.utils import FileKeeper, datetime_utils reporter = get_reporter() @@ -296,40 +297,53 @@ class LocalRunner(RunnerBase): nodes_under_load: list[ClusterNode], k6_dir: str, ): - @reporter.step_deco("Prepare node {cluster_node}") - def prepare_node(cluster_node: ClusterNode): - shell = cluster_node.host.get_shell() + parallel(self.prepare_node, nodes_under_load, k6_dir, load_params) - with reporter.step("Allow storage user to login into system"): - shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") - shell.exec("sudo chattr +i /etc/passwd") + @retry(3, 5, expected_result=True) + def allow_user_to_login_in_system(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() - with reporter.step("Update limits.conf"): - limits_path = "/etc/security/limits.conf" - self.file_keeper.add(cluster_node.storage_node, limits_path) - content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" - shell.exec(f"echo '{content}' | sudo tee {limits_path}") + result = None + try: + shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}") + self.lock_passwd_on_node(cluster_node) + options = CommandOptions(check=False, extra_inspectors=[SuInspector(STORAGE_USER_NAME)]) + result = shell.exec("whoami", options) + finally: + if not result or result.return_code: + self.restore_passwd_on_node(cluster_node) + return False - with reporter.step("Download K6"): - shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}") - shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}") - shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}") - shell.exec(f"sudo chmod -R 777 {k6_dir}") + return True - with reporter.step("Create empty_passwd"): - self.wallet = WalletInfo( - f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" - ) - content = yaml.dump({"password": ""}) - shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') - shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") + @reporter.step_deco("Prepare node {cluster_node}") + def prepare_node(self, cluster_node: ClusterNode, k6_dir: str, load_params: LoadParams): + shell = cluster_node.host.get_shell() - with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor: - result = executor.map(prepare_node, nodes_under_load) + with reporter.step("Allow storage user to login into system"): + self.allow_user_to_login_in_system(cluster_node) - # Check for exceptions - for _ in result: - pass + with reporter.step("Update limits.conf"): + limits_path = "/etc/security/limits.conf" + self.file_keeper.add(cluster_node.storage_node, limits_path) + content = ( + f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" + ) + shell.exec(f"echo '{content}' | sudo tee {limits_path}") + + with reporter.step("Download K6"): + shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}") + shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}") + shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}") + shell.exec(f"sudo chmod -R 777 {k6_dir}") + + with reporter.step("Create empty_passwd"): + self.wallet = WalletInfo( + f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" + ) + content = yaml.dump({"password": ""}) + shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') + shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") @reporter.step_deco("Init k6 instances") def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str): @@ -379,16 +393,21 @@ class LocalRunner(RunnerBase): ): time.sleep(wait_after_start_time) + @reporter.step_deco("Restore passwd on {cluster_node}") + def restore_passwd_on_node(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() + shell.exec("sudo chattr -i /etc/passwd") + + @reporter.step_deco("Lock passwd on {cluster_node}") + def lock_passwd_on_node(self, cluster_node: ClusterNode): + shell = cluster_node.host.get_shell() + shell.exec("sudo chattr +i /etc/passwd") + def stop(self): for k6_instance in self.k6_instances: k6_instance.stop() - @reporter.step_deco("Restore passwd on {cluster_node}") - def restore_passwd_attr_on_node(cluster_node: ClusterNode): - shell = cluster_node.host.get_shell() - shell.exec("sudo chattr -i /etc/passwd") - - parallel(restore_passwd_attr_on_node, self.nodes_under_load) + parallel(self.restore_passwd_on_node, self.nodes_under_load) self.cluster_state_controller.start_stopped_storage_services() self.cluster_state_controller.start_stopped_s3_gates()