From 81dfc723dae06b4ef4a2d87a1f0805651e950966 Mon Sep 17 00:00:00 2001 From: Andrey Berezin Date: Mon, 4 Dec 2023 17:59:29 +0300 Subject: [PATCH] [#137] Ability to control remote processes id and reports for load Signed-off-by: Andrey Berezin --- src/frostfs_testlib/load/k6.py | 33 ++++--- src/frostfs_testlib/load/load_report.py | 59 ++++++------- .../processes/remote_process.py | 86 ++++++++++++++++--- .../controllers/background_load_controller.py | 34 +++++++- .../controllers/cluster_state_controller.py | 10 ++- 5 files changed, 159 insertions(+), 63 deletions(-) diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index 92da8e0..2ce7c75 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -34,7 +34,6 @@ class LoadResults: class K6: _k6_process: RemoteProcess - _start_time: datetime def __init__( self, @@ -61,6 +60,18 @@ class K6: self._k6_dir: str = k6_dir + command = ( + f"{self._k6_dir}/k6 run {self._generate_env_variables()} " + f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js" + ) + user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None + process_id = ( + self.load_params.load_id + if self.load_params.scenario != LoadScenario.VERIFY + else f"{self.load_params.load_id}_verify" + ) + self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user, process_id) + @property def process_dir(self) -> str: return self._k6_process.process_dir @@ -111,15 +122,15 @@ class K6: reporter.attach("\n".join(f"{param}: {value}" for param, value in env_vars.items()), "K6 ENV variables") return " ".join([f"-e {param}='{value}'" for param, value in env_vars.items() if value is not None]) + def get_start_time(self) -> datetime: + return datetime.fromtimestamp(self._k6_process.start_time()) + + def get_end_time(self) -> datetime: + return datetime.fromtimestamp(self._k6_process.end_time()) + def start(self) -> None: with reporter.step(f"Start load from loader {self.loader.ip} on endpoints {self.endpoints}"): - self._start_time = int(datetime.utcnow().timestamp()) - command = ( - f"{self._k6_dir}/k6 run {self._generate_env_variables()} " - f"{self._k6_dir}/scenarios/{self.load_params.scenario.value}.js" - ) - user = STORAGE_USER_NAME if self.load_params.scenario == LoadScenario.LOCAL else None - self._k6_process = RemoteProcess.create(command, self.shell, self.load_params.working_dir, user) + self._k6_process.start() def wait_until_finished(self, soft_timeout: int = 0) -> None: with reporter.step(f"Wait until load is finished from loader {self.loader.ip} on endpoints {self.endpoints}"): @@ -128,8 +139,10 @@ class K6: else: timeout = self.load_params.load_time or 0 + start_time = int(self.get_start_time().timestamp()) + current_time = int(datetime.utcnow().timestamp()) - working_time = current_time - self._start_time + working_time = current_time - start_time remaining_time = timeout - working_time setup_teardown_time = ( @@ -146,7 +159,7 @@ class K6: original_timeout = timeout timeouts = { - "K6 start time": self._start_time, + "K6 start time": start_time, "Current time": current_time, "K6 working time": working_time, "Remaining time for load": remaining_time, diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py index ad3a26d..105d852 100644 --- a/src/frostfs_testlib/load/load_report.py +++ b/src/frostfs_testlib/load/load_report.py @@ -17,11 +17,15 @@ class LoadReport: self.start_time: Optional[datetime] = None self.end_time: 
Optional[datetime] = None - def set_start_time(self): - self.start_time = datetime.utcnow() + def set_start_time(self, time: datetime = None): + if time is None: + time = datetime.utcnow() + self.start_time = time - def set_end_time(self): - self.end_time = datetime.utcnow() + def set_end_time(self, time: datetime = None): + if time is None: + time = datetime.utcnow() + self.end_time = time def add_summaries(self, load_summaries: dict): self.load_summaries_list.append(load_summaries) @@ -31,6 +35,7 @@ class LoadReport: def get_report_html(self): report_sections = [ + [self.load_params, self._get_load_id_section_html], [self.load_test, self._get_load_params_section_html], [self.load_summaries_list, self._get_totals_section_html], [self.end_time, self._get_test_time_html], @@ -44,9 +49,7 @@ class LoadReport: return html def _get_load_params_section_html(self) -> str: - params: str = yaml.safe_dump( - [self.load_test], sort_keys=False, indent=2, explicit_start=True - ) + params: str = yaml.safe_dump([self.load_test], sort_keys=False, indent=2, explicit_start=True) params = params.replace("\n", "
").replace(" ", " ") section_html = f"""

Scenario params

@@ -55,8 +58,17 @@ class LoadReport: return section_html + def _get_load_id_section_html(self) -> str: + section_html = f"""<table border="1" cellpadding="5px"><tbody> + <tr><th>Load ID: {self.load_params.load_id}</th></tr> + </tbody></table><br>""" + + return section_html + def _get_test_time_html(self) -> str: - html = f"""<table border="1" cellpadding="5px"><tbody> - <tr><th>Scenario duration in UTC time (from agent)</th></tr> + if not self.start_time or not self.end_time: + return "" + + html = f"""<table border="1" cellpadding="5px"><tbody> + <tr><th>Scenario duration</th></tr> <tr><td>{self.start_time} - {self.end_time}</td></tr> </tbody></table><br>
""" @@ -97,7 +109,7 @@ class LoadReport: LoadScenario.gRPC_CAR: "open model", LoadScenario.S3_CAR: "open model", LoadScenario.LOCAL: "local fill", - LoadScenario.S3_LOCAL: "local fill" + LoadScenario.S3_LOCAL: "local fill", } return model_map[self.load_params.scenario] @@ -124,10 +136,7 @@ class LoadReport: total_errors: int = 0 for node_key, errors in errors.items(): total_errors += errors - if ( - self.load_params.k6_process_allocation_strategy - == K6ProcessAllocationStrategy.PER_ENDPOINT - ): + if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT: per_node_errors_html += self._row(f"At {node_key}", errors) latency_html = "" @@ -139,9 +148,7 @@ class LoadReport: for param_name, param_val in latency_dict.items(): latency_values += f"{param_name}={param_val:.2f}ms " - latency_html += self._row( - f"{operation_type} latency {node_key.split(':')[0]}", latency_values - ) + latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values) object_size, object_size_unit = calc_unit(self.load_params.object_size, 1) duration = self._seconds_to_formatted_duration(self.load_params.load_time) @@ -180,9 +187,7 @@ class LoadReport: write_latency = {} write_errors = {} requested_write_rate = self.load_params.write_rate - requested_write_rate_str = ( - f"{requested_write_rate}op/sec" if requested_write_rate else "" - ) + requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else "" read_operations = 0 read_op_sec = 0 @@ -197,20 +202,12 @@ class LoadReport: delete_latency = {} delete_errors = {} requested_delete_rate = self.load_params.delete_rate - requested_delete_rate_str = ( - f"{requested_delete_rate}op/sec" if requested_delete_rate else "" - ) + requested_delete_rate_str = f"{requested_delete_rate}op/sec" if requested_delete_rate else "" if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]: - delete_vus = max( - self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0 - ) - write_vus = max( - self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0 - ) - read_vus = max( - self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0 - ) + delete_vus = max(self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0) + write_vus = max(self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0) + read_vus = max(self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0) else: write_vus = self.load_params.writers read_vus = self.load_params.readers diff --git a/src/frostfs_testlib/processes/remote_process.py b/src/frostfs_testlib/processes/remote_process.py index 1252b97..5624940 100644 --- a/src/frostfs_testlib/processes/remote_process.py +++ b/src/frostfs_testlib/processes/remote_process.py @@ -15,21 +15,33 @@ from frostfs_testlib.shell.interfaces import CommandInspector, CommandOptions class RemoteProcess: - def __init__(self, cmd: str, process_dir: str, shell: Shell, cmd_inspector: Optional[CommandInspector]): + def __init__( + self, cmd: str, process_dir: str, shell: Shell, cmd_inspector: Optional[CommandInspector], proc_id: str + ): self.process_dir = process_dir self.cmd = cmd self.stdout_last_line_number = 0 self.stderr_last_line_number = 0 self.pid: Optional[str] = None self.proc_rc: Optional[int] = None + self.proc_start_time: Optional[int] = None + self.proc_end_time: Optional[int] = None self.saved_stdout: Optional[str] = None self.saved_stderr: 
Optional[str] = None self.shell = shell + self.proc_id: str = proc_id self.cmd_inspectors: list[CommandInspector] = [cmd_inspector] if cmd_inspector else [] @classmethod @reporter.step("Create remote process") - def create(cls, command: str, shell: Shell, working_dir: str = "/tmp", user: Optional[str] = None) -> RemoteProcess: + def create( + cls, + command: str, + shell: Shell, + working_dir: str = "/tmp", + user: Optional[str] = None, + proc_id: Optional[str] = None, + ) -> RemoteProcess: """ Create a process on a remote host. @@ -40,6 +52,7 @@ class RemoteProcess: stderr: contains script errors stdout: contains script output user: user on behalf whom command will be executed + proc_id: process string identificator Args: shell: Shell instance @@ -49,19 +62,31 @@ class RemoteProcess: Returns: RemoteProcess instance for further examination """ + if proc_id is None: + proc_id = f"{uuid.uuid4()}" + cmd_inspector = SuInspector(user) if user else None remote_process = cls( cmd=command, - process_dir=os.path.join(working_dir, f"proc_{uuid.uuid4()}"), + process_dir=os.path.join(working_dir, f"proc_{proc_id}"), shell=shell, cmd_inspector=cmd_inspector, + proc_id=proc_id, ) - remote_process._create_process_dir() - remote_process._generate_command_script(command) - remote_process._start_process() - remote_process.pid = remote_process._get_pid() + return remote_process + @reporter.step("Start remote process") + def start(self): + """ + Starts a process on a remote host. + """ + + self._create_process_dir() + self._generate_command_script() + self._start_process() + self.pid = self._get_pid() + @reporter.step("Get process stdout") def stdout(self, full: bool = False) -> str: """ @@ -130,17 +155,48 @@ class RemoteProcess: if self.proc_rc is not None: return self.proc_rc + result = self._cat_proc_file("rc") + if not result: + return None + + self.proc_rc = int(result) + return self.proc_rc + + @reporter.step("Get process start time") + def start_time(self) -> Optional[int]: + if self.proc_start_time is not None: + return self.proc_start_time + + result = self._cat_proc_file("start_time") + if not result: + return None + + self.proc_start_time = int(result) + return self.proc_start_time + + @reporter.step("Get process end time") + def end_time(self) -> Optional[int]: + if self.proc_end_time is not None: + return self.proc_end_time + + result = self._cat_proc_file("end_time") + if not result: + return None + + self.proc_end_time = int(result) + return self.proc_end_time + + def _cat_proc_file(self, file: str) -> Optional[str]: terminal = self.shell.exec( - f"cat {self.process_dir}/rc", + f"cat {self.process_dir}/{file}", CommandOptions(check=False, extra_inspectors=self.cmd_inspectors, no_log=True), ) if "No such file or directory" in terminal.stderr: return None elif terminal.stderr or terminal.return_code != 0: - raise AssertionError(f"cat process rc was not successful: {terminal.stderr}") + raise AssertionError(f"cat process {file} was not successful: {terminal.stderr}") - self.proc_rc = int(terminal.stdout) - return self.proc_rc + return terminal.stdout @reporter.step("Check if process is running") def running(self) -> bool: @@ -195,17 +251,19 @@ class RemoteProcess: return terminal.stdout.strip() @reporter.step("Generate command script") - def _generate_command_script(self, command: str) -> None: - command = command.replace('"', '\\"').replace("\\", "\\\\") + def _generate_command_script(self) -> None: + command = self.cmd.replace('"', '\\"').replace("\\", "\\\\") script = ( f"#!/bin/bash\n" f"cd 
{self.process_dir}\n" + f"date +%s > {self.process_dir}/start_time\n" f"{command} &\n" f"pid=\$!\n" f"cd {self.process_dir}\n" f"echo \$pid > {self.process_dir}/pid\n" f"wait \$pid\n" - f"echo $? > {self.process_dir}/rc" + f"echo $? > {self.process_dir}/rc\n" + f"date +%s > {self.process_dir}/end_time\n" ) self.shell.exec( diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py index 003bb6b..5f2ed99 100644 --- a/src/frostfs_testlib/storage/controllers/background_load_controller.py +++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py @@ -1,4 +1,5 @@ import copy +from datetime import datetime from typing import Optional import frostfs_testlib.resources.optionals as optionals @@ -10,6 +11,7 @@ from frostfs_testlib.load.load_verifiers import LoadVerifier from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode from frostfs_testlib.storage.dataclasses.wallet import WalletInfo +from frostfs_testlib.testing.parallel import parallel from frostfs_testlib.testing.test_control import run_optionally @@ -26,6 +28,7 @@ class BackgroundLoadController: endpoints: list[str] runner: ScenarioRunner started: bool + load_reporters: list[LoadReport] def __init__( self, @@ -45,6 +48,7 @@ class BackgroundLoadController: self.loaders_wallet = loaders_wallet self.runner = runner self.started = False + self.load_reporters = [] if load_params.endpoint_selection_strategy is None: raise RuntimeError("endpoint_selection_strategy should not be None") @@ -83,12 +87,20 @@ class BackgroundLoadController: return all_endpoints[load_type][endpoint_selection_strategy] + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step("Init k6 instances") + def init_k6(self): + self.endpoints = self._get_endpoints(self.load_params.load_type, self.load_params.endpoint_selection_strategy) + self.runner.init_k6_instances(self.load_params, self.endpoints, self.k6_dir) + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @reporter.step("Prepare load instances") def prepare(self): - self.endpoints = self._get_endpoints(self.load_params.load_type, self.load_params.endpoint_selection_strategy) self.runner.prepare(self.load_params, self.cluster_nodes, self.nodes_under_load, self.k6_dir) - self.runner.init_k6_instances(self.load_params, self.endpoints, self.k6_dir) + self.init_k6() + + def append_reporter(self, load_report: LoadReport): + self.load_reporters.append(load_report) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) def start(self): @@ -128,16 +140,30 @@ class BackgroundLoadController: @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @reporter.step("Stop and get results of load") - def teardown(self, load_report: Optional[LoadReport] = None): + def teardown(self): if not self.started: return self.stop() self.load_summaries = self._get_results() self.started = False - if load_report: + + start_time = min(self._get_start_times()) + end_time = max(self._get_end_times()) + + for load_report in self.load_reporters: + load_report.set_start_time(start_time) + load_report.set_end_time(end_time) load_report.add_summaries(self.load_summaries) + def _get_start_times(self) -> list[datetime]: + futures = parallel([k6.get_start_time for k6 in self.runner.get_k6_instances()]) + return [future.result() for future in futures] + + def _get_end_times(self) -> list[datetime]: + futures = 
parallel([k6.get_end_time for k6 in self.runner.get_k6_instances()]) + return [future.result() for future in futures] + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @reporter.step("Run post-load verification") def verify(self): diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 35ab6c1..301b636 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -540,8 +540,9 @@ class ClusterStateController: options = CommandOptions(check=False) return self.shell.exec(f"ping {node.host.config.address} -c 1", options).return_code - @retry(max_attempts=60, sleep_interval=5, expected_result=HostStatus.ONLINE) - @reporter.step("Waiting for {node} to go online") + @retry( + max_attempts=60, sleep_interval=10, expected_result=HostStatus.ONLINE, title="Waiting for {node} to go online" + ) def _wait_for_host_online(self, node: ClusterNode): try: ping_result = self._ping_host(node) @@ -552,8 +553,9 @@ class ClusterStateController: logger.warning(f"Host ping fails with error {err}") return HostStatus.OFFLINE - @retry(max_attempts=60, sleep_interval=5, expected_result=HostStatus.OFFLINE) - @reporter.step("Waiting for {node} to go offline") + @retry( + max_attempts=60, sleep_interval=10, expected_result=HostStatus.OFFLINE, title="Waiting for {node} to go offline" + ) def _wait_for_host_offline(self, node: ClusterNode): try: ping_result = self._ping_host(node)
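
Usage sketch (illustration only, not part of the diff above): after this change RemoteProcess.create() only prepares the process object, start() actually launches it, and start_time()/end_time() read the timestamps that the generated wrapper script writes next to the pid and rc files. The snippet below assumes a LocalShell instance and placeholder command/proc_id values; everything else mirrors the API added in this patch.

    import time
    from datetime import datetime

    from frostfs_testlib.processes.remote_process import RemoteProcess
    from frostfs_testlib.shell import LocalShell  # assumption: any Shell implementation can be used here

    shell = LocalShell()

    # proc_id is the new optional identifier: it makes the process directory
    # predictable (<working_dir>/proc_<proc_id>) instead of a random uuid.
    proc = RemoteProcess.create("sleep 5", shell, working_dir="/tmp", proc_id="demo_load")

    proc.start()  # creates the process dir, generates the script, launches it and stores the pid

    while proc.running():
        time.sleep(1)

    started = datetime.fromtimestamp(proc.start_time())   # read from <process_dir>/start_time
    finished = datetime.fromtimestamp(proc.end_time())    # read from <process_dir>/end_time
    print(f"process ran for {finished - started}")

K6 follows the same split: the k6 command line and its RemoteProcess are now built in __init__ (with load_id as proc_id), start() only launches the prepared process, and get_start_time()/get_end_time() convert the recorded timestamps. BackgroundLoadController.teardown() then takes the min/max of those times across all k6 instances and writes them into every LoadReport registered through append_reporter().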