diff --git a/src/frostfs_testlib/load/k6.py b/src/frostfs_testlib/load/k6.py
index 9a8b1d9..2fa2c00 100644
--- a/src/frostfs_testlib/load/k6.py
+++ b/src/frostfs_testlib/load/k6.py
@@ -153,10 +153,6 @@ class K6:
@reporter.step_deco("Start K6 on initiator")
def start(self) -> None:
- # Make working_dir directory
- self.shell.exec(f"sudo mkdir -p {self.load_params.working_dir}")
- self.shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {self.load_params.working_dir}")
-
command = (
f"{self._k6_dir}/k6 run {self._generate_env_variables()} "
f"{self._k6_dir}/scenarios/{self.scenario.value}.js"
@@ -170,13 +166,12 @@ class K6:
assert "No k6 instances were executed"
if k6_should_be_running:
assert self._k6_process.running(), "k6 should be running."
- while timeout >= 0:
+ while timeout > 0:
if not self._k6_process.running():
return
logger.info(f"K6 is running. Waiting {wait_interval} seconds...")
- if timeout > 0:
- sleep(wait_interval)
- timeout -= wait_interval
+ sleep(wait_interval)
+ timeout -= wait_interval
self.stop()
raise TimeoutError(f"Expected K6 finished in {timeout} sec.")
diff --git a/src/frostfs_testlib/load/load_report.py b/src/frostfs_testlib/load/load_report.py
index 2771df5..c9c23c7 100644
--- a/src/frostfs_testlib/load/load_report.py
+++ b/src/frostfs_testlib/load/load_report.py
@@ -10,7 +10,8 @@ from frostfs_testlib.load.load_metrics import get_metrics_object
class LoadReport:
def __init__(self, load_test) -> None:
self.load_test = load_test
- self.load_summaries: Optional[dict] = None
+ # List of load summaries dict
+ self.load_summaries_list: Optional[list[dict]] = []
self.load_params: Optional[LoadParams] = None
self.start_time: Optional[datetime] = None
self.end_time: Optional[datetime] = None
@@ -21,8 +22,8 @@ class LoadReport:
def set_end_time(self):
self.end_time = datetime.utcnow()
- def set_summaries(self, load_summaries: dict):
- self.load_summaries = load_summaries
+ def add_summaries(self, load_summaries: dict):
+ self.load_summaries_list.append(load_summaries)
def set_load_params(self, load_params: LoadParams):
self.load_params = load_params
@@ -30,7 +31,7 @@ class LoadReport:
def get_report_html(self):
report_sections = [
[self.load_test, self._get_load_params_section_html],
- [self.load_summaries, self._get_totals_section_html],
+ [self.load_summaries_list, self._get_totals_section_html],
[self.end_time, self._get_test_time_html],
]
@@ -156,110 +157,113 @@ class LoadReport:
return html
def _get_totals_section_html(self):
+ html = ""
+ for i, load_summaries in enumerate(self.load_summaries_list, 1):
+ html += f"
Load Results for load #{i}
"
- html = "Load Results
"
-
- write_operations = 0
- write_op_sec = 0
- write_throughput = 0
- write_errors = {}
- requested_write_rate = self.load_params.write_rate
- requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else ""
-
- read_operations = 0
- read_op_sec = 0
- read_throughput = 0
- read_errors = {}
- requested_read_rate = self.load_params.read_rate
- requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
-
- delete_operations = 0
- delete_op_sec = 0
- delete_errors = {}
- requested_delete_rate = self.load_params.delete_rate
- requested_delete_rate_str = (
- f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
- )
-
- if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
- delete_vus = max(
- self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0
- )
- write_vus = max(
- self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0
- )
- read_vus = max(
- self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0
- )
- else:
- write_vus = self.load_params.writers
- read_vus = self.load_params.readers
- delete_vus = self.load_params.deleters
-
- write_vus_str = f"{write_vus}th"
- read_vus_str = f"{read_vus}th"
- delete_vus_str = f"{delete_vus}th"
-
- write_section_required = False
- read_section_required = False
- delete_section_required = False
-
- for node_key, load_summary in self.load_summaries.items():
- metrics = get_metrics_object(self.load_params.scenario, load_summary)
- write_operations += metrics.write_total_iterations
- if write_operations:
- write_section_required = True
- write_op_sec += metrics.write_rate
- write_throughput += metrics.write_throughput
- if metrics.write_failed_iterations:
- write_errors[node_key] = metrics.write_failed_iterations
-
- read_operations += metrics.read_total_iterations
- if read_operations:
- read_section_required = True
- read_op_sec += metrics.read_rate
- read_throughput += metrics.read_throughput
- if metrics.read_failed_iterations:
- read_errors[node_key] = metrics.read_failed_iterations
-
- delete_operations += metrics.delete_total_iterations
- if delete_operations:
- delete_section_required = True
- delete_op_sec += metrics.delete_rate
- if metrics.delete_failed_iterations:
- delete_errors[node_key] = metrics.delete_failed_iterations
-
- if write_section_required:
- html += self._get_oprations_sub_section_html(
- "Write",
- write_operations,
- requested_write_rate_str,
- write_vus_str,
- write_op_sec,
- write_throughput,
- write_errors,
+ write_operations = 0
+ write_op_sec = 0
+ write_throughput = 0
+ write_errors = {}
+ requested_write_rate = self.load_params.write_rate
+ requested_write_rate_str = (
+ f"{requested_write_rate}op/sec" if requested_write_rate else ""
)
- if read_section_required:
- html += self._get_oprations_sub_section_html(
- "Read",
- read_operations,
- requested_read_rate_str,
- read_vus_str,
- read_op_sec,
- read_throughput,
- read_errors,
+ read_operations = 0
+ read_op_sec = 0
+ read_throughput = 0
+ read_errors = {}
+ requested_read_rate = self.load_params.read_rate
+ requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
+
+ delete_operations = 0
+ delete_op_sec = 0
+ delete_errors = {}
+ requested_delete_rate = self.load_params.delete_rate
+ requested_delete_rate_str = (
+ f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
)
- if delete_section_required:
- html += self._get_oprations_sub_section_html(
- "Delete",
- delete_operations,
- requested_delete_rate_str,
- delete_vus_str,
- delete_op_sec,
- 0,
- delete_errors,
- )
+ if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
+ delete_vus = max(
+ self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0
+ )
+ write_vus = max(
+ self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0
+ )
+ read_vus = max(
+ self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0
+ )
+ else:
+ write_vus = self.load_params.writers
+ read_vus = self.load_params.readers
+ delete_vus = self.load_params.deleters
+
+ write_vus_str = f"{write_vus}th"
+ read_vus_str = f"{read_vus}th"
+ delete_vus_str = f"{delete_vus}th"
+
+ write_section_required = False
+ read_section_required = False
+ delete_section_required = False
+
+ for node_key, load_summary in load_summaries.items():
+ metrics = get_metrics_object(self.load_params.scenario, load_summary)
+ write_operations += metrics.write_total_iterations
+ if write_operations:
+ write_section_required = True
+ write_op_sec += metrics.write_rate
+ write_throughput += metrics.write_throughput
+ if metrics.write_failed_iterations:
+ write_errors[node_key] = metrics.write_failed_iterations
+
+ read_operations += metrics.read_total_iterations
+ if read_operations:
+ read_section_required = True
+ read_op_sec += metrics.read_rate
+ read_throughput += metrics.read_throughput
+ if metrics.read_failed_iterations:
+ read_errors[node_key] = metrics.read_failed_iterations
+
+ delete_operations += metrics.delete_total_iterations
+ if delete_operations:
+ delete_section_required = True
+ delete_op_sec += metrics.delete_rate
+ if metrics.delete_failed_iterations:
+ delete_errors[node_key] = metrics.delete_failed_iterations
+
+ if write_section_required:
+ html += self._get_oprations_sub_section_html(
+ "Write",
+ write_operations,
+ requested_write_rate_str,
+ write_vus_str,
+ write_op_sec,
+ write_throughput,
+ write_errors,
+ )
+
+ if read_section_required:
+ html += self._get_oprations_sub_section_html(
+ "Read",
+ read_operations,
+ requested_read_rate_str,
+ read_vus_str,
+ read_op_sec,
+ read_throughput,
+ read_errors,
+ )
+
+ if delete_section_required:
+ html += self._get_oprations_sub_section_html(
+ "Delete",
+ delete_operations,
+ requested_delete_rate_str,
+ delete_vus_str,
+ delete_op_sec,
+ 0,
+ delete_errors,
+ )
return html
diff --git a/src/frostfs_testlib/load/load_steps.py b/src/frostfs_testlib/load/load_steps.py
index 5d935aa..b55ff22 100644
--- a/src/frostfs_testlib/load/load_steps.py
+++ b/src/frostfs_testlib/load/load_steps.py
@@ -9,7 +9,10 @@ from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
-from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR
+from frostfs_testlib.resources.load_params import (
+ BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
+ LOAD_NODE_SSH_USER,
+)
from frostfs_testlib.shell import CommandOptions, SSHShell
from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials
from frostfs_testlib.storage.cluster import ClusterNode
@@ -35,7 +38,7 @@ def init_s3_client(
grpc_peer = storage_node.get_rpc_endpoint()
for load_node in load_nodes:
- ssh_client = _get_ssh_client(ssh_credentials, load_node)
+ ssh_client = _get_shell(ssh_credentials, load_node)
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(ssh_client, FROSTFS_AUTHMATE_EXEC)
issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=wallet.path,
@@ -99,12 +102,16 @@ def prepare_k6_instances(
for distributed_load_params in distributed_load_params_list:
load_node = next(nodes)
- ssh_client = _get_ssh_client(ssh_credentials, load_node)
+ shell = _get_shell(ssh_credentials, load_node)
+ # Make working_dir directory
+ shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}")
+ shell.exec(f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}")
+
k6_load_object = K6(
distributed_load_params,
next(endpoints_gen),
k6_dir,
- ssh_client,
+ shell,
load_node,
loaders_wallet,
)
@@ -115,7 +122,7 @@ def prepare_k6_instances(
return k6_load_objects
-def _get_ssh_client(ssh_credentials: SshCredentials, load_node: str):
+def _get_shell(ssh_credentials: SshCredentials, load_node: str) -> SSHShell:
ssh_client = SSHShell(
host=load_node,
login=ssh_credentials.ssh_login,
diff --git a/src/frostfs_testlib/load/load_verifiers.py b/src/frostfs_testlib/load/load_verifiers.py
index 69e9f1f..1ff63ae 100644
--- a/src/frostfs_testlib/load/load_verifiers.py
+++ b/src/frostfs_testlib/load/load_verifiers.py
@@ -57,7 +57,7 @@ class LoadVerifier:
# Due to interruptions we may see total verified objects to be less than written on writers count
if abs(objects_count - verified_objects) > writers:
exceptions.append(
- f"Verified objects is less than total objects. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}."
+ f"Verified objects mismatch. Total: {objects_count}, Verified: {verified_objects}. Writers: {writers}."
)
assert not exceptions, "\n".join(exceptions)
diff --git a/src/frostfs_testlib/storage/controllers/background_load_controller.py b/src/frostfs_testlib/storage/controllers/background_load_controller.py
index f9cf0e5..a2336be 100644
--- a/src/frostfs_testlib/storage/controllers/background_load_controller.py
+++ b/src/frostfs_testlib/storage/controllers/background_load_controller.py
@@ -1,3 +1,4 @@
+import copy
import time
import frostfs_testlib.resources.optionals as optionals
@@ -9,7 +10,9 @@ from frostfs_testlib.load.load_config import (
LoadScenario,
LoadType,
)
+from frostfs_testlib.load.load_report import LoadReport
from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances
+from frostfs_testlib.load.load_verifiers import LoadVerifier
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.load_params import (
K6_TEARDOWN_PERIOD,
@@ -33,11 +36,14 @@ class BackgroundLoadController:
k6_instances: list[K6]
k6_dir: str
load_params: LoadParams
+ original_load_params: LoadParams
load_nodes: list[str]
verification_params: LoadParams
nodes_under_load: list[ClusterNode]
+ load_counter: int
ssh_credentials: SshCredentials
loaders_wallet: WalletInfo
+ load_summaries: dict
endpoints: list[str]
def __init__(
@@ -48,8 +54,10 @@ class BackgroundLoadController:
nodes_under_load: list[ClusterNode],
) -> None:
self.k6_dir = k6_dir
- self.load_params = load_params
+ self.original_load_params = load_params
+ self.load_params = copy.deepcopy(self.original_load_params)
self.nodes_under_load = nodes_under_load
+ self.load_counter = 1
self.load_nodes = LOAD_NODES
self.loaders_wallet = loaders_wallet
@@ -59,17 +67,7 @@ class BackgroundLoadController:
self.endpoints = self._get_endpoints(
load_params.load_type, load_params.endpoint_selection_strategy
)
- self.verification_params = LoadParams(
- verify_clients=load_params.verify_clients,
- scenario=LoadScenario.VERIFY,
- registry_file=load_params.registry_file,
- verify_time=load_params.verify_time,
- load_type=load_params.load_type,
- load_id=load_params.load_id,
- working_dir=load_params.working_dir,
- endpoint_selection_strategy=load_params.endpoint_selection_strategy,
- k6_process_allocation_strategy=load_params.k6_process_allocation_strategy,
- )
+
self.ssh_credentials = SshCredentials(
LOAD_NODE_SSH_USER,
LOAD_NODE_SSH_PASSWORD,
@@ -179,6 +177,66 @@ class BackgroundLoadController:
return True
+ @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
+ @reporter.step_deco("Reset background load")
+ def _reset_for_consequent_load(self):
+ """This method is required if we want to run multiple loads during test run.
+ Raise load counter by 1 and append it to load_id
+ """
+ self.load_counter += 1
+ self.load_params = copy.deepcopy(self.original_load_params)
+ self.load_params.set_id(f"{self.load_params.load_id}_{self.load_counter}")
+
+ @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
+ @reporter.step_deco("Startup background load")
+ def startup(self):
+ self.prepare()
+ self.start()
+
+ @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
+ @reporter.step_deco("Stop and get results of background load")
+ def teardown(self, load_report: LoadReport = None):
+ if not self.k6_instances:
+ return
+
+ self.stop()
+ self.load_summaries = self.get_results()
+ self.k6_instances = []
+ if load_report:
+ load_report.add_summaries(self.load_summaries)
+
+ @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
+ @reporter.step_deco("Verify results of background load")
+ def verify(self):
+ try:
+ if self.load_params.verify:
+ self.verification_params = LoadParams(
+ verify_clients=self.load_params.verify_clients,
+ scenario=LoadScenario.VERIFY,
+ registry_file=self.load_params.registry_file,
+ verify_time=self.load_params.verify_time,
+ load_type=self.load_params.load_type,
+ load_id=self.load_params.load_id,
+ working_dir=self.load_params.working_dir,
+ endpoint_selection_strategy=self.load_params.endpoint_selection_strategy,
+ k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy,
+ )
+ self._run_verify_scenario()
+ verification_summaries = self.get_results()
+ self.verify_summaries(self.load_summaries, verification_summaries)
+ finally:
+ self._reset_for_consequent_load()
+
+ @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
+ @reporter.step_deco("Verify summaries from k6")
+ def verify_summaries(self, load_summaries: dict, verification_summaries: dict):
+ verifier = LoadVerifier(self.load_params)
+ for node_or_endpoint in load_summaries:
+ with reporter.step(f"Verify load summaries for {node_or_endpoint}"):
+ verifier.verify_summaries(
+ load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint]
+ )
+
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
def wait_until_finish(self):
if self.load_params.load_time is None:
@@ -188,7 +246,8 @@ class BackgroundLoadController:
k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD))
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
- def verify(self):
+ @reporter.step_deco("Run verify scenario for background load")
+ def _run_verify_scenario(self):
if self.verification_params.verify_time is None:
raise RuntimeError("verify_time should not be none")
diff --git a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
index 35072f2..34f027f 100644
--- a/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
+++ b/src/frostfs_testlib/storage/controllers/cluster_state_controller.py
@@ -1,7 +1,5 @@
import time
-import allure
-
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
@@ -30,15 +28,29 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop host of node {node}")
def stop_node_host(self, node: ClusterNode, mode: str):
- with allure.step(f"Stop host {node.host.config.address}"):
+ with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node)
self.stopped_nodes.append(node)
+ @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
+ @reporter.step_deco("Shutdown whole cluster")
+ def shutdown_cluster(self, mode: str, reversed_order: bool = False):
+ nodes = (
+ reversed(self.cluster.cluster_nodes) if reversed_order else self.cluster.cluster_nodes
+ )
+ for node in nodes:
+ with reporter.step(f"Stop host {node.host.config.address}"):
+ self.stopped_nodes.append(node)
+ node.host.stop_host(mode=mode)
+
+ for node in nodes:
+ wait_for_host_offline(self.shell, node.storage_node)
+
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start host of node {node}")
def start_node_host(self, node: ClusterNode):
- with allure.step(f"Start host {node.host.config.address}"):
+ with reporter.step(f"Start host {node.host.config.address}"):
node.host.start_host()
wait_for_host_online(self.shell, node.storage_node)
wait_for_node_online(node.storage_node)
@@ -46,9 +58,11 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start stopped hosts")
- def start_stopped_hosts(self):
- for node in self.stopped_nodes:
- node.host.start_host()
+ def start_stopped_hosts(self, reversed_order: bool = False):
+ nodes = reversed(self.stopped_nodes) if reversed_order else self.stopped_nodes
+ for node in nodes:
+ with reporter.step(f"Start host {node.host.config.address}"):
+ node.host.start_host()
self.stopped_nodes = []
wait_all_storage_nodes_returned(self.shell, self.cluster)