"""Parameter definitions for k6 load runs: scenarios, presets and per-scenario variables."""

import math
import os
from dataclasses import dataclass, field, fields, is_dataclass
from enum import Enum
from types import MappingProxyType
from typing import Any, Callable, Optional, get_args

from frostfs_testlib.utils.converting_utils import calc_unit


def convert_time_to_seconds(time: int | str | None) -> int | None:
    """Convert a duration to whole seconds.

    Args:
        time: Either a plain number of seconds (``90`` or ``"90"``) or a string
            built from day/hour/minute parts in that order, e.g. ``"1d2h30min"``.

    Returns:
        The duration in seconds, or ``None`` when ``time`` is ``None``.
        A string containing none of the ``d``/``h``/``min`` markers yields 0.
    """
    if time is None:
        return None

    if str(time).isdigit():
        return int(time)

    days, hours, minutes = 0, 0, 0
    # Each marker consumes its part and leaves the remainder for the next, smaller unit.
    if "d" in time:
        days, time = time.split("d")
    if "h" in time:
        hours, time = time.split("h")
    if "min" in time:
        minutes = time.replace("min", "")

    return int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60


def force_list(input: str | list[str] | None) -> list[str] | None:
    """Normalize a string or list of strings into a list of stripped strings.

    ``None`` is passed through unchanged so that unset fields stay unset.
    """
    if input is None:
        return None

    if isinstance(input, list):
        return [item.strip() for item in input]

    return [input.strip()]


class LoadType(Enum):
    gRPC = "grpc"
    S3 = "s3"
    HTTP = "http"


class LoadScenario(Enum):
    gRPC = "grpc"
    gRPC_CAR = "grpc_car"
    S3 = "s3"
    S3_CAR = "s3_car"
    S3_MULTIPART = "s3_multipart"
    S3_LOCAL = "s3local"
    HTTP = "http"
    VERIFY = "verify"
    LOCAL = "local"


class ReadFrom(Enum):
    REGISTRY = "registry"
    PRESET = "preset"
    MANUAL = "manual"


all_load_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.S3,
    LoadScenario.HTTP,
    LoadScenario.S3_CAR,
    LoadScenario.gRPC_CAR,
    LoadScenario.LOCAL,
    LoadScenario.S3_MULTIPART,
    LoadScenario.S3_LOCAL,
]
all_scenarios = [*all_load_scenarios, LoadScenario.VERIFY]

constant_vus_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.S3,
    LoadScenario.HTTP,
    LoadScenario.LOCAL,
    LoadScenario.S3_MULTIPART,
    LoadScenario.S3_LOCAL,
]
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]

grpc_preset_scenarios = [
    LoadScenario.gRPC,
    LoadScenario.HTTP,
    LoadScenario.gRPC_CAR,
    LoadScenario.LOCAL,
]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.S3_LOCAL]


@dataclass
class MetaField:
    """A dataclass field name paired with its metadata and the value currently set on an instance."""

    name: str
    metadata: MappingProxyType
    value: Any


def metadata_field(
    applicable_scenarios: list[LoadScenario],
    preset_param: Optional[str] = None,
    scenario_variable: Optional[str] = None,
    string_repr: Optional[bool] = True,
    distributed: Optional[bool] = False,
    formatter: Optional[Callable] = None,
    env_variable: Optional[str] = None,
):
    """Create a dataclass field defaulting to ``None`` that carries load-specific metadata.

    Args:
        applicable_scenarios: Scenarios for which the field is taken into account.
        preset_param: Stored under the ``preset_argument`` key; used as the ``--<name>`` preset flag.
        scenario_variable: Variable name the value is exported under by ``get_k6_vars``.
        string_repr: Whether the field is included in ``LoadParams.__str__``.
        distributed: Stored under the ``distributed`` key; not read in this module.
        formatter: Callable applied to every value assigned to the field (see ``MetaConfig``).
        env_variable: Variable name the value is exported under by ``get_env_vars``.
    """
    return field(
        default=None,
        metadata={
            "applicable_scenarios": applicable_scenarios,
            "preset_argument": preset_param,
            "scenario_variable": scenario_variable,
            "string_repr": string_repr,
            "distributed": distributed,
            "formatter": formatter,
            "env_variable": env_variable,
        },
    )


