from dataclasses import Field, dataclass, fields, is_dataclass from typing import Any, get_args import pytest from frostfs_testlib.load.load_config import ( EndpointSelectionStrategy, LoadParams, LoadScenario, LoadType, Preset, ReadFrom, ) from frostfs_testlib.load.runners import DefaultRunner from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME from frostfs_testlib.storage.cluster import ClusterNode from frostfs_testlib.storage.controllers.background_load_controller import BackgroundLoadController from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode from frostfs_testlib.storage.dataclasses.node_base import NodeBase @dataclass class MetaTestField: field: Field field_type: type instance: Any class TestLoadConfig: @pytest.fixture def set_empty(self, request: pytest.FixtureRequest): # Workaround for verify if "param" in request.__dict__ and request.param: return request.param return False @pytest.fixture def load_type(self, request: pytest.FixtureRequest): # Workaround for verify if "param" in request.__dict__ and request.param: return request.param return None @pytest.fixture def load_params(self, load_type: LoadType, set_empty: bool, request: pytest.FixtureRequest): load_scenario = request.param return self._get_filled_load_params(load_type, load_scenario, set_empty) def test_load_params_only_load_type_required(self): load_params = LoadParams(load_type=LoadType.S3) expected = "s3" assert repr(load_params) == expected assert f"{load_params}" == expected def test_load_params_init_time(self): load_params = LoadParams(load_type=LoadType.S3) vus = 100 load_params.vu_init_time = BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME # Used in time calculations load_params.readers = vus load_params.writers = vus load_params.preallocated_readers = vus load_params.preallocated_writers = vus # Not used in time calculations load_params.deleters = vus load_params.preallocated_deleters = vus expected = vus * 4 * BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME actual = load_params.get_init_time() assert actual == expected, "Incorrect time for get_init_time()" def test_load_params_initially_have_all_values_none(self): load_params = LoadParams(load_type=LoadType.S3) self._check_all_values_none(load_params, ["load_type", "scenario"]) def test_preset_initially_have_all_values_none(self): preset = Preset() self._check_all_values_none(preset) @pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True) def test_string_representation_s3_car(self, load_params: LoadParams): load_params.object_size = 524288 expected = "s3_car 512 MiB, write_rate=10, read_rate=9, delete_rate=11, preallocated_writers=20, preallocated_readers=20, preallocated_deleters=21" assert f"{load_params}" == expected assert repr(load_params) == expected @pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True) def test_string_representation_grpc(self, load_params: LoadParams): load_params.object_size = 512 expected = "grpc 512 KiB, writers=7, readers=7, deleters=8" assert f"{load_params}" == expected assert repr(load_params) == expected @pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True) def test_load_controller_string_representation(self, load_params: LoadParams): load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL load_params.object_size = 512 background_load_controller = BackgroundLoadController( "tmp", load_params, "wallet", None, None, DefaultRunner(None) ) expected = "grpc 512 KiB, writers=7, readers=7, deleters=8" assert f"{background_load_controller}" == expected assert repr(background_load_controller) == expected def test_load_set_id_changes_fields(self): load_params = LoadParams(load_type=LoadType.S3) load_params.preset = Preset() load_params.read_from = ReadFrom["REGISTRY"] load_params.working_dir = "/tmp" load_params.set_id("test_id") assert load_params.registry_file == "/tmp/test_id_registry.bolt" assert load_params.preset.pregen_json == "/tmp/test_id_prepare.json" assert load_params.load_id == "test_id" # No other values should be changed self._check_all_values_none( load_params, [ "load_type", "working_dir", "load_id", "registry_file", "preset", "scenario", "read_from", ], ) self._check_all_values_none(load_params.preset, ["pregen_json", "scenario"]) @pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True) def test_argument_parsing_for_grpc_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '11'", "--acl 'acl'", "--preload_obj '13'", "--out 'pregen_json'", "--workers '7'", "--containers '16'", "--policy 'container_placement_policy'", "--ignore-errors", "--sleep '19'", "--local", ] expected_env_vars = { "DURATION": 9, "WRITE_OBJ_SIZE": 11, "REGISTRY_FILE": "registry_file", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "WRITERS": 7, "READERS": 7, "DELETERS": 8, "READ_AGE": 8, "STREAMING": 9, "K6_OUT": "output", "PREGEN_JSON": "pregen_json", "PREPARE_LOCALLY": True, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.gRPC_CAR], indirect=True) def test_argument_parsing_for_grpc_car_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '11'", "--preload_obj '13'", "--out 'pregen_json'", "--workers '7'", "--containers '16'", "--policy 'container_placement_policy'", "--ignore-errors", "--sleep '19'", "--local", "--acl 'acl'", ] expected_env_vars = { "DURATION": 9, "WRITE_OBJ_SIZE": 11, "K6_OUT": "output", "REGISTRY_FILE": "registry_file", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "MAX_WRITERS": 11, "MAX_READERS": 11, "MAX_DELETERS": 12, "PRE_ALLOC_DELETERS": 21, "PRE_ALLOC_READERS": 20, "PRE_ALLOC_WRITERS": 20, "PREGEN_JSON": "pregen_json", "TIME_UNIT": "time_unit", "WRITE_RATE": 10, "READ_RATE": 9, "READ_AGE": 8, "DELETE_RATE": 11, "STREAMING": 9, "PREPARE_LOCALLY": True, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.S3], indirect=True) def test_argument_parsing_for_s3_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '11'", "--preload_obj '13'", "--no-verify-ssl", "--out 'pregen_json'", "--workers '7'", "--buckets '13'", "--location 's3_location'", "--ignore-errors", "--sleep '19'", "--acl 'acl'", ] expected_env_vars = { "DURATION": 9, "WRITE_OBJ_SIZE": 11, "REGISTRY_FILE": "registry_file", "K6_OUT": "output", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "WRITERS": 7, "READERS": 7, "DELETERS": 8, "READ_AGE": 8, "STREAMING": 9, "NO_VERIFY_SSL": True, "PREGEN_JSON": "pregen_json", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True) def test_argument_parsing_for_s3_car_scenario_with_stringed_time(self, load_params: LoadParams): load_params.load_time = "2d3h5min" expected_preset_args = [ "--size '11'", "--preload_obj '13'", "--no-verify-ssl", "--out 'pregen_json'", "--workers '7'", "--buckets '13'", "--location 's3_location'", "--ignore-errors", "--sleep '19'", "--acl 'acl'", ] expected_env_vars = { "DURATION": 183900, "WRITE_OBJ_SIZE": 11, "REGISTRY_FILE": "registry_file", "K6_OUT": "output", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "NO_VERIFY_SSL": True, "MAX_WRITERS": 11, "MAX_READERS": 11, "MAX_DELETERS": 12, "PRE_ALLOC_DELETERS": 21, "PRE_ALLOC_READERS": 20, "PRE_ALLOC_WRITERS": 20, "PREGEN_JSON": "pregen_json", "TIME_UNIT": "time_unit", "WRITE_RATE": 10, "READ_RATE": 9, "READ_AGE": 8, "STREAMING": 9, "DELETE_RATE": 11, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True) def test_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '11'", "--preload_obj '13'", "--no-verify-ssl", "--out 'pregen_json'", "--workers '7'", "--buckets '13'", "--location 's3_location'", "--ignore-errors", "--sleep '19'", "--acl 'acl'", ] expected_env_vars = { "DURATION": 9, "WRITE_OBJ_SIZE": 11, "REGISTRY_FILE": "registry_file", "K6_OUT": "output", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "NO_VERIFY_SSL": True, "MAX_WRITERS": 11, "MAX_READERS": 11, "MAX_DELETERS": 12, "PRE_ALLOC_DELETERS": 21, "PRE_ALLOC_READERS": 20, "PRE_ALLOC_WRITERS": 20, "PREGEN_JSON": "pregen_json", "TIME_UNIT": "time_unit", "WRITE_RATE": 10, "READ_RATE": 9, "READ_AGE": 8, "STREAMING": 9, "DELETE_RATE": 11, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.HTTP], indirect=True) def test_argument_parsing_for_http_scenario(self, load_params: LoadParams): load_params.preset.local = False expected_preset_args = [ "--no-verify-ssl", "--size '11'", "--preload_obj '13'", "--out 'pregen_json'", "--workers '7'", "--containers '16'", "--policy 'container_placement_policy'", "--ignore-errors", "--sleep '19'", "--acl 'acl'", ] expected_env_vars = { "DURATION": 9, "WRITE_OBJ_SIZE": 11, "K6_OUT": "output", "NO_VERIFY_SSL": True, "REGISTRY_FILE": "registry_file", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "WRITERS": 7, "READERS": 7, "DELETERS": 8, "READ_AGE": 8, "STREAMING": 9, "PREGEN_JSON": "pregen_json", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params", [LoadScenario.LOCAL], indirect=True) def test_argument_parsing_for_local_scenario(self, load_params: LoadParams): load_params.preset.local = False expected_preset_args = [ "--size '11'", "--preload_obj '13'", "--out 'pregen_json'", "--workers '7'", "--containers '16'", "--policy 'container_placement_policy'", "--ignore-errors", "--sleep '19'", "--acl 'acl'", ] expected_env_vars = { "CONFIG_FILE": "config_file", "DURATION": 9, "WRITE_OBJ_SIZE": 11, "K6_OUT": "output", "REGISTRY_FILE": "registry_file", "K6_MIN_ITERATION_DURATION": "min_iteration_duration", "K6_SETUP_TIMEOUT": "setup_timeout", "WRITERS": 7, "READERS": 7, "DELETERS": 8, "READ_AGE": 8, "STREAMING": 9, "PREGEN_JSON": "pregen_json", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True) def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams): expected_env_vars = { "CLIENTS": 14, "REGISTRY_FILE": "registry_file", "K6_SETUP_TIMEOUT": "setup_timeout", "NO_VERIFY_SSL": True, "TIME_LIMIT": 11, } self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.gRPC)], indirect=True) def test_argument_parsing_for_grpc_verify_scenario(self, load_params: LoadParams): expected_env_vars = { "CLIENTS": 14, "REGISTRY_FILE": "registry_file", "K6_SETUP_TIMEOUT": "setup_timeout", "NO_VERIFY_SSL": True, "TIME_LIMIT": 11, } self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.gRPC, True)], indirect=True) def test_empty_argument_parsing_for_grpc_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--containers '0'", "--policy ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "DURATION": 0, "WRITE_OBJ_SIZE": 0, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "WRITERS": 0, "READERS": 0, "DELETERS": 0, "READ_AGE": 0, "STREAMING": 0, "PREGEN_JSON": "", "PREPARE_LOCALLY": False, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.gRPC_CAR, True)], indirect=True) def test_empty_argument_parsing_for_grpc_car_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--containers '0'", "--policy ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "DURATION": 0, "WRITE_OBJ_SIZE": 0, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "MAX_WRITERS": 0, "MAX_READERS": 0, "MAX_DELETERS": 0, "PRE_ALLOC_DELETERS": 0, "PRE_ALLOC_READERS": 0, "PRE_ALLOC_WRITERS": 0, "PREGEN_JSON": "", "TIME_UNIT": "", "WRITE_RATE": 0, "READ_RATE": 0, "DELETE_RATE": 0, "READ_AGE": 0, "STREAMING": 0, "PREPARE_LOCALLY": False, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.S3, True)], indirect=True) def test_empty_argument_parsing_for_s3_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--buckets '0'", "--location ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "DURATION": 0, "WRITE_OBJ_SIZE": 0, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "WRITERS": 0, "READERS": 0, "DELETERS": 0, "READ_AGE": 0, "STREAMING": 0, "NO_VERIFY_SSL": False, "PREGEN_JSON": "", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.S3_CAR, True)], indirect=True) def test_empty_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--buckets '0'", "--location ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "DURATION": 0, "WRITE_OBJ_SIZE": 0, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "NO_VERIFY_SSL": False, "MAX_WRITERS": 0, "MAX_READERS": 0, "MAX_DELETERS": 0, "PRE_ALLOC_DELETERS": 0, "PRE_ALLOC_READERS": 0, "PRE_ALLOC_WRITERS": 0, "PREGEN_JSON": "", "TIME_UNIT": "", "WRITE_RATE": 0, "READ_RATE": 0, "DELETE_RATE": 0, "READ_AGE": 0, "STREAMING": 0, } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.HTTP, True)], indirect=True) def test_empty_argument_parsing_for_http_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--containers '0'", "--policy ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "DURATION": 0, "WRITE_OBJ_SIZE": 0, "NO_VERIFY_SSL": False, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "WRITERS": 0, "READERS": 0, "DELETERS": 0, "READ_AGE": 0, "STREAMING": 0, "PREGEN_JSON": "", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.LOCAL, True)], indirect=True) def test_empty_argument_parsing_for_local_scenario(self, load_params: LoadParams): expected_preset_args = [ "--size '0'", "--preload_obj '0'", "--out ''", "--workers '0'", "--containers '0'", "--policy ''", "--sleep '0'", "--acl ''", ] expected_env_vars = { "CONFIG_FILE": "", "DURATION": 0, "WRITE_OBJ_SIZE": 0, "REGISTRY_FILE": "", "K6_OUT": "", "K6_MIN_ITERATION_DURATION": "", "K6_SETUP_TIMEOUT": "", "WRITERS": 0, "READERS": 0, "DELETERS": 0, "READ_AGE": 0, "STREAMING": 0, "PREGEN_JSON": "", } self._check_preset_params(load_params, expected_preset_args) self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize( "load_params, load_type, set_empty", [(LoadScenario.VERIFY, LoadType.S3, True)], indirect=True, ) def test_empty_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams): expected_env_vars = { "CLIENTS": 0, "REGISTRY_FILE": "", "K6_SETUP_TIMEOUT": "", "NO_VERIFY_SSL": False, "TIME_LIMIT": 0, } self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize( "load_params, load_type, set_empty", [(LoadScenario.VERIFY, LoadType.gRPC, True)], indirect=True, ) def test_argument_parsing_for_grpc_verify_scenario(self, load_params: LoadParams): expected_env_vars = { "CLIENTS": 0, "REGISTRY_FILE": "", "K6_SETUP_TIMEOUT": "", "NO_VERIFY_SSL": False, "TIME_LIMIT": 0, } self._check_env_vars(load_params, expected_env_vars) @pytest.mark.parametrize( "load_params, load_type", [(LoadScenario.gRPC, LoadType.gRPC)], indirect=True, ) @pytest.mark.parametrize( "load_time, expected_seconds", [ (300, 300), ("2d3h45min", 186300), ("1d6h", 108000), ("1d", 86400), ("1d1min", 86460), ("2h", 7200), ("2h2min", 7320), ], ) def test_convert_time_to_seconds(self, load_params: LoadParams, load_time: str | int, expected_seconds: int): load_params.load_time = load_time assert load_params.load_time == expected_seconds def _check_preset_params(self, load_params: LoadParams, expected_preset_args: list[str]): preset_parameters = load_params.get_preset_arguments() assert sorted(preset_parameters) == sorted(expected_preset_args) def _check_env_vars(self, load_params: LoadParams, expected_env_vars: dict[str, str]): env_vars = load_params.get_k6_vars() assert env_vars == expected_env_vars def _check_all_values_none(self, dataclass, skip_fields=None): if skip_fields is None: skip_fields = [] dataclass_fields = [field for field in fields(dataclass) if field.name not in skip_fields] for field in dataclass_fields: value = getattr(dataclass, field.name) assert value is None, f"{field.name} is not None" def _check_all_values_not_none(self, dataclass, skip_fields=None): if skip_fields is None: skip_fields = [] dataclass_fields = [field for field in fields(dataclass) if field.name not in skip_fields] for field in dataclass_fields: value = getattr(dataclass, field.name) assert value is not None, f"{field.name} is not None" def _get_filled_load_params( self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False ) -> LoadParams: load_type_map = { LoadScenario.S3: LoadType.S3, LoadScenario.S3_CAR: LoadType.S3, LoadScenario.gRPC: LoadType.gRPC, LoadScenario.gRPC_CAR: LoadType.gRPC, LoadScenario.LOCAL: LoadType.gRPC, LoadScenario.HTTP: LoadType.HTTP, } load_type = load_type_map[load_scenario] if not load_type else load_type load_params = LoadParams(load_type) load_params.scenario = load_scenario load_params.preset = Preset() meta_fields = self._get_meta_fields(load_params) for field in meta_fields: if ( getattr(field.instance, field.field.name) is None and load_params.scenario in field.field.metadata["applicable_scenarios"] ): value_to_set_map = { int: 0 if set_emtpy else len(field.field.name), str: "" if set_emtpy else field.field.name, bool: False if set_emtpy else True, } value_to_set = value_to_set_map[field.field_type] setattr(field.instance, field.field.name, value_to_set) return load_params def _get_actual_field_type(self, field: Field) -> type: return get_args(field.type)[0] if len(get_args(field.type)) else get_args(field.type) def _get_meta_fields(self, instance): data_fields = fields(instance) fields_with_data = [ MetaTestField(field, self._get_actual_field_type(field), instance) for field in data_fields if field.metadata ] for field in data_fields: actual_field_type = self._get_actual_field_type(field) if is_dataclass(actual_field_type) and getattr(instance, field.name): fields_with_data += self._get_meta_fields(getattr(instance, field.name)) return fields_with_data or []