# Forked from TrueCloudLab/frostfs-testlib (184 lines, 6.8 KiB, Python).
import copy
|
|
import itertools
|
|
import math
|
|
import re
|
|
from dataclasses import fields
|
|
|
|
from frostfs_testlib.cli import FrostfsAuthmate
|
|
from frostfs_testlib.load.k6 import K6
|
|
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams
|
|
from frostfs_testlib.reporter import get_reporter
|
|
from frostfs_testlib.resources.cli import FROSTFS_AUTHMATE_EXEC
|
|
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_VUS_COUNT_DIVISOR
|
|
from frostfs_testlib.shell import CommandOptions, SSHShell
|
|
from frostfs_testlib.shell.interfaces import InteractiveInput, SshCredentials
|
|
from frostfs_testlib.storage.cluster import ClusterNode
|
|
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate, StorageNode
|
|
from frostfs_testlib.storage.dataclasses.wallet import WalletInfo
|
|
|
|
# Module-wide reporter; its step_deco decorator wraps the public steps defined in this module.
reporter = get_reporter()
|
|
|
|
# Module-level mutable list, initially empty. Nothing in the visible part of this module uses it.
# NOTE(review): presumably tracks hosts that were stopped during a test - confirm against callers.
STOPPED_HOSTS = []
|
|
|
|
|
|
@reporter.step_deco("Init s3 client on load nodes")
def init_s3_client(
    load_nodes: list[str],
    load_params: LoadParams,
    k6_directory: str,
    ssh_credentials: SshCredentials,
    nodes_under_load: list[ClusterNode],
    wallet: WalletInfo,
):
    """
    Issue S3 credentials with frostfs-authmate on every load node and feed them to `aws configure`.

    Args:
        load_nodes: hosts on which the S3 client has to be configured.
        load_params: load parameters; the preset's container placement policy is passed to authmate.
        k6_directory: base directory used to build the paths to `scenarios/files/rules.json`
            and `scenarios/files/policy.json`.
        ssh_credentials: credentials used to open a shell on each load node.
        nodes_under_load: cluster nodes; the first one provides the RPC endpoint (peer),
            every one provides its S3 gate wallet public key.
        wallet: wallet whose path and password are handed to authmate.
    """
    first_storage_node = nodes_under_load[0].service(StorageNode)
    gate_public_keys = [node.service(S3Gate).get_wallet_public_key() for node in nodes_under_load]
    rpc_endpoint = first_storage_node.get_rpc_endpoint()

    # Patterns that pull the issued credentials out of the authmate output
    access_key_pattern = re.compile(r"access_key_id.*:\s.(?P<aws_access_key_id>\w*)")
    secret_key_pattern = re.compile(r"secret_access_key.*:\s.(?P<aws_secret_access_key>\w*)")

    rules_path = f"{k6_directory}/scenarios/files/rules.json"
    policy_path = f"{k6_directory}/scenarios/files/policy.json"

    for load_node in load_nodes:
        shell = _get_ssh_client(ssh_credentials, load_node)
        authmate: FrostfsAuthmate = FrostfsAuthmate(shell, FROSTFS_AUTHMATE_EXEC)

        issue_result = authmate.secret.issue(
            wallet=wallet.path,
            peer=rpc_endpoint,
            bearer_rules=rules_path,
            gate_public_key=gate_public_keys,
            container_placement_policy=load_params.preset.container_placement_policy,
            container_policy=policy_path,
            wallet_password=wallet.password,
        )
        issue_secret_output = issue_result.stdout

        access_key_match = access_key_pattern.search(issue_secret_output)
        aws_access_key_id = str(access_key_match.group("aws_access_key_id"))

        secret_key_match = secret_key_pattern.search(issue_secret_output)
        aws_secret_access_key = str(secret_key_match.group("aws_secret_access_key"))

        # prompt_pattern doesn't work at the moment
        access_key_answer = InteractiveInput(
            prompt_pattern=r"AWS Access Key ID.*", input=aws_access_key_id
        )
        secret_key_answer = InteractiveInput(
            prompt_pattern=r"AWS Secret Access Key.*", input=aws_secret_access_key
        )
        configure_input = [
            access_key_answer,
            secret_key_answer,
            # The two remaining prompts are answered with empty input
            InteractiveInput(prompt_pattern=r".*", input=""),
            InteractiveInput(prompt_pattern=r".*", input=""),
        ]
        configure_options = CommandOptions(interactive_inputs=configure_input)
        shell.exec("aws configure", configure_options)
