frostfs-testlib/src/frostfs_testlib/controllers/background_load_controller.py
Andrey Berezin 997e768e92 Move shared code to testlib
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-05-14 13:43:59 +03:00

207 lines
7.8 KiB
Python

import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
K6ProcessAllocationStrategy,
LoadParams,
LoadScenario,
LoadType,
)
from frostfs_testlib.load.load_steps import init_s3_client, prepare_k6_instances
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.load_params import (
K6_TEARDOWN_PERIOD,
LOAD_NODE_SSH_PASSWORD,
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
LOAD_NODE_SSH_PRIVATE_KEY_PATH,
LOAD_NODE_SSH_USER,
LOAD_NODES,
)
from frostfs_testlib.shell.interfaces import SshCredentials
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.cluster.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import run_optionally
reporter = get_reporter()
class BackgroundLoadController:
k6_instances: list[K6]
k6_dir: str
load_params: LoadParams
load_nodes: list[str]
verification_params: LoadParams
nodes_under_load: list[ClusterNode]
ssh_credentials: SshCredentials
loaders_wallet: WalletInfo
endpoints: list[str]
def __init__(
self,
k6_dir: str,
load_params: LoadParams,
loaders_wallet: WalletInfo,
nodes_under_load: list[ClusterNode],
) -> None:
self.k6_dir = k6_dir
self.load_params = load_params
self.nodes_under_load = nodes_under_load
self.load_nodes = LOAD_NODES
self.loaders_wallet = loaders_wallet
if load_params.endpoint_selection_strategy is None:
raise RuntimeError("endpoint_selection_strategy should not be None")
self.endpoints = self._get_endpoints(
load_params.load_type, load_params.endpoint_selection_strategy
)
self.verification_params = LoadParams(
clients=load_params.readers,
scenario=LoadScenario.VERIFY,
registry_file=load_params.registry_file,
verify_time=load_params.verify_time,
load_type=load_params.load_type,
load_id=load_params.load_id,
working_dir=load_params.working_dir,
endpoint_selection_strategy=load_params.endpoint_selection_strategy,
k6_process_allocation_strategy=load_params.k6_process_allocation_strategy,
)
self.ssh_credentials = SshCredentials(
LOAD_NODE_SSH_USER,
LOAD_NODE_SSH_PASSWORD,
LOAD_NODE_SSH_PRIVATE_KEY_PATH,
LOAD_NODE_SSH_PRIVATE_KEY_PASSPHRASE,
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, [])
def _get_endpoints(
self, load_type: LoadType, endpoint_selection_strategy: EndpointSelectionStrategy
):
all_endpoints = {
LoadType.gRPC: {
EndpointSelectionStrategy.ALL: list(
set(
endpoint
for node_under_load in self.nodes_under_load
for endpoint in node_under_load.service(StorageNode).get_all_rpc_endpoint()
)
),
EndpointSelectionStrategy.FIRST: list(
set(
node_under_load.service(StorageNode).get_rpc_endpoint()
for node_under_load in self.nodes_under_load
)
),
},
# for some reason xk6 appends http protocol on its own
LoadType.S3: {
EndpointSelectionStrategy.ALL: list(
set(
endpoint.replace("http://", "")
for node_under_load in self.nodes_under_load
for endpoint in node_under_load.service(S3Gate).get_all_endpoints()
)
),
EndpointSelectionStrategy.FIRST: list(
set(
node_under_load.service(S3Gate).get_endpoint().replace("http://", "")
for node_under_load in self.nodes_under_load
)
),
},
}
return all_endpoints[load_type][endpoint_selection_strategy]
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Prepare background load instances")
def prepare(self):
if self.load_params.load_type == LoadType.S3:
init_s3_client(
self.load_nodes,
self.load_params,
self.k6_dir,
self.ssh_credentials,
self.nodes_under_load,
self.loaders_wallet,
)
self._prepare(self.load_params)
def _prepare(self, load_params: LoadParams):
self.k6_instances = prepare_k6_instances(
load_nodes=LOAD_NODES,
ssh_credentials=self.ssh_credentials,
k6_dir=self.k6_dir,
load_params=load_params,
endpoints=self.endpoints,
loaders_wallet=self.loaders_wallet,
)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Start background load")
def start(self):
if self.load_params.preset is None:
raise RuntimeError("Preset should not be none at the moment of start")
with reporter.step(
f"Start background load on nodes {self.nodes_under_load}: "
f"writers = {self.load_params.writers}, "
f"obj_size = {self.load_params.object_size}, "
f"load_time = {self.load_params.load_time}, "
f"prepare_json = {self.load_params.preset.pregen_json}, "
f"endpoints = {self.endpoints}"
):
for k6_load_instance in self.k6_instances:
k6_load_instance.start()
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Stop background load")
def stop(self):
for k6_load_instance in self.k6_instances:
k6_load_instance.stop()
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED, True)
def is_running(self):
for k6_load_instance in self.k6_instances:
if not k6_load_instance.is_running:
return False
return True
def wait_until_finish(self):
if self.load_params.load_time is None:
raise RuntimeError("LoadTime should not be none")
for k6_instance in self.k6_instances:
k6_instance.wait_until_finished(self.load_params.load_time + int(K6_TEARDOWN_PERIOD))
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
def verify(self):
if self.verification_params.verify_time is None:
raise RuntimeError("verify_time should not be none")
self._prepare(self.verification_params)
with reporter.step("Run verify background load data"):
for k6_verify_instance in self.k6_instances:
k6_verify_instance.start()
k6_verify_instance.wait_until_finished(self.verification_params.verify_time)
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("K6 run results")
def get_results(self) -> dict:
results = {}
for k6_instance in self.k6_instances:
if k6_instance.load_params.k6_process_allocation_strategy is None:
raise RuntimeError("k6_process_allocation_strategy should not be none")
result = k6_instance.get_results()
keys_map = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.load_node,
K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0],
}
key = keys_map[k6_instance.load_params.k6_process_allocation_strategy]
results[key] = result
return results