Changes required to run multiple loads during one test

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-05-25 23:09:07 +03:00
parent 123b5425a8
commit cc35b2e6da
6 changed files with 217 additions and 138 deletions

View file

@ -153,10 +153,6 @@ class K6:
@reporter.step_deco("Start K6 on initiator") @reporter.step_deco("Start K6 on initiator")
def start(self) -> None: def start(self) -> None:
# Make working_dir directory
self.shell.exec(f"sudo mkdir -p {self.load_params.working_dir}")
self.shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {self.load_params.working_dir}")
command = ( command = (
f"{self._k6_dir}/k6 run {self._generate_env_variables()} " f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
f"{self._k6_dir}/scenarios/{self.scenario.value}.js" f"{self._k6_dir}/scenarios/{self.scenario.value}.js"
@ -170,13 +166,12 @@ class K6:
assert "No k6 instances were executed" assert "No k6 instances were executed"
if k6_should_be_running: if k6_should_be_running:
assert self._k6_process.running(), "k6 should be running." assert self._k6_process.running(), "k6 should be running."
while timeout >= 0: while timeout > 0:
if not self._k6_process.running(): if not self._k6_process.running():
return return
logger.info(f"K6 is running. Waiting {wait_interval} seconds...") logger.info(f"K6 is running. Waiting {wait_interval} seconds...")
if timeout > 0: sleep(wait_interval)
sleep(wait_interval) timeout -= wait_interval
timeout -= wait_interval
self.stop() self.stop()
raise TimeoutError(f"Expected K6 finished in {timeout} sec.") raise TimeoutError(f"Expected K6 finished in {timeout} sec.")

View file

@ -10,7 +10,8 @@ from frostfs_testlib.load.load_metrics import get_metrics_object
class LoadReport: class LoadReport:
def __init__(self, load_test) -> None: def __init__(self, load_test) -> None:
self.load_test = load_test self.load_test = load_test
self.load_summaries: Optional[dict] = None # List of load summaries dict
self.load_summaries_list: Optional[list[dict]] = []
self.load_params: Optional[LoadParams] = None self.load_params: Optional[LoadParams] = None
self.start_time: Optional[datetime] = None self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None self.end_time: Optional[datetime] = None
@ -21,8 +22,8 @@ class LoadReport:
def set_end_time(self): def set_end_time(self):
self.end_time = datetime.utcnow() self.end_time = datetime.utcnow()
def set_summaries(self, load_summaries: dict): def add_summaries(self, load_summaries: dict):
self.load_summaries = load_summaries self.load_summaries_list.append(load_summaries)
def set_load_params(self, load_params: LoadParams): def set_load_params(self, load_params: LoadParams):
self.load_params = load_params self.load_params = load_params
@ -30,7 +31,7 @@ class LoadReport:
def get_report_html(self): def get_report_html(self):
report_sections = [ report_sections = [
[self.load_test, self._get_load_params_section_html], [self.load_test, self._get_load_params_section_html],
[self.load_summaries, self._get_totals_section_html], [self.load_summaries_list, self._get_totals_section_html],
[self.end_time, self._get_test_time_html], [self.end_time, self._get_test_time_html],
] ]
@ -156,110 +157,113 @@ class LoadReport:
return html return html
def _get_totals_section_html(self): def _get_totals_section_html(self):
html = ""
for i, load_summaries in enumerate(self.load_summaries_list, 1):
html += f"<h3>Load Results for load #{i}</h3>"
html = "<h3>Load Results</h3>" write_operations = 0
write_op_sec = 0
write_operations = 0 write_throughput = 0
write_op_sec = 0 write_errors = {}
write_throughput = 0 requested_write_rate = self.load_params.write_rate
write_errors = {} requested_write_rate_str = (
requested_write_rate = self.load_params.write_rate f"{requested_write_rate}op/sec" if requested_write_rate else ""
requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else ""
read_operations = 0
read_op_sec = 0
read_throughput = 0
read_errors = {}
requested_read_rate = self.load_params.read_rate
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
delete_operations = 0
delete_op_sec = 0
delete_errors = {}
requested_delete_rate = self.load_params.delete_rate
requested_delete_rate_str = (
f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
)
if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
delete_vus = max(
self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0
)
write_vus = max(
self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0
)
read_vus = max(
self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0
)
else:
write_vus = self.load_params.writers
read_vus = self.load_params.readers
delete_vus = self.load_params.deleters
write_vus_str = f"{write_vus}th"
read_vus_str = f"{read_vus}th"
delete_vus_str = f"{delete_vus}th"
write_section_required = False
read_section_required = False
delete_section_required = False
for node_key, load_summary in self.load_summaries.items():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
write_operations += metrics.write_total_iterations
if write_operations:
write_section_required = True
write_op_sec += metrics.write_rate
write_throughput += metrics.write_throughput
if metrics.write_failed_iterations:
write_errors[node_key] = metrics.write_failed_iterations
read_operations += metrics.read_total_iterations
if read_operations:
read_section_required = True
read_op_sec += metrics.read_rate
read_throughput += metrics.read_throughput
if metrics.read_failed_iterations:
read_errors[node_key] = metrics.read_failed_iterations
delete_operations += metrics.delete_total_iterations
if delete_operations:
delete_section_required = True
delete_op_sec += metrics.delete_rate
if metrics.delete_failed_iterations:
delete_errors[node_key] = metrics.delete_failed_iterations
if write_section_required:
html += self._get_oprations_sub_section_html(
"Write",
write_operations,
requested_write_rate_str,
write_vus_str,
write_op_sec,
write_throughput,
write_errors,
) )
if read_section_required: read_operations = 0
html += self._get_oprations_sub_section_html( read_op_sec = 0
"Read", read_throughput = 0
read_operations, read_errors = {}
requested_read_rate_str, requested_read_rate = self.load_params.read_rate
read_vus_str, requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
read_op_sec,
read_throughput, delete_operations = 0
read_errors, delete_op_sec = 0
delete_errors = {}
requested_delete_rate = self.load_params.delete_rate
requested_delete_rate_str = (
f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
) )
if delete_section_required: if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
html += self._get_oprations_sub_section_html( delete_vus = max(
"Delete", self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0
delete_operations, )
requested_delete_rate_str, write_vus = max(
delete_vus_str, self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0
delete_op_sec, )
0, read_vus = max(
delete_errors, self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0
) )
else:
write_vus = self.load_params.writers
read_vus = self.load_params.readers
delete_vus = self.load_params.deleters
write_vus_str = f"{write_vus}th"
read_vus_str = f"{read_vus}th"
delete_vus_str = f"{delete_vus}th"
write_section_required = False
read_section_required = False
delete_section_required = False
for node_key, load_summary in load_summaries.items():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
write_operations += metrics.write_total_iterations
if write_operations:
write_section_required = True
write_op_sec += metrics.write_rate
write_throughput += metrics.write_throughput
if metrics.write_failed_iterations:
write_errors[node_key] = metrics.write_failed_iterations
read_operations += metrics.read_total_iterations
if read_operations:
read_section_required = True
read_op_sec += metrics.read_rate
read_throughput += metrics.read_throughput
if metrics.read_failed_iterations:
read_errors[node_key] = metrics.read_failed_iterations
delete_operations += metrics.delete_total_iterations
if delete_operations:
delete_section_required = True
delete_op_sec += metrics.delete_rate
if metrics.delete_failed_iterations:
delete_errors[node_key] = metrics.delete_failed_iterations
if write_section_required:
html += self._get_oprations_sub_section_html(
"Write",
write_operations,
requested_write_rate_str,
write_vus_str,
write_op_sec,
write_throughput,
write_errors,
)
if read_section_required:
html += self._get_oprations_sub_section_html(
"Read",
read_operations,
requested_read_rate_str,
read_vus_str,
read_op_sec,
read_throughput,
read_errors,
)
if delete_section_required:
html += self._get_oprations_sub_section_html(
"Delete",
delete_operations,
requested_delete_rate_str,
delete_vus_str,
delete_op_sec,
0,
delete_errors,
)
return html return html

View file

@ -9,7 +9,10 @@ from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR from frostfs_testlib.resources.load_params import (
BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
LOAD_NODE_SSH_USER,
)
from frostfs_testlib.shell import CommandOptions, SSHShell from frostfs_testlib.shell import CommandOptions, SSHShell
from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials
from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.cluster import ClusterNode
@ -35,7 +38,7 @@ def init_s3_client(
grpc_peer = storage_node.get_rpc_endpoint() grpc_peer = storage_node.get_rpc_endpoint()
for load_node in load_nodes: for load_node in load_nodes:
ssh_client = _get_ssh_client(ssh_credentials, load_node) ssh_client = _get_shell(ssh_credentials, load_node)
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(ssh_client, FROSTFS_AUTHMATE_EXEC) frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(ssh_client, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate_exec.secret.issue( issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=wallet.path, wallet=wallet.path,
@ -99,12 +102,16 @@ def prepare_k6_instances(
for distributed_load_params in distributed_load_params_list: for distributed_load_params in distributed_load_params_list:
load_node = next(nodes) load_node = next(nodes)
ssh_client = _get_ssh_client(ssh_credentials, load_node) shell = _get_shell(ssh_credentials, load_node)
# Make working_dir directory
shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}")
shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}")
k6_load_object = K6( k6_load_object = K6(
distributed_load_params, distributed_load_params,
next(endpoints_gen), next(endpoints_gen),
k6_dir, k6_dir,
ssh_client, shell,
load_node, load_node,
loaders_wallet, loaders_wallet,
) )
@ -115,7 +122,7 @@ def prepare_k6_instances(
return k6_load_objects return k6_load_objects
def _get_ssh_client(ssh_credentials: SshCredentials, load_node: str): def _get_shell(ssh_credentials: SshCredentials, load_node: str) -> SSHShell:
ssh_client = SSHShell( ssh_client = SSHShell(
host=load_node, host=load_node,
login=ssh_credentials.ssh_login, login=ssh_credentials.ssh_login,

View file

@ -57,7 +57,7 @@ class LoadVerifier:
# Due to interruptions we may see total verified objects to be less than written on writers count # Due to interruptions we may see total verified objects to be less than written on writers count
if abs(objects_count - verified_objects) > writers: if abs(objects_count - verified_objects) > writers:
exceptions.append( exceptions.append(
f"Verified objects is less than total objects. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}." f"Verified objects mismatch. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}."
) )
assert not exceptions, "\n".join(exceptions) assert not exceptions, "\n".join(exceptions)

View file

@ -1,3 +1,4 @@
import copy
import time import time
import frostfs_testlib.resources.optionals as optionals import frostfs_testlib.resources.optionals as optionals
@ -9,7 +10,9 @@ from frostfs_testlib.load.load_config import (
LoadScenario, LoadScenario,
LoadType, LoadType,
) )
from frostfs_testlib.load.load_report import LoadReport
from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances
from frostfs_testlib.load.load_verifiers import LoadVerifier
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.load_params import ( from frostfs_testlib.resources.load_params import (
K6_TEARDOWN_PERIOD, K6_TEARDOWN_PERIOD,
@ -33,11 +36,14 @@ class BackgroundLoadController:
k6_instances: list[K6] k6_instances: list[K6]
k6_dir: str k6_dir: str
load_params: LoadParams load_params: LoadParams
original_load_params: LoadParams
load_nodes: list[str] load_nodes: list[str]
verification_params: LoadParams verification_params: LoadParams
nodes_under_load: list[ClusterNode] nodes_under_load: list[ClusterNode]
load_counter: int
ssh_credentials: SshCredentials ssh_credentials: SshCredentials
loaders_wallet: WalletInfo loaders_wallet: WalletInfo
load_summaries: dict
endpoints: list[str] endpoints: list[str]
def __init__( def __init__(
@ -48,8 +54,10 @@ class BackgroundLoadController:
nodes_under_load: list[ClusterNode], nodes_under_load: list[ClusterNode],
) -> None: ) -> None:
self.k6_dir = k6_dir self.k6_dir = k6_dir
self.load_params = load_params self.original_load_params = load_params
self.load_params = copy.deepcopy(self.original_load_params)
self.nodes_under_load = nodes_under_load self.nodes_under_load = nodes_under_load
self.load_counter = 1
self.load_nodes = LOAD_NODES self.load_nodes = LOAD_NODES
self.loaders_wallet = loaders_wallet self.loaders_wallet = loaders_wallet
@ -59,17 +67,7 @@ class BackgroundLoadController:
self.endpoints = self._get_endpoints( self.endpoints = self._get_endpoints(
load_params.load_type, load_params.endpoint_selection_strategy load_params.load_type, load_params.endpoint_selection_strategy
) )
self.verification_params = LoadParams(
verify_clients=load_params.verify_clients,
scenario=LoadScenario.VERIFY,
registry_file=load_params.registry_file,
verify_time=load_params.verify_time,
load_type=load_params.load_type,
load_id=load_params.load_id,
working_dir=load_params.working_dir,
endpoint_selection_strategy=load_params.endpoint_selection_strategy,
k6_process_allocation_strategy=load_params.k6_process_allocation_strategy,
)
self.ssh_credentials = SshCredentials( self.ssh_credentials = SshCredentials(
LOAD_NODE_SSH_USER, LOAD_NODE_SSH_USER,
LOAD_NODE_SSH_PASSWORD, LOAD_NODE_SSH_PASSWORD,
@ -179,6 +177,66 @@ class BackgroundLoadController:
return True return True
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Reset background load")
def _reset_for_consequent_load(self):
"""This method is required if we want to run multiple loads during test run.
Raise load counter by 1 and append it to load_id
"""
self.load_counter += 1
self.load_params = copy.deepcopy(self.original_load_params)
self.load_params.set_id(f"{self.load_params.load_id}_{self.load_counter}")
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Startup background load")
def startup(self):
self.prepare()
self.start()
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Stop and get results of background load")
def teardown(self, load_report: LoadReport = None):
if not self.k6_instances:
return
self.stop()
self.load_summaries = self.get_results()
self.k6_instances = []
if load_report:
load_report.add_summaries(self.load_summaries)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Verify results of background load")
def verify(self):
try:
if self.load_params.verify:
self.verification_params = LoadParams(
verify_clients=self.load_params.verify_clients,
scenario=LoadScenario.VERIFY,
registry_file=self.load_params.registry_file,
verify_time=self.load_params.verify_time,
load_type=self.load_params.load_type,
load_id=self.load_params.load_id,
working_dir=self.load_params.working_dir,
endpoint_selection_strategy=self.load_params.endpoint_selection_strategy,
k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy,
)
self._run_verify_scenario()
verification_summaries = self.get_results()
self.verify_summaries(self.load_summaries, verification_summaries)
finally:
self._reset_for_consequent_load()
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Verify summaries from k6")
def verify_summaries(self, load_summaries: dict, verification_summaries: dict):
verifier = LoadVerifier(self.load_params)
for node_or_endpoint in load_summaries:
with reporter.step(f"Verify load summaries for {node_or_endpoint}"):
verifier.verify_summaries(
load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint]
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
def wait_until_finish(self): def wait_until_finish(self):
if self.load_params.load_time is None: if self.load_params.load_time is None:
@ -188,7 +246,8 @@ class BackgroundLoadController:
k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD)) k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD))
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED) @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
def verify(self): @reporter.step_deco("Run verify scenario for background load")
def _run_verify_scenario(self):
if self.verification_params.verify_time is None: if self.verification_params.verify_time is None:
raise RuntimeError("verify_time should not be none") raise RuntimeError("verify_time should not be none")

View file

@ -1,7 +1,5 @@
import time import time
import allure
import frostfs_testlib.resources.optionals as optionals import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell from frostfs_testlib.shell import CommandOptions, Shell
@ -30,15 +28,29 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}") @reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str): def stop_node_host(self, node: ClusterNode, mode: str):
with allure.step(f"Stop host {node.host.config.address}"): with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode) node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node) wait_for_host_offline(self.shell, node.storage_node)
self.stopped_nodes.append(node) self.stopped_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Shutdown whole cluster")
def shutdown_cluster(self, mode: str, reversed_order: bool = False):
nodes = (
reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
)
for node in nodes:
with reporter.step(f"Stop host {node.host.config.address}"):
self.stopped_nodes.append(node)
node.host.stop_host(mode=mode)
for node in nodes:
wait_for_host_offline(self.shell, node.storage_node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}") @reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode): def start_node_host(self, node: ClusterNode):
with allure.step(f"Start host {node.host.config.address}"): with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host() node.host.start_host()
wait_for_host_online(self.shell, node.storage_node) wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node) wait_for_node_online(node.storage_node)
@ -46,9 +58,11 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts") @reporter.step_deco("Start stopped hosts")
def start_stopped_hosts(self): def start_stopped_hosts(self, reversed_order: bool = False):
for node in self.stopped_nodes: nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
node.host.start_host() for node in nodes:
with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
self.stopped_nodes = [] self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster) wait_all_storage_nodes_returned(self.shell, self.cluster)