import math
import os
from dataclasses import dataclass, field, fields, is_dataclass
from enum import Enum
from types import MappingProxyType
from typing import Any, Callable, Optional, get_args

from frostfs_testlib.utils.converting_utils import calc_unit


def convert_time_to_seconds(time: int | str | None) -> int | None:
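    """Convert a duration like "1d2h30min" (or a plain number of seconds) to seconds.

    Illustrative doctest-style examples, derived from the parsing below:
        >>> convert_time_to_seconds("1d2h30min")
        95400
        >>> convert_time_to_seconds(300)
        300
        >>> convert_time_to_seconds(None) is None
        True
    """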
    if time is None:
        return None
    if str(time).isdigit():
        seconds = int(time)
    else:
        days, hours, minutes = 0, 0, 0
        if "d" in time:
            days, time = time.split("d")
        if "h" in time:
            hours, time = time.split("h")
        if "min" in time:
            minutes = time.replace("min", "")
        seconds = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60
    return seconds


def force_list(input: str | list[str] | None) -> list[str] | None:
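    """Normalize a string or a list of strings into a list of stripped strings.

    Illustrative doctest-style examples:
        >>> force_list(" REP 2 ")
        ['REP 2']
        >>> force_list([" a ", "b "])
        ['a', 'b']
    """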
    if input is None:
        return None

    if isinstance(input, list):
        return list(map(str.strip, input))

    return [input.strip()]


class LoadType(Enum):
    gRPC = "grpc"
    S3 = "s3"
    HTTP = "http"


class LoadScenario(Enum):
    gRPC = "grpc"
    gRPC_CAR = "grpc_car"
    S3 = "s3"
    S3_CAR = "s3_car"
    S3_MULTIPART = "s3_multipart"
    S3_LOCAL = "s3local"
    HTTP = "http"
    VERIFY = "verify"
    LOCAL = "local"


class ReadFrom(Enum):
    REGISTRY = "registry"
    PRESET = "preset"
    MANUAL = "manual"


all_load_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.S3,
    LoadScenario.HTTP,
    LoadScenario.S3_CAR,
    LoadScenario.gRPC_CAR,
    LoadScenario.LOCAL,
    LoadScenario.S3_MULTIPART,
    LoadScenario.S3_LOCAL,
]
all_scenarios = all_load_scenarios + [LoadScenario.VERIFY]

constant_vus_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.S3,
    LoadScenario.HTTP,
    LoadScenario.LOCAL,
    LoadScenario.S3_MULTIPART,
    LoadScenario.S3_LOCAL,
]
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]

grpc_preset_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.HTTP,
    LoadScenario.gRPC_CAR,
    LoadScenario.LOCAL,
]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.S3_LOCAL]


@dataclass
class MetaField:
    name: str
    metadata: MappingProxyType
    value: Any


def metadata_field(
    applicable_scenarios: list[LoadScenario],
    preset_param: Optional[str] = None,
    scenario_variable: Optional[str] = None,
    string_repr: Optional[bool] = True,
    distributed: Optional[bool] = False,
    formatter: Optional[Callable] = None,
    env_variable: Optional[str] = None,
):
    return field(
        default=None,
        metadata={
            "applicable_scenarios": applicable_scenarios,
            "preset_argument": preset_param,
            "scenario_variable": scenario_variable,
            "string_repr": string_repr,
            "distributed": distributed,
            "formatter": formatter,
            "env_variable": env_variable,
        },
    )
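
# Illustrative reading of the positional call pattern used throughout this module:
#   metadata_field(applicable_scenarios, preset_param, scenario_variable, string_repr, distributed)
# For example, metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False) declares
# a field that is passed to the preset CLI as --size, exported to k6 as the
# WRITE_OBJ_SIZE variable, and omitted from the string representation.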


class NodesSelectionStrategy(Enum):
    # Select ONE random node from cluster nodes.
    RANDOM_SINGLE = "RANDOM_SINGLE"
    # Select ALL nodes.
    ALL = "ALL"
    # Select ALL nodes except the node under test (useful for failover). This is the default.
    ALL_EXCEPT_UNDER_TEST = "ALL_EXCEPT_UNDER_TEST"
    # Select ONE random node except the node under test (useful for failover).
    RANDOM_SINGLE_EXCEPT_UNDER_TEST = "RANDOM_SINGLE_EXCEPT_UNDER_TEST"
    # Select the node under test.
    NODE_UNDER_TEST = "NODE_UNDER_TEST"


class EndpointSelectionStrategy(Enum):
    """Enum which defines which endpoint to select from each storage node"""

    # Select ALL endpoints.
    ALL = "ALL"
    # Select the first endpoint from each node.
    FIRST = "FIRST"


class K6ProcessAllocationStrategy(Enum):
    """Enum which defines how K6 processes should be allocated"""

    # Each load node gets one k6 process with all endpoints (default).
    PER_LOAD_NODE = "PER_LOAD_NODE"
    # Each endpoint gets its own k6 process regardless of the number of load nodes.
    # If there are not enough load nodes, some nodes may run multiple k6 processes.
    PER_ENDPOINT = "PER_ENDPOINT"


class MetaConfig:
    def _get_field_formatter(self, field_name: str) -> Callable | None:
        data_fields = fields(self)
        formatters = [
            field.metadata["formatter"]
            for field in data_fields
            if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] is not None
        ]
        if formatters:
            return formatters[0]

        return None

    def __setattr__(self, field_name, value):
        formatter = self._get_field_formatter(field_name)
        if formatter:
            value = formatter(value)

        super().__setattr__(field_name, value)
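
# Illustrative consequence of the __setattr__ hook above: assigning to a field
# declared with formatter=force_list runs the formatter first, e.g.
#   preset.container_placement_policy = "REP 2"
# stores ["REP 2"] rather than the raw string.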


@dataclass
class Preset(MetaConfig):
    # ------ COMMON ------
    # Amount of objects which should be created
    objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)
    # Preset json. Filled automatically.
    pregen_json: Optional[str] = metadata_field(all_load_scenarios, "out", "PREGEN_JSON", False)
    # Workers count for preset
    workers: Optional[int] = metadata_field(all_load_scenarios, "workers", None, False)
    # ACL for containers/buckets
    acl: Optional[str] = metadata_field(all_load_scenarios, "acl", None, False)

    # ------ GRPC ------
    # Amount of containers which should be created
    containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
    # Container placement policy for containers for gRPC
    container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
    # Number of retries for container creation
    container_creation_retry: Optional[int] = metadata_field(grpc_preset_scenarios, "retry", None, False)

    # ------ S3 ------
    # Amount of buckets which should be created
    buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
    # S3 region (AKA placement policy for S3 buckets)
    s3_location: Optional[list[str]] = metadata_field(s3_preset_scenarios, "location", None, False, formatter=force_list)

    # Delay between container creation and object upload for preset
    object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)

    # Flag to control preset errors
    ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)

    # Flag to ensure created containers store data on local endpoints
    local: Optional[bool] = metadata_field(grpc_preset_scenarios, "local", None, False)


@dataclass
class PrometheusParams(MetaConfig):
    # Prometheus server URL
    server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
    # Prometheus trend stats
    trend_stats: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_TREND_STATS", string_repr=False)
    # Additional tags
    metrics_tags: Optional[str] = metadata_field(all_load_scenarios, None, "METRIC_TAGS", False)


@dataclass
class LoadParams(MetaConfig):
    # ------- CONTROL PARAMS -------
    # Load type can be gRPC, HTTP, S3.
    load_type: LoadType
    # Load scenario from k6 scenarios
    scenario: Optional[LoadScenario] = None
    # Strategy to select nodes under load. See the NodesSelectionStrategy class for more details.
    # Default is ALL_EXCEPT_UNDER_TEST.
    nodes_selection_strategy: Optional[NodesSelectionStrategy] = None
    # Strategy which defines which endpoint to select from each storage node
    endpoint_selection_strategy: Optional[EndpointSelectionStrategy] = None
    # Strategy which defines how K6 processes should be allocated
    k6_process_allocation_strategy: Optional[K6ProcessAllocationStrategy] = None
    # Set to True to verify uploaded objects after the k6 load finishes. Default is True.
    verify: Optional[bool] = None
    # ID of the load run, used to distinguish runs. Filled automatically.
    load_id: Optional[str] = None
    # Acceptable number of load errors in %
    # 100 means 100% errors allowed
    # 1.5 means 1.5% errors allowed
    # 0 means no errors allowed
    error_threshold: Optional[float] = None
    # Working directory
    working_dir: Optional[str] = None
    # Preset for the k6 run
    preset: Optional[Preset] = None
    # K6 download url
    k6_url: Optional[str] = None
    # Requests module url
    requests_module_url: Optional[str] = None
    # aws cli download url
    awscli_url: Optional[str] = None
    # No ssl verification flag
    no_verify_ssl: Optional[bool] = metadata_field(
        [
            LoadScenario.S3,
            LoadScenario.S3_CAR,
            LoadScenario.S3_MULTIPART,
            LoadScenario.S3_LOCAL,
            LoadScenario.VERIFY,
            LoadScenario.HTTP,
        ],
        "no-verify-ssl",
        "NO_VERIFY_SSL",
        False,
    )
    # Percentage of filling of all data disks on all nodes
    fill_percent: Optional[float] = None
    # If specified, the maximum payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved.
    max_total_size_gb: Optional[float] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "MAX_TOTAL_SIZE_GB")
    # If set, the payload is generated on the fly and is not fully read into memory.
    streaming: Optional[int] = metadata_field(all_load_scenarios, None, "STREAMING", False)
    # Output format
    output: Optional[str] = metadata_field(all_load_scenarios, None, "K6_OUT", False)
    # Prometheus params
    prometheus: Optional[PrometheusParams] = None

    # ------- COMMON SCENARIO PARAMS -------
    # Load time is the maximum duration for k6 to apply load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
    load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds)
    # Object size in KB for load and preset.
    object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False)
    # For read operations, controls from which set objects are read
    read_from: Optional[ReadFrom] = None
    # For read operations done from REGISTRY, controls how long an object should live before it is used for a read operation
    read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", False)
    # Output registry K6 file. Filled automatically.
    registry_file: Optional[str] = metadata_field(all_scenarios, None, "REGISTRY_FILE", False)
    # Set to reuse a custom registry file left over from another load run
    custom_registry: Optional[str] = None
    # Set to force creation of a fresh registry file, ignoring any existing one
    force_fresh_registry: Optional[bool] = None
    # Specifies the minimum duration of every single execution (i.e. iteration).
    # Any iterations that are shorter than this value will cause that VU to
    # sleep for the remainder of the time until the specified minimum duration is reached.
    min_iteration_duration: Optional[str] = metadata_field(all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False)
    # Prepare/cut objects locally on the client before sending
    prepare_locally: Optional[bool] = metadata_field([LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False)
    # Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
    # https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
    setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)

    # Initialization time for each VU for k6 load
    vu_init_time: Optional[float] = None

    # ------- CONSTANT VUS SCENARIO PARAMS -------
    # Amount of writer VUs.
    writers: Optional[int] = metadata_field(constant_vus_scenarios, None, "WRITERS", True, True)
    # Amount of reader VUs.
    readers: Optional[int] = metadata_field(constant_vus_scenarios, None, "READERS", True, True)
    # Amount of deleter VUs.
    deleters: Optional[int] = metadata_field(constant_vus_scenarios, None, "DELETERS", True, True)

    # ------- CONSTANT ARRIVAL RATE SCENARIO PARAMS -------
    # Number of iterations to start during each timeUnit period for write.
    write_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "WRITE_RATE", True, True)

    # Number of iterations to start during each timeUnit period for read.
    read_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "READ_RATE", True, True)

    # Number of iterations to start during each timeUnit period for delete.
    delete_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "DELETE_RATE", True, True)

    # Amount of preAllocatedVUs for write operations.
    preallocated_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_WRITERS", True, True)
    # Amount of maxVUs for write operations.
    max_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_WRITERS", False, True)

    # Amount of preAllocatedVUs for read operations.
    preallocated_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_READERS", True, True)
    # Amount of maxVUs for read operations.
    max_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_READERS", False, True)

    # Amount of preAllocatedVUs for delete operations.
    preallocated_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_DELETERS", True, True)
    # Amount of maxVUs for delete operations.
    max_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True)

    # Multipart
    # Number of parts to upload in parallel
    writers_multipart: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True)
    # Part size; must be greater than 5 MB
    write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)

    # Period of time to apply the rate value.
    time_unit: Optional[str] = metadata_field(constant_arrival_rate_scenarios, None, "TIME_UNIT", False)

    # ------- VERIFY SCENARIO PARAMS -------
    # Maximum verification time for k6 to verify objects. Default is BACKGROUND_LOAD_MAX_VERIFY_TIME (3600).
    verify_time: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "TIME_LIMIT", False)
    # Amount of verification VUs.
    verify_clients: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "CLIENTS", True, False)

    # ------- LOCAL SCENARIO PARAMS -------
    # Config file location (filled automatically)
    config_file: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_FILE", False)
    # Config directory location (filled automatically)
    config_dir: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_DIR", False)

    def set_id(self, load_id):
        self.load_id = load_id

        if self.read_from == ReadFrom.REGISTRY:
            self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt")

        # For now it's okay to have it this way
        if self.custom_registry is not None:
            self.registry_file = self.custom_registry

        if self.read_from == ReadFrom.PRESET:
            self.registry_file = None

        if self.preset:
            self.preset.pregen_json = os.path.join(self.working_dir, f"{load_id}_prepare.json")
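
    # Illustrative: with working_dir="/tmp/load" and read_from=ReadFrom.REGISTRY,
    # set_id("abc") sets registry_file to "/tmp/load/abc_registry.bolt"; a
    # custom_registry, if provided, overrides that path.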

    def get_k6_vars(self):
        env_vars = {
            meta_field.metadata["scenario_variable"]: meta_field.value
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"]
            and meta_field.metadata["scenario_variable"]
            and meta_field.value is not None
        }

        return env_vars

    def get_env_vars(self):
        env_vars = {
            meta_field.metadata["env_variable"]: meta_field.value
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"]
            and meta_field.metadata["env_variable"]
            and meta_field.value is not None
        }

        return env_vars
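
    # Illustrative: for LoadParams(load_type=LoadType.S3) with writers=2 and
    # load_time=300, get_k6_vars() would include {"WRITERS": 2, "DURATION": 300},
    # while get_env_vars() only picks up fields declared with env_variable, such
    # as the PrometheusParams URLs.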

    def __post_init__(self):
        default_scenario_map = {
            LoadType.gRPC: LoadScenario.gRPC,
            LoadType.HTTP: LoadScenario.HTTP,
            LoadType.S3: LoadScenario.S3,
        }

        if self.scenario is None:
            self.scenario = default_scenario_map[self.load_type]

    def get_preset_arguments(self):
        command_args = [
            self._get_preset_argument(meta_field)
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"]
            and meta_field.metadata["preset_argument"]
            and meta_field.value is not None
            and self._get_preset_argument(meta_field)
        ]

        return command_args

    def get_init_time(self) -> int:
        return math.ceil(self._get_total_vus() * self.vu_init_time)
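
    # Illustrative: with writers=10, readers=5 and vu_init_time=0.8, the VU
    # initialization budget is ceil((10 + 5) * 0.8) == 12 seconds. Note that
    # deleters are not counted in _get_total_vus below.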

    def _get_total_vus(self) -> int:
        vu_fields = ["writers", "preallocated_writers", "readers", "preallocated_readers"]
        data_fields = [getattr(self, field.name) or 0 for field in fields(self) if field.name in vu_fields]
        return sum(data_fields)

    def _get_applicable_fields(self):
        applicable_fields = [
            meta_field
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"] and meta_field.value
        ]

        return applicable_fields

    @staticmethod
    def _get_preset_argument(meta_field: MetaField) -> str:
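        """Render a single preset CLI argument from a MetaField.

        Illustrative outputs, derived from the branches below:
            value=5, preset_argument="workers"          ->  "--workers '5'"
            value=True, preset_argument="local"         ->  "--local"
            value=["a", "b"], preset_argument="policy"  ->  "--policy 'a' --policy 'b'"
        """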
        if isinstance(meta_field.value, bool):
            # For preset calls, bool values are passed with just --<argument_name> if the value is True
            return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else ""

        if isinstance(meta_field.value, list):
            return (
                " ".join(f"--{meta_field.metadata['preset_argument']} '{value}'" for value in meta_field.value) if meta_field.value else ""
            )

        return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'"

    @staticmethod
    def _get_meta_fields(instance) -> list[MetaField]:
        data_fields = fields(instance)

        fields_with_data = [
            MetaField(field.name, field.metadata, getattr(instance, field.name))
            for field in data_fields
            if field.metadata and getattr(instance, field.name) is not None
        ]

        for field in data_fields:
            actual_field_type = get_args(field.type)[0] if len(get_args(field.type)) else get_args(field.type)
            if is_dataclass(actual_field_type) and getattr(instance, field.name):
                fields_with_data += LoadParams._get_meta_fields(getattr(instance, field.name))

        return fields_with_data or []

    def __str__(self) -> str:
        load_type_str = self.scenario.value if self.scenario else self.load_type.value
        # TODO: migrate load_params defaults to testlib
        if self.object_size is not None:
            size, unit = calc_unit(self.object_size, 1)
            static_params = [f"{load_type_str} {size:.4g} {unit}"]
        else:
            static_params = [f"{load_type_str}"]

        dynamic_params = [
            f"{meta_field.name}={meta_field.value}" for meta_field in self._get_applicable_fields() if meta_field.metadata["string_repr"]
        ]
        params = ", ".join(static_params + dynamic_params)

        return params

    def __repr__(self) -> str:
        return self.__str__()
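
# Illustrative end-to-end sketch (assumed usage, mirroring the definitions above):
#
#   params = LoadParams(load_type=LoadType.S3)   # scenario defaults to LoadScenario.S3
#   params.working_dir = "/tmp/load"
#   params.object_size = 1024                    # KB; becomes WRITE_OBJ_SIZE for k6
#   params.preset = Preset(objects_count=100, buckets_count=2)
#   params.set_id("run-1")                       # fills load_id and pregen/registry paths
#   params.get_preset_arguments()                # includes "--preload_obj '100'", "--buckets '2'", ...
#   params.get_k6_vars()                         # includes {"WRITE_OBJ_SIZE": 1024, ...}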