Compare commits

...

10 commits

SHA1        Message                                        Author                                  Date
f7ef8cb881  Another increase default load time             anikeev-yadro <a.anikeev@yadro.com>     2023-09-15 12:30:58 +03:00
ecf8f0841a  Change NodeNetmapInfo class                    Dmitriy Zayakin <d.zayakin@yadro.com>   2023-09-11 10:36:54 +03:00
19b8b96898  Use only name in ObjectSize repr and str       Andrey Berezin <a.berezin@yadro.com>    2023-09-08 10:50:28 +00:00
f2d34dbf2e  add latency report                                                                     2023-09-08 09:33:29 +00:00
e14896400f  Add post-init for load params                  Andrey Berezin <a.berezin@yadro.com>    2023-09-06 16:51:18 +03:00
449c18bb1a  Adding options to work with any service type   Andrey Berezin <a.berezin@yadro.com>    2023-08-30 15:28:12 +03:00
aa277fdd6a  Increase default load time                     anikeev-yadro <a.anikeev@yadro.com>     2023-08-29 16:55:25 +03:00
7059596506  Support prepare locally flag                   m.malygina <m.malygina@yadro.com>       2023-08-21 11:59:05 +00:00
7112bf9c88  Change NodeNetmapInfo class                    Dmitriy Zayakin <d.zayakin@yadro.com>   2023-08-17 12:54:05 +03:00
b1c21e0e5b  Add Iptables helper                            Dmitriy Zayakin <d.zayakin@yadro.com>   2023-08-16 12:22:14 +00:00
13 changed files with 407 additions and 12 deletions

View file

@@ -351,3 +351,45 @@ class FrostfsCliObject(CliCommand):
"object search",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
def nodes(
self,
rpc_endpoint: str,
wallet: str,
cid: str,
address: Optional[str] = None,
bearer: Optional[str] = None,
        generate_key: Optional[bool] = None,
oid: Optional[str] = None,
trace: bool = False,
root: bool = False,
verify_presence_all: bool = False,
ttl: Optional[int] = None,
xhdr: Optional[dict] = None,
timeout: Optional[str] = None,
) -> CommandResult:
"""
Search object nodes.
Args:
address: Address of wallet account.
bearer: File with signed JSON or binary encoded bearer token.
cid: Container ID.
generate_key: Generate new private key.
oid: Object ID.
trace: Generate trace ID and print it.
root: Search for user objects.
rpc_endpoint: Remote node address (as 'multiaddr' or '<host>:<port>').
verify_presence_all: Verify the actual presence of the object on all netmap nodes.
ttl: TTL value in request meta header (default 2).
wallet: WIF (NEP-2) string or path to the wallet or binary key.
xhdr: Dict with request X-Headers.
timeout: Timeout for the operation (default 15s).
Returns:
Command's result.
"""
return self._execute(
"object nodes",
**{param: value for param, value in locals().items() if param not in ["self"]},
)
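For orientation, here is a minimal usage sketch of the new wrapper; it mirrors the call made in get_object_nodes later in this diff. The endpoint, wallet path, and IDs are placeholders, not values from this change.

from frostfs_testlib.cli import FrostfsCli
from frostfs_testlib.resources.cli import FROSTFS_CLI_EXEC

# Sketch only: `shell` and `wallet_config` are fixtures assumed to exist.
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config)
result = cli.object.nodes(
    rpc_endpoint="node1.example.com:8080",  # placeholder endpoint
    wallet="/path/to/wallet.json",          # placeholder wallet path
    cid="<container-id>",
    oid="<object-id>",
    verify_presence_all=True,
)
print(result.stdout)  # pipe-delimited table of nodes holding the object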

View file

@@ -64,6 +64,7 @@ class HostConfig:
services: list[ServiceConfig] = field(default_factory=list)
clis: list[CLIConfig] = field(default_factory=list)
attributes: dict[str, str] = field(default_factory=dict)
interfaces: dict[str, str] = field(default_factory=dict)
def __post_init__(self) -> None:
self.services = [ServiceConfig(**service) for service in self.services or []]
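The new interfaces mapping comes from the hosting config (interface name to IP). A hedged sketch of reading it, mirroring how ClusterStateController._parse_interfaces consumes this field later in this diff; the "mgmt" naming convention is an assumption taken from that code.

# Collect non-management IPs for a node; `node` is a ClusterNode fixture.
internal_ips = [
    ip
    for if_name, ip in node.host.config.interfaces.items()
    if "mgmt" not in if_name
]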

View file

@@ -178,6 +178,10 @@ class LoadParams:
min_iteration_duration: Optional[str] = metadata_field(
all_load_scenarios, None, "K6_MIN_ITERATION_DURATION", False
)
# Prepare/cut objects locally on client before sending
prepare_locally: Optional[bool] = metadata_field(
[LoadScenario.gRPC, LoadScenario.gRPC_CAR], None, "PREPARE_LOCALLY", False
)
    # Specifies the K6 setupTimeout option. Currently hardcoded in xk6 as 5 seconds for all scenarios
# https://k6.io/docs/using-k6/k6-options/reference/#setup-timeout
setup_timeout: Optional[str] = metadata_field(all_scenarios, None, "K6_SETUP_TIMEOUT", False)
@@ -267,6 +271,16 @@ class LoadParams:
return env_vars
def __post_init__(self):
default_scenario_map = {
LoadType.gRPC: LoadScenario.gRPC,
LoadType.HTTP: LoadScenario.HTTP,
LoadType.S3: LoadScenario.S3,
}
if self.scenario is None:
self.scenario = default_scenario_map[self.load_type]
def get_preset_arguments(self):
command_args = [
self._get_preset_argument(meta_field)
@@ -320,7 +334,7 @@ class LoadParams:
# TODO: migrate load_params defaults to testlib
if self.object_size is not None:
size, unit = calc_unit(self.object_size, 1)
static_params = [f"{load_type_str} ({size:.4g} {unit})"]
static_params = [f"{load_type_str} {size:.4g} {unit}"]
else:
static_params = [f"{load_type_str}"]
@@ -331,7 +345,7 @@ class LoadParams:
]
params = ", ".join(static_params + dynamic_params)
return f"load: {params}"
return params
def __repr__(self) -> str:
return self.__str__()
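Taken together, a short sketch of the new LoadParams behavior; this assumes load_type is the only field required at construction.

from frostfs_testlib.load.load_config import LoadParams, LoadScenario, LoadType

params = LoadParams(load_type=LoadType.gRPC)
# __post_init__ now fills in a default scenario per load type:
assert params.scenario == LoadScenario.gRPC

# The new flag maps to the PREPARE_LOCALLY env var of the k6 gRPC scenarios:
params.prepare_locally = True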

View file

@@ -8,12 +8,15 @@ class MetricsBase(ABC):
_WRITE_SUCCESS = ""
_WRITE_ERRORS = ""
_WRITE_THROUGHPUT = "data_sent"
_WRITE_LATENCY = ""
_READ_SUCCESS = ""
_READ_ERRORS = ""
_READ_LATENCY = ""
_READ_THROUGHPUT = "data_received"
_DELETE_SUCCESS = ""
_DELETE_LATENCY = ""
_DELETE_ERRORS = ""
def __init__(self, summary) -> None:
@@ -27,6 +30,10 @@ class MetricsBase(ABC):
@property
def write_success_iterations(self) -> int:
return self._get_metric(self._WRITE_SUCCESS)
@property
def write_latency(self) -> dict:
return self._get_metric(self._WRITE_LATENCY)
@property
def write_rate(self) -> float:
@@ -47,6 +54,10 @@ class MetricsBase(ABC):
@property
def read_success_iterations(self) -> int:
return self._get_metric(self._READ_SUCCESS)
@property
def read_latency(self) -> dict:
return self._get_metric(self._READ_LATENCY)
@property
def read_rate(self) -> int:
@@ -67,6 +78,10 @@ class MetricsBase(ABC):
@property
def delete_success_iterations(self) -> int:
return self._get_metric(self._DELETE_SUCCESS)
@property
def delete_latency(self) -> dict:
return self._get_metric(self._DELETE_LATENCY)
@property
def delete_failed_iterations(self) -> int:
@@ -77,7 +92,7 @@ class MetricsBase(ABC):
return self._get_metric_rate(self._DELETE_SUCCESS)
def _get_metric(self, metric: str) -> int:
metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric}
        metrics_method_map = {"counter": self._get_counter_metric, "gauge": self._get_gauge_metric, "trend": self._get_trend_metrics}
if metric not in self.metrics:
return 0
@@ -114,28 +129,37 @@ class MetricsBase(ABC):
def _get_gauge_metric(self, metric: str) -> int:
return metric["values"]["value"]
    def _get_trend_metrics(self, metric: str) -> dict:
return metric["values"]
class GrpcMetrics(MetricsBase):
_WRITE_SUCCESS = "frostfs_obj_put_total"
_WRITE_ERRORS = "frostfs_obj_put_fails"
_WRITE_LATENCY = "frostfs_obj_put_duration"
_READ_SUCCESS = "frostfs_obj_get_total"
_READ_ERRORS = "frostfs_obj_get_fails"
_READ_LATENCY = "frostfs_obj_get_duration"
_DELETE_SUCCESS = "frostfs_obj_delete_total"
_DELETE_ERRORS = "frostfs_obj_delete_fails"
_DELETE_LATENCY = "frostfs_obj_delete_duration"
class S3Metrics(MetricsBase):
_WRITE_SUCCESS = "aws_obj_put_total"
_WRITE_ERRORS = "aws_obj_put_fails"
_WRITE_LATENCY = "aws_obj_put_duration"
_READ_SUCCESS = "aws_obj_get_total"
_READ_ERRORS = "aws_obj_get_fails"
_READ_LATENCY = "aws_obj_get_duration"
_DELETE_SUCCESS = "aws_obj_delete_total"
_DELETE_ERRORS = "aws_obj_delete_fails"
_DELETE_LATENCY = "aws_obj_delete_duration"
class LocalMetrics(MetricsBase):
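Latency counters are k6 "trend" metrics, so the new properties return the raw k6 values dict rather than a single number. A hedged sketch; the summary argument and the exact trend keys, such as p(95), follow k6's JSON summary format and are assumptions here.

from frostfs_testlib.load.load_config import LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object

metrics = get_metrics_object(LoadScenario.gRPC, k6_summary)  # k6_summary: parsed summary JSON
put_latency = metrics.write_latency
# e.g. {"avg": 12.3, "min": 0.9, "med": 10.5, "max": 80.2, "p(90)": 21.7, "p(95)": 24.6}
print(put_latency.get("avg"))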

View file

@@ -2,6 +2,7 @@ from datetime import datetime
from typing import Optional
import yaml
import os
from frostfs_testlib.load.load_config import K6ProcessAllocationStrategy, LoadParams, LoadScenario
from frostfs_testlib.load.load_metrics import get_metrics_object
@@ -109,6 +110,7 @@ class LoadReport:
total_rate: float,
throughput: float,
errors: dict[str, int],
latency: dict[str, dict],
):
throughput_html = ""
if throughput > 0:
@@ -127,6 +129,15 @@ class LoadReport:
):
per_node_errors_html += self._row(f"At {node_key}", errors)
latency_html = ""
if latency:
for node_key, param_dict in latency.items():
latency_values = ""
for param_name, param_val in param_dict.items():
latency_values += f"{param_name}={param_val:.2f}ms "
latency_html += self._row(f"Put latency {node_key.split(':')[0]}", latency_values)
object_size, object_size_unit = calc_unit(self.load_params.object_size, 1)
duration = self._seconds_to_formatted_duration(self.load_params.load_time)
model = self._get_model_string()
@@ -135,6 +146,7 @@ class LoadReport:
errors_percent = 0
if total_operations:
            errors_percent = total_errors / total_operations * 100.0
html = f"""
<table border="1" cellpadding="5px"><tbody>
<tr><th colspan="2" bgcolor="gainsboro">{short_summary}</th></tr>
@@ -142,7 +154,7 @@ class LoadReport:
{self._row("Total operations", total_operations)}
{self._row("OP/sec", f"{total_rate:.2f}")}
{throughput_html}
{latency_html}
<tr><th colspan="2" bgcolor="gainsboro">Errors</th></tr>
{per_node_errors_html}
{self._row("Total", f"{total_errors} ({errors_percent:.2f}%)")}
@@ -160,6 +172,7 @@ class LoadReport:
write_operations = 0
write_op_sec = 0
write_throughput = 0
write_latency = {}
write_errors = {}
requested_write_rate = self.load_params.write_rate
requested_write_rate_str = (
@@ -169,12 +182,14 @@ class LoadReport:
read_operations = 0
read_op_sec = 0
read_throughput = 0
read_latency = {}
read_errors = {}
requested_read_rate = self.load_params.read_rate
requested_read_rate_str = f"{requested_read_rate}op/sec" if requested_read_rate else ""
delete_operations = 0
delete_op_sec = 0
delete_latency = {}
delete_errors = {}
requested_delete_rate = self.load_params.delete_rate
requested_delete_rate_str = (
@@ -210,6 +225,7 @@ class LoadReport:
if write_operations:
write_section_required = True
write_op_sec += metrics.write_rate
write_latency[node_key] = metrics.write_latency
write_throughput += metrics.write_throughput
if metrics.write_failed_iterations:
write_errors[node_key] = metrics.write_failed_iterations
@@ -219,6 +235,7 @@ class LoadReport:
read_section_required = True
read_op_sec += metrics.read_rate
read_throughput += metrics.read_throughput
read_latency[node_key] = metrics.read_latency
if metrics.read_failed_iterations:
read_errors[node_key] = metrics.read_failed_iterations
@@ -226,6 +243,7 @@ class LoadReport:
if delete_operations:
delete_section_required = True
delete_op_sec += metrics.delete_rate
delete_latency[node_key] = metrics.delete_latency
if metrics.delete_failed_iterations:
delete_errors[node_key] = metrics.delete_failed_iterations
@@ -238,6 +256,7 @@ class LoadReport:
write_op_sec,
write_throughput,
write_errors,
write_latency,
)
if read_section_required:
@@ -249,6 +268,7 @@ class LoadReport:
read_op_sec,
read_throughput,
read_errors,
read_latency,
)
if delete_section_required:
@@ -260,6 +280,7 @@ class LoadReport:
delete_op_sec,
0,
delete_errors,
delete_latency,
)
return html
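For illustration, the latency argument threaded into the HTML section builder above is a per-node dict of those trend values; a runnable sketch with invented numbers that mirrors the row-rendering logic in this diff:

latency = {"10.10.10.10:8080": {"avg": 12.31, "med": 10.90, "p(95)": 24.55}}
for node_key, param_dict in latency.items():
    values = " ".join(f"{name}={value:.2f}ms" for name, value in param_dict.items())
    print(f"Put latency {node_key.split(':')[0]}: {values}")
# -> Put latency 10.10.10.10: avg=12.31ms med=10.90ms p(95)=24.55ms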

View file

@@ -11,7 +11,7 @@ BACKGROUND_WRITERS_COUNT = os.getenv("BACKGROUND_WRITERS_COUNT", 0)
BACKGROUND_READERS_COUNT = os.getenv("BACKGROUND_READERS_COUNT", 0)
BACKGROUND_DELETERS_COUNT = os.getenv("BACKGROUND_DELETERS_COUNT", 0)
BACKGROUND_VERIFIERS_COUNT = os.getenv("BACKGROUND_VERIFIERS_COUNT", 0)
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 600)
BACKGROUND_LOAD_DEFAULT_TIME = os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1800)
BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE = os.getenv("BACKGROUND_LOAD_DEFAULT_OBJECT_SIZE", 32)
BACKGROUND_LOAD_SETUP_TIMEOUT = os.getenv("BACKGROUND_LOAD_SETUP_TIMEOUT", "5s")
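One caveat worth noting: os.getenv returns a string whenever the variable is set in the environment, so the numeric defaults above only stay ints when the variable is absent. A hedged coercion sketch for callers that need a number:

import os

# Coerce explicitly so the type is stable regardless of the source:
load_time = int(os.getenv("BACKGROUND_LOAD_DEFAULT_TIME", 1800))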

View file

@@ -11,8 +11,9 @@ from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.resources.cli import CLI_DEFAULT_TIMEOUT, FROSTFS_CLI_EXEC, NEOGO_EXECUTABLE
from frostfs_testlib.resources.common import ASSETS_DIR, DEFAULT_WALLET_CONFIG
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import Cluster
from frostfs_testlib.storage.cluster import Cluster, ClusterNode
from frostfs_testlib.utils import json_utils
from frostfs_testlib.utils.cli_utils import parse_cmd_table, parse_netmap_output
logger = logging.getLogger("NeoLogger")
reporter = get_reporter()
@@ -731,3 +732,62 @@ def neo_go_query_height(shell: Shell, endpoint: str) -> dict:
latest_block[0].replace(":", ""): int(latest_block[1]),
validated_state[0].replace(":", ""): int(validated_state[1]),
}
@reporter.step_deco("Search object nodes")
def get_object_nodes(
cluster: Cluster,
wallet: str,
cid: str,
oid: str,
shell: Shell,
endpoint: str,
bearer: str = "",
xhdr: Optional[dict] = None,
is_direct: bool = False,
verify_presence_all: bool = False,
wallet_config: Optional[str] = None,
timeout: Optional[str] = CLI_DEFAULT_TIMEOUT,
) -> list[ClusterNode]:
cli = FrostfsCli(shell, FROSTFS_CLI_EXEC, wallet_config or DEFAULT_WALLET_CONFIG)
result_object_nodes = cli.object.nodes(
rpc_endpoint=endpoint,
wallet=wallet,
cid=cid,
oid=oid,
bearer=bearer,
ttl=1 if is_direct else None,
xhdr=xhdr,
timeout=timeout,
verify_presence_all=verify_presence_all,
)
parsing_output = parse_cmd_table(result_object_nodes.stdout, "|")
list_object_nodes = [
node
for node in parsing_output
if node["should_contain_object"] == "true" and node["actually_contains_object"] == "true"
]
netmap_nodes_list = parse_netmap_output(
cli.netmap.snapshot(
rpc_endpoint=endpoint,
wallet=wallet,
).stdout
)
netmap_nodes = [
netmap_node
for object_node in list_object_nodes
for netmap_node in netmap_nodes_list
if object_node["node_id"] == netmap_node.node_id
]
result = [
cluster_node
for netmap_node in netmap_nodes
for cluster_node in cluster.cluster_nodes
if netmap_node.node == cluster_node.host_ip
]
return result
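A hedged usage sketch of the new step; cluster and shell are the usual fixtures, the wallet path and IDs are placeholders, and default_rpc_endpoint is assumed to be a convenience property on Cluster.

nodes_with_object = get_object_nodes(
    cluster=cluster,
    wallet="/path/to/wallet.json",   # placeholder
    cid="<container-id>",
    oid="<object-id>",
    shell=shell,
    endpoint=cluster.default_rpc_endpoint,  # assumption: property on Cluster
)
# Only nodes that both should and actually do contain the object are returned.
for cluster_node in nodes_with_object:
    print(cluster_node.host_ip)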

View file

@@ -0,0 +1,42 @@
from frostfs_testlib.shell import Shell
from frostfs_testlib.storage.cluster import ClusterNode
class IpTablesHelper:
@staticmethod
def drop_input_traffic_to_port(node: ClusterNode, ports: list[str]) -> None:
shell = node.host.get_shell()
for port in ports:
shell.exec(f"iptables -A INPUT -p tcp --dport {port} -j DROP")
@staticmethod
def drop_input_traffic_to_node(node: ClusterNode, block_ip: list[str]) -> None:
shell = node.host.get_shell()
for ip in block_ip:
shell.exec(f"iptables -A INPUT -s {ip} -j DROP")
@staticmethod
def restore_input_traffic_to_port(node: ClusterNode) -> None:
shell = node.host.get_shell()
ports = (
shell.exec("iptables -L --numeric | grep DROP | awk '{print $7}'")
.stdout.strip()
.split("\n")
)
if ports[0] == "":
return
for port in ports:
shell.exec(f"iptables -D INPUT -p tcp --dport {port.split(':')[-1]} -j DROP")
@staticmethod
def restore_input_traffic_to_node(node: ClusterNode) -> None:
shell = node.host.get_shell()
unlock_ip = (
shell.exec("iptables -L --numeric | grep DROP | awk '{print $4}'")
.stdout.strip()
.split("\n")
)
if unlock_ip[0] == "":
return
for ip in unlock_ip:
shell.exec(f"iptables -D INPUT -s {ip} -j DROP")

View file

@@ -4,8 +4,10 @@ import time
import frostfs_testlib.resources.optionals as optionals
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.shell import CommandOptions, Shell
from frostfs_testlib.steps.iptables import IpTablesHelper
from frostfs_testlib.storage.cluster import Cluster, ClusterNode, StorageNode
from frostfs_testlib.storage.controllers.disk_controller import DiskController
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.testing import parallel
from frostfs_testlib.testing.test_control import run_optionally
from frostfs_testlib.utils.failover_utils import (
@@ -24,6 +26,8 @@ class ClusterStateController:
self.detached_disks: dict[str, DiskController] = {}
self.stopped_storage_nodes: list[ClusterNode] = []
self.stopped_s3_gates: list[ClusterNode] = []
self.dropped_traffic: list[ClusterNode] = []
self.stopped_services: set[NodeBase] = set()
self.cluster = cluster
self.shell = shell
self.suspended_services: dict[str, list[ClusterNode]] = {}
@@ -124,6 +128,51 @@ class ClusterStateController:
node.storage_node.stop_service()
self.stopped_storage_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services")
def stop_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
self.stopped_services.update(services)
parallel([service.stop_service for service in services])
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all {service_type} services")
def start_services_of_type(self, service_type: type[ServiceClass]):
services = self.cluster.services(service_type)
parallel([service.start_service for service in services])
if service_type == StorageNode:
wait_all_storage_nodes_returned(self.shell, self.cluster)
self.stopped_services = self.stopped_services - set(services)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start all stopped services")
def start_all_stopped_services(self):
parallel([service.start_service for service in self.stopped_services])
for service in self.stopped_services:
if isinstance(service, StorageNode):
wait_all_storage_nodes_returned(self.shell, self.cluster)
break
self.stopped_services.clear()
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type)
service.stop_service()
self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start {service_type} service on {node}")
def start_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
service = node.service(service_type)
service.start_service()
if service in self.stopped_services:
self.stopped_services.remove(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Start storage service on {node}")
def start_storage_service(self, node: ClusterNode):
@@ -191,6 +240,62 @@ class ClusterStateController:
[node.host.wait_success_resume_process(process_name) for node in list_nodes]
self.suspended_services = {}
@reporter.step_deco("Drop traffic to {node}, with ports - {ports}, nodes - {block_nodes}")
def drop_traffic(
self,
mode: str,
node: ClusterNode,
wakeup_timeout: int,
        ports: Optional[list[str]] = None,
        block_nodes: Optional[list[ClusterNode]] = None,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.drop_input_traffic_to_port(node, ports)
case "nodes":
                list_ip = self._parse_interfaces(block_nodes)
IpTablesHelper.drop_input_traffic_to_node(node, list_ip)
time.sleep(wakeup_timeout)
self.dropped_traffic.append(node)
@reporter.step_deco("Ping traffic")
def ping_traffic(
self,
node: ClusterNode,
nodes_list: list[ClusterNode],
expect_result: int,
) -> bool:
shell = node.host.get_shell()
options = CommandOptions(check=False)
        ips = self._parse_interfaces(nodes_list)
for ip in ips:
code = shell.exec(f"ping {ip} -c 1", options).return_code
if code != expect_result:
return False
return True
@reporter.step_deco("Start traffic to {node}")
def restore_traffic(
self,
mode: str,
node: ClusterNode,
) -> None:
allowed_modes = ["ports", "nodes"]
assert mode in allowed_modes
match mode:
case "ports":
IpTablesHelper.restore_input_traffic_to_port(node=node)
case "nodes":
IpTablesHelper.restore_input_traffic_to_node(node=node)
@reporter.step_deco("Restore blocked nodes")
def restore_all_traffic(self):
parallel(self._restore_traffic_to_node, self.dropped_traffic)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Hard reboot host {node} via magic SysRq option")
def panic_reboot_host(self, node: ClusterNode, wait_for_return: bool = True):
@@ -217,3 +322,16 @@ class ClusterStateController:
disk_controller = DiskController(node, device, mountpoint)
return disk_controller
def _restore_traffic_to_node(self, node):
IpTablesHelper.restore_input_traffic_to_port(node)
IpTablesHelper.restore_input_traffic_to_node(node)
    def _parse_interfaces(self, nodes: list[ClusterNode]) -> list[str]:
        interfaces = []
        for node in nodes:
            dict_interfaces = node.host.config.interfaces
            for interface_type, ip in dict_interfaces.items():
                if "mgmt" not in interface_type:
                    interfaces.append(ip)
        return interfaces
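Putting the new controller pieces together, a hedged sketch; the constructor argument order and the victim/other_nodes fixtures are assumptions.

controller = ClusterStateController(cluster, shell)

# Block traffic from other nodes, verify with ping, then restore everything:
controller.drop_traffic(mode="nodes", node=victim, wakeup_timeout=5, block_nodes=other_nodes)
# Pings from the victim should now fail (ping exits 1), since replies hit the DROP rule:
assert controller.ping_traffic(victim, other_nodes, expect_result=1)
controller.restore_all_traffic()

# The new service-type controls follow the same stop/start bookkeeping:
controller.stop_services_of_type(StorageNode)
controller.start_all_stopped_services()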

View file

@@ -1,4 +1,4 @@
from abc import ABC, abstractmethod
from abc import abstractmethod
from dataclasses import dataclass
from typing import Optional, Tuple, TypedDict, TypeVar
@@ -6,10 +6,13 @@ import yaml
from frostfs_testlib.hosting.config import ServiceConfig
from frostfs_testlib.hosting.interfaces import Host
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage.constants import ConfigAttributes
from frostfs_testlib.testing.readable import HumanReadableABC
from frostfs_testlib.utils import wallet_utils
reporter = get_reporter()
@dataclass
class NodeBase(HumanReadableABC):
@@ -54,17 +57,20 @@ class NodeBase(HumanReadableABC):
return self._process_name
def start_service(self):
self.host.start_service(self.name)
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name)
@abstractmethod
def service_healthcheck(self) -> bool:
"""Service healthcheck."""
def stop_service(self):
self.host.stop_service(self.name)
with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
self.host.stop_service(self.name)
def restart_service(self):
self.host.restart_service(self.name)
with reporter.step(f"Restart {self.name} service on {self.host.config.address}"):
self.host.restart_service(self.name)
def get_wallet_password(self) -> str:
return self._get_attribute(ConfigAttributes.WALLET_PASSWORD)

View file

@@ -7,7 +7,7 @@ class ObjectSize:
value: int
def __str__(self) -> str:
return f"{self.name} object size"
return self.name
def __repr__(self) -> str:
return self.__str__()
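The effect on parametrized test IDs, sketched (keyword construction assumed):

size = ObjectSize(name="simple", value=1000)
str(size)   # was "simple object size"; now just "simple"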

View file

@@ -23,3 +23,21 @@ class StorageObjectInfo(ObjectRef):
attributes: Optional[list[dict[str, str]]] = None
tombstone: Optional[str] = None
locks: Optional[list[LockObjectInfo]] = None
@dataclass
class NodeNetmapInfo:
    node_id: Optional[str] = None
    node_status: Optional[str] = None
    node_data_ip: Optional[str] = None
    cluster_name: Optional[str] = None
    continent: Optional[str] = None
    country: Optional[str] = None
    country_code: Optional[str] = None
    external_address: Optional[str] = None
    location: Optional[str] = None
    node: Optional[str] = None
    sub_div: Optional[str] = None
    sub_div_code: Optional[int] = None
    un_locode: Optional[str] = None
    role: Optional[str] = None

View file

@@ -5,18 +5,21 @@
"""
Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
"""
import csv
import json
import logging
import subprocess
import sys
from contextlib import suppress
from datetime import datetime
from io import StringIO
from textwrap import shorten
from typing import TypedDict, Union
from typing import Dict, List, TypedDict, Union
import pexpect
from frostfs_testlib.reporter import get_reporter
from frostfs_testlib.storage.dataclasses.storage_object_info import NodeNetmapInfo
reporter = get_reporter()
logger = logging.getLogger("NeoLogger")
@@ -131,3 +134,49 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
command_attachment = f"COMMAND: '{cmd}'\n" f"OUTPUT:\n {output}\n"
with reporter.step(f'COMMAND: {shorten(cmd, width=60, placeholder="...")}'):
reporter.attach(command_attachment, "Command execution")
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
"""
    The CLI command returns output like this:
Epoch: 240
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
Continent: Europe
Country: Russia
CountryCode: RU
ExternalAddr: /ip4/10.10.11.18/tcp/8080
Location: Moskva
Node: 10.10.10.12
Price: 5
SubDiv: Moskva
SubDivCode: MOW
UN-LOCODE: RU MOW
role: alphabet
    Each node section is parsed into a NodeNetmapInfo dataclass.
"""
netmap_list = output.split("Node ")[1:]
dataclass_list = []
for node in netmap_list:
node = node.replace("\t", "").split("\n")
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
dataclass_list.append(NodeNetmapInfo(*node))
return dataclass_list
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]:
parsing_output = []
reader = csv.reader(StringIO(output.strip()), delimiter=delimiter)
iter_reader = iter(reader)
header_row = next(iter_reader)
for row in iter_reader:
table = {}
for i in range(len(row)):
header = header_row[i].strip().lower().replace(" ", "_")
value = row[i].strip().lower()
if header:
table[header] = value
parsing_output.append(table)
return parsing_output
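A hedged sketch of both parsers against the output shapes shown above; the sample strings are invented but follow the documented formats.

table_output = (
    "Node ID | Should contain object | Actually contains object\n"
    "022bb4041c50d607ff871dec | true | true"
)
rows = parse_cmd_table(table_output, "|")
# -> [{"node_id": "022bb4041c50d607ff871dec",
#      "should_contain_object": "true",
#      "actually_contains_object": "true"}]

nodes = parse_netmap_output(netmap_snapshot_stdout)  # netmap_snapshot_stdout: CLI output as above
online = [n for n in nodes if n.node_status == "ONLINE"]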