import os from dataclasses import dataclass, field, fields, is_dataclass from enum import Enum from types import MappingProxyType from typing import Any, Optional, get_args from frostfs_testlib.utils.converting_utils import calc_unit class LoadType(Enum): gRPC = "grpc" S3 = "s3" HTTP = "http" class LoadScenario(Enum): gRPC = "grpc" gRPC_CAR = "grpc_car" S3 = "s3" S3_CAR = "s3_car" HTTP = "http" VERIFY = "verify" LOCAL = "local" all_load_scenarios = [ LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.S3_CAR, LoadScenario.gRPC_CAR, LoadScenario.LOCAL, ] all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY] constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL] constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR] grpc_preset_scenarios = [ LoadScenario.gRPC, LoadScenario.HTTP, LoadScenario.gRPC_CAR, LoadScenario.LOCAL, ] s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR] @dataclass class MetaField: name: str metadata: MappingProxyType value: Any def metadata_field( applicable_scenarios: list[LoadScenario], preset_param: Optional[str] = None, scenario_variable: Optional[str] = None, string_repr: Optional[bool] = True, distributed: Optional[bool] = False, ): return field( default=None, metadata={ "applicable_scenarios": applicable_scenarios, "preset_argument": preset_param, "env_variable": scenario_variable, "string_repr": string_repr, "distributed": distributed, }, ) class NodesSelectionStrategy(Enum): # Select ONE random node from cluster nodes. RANDOM_SINGLE = "RANDOM_SINGLE" # Select All nodes. ALL = "ALL" # Select All nodes except node under test (useful for failover). This is DEFAULT one ALL_EXCEPT_UNDER_TEST = "ALL_EXCEPT_UNDER_TEST" # Select ONE random node except under test (useful for failover). RANDOM_SINGLE_EXCEPT_UNDER_TEST = "RANDOM_SINGLE_EXCEPT_UNDER_TEST" class EndpointSelectionStrategy(Enum): """Enum which defines which endpoint to select from each storage node""" # Select All endpoints. ALL = "ALL" # Select first endpoint from node FIRST = "FIRST" class K6ProcessAllocationStrategy(Enum): """Enum which defines how K6 processes should be allocated""" # Each load node will get one k6 process with all endpoints (Default) PER_LOAD_NODE = "PER_LOAD_NODE" # Each endpoint will get it's own k6 process regardless of number of load nodes. # If there is not enough load nodes, some nodes may have multiple k6 processes PER_ENDPOINT = "PER_ENDPOINT" @dataclass class Preset: # ------ COMMON ------ # Amount of objects which should be created objects_count: Optional[int] = metadata_field(all_load_scenarios, "preload_obj", None, False) # Preset json. Filled automatically. pregen_json: Optional[str] = metadata_field(all_load_scenarios, "out", "PREGEN_JSON", False) # Workers count for preset workers: Optional[int] = metadata_field(all_load_scenarios, "workers", None, False) # ------ GRPC ------ # Amount of containers which should be created containers_count: Optional[int] = metadata_field( grpc_preset_scenarios, "containers", None, False ) # Container placement policy for containers for gRPC container_placement_policy: Optional[str] = metadata_field( grpc_preset_scenarios, "policy", None, False ) # ------ S3 ------ # Amount of buckets which should be created buckets_count: Optional[int] = metadata_field(s3_preset_scenarios, "buckets", None, False) # S3 region (AKA placement policy for S3 buckets) s3_location: Optional[str] = metadata_field(s3_preset_scenarios, "location", None, False) @dataclass class LoadParams: # ------- CONTROL PARAMS ------- # Load type can be gRPC, HTTP, S3. load_type: LoadType # Load scenario from k6 scenarios scenario: Optional[LoadScenario] = None # Strategy to select nodes under load. See NodesSelectionStrategy class for more details. # default is ALL_EXCEPT_UNDER_TEST nodes_selection_strategy: Optional[NodesSelectionStrategy] = None # Strategy which defines which endpoint to select from each storage node endpoint_selection_strategy: Optional[EndpointSelectionStrategy] = None # Strategy which defines how K6 processes should be allocated k6_process_allocation_strategy: Optional[K6ProcessAllocationStrategy] = None # Set to true in order to verify uploaded objects after K6 load finish. Default is True. verify: Optional[bool] = None # Just id for load so distinct it between runs. Filled automatically. load_id: Optional[str] = None # Acceptable number of load errors in % # 100 means 100% errors allowed # 1.5 means 1.5% errors allowed # 0 means no errors allowed error_threshold: Optional[float] = None # Working directory working_dir: Optional[str] = None # Preset for the k6 run preset: Optional[Preset] = None # K6 download url k6_url: Optional[str] = None # No ssl verification flag no_verify_ssl: Optional[bool] = metadata_field( [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.VERIFY, LoadScenario.HTTP], "no-verify-ssl", "NO_VERIFY_SSL", False, ) # ------- COMMON SCENARIO PARAMS ------- # Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value. load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False) # Object size in KB for load and preset. object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False) # Output registry K6 file. Filled automatically. registry_file: Optional[str] = metadata_field(all_scenarios, None, "REGISTRY_FILE", False) # Specifies the minimum duration of every single execution (i.e. iteration). # Any iterations that are shorter than this value will cause that VU to # sleep for the remainder of the time until the specified minimum duration is reached. min_iteration_duration: Optional[str] = metadata_field( all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False ) # Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios # https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False) # ------- CONSTANT VUS SCENARIO PARAMS ------- # Amount of Writers VU. writers: Optional[int] = metadata_field(constant_vus_scenarios, None, "WRITERS", True, True) # Amount of Readers VU. readers: Optional[int] = metadata_field(constant_vus_scenarios, None, "READERS", True, True) # Amount of Deleters VU. deleters: Optional[int] = metadata_field(constant_vus_scenarios, None, "DELETERS", True, True) # ------- CONSTANT ARRIVAL RATE SCENARIO PARAMS ------- # Number of iterations to start during each timeUnit period for write. write_rate: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "WRITE_RATE", True, True ) # Number of iterations to start during each timeUnit period for read. read_rate: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "READ_RATE", True, True ) # Number of iterations to start during each timeUnit period for delete. delete_rate: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "DELETE_RATE", True, True ) # Amount of preAllocatedVUs for write operations. preallocated_writers: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "PRE_ALLOC_WRITERS", True, True ) # Amount of maxVUs for write operations. max_writers: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "MAX_WRITERS", False, True ) # Amount of preAllocatedVUs for read operations. preallocated_readers: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "PRE_ALLOC_READERS", True, True ) # Amount of maxVUs for read operations. max_readers: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "MAX_READERS", False, True ) # Amount of preAllocatedVUs for read operations. preallocated_deleters: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "PRE_ALLOC_DELETERS", True, True ) # Amount of maxVUs for delete operations. max_deleters: Optional[int] = metadata_field( constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True ) # Period of time to apply the rate value. time_unit: Optional[str] = metadata_field( constant_arrival_rate_scenarios, None, "TIME_UNIT", False ) # ------- VERIFY SCENARIO PARAMS ------- # Maximum verification time for k6 to verify objects. Default is BACKGROUND_LOAD_MAX_VERIFY_TIME (3600). verify_time: Optional[int] = metadata_field([LoadScenario.VERIFY], None, "TIME_LIMIT", False) # Amount of Verification VU. verify_clients: Optional[int] = metadata_field( [LoadScenario.VERIFY], None, "CLIENTS", True, False ) # ------- LOCAL SCENARIO PARAMS ------- # Config file location (filled automatically) config_file: Optional[str] = metadata_field([LoadScenario.LOCAL], None, "CONFIG_FILE", False) def set_id(self, load_id): self.load_id = load_id self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt") if self.preset: self.preset.pregen_json = os.path.join(self.working_dir, f"{load_id}_prepare.json") def get_env_vars(self): env_vars = { meta_field.metadata["env_variable"]: meta_field.value for meta_field in self._get_meta_fields(self) if self.scenario in meta_field.metadata["applicable_scenarios"] and meta_field.metadata["env_variable"] and meta_field.value is not None } return env_vars def get_preset_arguments(self): command_args = [ self._get_preset_argument(meta_field) for meta_field in self._get_meta_fields(self) if self.scenario in meta_field.metadata["applicable_scenarios"] and meta_field.metadata["preset_argument"] and meta_field.value is not None and self._get_preset_argument(meta_field) ] return command_args def _get_applicable_fields(self): applicable_fields = [ meta_field for meta_field in self._get_meta_fields(self) if self.scenario in meta_field.metadata["applicable_scenarios"] and meta_field.value ] return applicable_fields @staticmethod def _get_preset_argument(meta_field: MetaField) -> str: if isinstance(meta_field.value, bool): # For preset calls, bool values are passed with just -- if the value is True return f"--{meta_field.metadata['preset_argument']}" if meta_field.value else "" return f"--{meta_field.metadata['preset_argument']} '{meta_field.value}'" @staticmethod def _get_meta_fields(instance) -> list[MetaField]: data_fields = fields(instance) fields_with_data = [ MetaField(field.name, field.metadata, getattr(instance, field.name)) for field in data_fields if field.metadata and getattr(instance, field.name) is not None ] for field in data_fields: actual_field_type = ( get_args(field.type)[0] if len(get_args(field.type)) else get_args(field.type) ) if is_dataclass(actual_field_type) and getattr(instance, field.name): fields_with_data += LoadParams._get_meta_fields(getattr(instance, field.name)) return fields_with_data or [] def __str__(self) -> str: size, unit = calc_unit(self.object_size, 1) static_params = [f"{self.scenario.value} ({size:.4g} {unit})"] dynamic_params = [ f"{meta_field.name}={meta_field.value}" for meta_field in self._get_applicable_fields() if meta_field.metadata["string_repr"] ] params = ", ".join(static_params + dynamic_params) return f"load: {params}" def __repr__(self) -> str: return self.__str__()