# frostfs-testlib/tests/test_load_config.py
# 2024-01-26 13:36:44 +03:00
#
# 702 lines
# 25 KiB
# Python

from dataclasses import Field, dataclass, fields, is_dataclass
from typing import Any, get_args
import pytest
from frostfs_testlib.load.load_config import (
EndpointSelectionStrategy,
LoadParams,
LoadScenario,
LoadType,
Preset,
ReadFrom,
)
from frostfs_testlib.load.runners import DefaultRunner
from frostfs_testlib.resources.load_params import BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
from frostfs_testlib.storage.cluster import ClusterNode
from frostfs_testlib.storage.controllers.background_load_controller import BackgroundLoadController
from frostfs_testlib.storage.dataclasses.frostfs_services import StorageNode
from frostfs_testlib.storage.dataclasses.node_base import NodeBase
@dataclass
class MetaTestField:
    """Bundle a dataclass field with its resolved type and the object that owns it."""

    # The dataclass field descriptor as returned by dataclasses.fields().
    field: Field
    # The type resolved for the field by TestLoadConfig._get_actual_field_type().
    field_type: type
    # The dataclass instance the field is read from and written to.
    instance: Any
class TestLoadConfig:
@pytest.fixture
def set_empty(self, request: pytest.FixtureRequest):
    """Return the indirect parameter given to this fixture, or ``False``.

    ``False`` is returned both when the request carries no ``param`` and when
    the supplied value is falsy.
    """
    # Workaround for verify: look the parameter up in the instance dict, since
    # the request may not carry a ``param`` entry at all.
    if "param" not in request.__dict__:
        return False
    return request.param if request.param else False
@pytest.fixture
def load_type(self, request: pytest.FixtureRequest):
    """Return the indirect parameter given to this fixture, or ``None``.

    ``None` ` is returned both when the request carries no ``param`` and when
    the supplied value is falsy.
    """
    # Workaround for verify: look the parameter up in the instance dict, since
    # the request may not carry a ``param`` entry at all.
    if "param" not in request.__dict__:
        return None
    return request.param if request.param else None
@pytest.fixture
def load_params(self, load_type: LoadType, set_empty: bool, request: pytest.FixtureRequest):
    """Build filled load params for the scenario passed as this fixture's parameter."""
    return self._get_filled_load_params(load_type, request.param, set_empty)
def test_load_params_only_load_type_required(self):
    """LoadParams built from a load type alone must render as ``"s3"``."""
    params = LoadParams(load_type=LoadType.S3)
    rendered = "s3"

    assert repr(params) == rendered
    assert f"{params}" == rendered
def test_load_params_init_time(self):
    """get_init_time() must equal four VU groups times the per-VU init time."""
    vu_count = 100
    params = LoadParams(load_type=LoadType.S3)
    params.vu_init_time = BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME

    # Used in time calculations
    for counted in ("readers", "writers", "preallocated_readers", "preallocated_writers"):
        setattr(params, counted, vu_count)

    # Not used in time calculations
    for ignored in ("deleters", "preallocated_deleters"):
        setattr(params, ignored, vu_count)

    expected_time = vu_count * 4 * BACKGROUND_LOAD_DEFAULT_VU_INIT_TIME
    assert params.get_init_time() == expected_time, "Incorrect time for get_init_time()"
def test_load_params_initially_have_all_values_none(self):
    """A fresh LoadParams has every field unset except load_type and scenario."""
    fields_allowed_to_be_set = ["load_type", "scenario"]
    self._check_all_values_none(LoadParams(load_type=LoadType.S3), fields_allowed_to_be_set)
def test_preset_initially_have_all_values_none(self):
    """A fresh Preset must have every field set to None."""
    self._check_all_values_none(Preset())
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
def test_string_representation_s3_car(self, load_params: LoadParams):
    """Check both str() and repr() of filled S3_CAR params with a 524288 object size."""
    load_params.object_size = 524288
    rendered = "s3_car 512 MiB, write_rate=10, read_rate=9, delete_rate=11, preallocated_writers=20, preallocated_readers=20, preallocated_deleters=21"

    assert f"{load_params}" == rendered
    assert repr(load_params) == rendered
@pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True)
def test_string_representation_grpc(self, load_params: LoadParams):
    """Check both str() and repr() of filled gRPC params with a 512 object size."""
    load_params.object_size = 512
    rendered = "grpc 512 KiB, writers=7, readers=7, deleters=8"

    assert f"{load_params}" == rendered
    assert repr(load_params) == rendered
@pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True)
def test_load_controller_string_representation(self, load_params: LoadParams):
    """A BackgroundLoadController must render the same text as its load params."""
    load_params.endpoint_selection_strategy = EndpointSelectionStrategy.ALL
    load_params.object_size = 512

    runner = DefaultRunner(None)
    controller = BackgroundLoadController("tmp", load_params, "wallet", None, None, runner)

    rendered = "grpc 512 KiB, writers=7, readers=7, deleters=8"
    assert f"{controller}" == rendered
    assert repr(controller) == rendered
def test_load_set_id_changes_fields(self):
    """set_id() must fill load_id, registry_file and preset.pregen_json only."""
    params = LoadParams(load_type=LoadType.S3)
    params.preset = Preset()
    params.read_from = ReadFrom["REGISTRY"]
    params.working_dir = "/tmp"

    params.set_id("test_id")

    assert params.registry_file == "/tmp/test_id_registry.bolt"
    assert params.preset.pregen_json == "/tmp/test_id_prepare.json"
    assert params.load_id == "test_id"

    # No other values should be changed
    fields_set_by_test = [
        "load_type",
        "working_dir",
        "load_id",
        "registry_file",
        "preset",
        "scenario",
        "read_from",
    ]
    self._check_all_values_none(params, fields_set_by_test)
    self._check_all_values_none(params.preset, ["pregen_json", "scenario"])
@pytest.mark.parametrize("load_params", [LoadScenario.gRPC], indirect=True)
def test_argument_parsing_for_grpc_scenario(self, load_params: LoadParams):
    """Filled gRPC params must yield the expected preset arguments and env vars."""
    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--out 'pregen_json'",
        "--workers '7'",
        "--containers '16'",
        "--policy 'container_placement_policy'",
        "--ignore-errors",
        "--sleep '19'",
        "--local",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "WRITERS": 7,
        "READERS": 7,
        "DELETERS": 8,
        "READ_AGE": 8,
        "PREGEN_JSON": "pregen_json",
        "PREPARE_LOCALLY": True,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.gRPC_CAR], indirect=True)
def test_argument_parsing_for_grpc_car_scenario(self, load_params: LoadParams):
    """Filled gRPC_CAR params must yield the expected preset arguments and env vars."""
    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--out 'pregen_json'",
        "--workers '7'",
        "--containers '16'",
        "--policy 'container_placement_policy'",
        "--ignore-errors",
        "--sleep '19'",
        "--local",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "MAX_WRITERS": 11,
        "MAX_READERS": 11,
        "MAX_DELETERS": 12,
        "PRE_ALLOC_DELETERS": 21,
        "PRE_ALLOC_READERS": 20,
        "PRE_ALLOC_WRITERS": 20,
        "PREGEN_JSON": "pregen_json",
        "TIME_UNIT": "time_unit",
        "WRITE_RATE": 10,
        "READ_RATE": 9,
        "READ_AGE": 8,
        "DELETE_RATE": 11,
        "PREPARE_LOCALLY": True,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.S3], indirect=True)
def test_argument_parsing_for_s3_scenario(self, load_params: LoadParams):
    """Filled S3 params must yield the expected preset arguments and env vars."""
    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--no-verify-ssl",
        "--out 'pregen_json'",
        "--workers '7'",
        "--buckets '13'",
        "--location 's3_location'",
        "--ignore-errors",
        "--sleep '19'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "WRITERS": 7,
        "READERS": 7,
        "DELETERS": 8,
        "READ_AGE": 8,
        "NO_VERIFY_SSL": True,
        "PREGEN_JSON": "pregen_json",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
def test_argument_parsing_for_s3_car_scenario_with_stringed_time(self, load_params: LoadParams):
    """A load_time of "2d3h5min" must be exported as DURATION 183900 for S3_CAR."""
    load_params.load_time = "2d3h5min"

    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--no-verify-ssl",
        "--out 'pregen_json'",
        "--workers '7'",
        "--buckets '13'",
        "--location 's3_location'",
        "--ignore-errors",
        "--sleep '19'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 183900,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "NO_VERIFY_SSL": True,
        "MAX_WRITERS": 11,
        "MAX_READERS": 11,
        "MAX_DELETERS": 12,
        "PRE_ALLOC_DELETERS": 21,
        "PRE_ALLOC_READERS": 20,
        "PRE_ALLOC_WRITERS": 20,
        "PREGEN_JSON": "pregen_json",
        "TIME_UNIT": "time_unit",
        "WRITE_RATE": 10,
        "READ_RATE": 9,
        "READ_AGE": 8,
        "DELETE_RATE": 11,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