class NodesSelectionStrategy(Enum):
    # Select ONE random node from cluster nodes.
    RANDOM_SINGLE = "RANDOM_SINGLE"
    # Select All nodes.
    ALL = "ALL"
    # Select All nodes except node under test (useful for failover). This is DEFAULT one
    ALL_EXCEPT_UNDER_TEST = "ALL_EXCEPT_UNDER_TEST"
    # Select ONE random node except under test (useful for failover).
    RANDOM_SINGLE_EXCEPT_UNDER_TEST = "RANDOM_SINGLE_EXCEPT_UNDER_TEST"
    # Select node under test
    NODE_UNDER_TEST = "NODE_UNDER_TEST"


class EndpointSelectionStrategy(Enum):
    """Enum which defines which endpoint to select from each storage node"""

    # Select All endpoints.
    ALL = "ALL"
    # Select first endpoint from node
    FIRST = "FIRST"


class K6ProcessAllocationStrategy(Enum):
    """Enum which defines how K6 processes should be allocated"""

    # Each load node will get one k6 process with all endpoints (Default)
    PER_LOAD_NODE = "PER_LOAD_NODE"
    # Each endpoint will get its own k6 process regardless of number of load nodes.
    # If there are not enough load nodes, some nodes may have multiple k6 processes
    PER_ENDPOINT = "PER_ENDPOINT"


class MetaConfig:
    """Base for parameter dataclasses: runs a field's ``formatter`` on every assignment."""

    def _get_field_formatter(self, field_name: str) -> Callable | None:
        """Return the formatter declared in the metadata of ``field_name``, or ``None``."""
        for data_field in fields(self):
            if data_field.name != field_name:
                continue
            formatter = data_field.metadata.get("formatter")
            if formatter is not None:
                return formatter
        return None

    def __setattr__(self, field_name, value):
        """Assign ``value`` to ``field_name`` after passing it through the field formatter, if any."""
        formatter = self._get_field_formatter(field_name)
        if formatter:
            value = formatter(value)
        super().__setattr__(field_name, value)


@dataclass
class Preset(MetaConfig):
    """Parameters of the preset step; each maps to a ``--<preset_argument>`` flag."""

    # ------ COMMON ------
    # Amount of objects which should be created
    objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False)
    # Preset json. Filled automatically.
    pregen_json: Optional[str] = metadata_field(all_load_scenarios, "out", "PREGEN_JSON", False)
    # Workers count for preset
    workers: Optional[int] = metadata_field(all_load_scenarios, "workers", None, False)
    # Acl for container/buckets
    acl: Optional[str] = metadata_field(all_load_scenarios, "acl", None, False)

    # ------ GRPC ------
    # Amount of containers which should be created
    containers_count: Optional[int] = metadata_field(grpc_preset_scenarios, "containers", None, False)
    # Container placement policy for containers for gRPC
    container_placement_policy: Optional[list[str]] = metadata_field(grpc_preset_scenarios, "policy", None, False, formatter=force_list)
    # Number of retries for creation of container
    container_creation_retry: Optional[int] = metadata_field(grpc_preset_scenarios, "retry", None, False)

    # ------ S3 ------
    # Amount of buckets which should be created
    buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False)
    # S3 region (AKA placement policy for S3 buckets)
    s3_location: Optional[list[str]] = metadata_field(s3_preset_scenarios, "location", None, False, formatter=force_list)

    # Delay between containers creation and object upload for preset
    object_upload_delay: Optional[int] = metadata_field(all_load_scenarios, "sleep", None, False)

    # Flag to control preset errors
    ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)

    # Flag to ensure created containers store data on local endpoints
    local: Optional[bool] = metadata_field(grpc_preset_scenarios, "local", None, False)


