Add loader and sceanrio runner interfaces, add support for local scenario

Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
This commit is contained in:
Andrey Berezin 2023-06-26 16:45:34 +03:00 committed by Andrey Berezin
parent 13ea25bff5
commit 182bd6ab36
19 changed files with 786 additions and 384 deletions

View file

@ -0,0 +1,398 @@
import copy
import itertools
import math
import re
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import fields
from typing import Optional
import yaml
from frostfs_testlib.cli.frostfs_authmate.authmate import FrostfsAuthmate
from frostfs_testlib.load.interfaces import Loader, ScenarioRunner
from frostfs_testlib.load.k6 import K6
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadType
from frostfs_testlib.load.loaders import NodeLoader, RemoteLoader
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources import optionals
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
from frostfs_testlib.resources.common import STORAGE_USER_NAME
from frostfs_testlib.resources.load_params import (
BACKGROUND_LOAD_VUS_COUNT_DIVISOR,
LOAD_NODE_SSH_USER,
LOAD_NODES,
)
from frostfs_testlib.shell.interfaces import CommandOptions, InteractiveInput
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.cluster_state_controller import ClusterStateController
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
from frostfs_testlib.testing.test_control import run_optionally
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
reporter = get_reporter()
class DefaultRunner(ScenarioRunner):
k6_instances: list[K6]
loaders: list[Loader]
loaders_wallet: WalletInfo
def __init__(
self,
loaders_wallet: WalletInfo,
load_ip_list: Optional[list[str]] = None,
) -> None:
if load_ip_list is None:
load_ip_list = LOAD_NODES
self.loaders = RemoteLoader.from_ip_list(load_ip_list)
self.loaders_wallet = loaders_wallet
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Prepare load instances")
def prepare(
self,
load_params: LoadParams,
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
if load_params.load_type != LoadType.S3:
return
with reporter.step("Init s3 client on loaders"):
storage_node = nodes_under_load[0].service(StorageNode)
s3_public_keys = [
node.service(S3Gate).get_wallet_public_key() for node in nodes_under_load
]
grpc_peer = storage_node.get_rpc_endpoint()
for loader in self.loaders:
with reporter.step(f"Init s3 client on {loader.ip}"):
shell = loader.get_shell()
frostfs_authmate_exec: FrostfsAuthmate = FrostfsAuthmate(
shell, FROSTFS_AUTHMATE_EXEC
)
issue_secret_output = frostfs_authmate_exec.secret.issue(
wallet=self.loaders_wallet.path,
peer=grpc_peer,
bearer_rules=f"{k6_dir}/scenarios/files/rules.json",
gate_public_key=s3_public_keys,
container_placement_policy=load_params.preset.container_placement_policy,
container_policy=f"{k6_dir}/scenarios/files/policy.json",
wallet_password=self.loaders_wallet.password,
).stdout
aws_access_key_id = str(
re.search(
r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)", issue_secret_output
).group("aws_access_key_id")
)
aws_secret_access_key = str(
re.search(
r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)",
issue_secret_output,
).group("aws_secret_access_key")
)
configure_input = [
InteractiveInput(
prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id
),
InteractiveInput(
prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
),
InteractiveInput(prompt_pattern=r".*", input=""),
InteractiveInput(prompt_pattern=r".*", input=""),
]
shell.exec("aws configure", CommandOptions(interactive_inputs=configure_input))
def wait_until_finish(self):
for k6_instance in self.k6_instances:
k6_instance.wait_until_finished()
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
cycled_loaders = itertools.cycle(self.loaders)
k6_distribution_count = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: len(self.loaders),
K6ProcessAllocationStrategy.PER_ENDPOINT: len(endpoints),
}
endpoints_generators = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: itertools.cycle([endpoints]),
K6ProcessAllocationStrategy.PER_ENDPOINT: itertools.cycle(
[[endpoint] for endpoint in endpoints]
),
}
k6_processes_count = k6_distribution_count[load_params.k6_process_allocation_strategy]
endpoints_gen = endpoints_generators[load_params.k6_process_allocation_strategy]
distributed_load_params_list = self._get_distributed_load_params_list(
load_params, k6_processes_count
)
for distributed_load_params in distributed_load_params_list:
loader = next(cycled_loaders)
shell = loader.get_shell()
with reporter.step(
f"Init K6 instances on {loader.ip} for load id {distributed_load_params.load_id}"
):
with reporter.step(f"Make working directory"):
shell.exec(f"sudo mkdir -p {distributed_load_params.working_dir}")
shell.exec(
f"sudo chown {LOAD_NODE_SSH_USER} {distributed_load_params.working_dir}"
)
k6_instance = K6(
distributed_load_params,
next(endpoints_gen),
k6_dir,
shell,
loader,
self.loaders_wallet,
)
self.k6_instances.append(k6_instance)
if load_params.preset:
k6_instance.preset()
def _get_distributed_load_params_list(
self, original_load_params: LoadParams, workers_count: int
) -> list[LoadParams]:
divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR)
distributed_load_params: list[LoadParams] = []
for i in range(workers_count):
load_params = copy.deepcopy(original_load_params)
# Append #i here in case if multiple k6 processes goes into same load node
load_params.set_id(f"{load_params.load_id}_{i}")
distributed_load_params.append(load_params)
load_fields = fields(original_load_params)
for field in load_fields:
if (
field.metadata
and original_load_params.scenario in field.metadata["applicable_scenarios"]
and field.metadata["distributed"]
and getattr(original_load_params, field.name) is not None
):
original_value = getattr(original_load_params, field.name)
distribution = self._get_distribution(
math.ceil(original_value / divisor), workers_count
)
for i in range(workers_count):
setattr(distributed_load_params[i], field.name, distribution[i])
return distributed_load_params
def _get_distribution(self, clients_count: int, workers_count: int) -> list[int]:
"""
This function will distribute evenly as possible X clients to Y workers.
For example if we have 150 readers (clients) and we want to spread it over 4 load nodes (workers)
this will return [38, 38, 37, 37].
Args:
clients_count: amount of things needs to be distributed.
workers_count: amount of workers.
Returns:
list of distribution.
"""
if workers_count < 1:
raise Exception("Workers cannot be less then 1")
# Amount of guaranteed payload on one worker
clients_per_worker = clients_count // workers_count
# Remainder of clients left to be distributed
remainder = clients_count - clients_per_worker * workers_count
distribution = [
clients_per_worker + 1 if i < remainder else clients_per_worker
for i in range(workers_count)
]
return distribution
def start(self):
load_params = self.k6_instances[0].load_params
with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor:
futures = [executor.submit(k6.start) for k6 in self.k6_instances]
# Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()]
if exceptions:
raise RuntimeError(
f"The following exceptions occured during start of k6: {exceptions}"
)
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step(
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
time.sleep(wait_after_start_time)
def stop(self):
for k6_instance in self.k6_instances:
k6_instance.stop()
def get_results(self) -> dict:
results = {}
for k6_instance in self.k6_instances:
if k6_instance.load_params.k6_process_allocation_strategy is None:
raise RuntimeError("k6_process_allocation_strategy should not be none")
result = k6_instance.get_results()
keys_map = {
K6ProcessAllocationStrategy.PER_LOAD_NODE: k6_instance.loader.ip,
K6ProcessAllocationStrategy.PER_ENDPOINT: k6_instance.endpoints[0],
}
key = keys_map[k6_instance.load_params.k6_process_allocation_strategy]
results[key] = result
return results
@property
def is_running(self):
for k6_instance in self.k6_instances:
if not k6_instance.is_running:
return False
return True
class LocalRunner(ScenarioRunner):
k6_instances: list[K6]
loaders: list[Loader]
cluster_state_controller: ClusterStateController
file_keeper: FileKeeper
wallet: WalletInfo
def __init__(
self,
cluster_state_controller: ClusterStateController,
file_keeper: FileKeeper,
nodes_under_load: list[ClusterNode],
) -> None:
self.cluster_state_controller = cluster_state_controller
self.file_keeper = file_keeper
self.loaders = [NodeLoader(node) for node in nodes_under_load]
@run_optionally(optionals.OPTIONAL_BACKGROUND_LOAD_ENABLED)
@reporter.step_deco("Prepare load instances")
def prepare(
self,
load_params: LoadParams,
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
@reporter.step_deco("Prepare node {cluster_node}")
def prepare_node(cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
with reporter.step("Allow storage user to login into system"):
shell.exec(f"sudo chsh -s /bin/bash {STORAGE_USER_NAME}")
with reporter.step("Update limits.conf"):
limits_path = "/etc/security/limits.conf"
self.file_keeper.add(cluster_node.storage_node, limits_path)
content = f"{STORAGE_USER_NAME} hard nofile 65536\n{STORAGE_USER_NAME} soft nofile 65536\n"
shell.exec(f"echo '{content}' | sudo tee {limits_path}")
with reporter.step("Download K6"):
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):
self.wallet = WalletInfo(
f"{k6_dir}/scenarios/files/wallet.json", "", "/tmp/empty_passwd.yml"
)
content = yaml.dump({"password": ""})
shell.exec(f'echo "{content}" | sudo tee {self.wallet.config_path}')
shell.exec(f"sudo chmod -R 777 {self.wallet.config_path}")
with ThreadPoolExecutor(max_workers=len(nodes_under_load)) as executor:
result = executor.map(prepare_node, nodes_under_load)
# Check for exceptions
for _ in result:
pass
def wait_until_finish(self):
for k6_instance in self.k6_instances:
k6_instance.wait_until_finished()
def init_k6_instances(self, load_params: LoadParams, endpoints: list[str], k6_dir: str):
self.k6_instances = []
for loader in self.loaders:
shell = loader.get_shell()
with reporter.step(f"Init K6 instances on {loader.ip}"):
with reporter.step(f"Make working directory"):
shell.exec(f"sudo mkdir -p {load_params.working_dir}")
# If we chmod /home/<user_name> folder we can no longer ssh to the node
# !! IMPORTANT !!
if (
load_params.working_dir
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}"
and not load_params.working_dir == f"/home/{LOAD_NODE_SSH_USER}/"
):
shell.exec(f"sudo chmod -R 777 {load_params.working_dir}")
k6_instance = K6(
load_params,
["localhost:8080"],
k6_dir,
shell,
loader,
self.wallet,
)
self.k6_instances.append(k6_instance)
if load_params.preset:
k6_instance.preset()
def start(self):
load_params = self.k6_instances[0].load_params
self.cluster_state_controller.stop_all_s3_gates()
self.cluster_state_controller.stop_all_storage_services()
with ThreadPoolExecutor(max_workers=len(self.k6_instances)) as executor:
futures = [executor.submit(k6.start) for k6 in self.k6_instances]
# Check for exceptions
exceptions = [future.exception() for future in futures if future.exception()]
if exceptions:
raise RuntimeError(
f"The following exceptions occured during start of k6: {exceptions}"
)
wait_after_start_time = datetime_utils.parse_time(load_params.setup_timeout) + 5
with reporter.step(
f"Wait for start timeout + couple more seconds ({wait_after_start_time}) before moving on"
):
time.sleep(wait_after_start_time)
def stop(self):
for k6_instance in self.k6_instances:
k6_instance.stop()
self.cluster_state_controller.start_stopped_storage_services()
self.cluster_state_controller.start_stopped_s3_gates()
def get_results(self) -> dict:
results = {}
for k6_instance in self.k6_instances:
result = k6_instance.get_results()
results[k6_instance.loader.ip] = result
return results
@property
def is_running(self):
for k6_instance in self.k6_instances:
if not k6_instance.is_running:
return False
return True