def test_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams):
    """Filled S3_CAR params must yield the expected preset arguments and env vars."""
    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--no-verify-ssl",
        "--out 'pregen_json'",
        "--workers '7'",
        "--buckets '13'",
        "--location 's3_location'",
        "--ignore-errors",
        "--sleep '19'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "NO_VERIFY_SSL": True,
        "MAX_WRITERS": 11,
        "MAX_READERS": 11,
        "MAX_DELETERS": 12,
        "PRE_ALLOC_DELETERS": 21,
        "PRE_ALLOC_READERS": 20,
        "PRE_ALLOC_WRITERS": 20,
        "PREGEN_JSON": "pregen_json",
        "TIME_UNIT": "time_unit",
        "WRITE_RATE": 10,
        "READ_RATE": 9,
        "READ_AGE": 8,
        "DELETE_RATE": 11,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.HTTP], indirect=True)
def test_argument_parsing_for_http_scenario(self, load_params: LoadParams):
    """Filled HTTP params, with preset.local switched off, must yield the expected values."""
    load_params.preset.local = False

    preset_args = [
        "--no-verify-ssl",
        "--size '11'",
        "--preload_obj '13'",
        "--out 'pregen_json'",
        "--workers '7'",
        "--containers '16'",
        "--policy 'container_placement_policy'",
        "--ignore-errors",
        "--sleep '19'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "NO_VERIFY_SSL": True,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "WRITERS": 7,
        "READERS": 7,
        "DELETERS": 8,
        "READ_AGE": 8,
        "PREGEN_JSON": "pregen_json",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.LOCAL], indirect=True)
def test_argument_parsing_for_local_scenario(self, load_params: LoadParams):
    """Filled LOCAL params, with preset.local switched off, must yield the expected values."""
    load_params.preset.local = False

    preset_args = [
        "--size '11'",
        "--preload_obj '13'",
        "--out 'pregen_json'",
        "--workers '7'",
        "--containers '16'",
        "--policy 'container_placement_policy'",
        "--ignore-errors",
        "--sleep '19'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "CONFIG_FILE": "config_file",
        "DURATION": 9,
        "WRITE_OBJ_SIZE": 11,
        "REGISTRY_FILE": "registry_file",
        "K6_MIN_ITERATION_DURATION": "min_iteration_duration",
        "K6_SETUP_TIMEOUT": "setup_timeout",
        "WRITERS": 7,
        "READERS": 7,
        "DELETERS": 8,
        "READ_AGE": 8,
        "PREGEN_JSON": "pregen_json",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.S3)], indirect=True)
def test_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
    """Filled VERIFY params with the S3 load type must yield the expected env vars."""
    self._check_env_vars(
        load_params,
        {
            "CLIENTS": 14,
            "REGISTRY_FILE": "registry_file",
            "K6_SETUP_TIMEOUT": "setup_timeout",
            "NO_VERIFY_SSL": True,
            "TIME_LIMIT": 11,
        },
    )
@pytest.mark.parametrize("load_params, load_type", [(LoadScenario.VERIFY, LoadType.gRPC)], indirect=True)
def test_argument_parsing_for_grpc_verify_scenario(self, load_params: LoadParams):
    """Filled VERIFY params with the gRPC load type must yield the expected env vars."""
    self._check_env_vars(
        load_params,
        {
            "CLIENTS": 14,
            "REGISTRY_FILE": "registry_file",
            "K6_SETUP_TIMEOUT": "setup_timeout",
            "NO_VERIFY_SSL": True,
            "TIME_LIMIT": 11,
        },
    )
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.gRPC, True)], indirect=True)
def test_empty_argument_parsing_for_grpc_scenario(self, load_params: LoadParams):
    """gRPC params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--containers '0'",
        "--policy ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "WRITERS": 0,
        "READERS": 0,
        "DELETERS": 0,
        "READ_AGE": 0,
        "PREGEN_JSON": "",
        "PREPARE_LOCALLY": False,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.gRPC_CAR, True)], indirect=True)
def test_empty_argument_parsing_for_grpc_car_scenario(self, load_params: LoadParams):
    """gRPC_CAR params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--containers '0'",
        "--policy ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "MAX_WRITERS": 0,
        "MAX_READERS": 0,
        "MAX_DELETERS": 0,
        "PRE_ALLOC_DELETERS": 0,
        "PRE_ALLOC_READERS": 0,
        "PRE_ALLOC_WRITERS": 0,
        "PREGEN_JSON": "",
        "TIME_UNIT": "",
        "WRITE_RATE": 0,
        "READ_RATE": 0,
        "DELETE_RATE": 0,
        "READ_AGE": 0,
        "PREPARE_LOCALLY": False,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.S3, True)], indirect=True)
