From cc35b2e6da4aee957a0e1d8540f4a0731e1a2d07 Mon Sep 17 00:00:00 2001 From: Andrey Berezin Date: Thu, 25 May 2023 23:09:07 +0300 Subject: [PATCH] Changes required to run multiple loads during one test Signed-off-by: Andrey Berezin --- src/frostfs_testlib/load/k6.py | 11 +- src/frostfs_testlib/load/load_report.py | 212 +++++++++--------- src/frostfs_testlib/load/load_steps.py | 17 +- src/frostfs_testlib/load/load_verifiers.py | 2 +- .../controllers/background_load_controller.py | 85 +++++-- .../controllers/cluster_state_controller.py | 28 ++- 6 files changed, 217 insertions(+), 138 deletions(-) diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py index 9a8b1d9..2fa2c00 100644 --- a/src/frostfs_testlib/load/k6.py +++ b/src/frostfs_testlib/load/k6.py @@ -153,10 +153,6 @@ class K6: @reporter.step_deco("Start K6 on initiator") def start(self) -> None: - # Make working_dir directory - self.shell.exec(f"sudo mkdir -p {self.load_params.working_dir}") - self.shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {self.load_params.working_dir}") - command = ( f"{self._k6_dir}/k6 run {self._generate_env_variables()} " f"{self._k6_dir}/scenarios/{self.scenario.value}.js" @@ -170,13 +166,12 @@ class K6: assert "No k6 instances were executed" if k6_should_be_running: assert self._k6_process.running(), "k6 should be running." - while timeout >= 0: + while timeout > 0: if not self._k6_process.running(): return logger.info(f"K6 is running. Waiting {wait_interval} seconds...") - if timeout > 0: - sleep(wait_interval) - timeout -= wait_interval + sleep(wait_interval) + timeout -= wait_interval self.stop() raise TimeoutError(f"Expected K6 finished in {timeout} sec.") diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py index 2771df5..c9c23c7 100644 --- a/src/frostfs_testlib/load/load_report.py +++ b/src/frostfs_testlib/load/load_report.py @@ -10,7 +10,8 @@ from frostfs_testlib.load.load_metrics import get_metrics_object class LoadReport: def __init__(self, load_test) -> None: self.load_test = load_test - self.load_summaries: Optional[dict] = None + # List of load summaries dict + self.load_summaries_list: Optional[list[dict]] = [] self.load_params: Optional[LoadParams] = None self.start_time: Optional[datetime] = None self.end_time: Optional[datetime] = None @@ -21,8 +22,8 @@ class LoadReport: def set_end_time(self): self.end_time = datetime.utcnow() - def set_summaries(self, load_summaries: dict): - self.load_summaries = load_summaries + def add_summaries(self, load_summaries: dict): + self.load_summaries_list.append(load_summaries) def set_load_params(self, load_params: LoadParams): self.load_params = load_params @@ -30,7 +31,7 @@ class LoadReport: def get_report_html(self): report_sections = [ [self.load_test, self._get_load_params_section_html], - [self.load_summaries, self._get_totals_section_html], + [self.load_summaries_list, self._get_totals_section_html], [self.end_time, self._get_test_time_html], ] @@ -156,110 +157,113 @@ class LoadReport: return html def _get_totals_section_html(self): + html = "" + for i, load_summaries in enumerate(self.load_summaries_list, 1): + html += f"

Load Results for load #{i}

" - html = "

Load Results

