forked from TrueCloudLab/frostfs-testlib
Compare commits
21 commits
89522b607c
...
4f3814690e
Author | SHA1 | Date | |
---|---|---|---|
4f3814690e | |||
d79fd87ede | |||
8ba2cb8030 | |||
6caa77dedf | |||
0d7a15877c | |||
82f9df088a | |||
e04fac0770 | |||
328e43fe67 | |||
c0a25ab699 | |||
40fa2c24cc | |||
be36a10f1e | |||
df8d99d83c | |||
d6a2cf92a2 | |||
a3bda0b34f | |||
a4d1082ed5 | |||
73c362c307 | |||
10a6efa333 | |||
663c144709 | |||
8e739adea5 | |||
3d63772f4a | |||
02f3ef6b40 |
31 changed files with 737 additions and 462 deletions
|
@ -51,11 +51,11 @@ basic = "frostfs_testlib.healthcheck.basic_healthcheck:BasicHealthcheck"
|
|||
config = "frostfs_testlib.storage.controllers.state_managers.config_state_manager:ConfigStateManager"
|
||||
|
||||
[project.entry-points."frostfs.testlib.services"]
|
||||
s = "frostfs_testlib.storage.dataclasses.frostfs_services:StorageNode"
|
||||
s3-gate = "frostfs_testlib.storage.dataclasses.frostfs_services:S3Gate"
|
||||
http-gate = "frostfs_testlib.storage.dataclasses.frostfs_services:HTTPGate"
|
||||
morph-chain = "frostfs_testlib.storage.dataclasses.frostfs_services:MorphChain"
|
||||
ir = "frostfs_testlib.storage.dataclasses.frostfs_services:InnerRing"
|
||||
frostfs-storage = "frostfs_testlib.storage.dataclasses.frostfs_services:StorageNode"
|
||||
frostfs-s3 = "frostfs_testlib.storage.dataclasses.frostfs_services:S3Gate"
|
||||
frostfs-http = "frostfs_testlib.storage.dataclasses.frostfs_services:HTTPGate"
|
||||
neo-go = "frostfs_testlib.storage.dataclasses.frostfs_services:MorphChain"
|
||||
frostfs-ir = "frostfs_testlib.storage.dataclasses.frostfs_services:InnerRing"
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
|
|
|
@ -65,7 +65,6 @@ class FrostfsCliContainer(CliCommand):
|
|||
ttl: Optional[int] = None,
|
||||
xhdr: Optional[dict] = None,
|
||||
force: bool = False,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""
|
||||
Delete an existing container.
|
||||
|
@ -81,7 +80,6 @@ class FrostfsCliContainer(CliCommand):
|
|||
ttl: TTL value in request meta header (default 2).
|
||||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
xhdr: Dict with request X-Headers.
|
||||
timeout: Timeout for the operation (default 15s).
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
|
@ -298,9 +296,5 @@ class FrostfsCliContainer(CliCommand):
|
|||
|
||||
return self._execute(
|
||||
f"container nodes {from_str}",
|
||||
**{
|
||||
param: value
|
||||
for param, value in locals().items()
|
||||
if param not in ["self", "from_file", "from_str"]
|
||||
},
|
||||
**{param: value for param, value in locals().items() if param not in ["self", "from_file", "from_str"]},
|
||||
)
|
||||
|
|
|
@ -124,9 +124,7 @@ class FrostfsCliObject(CliCommand):
|
|||
"""
|
||||
return self._execute(
|
||||
"object hash",
|
||||
**{
|
||||
param: value for param, value in locals().items() if param not in ["self", "params"]
|
||||
},
|
||||
**{param: value for param, value in locals().items() if param not in ["self", "params"]},
|
||||
)
|
||||
|
||||
def head(
|
||||
|
@ -355,8 +353,8 @@ class FrostfsCliObject(CliCommand):
|
|||
def nodes(
|
||||
self,
|
||||
rpc_endpoint: str,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
wallet: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
bearer: Optional[str] = None,
|
||||
generate_key: Optional = None,
|
||||
|
|
|
@ -68,11 +68,7 @@ class FrostfsCliShards(CliCommand):
|
|||
return self._execute_with_password(
|
||||
"control shards set-mode",
|
||||
wallet_password,
|
||||
**{
|
||||
param: value
|
||||
for param, value in locals().items()
|
||||
if param not in ["self", "wallet_password"]
|
||||
},
|
||||
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
|
||||
)
|
||||
|
||||
def dump(
|
||||
|
@ -105,18 +101,14 @@ class FrostfsCliShards(CliCommand):
|
|||
return self._execute_with_password(
|
||||
"control shards dump",
|
||||
wallet_password,
|
||||
**{
|
||||
param: value
|
||||
for param, value in locals().items()
|
||||
if param not in ["self", "wallet_password"]
|
||||
},
|
||||
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
|
||||
)
|
||||
|
||||
def list(
|
||||
self,
|
||||
endpoint: str,
|
||||
wallet: str,
|
||||
wallet_password: str,
|
||||
wallet: Optional[str] = None,
|
||||
wallet_password: Optional[str] = None,
|
||||
address: Optional[str] = None,
|
||||
json_mode: bool = False,
|
||||
timeout: Optional[str] = None,
|
||||
|
@ -135,12 +127,13 @@ class FrostfsCliShards(CliCommand):
|
|||
Returns:
|
||||
Command's result.
|
||||
"""
|
||||
if not wallet_password:
|
||||
return self._execute(
|
||||
"control shards list",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
return self._execute_with_password(
|
||||
"control shards list",
|
||||
wallet_password,
|
||||
**{
|
||||
param: value
|
||||
for param, value in locals().items()
|
||||
if param not in ["self", "wallet_password"]
|
||||
},
|
||||
**{param: value for param, value in locals().items() if param not in ["self", "wallet_password"]},
|
||||
)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
class Options:
|
||||
DEFAULT_SHELL_TIMEOUT = 90
|
||||
DEFAULT_SHELL_TIMEOUT = 120
|
||||
|
||||
@staticmethod
|
||||
def get_default_shell_timeout():
|
||||
|
|
|
@ -67,6 +67,7 @@ class HostConfig:
|
|||
clis: list[CLIConfig] = field(default_factory=list)
|
||||
attributes: dict[str, str] = field(default_factory=dict)
|
||||
interfaces: dict[str, str] = field(default_factory=dict)
|
||||
environment: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.services = [ServiceConfig(**service) for service in self.services or []]
|
||||
|
|
|
@ -152,9 +152,7 @@ class DockerHost(Host):
|
|||
timeout=service_attributes.start_timeout,
|
||||
)
|
||||
|
||||
def wait_for_service_to_be_in_state(
|
||||
self, systemd_service_name: str, expected_state: str, timeout: int
|
||||
) -> None:
|
||||
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def get_data_directory(self, service_name: str) -> str:
|
||||
|
@ -181,6 +179,12 @@ class DockerHost(Host):
|
|||
def delete_pilorama(self, service_name: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def delete_file(self, file_path: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def is_file_exist(self, file_path: str) -> None:
|
||||
raise NotImplementedError("Not implemented for docker")
|
||||
|
||||
def delete_storage_node_data(self, service_name: str, cache_only: bool = False) -> None:
|
||||
volume_path = self.get_data_directory(service_name)
|
||||
|
||||
|
@ -305,9 +309,7 @@ class DockerHost(Host):
|
|||
return container
|
||||
return None
|
||||
|
||||
def _wait_for_container_to_be_in_state(
|
||||
self, container_name: str, expected_state: str, timeout: int
|
||||
) -> None:
|
||||
def _wait_for_container_to_be_in_state(self, container_name: str, expected_state: str, timeout: int) -> None:
|
||||
iterations = 10
|
||||
iteration_wait_time = timeout / iterations
|
||||
|
||||
|
|
|
@ -219,12 +219,22 @@ class Host(ABC):
|
|||
"""
|
||||
|
||||
@abstractmethod
|
||||
def delete_pilorama(self, service_name: str) -> None:
|
||||
def delete_file(self, file_path: str) -> None:
|
||||
"""
|
||||
Deletes all pilorama.db files in the node.
|
||||
Deletes file with provided file path
|
||||
|
||||
Args:
|
||||
service_name: Name of storage node service.
|
||||
file_path: full path to the file to delete
|
||||
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def is_file_exist(self, file_path: str) -> bool:
|
||||
"""
|
||||
Checks if file exist
|
||||
|
||||
Args:
|
||||
file_path: full path to the file to check
|
||||
|
||||
"""
|
||||
|
||||
|
|
94
src/frostfs_testlib/load/interfaces/summarized.py
Normal file
94
src/frostfs_testlib/load/interfaces/summarized.py
Normal file
|
@ -0,0 +1,94 @@
|
|||
from dataclasses import dataclass, field
|
||||
|
||||
from frostfs_testlib.load.load_config import LoadParams, LoadScenario
|
||||
from frostfs_testlib.load.load_metrics import get_metrics_object
|
||||
|
||||
|
||||
@dataclass
|
||||
class SummarizedErorrs:
|
||||
total: int = field(default_factory=int)
|
||||
percent: float = field(default_factory=float)
|
||||
threshold: float = field(default_factory=float)
|
||||
by_node: dict[str, int] = field(default_factory=dict)
|
||||
|
||||
def calc_stats(self, operations):
|
||||
self.total += sum(self.by_node.values())
|
||||
|
||||
if not operations:
|
||||
return
|
||||
|
||||
self.percent = self.total / operations * 100
|
||||
|
||||
|
||||
@dataclass
|
||||
class SummarizedLatencies:
|
||||
avg: float = field(default_factory=float)
|
||||
min: float = field(default_factory=float)
|
||||
max: float = field(default_factory=float)
|
||||
by_node: dict[str, dict[str, int]] = field(default_factory=dict)
|
||||
|
||||
def calc_stats(self):
|
||||
if not self.by_node:
|
||||
return
|
||||
|
||||
avgs = [lt["avg"] for lt in self.by_node.values()]
|
||||
self.avg = sum(avgs) / len(avgs)
|
||||
|
||||
minimal = [lt["min"] for lt in self.by_node.values()]
|
||||
self.min = min(minimal)
|
||||
|
||||
maximum = [lt["max"] for lt in self.by_node.values()]
|
||||
self.max = max(maximum)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SummarizedStats:
|
||||
threads: int = field(default_factory=int)
|
||||
requested_rate: int = field(default_factory=int)
|
||||
operations: int = field(default_factory=int)
|
||||
rate: float = field(default_factory=float)
|
||||
throughput: float = field(default_factory=float)
|
||||
latencies: SummarizedLatencies = field(default_factory=SummarizedLatencies)
|
||||
errors: SummarizedErorrs = field(default_factory=SummarizedErorrs)
|
||||
passed: bool = True
|
||||
|
||||
def calc_stats(self):
|
||||
self.errors.calc_stats(self.operations)
|
||||
self.latencies.calc_stats()
|
||||
self.passed = self.errors.percent <= self.errors.threshold
|
||||
|
||||
@staticmethod
|
||||
def collect(load_params: LoadParams, load_summaries: dict) -> dict[str, "SummarizedStats"]:
|
||||
if load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
|
||||
delete_vus = max(load_params.preallocated_deleters or 0, load_params.max_deleters or 0)
|
||||
write_vus = max(load_params.preallocated_writers or 0, load_params.max_writers or 0)
|
||||
read_vus = max(load_params.preallocated_readers or 0, load_params.max_readers or 0)
|
||||
else:
|
||||
write_vus = load_params.writers
|
||||
read_vus = load_params.readers
|
||||
delete_vus = load_params.deleters
|
||||
|
||||
summarized = {
|
||||
"Write": SummarizedStats(threads=write_vus, requested_rate=load_params.write_rate),
|
||||
"Read": SummarizedStats(threads=read_vus, requested_rate=load_params.read_rate),
|
||||
"Delete": SummarizedStats(threads=delete_vus, requested_rate=load_params.delete_rate),
|
||||
}
|
||||
|
||||
for node_key, load_summary in load_summaries.items():
|
||||
metrics = get_metrics_object(load_params.scenario, load_summary)
|
||||
for operation in metrics.operations:
|
||||
target = summarized[operation._NAME]
|
||||
if not operation.total_iterations:
|
||||
continue
|
||||
target.operations += operation.total_iterations
|
||||
target.rate += operation.rate
|
||||
target.latencies.by_node[node_key] = operation.latency
|
||||
target.throughput += operation.throughput
|
||||
target.errors.threshold = load_params.error_threshold
|
||||
if operation.failed_iterations:
|
||||
target.errors.by_node[node_key] = operation.failed_iterations
|
||||
|
||||
for operation in summarized.values():
|
||||
operation.calc_stats()
|
||||
|
||||
return summarized
|
|
@ -3,11 +3,28 @@ import os
|
|||
from dataclasses import dataclass, field, fields, is_dataclass
|
||||
from enum import Enum
|
||||
from types import MappingProxyType
|
||||
from typing import Any, Optional, get_args
|
||||
from typing import Any, Callable, Optional, get_args
|
||||
|
||||
from frostfs_testlib.utils.converting_utils import calc_unit
|
||||
|
||||
|
||||
def convert_time_to_seconds(time: int | str | None) -> int:
|
||||
if time is None:
|
||||
return None
|
||||
if str(time).isdigit():
|
||||
seconds = int(time)
|
||||
else:
|
||||
days, hours, minutes = 0, 0, 0
|
||||
if "d" in time:
|
||||
days, time = time.split("d")
|
||||
if "h" in time:
|
||||
hours, time = time.split("h")
|
||||
if "min" in time:
|
||||
minutes = time.replace("min", "")
|
||||
seconds = int(days) * 86400 + int(hours) * 3600 + int(minutes) * 60
|
||||
return seconds
|
||||
|
||||
|
||||
class LoadType(Enum):
|
||||
gRPC = "grpc"
|
||||
S3 = "s3"
|
||||
|
@ -76,6 +93,7 @@ def metadata_field(
|
|||
scenario_variable: Optional[str] = None,
|
||||
string_repr: Optional[bool] = True,
|
||||
distributed: Optional[bool] = False,
|
||||
formatter: Optional[Callable] = None,
|
||||
):
|
||||
return field(
|
||||
default=None,
|
||||
|
@ -85,6 +103,7 @@ def metadata_field(
|
|||
"env_variable": scenario_variable,
|
||||
"string_repr": string_repr,
|
||||
"distributed": distributed,
|
||||
"formatter": formatter,
|
||||
},
|
||||
)
|
||||
|
||||
|
@ -128,6 +147,8 @@ class Preset:
|
|||
pregen_json: Optional[str] = metadata_field(all_load_scenarios, "out", "PREGEN_JSON", False)
|
||||
# Workers count for preset
|
||||
workers: Optional[int] = metadata_field(all_load_scenarios, "workers", None, False)
|
||||
# Acl for container/buckets
|
||||
acl: Optional[str] = metadata_field(all_load_scenarios, "acl", None, False)
|
||||
|
||||
# ------ GRPC ------
|
||||
# Amount of containers which should be created
|
||||
|
@ -147,6 +168,9 @@ class Preset:
|
|||
# Flag to control preset erorrs
|
||||
ignore_errors: Optional[bool] = metadata_field(all_load_scenarios, "ignore-errors", None, False)
|
||||
|
||||
# Flag to ensure created containers store data on local endpoints
|
||||
local: Optional[bool] = metadata_field(grpc_preset_scenarios, "local", None, False)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoadParams:
|
||||
|
@ -200,13 +224,21 @@ class LoadParams:
|
|||
|
||||
# ------- COMMON SCENARIO PARAMS -------
|
||||
# Load time is the maximum duration for k6 to give load. Default is the BACKGROUND_LOAD_DEFAULT_TIME value.
|
||||
load_time: Optional[int] = metadata_field(all_load_scenarios, None, "DURATION", False)
|
||||
load_time: Optional[int] = metadata_field(
|
||||
all_load_scenarios, None, "DURATION", False, formatter=convert_time_to_seconds
|
||||
)
|
||||
# Object size in KB for load and preset.
|
||||
object_size: Optional[int] = metadata_field(all_load_scenarios, "size", "WRITE_OBJ_SIZE", False)
|
||||
# For read operations, controls from which set get objects to read
|
||||
read_from: Optional[ReadFrom] = None
|
||||
# For read operations done from REGISTRY, controls delay which object should live before it will be used for read operation
|
||||
read_age: Optional[int] = metadata_field(all_load_scenarios, None, "READ_AGE", False)
|
||||
# Output registry K6 file. Filled automatically.
|
||||
registry_file: Optional[str] = metadata_field(all_scenarios, None, "REGISTRY_FILE", False)
|
||||
# In case if we want to use custom registry file left from another load run
|
||||
custom_registry: Optional[str] = None
|
||||
# In case if we want to use custom registry file left from another load run
|
||||
force_fresh_registry: Optional[bool] = None
|
||||
# Specifies the minimum duration of every single execution (i.e. iteration).
|
||||
# Any iterations that are shorter than this value will cause that VU to
|
||||
# sleep for the remainder of the time until the specified minimum duration is reached.
|
||||
|
@ -294,6 +326,11 @@ class LoadParams:
|
|||
|
||||
if self.read_from == ReadFrom.REGISTRY:
|
||||
self.registry_file = os.path.join(self.working_dir, f"{load_id}_registry.bolt")
|
||||
|
||||
# For now it's okay to have it this way
|
||||
if self.custom_registry is not None:
|
||||
self.registry_file = self.custom_registry
|
||||
|
||||
if self.read_from == ReadFrom.PRESET:
|
||||
self.registry_file = None
|
||||
|
||||
|
@ -375,6 +412,25 @@ class LoadParams:
|
|||
|
||||
return fields_with_data or []
|
||||
|
||||
def _get_field_formatter(self, field_name: str) -> Callable | None:
|
||||
data_fields = fields(self)
|
||||
formatters = [
|
||||
field.metadata["formatter"]
|
||||
for field in data_fields
|
||||
if field.name == field_name and "formatter" in field.metadata and field.metadata["formatter"] != None
|
||||
]
|
||||
if formatters:
|
||||
return formatters[0]
|
||||
|
||||
return None
|
||||
|
||||
def __setattr__(self, field_name, value):
|
||||
formatter = self._get_field_formatter(field_name)
|
||||
if formatter:
|
||||
value = formatter(value)
|
||||
|
||||
super().__setattr__(field_name, value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
load_type_str = self.scenario.value if self.scenario else self.load_type.value
|
||||
# TODO: migrate load_params defaults to testlib
|
||||
|
|
|
@ -1,95 +1,43 @@
|
|||
from abc import ABC
|
||||
from typing import Any
|
||||
from typing import Any, Optional
|
||||
|
||||
from frostfs_testlib.load.load_config import LoadScenario
|
||||
|
||||
|
||||
class MetricsBase(ABC):
|
||||
_WRITE_SUCCESS = ""
|
||||
_WRITE_ERRORS = ""
|
||||
_WRITE_THROUGHPUT = "data_sent"
|
||||
_WRITE_LATENCY = ""
|
||||
|
||||
_READ_SUCCESS = ""
|
||||
_READ_ERRORS = ""
|
||||
_READ_LATENCY = ""
|
||||
_READ_THROUGHPUT = "data_received"
|
||||
|
||||
_DELETE_SUCCESS = ""
|
||||
_DELETE_LATENCY = ""
|
||||
_DELETE_ERRORS = ""
|
||||
class OperationMetric(ABC):
|
||||
_NAME = ""
|
||||
_SUCCESS = ""
|
||||
_ERRORS = ""
|
||||
_THROUGHPUT = ""
|
||||
_LATENCY = ""
|
||||
|
||||
def __init__(self, summary) -> None:
|
||||
self.summary = summary
|
||||
self.metrics = summary["metrics"]
|
||||
|
||||
@property
|
||||
def write_total_iterations(self) -> int:
|
||||
return self._get_metric(self._WRITE_SUCCESS) + self._get_metric(self._WRITE_ERRORS)
|
||||
def total_iterations(self) -> int:
|
||||
return self._get_metric(self._SUCCESS) + self._get_metric(self._ERRORS)
|
||||
|
||||
@property
|
||||
def write_success_iterations(self) -> int:
|
||||
return self._get_metric(self._WRITE_SUCCESS)
|
||||
def success_iterations(self) -> int:
|
||||
return self._get_metric(self._SUCCESS)
|
||||
|
||||
@property
|
||||
def write_latency(self) -> dict:
|
||||
return self._get_metric(self._WRITE_LATENCY)
|
||||
def latency(self) -> dict:
|
||||
return self._get_metric(self._LATENCY)
|
||||
|
||||
@property
|
||||
def write_rate(self) -> float:
|
||||
return self._get_metric_rate(self._WRITE_SUCCESS)
|
||||
def rate(self) -> float:
|
||||
return self._get_metric_rate(self._SUCCESS)
|
||||
|
||||
@property
|
||||
def write_failed_iterations(self) -> int:
|
||||
return self._get_metric(self._WRITE_ERRORS)
|
||||
def failed_iterations(self) -> int:
|
||||
return self._get_metric(self._ERRORS)
|
||||
|
||||
@property
|
||||
def write_throughput(self) -> float:
|
||||
return self._get_metric_rate(self._WRITE_THROUGHPUT)
|
||||
|
||||
@property
|
||||
def read_total_iterations(self) -> int:
|
||||
return self._get_metric(self._READ_SUCCESS) + self._get_metric(self._READ_ERRORS)
|
||||
|
||||
@property
|
||||
def read_success_iterations(self) -> int:
|
||||
return self._get_metric(self._READ_SUCCESS)
|
||||
|
||||
@property
|
||||
def read_latency(self) -> dict:
|
||||
return self._get_metric(self._READ_LATENCY)
|
||||
|
||||
@property
|
||||
def read_rate(self) -> int:
|
||||
return self._get_metric_rate(self._READ_SUCCESS)
|
||||
|
||||
@property
|
||||
def read_failed_iterations(self) -> int:
|
||||
return self._get_metric(self._READ_ERRORS)
|
||||
|
||||
@property
|
||||
def read_throughput(self) -> float:
|
||||
return self._get_metric_rate(self._READ_THROUGHPUT)
|
||||
|
||||
@property
|
||||
def delete_total_iterations(self) -> int:
|
||||
return self._get_metric(self._DELETE_SUCCESS) + self._get_metric(self._DELETE_ERRORS)
|
||||
|
||||
@property
|
||||
def delete_success_iterations(self) -> int:
|
||||
return self._get_metric(self._DELETE_SUCCESS)
|
||||
|
||||
@property
|
||||
def delete_latency(self) -> dict:
|
||||
return self._get_metric(self._DELETE_LATENCY)
|
||||
|
||||
@property
|
||||
def delete_failed_iterations(self) -> int:
|
||||
return self._get_metric(self._DELETE_ERRORS)
|
||||
|
||||
@property
|
||||
def delete_rate(self) -> int:
|
||||
return self._get_metric_rate(self._DELETE_SUCCESS)
|
||||
def throughput(self) -> float:
|
||||
return self._get_metric_rate(self._THROUGHPUT)
|
||||
|
||||
def _get_metric(self, metric: str) -> int:
|
||||
metrics_method_map = {
|
||||
|
@ -104,9 +52,7 @@ class MetricsBase(ABC):
|
|||
metric = self.metrics[metric]
|
||||
metric_type = metric["type"]
|
||||
if metric_type not in metrics_method_map:
|
||||
raise Exception(
|
||||
f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}"
|
||||
)
|
||||
raise Exception(f"Unsupported metric type: {metric_type}, supported: {metrics_method_map.keys()}")
|
||||
|
||||
return metrics_method_map[metric_type](metric)
|
||||
|
||||
|
@ -119,9 +65,7 @@ class MetricsBase(ABC):
|
|||
metric = self.metrics[metric]
|
||||
metric_type = metric["type"]
|
||||
if metric_type not in metrics_method_map:
|
||||
raise Exception(
|
||||
f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}"
|
||||
)
|
||||
raise Exception(f"Unsupported rate metric type: {metric_type}, supported: {metrics_method_map.keys()}")
|
||||
|
||||
return metrics_method_map[metric_type](metric)
|
||||
|
||||
|
@ -138,63 +82,145 @@ class MetricsBase(ABC):
|
|||
return metric["values"]
|
||||
|
||||
|
||||
class WriteOperationMetric(OperationMetric):
|
||||
_NAME = "Write"
|
||||
_SUCCESS = ""
|
||||
_ERRORS = ""
|
||||
_THROUGHPUT = "data_sent"
|
||||
_LATENCY = ""
|
||||
|
||||
|
||||
class ReadOperationMetric(OperationMetric):
|
||||
_NAME = "Read"
|
||||
_SUCCESS = ""
|
||||
_ERRORS = ""
|
||||
_THROUGHPUT = "data_received"
|
||||
_LATENCY = ""
|
||||
|
||||
|
||||
class DeleteOperationMetric(OperationMetric):
|
||||
_NAME = "Delete"
|
||||
_SUCCESS = ""
|
||||
_ERRORS = ""
|
||||
_THROUGHPUT = ""
|
||||
_LATENCY = ""
|
||||
|
||||
|
||||
class GrpcWriteOperationMetric(WriteOperationMetric):
|
||||
_SUCCESS = "frostfs_obj_put_success"
|
||||
_ERRORS = "frostfs_obj_put_fails"
|
||||
_LATENCY = "frostfs_obj_put_duration"
|
||||
|
||||
|
||||
class GrpcReadOperationMetric(ReadOperationMetric):
|
||||
_SUCCESS = "frostfs_obj_get_success"
|
||||
_ERRORS = "frostfs_obj_get_fails"
|
||||
_LATENCY = "frostfs_obj_get_duration"
|
||||
|
||||
|
||||
class GrpcDeleteOperationMetric(DeleteOperationMetric):
|
||||
_SUCCESS = "frostfs_obj_delete_success"
|
||||
_ERRORS = "frostfs_obj_delete_fails"
|
||||
_LATENCY = "frostfs_obj_delete_duration"
|
||||
|
||||
|
||||
class S3WriteOperationMetric(WriteOperationMetric):
|
||||
_SUCCESS = "aws_obj_put_success"
|
||||
_ERRORS = "aws_obj_put_fails"
|
||||
_LATENCY = "aws_obj_put_duration"
|
||||
|
||||
|
||||
class S3ReadOperationMetric(ReadOperationMetric):
|
||||
_SUCCESS = "aws_obj_get_success"
|
||||
_ERRORS = "aws_obj_get_fails"
|
||||
_LATENCY = "aws_obj_get_duration"
|
||||
|
||||
|
||||
class S3DeleteOperationMetric(DeleteOperationMetric):
|
||||
_SUCCESS = "aws_obj_delete_success"
|
||||
_ERRORS = "aws_obj_delete_fails"
|
||||
_LATENCY = "aws_obj_delete_duration"
|
||||
|
||||
|
||||
class S3LocalWriteOperationMetric(WriteOperationMetric):
|
||||
_SUCCESS = "s3local_obj_put_success"
|
||||
_ERRORS = "s3local_obj_put_fails"
|
||||
_LATENCY = "s3local_obj_put_duration"
|
||||
|
||||
|
||||
class S3LocalReadOperationMetric(ReadOperationMetric):
|
||||
_SUCCESS = "s3local_obj_get_success"
|
||||
_ERRORS = "s3local_obj_get_fails"
|
||||
_LATENCY = "s3local_obj_get_duration"
|
||||
|
||||
|
||||
class LocalWriteOperationMetric(WriteOperationMetric):
|
||||
_SUCCESS = "local_obj_put_success"
|
||||
_ERRORS = "local_obj_put_fails"
|
||||
_LATENCY = "local_obj_put_duration"
|
||||
|
||||
|
||||
class LocalReadOperationMetric(ReadOperationMetric):
|
||||
_SUCCESS = "local_obj_get_success"
|
||||
_ERRORS = "local_obj_get_fails"
|
||||
|
||||
|
||||
class LocalDeleteOperationMetric(DeleteOperationMetric):
|
||||
_SUCCESS = "local_obj_delete_success"
|
||||
_ERRORS = "local_obj_delete_fails"
|
||||
|
||||
|
||||
class VerifyReadOperationMetric(ReadOperationMetric):
|
||||
_SUCCESS = "verified_obj"
|
||||
_ERRORS = "invalid_obj"
|
||||
|
||||
|
||||
class MetricsBase(ABC):
|
||||
def __init__(self) -> None:
|
||||
self.write: Optional[WriteOperationMetric] = None
|
||||
self.read: Optional[ReadOperationMetric] = None
|
||||
self.delete: Optional[DeleteOperationMetric] = None
|
||||
|
||||
@property
|
||||
def operations(self) -> list[OperationMetric]:
|
||||
return [metric for metric in [self.write, self.read, self.delete] if metric is not None]
|
||||
|
||||
|
||||
class GrpcMetrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "frostfs_obj_put_total"
|
||||
_WRITE_ERRORS = "frostfs_obj_put_fails"
|
||||
_WRITE_LATENCY = "frostfs_obj_put_duration"
|
||||
|
||||
_READ_SUCCESS = "frostfs_obj_get_total"
|
||||
_READ_ERRORS = "frostfs_obj_get_fails"
|
||||
_READ_LATENCY = "frostfs_obj_get_duration"
|
||||
|
||||
_DELETE_SUCCESS = "frostfs_obj_delete_total"
|
||||
_DELETE_ERRORS = "frostfs_obj_delete_fails"
|
||||
_DELETE_LATENCY = "frostfs_obj_delete_duration"
|
||||
def __init__(self, summary) -> None:
|
||||
super().__init__()
|
||||
self.write = GrpcWriteOperationMetric(summary)
|
||||
self.read = GrpcReadOperationMetric(summary)
|
||||
self.delete = GrpcDeleteOperationMetric(summary)
|
||||
|
||||
|
||||
class S3Metrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "aws_obj_put_total"
|
||||
_WRITE_ERRORS = "aws_obj_put_fails"
|
||||
_WRITE_LATENCY = "aws_obj_put_duration"
|
||||
def __init__(self, summary) -> None:
|
||||
super().__init__()
|
||||
self.write = S3WriteOperationMetric(summary)
|
||||
self.read = S3ReadOperationMetric(summary)
|
||||
self.delete = S3DeleteOperationMetric(summary)
|
||||
|
||||
_READ_SUCCESS = "aws_obj_get_total"
|
||||
_READ_ERRORS = "aws_obj_get_fails"
|
||||
_READ_LATENCY = "aws_obj_get_duration"
|
||||
|
||||
_DELETE_SUCCESS = "aws_obj_delete_total"
|
||||
_DELETE_ERRORS = "aws_obj_delete_fails"
|
||||
_DELETE_LATENCY = "aws_obj_delete_duration"
|
||||
|
||||
class S3LocalMetrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "s3local_obj_put_total"
|
||||
_WRITE_ERRORS = "s3local_obj_put_fails"
|
||||
_WRITE_LATENCY = "s3local_obj_put_duration"
|
||||
def __init__(self, summary) -> None:
|
||||
super().__init__()
|
||||
self.write = S3LocalWriteOperationMetric(summary)
|
||||
self.read = S3LocalReadOperationMetric(summary)
|
||||
|
||||
_READ_SUCCESS = "s3local_obj_get_total"
|
||||
_READ_ERRORS = "s3local_obj_get_fails"
|
||||
_READ_LATENCY = "s3local_obj_get_duration"
|
||||
|
||||
class LocalMetrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "local_obj_put_total"
|
||||
_WRITE_ERRORS = "local_obj_put_fails"
|
||||
_WRITE_LATENCY = "local_obj_put_duration"
|
||||
|
||||
_READ_SUCCESS = "local_obj_get_total"
|
||||
_READ_ERRORS = "local_obj_get_fails"
|
||||
|
||||
_DELETE_SUCCESS = "local_obj_delete_total"
|
||||
_DELETE_ERRORS = "local_obj_delete_fails"
|
||||
def __init__(self, summary) -> None:
|
||||
super().__init__()
|
||||
self.write = LocalWriteOperationMetric(summary)
|
||||
self.read = LocalReadOperationMetric(summary)
|
||||
self.delete = LocalDeleteOperationMetric(summary)
|
||||
|
||||
|
||||
class VerifyMetrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "N/A"
|
||||
_WRITE_ERRORS = "N/A"
|
||||
|
||||
_READ_SUCCESS = "verified_obj"
|
||||
_READ_ERRORS = "invalid_obj"
|
||||
|
||||
_DELETE_SUCCESS = "N/A"
|
||||
_DELETE_ERRORS = "N/A"
|
||||
def __init__(self, summary) -> None:
|
||||
super().__init__()
|
||||
self.read = VerifyReadOperationMetric(summary)
|
||||
|
||||
|
||||
def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> MetricsBase:
|
||||
|
|
|
@ -3,8 +3,8 @@ from typing import Optional
|
|||
|
||||
import yaml
|
||||
|
||||
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
|
||||
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
|
||||
from frostfs_testlib.load.load_metrics import get_metrics_object
|
||||
from frostfs_testlib.utils.converting_utils import calc_unit
|
||||
|
||||
|
||||
|
@ -114,63 +114,46 @@ class LoadReport:
|
|||
|
||||
return model_map[self.load_params.scenario]
|
||||
|
||||
def _get_operations_sub_section_html(
|
||||
self,
|
||||
operation_type: str,
|
||||
total_operations: int,
|
||||
requested_rate_str: str,
|
||||
vus_str: str,
|
||||
total_rate: float,
|
||||
throughput: float,
|
||||
errors: dict[str, int],
|
||||
latency: dict[str, dict],
|
||||
):
|
||||
def _get_operations_sub_section_html(self, operation_type: str, stats: SummarizedStats):
|
||||
throughput_html = ""
|
||||
if throughput > 0:
|
||||
throughput, unit = calc_unit(throughput)
|
||||
if stats.throughput > 0:
|
||||
throughput, unit = calc_unit(stats.throughput)
|
||||
throughput_html = self._row("Throughput", f"{throughput:.2f} {unit}/sec")
|
||||
|
||||
per_node_errors_html = ""
|
||||
total_errors = 0
|
||||
if errors:
|
||||
total_errors: int = 0
|
||||
for node_key, errors in errors.items():
|
||||
total_errors += errors
|
||||
if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT:
|
||||
per_node_errors_html += self._row(f"At {node_key}", errors)
|
||||
for node_key, errors in stats.errors.by_node.items():
|
||||
if self.load_params.k6_process_allocation_strategy == K6ProcessAllocationStrategy.PER_ENDPOINT:
|
||||
per_node_errors_html += self._row(f"At {node_key}", errors)
|
||||
|
||||
latency_html = ""
|
||||
if latency:
|
||||
for node_key, latency_dict in latency.items():
|
||||
latency_values = "N/A"
|
||||
if latency_dict:
|
||||
latency_values = ""
|
||||
for param_name, param_val in latency_dict.items():
|
||||
latency_values += f"{param_name}={param_val:.2f}ms "
|
||||
for node_key, latencies in stats.latencies.by_node.items():
|
||||
latency_values = "N/A"
|
||||
if latencies:
|
||||
latency_values = ""
|
||||
for param_name, param_val in latencies.items():
|
||||
latency_values += f"{param_name}={param_val:.2f}ms "
|
||||
|
||||
latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values)
|
||||
latency_html += self._row(f"{operation_type} latency {node_key.split(':')[0]}", latency_values)
|
||||
|
||||
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
|
||||
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
|
||||
model = self._get_model_string()
|
||||
requested_rate_str = f"{stats.requested_rate}op/sec" if stats.requested_rate else ""
|
||||
# write 8KB 15h49m 50op/sec 50th open model/closed model/min_iteration duration=1s - 1.636MB/s 199.57451/s
|
||||
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {vus_str} {model} - {throughput:.2f}{unit}/s {total_rate:.2f}/s"
|
||||
errors_percent = 0
|
||||
if total_operations:
|
||||
errors_percent = total_errors / total_operations * 100.0
|
||||
short_summary = f"{operation_type} {object_size}{object_size_unit} {duration} {requested_rate_str} {stats.threads}th {model} - {throughput:.2f}{unit}/s {stats.rate:.2f}/s"
|
||||
|
||||
html = f"""
|
||||
<table border="1" cellpadding="5px"><tbody>
|
||||
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
|
||||
<tr><th colspan="2" bgcolor="gainsboro">Metrics</th></tr>
|
||||
{self._row("Total operations", total_operations)}
|
||||
{self._row("OP/sec", f"{total_rate:.2f}")}
|
||||
{self._row("Total operations", stats.operations)}
|
||||
{self._row("OP/sec", f"{stats.rate:.2f}")}
|
||||
{throughput_html}
|
||||
{latency_html}
|
||||
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
|
||||
{per_node_errors_html}
|
||||
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
|
||||
{self._row("Threshold", f"{self.load_params.error_threshold:.2f}%")}
|
||||
{self._row("Total", f"{stats.errors.total} ({stats.errors.percent:.2f}%)")}
|
||||
{self._row("Threshold", f"{stats.errors.threshold:.2f}%")}
|
||||
</tbody></table><br><hr>
|
||||
"""
|
||||
|
||||
|
@ -178,111 +161,12 @@ class LoadReport:
|
|||
|
||||
def _get_totals_section_html(self):
|
||||
html = ""
|
||||
for i, load_summaries in enumerate(self.load_summaries_list, 1):
|
||||
html += f"<h3>Load Results for load #{i}</h3>"
|
||||
for i in range(len(self.load_summaries_list)):
|
||||
html += f"<h3>Load Results for load #{i+1}</h3>"
|
||||
|
||||
write_operations = 0
|
||||
write_op_sec = 0
|
||||
write_throughput = 0
|
||||
write_latency = {}
|
||||
write_errors = {}
|
||||
requested_write_rate = self.load_params.write_rate
|
||||
requested_write_rate_str = f"{requested_write_rate}op/sec" if requested_write_rate else ""
|
||||
|
||||
read_operations = 0
|
||||
read_op_sec = 0
|
||||
read_throughput = 0
|
||||
read_latency = {}
|
||||
read_errors = {}
|
||||
requested_read_rate = self.load_params.read_rate
|
||||
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
|
||||
|
||||
delete_operations = 0
|
||||
delete_op_sec = 0
|
||||
delete_latency = {}
|
||||
delete_errors = {}
|
||||
requested_delete_rate = self.load_params.delete_rate
|
||||
requested_delete_rate_str = f"{requested_delete_rate}op/sec" if requested_delete_rate else ""
|
||||
|
||||
if self.load_params.scenario in [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]:
|
||||
delete_vus = max(self.load_params.preallocated_deleters or 0, self.load_params.max_deleters or 0)
|
||||
write_vus = max(self.load_params.preallocated_writers or 0, self.load_params.max_writers or 0)
|
||||
read_vus = max(self.load_params.preallocated_readers or 0, self.load_params.max_readers or 0)
|
||||
else:
|
||||
write_vus = self.load_params.writers
|
||||
read_vus = self.load_params.readers
|
||||
delete_vus = self.load_params.deleters
|
||||
|
||||
write_vus_str = f"{write_vus}th"
|
||||
read_vus_str = f"{read_vus}th"
|
||||
delete_vus_str = f"{delete_vus}th"
|
||||
|
||||
write_section_required = False
|
||||
read_section_required = False
|
||||
delete_section_required = False
|
||||
|
||||
for node_key, load_summary in load_summaries.items():
|
||||
metrics = get_metrics_object(self.load_params.scenario, load_summary)
|
||||
write_operations += metrics.write_total_iterations
|
||||
if write_operations:
|
||||
write_section_required = True
|
||||
write_op_sec += metrics.write_rate
|
||||
write_latency[node_key] = metrics.write_latency
|
||||
write_throughput += metrics.write_throughput
|
||||
if metrics.write_failed_iterations:
|
||||
write_errors[node_key] = metrics.write_failed_iterations
|
||||
|
||||
read_operations += metrics.read_total_iterations
|
||||
if read_operations:
|
||||
read_section_required = True
|
||||
read_op_sec += metrics.read_rate
|
||||
read_throughput += metrics.read_throughput
|
||||
read_latency[node_key] = metrics.read_latency
|
||||
if metrics.read_failed_iterations:
|
||||
read_errors[node_key] = metrics.read_failed_iterations
|
||||
|
||||
delete_operations += metrics.delete_total_iterations
|
||||
if delete_operations:
|
||||
delete_section_required = True
|
||||
delete_op_sec += metrics.delete_rate
|
||||
delete_latency[node_key] = metrics.delete_latency
|
||||
if metrics.delete_failed_iterations:
|
||||
delete_errors[node_key] = metrics.delete_failed_iterations
|
||||
|
||||
if write_section_required:
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Write",
|
||||
write_operations,
|
||||
requested_write_rate_str,
|
||||
write_vus_str,
|
||||
write_op_sec,
|
||||
write_throughput,
|
||||
write_errors,
|
||||
write_latency,
|
||||
)
|
||||
|
||||
if read_section_required:
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Read",
|
||||
read_operations,
|
||||
requested_read_rate_str,
|
||||
read_vus_str,
|
||||
read_op_sec,
|
||||
read_throughput,
|
||||
read_errors,
|
||||
read_latency,
|
||||
)
|
||||
|
||||
if delete_section_required:
|
||||
html += self._get_operations_sub_section_html(
|
||||
"Delete",
|
||||
delete_operations,
|
||||
requested_delete_rate_str,
|
||||
delete_vus_str,
|
||||
delete_op_sec,
|
||||
0,
|
||||
delete_errors,
|
||||
delete_latency,
|
||||
)
|
||||
summarized = SummarizedStats.collect(self.load_params, self.load_summaries_list[i])
|
||||
for operation_type, stats in summarized.items():
|
||||
if stats.operations:
|
||||
html += self._get_operations_sub_section_html(operation_type, stats)
|
||||
|
||||
return html
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.load.interfaces.summarized import SummarizedStats
|
||||
from frostfs_testlib.load.load_config import LoadParams, LoadScenario
|
||||
from frostfs_testlib.load.load_metrics import get_metrics_object
|
||||
|
||||
|
@ -8,56 +9,16 @@ class LoadVerifier:
|
|||
self.load_params = load_params
|
||||
|
||||
def collect_load_issues(self, load_summaries: dict[str, dict]) -> list[str]:
|
||||
write_operations = 0
|
||||
write_errors = 0
|
||||
|
||||
read_operations = 0
|
||||
read_errors = 0
|
||||
|
||||
delete_operations = 0
|
||||
delete_errors = 0
|
||||
|
||||
writers = self.load_params.writers or self.load_params.preallocated_writers or 0
|
||||
readers = self.load_params.readers or self.load_params.preallocated_readers or 0
|
||||
deleters = self.load_params.deleters or self.load_params.preallocated_deleters or 0
|
||||
|
||||
for load_summary in load_summaries.values():
|
||||
metrics = get_metrics_object(self.load_params.scenario, load_summary)
|
||||
|
||||
if writers:
|
||||
write_operations += metrics.write_total_iterations
|
||||
write_errors += metrics.write_failed_iterations
|
||||
|
||||
if readers:
|
||||
read_operations += metrics.read_total_iterations
|
||||
read_errors += metrics.read_failed_iterations
|
||||
|
||||
if deleters:
|
||||
delete_operations += metrics.delete_total_iterations
|
||||
delete_errors += metrics.delete_failed_iterations
|
||||
|
||||
summarized = SummarizedStats.collect(self.load_params, load_summaries)
|
||||
issues = []
|
||||
if writers and not write_operations:
|
||||
issues.append(f"No any write operation was performed")
|
||||
if readers and not read_operations:
|
||||
issues.append(f"No any read operation was performed")
|
||||
if deleters and not delete_operations:
|
||||
issues.append(f"No any delete operation was performed")
|
||||
|
||||
error_rate = self._get_error_rate(writers, write_operations, write_errors)
|
||||
if error_rate > self.load_params.error_threshold:
|
||||
rate_str = self._get_rate_str(error_rate)
|
||||
issues.append(f"Write errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
|
||||
for operation_type, stats in summarized.items():
|
||||
if stats.threads and not stats.operations:
|
||||
issues.append(f"No any {operation_type.lower()} operation was performed")
|
||||
|
||||
error_rate = self._get_error_rate(readers, read_operations, read_errors)
|
||||
if error_rate > self.load_params.error_threshold:
|
||||
rate_str = self._get_rate_str(error_rate)
|
||||
issues.append(f"Read errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
|
||||
|
||||
error_rate = self._get_error_rate(deleters, delete_operations, delete_errors)
|
||||
if error_rate > self.load_params.error_threshold:
|
||||
rate_str = self._get_rate_str(error_rate)
|
||||
issues.append(f"Delete errors exceeded threshold: {rate_str} > {self.load_params.error_threshold}%")
|
||||
if stats.errors.percent > stats.errors.threshold:
|
||||
rate_str = self._get_rate_str(stats.errors.percent)
|
||||
issues.append(f"{operation_type} errors exceeded threshold: {rate_str} > {stats.errors.threshold}%")
|
||||
|
||||
return issues
|
||||
|
||||
|
@ -74,13 +35,6 @@ class LoadVerifier:
|
|||
)
|
||||
return verify_issues
|
||||
|
||||
def _get_error_rate(self, vus: int, operations: int, errors: int) -> float:
|
||||
if not operations or not vus:
|
||||
return 0
|
||||
|
||||
error_rate = errors / operations * 100
|
||||
return error_rate
|
||||
|
||||
def _get_rate_str(self, rate: float, minimal: float = 0.01) -> str:
|
||||
return f"{rate:.2f}%" if rate >= minimal else f"~{minimal}%"
|
||||
|
||||
|
@ -95,13 +49,13 @@ class LoadVerifier:
|
|||
delete_success = 0
|
||||
|
||||
if deleters > 0:
|
||||
delete_success = load_metrics.delete_success_iterations
|
||||
delete_success = load_metrics.delete.success_iterations
|
||||
|
||||
if verification_summary:
|
||||
verify_metrics = get_metrics_object(LoadScenario.VERIFY, verification_summary)
|
||||
verified_objects = verify_metrics.read_success_iterations
|
||||
invalid_objects = verify_metrics.read_failed_iterations
|
||||
total_left_objects = load_metrics.write_success_iterations - delete_success
|
||||
verified_objects = verify_metrics.read.success_iterations
|
||||
invalid_objects = verify_metrics.read.failed_iterations
|
||||
total_left_objects = load_metrics.write.success_iterations - delete_success
|
||||
|
||||
# Due to interruptions we may see total verified objects to be less than written on writers count
|
||||
if abs(total_left_objects - verified_objects) > writers:
|
||||
|
|
|
@ -4,6 +4,7 @@ import math
|
|||
import re
|
||||
import time
|
||||
from dataclasses import fields
|
||||
from threading import Event
|
||||
from typing import Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
@ -30,7 +31,6 @@ from frostfs_testlib.testing import parallel, run_optionally
|
|||
from frostfs_testlib.testing.test_control import retry
|
||||
from frostfs_testlib.utils import datetime_utils
|
||||
from frostfs_testlib.utils.file_keeper import FileKeeper
|
||||
from threading import Event
|
||||
|
||||
|
||||
class RunnerBase(ScenarioRunner):
|
||||
|
@ -78,6 +78,10 @@ class DefaultRunner(RunnerBase):
|
|||
nodes_under_load: list[ClusterNode],
|
||||
k6_dir: str,
|
||||
):
|
||||
if load_params.force_fresh_registry and load_params.custom_registry:
|
||||
with reporter.step("Forcing fresh registry files"):
|
||||
parallel(self._force_fresh_registry, self.loaders, load_params)
|
||||
|
||||
if load_params.load_type != LoadType.S3:
|
||||
return
|
||||
|
||||
|
@ -88,6 +92,11 @@ class DefaultRunner(RunnerBase):
|
|||
|
||||
parallel(self._prepare_loader, self.loaders, load_params, grpc_peer, s3_public_keys, k6_dir)
|
||||
|
||||
def _force_fresh_registry(self, loader: Loader, load_params: LoadParams):
|
||||
with reporter.step(f"Forcing fresh registry on {loader.ip}"):
|
||||
shell = loader.get_shell()
|
||||
shell.exec(f"rm -f {load_params.registry_file}")
|
||||
|
||||
def _prepare_loader(
|
||||
self,
|
||||
loader: Loader,
|
||||
|
@ -314,7 +323,7 @@ class LocalRunner(RunnerBase):
|
|||
with reporter.step("Download K6"):
|
||||
shell.exec(f"sudo rm -rf {k6_dir};sudo mkdir {k6_dir}")
|
||||
shell.exec(f"sudo curl -so {k6_dir}/k6.tar.gz {load_params.k6_url}")
|
||||
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz -C {k6_dir}")
|
||||
shell.exec(f"sudo tar xf {k6_dir}/k6.tar.gz --strip-components 2 -C {k6_dir}")
|
||||
shell.exec(f"sudo chmod -R 777 {k6_dir}")
|
||||
|
||||
with reporter.step("Create empty_passwd"):
|
||||
|
|
|
@ -9,4 +9,4 @@ FROSTFS_ADM_EXEC = os.getenv("FROSTFS_ADM_EXEC", "frostfs-adm")
|
|||
# Config for frostfs-adm utility. Optional if tests are running against devenv
|
||||
FROSTFS_ADM_CONFIG_PATH = os.getenv("FROSTFS_ADM_CONFIG_PATH")
|
||||
|
||||
CLI_DEFAULT_TIMEOUT = os.getenv("CLI_DEFAULT_TIMEOUT", None)
|
||||
CLI_DEFAULT_TIMEOUT = os.getenv("CLI_DEFAULT_TIMEOUT", "100s")
|
||||
|
|
|
@ -9,7 +9,7 @@ class SudoInspector(CommandInspector):
|
|||
|
||||
def inspect(self, original_command: str, command: str) -> str:
|
||||
if not command.startswith("sudo"):
|
||||
return f"sudo -i {command}"
|
||||
return f"sudo {command}"
|
||||
return command
|
||||
|
||||
|
||||
|
|
|
@ -185,6 +185,7 @@ class SSHShell(Shell):
|
|||
private_key_passphrase: Optional[str] = None,
|
||||
port: str = "22",
|
||||
command_inspectors: Optional[list[CommandInspector]] = None,
|
||||
custom_environment: Optional[dict] = None
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.connection_provider = SshConnectionProvider()
|
||||
|
@ -196,6 +197,8 @@ class SSHShell(Shell):
|
|||
|
||||
self.command_inspectors = command_inspectors or []
|
||||
|
||||
self.environment = custom_environment
|
||||
|
||||
@property
|
||||
def _connection(self):
|
||||
return self.connection_provider.provide(self.host, self.port)
|
||||
|
@ -224,7 +227,7 @@ class SSHShell(Shell):
|
|||
|
||||
@log_command
|
||||
def _exec_interactive(self, command: str, options: CommandOptions) -> CommandResult:
|
||||
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True)
|
||||
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, get_pty=True, environment=self.environment)
|
||||
for interactive_input in options.interactive_inputs:
|
||||
input = interactive_input.input
|
||||
if not input.endswith("\n"):
|
||||
|
@ -251,7 +254,7 @@ class SSHShell(Shell):
|
|||
@log_command
|
||||
def _exec_non_interactive(self, command: str, options: CommandOptions) -> CommandResult:
|
||||
try:
|
||||
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout)
|
||||
stdin, stdout, stderr = self._connection.exec_command(command, timeout=options.timeout, environment=self.environment)
|
||||
|
||||
if options.close_stdin:
|
||||
stdin.close()
|
||||
|
|
|
@ -5,6 +5,8 @@ from dataclasses import dataclass
|
|||
from time import sleep
|
||||
from typing import Optional, Union
|
||||
|
||||
import requests
|
||||
|
||||
from frostfs_testlib import reporter
|
||||
from frostfs_testlib.cli import FrostfsCli
|
||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC
|
||||
|
@ -290,18 +292,17 @@ def delete_container(
|
|||
force: bool = False,
|
||||
session_token: Optional[str] = None,
|
||||
await_mode: bool = False,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> None:
|
||||
"""
|
||||
A wrapper for `frostfs-cli container delete` call.
|
||||
Args:
|
||||
await_mode: Block execution until container is removed.
|
||||
wallet (str): path to a wallet on whose behalf we delete the container
|
||||
cid (str): ID of the container to delete
|
||||
shell: executor for cli command
|
||||
endpoint: FrostFS endpoint to send request to, appends to `--rpc-endpoint` key
|
||||
force (bool): do not check whether container contains locks and remove immediately
|
||||
session_token: a path to session token file
|
||||
timeout: Timeout for the operation.
|
||||
This function doesn't return anything.
|
||||
"""
|
||||
|
||||
|
@ -313,7 +314,6 @@ def delete_container(
|
|||
force=force,
|
||||
session=session_token,
|
||||
await_mode=await_mode,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
|
||||
|
@ -344,12 +344,13 @@ def _parse_cid(output: str) -> str:
|
|||
|
||||
|
||||
@reporter.step("Search container by name")
|
||||
def search_container_by_name(wallet: str, name: str, shell: Shell, endpoint: str):
|
||||
list_cids = list_containers(wallet, shell, endpoint)
|
||||
for cid in list_cids:
|
||||
cont_info = get_container(wallet, cid, shell, endpoint, True)
|
||||
if cont_info.get("attributes", {}).get("Name", None) == name:
|
||||
return cid
|
||||
def search_container_by_name(name: str, node: ClusterNode):
|
||||
node_shell = node.host.get_shell()
|
||||
output = node_shell.exec(f"curl -I HEAD http://127.0.0.1:8084/{name}")
|
||||
pattern = r"X-Container-Id: (\S+)"
|
||||
cid = re.findall(pattern, output.stdout)
|
||||
if cid:
|
||||
return cid[0]
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
@ -732,23 +732,24 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
|
|||
@reporter.step("Search object nodes")
|
||||
def get_object_nodes(
|
||||
cluster: Cluster,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
oid: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
alive_node: ClusterNode,
|
||||
bearer: str = "",
|
||||
xhdr: Optional[dict] = None,
|
||||
is_direct: bool = False,
|
||||
verify_presence_all: bool = False,
|
||||
wallet_config: Optional[str] = None,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> list[ClusterNode]:
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
|
||||
shell = alive_node.host.get_shell()
|
||||
endpoint = alive_node.storage_node.get_rpc_endpoint()
|
||||
wallet = alive_node.storage_node.get_remote_wallet_path()
|
||||
wallet_config = alive_node.storage_node.get_remote_wallet_config_path()
|
||||
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
|
||||
|
||||
result_object_nodes = cli.object.nodes(
|
||||
rpc_endpoint=endpoint,
|
||||
wallet=wallet,
|
||||
cid=cid,
|
||||
oid=oid,
|
||||
bearer=bearer,
|
||||
|
|
|
@ -87,7 +87,7 @@ def tick_epoch(shell: Shell, cluster: Cluster, alive_node: Optional[StorageNode]
|
|||
alive_node = alive_node if alive_node else cluster.services(StorageNode)[0]
|
||||
remote_shell = alive_node.host.get_shell()
|
||||
|
||||
if FROSTFS_ADM_EXEC and FROSTFS_ADM_CONFIG_PATH:
|
||||
if "force_transactions" not in alive_node.host.config.attributes:
|
||||
# If frostfs-adm is available, then we tick epoch with it (to be consistent with UAT tests)
|
||||
frostfs_adm = FrostfsAdm(
|
||||
shell=remote_shell,
|
||||
|
|
|
@ -231,6 +231,10 @@ def search_nodes_with_bucket(
|
|||
shell: Shell,
|
||||
endpoint: str,
|
||||
) -> list[ClusterNode]:
|
||||
cid = search_container_by_name(wallet=wallet, name=bucket_name, shell=shell, endpoint=endpoint)
|
||||
cid = None
|
||||
for cluster_node in cluster.cluster_nodes:
|
||||
cid = search_container_by_name(name=bucket_name, node=cluster_node)
|
||||
if cid:
|
||||
break
|
||||
nodes_list = search_nodes_with_container(wallet=wallet, cid=cid, shell=shell, endpoint=endpoint, cluster=cluster)
|
||||
return nodes_list
|
||||
|
|
|
@ -105,7 +105,7 @@ class ClusterNode:
|
|||
service_entry = self.class_registry.get_entry(service_type)
|
||||
service_name = service_entry["hosting_service_name"]
|
||||
|
||||
pattern = f"{service_name}{self.id:02}"
|
||||
pattern = f"{service_name}_{self.id:02}"
|
||||
config = self.host.get_service_config(pattern)
|
||||
|
||||
return service_type(
|
||||
|
@ -120,7 +120,7 @@ class ClusterNode:
|
|||
svcs_names_on_node = [svc.name for svc in self.host.config.services]
|
||||
for entry in self.class_registry._class_mapping.values():
|
||||
hosting_svc_name = entry["hosting_service_name"]
|
||||
pattern = f"{hosting_svc_name}{self.id:02}"
|
||||
pattern = f"{hosting_svc_name}_{self.id:02}"
|
||||
if pattern in svcs_names_on_node:
|
||||
config = self.host.get_service_config(pattern)
|
||||
svcs.append(
|
||||
|
@ -267,13 +267,13 @@ class Cluster:
|
|||
service_name = service["hosting_service_name"]
|
||||
cls: type[NodeBase] = service["cls"]
|
||||
|
||||
pattern = f"{service_name}\d*$"
|
||||
pattern = f"{service_name}_\d*$"
|
||||
configs = self.hosting.find_service_configs(pattern)
|
||||
|
||||
found_nodes = []
|
||||
for config in configs:
|
||||
# config.name is something like s3-gate01. Cut last digits to know service type
|
||||
service_type = re.findall(".*\D", config.name)[0]
|
||||
service_type = re.findall("(.*)_\d+", config.name)[0]
|
||||
# exclude unsupported services
|
||||
if service_type != service_name:
|
||||
continue
|
||||
|
|
|
@ -8,7 +8,8 @@ class ConfigAttributes:
|
|||
SHARD_CONFIG_PATH = "shard_config_path"
|
||||
LOGGER_CONFIG_PATH = "logger_config_path"
|
||||
LOCAL_WALLET_PATH = "local_wallet_path"
|
||||
LOCAL_WALLET_CONFIG = "local_config_path"
|
||||
LOCAL_WALLET_CONFIG = "local_wallet_config_path"
|
||||
REMOTE_WALLET_CONFIG = "remote_wallet_config_path"
|
||||
ENDPOINT_DATA_0 = "endpoint_data0"
|
||||
ENDPOINT_DATA_1 = "endpoint_data1"
|
||||
ENDPOINT_INTERNAL = "endpoint_internal0"
|
||||
|
@ -17,11 +18,3 @@ class ConfigAttributes:
|
|||
UN_LOCODE = "un_locode"
|
||||
HTTP_HOSTNAME = "http_hostname"
|
||||
S3_HOSTNAME = "s3_hostname"
|
||||
|
||||
|
||||
class _FrostfsServicesNames:
|
||||
STORAGE = "s"
|
||||
S3_GATE = "s3-gate"
|
||||
HTTP_GATE = "http-gate"
|
||||
MORPH_CHAIN = "morph-chain"
|
||||
INNER_RING = "ir"
|
||||
|
|
|
@ -476,12 +476,12 @@ class ClusterStateController:
|
|||
def _enable_date_synchronizer(self, cluster_node: ClusterNode):
|
||||
shell = cluster_node.host.get_shell()
|
||||
shell.exec("timedatectl set-ntp true")
|
||||
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 5)
|
||||
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "active", 15)
|
||||
|
||||
def _disable_date_synchronizer(self, cluster_node: ClusterNode):
|
||||
shell = cluster_node.host.get_shell()
|
||||
shell.exec("timedatectl set-ntp false")
|
||||
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 5)
|
||||
cluster_node.host.wait_for_service_to_be_in_state("systemd-timesyncd", "inactive", 15)
|
||||
|
||||
def _get_disk_controller(self, node: StorageNode, device: str, mountpoint: str) -> DiskController:
|
||||
disk_controller_id = DiskController.get_id(node, device)
|
||||
|
|
|
@ -3,7 +3,7 @@ import yaml
|
|||
from frostfs_testlib.blockchain import RPCClient
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase
|
||||
|
||||
from frostfs_testlib.storage.dataclasses.shard import Shard
|
||||
|
||||
class InnerRing(NodeBase):
|
||||
"""
|
||||
|
@ -148,6 +148,20 @@ class StorageNode(NodeBase):
|
|||
def get_shards_config(self) -> tuple[str, dict]:
|
||||
return self.get_config(self.get_shard_config_path())
|
||||
|
||||
def get_shards(self) -> list[Shard]:
|
||||
config = self.get_shards_config()[1]
|
||||
config["storage"]["shard"].pop("default")
|
||||
return [Shard.from_object(shard) for shard in config["storage"]["shard"].values()]
|
||||
|
||||
def get_shards_from_env(self) -> list[Shard]:
|
||||
config = self.get_shards_config()[1]
|
||||
configObj = ConfigObj(StringIO(config))
|
||||
|
||||
pattern = f"{SHARD_PREFIX}\d*"
|
||||
num_shards = len(set(re.findall(pattern, self.get_shards_config())))
|
||||
|
||||
return [Shard.from_config_object(configObj, shard_id) for shard_id in range(num_shards)]
|
||||
|
||||
def get_control_endpoint(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)
|
||||
|
||||
|
@ -157,6 +171,9 @@ class StorageNode(NodeBase):
|
|||
def get_data_directory(self) -> str:
|
||||
return self.host.get_data_directory(self.name)
|
||||
|
||||
def get_storage_config(self) -> str:
|
||||
return self.host.get_storage_config(self.name)
|
||||
|
||||
def get_http_hostname(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
|
||||
|
||||
|
@ -169,8 +186,11 @@ class StorageNode(NodeBase):
|
|||
def delete_fstree(self):
|
||||
self.host.delete_fstree(self.name)
|
||||
|
||||
def delete_pilorama(self):
|
||||
self.host.delete_pilorama(self.name)
|
||||
def delete_file(self, file_path: str) -> None:
|
||||
self.host.delete_file(file_path)
|
||||
|
||||
def is_file_exist(self, file_path) -> bool:
|
||||
return self.host.is_file_exist(file_path)
|
||||
|
||||
def delete_metabase(self):
|
||||
self.host.delete_metabase(self.name)
|
||||
|
|
|
@ -114,6 +114,14 @@ class NodeBase(HumanReadableABC):
|
|||
ConfigAttributes.CONFIG_PATH,
|
||||
)
|
||||
|
||||
def get_remote_wallet_config_path(self) -> str:
|
||||
"""
|
||||
Returns node config file path located on remote host
|
||||
"""
|
||||
return self._get_attribute(
|
||||
ConfigAttributes.REMOTE_WALLET_CONFIG,
|
||||
)
|
||||
|
||||
def get_wallet_config_path(self) -> str:
|
||||
return self._get_attribute(
|
||||
ConfigAttributes.LOCAL_WALLET_CONFIG,
|
||||
|
@ -125,8 +133,11 @@ class NodeBase(HumanReadableABC):
|
|||
Returns config path for logger located on remote host
|
||||
"""
|
||||
config_attributes = self.host.get_service_config(self.name)
|
||||
return self._get_attribute(
|
||||
ConfigAttributes.LOGGER_CONFIG_PATH) if ConfigAttributes.LOGGER_CONFIG_PATH in config_attributes.attributes else None
|
||||
return (
|
||||
self._get_attribute(ConfigAttributes.LOGGER_CONFIG_PATH)
|
||||
if ConfigAttributes.LOGGER_CONFIG_PATH in config_attributes.attributes
|
||||
else None
|
||||
)
|
||||
|
||||
@property
|
||||
def config_dir(self) -> str:
|
||||
|
|
99
src/frostfs_testlib/storage/dataclasses/shard.py
Normal file
99
src/frostfs_testlib/storage/dataclasses/shard.py
Normal file
|
@ -0,0 +1,99 @@
|
|||
import json
|
||||
import pathlib
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from io import StringIO
|
||||
|
||||
import allure
|
||||
import pytest
|
||||
import yaml
|
||||
from configobj import ConfigObj
|
||||
from frostfs_testlib.cli import FrostfsCli
|
||||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT
|
||||
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
||||
|
||||
SHARD_PREFIX = "FROSTFS_STORAGE_SHARD_"
|
||||
BLOBSTOR_PREFIX = "_BLOBSTOR_"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Blobstor:
|
||||
path: str
|
||||
path_type: str
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, self.__class__):
|
||||
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
|
||||
return self.path == other.path and self.path_type == other.path_type
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.path, self.path_type))
|
||||
|
||||
@staticmethod
|
||||
def from_config_object(section: ConfigObj, shard_id: str, blobstor_id: str):
|
||||
var_prefix = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}{blobstor_id}"
|
||||
return Blobstor(section.get(f"{var_prefix}_PATH"), section.get(f"{var_prefix}_TYPE"))
|
||||
|
||||
|
||||
@dataclass
|
||||
class Shard:
|
||||
blobstor: list[Blobstor]
|
||||
metabase: str
|
||||
writecache: str
|
||||
pilorama: str
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if not isinstance(other, self.__class__):
|
||||
raise RuntimeError(f"Only two {self.__class__.__name__} instances can be compared")
|
||||
return (
|
||||
set(self.blobstor) == set(other.blobstor)
|
||||
and self.metabase == other.metabase
|
||||
and self.writecache == other.writecache
|
||||
and self.pilorama == other.pilorama
|
||||
)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.metabase, self.writecache))
|
||||
|
||||
@staticmethod
|
||||
def _get_blobstor_count_from_section(config_object: ConfigObj, shard_id: int):
|
||||
pattern = f"{SHARD_PREFIX}{shard_id}{BLOBSTOR_PREFIX}"
|
||||
blobstors = {key[: len(pattern) + 2] for key in config_object.keys() if pattern in key}
|
||||
return len(blobstors)
|
||||
|
||||
@staticmethod
|
||||
def from_config_object(config_object: ConfigObj, shard_id: int):
|
||||
var_prefix = f"{SHARD_PREFIX}{shard_id}"
|
||||
|
||||
blobstor_count = Shard._get_blobstor_count_from_section(config_object, shard_id)
|
||||
blobstors = [
|
||||
Blobstor.from_config_object(config_object, shard_id, blobstor_id) for blobstor_id in range(blobstor_count)
|
||||
]
|
||||
|
||||
write_cache_enabled = config_object.as_bool(f"{var_prefix}_WRITECACHE_ENABLED")
|
||||
|
||||
return Shard(
|
||||
blobstors,
|
||||
config_object.get(f"{var_prefix}_METABASE_PATH"),
|
||||
config_object.get(f"{var_prefix}_WRITECACHE_PATH") if write_cache_enabled else "",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def from_object(shard):
|
||||
metabase = shard["metabase"]["path"] if "path" in shard["metabase"] else shard["metabase"]
|
||||
writecache = shard["writecache"]["path"] if "path" in shard["writecache"] else shard["writecache"]
|
||||
|
||||
# Currently due to issue we need to check if pilorama exists in keys
|
||||
# TODO: make pilorama mandatory after fix
|
||||
if shard.get("pilorama"):
|
||||
pilorama = shard["pilorama"]["path"] if "path" in shard["pilorama"] else shard["pilorama"]
|
||||
else:
|
||||
pilorama = None
|
||||
|
||||
return Shard(
|
||||
blobstor=[Blobstor(path=blobstor["path"], path_type=blobstor["type"]) for blobstor in shard["blobstor"]],
|
||||
metabase=metabase,
|
||||
writecache=writecache,
|
||||
pilorama=pilorama
|
||||
)
|
||||
|
|
@ -2,10 +2,11 @@ import logging
|
|||
import re
|
||||
|
||||
from frostfs_testlib.cli import FrostfsAdm, FrostfsCli
|
||||
from frostfs_testlib.hosting import Hosting
|
||||
from frostfs_testlib.hosting import Host, Hosting
|
||||
from frostfs_testlib.resources.cli import FROSTFS_ADM_EXEC, FROSTFS_AUTHMATE_EXEC, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
|
||||
from frostfs_testlib.resources.common import DEFAULT_WALLET_CONFIG
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.testing.parallel import parallel
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
||||
|
@ -33,49 +34,74 @@ def get_local_binaries_versions(shell: Shell) -> dict[str, str]:
|
|||
return versions
|
||||
|
||||
|
||||
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
|
||||
def parallel_binary_verions(host: Host) -> dict[str, str]:
|
||||
versions_by_host = {}
|
||||
for host in hosting.hosts:
|
||||
binary_path_by_name = {} # Maps binary name to executable path
|
||||
for service_config in host.config.services:
|
||||
exec_path = service_config.attributes.get("exec_path")
|
||||
requires_check = service_config.attributes.get("requires_version_check", "true")
|
||||
if exec_path:
|
||||
binary_path_by_name[service_config.name] = {
|
||||
"exec_path": exec_path,
|
||||
"check": requires_check.lower() == "true",
|
||||
}
|
||||
for cli_config in host.config.clis:
|
||||
requires_check = cli_config.attributes.get("requires_version_check", "true")
|
||||
binary_path_by_name[cli_config.name] = {
|
||||
"exec_path": cli_config.exec_path,
|
||||
|
||||
binary_path_by_name = {} # Maps binary name to executable path
|
||||
for service_config in host.config.services:
|
||||
exec_path = service_config.attributes.get("exec_path")
|
||||
requires_check = service_config.attributes.get("requires_version_check", "true")
|
||||
if exec_path:
|
||||
binary_path_by_name[service_config.name] = {
|
||||
"exec_path": exec_path,
|
||||
"check": requires_check.lower() == "true",
|
||||
}
|
||||
for cli_config in host.config.clis:
|
||||
requires_check = cli_config.attributes.get("requires_version_check", "true")
|
||||
binary_path_by_name[cli_config.name] = {
|
||||
"exec_path": cli_config.exec_path,
|
||||
"check": requires_check.lower() == "true",
|
||||
}
|
||||
|
||||
shell = host.get_shell()
|
||||
versions_at_host = {}
|
||||
for binary_name, binary in binary_path_by_name.items():
|
||||
try:
|
||||
binary_path = binary["exec_path"]
|
||||
result = shell.exec(f"{binary_path} --version")
|
||||
versions_at_host[binary_name] = {"version": _parse_version(result.stdout), "check": binary["check"]}
|
||||
except Exception as exc:
|
||||
logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
|
||||
versions_at_host[binary_name] = {"version": "Unknown", "check": binary["check"]}
|
||||
versions_by_host[host.config.address] = versions_at_host
|
||||
shell = host.get_shell()
|
||||
versions_at_host = {}
|
||||
for binary_name, binary in binary_path_by_name.items():
|
||||
try:
|
||||
binary_path = binary["exec_path"]
|
||||
result = shell.exec(f"{binary_path} --version")
|
||||
versions_at_host[binary_name] = {"version": _parse_version(result.stdout), "check": binary["check"]}
|
||||
except Exception as exc:
|
||||
logger.error(f"Cannot get version for {binary_path} because of\n{exc}")
|
||||
versions_at_host[binary_name] = {"version": "Unknown", "check": binary["check"]}
|
||||
versions_by_host[host.config.address] = versions_at_host
|
||||
return versions_by_host
|
||||
|
||||
|
||||
def get_remote_binaries_versions(hosting: Hosting) -> dict[str, str]:
|
||||
versions_by_host = {}
|
||||
future_binary_verions = parallel(parallel_binary_verions, parallel_items=hosting.hosts)
|
||||
for future in future_binary_verions:
|
||||
versions_by_host.update(future.result())
|
||||
|
||||
# Consolidate versions across all hosts
|
||||
cheak_versions = {}
|
||||
exсeptions = []
|
||||
exception = set()
|
||||
previous_host = None
|
||||
versions = {}
|
||||
captured_version = None
|
||||
for host, binary_versions in versions_by_host.items():
|
||||
for name, binary in binary_versions.items():
|
||||
captured_version = versions.get(name, {}).get("version")
|
||||
version = binary["version"]
|
||||
if captured_version:
|
||||
assert captured_version == version, f"Binary {name} has inconsistent version on host {host}"
|
||||
if not cheak_versions.get(f'{name[:-2]}', None):
|
||||
captured_version = cheak_versions.get(f'{name[:-2]}',{}).get(host, {}).get(captured_version)
|
||||
cheak_versions[f'{name[:-2]}'] = {host: {version: name}}
|
||||
else:
|
||||
versions[name] = {"version": version, "check": binary["check"]}
|
||||
return versions
|
||||
|
||||
captured_version = list(cheak_versions.get(f'{name[:-2]}',{}).get(previous_host).keys())[0]
|
||||
cheak_versions[f'{name[:-2]}'].update({host:{version:name}})
|
||||
|
||||
if captured_version and captured_version != version:
|
||||
exception.add(name[:-2])
|
||||
|
||||
versions[name] = {"version": version, "check": binary["check"]}
|
||||
previous_host = host
|
||||
if exception:
|
||||
for i in exception:
|
||||
for host in versions_by_host.keys():
|
||||
for version, name in cheak_versions.get(i).get(host).items():
|
||||
exсeptions.append(f'Binary {name} has inconsistent version {version} on host {host}')
|
||||
exсeptions.append('\n')
|
||||
return versions, exсeptions
|
||||
|
||||
def _parse_version(version_output: str) -> str:
|
||||
version = re.search(r"version[:\s]*v?(.+)", version_output, re.IGNORECASE)
|
||||
|
|
|
@ -4,13 +4,7 @@ import pytest
|
|||
|
||||
from frostfs_testlib.s3 import AwsCliClient, Boto3ClientWrapper
|
||||
from frostfs_testlib.storage.dataclasses.acl import EACLRole
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import (
|
||||
HTTPGate,
|
||||
InnerRing,
|
||||
MorphChain,
|
||||
S3Gate,
|
||||
StorageNode,
|
||||
)
|
||||
from frostfs_testlib.storage.dataclasses.frostfs_services import HTTPGate, InnerRing, MorphChain, S3Gate, StorageNode
|
||||
from frostfs_testlib.storage.dataclasses.object_size import ObjectSize
|
||||
|
||||
|
||||
|
@ -22,10 +16,10 @@ class TestDataclassesStr:
|
|||
[
|
||||
(Boto3ClientWrapper, "Boto3 client"),
|
||||
(AwsCliClient, "AWS CLI"),
|
||||
(ObjectSize("simple", 1), "simple object size"),
|
||||
(ObjectSize("simple", 10), "simple object size"),
|
||||
(ObjectSize("complex", 5000), "complex object size"),
|
||||
(ObjectSize("complex", 5555), "complex object size"),
|
||||
(ObjectSize("simple", 1), "simple"),
|
||||
(ObjectSize("simple", 10), "simple"),
|
||||
(ObjectSize("complex", 5000), "complex"),
|
||||
(ObjectSize("complex", 5555), "complex"),
|
||||
(StorageNode, "StorageNode"),
|
||||
(MorphChain, "MorphChain"),
|
||||
(S3Gate, "S3Gate"),
|
||||
|
|
|
@ -15,6 +15,7 @@ class TestHosting(TestCase):
|
|||
HOST1 = {
|
||||
"address": HOST1_ADDRESS,
|
||||
"plugin_name": HOST1_PLUGIN,
|
||||
"healthcheck_plugin_name": "basic",
|
||||
"attributes": HOST1_ATTRIBUTES,
|
||||
"clis": HOST1_CLIS,
|
||||
"services": HOST1_SERVICES,
|
||||
|
@ -32,6 +33,7 @@ class TestHosting(TestCase):
|
|||
HOST2 = {
|
||||
"address": HOST2_ADDRESS,
|
||||
"plugin_name": HOST2_PLUGIN,
|
||||
"healthcheck_plugin_name": "basic",
|
||||
"attributes": HOST2_ATTRIBUTES,
|
||||
"clis": HOST2_CLIS,
|
||||
"services": HOST2_SERVICES,
|
||||
|
@ -52,18 +54,14 @@ class TestHosting(TestCase):
|
|||
self.assertEqual(host1.config.plugin_name, self.HOST1_PLUGIN)
|
||||
self.assertDictEqual(host1.config.attributes, self.HOST1_ATTRIBUTES)
|
||||
self.assertListEqual(host1.config.clis, [CLIConfig(**cli) for cli in self.HOST1_CLIS])
|
||||
self.assertListEqual(
|
||||
host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES]
|
||||
)
|
||||
self.assertListEqual(host1.config.services, [ServiceConfig(**service) for service in self.HOST1_SERVICES])
|
||||
|
||||
host2 = hosting.get_host_by_address(self.HOST2_ADDRESS)
|
||||
self.assertEqual(host2.config.address, self.HOST2_ADDRESS)
|
||||
self.assertEqual(host2.config.plugin_name, self.HOST2_PLUGIN)
|
||||
self.assertDictEqual(host2.config.attributes, self.HOST2_ATTRIBUTES)
|
||||
self.assertListEqual(host2.config.clis, [CLIConfig(**cli) for cli in self.HOST2_CLIS])
|
||||
self.assertListEqual(
|
||||
host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES]
|
||||
)
|
||||
self.assertListEqual(host2.config.services, [ServiceConfig(**service) for service in self.HOST2_SERVICES])
|
||||
|
||||
def test_get_host_by_service(self):
|
||||
hosting = Hosting()
|
||||
|
@ -104,9 +102,7 @@ class TestHosting(TestCase):
|
|||
services = hosting.find_service_configs(rf"^{self.SERVICE_NAME_PREFIX}")
|
||||
self.assertEqual(len(services), 2)
|
||||
for service in services:
|
||||
self.assertEqual(
|
||||
service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX
|
||||
)
|
||||
self.assertEqual(service.name[: len(self.SERVICE_NAME_PREFIX)], self.SERVICE_NAME_PREFIX)
|
||||
|
||||
service1 = hosting.find_service_configs(self.SERVICE1["name"])
|
||||
self.assertEqual(len(service1), 1)
|
||||
|
|
|
@ -136,11 +136,15 @@ class TestLoadConfig:
|
|||
def test_argument_parsing_for_grpc_scenario(self, load_params: LoadParams):
|
||||
expected_preset_args = [
|
||||
"--size '11'",
|
||||
"--acl 'acl'",
|
||||
"--preload_obj '13'",
|
||||
"--out 'pregen_json'",
|
||||
"--workers '7'",
|
||||
"--containers '16'",
|
||||
"--policy 'container_placement_policy'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--local",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 9,
|
||||
|
@ -151,6 +155,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 7,
|
||||
"READERS": 7,
|
||||
"DELETERS": 8,
|
||||
"READ_AGE": 8,
|
||||
"PREGEN_JSON": "pregen_json",
|
||||
"PREPARE_LOCALLY": True,
|
||||
}
|
||||
|
@ -167,6 +172,10 @@ class TestLoadConfig:
|
|||
"--workers '7'",
|
||||
"--containers '16'",
|
||||
"--policy 'container_placement_policy'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--local",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 9,
|
||||
|
@ -184,6 +193,7 @@ class TestLoadConfig:
|
|||
"TIME_UNIT": "time_unit",
|
||||
"WRITE_RATE": 10,
|
||||
"READ_RATE": 9,
|
||||
"READ_AGE": 8,
|
||||
"DELETE_RATE": 11,
|
||||
"PREPARE_LOCALLY": True,
|
||||
}
|
||||
|
@ -201,6 +211,9 @@ class TestLoadConfig:
|
|||
"--workers '7'",
|
||||
"--buckets '13'",
|
||||
"--location 's3_location'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 9,
|
||||
|
@ -211,6 +224,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 7,
|
||||
"READERS": 7,
|
||||
"DELETERS": 8,
|
||||
"READ_AGE": 8,
|
||||
"NO_VERIFY_SSL": True,
|
||||
"PREGEN_JSON": "pregen_json",
|
||||
}
|
||||
|
@ -218,6 +232,45 @@ class TestLoadConfig:
|
|||
self._check_preset_params(load_params, expected_preset_args)
|
||||
self._check_env_vars(load_params, expected_env_vars)
|
||||
|
||||
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
|
||||
def test_argument_parsing_for_s3_car_scenario_with_stringed_time(self, load_params: LoadParams):
|
||||
load_params.load_time = "2d3h5min"
|
||||
expected_preset_args = [
|
||||
"--size '11'",
|
||||
"--preload_obj '13'",
|
||||
"--no-verify-ssl",
|
||||
"--out 'pregen_json'",
|
||||
"--workers '7'",
|
||||
"--buckets '13'",
|
||||
"--location 's3_location'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 183900,
|
||||
"WRITE_OBJ_SIZE": 11,
|
||||
"REGISTRY_FILE": "registry_file",
|
||||
"K6_MIN_ITERATION_DURATION": "min_iteration_duration",
|
||||
"K6_SETUP_TIMEOUT": "setup_timeout",
|
||||
"NO_VERIFY_SSL": True,
|
||||
"MAX_WRITERS": 11,
|
||||
"MAX_READERS": 11,
|
||||
"MAX_DELETERS": 12,
|
||||
"PRE_ALLOC_DELETERS": 21,
|
||||
"PRE_ALLOC_READERS": 20,
|
||||
"PRE_ALLOC_WRITERS": 20,
|
||||
"PREGEN_JSON": "pregen_json",
|
||||
"TIME_UNIT": "time_unit",
|
||||
"WRITE_RATE": 10,
|
||||
"READ_RATE": 9,
|
||||
"READ_AGE": 8,
|
||||
"DELETE_RATE": 11,
|
||||
}
|
||||
|
||||
self._check_preset_params(load_params, expected_preset_args)
|
||||
self._check_env_vars(load_params, expected_env_vars)
|
||||
|
||||
@pytest.mark.parametrize("load_params", [LoadScenario.S3_CAR], indirect=True)
|
||||
def test_argument_parsing_for_s3_car_scenario(self, load_params: LoadParams):
|
||||
expected_preset_args = [
|
||||
|
@ -228,6 +281,9 @@ class TestLoadConfig:
|
|||
"--workers '7'",
|
||||
"--buckets '13'",
|
||||
"--location 's3_location'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 9,
|
||||
|
@ -246,6 +302,7 @@ class TestLoadConfig:
|
|||
"TIME_UNIT": "time_unit",
|
||||
"WRITE_RATE": 10,
|
||||
"READ_RATE": 9,
|
||||
"READ_AGE": 8,
|
||||
"DELETE_RATE": 11,
|
||||
}
|
||||
|
||||
|
@ -254,6 +311,7 @@ class TestLoadConfig:
|
|||
|
||||
@pytest.mark.parametrize("load_params", [LoadScenario.HTTP], indirect=True)
|
||||
def test_argument_parsing_for_http_scenario(self, load_params: LoadParams):
|
||||
load_params.preset.local = False
|
||||
expected_preset_args = [
|
||||
"--no-verify-ssl",
|
||||
"--size '11'",
|
||||
|
@ -262,6 +320,9 @@ class TestLoadConfig:
|
|||
"--workers '7'",
|
||||
"--containers '16'",
|
||||
"--policy 'container_placement_policy'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 9,
|
||||
|
@ -273,6 +334,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 7,
|
||||
"READERS": 7,
|
||||
"DELETERS": 8,
|
||||
"READ_AGE": 8,
|
||||
"PREGEN_JSON": "pregen_json",
|
||||
}
|
||||
|
||||
|
@ -281,6 +343,7 @@ class TestLoadConfig:
|
|||
|
||||
@pytest.mark.parametrize("load_params", [LoadScenario.LOCAL], indirect=True)
|
||||
def test_argument_parsing_for_local_scenario(self, load_params: LoadParams):
|
||||
load_params.preset.local = False
|
||||
expected_preset_args = [
|
||||
"--size '11'",
|
||||
"--preload_obj '13'",
|
||||
|
@ -288,6 +351,9 @@ class TestLoadConfig:
|
|||
"--workers '7'",
|
||||
"--containers '16'",
|
||||
"--policy 'container_placement_policy'",
|
||||
"--ignore-errors",
|
||||
"--sleep '19'",
|
||||
"--acl 'acl'",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"CONFIG_FILE": "config_file",
|
||||
|
@ -299,6 +365,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 7,
|
||||
"READERS": 7,
|
||||
"DELETERS": 8,
|
||||
"READ_AGE": 8,
|
||||
"PREGEN_JSON": "pregen_json",
|
||||
}
|
||||
|
||||
|
@ -338,6 +405,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--containers '0'",
|
||||
"--policy ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 0,
|
||||
|
@ -348,6 +417,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 0,
|
||||
"READERS": 0,
|
||||
"DELETERS": 0,
|
||||
"READ_AGE": 0,
|
||||
"PREGEN_JSON": "",
|
||||
"PREPARE_LOCALLY": False,
|
||||
}
|
||||
|
@ -364,6 +434,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--containers '0'",
|
||||
"--policy ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 0,
|
||||
|
@ -382,6 +454,7 @@ class TestLoadConfig:
|
|||
"WRITE_RATE": 0,
|
||||
"READ_RATE": 0,
|
||||
"DELETE_RATE": 0,
|
||||
"READ_AGE": 0,
|
||||
"PREPARE_LOCALLY": False,
|
||||
}
|
||||
|
||||
|
@ -397,6 +470,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--buckets '0'",
|
||||
"--location ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 0,
|
||||
|
@ -407,6 +482,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 0,
|
||||
"READERS": 0,
|
||||
"DELETERS": 0,
|
||||
"READ_AGE": 0,
|
||||
"NO_VERIFY_SSL": False,
|
||||
"PREGEN_JSON": "",
|
||||
}
|
||||
|
@ -423,6 +499,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--buckets '0'",
|
||||
"--location ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 0,
|
||||
|
@ -442,6 +520,7 @@ class TestLoadConfig:
|
|||
"WRITE_RATE": 0,
|
||||
"READ_RATE": 0,
|
||||
"DELETE_RATE": 0,
|
||||
"READ_AGE": 0,
|
||||
}
|
||||
|
||||
self._check_preset_params(load_params, expected_preset_args)
|
||||
|
@ -456,6 +535,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--containers '0'",
|
||||
"--policy ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"DURATION": 0,
|
||||
|
@ -467,6 +548,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 0,
|
||||
"READERS": 0,
|
||||
"DELETERS": 0,
|
||||
"READ_AGE": 0,
|
||||
"PREGEN_JSON": "",
|
||||
}
|
||||
|
||||
|
@ -482,6 +564,8 @@ class TestLoadConfig:
|
|||
"--workers '0'",
|
||||
"--containers '0'",
|
||||
"--policy ''",
|
||||
"--sleep '0'",
|
||||
"--acl ''",
|
||||
]
|
||||
expected_env_vars = {
|
||||
"CONFIG_FILE": "",
|
||||
|
@ -493,6 +577,7 @@ class TestLoadConfig:
|
|||
"WRITERS": 0,
|
||||
"READERS": 0,
|
||||
"DELETERS": 0,
|
||||
"READ_AGE": 0,
|
||||
"PREGEN_JSON": "",
|
||||
}
|
||||
|
||||
|
@ -531,6 +616,27 @@ class TestLoadConfig:
|
|||
|
||||
self._check_env_vars(load_params, expected_env_vars)
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"load_params, load_type",
|
||||
[(LoadScenario.gRPC, LoadType.gRPC)],
|
||||
indirect=True,
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"load_time, expected_seconds",
|
||||
[
|
||||
(300, 300),
|
||||
("2d3h45min", 186300),
|
||||
("1d6h", 108000),
|
||||
("1d", 86400),
|
||||
("1d1min", 86460),
|
||||
("2h", 7200),
|
||||
("2h2min", 7320),
|
||||
],
|
||||
)
|
||||
def test_convert_time_to_seconds(self, load_params: LoadParams, load_time: str | int, expected_seconds: int):
|
||||
load_params.load_time = load_time
|
||||
assert load_params.load_time == expected_seconds
|
||||
|
||||
def _check_preset_params(self, load_params: LoadParams, expected_preset_args: list[str]):
|
||||
preset_parameters = load_params.get_preset_arguments()
|
||||
assert sorted(preset_parameters) == sorted(expected_preset_args)
|
||||
|
|
Loading…
Reference in a new issue