def test_empty_argument_parsing_for_s3_scenario(self, load_params: LoadParams):
    """S3 params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--buckets '0'",
        "--location ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "WRITERS": 0,
        "READERS": 0,
        "DELETERS": 0,
        "READ_AGE": 0,
        "NO_VERIFY_SSL": False,
        "PREGEN_JSON": "",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.S3_CAR, True)], indirect=True)
def test_empty_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams):
    """S3_CAR params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--buckets '0'",
        "--location ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "NO_VERIFY_SSL": False,
        "MAX_WRITERS": 0,
        "MAX_READERS": 0,
        "MAX_DELETERS": 0,
        "PRE_ALLOC_DELETERS": 0,
        "PRE_ALLOC_READERS": 0,
        "PRE_ALLOC_WRITERS": 0,
        "PREGEN_JSON": "",
        "TIME_UNIT": "",
        "WRITE_RATE": 0,
        "READ_RATE": 0,
        "DELETE_RATE": 0,
        "READ_AGE": 0,
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.HTTP, True)], indirect=True)
def test_empty_argument_parsing_for_http_scenario(self, load_params: LoadParams):
    """HTTP params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--containers '0'",
        "--policy ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "NO_VERIFY_SSL": False,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "WRITERS": 0,
        "READERS": 0,
        "DELETERS": 0,
        "READ_AGE": 0,
        "PREGEN_JSON": "",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize("load_params, set_empty", [(LoadScenario.LOCAL, True)], indirect=True)
def test_empty_argument_parsing_for_local_scenario(self, load_params: LoadParams):
    """LOCAL params filled with empty values must still emit zero/empty arguments."""
    preset_args = [
        "--size '0'",
        "--preload_obj '0'",
        "--out ''",
        "--workers '0'",
        "--containers '0'",
        "--policy ''",
        "--sleep '0'",
    ]
    self._check_preset_params(load_params, preset_args)

    env_vars = {
        "CONFIG_FILE": "",
        "DURATION": 0,
        "WRITE_OBJ_SIZE": 0,
        "REGISTRY_FILE": "",
        "K6_MIN_ITERATION_DURATION": "",
        "K6_SETUP_TIMEOUT": "",
        "WRITERS": 0,
        "READERS": 0,
        "DELETERS": 0,
        "READ_AGE": 0,
        "PREGEN_JSON": "",
    }
    self._check_env_vars(load_params, env_vars)
@pytest.mark.parametrize(
    "load_params, load_type, set_empty",
    [(LoadScenario.VERIFY, LoadType.S3, True)],
    indirect=True,
)
def test_empty_argument_parsing_for_s3_verify_scenario(self, load_params: LoadParams):
    """VERIFY params (S3 load type) filled with empty values must yield zero/empty env vars."""
    self._check_env_vars(
        load_params,
        {
            "CLIENTS": 0,
            "REGISTRY_FILE": "",
            "K6_SETUP_TIMEOUT": "",
            "NO_VERIFY_SSL": False,
            "TIME_LIMIT": 0,
        },
    )
@pytest.mark.parametrize(
    "load_params, load_type, set_empty",
    [(LoadScenario.VERIFY, LoadType.gRPC, True)],
    indirect=True,
)
def test_empty_argument_parsing_for_grpc_verify_scenario(self, load_params: LoadParams):
    """VERIFY params (gRPC load type) filled with empty values must yield zero/empty env vars.

    The method carries the ``test_empty_`` prefix used by the other empty-value
    tests. Under the name ``test_argument_parsing_for_grpc_verify_scenario`` it
    would redefine the identically named method declared earlier in this class,
    and only the last definition would be collected by pytest.
    """
    expected_env_vars = {
        "CLIENTS": 0,
        "REGISTRY_FILE": "",
        "K6_SETUP_TIMEOUT": "",
        "NO_VERIFY_SSL": False,
        "TIME_LIMIT": 0,
    }
    self._check_env_vars(load_params, expected_env_vars)
@pytest.mark.parametrize(
    "load_params, load_type",
    [(LoadScenario.gRPC, LoadType.gRPC)],
    indirect=True,
)
@pytest.mark.parametrize(
    "load_time, expected_seconds",
    [
        (300, 300),
        ("2d3h45min", 186300),
        ("1d6h", 108000),
        ("1d", 86400),
        ("1d1min", 86460),
        ("2h", 7200),
        ("2h2min", 7320),
    ],
)
def test_convert_time_to_seconds(self, load_params: LoadParams, load_time: str | int, expected_seconds: int):
    """Reading load_time back after assignment must give the expected number of seconds."""
    load_params.load_time = load_time
    stored_seconds = load_params.load_time
    assert stored_seconds == expected_seconds
def _check_preset_params(self, load_params: LoadParams, expected_preset_args: list[str]):
    """Assert that get_preset_arguments() matches the expected list, ignoring order."""
    actual_sorted = sorted(load_params.get_preset_arguments())
    assert actual_sorted == sorted(expected_preset_args)
def _check_env_vars(self, load_params: LoadParams, expected_env_vars: dict[str, Any]):
    """Assert that get_env_vars() returns exactly the expected mapping.

    Args:
        load_params: Load parameters whose environment variables are checked.
        expected_env_vars: Expected variable names mapped to their values.
            Values are not only strings: the tests in this class also pass
            integers and booleans, hence ``Any``.
    """
    env_vars = load_params.get_env_vars()
    assert env_vars == expected_env_vars
def _check_all_values_none(self, dataclass, skip_fields=None):
if skip_fields is None:
skip_fields = []
dataclass_fields = [field for field in fields(dataclass) if field.name not in skip_fields]
for field in dataclass_fields:
value = getattr(dataclass, field.name)
assert value is None, f"{field.name} is not None"
def _check_all_values_not_none(self, dataclass, skip_fields=None):
if skip_fields is None:
skip_fields = []
dataclass_fields = [field for field in fields(dataclass) if field.name not in skip_fields]
for field in dataclass_fields:
value = getattr(dataclass, field.name)
assert value is not None, f"{field.name} is not None"
def _get_filled_load_params(
    self, load_type: LoadType, load_scenario: LoadScenario, set_emtpy: bool = False
) -> LoadParams:
    """Create LoadParams for a scenario and fill every applicable unset field.

    A field is filled when its current value is None and the scenario is listed
    in the field metadata under ``applicable_scenarios``. The value depends on
    the field type: ``int`` gets the length of the field name, ``str`` gets the
    field name and ``bool`` gets ``True``. When ``set_emtpy`` is true the values
    are ``0``, ``""`` and ``False`` instead.

    Args:
        load_type: Load type to use; when falsy it is derived from the scenario.
        load_scenario: Scenario assigned to the created params.
        set_emtpy: Fill with empty values instead of name-derived ones.

    Returns:
        LoadParams with a fresh Preset attached and the applicable fields filled.
    """
    scenario_to_type = {
        LoadScenario.S3: LoadType.S3,
        LoadScenario.S3_CAR: LoadType.S3,
        LoadScenario.gRPC: LoadType.gRPC,
        LoadScenario.gRPC_CAR: LoadType.gRPC,
        LoadScenario.LOCAL: LoadType.gRPC,
        LoadScenario.HTTP: LoadType.HTTP,
    }
    if not load_type:
        load_type = scenario_to_type[load_scenario]

    params = LoadParams(load_type)
    params.scenario = load_scenario
    params.preset = Preset()

    for meta in self._get_meta_fields(params):
        name = meta.field.name
        # Leave alone anything that already has a value.
        if getattr(meta.instance, name) is not None:
            continue
        # Leave alone fields that do not apply to this scenario.
        if params.scenario not in meta.field.metadata["applicable_scenarios"]:
            continue

        if set_emtpy:
            fill_values = {int: 0, str: "", bool: False}
        else:
            fill_values = {int: len(name), str: name, bool: True}
        setattr(meta.instance, name, fill_values[meta.field_type])

    return params
def _get_actual_field_type(self, field: Field) -> type:
return get_args(field.type)[0] if len(get_args(field.type)) else get_args(field.type)
def _get_meta_fields(self, instance):
    """Collect MetaTestField entries for ``instance`` and its nested dataclasses.

    Fields of ``instance`` that carry metadata come first. They are followed by
    the entries collected recursively from every field whose resolved type is a
    dataclass and whose current value is truthy.

    Args:
        instance: Dataclass instance to inspect.

    Returns:
        List of MetaTestField entries; empty when nothing qualifies.
    """
    declared = fields(instance)

    collected = []
    for item in declared:
        if item.metadata:
            collected.append(MetaTestField(item, self._get_actual_field_type(item), instance))

    for item in declared:
        if not is_dataclass(self._get_actual_field_type(item)):
            continue
        nested = getattr(instance, item.name)
        if nested:
            collected.extend(self._get_meta_fields(nested))

    return collected