diff --git a/src/frostfs_testlib/load/runners.py b/src/frostfs_testlib/load/runners.py index 7cc1526..4634350 100644 --- a/src/frostfs_testlib/load/runners.py +++ b/src/frostfs_testlib/load/runners.py @@ -19,11 +19,7 @@ from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources import optionals from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC from frostfs_testlib.resources.common import STORAGE_USER_NAME -from frostfs_testlib.resources.load_params import ( - BACKGROUND_LOAD_VUS_COUNT_DIVISOR, - LOAD_NODE_SSH_USER, - LOAD_NODES, -) +from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR, LOAD_NODE_SSH_USER, LOAD_NODES from frostfs_testlib.shell.command_inspectors import SuInspector from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput from frostfs_testlib.storage.cluster import ClusterNode @@ -87,14 +83,10 @@ class DefaultRunner(RunnerBase): with reporter.step("Init s3 client on loaders"): storage_node = nodes_under_load[0].service(StorageNode) - s3_public_keys = [ - node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes - ] + s3_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in cluster_nodes] grpc_peer = storage_node.get_rpc_endpoint() - parallel( - self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir - ) + parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir) def _prepare_loader( self, @@ -116,9 +108,9 @@ class DefaultRunner(RunnerBase): wallet_password=self.loaders_wallet.password, ).stdout aws_access_key_id = str( - re.search( - r"access_key_id.*:\s.(?P\w*)", issue_secret_output - ).group("aws_access_key_id") + re.search(r"access_key_id.*:\s.(?P\w*)", issue_secret_output).group( + "aws_access_key_id" + ) ) aws_secret_access_key = str( re.search( @@ -129,9 +121,7 @@ class DefaultRunner(RunnerBase): configure_input = [ InteractiveInput(prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id), - InteractiveInput( - prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key - ), + InteractiveInput(prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key), InteractiveInput(prompt_pattern=r".*", input=""), InteractiveInput(prompt_pattern=r".*", input=""), ] @@ -148,16 +138,12 @@ class DefaultRunner(RunnerBase): } endpoints_generators = { K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]), - K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle( - [[endpoint] for endpoint in endpoints] - ), + K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle([[endpoint] for endpoint in endpoints]), } k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy] endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy] - distributed_load_params_list = self._get_distributed_load_params_list( - load_params, k6_processes_count - ) + distributed_load_params_list = self._get_distributed_load_params_list(load_params, k6_processes_count) futures = parallel( self._init_k6_instance, @@ -168,9 +154,7 @@ class DefaultRunner(RunnerBase): ) self.k6_instances = [future.result() for future in futures] - def _init_k6_instance( - self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str - ): + def _init_k6_instance(self, load_params_for_loader: LoadParams, loader: Loader, endpoints: list[str], k6_dir: str): shell = loader.get_shell() with reporter.step(f"Init K6 instance on {loader.ip} for endpoints {endpoints}"): with reporter.step(f"Make working directory"): @@ -208,9 +192,7 @@ class DefaultRunner(RunnerBase): and getattr(original_load_params, field.name) is not None ): original_value = getattr(original_load_params, field.name) - distribution = self._get_distribution( - math.ceil(original_value / divisor), workers_count - ) + distribution = self._get_distribution(math.ceil(original_value / divisor), workers_count) for i in range(workers_count): setattr(distributed_load_params[i], field.name, distribution[i]) @@ -237,10 +219,7 @@ class DefaultRunner(RunnerBase): # Remainder of clients left to be distributed remainder = clients_count - clients_per_worker * workers_count - distribution = [ - clients_per_worker + 1 if i < remainder else clients_per_worker - for i in range(workers_count) - ] + distribution = [clients_per_worker + 1 if i < remainder else clients_per_worker for i in range(workers_count)] return distribution def start(self): @@ -249,9 +228,7 @@ class DefaultRunner(RunnerBase): parallel([k6.start for k6 in self.k6_instances]) wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 - with reporter.step( - f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" - ): + with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"): time.sleep(wait_after_start_time) def stop(self): @@ -331,9 +308,7 @@ class LocalRunner(RunnerBase): with reporter.step("Update limits.conf"): limits_path = "/etc/security/limits.conf" self.file_keeper.add(cluster_node.storage_node, limits_path) - content = ( - f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" - ) + content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n" shell.exec(f"echo '{content}' | sudo tee {limits_path}") with reporter.step("Download K6"): @@ -343,9 +318,7 @@ class LocalRunner(RunnerBase): shell.exec(f"sudo chmod -R 777 {k6_dir}") with reporter.step("Create empty_passwd"): - self.wallet = WalletInfo( - f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml" - ) + self.wallet = WalletInfo(f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml") content = yaml.dump({"password": ""}) shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}') shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}") @@ -387,15 +360,13 @@ class LocalRunner(RunnerBase): def start(self): load_params = self.k6_instances[0].load_params - self.cluster_state_controller.stop_all_s3_gates() - self.cluster_state_controller.stop_all_storage_services() + self.cluster_state_controller.stop_services_of_type(S3Gate) + self.cluster_state_controller.stop_services_of_type(StorageNode) parallel([k6.start for k6 in self.k6_instances]) wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5 - with reporter.step( - f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on" - ): + with reporter.step(f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"): time.sleep(wait_after_start_time) @reporter.step_deco("Restore passwd on {cluster_node}") @@ -412,8 +383,7 @@ class LocalRunner(RunnerBase): for k6_instance in self.k6_instances: k6_instance.stop() - self.cluster_state_controller.start_stopped_storage_services() - self.cluster_state_controller.start_stopped_s3_gates() + self.cluster_state_controller.start_all_stopped_services() def get_results(self) -> dict: results = {} diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index e45aa14..aa5f151 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -158,7 +158,8 @@ class ClusterStateController: @reporter.step_deco("Wait for S3Gates reconnection to local storage") def wait_s3gates(self): online_s3gates = self._get_online(S3Gate) - parallel(self.wait_s3gate, online_s3gates) + if online_s3gates: + parallel(self.wait_s3gate, online_s3gates) @wait_for_success(600, 60) def wait_tree_healthcheck(self):