Compare commits

...

21 commits

Author SHA1 Message Date
4f3814690e [TrueCloudLab/xk6-frostfs#125] Add acl option
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-02-05 18:53:33 +03:00
d79fd87ede [#174] Add flag to remove registry file
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-02-05 12:43:09 +03:00
8ba2cb8030 [#171] Components versions check
Signed-off-by: Mikhail Kadilov m.kadilov@yadro.com
2024-02-01 09:12:58 +00:00
6caa77dedf [#172] parallel get remote binaries versions 2024-01-31 16:42:30 +03:00
0d7a15877c [#169] Update metrics
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-01-26 15:29:02 +03:00
82f9df088a [#167] Strip components for new xk6 archive and update unit tests
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-01-26 13:35:42 +03:00
e04fac0770 [#164] Add local flag to preset in load
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-01-22 19:06:38 +03:00
328e43fe67 [#162] Refactor frostfs-cli functional
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-22 13:11:59 +00:00
c0a25ab699 Support of custom version parameter instead of --version for all bins 2024-01-18 10:41:36 +03:00
40fa2c24cc rename local_config_path 2024-01-12 20:25:39 +03:00
be36a10f1e [#157] fix for dev-env and unit-tests
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2024-01-12 16:42:19 +00:00
df8d99d83c [#156] load_time in the format of days, hours and minutes; new params
Signed-off-by: Liza <e.chichindaeva@yadro.com>
2024-01-12 16:45:18 +03:00
d6a2cf92a2 [#155] Change args to optionally
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-12 08:40:51 +00:00
a3bda0b34f [#154] Change func search container
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2024-01-11 13:42:02 +03:00
a4d1082ed5 Shards are attribute of StorageNode class 2023-12-22 16:07:20 +00:00
73c362c307 [#153] Fix stat calculation and add error threshold
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-12-22 11:33:50 +03:00
10a6efa333 [#151] Refactor load report
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-12-20 13:03:32 +00:00
663c144709 Search container by name using HTTP requests 2023-12-15 13:14:21 +00:00
8e739adea5 [#150] Increased the status waiting timeout
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-12-15 13:13:09 +03:00
3d63772f4a [#148] Add support for custom registry during read operations
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-12-14 14:03:19 +00:00
02f3ef6b40 [#147] Provide custom environment to ssh connection
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-12-14 12:53:51 +03:00
31 changed files with 737 additions and 462 deletions

View file

@ -51,11 +51,11 @@ basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
config = "frostfs_testlib.storage.controllers.state_managers.config_state_manager:ConfigStateManager"
[project.entry-points."frostfs.testlib.services"]
s = "frostfs_testlib.storage.dataclasses.frostfs_services:StorageNode"
s3-gate = "frostfs_testlib.storage.dataclasses.frostfs_services:S3Gate"
http-gate = "frostfs_testlib.storage.dataclasses.frostfs_services:HTTPGate"
morph-chain = "frostfs_testlib.storage.dataclasses.frostfs_services:MorphChain"
ir = "frostfs_testlib.storage.dataclasses.frostfs_services:InnerRing"
frostfs-storage = "frostfs_testlib.storage.dataclasses.frostfs_services:StorageNode"
frostfs-s3 = "frostfs_testlib.storage.dataclasses.frostfs_services:S3Gate"
frostfs-http = "frostfs_testlib.storage.dataclasses.frostfs_services:HTTPGate"
neo-go = "frostfs_testlib.storage.dataclasses.frostfs_services:MorphChain"
frostfs-ir = "frostfs_testlib.storage.dataclasses.frostfs_services:InnerRing"
[tool.isort]
profile = "black"
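
The renamed entry points above are how the testlib discovers its service dataclasses. A minimal sketch of resolving one of the new names, assuming the standard importlib.metadata API (Python 3.10+); the lookup code is illustrative, not the testlib's own loader:

from importlib.metadata import entry_points

# Resolve the storage-node dataclass by its new entry-point name
services = entry_points(group="frostfs.testlib.services")
storage_cls = {ep.name: ep for ep in services}["frostfs-storage"].load()
# storage_cls is frostfs_testlib.storage.dataclasses.frostfs_services.StorageNode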

View file

@ -65,7 +65,6 @@ class FrostfsCliContainer(CliCommand):
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
force: bool = False,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Delete an existing container.
@ -81,7 +80,6 @@ class FrostfsCliContainer(CliCommand):
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
@ -298,9 +296,5 @@ class FrostfsCliContainer(CliCommand):
return self._execute(
f"container nodes {from_str}",
**{
param: value
for param, value in locals().items()
if param not in ["self", "from_file", "from_str"]
},
**{param: value for param, value in locals().items() if param not in ["self", "from_file", "from_str"]},
)
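
The compacted dict comprehension above is the argument-forwarding idiom used across the CLI wrappers in this diff: locals() captures every named parameter, internal names are filtered out, and the rest are passed to _execute as keyword arguments. A standalone sketch of what the comprehension yields (the free function and values are hypothetical):

def delete(rpc_endpoint: str, cid: str, force: bool = False) -> dict:
    # Mirrors the wrappers above: collect every named parameter for forwarding
    return {param: value for param, value in locals().items()}

delete("grpc.example.local:8080", "SOME_CID")
# {'rpc_endpoint': 'grpc.example.local:8080', 'cid': 'SOME_CID', 'force': False}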

View file

@ -124,9 +124,7 @@ class FrostfsCliObject(CliCommand):
"""
return self._execute(
"object hash",
**{
param: value for param, value in locals().items() if param not in ["self", "params"]
},
**{param: value for param, value in locals().items() if param not in ["self", "params"]},
)
def head(
@ -355,8 +353,8 @@ class FrostfsCliObject(CliCommand):
def nodes(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
wallet: Optional[str] = None,
address: Optional[str] = None,
bearer: Optional[str] = None,
generate_key: Optional = None,

View file

@ -68,11 +68,7 @@ class FrostfsCliShards(CliCommand):
return self._execute_with_password(
"control shards set-mode",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
)
def dump(
@ -105,18 +101,14 @@ class FrostfsCliShards(CliCommand):
return self._execute_with_password(
"control shards dump",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
)
def list(
self,
endpoint: str,
wallet: str,
wallet_password: str,
wallet: Optional[str] = None,
wallet_password: Optional[str] = None,
address: Optional[str] = None,
json_mode: bool = False,
timeout: Optional[str] = None,
@ -135,12 +127,13 @@ class FrostfsCliShards(CliCommand):
Returns:
Command's result.
"""
if not wallet_password:
return self._execute(
"control shards list",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
return self._execute_with_password(
"control shards list",
wallet_password,
**{
param: value
for param, value in locals().items()
if param not in ["self", "wallet_password"]
},
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
)

View file

@ -1,5 +1,5 @@
class Options:
DEFAULT_SHELL_TIMEOUT = 90
DEFAULT_SHELL_TIMEOUT = 120
@staticmethod
def get_default_shell_timeout():

View file

@ -67,6 +67,7 @@ class HostConfig:
clis: list[CLIConfig] = field(default_factory=list)
attributes: dict[str, str] = field(default_factory=dict)
interfaces: dict[str, str] = field(default_factory=dict)
environment: dict[str, str] = field(default_factory=dict)
def __post_init__(self) -> None:
self.services = [ServiceConfig(**service) for service in self.services or []]

View file

@ -152,9 +152,7 @@ class DockerHost(Host):
timeout=service_attributes.start_timeout,
)
def wait_for_service_to_be_in_state(
self, systemd_service_name: str, expected_state: str, timeout: int
) -> None:
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
raise NotImplementedError("Not implemented for docker")
def get_data_directory(self, service_name: str) -> str:
@ -181,6 +179,12 @@ class DockerHost(Host):
def delete_pilorama(self, service_name: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_file(self, file_path: str) -> None:
raise NotImplementedError("Not implemented for docker")
def is_file_exist(self, file_path: str) -> None:
raise NotImplementedError("Not implemented for docker")
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
volume_path = self.get_data_directory(service_name)
@ -305,9 +309,7 @@ class DockerHost(Host):
return container
return None
def _wait_for_container_to_be_in_state(
self, container_name: str, expected_state: str, timeout: int
) -> None:
def _wait_for_container_to_be_in_state(self, container_name: str, expected_state: str, timeout: int) -> None:
iterations = 10
iteration_wait_time = timeout / iterations

View file

@ -219,12 +219,22 @@ class Host(ABC):
"""
@abstractmethod
def delete_pilorama(self, service_name: str) -> None:
def delete_file(self, file_path: str) -> None:
"""
Deletes all pilorama.db files in the node.
Deletes the file at the provided path
Args:
service_name: Name of storage node service.
file_path: full path to the file to delete
"""
@abstractmethod
def is_file_exist(self, file_path: str) -> bool:
"""
Checks if file exists
Args:
file_path: full path to the file to check
"""

View file

@ -0,0 +1,94 @@
from dataclasses import dataclass, field
from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
@dataclass
class SummarizedErorrs:
total: int = field(default_factory=int)
percent: float = field(default_factory=float)
threshold: float = field(default_factory=float)
by_node: dict[str, int] = field(default_factory=dict)
def calc_stats(self, operations):
self.total += sum(self.by_node.values())
if not operations:
return
self.percent = self.total / operations * 100
@dataclass
class SummarizedLatencies:
avg: float = field(default_factory=float)
min: float = field(default_factory=float)
max: float = field(default_factory=float)
by_node: dict[str, dict[str, int]] = field(default_factory=dict)
def calc_stats(self):
if not self.by_node:
return
avgs = [lt["avg"] for lt in self.by_node.values()]
self.avg = sum(avgs) / len(avgs)
minimal = [lt["min"] for lt in self.by_node.values()]
self.min = min(minimal)
maximum = [lt["max"] for lt in self.by_node.values()]
self.max = max(maximum)
@dataclass
class SummarizedStats:
threads: int = field(default_factory=int)
requested_rate: int = field(default_factory=int)
operations: int = field(default_factory=int)
rate: float = field(default_factory=float)
throughput: float = field(default_factory=float)
latencies: SummarizedLatencies = field(default_factory=SummarizedLatencies)
errors: SummarizedErorrs = field(default_factory=SummarizedErorrs)
passed: bool = True
def calc_stats(self):
self.errors.calc_stats(self.operations)
self.latencies.calc_stats()
self.passed = self.errors.percent <= self.errors.threshold
@staticmethod
def collect(load_params: LoadParams, load_summaries: dict) -> dict[str, "SummarizedStats"]:
if load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
delete_vus = max(load_params.preallocated_deleters or 0, load_params.max_deleters or 0)
write_vus = max(load_params.preallocated_writers or 0, load_params.max_writers or 0)
read_vus = max(load_params.preallocated_readers or 0, load_params.max_readers or 0)
else:
write_vus = load_params.writers
read_vus = load_params.readers
delete_vus = load_params.deleters
summarized = {
"Write": SummarizedStats(threads=write_vus, requested_rate=load_params.write_rate),
"Read": SummarizedStats(threads=read_vus, requested_rate=load_params.read_rate),
"Delete": SummarizedStats(threads=delete_vus, requested_rate=load_params.delete_rate),
}
for node_key, load_summary in load_summaries.items():
metrics = get_metrics_object(load_params.scenario, load_summary)
for operation in metrics.operations:
target = summarized[operation._NAME]
if not operation.total_iterations:
continue
target.operations += operation.total_iterations
target.rate += operation.rate
target.latencies.by_node[node_key] = operation.latency
target.throughput += operation.throughput
target.errors.threshold = load_params.error_threshold
if operation.failed_iterations:
target.errors.by_node[node_key] = operation.failed_iterations
for operation in summarized.values():
operation.calc_stats()
return summarized
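
A small sketch of the error-percentage arithmetic implemented above; the class and fields come from this file, the node endpoints and counts are hypothetical:

errors = SummarizedErorrs(threshold=1.0)
errors.by_node = {"node1:8080": 2, "node2:8080": 3}
errors.calc_stats(operations=1000)
# errors.total == 5, errors.percent == 0.5, i.e. below the 1.0 threshold that SummarizedStats.passed checks against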

View file

@ -3,11 +3,28 @@ import os
from dataclasses import dataclass, field, fields, is_dataclass
from enum import Enum
from types import MappingProxyType
from typing import Any, Optional, get_args
from typing import Any, Callable, Optional, get_args
from frostfs_testlib.utils.converting_utils import calc_unit
def convert_time_to_seconds(time: int | str | None) -> int:
if time is None:
return None
if str(time).isdigit():
seconds = int(time)
else:
days, hours, minutes = 0, 0, 0
if "d" in time:
days, time = time.split("d")
if "h" in time:
hours, time = time.split("h")
if "min" in time:
minutes = time.replace("min", "")
seconds = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60
return seconds
class LoadType(Enum):
gRPC = "grpc"
S3 = "s3"
@ -76,6 +93,7 @@ def metadata_field(
scenario_variable: Optional[str] = None,
string_repr: Optional[bool] = True,
distributed: Optional[bool] = False,
formatter: Optional[Callable] = None,
):
return field(
default=None,
@ -85,6 +103,7 @@ def metadata_field(
"env_variable": scenario_variable,
"string_repr": string_repr,
"distributed": distributed,
"formatter": formatter,
},
)
@ -128,6 +147,8 @@ class Preset:
pregen_json: Optional[str] = metadata_field(all_load_scenarios, "out", "PREGEN_JSON", False)
# Workers count for preset
workers: Optional[int] = metadata_field(all_load_scenarios, "workers", None, False)
# Acl for container/buckets
acl: Optional[str] = metadata_field(all_load_scenarios, "acl", None, False)
# ------ GRPC ------
# Amount of containers which should be created
@ -147,6 +168,9 @@ class Preset:
# Flag to control preset errors
ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)
# Flag to ensure created containers store data on local endpoints
local: Optional[bool] = metadata_field(grpc_preset_scenarios, "local", None, False)
@dataclass
class LoadParams:
@ -200,13 +224,21 @@ class LoadParams:
# ------- COMMON SCENARIO PARAMS -------
# Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False)
load_time: Optional[int] = metadata_field(
all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds
)
# Object size in KB for load and preset.
object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False)
# For read operations, controls from which set objects are read
read_from: Optional[ReadFrom] = None
# For read operations done from REGISTRY, controls how long an object must live before it is used for a read operation
read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", False)
# Output registry K6 file. Filled automatically.
registry_file: Optional[str] = metadata_field(all_scenarios, None, "REGISTRY_FILE", False)
# In case if we want to use custom registry file left from another load run
custom_registry: Optional[str] = None
# Flag to remove an existing registry file before the run (applied together with custom_registry)
force_fresh_registry: Optional[bool] = None
# Specifies the minimum duration of every single execution (i.e. iteration).
# Any iterations that are shorter than this value will cause that VU to
# sleep for the remainder of the time until the specified minimum duration is reached.
@ -294,6 +326,11 @@ class LoadParams:
if self.read_from == ReadFrom.REGISTRY:
self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt")
# For now it's okay to have it this way
if self.custom_registry is not None:
self.registry_file = self.custom_registry
if self.read_from == ReadFrom.PRESET:
self.registry_file = None
@ -375,6 +412,25 @@ class LoadParams:
return fields_with_data or []
def _get_field_formatter(self, field_name: str) -> Callable | None:
data_fields = fields(self)
formatters = [
field.metadata["formatter"]
for field in data_fields
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
]
if formatters:
return formatters[0]
return None
def __setattr__(self, field_name, value):
formatter = self._get_field_formatter(field_name)
if formatter:
value = formatter(value)
super().__setattr__(field_name, value)
def __str__(self) -> str:
load_type_str = self.scenario.value if self.scenario else self.load_type.value
# TODO: migrate load_params defaults to testlib
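
With the formatter metadata and the __setattr__ hook above, assigning load_time on a LoadParams instance routes the value through convert_time_to_seconds, so plain seconds and the new day/hour/minute strings are both stored as seconds. A few illustrative calls to the helper; the results follow directly from its arithmetic:

convert_time_to_seconds("1d2h30min")  # 1*86400 + 2*3600 + 30*60 == 95400
convert_time_to_seconds("45min")      # 2700
convert_time_to_seconds("90")         # 90  (digit strings pass through as seconds)
convert_time_to_seconds(300)          # 300 (ints are returned unchanged)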

View file

@ -1,95 +1,43 @@
from abc import ABC
from typing import Any
from typing import Any, Optional
from frostfs_testlib.load.load_config import LoadScenario
class MetricsBase(ABC):
_WRITE_SUCCESS = ""
_WRITE_ERRORS = ""
_WRITE_THROUGHPUT = "data_sent"
_WRITE_LATENCY = ""
_READ_SUCCESS = ""
_READ_ERRORS = ""
_READ_LATENCY = ""
_READ_THROUGHPUT = "data_received"
_DELETE_SUCCESS = ""
_DELETE_LATENCY = ""
_DELETE_ERRORS = ""
class OperationMetric(ABC):
_NAME = ""
_SUCCESS = ""
_ERRORS = ""
_THROUGHPUT = ""
_LATENCY = ""
def __init__(self, summary) -> None:
self.summary = summary
self.metrics = summary["metrics"]
@property
def write_total_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS) + self._get_metric(self._WRITE_ERRORS)
def total_iterations(self) -> int:
return self._get_metric(self._SUCCESS) + self._get_metric(self._ERRORS)
@property
def write_success_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS)
def success_iterations(self) -> int:
return self._get_metric(self._SUCCESS)
@property
def write_latency(self) -> dict:
return self._get_metric(self._WRITE_LATENCY)
def latency(self) -> dict:
return self._get_metric(self._LATENCY)
@property
def write_rate(self) -> float:
return self._get_metric_rate(self._WRITE_SUCCESS)
def rate(self) -> float:
return self._get_metric_rate(self._SUCCESS)
@property
def write_failed_iterations(self) -> int:
return self._get_metric(self._WRITE_ERRORS)
def failed_iterations(self) -> int:
return self._get_metric(self._ERRORS)
@property
def write_throughput(self) -> float:
return self._get_metric_rate(self._WRITE_THROUGHPUT)
@property
def read_total_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS) + self._get_metric(self._READ_ERRORS)
@property
def read_success_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS)
@property
def read_latency(self) -> dict:
return self._get_metric(self._READ_LATENCY)
@property
def read_rate(self) -> int:
return self._get_metric_rate(self._READ_SUCCESS)
@property
def read_failed_iterations(self) -> int:
return self._get_metric(self._READ_ERRORS)
@property
def read_throughput(self) -> float:
return self._get_metric_rate(self._READ_THROUGHPUT)
@property
def delete_total_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS) + self._get_metric(self._DELETE_ERRORS)
@property
def delete_success_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS)
@property
def delete_latency(self) -> dict:
return self._get_metric(self._DELETE_LATENCY)
@property
def delete_failed_iterations(self) -> int:
return self._get_metric(self._DELETE_ERRORS)
@property
def delete_rate(self) -> int:
return self._get_metric_rate(self._DELETE_SUCCESS)
def throughput(self) -> float:
return self._get_metric_rate(self._THROUGHPUT)
def _get_metric(self, metric: str) -> int:
metrics_method_map = {
@ -104,9 +52,7 @@ class MetricsBase(ABC):
metric = self.metrics[metric]
metric_type = metric["type"]
if metric_type not in metrics_method_map:
raise Exception(
f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}"
)
raise Exception(f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}")
return metrics_method_map[metric_type](metric)
@ -119,9 +65,7 @@ class MetricsBase(ABC):
metric = self.metrics[metric]
metric_type = metric["type"]
if metric_type not in metrics_method_map:
raise Exception(
f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}"
)
raise Exception(f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}")
return metrics_method_map[metric_type](metric)
@ -138,63 +82,145 @@ class MetricsBase(ABC):
return metric["values"]
class WriteOperationMetric(OperationMetric):
_NAME = "Write"
_SUCCESS = ""
_ERRORS = ""
_THROUGHPUT = "data_sent"
_LATENCY = ""
class ReadOperationMetric(OperationMetric):
_NAME = "Read"
_SUCCESS = ""
_ERRORS = ""
_THROUGHPUT = "data_received"
_LATENCY = ""
class DeleteOperationMetric(OperationMetric):
_NAME = "Delete"
_SUCCESS = ""
_ERRORS = ""
_THROUGHPUT = ""
_LATENCY = ""
class GrpcWriteOperationMetric(WriteOperationMetric):
_SUCCESS = "frostfs_obj_put_success"
_ERRORS = "frostfs_obj_put_fails"
_LATENCY = "frostfs_obj_put_duration"
class GrpcReadOperationMetric(ReadOperationMetric):
_SUCCESS = "frostfs_obj_get_success"
_ERRORS = "frostfs_obj_get_fails"
_LATENCY = "frostfs_obj_get_duration"
class GrpcDeleteOperationMetric(DeleteOperationMetric):
_SUCCESS = "frostfs_obj_delete_success"
_ERRORS = "frostfs_obj_delete_fails"
_LATENCY = "frostfs_obj_delete_duration"
class S3WriteOperationMetric(WriteOperationMetric):
_SUCCESS = "aws_obj_put_success"
_ERRORS = "aws_obj_put_fails"
_LATENCY = "aws_obj_put_duration"
class S3ReadOperationMetric(ReadOperationMetric):
_SUCCESS = "aws_obj_get_success"
_ERRORS = "aws_obj_get_fails"
_LATENCY = "aws_obj_get_duration"
class S3DeleteOperationMetric(DeleteOperationMetric):
_SUCCESS = "aws_obj_delete_success"
_ERRORS = "aws_obj_delete_fails"
_LATENCY = "aws_obj_delete_duration"
class S3LocalWriteOperationMetric(WriteOperationMetric):
_SUCCESS = "s3local_obj_put_success"
_ERRORS = "s3local_obj_put_fails"
_LATENCY = "s3local_obj_put_duration"
class S3LocalReadOperationMetric(ReadOperationMetric):
_SUCCESS = "s3local_obj_get_success"
_ERRORS = "s3local_obj_get_fails"
_LATENCY = "s3local_obj_get_duration"
class LocalWriteOperationMetric(WriteOperationMetric):
_SUCCESS = "local_obj_put_success"
_ERRORS = "local_obj_put_fails"
_LATENCY = "local_obj_put_duration"
class LocalReadOperationMetric(ReadOperationMetric):
_SUCCESS = "local_obj_get_success"
_ERRORS = "local_obj_get_fails"
class LocalDeleteOperationMetric(DeleteOperationMetric):
_SUCCESS = "local_obj_delete_success"
_ERRORS = "local_obj_delete_fails"
class VerifyReadOperationMetric(ReadOperationMetric):
_SUCCESS = "verified_obj"
_ERRORS = "invalid_obj"
class MetricsBase(ABC):
def __init__(self) -> None:
self.write: Optional[WriteOperationMetric] = None
self.read: Optional[ReadOperationMetric] = None
self.delete: Optional[DeleteOperationMetric] = None
@property
def operations(self) -> list[OperationMetric]:
return [metric for metric in [self.write, self.read, self.delete] if metric is not None]
class GrpcMetrics(MetricsBase):
_WRITE_SUCCESS = "frostfs_obj_put_total"
_WRITE_ERRORS = "frostfs_obj_put_fails"
_WRITE_LATENCY = "frostfs_obj_put_duration"
_READ_SUCCESS = "frostfs_obj_get_total"
_READ_ERRORS = "frostfs_obj_get_fails"
_READ_LATENCY = "frostfs_obj_get_duration"
_DELETE_SUCCESS = "frostfs_obj_delete_total"
_DELETE_ERRORS = "frostfs_obj_delete_fails"
_DELETE_LATENCY = "frostfs_obj_delete_duration"
def __init__(self, summary) -> None:
super().__init__()
self.write = GrpcWriteOperationMetric(summary)
self.read = GrpcReadOperationMetric(summary)
self.delete = GrpcDeleteOperationMetric(summary)
class S3Metrics(MetricsBase):
_WRITE_SUCCESS = "aws_obj_put_total"
_WRITE_ERRORS = "aws_obj_put_fails"
_WRITE_LATENCY = "aws_obj_put_duration"
def __init__(self, summary) -> None:
super().__init__()
self.write = S3WriteOperationMetric(summary)
self.read = S3ReadOperationMetric(summary)
self.delete = S3DeleteOperationMetric(summary)
_READ_SUCCESS = "aws_obj_get_total"
_READ_ERRORS = "aws_obj_get_fails"
_READ_LATENCY = "aws_obj_get_duration"
_DELETE_SUCCESS = "aws_obj_delete_total"
_DELETE_ERRORS = "aws_obj_delete_fails"
_DELETE_LATENCY = "aws_obj_delete_duration"
class S3LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "s3local_obj_put_total"
_WRITE_ERRORS = "s3local_obj_put_fails"
_WRITE_LATENCY = "s3local_obj_put_duration"
def __init__(self, summary) -> None:
super().__init__()
self.write = S3LocalWriteOperationMetric(summary)
self.read = S3LocalReadOperationMetric(summary)
_READ_SUCCESS = "s3local_obj_get_total"
_READ_ERRORS = "s3local_obj_get_fails"
_READ_LATENCY = "s3local_obj_get_duration"
class LocalMetrics(MetricsBase):
_WRITE_SUCCESS = "local_obj_put_total"
_WRITE_ERRORS = "local_obj_put_fails"
_WRITE_LATENCY = "local_obj_put_duration"
_READ_SUCCESS = "local_obj_get_total"
_READ_ERRORS = "local_obj_get_fails"
_DELETE_SUCCESS = "local_obj_delete_total"
_DELETE_ERRORS = "local_obj_delete_fails"
def __init__(self, summary) -> None:
super().__init__()
self.write = LocalWriteOperationMetric(summary)
self.read = LocalReadOperationMetric(summary)
self.delete = LocalDeleteOperationMetric(summary)
class VerifyMetrics(MetricsBase):
_WRITE_SUCCESS = "N/A"
_WRITE_ERRORS = "N/A"
_READ_SUCCESS = "verified_obj"
_READ_ERRORS = "invalid_obj"
_DELETE_SUCCESS = "N/A"
_DELETE_ERRORS = "N/A"
def __init__(self, summary) -> None:
super().__init__()
self.read = VerifyReadOperationMetric(summary)
def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> MetricsBase:
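
The refactor above replaces the flat write_*/read_*/delete_* properties with per-operation metric objects, so callers drill down through .write/.read/.delete, as SummarizedStats.collect and the report now do. A sketch of the new access pattern, assuming summary is a k6 summary dict from a gRPC run:

metrics = get_metrics_object(LoadScenario.gRPC, summary)
# old flat property               ->  new per-operation attribute
# metrics.write_total_iterations  ->  metrics.write.total_iterations
# metrics.read_failed_iterations  ->  metrics.read.failed_iterations
for operation in metrics.operations:  # only the operations defined for the scenario
    print(operation._NAME, operation.total_iterations, operation.rate)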

View file

@ -3,8 +3,8 @@ from typing import Optional
import yaml
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
from frostfs_testlib.utils.converting_utils import calc_unit
@ -114,63 +114,46 @@ class LoadReport:
return model_map[self.load_params.scenario]
def _get_operations_sub_section_html(
self,
operation_type: str,
total_operations: int,
requested_rate_str: str,
vus_str: str,
total_rate: float,
throughput: float,
errors: dict[str, int],
latency: dict[str, dict],
):
def _get_operations_sub_section_html(self, operation_type: str, stats: SummarizedStats):
throughput_html = ""
if throughput > 0:
throughput, unit = calc_unit(throughput)
if stats.throughput > 0:
throughput, unit = calc_unit(stats.throughput)
throughput_html = self._row("Throughput", f"{throughput:.2f} {unit}/sec")
per_node_errors_html = ""
total_errors = 0
if errors:
total_errors: int = 0
for node_key, errors in errors.items():
total_errors += errors
if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT:
per_node_errors_html += self._row(f"At {node_key}", errors)
for node_key, errors in stats.errors.by_node.items():
if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT:
per_node_errors_html += self._row(f"At {node_key}", errors)
latency_html = ""
if latency:
for node_key, latency_dict in latency.items():
latency_values = "N/A"
if latency_dict:
latency_values = ""
for param_name, param_val in latency_dict.items():
latency_values += f"{param_name}={param_val:.2f}ms "
for node_key, latencies in stats.latencies.by_node.items():
latency_values = "N/A"
if latencies:
latency_values = ""
for param_name, param_val in latencies.items():
latency_values += f"{param_name}={param_val:.2f}ms "
latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values)
latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values)
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
model = self._get_model_string()
requested_rate_str = f"{stats.requested_rate}op/sec" if stats.requested_rate else ""
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
errors_percent = 0
if total_operations:
errors_percent = total_errors / total_operations * 100.0
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {stats.threads}th {model} - {throughput:.2f}{unit}/s {stats.rate:.2f}/s"
html = f"""
<table border="1" cellpadding="5px"><tbody>
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
<tr><th colspan="2" bgcolor="gainsboro">Metrics</th></tr>
{self._row("Total operations", total_operations)}
{self._row("OP/sec", f"{total_rate:.2f}")}
{self._row("Total operations", stats.operations)}
{self._row("OP/sec", f"{stats.rate:.2f}")}
{throughput_html}
{latency_html}
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
{per_node_errors_html}
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
{self._row("Threshold", f"{self.load_params.error_threshold:.2f}%")}
{self._row("Total", f"{stats.errors.total} ({stats.errors.percent:.2f}%)")}
{self._row("Threshold", f"{stats.errors.threshold:.2f}%")}
</tbody></table><br><hr>
"""
@ -178,111 +161,12 @@ class LoadReport:
def _get_totals_section_html(self):
html = ""
for i, load_summaries in enumerate(self.load_summaries_list, 1):
html += f"<h3>Load Results for load #{i}</h3>"
for i in range(len(self.load_summaries_list)):
html += f"<h3>Load Results for load #{i+1}</h3>"
write_operations = 0
write_op_sec = 0
write_throughput = 0
write_latency = {}
write_errors = {}
requested_write_rate = self.load_params.write_rate
requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else ""
read_operations = 0
read_op_sec = 0
read_throughput = 0
read_latency = {}
read_errors = {}
requested_read_rate = self.load_params.read_rate
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
delete_operations = 0
delete_op_sec = 0
delete_latency = {}
delete_errors = {}
requested_delete_rate = self.load_params.delete_rate
requested_delete_rate_str = f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
delete_vus = max(self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0)
write_vus = max(self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0)
read_vus = max(self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0)
else:
write_vus = self.load_params.writers
read_vus = self.load_params.readers
delete_vus = self.load_params.deleters
write_vus_str = f"{write_vus}th"
read_vus_str = f"{read_vus}th"
delete_vus_str = f"{delete_vus}th"
write_section_required = False
read_section_required = False
delete_section_required = False
for node_key, load_summary in load_summaries.items():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
write_operations += metrics.write_total_iterations
if write_operations:
write_section_required = True
write_op_sec += metrics.write_rate
write_latency[node_key] = metrics.write_latency
write_throughput += metrics.write_throughput
if metrics.write_failed_iterations:
write_errors[node_key] = metrics.write_failed_iterations
read_operations += metrics.read_total_iterations
if read_operations:
read_section_required = True
read_op_sec += metrics.read_rate
read_throughput += metrics.read_throughput
read_latency[node_key] = metrics.read_latency
if metrics.read_failed_iterations:
read_errors[node_key] = metrics.read_failed_iterations
delete_operations += metrics.delete_total_iterations
if delete_operations:
delete_section_required = True
delete_op_sec += metrics.delete_rate
delete_latency[node_key] = metrics.delete_latency
if metrics.delete_failed_iterations:
delete_errors[node_key] = metrics.delete_failed_iterations
if write_section_required:
html += self._get_operations_sub_section_html(
"Write",
write_operations,
requested_write_rate_str,
write_vus_str,
write_op_sec,
write_throughput,
write_errors,
write_latency,
)
if read_section_required:
html += self._get_operations_sub_section_html(
"Read",
read_operations,
requested_read_rate_str,
read_vus_str,
read_op_sec,
read_throughput,
read_errors,
read_latency,
)
if delete_section_required:
html += self._get_operations_sub_section_html(
"Delete",
delete_operations,
requested_delete_rate_str,
delete_vus_str,
delete_op_sec,
0,
delete_errors,
delete_latency,
)
summarized = SummarizedStats.collect(self.load_params, self.load_summaries_list[i])
for operation_type, stats in summarized.items():
if stats.operations:
html += self._get_operations_sub_section_html(operation_type, stats)
return html

View file

@ -1,4 +1,5 @@
from frostfs_testlib import reporter
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
from frostfs_testlib.load.load_config import LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
@ -8,56 +9,16 @@ class LoadVerifier:
self.load_params = load_params
def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]:
write_operations = 0
write_errors = 0
read_operations = 0
read_errors = 0
delete_operations = 0
delete_errors = 0
writers = self.load_params.writers or self.load_params.preallocated_writers or 0
readers = self.load_params.readers or self.load_params.preallocated_readers or 0
deleters = self.load_params.deleters or self.load_params.preallocated_deleters or 0
for load_summary in load_summaries.values():
metrics = get_metrics_object(self.load_params.scenario, load_summary)
if writers:
write_operations += metrics.write_total_iterations
write_errors += metrics.write_failed_iterations
if readers:
read_operations += metrics.read_total_iterations
read_errors += metrics.read_failed_iterations
if deleters:
delete_operations += metrics.delete_total_iterations
delete_errors += metrics.delete_failed_iterations
summarized = SummarizedStats.collect(self.load_params, load_summaries)
issues = []
if writers and not write_operations:
issues.append(f"No any write operation was performed")
if readers and not read_operations:
issues.append(f"No any read operation was performed")
if deleters and not delete_operations:
issues.append(f"No any delete operation was performed")
error_rate = self._get_error_rate(writers, write_operations, write_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Write errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
for operation_type, stats in summarized.items():
if stats.threads and not stats.operations:
issues.append(f"No any {operation_type.lower()} operation was performed")
error_rate = self._get_error_rate(readers, read_operations, read_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Read errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
error_rate = self._get_error_rate(deleters, delete_operations, delete_errors)
if error_rate > self.load_params.error_threshold:
rate_str = self._get_rate_str(error_rate)
issues.append(f"Delete errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
if stats.errors.percent > stats.errors.threshold:
rate_str = self._get_rate_str(stats.errors.percent)
issues.append(f"{operation_type} errors exceeded threshold: {rate_str} > {stats.errors.threshold}%")
return issues
@ -74,13 +35,6 @@ class LoadVerifier:
)
return verify_issues
def _get_error_rate(self, vus: int, operations: int, errors: int) -> float:
if not operations or not vus:
return 0
error_rate = errors / operations * 100
return error_rate
def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str:
return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%"
@ -95,13 +49,13 @@ class LoadVerifier:
delete_success = 0
if deleters > 0:
delete_success = load_metrics.delete_success_iterations
delete_success = load_metrics.delete.success_iterations
if verification_summary:
verify_metrics = get_metrics_object(LoadScenario.VERIFY, verification_summary)
verified_objects = verify_metrics.read_success_iterations
invalid_objects = verify_metrics.read_failed_iterations
total_left_objects = load_metrics.write_success_iterations - delete_success
verified_objects = verify_metrics.read.success_iterations
invalid_objects = verify_metrics.read.failed_iterations
total_left_objects = load_metrics.write.success_iterations - delete_success
# Due to interruptions, the total of verified objects may be less than the total written by up to the writers count
if abs(total_left_objects - verified_objects) > writers:

View file

@ -4,6 +4,7 @@ import math
import re
import time
from dataclasses import fields
from threading import Event
from typing import Optional
from urllib.parse import urlparse
@ -30,7 +31,6 @@ from frostfs_testlib.testing import parallel, run_optionally
from frostfs_testlib.testing.test_control import retry
from frostfs_testlib.utils import datetime_utils
from frostfs_testlib.utils.file_keeper import FileKeeper
from threading import Event
class RunnerBase(ScenarioRunner):
@ -78,6 +78,10 @@ class DefaultRunner(RunnerBase):
nodes_under_load: list[ClusterNode],
k6_dir: str,
):
if load_params.force_fresh_registry and load_params.custom_registry:
with reporter.step("Forcing fresh registry files"):
parallel(self._force_fresh_registry, self.loaders, load_params)
if load_params.load_type != LoadType.S3:
return
@ -88,6 +92,11 @@ class DefaultRunner(RunnerBase):
parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
def _force_fresh_registry(self, loader: Loader, load_params: LoadParams):
with reporter.step(f"Forcing fresh registry on {loader.ip}"):
shell = loader.get_shell()
shell.exec(f"rm -f {load_params.registry_file}")
def _prepare_loader(
self,
loader: Loader,
@ -314,7 +323,7 @@ class LocalRunner(RunnerBase):
with reporter.step("Download K6"):
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz --strip-components 2 -C {k6_dir}")
shell.exec(f"sudo chmod -R 777 {k6_dir}")
with reporter.step("Create empty_passwd"):

View file

@ -9,4 +9,4 @@ FROSTFS_ADM_EXEC = os.getenv("FROSTFS_ADM_EXEC", "frostfs-adm")
# Config for frostfs-adm utility. Optional if tests are running against devenv
FROSTFS_ADM_CONFIG_PATH = os.getenv("FROSTFS_ADM_CONFIG_PATH")
CLI_DEFAULT_TIMEOUT = os.getenv("CLI_DEFAULT_TIMEOUT", None)
CLI_DEFAULT_TIMEOUT = os.getenv("CLI_DEFAULT_TIMEOUT", "100s")

View file

@ -9,7 +9,7 @@ class SudoInspector(CommandInspector):
def inspect(self, original_command: str, command: str) -> str:
if not command.startswith("sudo"):
return f"sudo -i {command}"
return f"sudo {command}"
return command

View file

@ -185,6 +185,7 @@ class SSHShell(Shell):
private_key_passphrase: Optional[str] = None,
port: str = "22",
command_inspectors: Optional[list[CommandInspector]] = None,
custom_environment: Optional[dict] = None
) -> None:
super().__init__()
self.connection_provider = SshConnectionProvider()
@ -196,6 +197,8 @@ class SSHShell(Shell):
self.command_inspectors = command_inspectors or []
self.environment = custom_environment
@property
def _connection(self):
return self.connection_provider.provide(self.host, self.port)
@ -224,7 +227,7 @@ class SSHShell(Shell):
@log_command
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True)
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True, environment=self.environment)
for interactive_input in options.interactive_inputs:
input = interactive_input.input
if not input.endswith("\n"):
@ -251,7 +254,7 @@ class SSHShell(Shell):
@log_command
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
try:
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout)
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, environment=self.environment)
if options.close_stdin:
stdin.close()
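
The new custom_environment argument (presumably fed from the new HostConfig.environment field) is forwarded to paramiko's exec_command via its environment parameter for both interactive and non-interactive calls. A construction sketch; host, login and password are assumed from the existing SSHShell signature, the values are hypothetical, and paramiko only applies the variables if the server's AcceptEnv permits them:

shell = SSHShell(
    host="10.78.0.10",
    login="service",
    password="secret",
    custom_environment={"GOGC": "50", "HTTP_PROXY": "http://proxy.local:3128"},
)
result = shell.exec("env | grep -E 'GOGC|HTTP_PROXY'")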

View file

@ -5,6 +5,8 @@ from dataclasses import dataclass
from time import sleep
from typing import Optional, Union
import requests
from frostfs_testlib import reporter
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
@ -290,18 +292,17 @@ def delete_container(
force: bool = False,
session_token: Optional[str] = None,
await_mode: bool = False,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> None:
"""
A wrapper for `frostfs-cli container delete` call.
Args:
await_mode: Block execution until container is removed.
wallet (str): path to a wallet on whose behalf we delete the container
cid (str): ID of the container to delete
shell: executor for cli command
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
force (bool): do not check whether container contains locks and remove immediately
session_token: a path to session token file
timeout: Timeout for the operation.
This function doesn't return anything.
"""
@ -313,7 +314,6 @@ def delete_container(
force=force,
session=session_token,
await_mode=await_mode,
timeout=timeout,
)
@ -344,12 +344,13 @@ def _parse_cid(output: str) -> str:
@reporter.step("Search container by name")
def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str):
list_cids = list_containers(wallet, shell, endpoint)
for cid in list_cids:
cont_info = get_container(wallet, cid, shell, endpoint, True)
if cont_info.get("attributes", {}).get("Name", None) == name:
return cid
def search_container_by_name(name: str, node: ClusterNode):
node_shell = node.host.get_shell()
output = node_shell.exec(f"curl -I HEAD http://127.0.0.1:8084/{name}")
pattern = r"X-Container-Id: (\S+)"
cid = re.findall(pattern, output.stdout)
if cid:
return cid[0]
return None
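
The rewritten helper no longer lists all containers through the CLI; it asks the HTTP gate on the node for the X-Container-Id response header. A usage sketch, matching how search_nodes_with_bucket calls it later in this diff (the bucket name is hypothetical):

cid = search_container_by_name("load-bucket-01", cluster.cluster_nodes[0])
if cid is None:
    raise RuntimeError("container with that name is not known to the HTTP gate")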

View file

@ -732,23 +732,24 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
@reporter.step("Search object nodes")
def get_object_nodes(
cluster: Cluster,
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
alive_node: ClusterNode,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
wallet_config: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
shell = alive_node.host.get_shell()
endpoint = alive_node.storage_node.get_rpc_endpoint()
wallet = alive_node.storage_node.get_remote_wallet_path()
wallet_config = alive_node.storage_node.get_remote_wallet_config_path()
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
result_object_nodes = cli.object.nodes(
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
bearer=bearer,

View file

@ -87,7 +87,7 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
alive_node = alive_node if alive_node else cluster.services(StorageNode)[0]
remote_shell = alive_node.host.get_shell()
if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
if "force_transactions" not in alive_node.host.config.attributes:
# If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
frostfs_adm = FrostfsAdm(
shell=remote_shell,

View file

@ -231,6 +231,10 @@ def search_nodes_with_bucket(
shell: Shell,
endpoint: str,
) -> list[ClusterNode]:
cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint)
cid = None
for cluster_node in cluster.cluster_nodes:
cid = search_container_by_name(name=bucket_name, node=cluster_node)
if cid:
break
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
return nodes_list

View file

@ -105,7 +105,7 @@ class ClusterNode:
service_entry = self.class_registry.get_entry(service_type)
service_name = service_entry["hosting_service_name"]
pattern = f"{service_name}{self.id:02}"
pattern = f"{service_name}_{self.id:02}"
config = self.host.get_service_config(pattern)
return service_type(
@ -120,7 +120,7 @@ class ClusterNode:
svcs_names_on_node = [svc.name for svc in self.host.config.services]
for entry in self.class_registry._class_mapping.values():
hosting_svc_name = entry["hosting_service_name"]
pattern = f"{hosting_svc_name}{self.id:02}"
pattern = f"{hosting_svc_name}_{self.id:02}"
if pattern in svcs_names_on_node:
config = self.host.get_service_config(pattern)
svcs.append(
@ -267,13 +267,13 @@ class Cluster:
service_name = service["hosting_service_name"]
cls: type[NodeBase] = service["cls"]
pattern = f"{service_name}\d*$"
pattern = f"{service_name}_\d*$"
configs = self.hosting.find_service_configs(pattern)
found_nodes = []
for config in configs:
# config.name is something like s3-gate01. Cut last digits to know service type
service_type = re.findall(".*\D", config.name)[0]
service_type = re.findall("(.*)_\d+", config.name)[0]
# exclude unsupported services
if service_type != service_name:
continue
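
The pattern changes above put an underscore between the hosting service name and the node index, and the regex now captures everything before that underscore as the service type. A quick illustration, assuming a hosting service name of frostfs-storage on node 01 (consistent with the renamed entry points earlier in this diff):

import re

service_name, node_id = "frostfs-storage", 1
f"{service_name}_{node_id:02}"                 # -> "frostfs-storage_01", the new config name form
re.findall(r"(.*)_\d+", "frostfs-storage_01")  # -> ["frostfs-storage"], the service type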

View file

@ -8,7 +8,8 @@ class ConfigAttributes:
SHARD_CONFIG_PATH = "shard_config_path"
LOGGER_CONFIG_PATH = "logger_config_path"
LOCAL_WALLET_PATH = "local_wallet_path"
LOCAL_WALLET_CONFIG = "local_config_path"
LOCAL_WALLET_CONFIG = "local_wallet_config_path"
REMOTE_WALLET_CONFIG = "remote_wallet_config_path"
ENDPOINT_DATA_0 = "endpoint_data0"
ENDPOINT_DATA_1 = "endpoint_data1"
ENDPOINT_INTERNAL = "endpoint_internal0"
@ -17,11 +18,3 @@ class ConfigAttributes:
UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname"
S3_HOSTNAME = "s3_hostname"
class _FrostfsServicesNames:
STORAGE = "s"
S3_GATE = "s3-gate"
HTTP_GATE = "http-gate"
MORPH_CHAIN = "morph-chain"
INNER_RING = "ir"

View file

@ -476,12 +476,12 @@ class ClusterStateController:
def _enable_date_synchronizer(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("timedatectl set-ntp true")
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 5)
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 15)
def _disable_date_synchronizer(self, cluster_node: ClusterNode):
shell = cluster_node.host.get_shell()
shell.exec("timedatectl set-ntp false")
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 5)
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 15)
def _get_disk_controller(self, node: StorageNode, device: str, mountpoint: str) -> DiskController:
disk_controller_id = DiskController.get_id(node, device)

View file

@ -3,7 +3,7 @@ import yaml
from frostfs_testlib.blockchain import RPCClient
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.storage.dataclasses.node_base import NodeBase
from frostfs_testlib.storage.dataclasses.shard import Shard
class InnerRing(NodeBase):
"""
@ -148,6 +148,20 @@ class StorageNode(NodeBase):
def get_shards_config(self) -> tuple[str, dict]:
return self.get_config(self.get_shard_config_path())
def get_shards(self) -> list[Shard]:
config = self.get_shards_config()[1]
config["storage"]["shard"].pop("default")
return [Shard.from_object(shard) for shard in config["storage"]["shard"].values()]
def get_shards_from_env(self) -> list[Shard]:
config = self.get_shards_config()[1]
configObj = ConfigObj(StringIO(config))
pattern = f"{SHARD_PREFIX}\d*"
num_shards = len(set(re.findall(pattern, self.get_shards_config())))
return [Shard.from_config_object(configObj, shard_id) for shard_id in range(num_shards)]
def get_control_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)
@ -157,6 +171,9 @@ class StorageNode(NodeBase):
def get_data_directory(self) -> str:
return self.host.get_data_directory(self.name)
def get_storage_config(self) -> str:
return self.host.get_storage_config(self.name)
def get_http_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
@ -169,8 +186,11 @@ class StorageNode(NodeBase):
def delete_fstree(self):
self.host.delete_fstree(self.name)
def delete_pilorama(self):
self.host.delete_pilorama(self.name)
def delete_file(self, file_path: str) -> None:
self.host.delete_file(file_path)
def is_file_exist(self, file_path) -> bool:
return self.host.is_file_exist(file_path)
def delete_metabase(self):
self.host.delete_metabase(self.name)

View file

@ -114,6 +114,14 @@ class NodeBase(HumanReadableABC):
ConfigAttributes.CONFIG_PATH,
)
def get_remote_wallet_config_path(self) -> str:
"""
Returns wallet config file path located on remote host
"""
return self._get_attribute(
ConfigAttributes.REMOTE_WALLET_CONFIG,
)
def get_wallet_config_path(self) -> str:
return self._get_attribute(
ConfigAttributes.LOCAL_WALLET_CONFIG,
@ -125,8 +133,11 @@ class NodeBase(HumanReadableABC):
Returns config path for logger located on remote host
"""
config_attributes = self.host.get_service_config(self.name)
return self._get_attribute(
ConfigAttributes.LOGGER_CONFIG_PATH) if ConfigAttributes.LOGGER_CONFIG_PATH in config_attributes.attributes else None
return (
self._get_attribute(ConfigAttributes.LOGGER_CONFIG_PATH)
if ConfigAttributes.LOGGER_CONFIG_PATH in config_attributes.attributes
else None
)
@property
def config_dir(self) -> str:

View file

@ -0,0 +1,99 @@
import json
import pathlib
import re
from dataclasses import dataclass
from io import StringIO
import allure
import pytest
import yaml
from configobj import ConfigObj
from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
SHARD_PREFIX = "FROSTFS_STORAGE_SHARD_"
BLOBSTOR_PREFIX = "_BLOBSTOR_"
@dataclass
class Blobstor:
path: str
path_type: str
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
return self.path == other.path and self.path_type == other.path_type
def __hash__(self):
return hash((self.path, self.path_type))
@staticmethod
def from_config_object(section: ConfigObj, shard_id: str, blobstor_id: str):
var_prefix = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}"
return Blobstor(section.get(f"{var_prefix}_PATH"), section.get(f"{var_prefix}_TYPE"))
@dataclass
class Shard:
blobstor: list[Blobstor]
metabase: str
writecache: str
pilorama: str
def __eq__(self, other) -> bool:
if not isinstance(other, self.__class__):
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
return (
set(self.blobstor) == set(other.blobstor)
and self.metabase == other.metabase
and self.writecache == other.writecache
and self.pilorama == other.pilorama
)
def __hash__(self):
return hash((self.metabase, self.writecache))
@staticmethod
def _get_blobstor_count_from_section(config_object: ConfigObj, shard_id: int):
pattern = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}"
blobstors = {key[: len(pattern) + 2] for key in config_object.keys() if pattern in key}
return len(blobstors)
@staticmethod
def from_config_object(config_object: ConfigObj, shard_id: int):
var_prefix = f"{SHARD_PREFIX}{shard_id}"
blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
blobstors = [
Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)
]
write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")
return Shard(
blobstors,
config_object.get(f"{var_prefix}_METABASE_PATH"),
config_object.get(f"{var_prefix}_WRITECACHE_PATH") if write_cache_enabled else "",
)
@staticmethod
def from_object(shard):
metabase = shard["metabase"]["path"] if "path" in shard["metabase"] else shard["metabase"]
writecache = shard["writecache"]["path"] if "path" in shard["writecache"] else shard["writecache"]
# Currently, due to an issue, we need to check whether pilorama exists in the keys
# TODO: make pilorama mandatory after fix
if shard.get("pilorama"):
pilorama = shard["pilorama"]["path"] if "path" in shard["pilorama"] else shard["pilorama"]
else:
pilorama = None
return Shard(
blobstor=[Blobstor(path=blobstor["path"], path_type=blobstor["type"]) for blobstor in shard["blobstor"]],
metabase=metabase,
writecache=writecache,
pilorama=pilorama
)
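
A sketch of feeding Shard.from_object with a shard section as it would appear in the parsed storage YAML config; the paths and blobstor type are hypothetical, and pilorama is optional per the comment above:

shard_cfg = {
    "blobstor": [{"path": "/srv/frostfs/blob0", "type": "blobovnicza"}],
    "metabase": {"path": "/srv/frostfs/meta.db"},
    "writecache": {"path": "/srv/frostfs/cache"},
    "pilorama": {"path": "/srv/frostfs/pilorama.db"},
}
shard = Shard.from_object(shard_cfg)
# shard.metabase == "/srv/frostfs/meta.db"; writecache and pilorama are unpacked the same way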

View file

@ -2,10 +2,11 @@ import logging
import re
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
from frostfs_testlib.hosting import Hosting
from frostfs_testlib.hosting import Host, Hosting
from frostfs_testlib.resources.cli import FROSTFS_ADM_EXEC, FROSTFS_AUTHMATE_EXEC, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
from frostfs_testlib.shell import Shell
from frostfs_testlib.testing.parallel import parallel
logger = logging.getLogger("NeoLogger")
@ -33,49 +34,74 @@ def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
return versions
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
def parallel_binary_verions(host: Host) -> dict[str, str]:
versions_by_host = {}
for host in hosting.hosts:
binary_path_by_name = {} # Maps binary name to executable path
for service_config in host.config.services:
exec_path = service_config.attributes.get("exec_path")
requires_check = service_config.attributes.get("requires_version_check", "true")
if exec_path:
binary_path_by_name[service_config.name] = {
"exec_path": exec_path,
"check": requires_check.lower() == "true",
}
for cli_config in host.config.clis:
requires_check = cli_config.attributes.get("requires_version_check", "true")
binary_path_by_name[cli_config.name] = {
"exec_path": cli_config.exec_path,
binary_path_by_name = {} # Maps binary name to executable path
for service_config in host.config.services:
exec_path = service_config.attributes.get("exec_path")
requires_check = service_config.attributes.get("requires_version_check", "true")
if exec_path:
binary_path_by_name[service_config.name] = {
"exec_path": exec_path,
"check": requires_check.lower() == "true",
}
for cli_config in host.config.clis:
requires_check = cli_config.attributes.get("requires_version_check", "true")
binary_path_by_name[cli_config.name] = {
"exec_path": cli_config.exec_path,
"check": requires_check.lower() == "true",
}
shell = host.get_shell()
versions_at_host = {}
for binary_name, binary in binary_path_by_name.items():
try:
binary_path = binary["exec_path"]
result = shell.exec(f"{binary_path} --version")
versions_at_host[binary_name] = {"version": _parse_version(result.stdout), "check": binary["check"]}
except Exception as exc:
logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
versions_at_host[binary_name] = {"version": "Unknown", "check": binary["check"]}
versions_by_host[host.config.address] = versions_at_host
shell = host.get_shell()
versions_at_host = {}
for binary_name, binary in binary_path_by_name.items():
try:
binary_path = binary["exec_path"]
result = shell.exec(f"{binary_path} --version")
versions_at_host[binary_name] = {"version": _parse_version(result.stdout), "check": binary["check"]}
except Exception as exc:
logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
versions_at_host[binary_name] = {"version": "Unknown", "check": binary["check"]}
versions_by_host[host.config.address] = versions_at_host
return versions_by_host
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
versions_by_host = {}
future_binary_verions = parallel(parallel_binary_verions, parallel_items=hosting.hosts)
for future in future_binary_verions:
versions_by_host.update(future.result())
# Consolidate versions across all hosts
cheak_versions = {}
exсeptions = []
exception = set()
previous_host = None
versions = {}
captured_version = None
for host, binary_versions in versions_by_host.items():
for name, binary in binary_versions.items():
captured_version = versions.get(name, {}).get("version")
version = binary["version"]
if captured_version:
assert captured_version == version, f"Binary {name} has inconsistent version on host {host}"
if not cheak_versions.get(f'{name[:-2]}', None):
captured_version = cheak_versions.get(f'{name[:-2]}',{}).get(host, {}).get(captured_version)
cheak_versions[f'{name[:-2]}'] = {host: {version: name}}
else:
versions[name] = {"version": version, "check": binary["check"]}
return versions
captured_version = list(cheak_versions.get(f'{name[:-2]}',{}).get(previous_host).keys())[0]
cheak_versions[f'{name[:-2]}'].update({host:{version:name}})
if captured_version and captured_version != version:
exception.add(name[:-2])
versions[name] = {"version": version, "check": binary["check"]}
previous_host = host
if exception:
for i in exception:
for host in versions_by_host.keys():
for version, name in cheak_versions.get(i).get(host).items():
exсeptions.append(f'Binary {name} has inconsistent version {version} on host {host}')
exсeptions.append('\n')
return versions, exсeptions
def _parse_version(version_output: str) -> str:
version = re.search(r"version[:\s]*v?(.+)", version_output, re.IGNORECASE)


@ -4,13 +4,7 @@ import pytest
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
from frostfs_testlib.storage.dataclasses.acl import EACLRole
from frostfs_testlib.storage.dataclasses.frostfs_services import (
HTTPGate,
InnerRing,
MorphChain,
S3Gate,
StorageNode,
)
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
@ -22,10 +16,10 @@ class TestDataclassesStr:
[
(Boto3ClientWrapper, "Boto3 client"),
(AwsCliClient, "AWS CLI"),
(ObjectSize("simple", 1), "simple object size"),
(ObjectSize("simple", 10), "simple object size"),
(ObjectSize("complex", 5000), "complex object size"),
(ObjectSize("complex", 5555), "complex object size"),
(ObjectSize("simple", 1), "simple"),
(ObjectSize("simple", 10), "simple"),
(ObjectSize("complex", 5000), "complex"),
(ObjectSize("complex", 5555), "complex"),
(StorageNode, "StorageNode"),
(MorphChain, "MorphChain"),
(S3Gate, "S3Gate"),


@ -15,6 +15,7 @@ class TestHosting(TestCase):
HOST1 = {
"address": HOST1_ADDRESS,
"plugin_name": HOST1_PLUGIN,
"healthcheck_plugin_name": "basic",
"attributes": HOST1_ATTRIBUTES,
"clis": HOST1_CLIS,
"services": HOST1_SERVICES,
@ -32,6 +33,7 @@ class TestHosting(TestCase):
HOST2 = {
"address": HOST2_ADDRESS,
"plugin_name": HOST2_PLUGIN,
"healthcheck_plugin_name": "basic",
"attributes": HOST2_ATTRIBUTES,
"clis": HOST2_CLIS,
"services": HOST2_SERVICES,
@ -52,18 +54,14 @@ class TestHosting(TestCase):
self.assertEqual(host1.config.plugin_name, self.HOST1_PLUGIN)
self.assertDictEqual(host1.config.attributes, self.HOST1_ATTRIBUTES)
self.assertListEqual(host1.config.clis, [CLIConfig(**cli) for cli in self.HOST1_CLIS])
self.assertListEqual(
host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES]
)
self.assertListEqual(host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES])
host2 = hosting.get_host_by_address(self.HOST2_ADDRESS)
self.assertEqual(host2.config.address, self.HOST2_ADDRESS)
self.assertEqual(host2.config.plugin_name, self.HOST2_PLUGIN)
self.assertDictEqual(host2.config.attributes, self.HOST2_ATTRIBUTES)
self.assertListEqual(host2.config.clis, [CLIConfig(**cli) for cli in self.HOST2_CLIS])
self.assertListEqual(
host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES]
)
self.assertListEqual(host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES])
def test_get_host_by_service(self):
hosting = Hosting()
@ -104,9 +102,7 @@ class TestHosting(TestCase):
services = hosting.find_service_configs(rf"^{self.SERVICE_NAME_PREFIX}")
self.assertEqual(len(services), 2)
for service in services:
self.assertEqual(
service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX
)
self.assertEqual(service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX)
service1 = hosting.find_service_configs(self.SERVICE1["name"])
self.assertEqual(len(service1), 1)


@ -136,11 +136,15 @@ class TestLoadConfig:
def test_argument_parsing_for_grpc_scenario(self, load_params: LoadParams):
expected_preset_args = [
"--size '11'",
"--acl 'acl'",
"--preload_obj '13'",
"--out 'pregen_json'",
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--local",
]
expected_env_vars = {
"DURATION": 9,
@ -151,6 +155,7 @@ class TestLoadConfig:
"WRITERS": 7,
"READERS": 7,
"DELETERS": 8,
"READ_AGE": 8,
"PREGEN_JSON": "pregen_json",
"PREPARE_LOCALLY": True,
}
@ -167,6 +172,10 @@ class TestLoadConfig:
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--local",
"--acl 'acl'",
]
expected_env_vars = {
"DURATION": 9,
@ -184,6 +193,7 @@ class TestLoadConfig:
"TIME_UNIT": "time_unit",
"WRITE_RATE": 10,
"READ_RATE": 9,
"READ_AGE": 8,
"DELETE_RATE": 11,
"PREPARE_LOCALLY": True,
}
@ -201,6 +211,9 @@ class TestLoadConfig:
"--workers '7'",
"--buckets '13'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"DURATION": 9,
@ -211,6 +224,7 @@ class TestLoadConfig:
"WRITERS": 7,
"READERS": 7,
"DELETERS": 8,
"READ_AGE": 8,
"NO_VERIFY_SSL": True,
"PREGEN_JSON": "pregen_json",
}
@ -218,6 +232,45 @@ class TestLoadConfig:
self._check_preset_params(load_params, expected_preset_args)
self._check_env_vars(load_params, expected_env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
def test_argument_parsing_for_s3_car_scenario_with_stringed_time(self, load_params: LoadParams):
load_params.load_time = "2d3h5min"
expected_preset_args = [
"--size '11'",
"--preload_obj '13'",
"--no-verify-ssl",
"--out 'pregen_json'",
"--workers '7'",
"--buckets '13'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"DURATION": 183900,
"WRITE_OBJ_SIZE": 11,
"REGISTRY_FILE": "registry_file",
"K6_MIN_ITERATION_DURATION": "min_iteration_duration",
"K6_SETUP_TIMEOUT": "setup_timeout",
"NO_VERIFY_SSL": True,
"MAX_WRITERS": 11,
"MAX_READERS": 11,
"MAX_DELETERS": 12,
"PRE_ALLOC_DELETERS": 21,
"PRE_ALLOC_READERS": 20,
"PRE_ALLOC_WRITERS": 20,
"PREGEN_JSON": "pregen_json",
"TIME_UNIT": "time_unit",
"WRITE_RATE": 10,
"READ_RATE": 9,
"READ_AGE": 8,
"DELETE_RATE": 11,
}
self._check_preset_params(load_params, expected_preset_args)
self._check_env_vars(load_params, expected_env_vars)
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
def test_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams):
expected_preset_args = [
@ -228,6 +281,9 @@ class TestLoadConfig:
"--workers '7'",
"--buckets '13'",
"--location 's3_location'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"DURATION": 9,
@ -246,6 +302,7 @@ class TestLoadConfig:
"TIME_UNIT": "time_unit",
"WRITE_RATE": 10,
"READ_RATE": 9,
"READ_AGE": 8,
"DELETE_RATE": 11,
}
@ -254,6 +311,7 @@ class TestLoadConfig:
@pytest.mark.parametrize("load_params", [LoadScenario.HTTP], indirect=True)
def test_argument_parsing_for_http_scenario(self, load_params: LoadParams):
load_params.preset.local = False
expected_preset_args = [
"--no-verify-ssl",
"--size '11'",
@ -262,6 +320,9 @@ class TestLoadConfig:
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"DURATION": 9,
@ -273,6 +334,7 @@ class TestLoadConfig:
"WRITERS": 7,
"READERS": 7,
"DELETERS": 8,
"READ_AGE": 8,
"PREGEN_JSON": "pregen_json",
}
@ -281,6 +343,7 @@ class TestLoadConfig:
@pytest.mark.parametrize("load_params", [LoadScenario.LOCAL], indirect=True)
def test_argument_parsing_for_local_scenario(self, load_params: LoadParams):
load_params.preset.local = False
expected_preset_args = [
"--size '11'",
"--preload_obj '13'",
@ -288,6 +351,9 @@ class TestLoadConfig:
"--workers '7'",
"--containers '16'",
"--policy 'container_placement_policy'",
"--ignore-errors",
"--sleep '19'",
"--acl 'acl'",
]
expected_env_vars = {
"CONFIG_FILE": "config_file",
@ -299,6 +365,7 @@ class TestLoadConfig:
"WRITERS": 7,
"READERS": 7,
"DELETERS": 8,
"READ_AGE": 8,
"PREGEN_JSON": "pregen_json",
}
@ -338,6 +405,8 @@ class TestLoadConfig:
"--workers '0'",
"--containers '0'",
"--policy ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"DURATION": 0,
@ -348,6 +417,7 @@ class TestLoadConfig:
"WRITERS": 0,
"READERS": 0,
"DELETERS": 0,
"READ_AGE": 0,
"PREGEN_JSON": "",
"PREPARE_LOCALLY": False,
}
@ -364,6 +434,8 @@ class TestLoadConfig:
"--workers '0'",
"--containers '0'",
"--policy ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"DURATION": 0,
@ -382,6 +454,7 @@ class TestLoadConfig:
"WRITE_RATE": 0,
"READ_RATE": 0,
"DELETE_RATE": 0,
"READ_AGE": 0,
"PREPARE_LOCALLY": False,
}
@ -397,6 +470,8 @@ class TestLoadConfig:
"--workers '0'",
"--buckets '0'",
"--location ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"DURATION": 0,
@ -407,6 +482,7 @@ class TestLoadConfig:
"WRITERS": 0,
"READERS": 0,
"DELETERS": 0,
"READ_AGE": 0,
"NO_VERIFY_SSL": False,
"PREGEN_JSON": "",
}
@ -423,6 +499,8 @@ class TestLoadConfig:
"--workers '0'",
"--buckets '0'",
"--location ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"DURATION": 0,
@ -442,6 +520,7 @@ class TestLoadConfig:
"WRITE_RATE": 0,
"READ_RATE": 0,
"DELETE_RATE": 0,
"READ_AGE": 0,
}
self._check_preset_params(load_params, expected_preset_args)
@ -456,6 +535,8 @@ class TestLoadConfig:
"--workers '0'",
"--containers '0'",
"--policy ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"DURATION": 0,
@ -467,6 +548,7 @@ class TestLoadConfig:
"WRITERS": 0,
"READERS": 0,
"DELETERS": 0,
"READ_AGE": 0,
"PREGEN_JSON": "",
}
@ -482,6 +564,8 @@ class TestLoadConfig:
"--workers '0'",
"--containers '0'",
"--policy ''",
"--sleep '0'",
"--acl ''",
]
expected_env_vars = {
"CONFIG_FILE": "",
@ -493,6 +577,7 @@ class TestLoadConfig:
"WRITERS": 0,
"READERS": 0,
"DELETERS": 0,
"READ_AGE": 0,
"PREGEN_JSON": "",
}
@ -531,6 +616,27 @@ class TestLoadConfig:
self._check_env_vars(load_params, expected_env_vars)
@pytest.mark.parametrize(
"load_params, load_type",
[(LoadScenario.gRPC, LoadType.gRPC)],
indirect=True,
)
@pytest.mark.parametrize(
"load_time, expected_seconds",
[
(300, 300),
("2d3h45min", 186300),
("1d6h", 108000),
("1d", 86400),
("1d1min", 86460),
("2h", 7200),
("2h2min", 7320),
],
)
def test_convert_time_to_seconds(self, load_params: LoadParams, load_time: str | int, expected_seconds: int):
load_params.load_time = load_time
assert load_params.load_time == expected_seconds
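The expected values above follow plain day/hour/minute arithmetic (for example, 2d3h45min = 2*86400 + 3*3600 + 45*60 = 186300). A minimal converter that satisfies this table might look like the sketch below; it is an illustration of the expected behaviour, not the library's implementation:

import re

def to_seconds(load_time: str | int) -> int:
    # Integers are treated as seconds; strings use the "<N>d<N>h<N>min" format
    if isinstance(load_time, int):
        return load_time
    match = re.fullmatch(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)min)?", load_time)
    days, hours, minutes = (int(group) if group else 0 for group in match.groups())
    return days * 86400 + hours * 3600 + minutes * 60

assert to_seconds("2d3h45min") == 186300
assert to_seconds("1d1min") == 86460
assert to_seconds(300) == 300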
def _check_preset_params(self, load_params: LoadParams, expected_preset_args: list[str]):
preset_parameters = load_params.get_preset_arguments()
assert sorted(preset_parameters) == sorted(expected_preset_args)