" - - write_operations = 0 - write_op_sec = 0 - write_throughput = 0 - write_errors = {} - requested_write_rate = self.load_params.write_rate - requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else "" - - read_operations = 0 - read_op_sec = 0 - read_throughput = 0 - read_errors = {} - requested_read_rate = self.load_params.read_rate - requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else "" - - delete_operations = 0 - delete_op_sec = 0 - delete_errors = {} - requested_delete_rate = self.load_params.delete_rate - requested_delete_rate_str = ( - f"{requested_delete_rate}op/sec" if requested_delete_rate else "" - ) - - if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]: - delete_vus = max( - self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0 - ) - write_vus = max( - self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0 - ) - read_vus = max( - self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0 - ) - else: - write_vus = self.load_params.writers - read_vus = self.load_params.readers - delete_vus = self.load_params.deleters - - write_vus_str = f"{write_vus}th" - read_vus_str = f"{read_vus}th" - delete_vus_str = f"{delete_vus}th" - - write_section_required = False - read_section_required = False - delete_section_required = False - - for node_key, load_summary in self.load_summaries.items(): - metrics = get_metrics_object(self.load_params.scenario, load_summary) - write_operations += metrics.write_total_iterations - if write_operations: - write_section_required = True - write_op_sec += metrics.write_rate - write_throughput += metrics.write_throughput - if metrics.write_failed_iterations: - write_errors[node_key] = metrics.write_failed_iterations - - read_operations += metrics.read_total_iterations - if read_operations: - read_section_required = True - read_op_sec += metrics.read_rate - read_throughput += metrics.read_throughput - if metrics.read_failed_iterations: - read_errors[node_key] = metrics.read_failed_iterations - - delete_operations += metrics.delete_total_iterations - if delete_operations: - delete_section_required = True - delete_op_sec += metrics.delete_rate - if metrics.delete_failed_iterations: - delete_errors[node_key] = metrics.delete_failed_iterations - - if write_section_required: - html += self._get_oprations_sub_section_html( - "Write", - write_operations, - requested_write_rate_str, - write_vus_str, - write_op_sec, - write_throughput, - write_errors, + write_operations = 0 + write_op_sec = 0 + write_throughput = 0 + write_errors = {} + requested_write_rate = self.load_params.write_rate + requested_write_rate_str = ( + f"{requested_write_rate}op/sec" if requested_write_rate else "" ) - if read_section_required: - html += self._get_oprations_sub_section_html( - "Read", - read_operations, - requested_read_rate_str, - read_vus_str, - read_op_sec, - read_throughput, - read_errors, + read_operations = 0 + read_op_sec = 0 + read_throughput = 0 + read_errors = {} + requested_read_rate = self.load_params.read_rate + requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else "" + + delete_operations = 0 + delete_op_sec = 0 + delete_errors = {} + requested_delete_rate = self.load_params.delete_rate + requested_delete_rate_str = ( + f"{requested_delete_rate}op/sec" if requested_delete_rate else "" ) - if delete_section_required: - html += self._get_oprations_sub_section_html( - "Delete", - delete_operations, - requested_delete_rate_str, - delete_vus_str, - delete_op_sec, - 0, - delete_errors, - ) + if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]: + delete_vus = max( + self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0 + ) + write_vus = max( + self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0 + ) + read_vus = max( + self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0 + ) + else: + write_vus = self.load_params.writers + read_vus = self.load_params.readers + delete_vus = self.load_params.deleters + + write_vus_str = f"{write_vus}th" + read_vus_str = f"{read_vus}th" + delete_vus_str = f"{delete_vus}th" + + write_section_required = False + read_section_required = False + delete_section_required = False + + for node_key, load_summary in load_summaries.items(): + metrics = get_metrics_object(self.load_params.scenario, load_summary) + write_operations += metrics.write_total_iterations + if write_operations: + write_section_required = True + write_op_sec += metrics.write_rate + write_throughput += metrics.write_throughput + if metrics.write_failed_iterations: + write_errors[node_key] = metrics.write_failed_iterations + + read_operations += metrics.read_total_iterations + if read_operations: + read_section_required = True + read_op_sec += metrics.read_rate + read_throughput += metrics.read_throughput + if metrics.read_failed_iterations: + read_errors[node_key] = metrics.read_failed_iterations + + delete_operations += metrics.delete_total_iterations + if delete_operations: + delete_section_required = True + delete_op_sec += metrics.delete_rate + if metrics.delete_failed_iterations: + delete_errors[node_key] = metrics.delete_failed_iterations + + if write_section_required: + html += self._get_oprations_sub_section_html( + "Write", + write_operations, + requested_write_rate_str, + write_vus_str, + write_op_sec, + write_throughput, + write_errors, + ) + + if read_section_required: + html += self._get_oprations_sub_section_html( + "Read", + read_operations, + requested_read_rate_str, + read_vus_str, + read_op_sec, + read_throughput, + read_errors, + ) + + if delete_section_required: + html += self._get_oprations_sub_section_html( + "Delete", + delete_operations, + requested_delete_rate_str, + delete_vus_str, + delete_op_sec, + 0, + delete_errors, + ) return html diff --git a/src/frostfs_testlib/load/load_steps.py b/src/frostfs_testlib/load/load_steps.py index 5d935aa..b55ff22 100644 --- a/src/frostfs_testlib/load/load_steps.py +++ b/src/frostfs_testlib/load/load_steps.py @@ -9,7 +9,10 @@ from frostfs_testlib.load.k6 import K6 from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC -from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR +from frostfs_testlib.resources.load_params import ( + BACKGROUND_LOAD_VUS_COUNT_DIVISOR, + LOAD_NODE_SSH_USER, +) from frostfs_testlib.shell import CommandOptions, SSHShell from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials from frostfs_testlib.storage.cluster import ClusterNode @@ -35,7 +38,7 @@ def init_s3_client( grpc_peer = storage_node.get_rpc_endpoint() for load_node in load_nodes: - ssh_client = _get_ssh_client(ssh_credentials, load_node) + ssh_client = _get_shell(ssh_credentials, load_node) frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(ssh_client, FROSTFS_AUTHMATE_EXEC) issue_secret_output = frostfs_authmate_exec.secret.issue( wallet=wallet.path, @@ -99,12 +102,16 @@ def prepare_k6_instances( for distributed_load_params in distributed_load_params_list: load_node = next(nodes) - ssh_client = _get_ssh_client(ssh_credentials, load_node) + shell = _get_shell(ssh_credentials, load_node) + # Make working_dir directory + shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}") + shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}") + k6_load_object = K6( distributed_load_params, next(endpoints_gen), k6_dir, - ssh_client, + shell, load_node, loaders_wallet, ) @@ -115,7 +122,7 @@ def prepare_k6_instances( return k6_load_objects -def _get_ssh_client(ssh_credentials: SshCredentials, load_node: str): +def _get_shell(ssh_credentials: SshCredentials, load_node: str) -> SSHShell: ssh_client = SSHShell( host=load_node, login=ssh_credentials.ssh_login, diff --git a/src/frostfs_testlib/load/load_verifiers.py b/src/frostfs_testlib/load/load_verifiers.py index 69e9f1f..1ff63ae 100644 --- a/src/frostfs_testlib/load/load_verifiers.py +++ b/src/frostfs_testlib/load/load_verifiers.py @@ -57,7 +57,7 @@ class LoadVerifier: # Due to interruptions we may see total verified objects to be less than written on writers count if abs(objects_count - verified_objects) > writers: exceptions.append( - f"Verified objects is less than total objects. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}." + f"Verified objects mismatch. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}." ) assert not exceptions, "\n".join(exceptions) diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py index f9cf0e5..a2336be 100644 --- a/src/frostfs_testlib/storage/controllers/background_load_controller.py +++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py @@ -1,3 +1,4 @@ +import copy import time import frostfs_testlib.resources.optionals as optionals @@ -9,7 +10,9 @@ from frostfs_testlib.load.load_config import ( LoadScenario, LoadType, ) +from frostfs_testlib.load.load_report import LoadReport from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances +from frostfs_testlib.load.load_verifiers import LoadVerifier from frostfs_testlib.reporter import get_reporter from frostfs_testlib.resources.load_params import ( K6_TEARDOWN_PERIOD, @@ -33,11 +36,14 @@ class BackgroundLoadController: k6_instances: list[K6] k6_dir: str load_params: LoadParams + original_load_params: LoadParams load_nodes: list[str] verification_params: LoadParams nodes_under_load: list[ClusterNode] + load_counter: int ssh_credentials: SshCredentials loaders_wallet: WalletInfo + load_summaries: dict endpoints: list[str] def __init__( @@ -48,8 +54,10 @@ class BackgroundLoadController: nodes_under_load: list[ClusterNode], ) -> None: self.k6_dir = k6_dir - self.load_params = load_params + self.original_load_params = load_params + self.load_params = copy.deepcopy(self.original_load_params) self.nodes_under_load = nodes_under_load + self.load_counter = 1 self.load_nodes = LOAD_NODES self.loaders_wallet = loaders_wallet @@ -59,17 +67,7 @@ class BackgroundLoadController: self.endpoints = self._get_endpoints( load_params.load_type, load_params.endpoint_selection_strategy ) - self.verification_params = LoadParams( - verify_clients=load_params.verify_clients, - scenario=LoadScenario.VERIFY, - registry_file=load_params.registry_file, - verify_time=load_params.verify_time, - load_type=load_params.load_type, - load_id=load_params.load_id, - working_dir=load_params.working_dir, - endpoint_selection_strategy=load_params.endpoint_selection_strategy, - k6_process_allocation_strategy=load_params.k6_process_allocation_strategy, - ) + self.ssh_credentials = SshCredentials( LOAD_NODE_SSH_USER, LOAD_NODE_SSH_PASSWORD, @@ -179,6 +177,66 @@ class BackgroundLoadController: return True + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Reset background load") + def _reset_for_consequent_load(self): + """This method is required if we want to run multiple loads during test run. + Raise load counter by 1 and append it to load_id + """ + self.load_counter += 1 + self.load_params = copy.deepcopy(self.original_load_params) + self.load_params.set_id(f"{self.load_params.load_id}_{self.load_counter}") + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Startup background load") + def startup(self): + self.prepare() + self.start() + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Stop and get results of background load") + def teardown(self, load_report: LoadReport = None): + if not self.k6_instances: + return + + self.stop() + self.load_summaries = self.get_results() + self.k6_instances = [] + if load_report: + load_report.add_summaries(self.load_summaries) + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Verify results of background load") + def verify(self): + try: + if self.load_params.verify: + self.verification_params = LoadParams( + verify_clients=self.load_params.verify_clients, + scenario=LoadScenario.VERIFY, + registry_file=self.load_params.registry_file, + verify_time=self.load_params.verify_time, + load_type=self.load_params.load_type, + load_id=self.load_params.load_id, + working_dir=self.load_params.working_dir, + endpoint_selection_strategy=self.load_params.endpoint_selection_strategy, + k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy, + ) + self._run_verify_scenario() + verification_summaries = self.get_results() + self.verify_summaries(self.load_summaries, verification_summaries) + finally: + self._reset_for_consequent_load() + + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) + @reporter.step_deco("Verify summaries from k6") + def verify_summaries(self, load_summaries: dict, verification_summaries: dict): + verifier = LoadVerifier(self.load_params) + for node_or_endpoint in load_summaries: + with reporter.step(f"Verify load summaries for {node_or_endpoint}"): + verifier.verify_summaries( + load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint] + ) + @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) def wait_until_finish(self): if self.load_params.load_time is None: @@ -188,7 +246,8 @@ class BackgroundLoadController: k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD)) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) - def verify(self): + @reporter.step_deco("Run verify scenario for background load") + def _run_verify_scenario(self): if self.verification_params.verify_time is None: raise RuntimeError("verify_time should not be none") diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py index 35072f2..34f027f 100644 --- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py +++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py @@ -1,7 +1,5 @@ import time -import allure - import frostfs_testlib.resources.optionals as optionals from frostfs_testlib.reporter import get_reporter from frostfs_testlib.shell import CommandOptions, Shell @@ -30,15 +28,29 @@ class ClusterStateController: @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Stop host of node {node}") def stop_node_host(self, node: ClusterNode, mode: str): - with allure.step(f"Stop host {node.host.config.address}"): + with reporter.step(f"Stop host {node.host.config.address}"): node.host.stop_host(mode=mode) wait_for_host_offline(self.shell, node.storage_node) self.stopped_nodes.append(node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) + @reporter.step_deco("Shutdown whole cluster") + def shutdown_cluster(self, mode: str, reversed_order: bool = False): + nodes = ( + reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes + ) + for node in nodes: + with reporter.step(f"Stop host {node.host.config.address}"): + self.stopped_nodes.append(node) + node.host.stop_host(mode=mode) + + for node in nodes: + wait_for_host_offline(self.shell, node.storage_node) + @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start host of node {node}") def start_node_host(self, node: ClusterNode): - with allure.step(f"Start host {node.host.config.address}"): + with reporter.step(f"Start host {node.host.config.address}"): node.host.start_host() wait_for_host_online(self.shell, node.storage_node) wait_for_node_online(node.storage_node) @@ -46,9 +58,11 @@ class ClusterStateController: @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @reporter.step_deco("Start stopped hosts") - def start_stopped_hosts(self): - for node in self.stopped_nodes: - node.host.start_host() + def start_stopped_hosts(self, reversed_order: bool = False): + nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes + for node in nodes: + with reporter.step(f"Start host {node.host.config.address}"): + node.host.start_host() self.stopped_nodes = [] wait_all_storage_nodes_returned(self.shell, self.cluster)