@dataclass
class PrometheusParams(MetaConfig):
    """Prometheus-related settings exported as environment / scenario variables."""

    # Prometheus server URL
    server_url: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_SERVER_URL", string_repr=False)
    # Prometheus trend stats
    trend_stats: Optional[str] = metadata_field(all_load_scenarios, env_variable="K6_PROMETHEUS_RW_TREND_STATS", string_repr=False)
    # Additional tags
    metrics_tags: Optional[str] = metadata_field(all_load_scenarios, None, "METRIC_TAGS", False)


@dataclass
class LoadParams(MetaConfig):
    """Full set of parameters describing a single k6 load run."""

    # ------- CONTROL PARAMS -------
    # Load type can be gRPC, HTTP, S3.
    load_type: LoadType
    # Load scenario from k6 scenarios
    scenario: Optional[LoadScenario] = None
    # Strategy to select nodes under load. See NodesSelectionStrategy class for more details.
    # default is ALL_EXCEPT_UNDER_TEST
    nodes_selection_strategy: Optional[NodesSelectionStrategy] = None
    # Strategy which defines which endpoint to select from each storage node
    endpoint_selection_strategy: Optional[EndpointSelectionStrategy] = None
    # Strategy which defines how K6 processes should be allocated
    k6_process_allocation_strategy: Optional[K6ProcessAllocationStrategy] = None
    # Set to true in order to verify uploaded objects after K6 load finish. Default is True.
    verify: Optional[bool] = None
    # Just id for load so distinct it between runs. Filled automatically.
    load_id: Optional[str] = None
    # Acceptable number of load errors in %
    # 100 means 100% errors allowed
    # 1.5 means 1.5% errors allowed
    # 0 means no errors allowed
    error_threshold: Optional[float] = None
    # Working directory
    working_dir: Optional[str] = None
    # Preset for the k6 run
    preset: Optional[Preset] = None
    # K6 download url
    k6_url: Optional[str] = None
    # Requests module url
    requests_module_url: Optional[str] = None
    # aws cli download url
    awscli_url: Optional[str] = None
    # No ssl verification flag
    no_verify_ssl: Optional[bool] = metadata_field(
        [
            LoadScenario.S3,
            LoadScenario.S3_CAR,
            LoadScenario.S3_MULTIPART,
            LoadScenario.S3_LOCAL,
            LoadScenario.VERIFY,
            LoadScenario.HTTP,
        ],
        "no-verify-ssl",
        "NO_VERIFY_SSL",
        False,
    )
    # Percentage of filling of all data disks on all nodes
    fill_percent: Optional[float] = None
    # if specified, max payload size in GB of the storage engine. If the storage engine is already full, no new objects will be saved.
    max_total_size_gb: Optional[float] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "MAX_TOTAL_SIZE_GB")
    # if set, the payload is generated on the fly and is not read into memory fully.
    streaming: Optional[int] = metadata_field(all_load_scenarios, None, "STREAMING", False)
    # Output format
    output: Optional[str] = metadata_field(all_load_scenarios, None, "K6_OUT", False)
    # Prometheus params
    prometheus: Optional[PrometheusParams] = None

    # ------- COMMON SCENARIO PARAMS -------
    # Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
    load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds)
    # Object size in KB for load and preset.
    object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False)
    # For read operations, controls from which set get objects to read
    read_from: Optional[ReadFrom] = None
    # For read operations done from REGISTRY, controls delay which object should live before it will be used for read operation
    read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", False)
    # Output registry K6 file. Filled automatically.
    registry_file: Optional[str] = metadata_field(all_scenarios, None, "REGISTRY_FILE", False)
    # In case if we want to use custom registry file left from another load run
    custom_registry: Optional[str] = None
    # NOTE(review): presumably requests a fresh registry instead of reusing an existing one;
    # the flag is not read in this module - confirm against its consumer.
    force_fresh_registry: Optional[bool] = None
    # Specifies the minimum duration of every single execution (i.e. iteration).
    # Any iterations that are shorter than this value will cause that VU to
    # sleep for the remainder of the time until the specified minimum duration is reached.
    min_iteration_duration: Optional[str] = metadata_field(all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False)
    # Prepare/cut objects locally on client before sending
    prepare_locally: Optional[bool] = metadata_field([LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False)
    # Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
    # https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
    setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)

    # Initialization time for each VU for k6 load
    vu_init_time: Optional[float] = None

    # ------- CONSTANT VUS SCENARIO PARAMS -------
    # Amount of Writers VU.
    writers: Optional[int] = metadata_field(constant_vus_scenarios, None, "WRITERS", True, True)
    # Amount of Readers VU.
    readers: Optional[int] = metadata_field(constant_vus_scenarios, None, "READERS", True, True)
    # Amount of Deleters VU.
    deleters: Optional[int] = metadata_field(constant_vus_scenarios, None, "DELETERS", True, True)

    # ------- CONSTANT ARRIVAL RATE SCENARIO PARAMS -------
    # Number of iterations to start during each timeUnit period for write.
    write_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "WRITE_RATE", True, True)

    # Number of iterations to start during each timeUnit period for read.
    read_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "READ_RATE", True, True)

    # Number of iterations to start during each timeUnit period for delete.
    delete_rate: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "DELETE_RATE", True, True)

    # Amount of preAllocatedVUs for write operations.
    preallocated_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_WRITERS", True, True)
    # Amount of maxVUs for write operations.
    max_writers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_WRITERS", False, True)

    # Amount of preAllocatedVUs for read operations.
    preallocated_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_READERS", True, True)
    # Amount of maxVUs for read operations.
    max_readers: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_READERS", False, True)

    # Amount of preAllocatedVUs for delete operations.
    preallocated_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "PRE_ALLOC_DELETERS", True, True)
    # Amount of maxVUs for delete operations.
    max_deleters: Optional[int] = metadata_field(constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True)

    # Multipart
    # Number of parts to upload in parallel
    writers_multipart: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True)
    # part size must be greater than (5 MB)
    write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)

    # Period of time to apply the rate value.
    time_unit: Optional[str] = metadata_field(constant_arrival_rate_scenarios, None, "TIME_UNIT", False)

    # ------- VERIFY SCENARIO PARAMS -------
    # Maximum verification time for k6 to verify objects. Default is BACKGROUND_LOAD_MAX_VERIFY_TIME (3600).
    verify_time: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "TIME_LIMIT", False)
    # Amount of Verification VU.
    verify_clients: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "CLIENTS", True, False)

    # ------- LOCAL SCENARIO PARAMS -------
    # Config file location (filled automatically)
    config_file: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_FILE", False)
    # Config directory location (filled automatically)
    config_dir: Optional[str] = metadata_field([LoadScenario.LOCAL, LoadScenario.S3_LOCAL], None, "CONFIG_DIR", False)

    def set_id(self, load_id):
        """Store ``load_id`` and derive the registry / preset file paths from it.

        The checks run in order, so a custom registry overrides the generated
        path and ``ReadFrom.PRESET`` clears the registry file regardless.
        """
        self.load_id = load_id

        if self.read_from == ReadFrom.REGISTRY:
            self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt")

        # For now it's okay to have it this way
        if self.custom_registry is not None:
            self.registry_file = self.custom_registry

        if self.read_from == ReadFrom.PRESET:
            self.registry_file = None

        if self.preset:
            self.preset.pregen_json = os.path.join(self.working_dir, f"{load_id}_prepare.json")

    def _collect_variables(self, metadata_key: str) -> dict[str, Any]:
        """Map the variable name stored under ``metadata_key`` to the field value.

        Only fields applicable to the current scenario, with a variable name
        declared and a value set, are included.
        """
        return {
            meta_field.metadata[metadata_key]: meta_field.value
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"]
            and meta_field.metadata[metadata_key]
            and meta_field.value is not None
        }

    def get_k6_vars(self):
        """Return ``{scenario_variable: value}`` for fields applicable to the current scenario."""
        return self._collect_variables("scenario_variable")

    def get_env_vars(self):
        """Return ``{env_variable: value}`` for fields applicable to the current scenario."""
        return self._collect_variables("env_variable")

    def __post_init__(self):
        """Default the scenario to the one matching ``load_type`` when none was given."""
        default_scenario_map = {
            LoadType.gRPC: LoadScenario.gRPC,
            LoadType.HTTP: LoadScenario.HTTP,
            LoadType.S3: LoadScenario.S3,
        }

        if self.scenario is None:
            self.scenario = default_scenario_map[self.load_type]

    def get_preset_arguments(self):
        """Return the non-empty ``--flag`` strings for preset fields applicable to the current scenario."""
        command_args = []
        for meta_field in self._get_meta_fields(self):
            if self.scenario not in meta_field.metadata["applicable_scenarios"]:
                continue
            if not meta_field.metadata["preset_argument"] or meta_field.value is None:
                continue
            # Render once; False booleans and empty lists render to "" and are skipped.
            argument = self._get_preset_argument(meta_field)
            if argument:
                command_args.append(argument)
        return command_args

    def get_init_time(self) -> int:
        """Return total VU initialization time, rounded up; an unset ``vu_init_time`` counts as 0."""
        return math.ceil(self._get_total_vus() * (self.vu_init_time or 0))

    def _get_total_vus(self) -> int:
        """Sum writer and reader VUs (regular and preallocated), treating unset values as 0."""
        vu_fields = ["writers", "preallocated_writers", "readers", "preallocated_readers"]
        return sum(getattr(self, name) or 0 for name in vu_fields)

    def _get_applicable_fields(self):
        """Return meta fields applicable to the current scenario that hold a truthy value."""
        return [
            meta_field
            for meta_field in self._get_meta_fields(self)
            if self.scenario in meta_field.metadata["applicable_scenarios"] and meta_field.value
        ]

    @staticmethod
    def _get_preset_argument(meta_field: MetaField) -> str:
        """Render one field as a preset command-line argument (possibly an empty string)."""
        argument = meta_field.metadata["preset_argument"]

        if isinstance(meta_field.value, bool):
            # For preset calls, bool values are passed with just --<argument_name> if the value is True
            return f"--{argument}" if meta_field.value else ""

        if isinstance(meta_field.value, list):
            # List values repeat the flag once per item; an empty list joins to "".
            return " ".join(f"--{argument} '{value}'" for value in meta_field.value)

        return f"--{argument} '{meta_field.value}'"

    @staticmethod
    def _get_meta_fields(instance) -> list[MetaField]:
        """Collect fields of ``instance`` that have metadata and a non-None value.

        Fields whose declared type is a dataclass (directly or as the first
        type argument, e.g. ``Optional[Preset]``) are descended into.
        """
        data_fields = fields(instance)

        fields_with_data = [
            MetaField(data_field.name, data_field.metadata, getattr(instance, data_field.name))
            for data_field in data_fields
            if data_field.metadata and getattr(instance, data_field.name) is not None
        ]

        for data_field in data_fields:
            type_args = get_args(data_field.type)
            # A bare annotation has no type args; fall back to the annotation itself.
            actual_field_type = type_args[0] if type_args else data_field.type
            nested = getattr(instance, data_field.name)
            if is_dataclass(actual_field_type) and nested:
                fields_with_data += LoadParams._get_meta_fields(nested)

        return fields_with_data

    def __str__(self) -> str:
        """Return a short description: scenario, object size and the ``string_repr`` fields."""
        load_type_str = self.scenario.value if self.scenario else self.load_type.value
        # TODO: migrate load_params defaults to testlib
        if self.object_size is not None:
            size, unit = calc_unit(self.object_size, 1)
            static_params = [f"{load_type_str} {size:.4g} {unit}"]
        else:
            static_params = [f"{load_type_str}"]

        dynamic_params = [
            f"{meta_field.name}={meta_field.value}" for meta_field in self._get_applicable_fields() if meta_field.metadata["string_repr"]
        ]
        params = ", ".join(static_params + dynamic_params)

        return params

    def __repr__(self) -> str:
        """Return the same text as ``__str__``."""
        return self.__str__()