|
|
|
|
|
|
@reporter.step_deco("Prepare K6 instances and objects")
def prepare_k6_instances(
    load_nodes: list[str],
    ssh_credentials: SshCredentials,
    k6_dir: str,
    load_params: LoadParams,
    endpoints: list[str],
    loaders_wallet: WalletInfo,
) -> list[K6]:
    """
    Create one K6 object per k6 process and run its preset when the load params define one.

    The number of processes and the endpoints given to each of them depend on
    `load_params.k6_process_allocation_strategy`:
      - PER_LOAD_NODE: one process per load node, every process receives all endpoints;
      - PER_ENDPOINT: one process per endpoint, every process receives a single endpoint.
    Processes are assigned to load nodes in round-robin order.

    Args:
        load_nodes: hosts on which k6 processes are placed.
        ssh_credentials: credentials used to open a shell on each load node.
        k6_dir: k6 directory passed through to every K6 object.
        load_params: original load parameters that get distributed between the processes.
        endpoints: endpoints to put under load.
        loaders_wallet: wallet passed through to every K6 object.

    Returns:
        list of created K6 objects, in creation order.
    """
    # strategy -> (amount of k6 processes, endpoint groups handed out one per process)
    allocation_plans = {
        K6ProcessAllocationStrategy.PER_LOAD_NODE: (len(load_nodes), [endpoints]),
        K6ProcessAllocationStrategy.PER_ENDPOINT: (
            len(endpoints),
            [[endpoint] for endpoint in endpoints],
        ),
    }
    processes_count, endpoint_groups = allocation_plans[load_params.k6_process_allocation_strategy]

    load_node_cycle = itertools.cycle(load_nodes)
    endpoint_group_cycle = itertools.cycle(endpoint_groups)

    k6_instances: list[K6] = []
    for params_for_process in _get_distributed_load_params_list(load_params, processes_count):
        host = next(load_node_cycle)
        shell = _get_ssh_client(ssh_credentials, host)
        k6_instance = K6(
            params_for_process,
            next(endpoint_group_cycle),
            k6_dir,
            shell,
            host,
            loaders_wallet,
        )
        k6_instances.append(k6_instance)

        if load_params.preset:
            k6_instance.preset()

    return k6_instances
|
|
|
|
|
|
def _get_ssh_client(ssh_credentials: SshCredentials, load_node: str):
    """
    Build an SSHShell for the given load node.

    Args:
        ssh_credentials: source of login, password, private key path and key passphrase.
        load_node: host the shell is created for.

    Returns:
        the constructed SSHShell instance.
    """
    connection_settings = {
        "host": load_node,
        "login": ssh_credentials.ssh_login,
        "password": ssh_credentials.ssh_password,
        "private_key_path": ssh_credentials.ssh_key_path,
        "private_key_passphrase": ssh_credentials.ssh_key_passphrase,
    }
    return SSHShell(**connection_settings)
|
|
|
|
|
|
def _get_distributed_load_params_list(
    original_load_params: LoadParams, workers_count: int
) -> list[LoadParams]:
    """
    Create a separate copy of the load params for every worker and split distributed values.

    Every copy gets its own id in the form `<load_id>_<worker index>`. Each dataclass field whose
    metadata lists the current scenario in "applicable_scenarios", has a truthy "distributed"
    flag and holds a non-None value is divided by BACKGROUND_LOAD_VUS_COUNT_DIVISOR, rounded up
    and then spread between the workers with _get_distribution.

    Args:
        original_load_params: load params to copy; the object itself is not modified.
        workers_count: amount of copies (workers) to produce.

    Returns:
        list with one LoadParams object per worker.
    """
    divisor = int(BACKGROUND_LOAD_VUS_COUNT_DIVISOR)

    per_worker_params: list[LoadParams] = []
    for i in range(workers_count):
        load_params = copy.deepcopy(original_load_params)
        # The index suffix keeps ids unique when several k6 processes go to the same load node
        load_params.set_id(f"{load_params.load_id}_{i}")
        per_worker_params.append(load_params)

    for load_field in fields(original_load_params):
        metadata = load_field.metadata
        if not metadata:
            continue
        if original_load_params.scenario not in metadata["applicable_scenarios"]:
            continue
        if not metadata["distributed"]:
            continue

        total_value = getattr(original_load_params, load_field.name)
        if total_value is None:
            continue

        shares = _get_distribution(math.ceil(total_value / divisor), workers_count)
        for worker_params, share in zip(per_worker_params, shares):
            setattr(worker_params, load_field.name, share)

    return per_worker_params
|
|
|
|
|
|
def _get_distribution(clients_count: int, workers_count: int) -> list[int]:
|
|
"""
|
|
This function will distribute evenly as possible X clients to Y workers.
|
|
For example if we have 150 readers (clients) and we want to spread it over 4 load nodes (workers)
|
|
this will return [38, 38, 37, 37].
|
|
|
|
Args:
|
|
clients_count: amount of things needs to be distributed.
|
|
workers_count: amount of workers.
|
|
|
|
Returns:
|
|
list of distribution.
|
|
"""
|
|
if workers_count < 1:
|
|
raise Exception("Workers cannot be less then 1")
|
|
|
|
# Amount of guaranteed payload on one worker
|
|
clients_per_worker = clients_count // workers_count
|
|
# Remainder of clients left to be distributed
|
|
remainder = clients_count - clients_per_worker * workers_count
|
|
|
|
distribution = [
|
|
clients_per_worker + 1 if i < remainder else clients_per_worker
|
|
for i in range(workers_count)
|
|
]
|
|
return distribution
|