forked from TrueCloudLab/frostfs-testlib
Compare commits
10 commits
02c079eda3
...
f7ef8cb881
Author | SHA1 | Date | |
---|---|---|---|
f7ef8cb881 | |||
ecf8f0841a | |||
19b8b96898 | |||
f2d34dbf2e | |||
e14896400f | |||
449c18bb1a | |||
aa277fdd6a | |||
7059596506 | |||
7112bf9c88 | |||
b1c21e0e5b |
13 changed files with 407 additions and 12 deletions
|
@ -351,3 +351,45 @@ class FrostfsCliObject(CliCommand):
|
|||
"object search",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
||||
def nodes(
|
||||
self,
|
||||
rpc_endpoint: str,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
address: Optional[str] = None,
|
||||
bearer: Optional[str] = None,
|
||||
generate_key: Optional = None,
|
||||
oid: Optional[str] = None,
|
||||
trace: bool = False,
|
||||
root: bool = False,
|
||||
verify_presence_all: bool = False,
|
||||
ttl: Optional[int] = None,
|
||||
xhdr: Optional[dict] = None,
|
||||
timeout: Optional[str] = None,
|
||||
) -> CommandResult:
|
||||
"""
|
||||
Search object nodes.
|
||||
|
||||
Args:
|
||||
address: Address of wallet account.
|
||||
bearer: File with signed JSON or binary encoded bearer token.
|
||||
cid: Container ID.
|
||||
generate_key: Generate new private key.
|
||||
oid: Object ID.
|
||||
trace: Generate trace ID and print it.
|
||||
root: Search for user objects.
|
||||
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
|
||||
verify_presence_all: Verify the actual presence of the object on all netmap nodes.
|
||||
ttl: TTL value in request meta header (default 2).
|
||||
wallet: WIF (NEP-2) string or path to the wallet or binary key.
|
||||
xhdr: Dict with request X-Headers.
|
||||
timeout: Timeout for the operation (default 15s).
|
||||
|
||||
Returns:
|
||||
Command's result.
|
||||
"""
|
||||
return self._execute(
|
||||
"object nodes",
|
||||
**{param: value for param, value in locals().items() if param not in ["self"]},
|
||||
)
|
||||
|
|
|
@ -64,6 +64,7 @@ class HostConfig:
|
|||
services: list[ServiceConfig] = field(default_factory=list)
|
||||
clis: list[CLIConfig] = field(default_factory=list)
|
||||
attributes: dict[str, str] = field(default_factory=dict)
|
||||
interfaces: dict[str, str] = field(default_factory=dict)
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
self.services = [ServiceConfig(**service) for service in self.services or []]
|
||||
|
|
|
@ -178,6 +178,10 @@ class LoadParams:
|
|||
min_iteration_duration: Optional[str] = metadata_field(
|
||||
all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False
|
||||
)
|
||||
# Prepare/cut objects locally on client before sending
|
||||
prepare_locally: Optional[bool] = metadata_field(
|
||||
[LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False
|
||||
)
|
||||
# Specifies K6 setupTimeout time. Currently hardcoded in xk6 as 5 seconds for all scenarios
|
||||
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
|
||||
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
|
||||
|
@ -267,6 +271,16 @@ class LoadParams:
|
|||
|
||||
return env_vars
|
||||
|
||||
def __post_init__(self):
|
||||
default_scenario_map = {
|
||||
LoadType.gRPC: LoadScenario.gRPC,
|
||||
LoadType.HTTP: LoadScenario.HTTP,
|
||||
LoadType.S3: LoadScenario.S3,
|
||||
}
|
||||
|
||||
if self.scenario is None:
|
||||
self.scenario = default_scenario_map[self.load_type]
|
||||
|
||||
def get_preset_arguments(self):
|
||||
command_args = [
|
||||
self._get_preset_argument(meta_field)
|
||||
|
@ -320,7 +334,7 @@ class LoadParams:
|
|||
# TODO: migrate load_params defaults to testlib
|
||||
if self.object_size is not None:
|
||||
size, unit = calc_unit(self.object_size, 1)
|
||||
static_params = [f"{load_type_str} ({size:.4g} {unit})"]
|
||||
static_params = [f"{load_type_str} {size:.4g} {unit}"]
|
||||
else:
|
||||
static_params = [f"{load_type_str}"]
|
||||
|
||||
|
@ -331,7 +345,7 @@ class LoadParams:
|
|||
]
|
||||
params = ", ".join(static_params + dynamic_params)
|
||||
|
||||
return f"load: {params}"
|
||||
return params
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.__str__()
|
||||
|
|
|
@ -8,12 +8,15 @@ class MetricsBase(ABC):
|
|||
_WRITE_SUCCESS = ""
|
||||
_WRITE_ERRORS = ""
|
||||
_WRITE_THROUGHPUT = "data_sent"
|
||||
_WRITE_LATENCY = ""
|
||||
|
||||
_READ_SUCCESS = ""
|
||||
_READ_ERRORS = ""
|
||||
_READ_LATENCY = ""
|
||||
_READ_THROUGHPUT = "data_received"
|
||||
|
||||
_DELETE_SUCCESS = ""
|
||||
_DELETE_LATENCY = ""
|
||||
_DELETE_ERRORS = ""
|
||||
|
||||
def __init__(self, summary) -> None:
|
||||
|
@ -27,6 +30,10 @@ class MetricsBase(ABC):
|
|||
@property
|
||||
def write_success_iterations(self) -> int:
|
||||
return self._get_metric(self._WRITE_SUCCESS)
|
||||
|
||||
@property
|
||||
def write_latency(self) -> dict:
|
||||
return self._get_metric(self._WRITE_LATENCY)
|
||||
|
||||
@property
|
||||
def write_rate(self) -> float:
|
||||
|
@ -47,6 +54,10 @@ class MetricsBase(ABC):
|
|||
@property
|
||||
def read_success_iterations(self) -> int:
|
||||
return self._get_metric(self._READ_SUCCESS)
|
||||
|
||||
@property
|
||||
def read_latency(self) -> dict:
|
||||
return self._get_metric(self._READ_LATENCY)
|
||||
|
||||
@property
|
||||
def read_rate(self) -> int:
|
||||
|
@ -67,6 +78,10 @@ class MetricsBase(ABC):
|
|||
@property
|
||||
def delete_success_iterations(self) -> int:
|
||||
return self._get_metric(self._DELETE_SUCCESS)
|
||||
|
||||
@property
|
||||
def delete_latency(self) -> dict:
|
||||
return self._get_metric(self._DELETE_LATENCY)
|
||||
|
||||
@property
|
||||
def delete_failed_iterations(self) -> int:
|
||||
|
@ -77,7 +92,7 @@ class MetricsBase(ABC):
|
|||
return self._get_metric_rate(self._DELETE_SUCCESS)
|
||||
|
||||
def _get_metric(self, metric: str) -> int:
|
||||
metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric}
|
||||
metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric, "trend" : self._get_trend_metrics}
|
||||
|
||||
if metric not in self.metrics:
|
||||
return 0
|
||||
|
@ -114,28 +129,37 @@ class MetricsBase(ABC):
|
|||
|
||||
def _get_gauge_metric(self, metric: str) -> int:
|
||||
return metric["values"]["value"]
|
||||
|
||||
def _get_trend_metrics(self, metric: str) -> int:
|
||||
return metric["values"]
|
||||
|
||||
|
||||
class GrpcMetrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "frostfs_obj_put_total"
|
||||
_WRITE_ERRORS = "frostfs_obj_put_fails"
|
||||
_WRITE_LATENCY = "frostfs_obj_put_duration"
|
||||
|
||||
_READ_SUCCESS = "frostfs_obj_get_total"
|
||||
_READ_ERRORS = "frostfs_obj_get_fails"
|
||||
_READ_LATENCY = "frostfs_obj_get_duration"
|
||||
|
||||
_DELETE_SUCCESS = "frostfs_obj_delete_total"
|
||||
_DELETE_ERRORS = "frostfs_obj_delete_fails"
|
||||
_DELETE_LATENCY = "frostfs_obj_delete_duration"
|
||||
|
||||
|
||||
class S3Metrics(MetricsBase):
|
||||
_WRITE_SUCCESS = "aws_obj_put_total"
|
||||
_WRITE_ERRORS = "aws_obj_put_fails"
|
||||
_WRITE_LATENCY = "aws_obj_put_duration"
|
||||
|
||||
_READ_SUCCESS = "aws_obj_get_total"
|
||||
_READ_ERRORS = "aws_obj_get_fails"
|
||||
_READ_LATENCY = "aws_obj_get_duration"
|
||||
|
||||
_DELETE_SUCCESS = "aws_obj_delete_total"
|
||||
_DELETE_ERRORS = "aws_obj_delete_fails"
|
||||
_DELETE_LATENCY = "aws_obj_delete_duration"
|
||||
|
||||
|
||||
class LocalMetrics(MetricsBase):
|
||||
|
|
|
@ -2,6 +2,7 @@ from datetime import datetime
|
|||
from typing import Optional
|
||||
|
||||
import yaml
|
||||
import os
|
||||
|
||||
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
|
||||
from frostfs_testlib.load.load_metrics import get_metrics_object
|
||||
|
@ -109,6 +110,7 @@ class LoadReport:
|
|||
total_rate: float,
|
||||
throughput: float,
|
||||
errors: dict[str, int],
|
||||
latency: dict[str, dict],
|
||||
):
|
||||
throughput_html = ""
|
||||
if throughput > 0:
|
||||
|
@ -127,6 +129,15 @@ class LoadReport:
|
|||
):
|
||||
per_node_errors_html += self._row(f"At {node_key}", errors)
|
||||
|
||||
latency_html = ""
|
||||
if latency:
|
||||
for node_key, param_dict in latency.items():
|
||||
latency_values = ""
|
||||
for param_name, param_val in param_dict.items():
|
||||
latency_values += f"{param_name}={param_val:.2f}ms "
|
||||
|
||||
latency_html += self._row(f"Put latency {node_key.split(':')[0]}", latency_values)
|
||||
|
||||
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
|
||||
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
|
||||
model = self._get_model_string()
|
||||
|
@ -135,6 +146,7 @@ class LoadReport:
|
|||
errors_percent = 0
|
||||
if total_operations:
|
||||
errors_percent = total_errors/total_operations*100.0
|
||||
|
||||
html = f"""
|
||||
<table border="1" cellpadding="5px"><tbody>
|
||||
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
|
||||
|
@ -142,7 +154,7 @@ class LoadReport:
|
|||
{self._row("Total operations", total_operations)}
|
||||
{self._row("OP/sec", f"{total_rate:.2f}")}
|
||||
{throughput_html}
|
||||
|
||||
{latency_html}
|
||||
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
|
||||
{per_node_errors_html}
|
||||
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
|
||||
|
@ -160,6 +172,7 @@ class LoadReport:
|
|||
write_operations = 0
|
||||
write_op_sec = 0
|
||||
write_throughput = 0
|
||||
write_latency = {}
|
||||
write_errors = {}
|
||||
requested_write_rate = self.load_params.write_rate
|
||||
requested_write_rate_str = (
|
||||
|
@ -169,12 +182,14 @@ class LoadReport:
|
|||
read_operations = 0
|
||||
read_op_sec = 0
|
||||
read_throughput = 0
|
||||
read_latency = {}
|
||||
read_errors = {}
|
||||
requested_read_rate = self.load_params.read_rate
|
||||
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
|
||||
|
||||
delete_operations = 0
|
||||
delete_op_sec = 0
|
||||
delete_latency = {}
|
||||
delete_errors = {}
|
||||
requested_delete_rate = self.load_params.delete_rate
|
||||
requested_delete_rate_str = (
|
||||
|
@ -210,6 +225,7 @@ class LoadReport:
|
|||
if write_operations:
|
||||
write_section_required = True
|
||||
write_op_sec += metrics.write_rate
|
||||
write_latency[node_key] = metrics.write_latency
|
||||
write_throughput += metrics.write_throughput
|
||||
if metrics.write_failed_iterations:
|
||||
write_errors[node_key] = metrics.write_failed_iterations
|
||||
|
@ -219,6 +235,7 @@ class LoadReport:
|
|||
read_section_required = True
|
||||
read_op_sec += metrics.read_rate
|
||||
read_throughput += metrics.read_throughput
|
||||
read_latency[node_key] = metrics.read_latency
|
||||
if metrics.read_failed_iterations:
|
||||
read_errors[node_key] = metrics.read_failed_iterations
|
||||
|
||||
|
@ -226,6 +243,7 @@ class LoadReport:
|
|||
if delete_operations:
|
||||
delete_section_required = True
|
||||
delete_op_sec += metrics.delete_rate
|
||||
delete_latency[node_key] = metrics.delete_latency
|
||||
if metrics.delete_failed_iterations:
|
||||
delete_errors[node_key] = metrics.delete_failed_iterations
|
||||
|
||||
|
@ -238,6 +256,7 @@ class LoadReport:
|
|||
write_op_sec,
|
||||
write_throughput,
|
||||
write_errors,
|
||||
write_latency,
|
||||
)
|
||||
|
||||
if read_section_required:
|
||||
|
@ -249,6 +268,7 @@ class LoadReport:
|
|||
read_op_sec,
|
||||
read_throughput,
|
||||
read_errors,
|
||||
read_latency,
|
||||
)
|
||||
|
||||
if delete_section_required:
|
||||
|
@ -260,6 +280,7 @@ class LoadReport:
|
|||
delete_op_sec,
|
||||
0,
|
||||
delete_errors,
|
||||
delete_latency,
|
||||
)
|
||||
|
||||
return html
|
||||
|
|
|
@ -11,7 +11,7 @@ BACKGROUND_WRITERS_COUNT = os.getenv("BACKGROUND_WRITERS_COUNT", 0)
|
|||
BACKGROUND_READERS_COUNT = os.getenv("BACKGROUND_READERS_COUNT", 0)
|
||||
BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0)
|
||||
BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0)
|
||||
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 600)
|
||||
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1800)
|
||||
BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32)
|
||||
BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s")
|
||||
|
||||
|
|
|
@ -11,8 +11,9 @@ from frostfs_testlib.reporter import get_reporter
|
|||
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
|
||||
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
|
||||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.cluster import Cluster
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
|
||||
from frostfs_testlib.utils import json_utils
|
||||
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
|
||||
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
reporter = get_reporter()
|
||||
|
@ -731,3 +732,62 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
|
|||
latest_block[0].replace(":", ""): int(latest_block[1]),
|
||||
validated_state[0].replace(":", ""): int(validated_state[1]),
|
||||
}
|
||||
|
||||
|
||||
@reporter.step_deco("Search object nodes")
|
||||
def get_object_nodes(
|
||||
cluster: Cluster,
|
||||
wallet: str,
|
||||
cid: str,
|
||||
oid: str,
|
||||
shell: Shell,
|
||||
endpoint: str,
|
||||
bearer: str = "",
|
||||
xhdr: Optional[dict] = None,
|
||||
is_direct: bool = False,
|
||||
verify_presence_all: bool = False,
|
||||
wallet_config: Optional[str] = None,
|
||||
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
|
||||
) -> list[ClusterNode]:
|
||||
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
|
||||
|
||||
result_object_nodes = cli.object.nodes(
|
||||
rpc_endpoint=endpoint,
|
||||
wallet=wallet,
|
||||
cid=cid,
|
||||
oid=oid,
|
||||
bearer=bearer,
|
||||
ttl=1 if is_direct else None,
|
||||
xhdr=xhdr,
|
||||
timeout=timeout,
|
||||
verify_presence_all=verify_presence_all,
|
||||
)
|
||||
|
||||
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
|
||||
list_object_nodes = [
|
||||
node
|
||||
for node in parsing_output
|
||||
if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
|
||||
]
|
||||
|
||||
netmap_nodes_list = parse_netmap_output(
|
||||
cli.netmap.snapshot(
|
||||
rpc_endpoint=endpoint,
|
||||
wallet=wallet,
|
||||
).stdout
|
||||
)
|
||||
netmap_nodes = [
|
||||
netmap_node
|
||||
for object_node in list_object_nodes
|
||||
for netmap_node in netmap_nodes_list
|
||||
if object_node["node_id"] == netmap_node.node_id
|
||||
]
|
||||
|
||||
result = [
|
||||
cluster_node
|
||||
for netmap_node in netmap_nodes
|
||||
for cluster_node in cluster.cluster_nodes
|
||||
if netmap_node.node == cluster_node.host_ip
|
||||
]
|
||||
|
||||
return result
|
||||
|
|
42
src/frostfs_testlib/steps/iptables.py
Normal file
42
src/frostfs_testlib/steps/iptables.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
from frostfs_testlib.shell import Shell
|
||||
from frostfs_testlib.storage.cluster import ClusterNode
|
||||
|
||||
|
||||
class IpTablesHelper:
|
||||
@staticmethod
|
||||
def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
|
||||
shell = node.host.get_shell()
|
||||
for port in ports:
|
||||
shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
|
||||
shell = node.host.get_shell()
|
||||
for ip in block_ip:
|
||||
shell.exec(f"iptables -A INPUT -s {ip} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def restore_input_traffic_to_port(node: ClusterNode) -> None:
|
||||
shell = node.host.get_shell()
|
||||
ports = (
|
||||
shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'")
|
||||
.stdout.strip()
|
||||
.split("\n")
|
||||
)
|
||||
if ports[0] == "":
|
||||
return
|
||||
for port in ports:
|
||||
shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")
|
||||
|
||||
@staticmethod
|
||||
def restore_input_traffic_to_node(node: ClusterNode) -> None:
|
||||
shell = node.host.get_shell()
|
||||
unlock_ip = (
|
||||
shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'")
|
||||
.stdout.strip()
|
||||
.split("\n")
|
||||
)
|
||||
if unlock_ip[0] == "":
|
||||
return
|
||||
for ip in unlock_ip:
|
||||
shell.exec(f"iptables -D INPUT -s {ip} -j DROP")
|
|
@ -4,8 +4,10 @@ import time
|
|||
import frostfs_testlib.resources.optionals as optionals
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.shell import CommandOptions, Shell
|
||||
from frostfs_testlib.steps.iptables import IpTablesHelper
|
||||
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
|
||||
from frostfs_testlib.storage.controllers.disk_controller import DiskController
|
||||
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
|
||||
from frostfs_testlib.testing import parallel
|
||||
from frostfs_testlib.testing.test_control import run_optionally
|
||||
from frostfs_testlib.utils.failover_utils import (
|
||||
|
@ -24,6 +26,8 @@ class ClusterStateController:
|
|||
self.detached_disks: dict[str, DiskController] = {}
|
||||
self.stopped_storage_nodes: list[ClusterNode] = []
|
||||
self.stopped_s3_gates: list[ClusterNode] = []
|
||||
self.dropped_traffic: list[ClusterNode] = []
|
||||
self.stopped_services: set[NodeBase] = set()
|
||||
self.cluster = cluster
|
||||
self.shell = shell
|
||||
self.suspended_services: dict[str, list[ClusterNode]] = {}
|
||||
|
@ -124,6 +128,51 @@ class ClusterStateController:
|
|||
node.storage_node.stop_service()
|
||||
self.stopped_storage_nodes.append(node)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop all {service_type} services")
|
||||
def stop_services_of_type(self, service_type: type[ServiceClass]):
|
||||
services = self.cluster.services(service_type)
|
||||
self.stopped_services.update(services)
|
||||
parallel([service.stop_service for service in services])
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Start all {service_type} services")
|
||||
def start_services_of_type(self, service_type: type[ServiceClass]):
|
||||
services = self.cluster.services(service_type)
|
||||
parallel([service.start_service for service in services])
|
||||
|
||||
if service_type == StorageNode:
|
||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||
|
||||
self.stopped_services = self.stopped_services - set(services)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Start all stopped services")
|
||||
def start_all_stopped_services(self):
|
||||
parallel([service.start_service for service in self.stopped_services])
|
||||
|
||||
for service in self.stopped_services:
|
||||
if isinstance(service, StorageNode):
|
||||
wait_all_storage_nodes_returned(self.shell, self.cluster)
|
||||
break
|
||||
|
||||
self.stopped_services.clear()
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Stop {service_type} service on {node}")
|
||||
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
service = node.service(service_type)
|
||||
service.stop_service()
|
||||
self.stopped_services.add(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Start {service_type} service on {node}")
|
||||
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
|
||||
service = node.service(service_type)
|
||||
service.start_service()
|
||||
if service in self.stopped_services:
|
||||
self.stopped_services.remove(service)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Start storage service on {node}")
|
||||
def start_storage_service(self, node: ClusterNode):
|
||||
|
@ -191,6 +240,62 @@ class ClusterStateController:
|
|||
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
|
||||
self.suspended_services = {}
|
||||
|
||||
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
|
||||
def drop_traffic(
|
||||
self,
|
||||
mode: str,
|
||||
node: ClusterNode,
|
||||
wakeup_timeout: int,
|
||||
ports: list[str] = None,
|
||||
block_nodes: list[ClusterNode] = None,
|
||||
) -> None:
|
||||
allowed_modes = ["ports", "nodes"]
|
||||
assert mode in allowed_modes
|
||||
|
||||
match mode:
|
||||
case "ports":
|
||||
IpTablesHelper.drop_input_traffic_to_port(node, ports)
|
||||
case "nodes":
|
||||
list_ip = self._parse_intefaces(block_nodes)
|
||||
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
|
||||
time.sleep(wakeup_timeout)
|
||||
self.dropped_traffic.append(node)
|
||||
|
||||
@reporter.step_deco("Ping traffic")
|
||||
def ping_traffic(
|
||||
self,
|
||||
node: ClusterNode,
|
||||
nodes_list: list[ClusterNode],
|
||||
expect_result: int,
|
||||
) -> bool:
|
||||
shell = node.host.get_shell()
|
||||
options = CommandOptions(check=False)
|
||||
ips = self._parse_intefaces(nodes_list)
|
||||
for ip in ips:
|
||||
code = shell.exec(f"ping {ip} -c 1", options).return_code
|
||||
if code != expect_result:
|
||||
return False
|
||||
return True
|
||||
|
||||
@reporter.step_deco("Start traffic to {node}")
|
||||
def restore_traffic(
|
||||
self,
|
||||
mode: str,
|
||||
node: ClusterNode,
|
||||
) -> None:
|
||||
allowed_modes = ["ports", "nodes"]
|
||||
assert mode in allowed_modes
|
||||
|
||||
match mode:
|
||||
case "ports":
|
||||
IpTablesHelper.restore_input_traffic_to_port(node=node)
|
||||
case "nodes":
|
||||
IpTablesHelper.restore_input_traffic_to_node(node=node)
|
||||
|
||||
@reporter.step_deco("Restore blocked nodes")
|
||||
def restore_all_traffic(self):
|
||||
parallel(self._restore_traffic_to_node, self.dropped_traffic)
|
||||
|
||||
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
|
||||
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
|
||||
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
|
||||
|
@ -217,3 +322,16 @@ class ClusterStateController:
|
|||
disk_controller = DiskController(node, device, mountpoint)
|
||||
|
||||
return disk_controller
|
||||
|
||||
def _restore_traffic_to_node(self, node):
|
||||
IpTablesHelper.restore_input_traffic_to_port(node)
|
||||
IpTablesHelper.restore_input_traffic_to_node(node)
|
||||
|
||||
def _parse_intefaces(self, nodes: list[ClusterNode]):
|
||||
interfaces = []
|
||||
for node in nodes:
|
||||
dict_interfaces = node.host.config.interfaces
|
||||
for type, ip in dict_interfaces.items():
|
||||
if "mgmt" not in type:
|
||||
interfaces.append(ip)
|
||||
return interfaces
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from abc import abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Tuple, TypedDict, TypeVar
|
||||
|
||||
|
@ -6,10 +6,13 @@ import yaml
|
|||
|
||||
from frostfs_testlib.hosting.config import ServiceConfig
|
||||
from frostfs_testlib.hosting.interfaces import Host
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage.constants import ConfigAttributes
|
||||
from frostfs_testlib.testing.readable import HumanReadableABC
|
||||
from frostfs_testlib.utils import wallet_utils
|
||||
|
||||
reporter = get_reporter()
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeBase(HumanReadableABC):
|
||||
|
@ -54,17 +57,20 @@ class NodeBase(HumanReadableABC):
|
|||
return self._process_name
|
||||
|
||||
def start_service(self):
|
||||
self.host.start_service(self.name)
|
||||
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
|
||||
self.host.start_service(self.name)
|
||||
|
||||
@abstractmethod
|
||||
def service_healthcheck(self) -> bool:
|
||||
"""Service healthcheck."""
|
||||
|
||||
def stop_service(self):
|
||||
self.host.stop_service(self.name)
|
||||
with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
|
||||
self.host.stop_service(self.name)
|
||||
|
||||
def restart_service(self):
|
||||
self.host.restart_service(self.name)
|
||||
with reporter.step(f"Restart {self.name} service on {self.host.config.address}"):
|
||||
self.host.restart_service(self.name)
|
||||
|
||||
def get_wallet_password(self) -> str:
|
||||
return self._get_attribute(ConfigAttributes.WALLET_PASSWORD)
|
||||
|
|
|
@ -7,7 +7,7 @@ class ObjectSize:
|
|||
value: int
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"{self.name} object size"
|
||||
return self.name
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return self.__str__()
|
||||
|
|
|
@ -23,3 +23,21 @@ class StorageObjectInfo(ObjectRef):
|
|||
attributes: Optional[list[dict[str, str]]] = None
|
||||
tombstone: Optional[str] = None
|
||||
locks: Optional[list[LockObjectInfo]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class NodeNetmapInfo:
|
||||
node_id: str = None
|
||||
node_status: str = None
|
||||
node_data_ip: str = None
|
||||
cluster_name: str = None
|
||||
continent: str = None
|
||||
country: str = None
|
||||
country_code: str = None
|
||||
external_address: str = None
|
||||
location: str = None
|
||||
node: str = None
|
||||
sub_div: str = None
|
||||
sub_div_code: int = None
|
||||
un_locode: str = None
|
||||
role: str = None
|
||||
|
|
|
@ -5,18 +5,21 @@
|
|||
"""
|
||||
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
|
||||
"""
|
||||
import csv
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
from contextlib import suppress
|
||||
from datetime import datetime
|
||||
from io import StringIO
|
||||
from textwrap import shorten
|
||||
from typing import TypedDict, Union
|
||||
from typing import Dict, List, TypedDict, Union
|
||||
|
||||
import pexpect
|
||||
|
||||
from frostfs_testlib.reporter import get_reporter
|
||||
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
|
||||
|
||||
reporter = get_reporter()
|
||||
logger = logging.getLogger("NeoLogger")
|
||||
|
@ -131,3 +134,49 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
|
|||
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
|
||||
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
|
||||
reporter.attach(command_attachment, "Command execution")
|
||||
|
||||
|
||||
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
|
||||
"""
|
||||
The cli command will return something like.
|
||||
|
||||
Epoch: 240
|
||||
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
|
||||
Continent: Europe
|
||||
Country: Russia
|
||||
CountryCode: RU
|
||||
ExternalAddr: /ip4/10.10.11.18/tcp/8080
|
||||
Location: Moskva
|
||||
Node: 10.10.10.12
|
||||
Price: 5
|
||||
SubDiv: Moskva
|
||||
SubDivCode: MOW
|
||||
UN-LOCODE: RU MOW
|
||||
role: alphabet
|
||||
|
||||
The code will parse each line and return each node as dataclass.
|
||||
"""
|
||||
netmap_list = output.split("Node ")[1:]
|
||||
dataclass_list = []
|
||||
for node in netmap_list:
|
||||
node = node.replace("\t", "").split("\n")
|
||||
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
|
||||
dataclass_list.append(NodeNetmapInfo(*node))
|
||||
|
||||
return dataclass_list
|
||||
|
||||
|
||||
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
|
||||
parsing_output = []
|
||||
reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)
|
||||
iter_reader = iter(reader)
|
||||
header_row = next(iter_reader)
|
||||
for row in iter_reader:
|
||||
table = {}
|
||||
for i in range(len(row)):
|
||||
header = header_row[i].strip().lower().replace(" ", "_")
|
||||
value = row[i].strip().lower()
|
||||
if header:
|
||||
table[header] = value
|
||||
parsing_output.append(table)
|
||||
return parsing_output
|
||||
|
|
Loading…
Add table
Reference in a new issue