forked from TrueCloudLab/frostfs-testlib
276 lines
11 KiB
Python
276 lines
11 KiB
Python
import copy
|
|
import time
|
|
|
|
import frostfs_testlib.resources.optionals as optionals
|
|
from frostfs_testlib.load.k6 import K6
|
|
from frostfs_testlib.load.load_config import (
|
|
EndpointSelectionStrategy,
|
|
K6ProcessAllocationStrategy,
|
|
LoadParams,
|
|
LoadScenario,
|
|
LoadType,
|
|
)
|
|
from frostfs_testlib.load.load_report import LoadReport
|
|
from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances
|
|
from frostfs_testlib.load.load_verifiers import LoadVerifier
|
|
from frostfs_testlib.reporter import get_reporter
|
|
from frostfs_testlib.resources.load_params import (
|
|
K6_TEARDOWN_PERIOD,
|
|
LOAD_NODE_SSH_PASSWORD,
|
|
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
|
|
LOAD_NODE_SSH_PRIVATE_KEY_PATH,
|
|
LOAD_NODE_SSH_USER,
|
|
LOAD_NODES,
|
|
)
|
|
from frostfs_testlib.shell.interfaces import SshCredentials
|
|
from frostfs_testlib.storage.cluster import ClusterNode
|
|
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
from frostfs_testlib.testing.test_control import run_optionally
|
|
from frostfs_testlib.utils import datetime_utils
|
|
|
|
# Module-level reporter singleton: supplies the step decorators and
# inline step contexts used by BackgroundLoadController below.
reporter = get_reporter()
|
|
|
|
|
|
class BackgroundLoadController:
    """Orchestrates k6 background load against cluster nodes.

    Lifecycle: ``prepare`` (provision k6 instances) -> ``start`` ->
    ``stop``/``teardown`` (collect summaries) -> ``verify`` (run the VERIFY
    scenario and compare it against the collected load summaries).
    All public methods are gated by ``OPTIONAL_BACKGROUND_LOAD_ENABLED``.
    """

    # Active k6 instances; populated by _prepare(), cleared by teardown().
    k6_instances: list[K6]
    # Remote directory where k6 binaries/scenarios live.
    k6_dir: str
    # Working copy of the load parameters (mutated between runs).
    load_params: LoadParams
    # Pristine parameters as passed by the caller; used to reset state.
    original_load_params: LoadParams
    # Hostnames/addresses of the load-generating nodes.
    load_nodes: list[str]
    # Parameters built for the VERIFY scenario (set in verify()).
    verification_params: LoadParams
    # Cluster nodes the load is directed at.
    nodes_under_load: list[ClusterNode]
    # Monotonic counter appended to load_id for consequent runs.
    load_counter: int
    # SSH credentials for the load nodes.
    ssh_credentials: SshCredentials
    # Wallet used by the loaders.
    loaders_wallet: WalletInfo
    # Per-node-or-endpoint k6 result summaries (set in teardown()).
    load_summaries: dict
    # Target endpoints resolved from the selection strategy.
    endpoints: list[str]

    def __init__(
        self,
        k6_dir: str,
        load_params: LoadParams,
        loaders_wallet: WalletInfo,
        nodes_under_load: list[ClusterNode],
    ) -> None:
        """Resolve endpoints and SSH credentials; no remote work happens here.

        Raises:
            RuntimeError: if ``load_params.endpoint_selection_strategy`` is None.
        """
        self.k6_dir = k6_dir
        # Keep the caller's object untouched; work on a deep copy so
        # _reset_for_consequent_load() can restore the original state.
        self.original_load_params = load_params
        self.load_params = copy.deepcopy(self.original_load_params)
        self.nodes_under_load = nodes_under_load
        self.load_counter = 1
        self.load_nodes = LOAD_NODES
        self.loaders_wallet = loaders_wallet
        # Initialize eagerly so teardown()/is_running() are safe to call
        # even when prepare() was never invoked (previously: AttributeError).
        self.k6_instances = []

        if load_params.endpoint_selection_strategy is None:
            raise RuntimeError("endpoint_selection_strategy should not be None")

        self.endpoints = self._get_endpoints(
            load_params.load_type, load_params.endpoint_selection_strategy
        )

        self.ssh_credentials = SshCredentials(
            LOAD_NODE_SSH_USER,
            LOAD_NODE_SSH_PASSWORD,
            LOAD_NODE_SSH_PRIVATE_KEY_PATH,
            LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
        )

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, [])
    def _get_endpoints(
        self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy
    ):
        """Return the deduplicated endpoint list for the given load type and
        selection strategy (ALL endpoints per node vs FIRST endpoint per node).
        """
        all_endpoints = {
            LoadType.gRPC: {
                EndpointSelectionStrategy.ALL: list(
                    set(
                        endpoint
                        for node_under_load in self.nodes_under_load
                        for endpoint in node_under_load.service(StorageNode).get_all_rpc_endpoint()
                    )
                ),
                EndpointSelectionStrategy.FIRST: list(
                    set(
                        node_under_load.service(StorageNode).get_rpc_endpoint()
                        for node_under_load in self.nodes_under_load
                    )
                ),
            },
            # for some reason xk6 appends http protocol on its own
            LoadType.S3: {
                EndpointSelectionStrategy.ALL: list(
                    set(
                        endpoint.replace("http://", "")
                        for node_under_load in self.nodes_under_load
                        for endpoint in node_under_load.service(S3Gate).get_all_endpoints()
                    )
                ),
                EndpointSelectionStrategy.FIRST: list(
                    set(
                        node_under_load.service(S3Gate).get_endpoint().replace("http://", "")
                        for node_under_load in self.nodes_under_load
                    )
                ),
            },
        }

        return all_endpoints[load_type][endpoint_selection_strategy]

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Prepare background load instances")
    def prepare(self):
        """Provision k6 instances on the load nodes; for S3 load, set up the
        S3 client credentials first."""
        if self.load_params.load_type == LoadType.S3:
            init_s3_client(
                self.load_nodes,
                self.load_params,
                self.k6_dir,
                self.ssh_credentials,
                self.nodes_under_load,
                self.loaders_wallet,
            )

        self._prepare(self.load_params)

    def _prepare(self, load_params: LoadParams):
        """(Re)create k6 instances for the given params (also used by the
        verify scenario with its own params)."""
        self.k6_instances = prepare_k6_instances(
            # Use the attribute set in __init__ rather than the module global,
            # so the instance remains the single source of truth.
            load_nodes=self.load_nodes,
            ssh_credentials=self.ssh_credentials,
            k6_dir=self.k6_dir,
            load_params=load_params,
            endpoints=self.endpoints,
            loaders_wallet=self.loaders_wallet,
        )

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Start background load")
    def start(self):
        """Start all prepared k6 instances, then wait out the setup timeout.

        Raises:
            RuntimeError: if ``load_params.preset`` is None.
        """
        if self.load_params.preset is None:
            raise RuntimeError("Preset should not be none at the moment of start")

        with reporter.step(
            f"Start background load on nodes {self.nodes_under_load}: "
            f"writers = {self.load_params.writers}, "
            f"obj_size = {self.load_params.object_size}, "
            f"load_time = {self.load_params.load_time}, "
            f"prepare_json = {self.load_params.preset.pregen_json}, "
            f"endpoints = {self.endpoints}"
        ):
            for k6_load_instance in self.k6_instances:
                k6_load_instance.start()

            # Give k6 its setup window plus a small safety margin before
            # considering the load "running".
            wait_after_start_time = datetime_utils.parse_time(self.load_params.setup_timeout) + 5
            with reporter.step(
                f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
            ):
                time.sleep(wait_after_start_time)

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Stop background load")
    def stop(self):
        """Stop all running k6 instances."""
        for k6_load_instance in self.k6_instances:
            k6_load_instance.stop()

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, True)
    def is_running(self):
        """Return True only if every k6 instance reports itself as running."""
        for k6_load_instance in self.k6_instances:
            if not k6_load_instance.is_running:
                return False

        return True

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Reset background load")
    def _reset_for_consequent_load(self):
        """This method is required if we want to run multiple loads during test run.
        Raise load counter by 1 and append it to load_id
        """
        self.load_counter += 1
        self.load_params = copy.deepcopy(self.original_load_params)
        self.load_params.set_id(f"{self.load_params.load_id}_{self.load_counter}")

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Startup background load")
    def startup(self):
        """Convenience wrapper: prepare() then start()."""
        self.prepare()
        self.start()

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Stop and get results of background load")
    def teardown(self, load_report: LoadReport = None):
        """Stop the load, collect per-instance summaries and optionally feed
        them into ``load_report``. No-op when nothing was prepared."""
        if not self.k6_instances:
            return

        self.stop()
        self.load_summaries = self.get_results()
        self.k6_instances = []
        if load_report:
            load_report.add_summaries(self.load_summaries)

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Verify results of background load")
    def verify(self):
        """Run the VERIFY scenario against the data produced by the load and
        compare its summaries to the load summaries. Always resets params
        afterwards so a consequent load gets a fresh load_id."""
        try:
            if self.load_params.verify:
                self.verification_params = LoadParams(
                    verify_clients=self.load_params.verify_clients,
                    scenario=LoadScenario.VERIFY,
                    registry_file=self.load_params.registry_file,
                    verify_time=self.load_params.verify_time,
                    load_type=self.load_params.load_type,
                    load_id=self.load_params.load_id,
                    working_dir=self.load_params.working_dir,
                    endpoint_selection_strategy=self.load_params.endpoint_selection_strategy,
                    k6_process_allocation_strategy=self.load_params.k6_process_allocation_strategy,
                )
                self._run_verify_scenario()
                verification_summaries = self.get_results()
                # NOTE(review): relies on load_summaries set by teardown();
                # calling verify() before teardown() would fail — confirm
                # the intended call order with callers.
                self.verify_summaries(self.load_summaries, verification_summaries)
        finally:
            self._reset_for_consequent_load()

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Verify summaries from k6")
    def verify_summaries(self, load_summaries: dict, verification_summaries: dict):
        """Compare load vs verification summaries per node/endpoint key."""
        verifier = LoadVerifier(self.load_params)
        for node_or_endpoint in load_summaries:
            with reporter.step(f"Verify load summaries for {node_or_endpoint}"):
                verifier.verify_summaries(
                    load_summaries[node_or_endpoint], verification_summaries[node_or_endpoint]
                )

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    def wait_until_finish(self):
        """Block until every k6 instance finishes, allowing load_time plus
        the teardown grace period.

        Raises:
            RuntimeError: if ``load_params.load_time`` is None.
        """
        if self.load_params.load_time is None:
            raise RuntimeError("LoadTime should not be none")

        for k6_instance in self.k6_instances:
            k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD))

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("Run verify scenario for background load")
    def _run_verify_scenario(self):
        """Spin up k6 with the VERIFY params and wait for completion.

        Raises:
            RuntimeError: if ``verification_params.verify_time`` is None.
        """
        if self.verification_params.verify_time is None:
            raise RuntimeError("verify_time should not be none")

        self._prepare(self.verification_params)
        with reporter.step("Run verify background load data"):
            for k6_verify_instance in self.k6_instances:
                k6_verify_instance.start()
                k6_verify_instance.wait_until_finished(self.verification_params.verify_time)

    @run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
    @reporter.step_deco("K6 run results")
    def get_results(self) -> dict:
        """Collect results from every k6 instance, keyed by load node or by
        first endpoint depending on the process allocation strategy.

        Raises:
            RuntimeError: if an instance has no ``k6_process_allocation_strategy``.
        """
        results = {}
        for k6_instance in self.k6_instances:
            if k6_instance.load_params.k6_process_allocation_strategy is None:
                raise RuntimeError("k6_process_allocation_strategy should not be none")

            result = k6_instance.get_results()
            keys_map = {
                K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.load_node,
                K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0],
            }
            key = keys_map[k6_instance.load_params.k6_process_allocation_strategy]
            results[key] = result

        return results
|