Compare commits

...

10 Commits

Author SHA1 Message Date
m.malygina 3af4dfd977 multipart scenario
Signed-off-by: m.malygina <m.malygina@yadro.com>
2023-10-27 11:53:55 +03:00
Andrey Berezin 8a360683ae [#104] Add mask/unmask for services
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-26 17:31:33 +03:00
Andrey Berezin f4111a1374 [#103] Add host_status method to Host
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-26 13:34:42 +03:00
Andrey Berezin b1a3d740e9 [#102] Updates for failover
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-25 15:57:38 +03:00
Dmitriy Zayakin 0c3bb20af5 Add method to interfaces
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-10-24 12:41:44 +00:00
Andrey Berezin e1f3444e92 [#100] Add new method for logs gathering
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-20 18:08:22 +03:00
Dmitriy Zayakin cff5db5a67 Change func parsing netmap
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-10-18 08:29:34 +00:00
Andrey Berezin 1c3bbe26f7 [#98] Small dependency cleanup
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-17 17:45:23 +03:00
Dmitry Anurin dd347dd8fb Added unit to logs getter
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-10-12 11:56:30 +00:00
Andrey Berezin 98f9c78f09 [#97] Probe fix for filedescriptor issue
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-11 18:21:40 +03:00
17 changed files with 290 additions and 124 deletions

View File

@@ -18,11 +18,11 @@ keywords = ["frostfs", "test"]
dependencies = [
"allure-python-commons>=2.13.2",
"docker>=4.4.0",
"importlib_metadata>=5.0; python_version < '3.10'",
"pyyaml==6.0.1",
"neo-mamba==1.0.0",
"paramiko>=2.10.3",
"pexpect>=4.8.0",
"requests>=2.28.0",
"requests==2.28.1",
"docstring_parser>=0.15",
"testrail-api>=1.12.0",
"pytest==7.1.2",

View File

@@ -1,6 +1,5 @@
allure-python-commons==2.13.2
docker==4.4.0
importlib_metadata==5.0.0
neo-mamba==1.0.0
paramiko==2.10.3
pexpect==4.8.0

View File

@@ -11,7 +11,7 @@ import docker
from requests import HTTPError
from frostfs_testlib.hosting.config import ParsedAttributes
from frostfs_testlib.hosting.interfaces import DiskInfo, Host
from frostfs_testlib.hosting.interfaces import DiskInfo, Host, HostStatus
from frostfs_testlib.shell import LocalShell, Shell, SSHShell
from frostfs_testlib.shell.command_inspectors import SudoInspector
@@ -87,6 +87,15 @@ class DockerHost(Host):
for service_config in self._config.services:
self.start_service(service_config.name)
def get_host_status(self) -> HostStatus:
# We emulate host status by checking all services.
for service_config in self._config.services:
state = self._get_container_state(service_config.name)
if state != "running":
return HostStatus.OFFLINE
return HostStatus.ONLINE
def stop_host(self) -> None:
# We emulate stopping machine by stopping all services
# As an alternative we can probably try to stop docker service...
@@ -117,6 +126,14 @@ class DockerHost(Host):
timeout=service_attributes.stop_timeout,
)
def mask_service(self, service_name: str) -> None:
# Not required for Docker
return
def unmask_service(self, service_name: str) -> None:
# Not required for Docker
return
def wait_success_suspend_process(self, service_name: str):
raise NotImplementedError("Not supported for docker")
@@ -212,11 +229,36 @@ class DockerHost(Host):
with open(file_path, "wb") as file:
file.write(logs)
def get_filtered_logs(
self,
filter_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> str:
client = self._get_docker_client()
filtered_logs = ""
for service_config in self._config.services:
container_name = self._get_service_attributes(service_config.name).container_name
try:
filtered_logs = client.logs(container_name, since=since, until=until)
except HTTPError as exc:
logger.info(f"Got exception while dumping logs of '{container_name}': {exc}")
continue
matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE)
found = list(matches)
if found:
filtered_logs += f"{container_name}:\n{os.linesep.join(found)}"
return filtered_logs
def is_message_in_logs(
self,
message_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> bool:
client = self._get_docker_client()
for service_config in self._config.services:
@@ -268,11 +310,16 @@ class DockerHost(Host):
# To speed things up, we break timeout in smaller iterations and check container state
# several times. This way waiting stops as soon as container reaches the expected state
for _ in range(iterations):
container = self._get_container_by_name(container_name)
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
state = self._get_container_state(container_name)
if container and container["State"] == expected_state:
if state == expected_state:
return
time.sleep(iteration_wait_time)
raise RuntimeError(f"Container {container_name} is not in {expected_state} state.")
def _get_container_state(self, container_name: str) -> str:
container = self._get_container_by_name(container_name)
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
return container.get("State", None)
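The new _get_container_state helper backs both the wait loop and get_host_status. A minimal usage sketch, assuming host is an already-configured DockerHost instance (the setup itself is hypothetical):

from frostfs_testlib.hosting.interfaces import HostStatus

# host is assumed to be a configured DockerHost; construction is omitted here.
if host.get_host_status() == HostStatus.OFFLINE:
    # At least one service container is not in the "running" state.
    host.start_host()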

View File

@@ -4,6 +4,13 @@ from typing import Optional
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.testing.readable import HumanReadableEnum
class HostStatus(HumanReadableEnum):
ONLINE = "Online"
OFFLINE = "Offline"
UNKNOWN = "Unknown"
class DiskInfo(dict):
@@ -79,6 +86,10 @@ class Host(ABC):
def start_host(self) -> None:
"""Starts the host machine."""
@abstractmethod
def get_host_status(self) -> HostStatus:
"""Check host status."""
@abstractmethod
def stop_host(self, mode: str) -> None:
"""Stops the host machine.
@@ -107,6 +118,26 @@ class Host(ABC):
service_name: Name of the service to stop.
"""
@abstractmethod
def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to mask.
"""
@abstractmethod
def unmask_service(self, service_name: str) -> None:
"""Allow the service to start by any activity by unmasking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to unmask.
"""
@abstractmethod
def restart_service(self, service_name: str) -> None:
"""Restarts the service with specified name and waits until it starts.
@@ -115,7 +146,6 @@ class Host(ABC):
service_name: Name of the service to restart.
"""
@abstractmethod
def get_data_directory(self, service_name: str) -> str:
"""
@@ -126,7 +156,6 @@ class Host(ABC):
service_name: Name of storage node service.
"""
@abstractmethod
def wait_success_suspend_process(self, process_name: str) -> None:
"""Search for a service ID by its name and stop the process
@@ -251,12 +280,34 @@ class Host(ABC):
filter_regex: regex to filter output
"""
@abstractmethod
def get_filtered_logs(
self,
filter_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> str:
"""Get logs from host filtered by regex.
Args:
filter_regex: regex filter for logs.
since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC.
unit: optional unit name to limit the search to.
Returns:
Found entries as str if any found.
Empty string otherwise.
"""
@abstractmethod
def is_message_in_logs(
self,
message_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> bool:
"""Checks logs on host for specified message regex.
@@ -269,10 +320,11 @@ class Host(ABC):
True if message found in logs in the given time frame.
False otherwise.
"""
@abstractmethod
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None:
def wait_for_service_to_be_in_state(
self, systemd_service_name: str, expected_state: str, timeout: int
) -> None:
"""
Waits for the service to be in the specified state.
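On hosts managed by systemd (the Docker host above stubs these out), mask/unmask would typically map onto systemctl; a hedged sketch of a concrete subclass, with the shell commands being assumptions rather than part of this diff:

class SystemdHost(Host):  # hypothetical subclass, for illustration only
    def mask_service(self, service_name: str) -> None:
        # Masking links the unit to /dev/null so no activity can start it.
        self.get_shell().exec(f"systemctl mask {service_name}")

    def unmask_service(self, service_name: str) -> None:
        self.get_shell().exec(f"systemctl unmask {service_name}")

    # ...other abstract methods omitted from this sketch...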

View File

@@ -19,6 +19,7 @@ class LoadScenario(Enum):
gRPC_CAR = "grpc_car"
S3 = "s3"
S3_CAR = "s3_car"
S3_MULTIPART = "s3_multipart"
HTTP = "http"
VERIFY = "verify"
LOCAL = "local"
@@ -37,10 +38,11 @@ all_load_scenarios = [
LoadScenario.S3_CAR,
LoadScenario.gRPC_CAR,
LoadScenario.LOCAL,
LoadScenario.S3_MULTIPART
]
all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY]
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL]
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL, LoadScenario.S3_MULTIPART]
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]
grpc_preset_scenarios = [
@@ -49,7 +51,7 @@ grpc_preset_scenarios = [
LoadScenario.gRPC_CAR,
LoadScenario.LOCAL,
]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART]
@dataclass
@@ -172,7 +174,7 @@ class LoadParams:
k6_url: Optional[str] = None
# No ssl verification flag
no_verify_ssl: Optional[bool] = metadata_field(
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.VERIFY, LoadScenario.HTTP],
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.VERIFY, LoadScenario.HTTP],
"no-verify-ssl",
"NO_VERIFY_SSL",
False,
@@ -258,6 +260,14 @@ class LoadParams:
constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True
)
# Multipart
# Number of parts to upload in parallel
writers_multipart: Optional[int] = metadata_field(
[LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True
)
# Part size must be greater than 5 MB
write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)
# Period of time to apply the rate value.
time_unit: Optional[str] = metadata_field(
constant_arrival_rate_scenarios, None, "TIME_UNIT", False
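A hedged sketch of driving the new multipart scenario; only the fields added in this diff are shown, other LoadParams fields are left at their defaults, and the part-size units are assumed to be bytes (the diff states only that the value must exceed 5 MB):

params = LoadParams(
    scenario=LoadScenario.S3_MULTIPART,  # assumes a `scenario` field on LoadParams
    writers_multipart=4,                 # number of parts to upload in parallel
    write_object_part_size=8 * 1024 * 1024,  # assumed bytes; must be > 5 MB
)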

View File

@@ -196,6 +196,7 @@ def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> Metr
LoadScenario.HTTP: GrpcMetrics,
LoadScenario.S3: S3Metrics,
LoadScenario.S3_CAR: S3Metrics,
LoadScenario.S3_MULTIPART: S3Metrics,
LoadScenario.VERIFY: VerifyMetrics,
LoadScenario.LOCAL: LocalMetrics,
}
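Since multipart runs reuse the S3 metrics shape, the mapping resolves S3_MULTIPART to S3Metrics; a brief sketch, where summary stands for a hypothetical k6 run summary dict:

metrics = get_metrics_object(LoadScenario.S3_MULTIPART, summary)
# Behaves exactly like LoadScenario.S3 / S3_CAR: the same S3Metrics parser is used.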

View File

@@ -92,6 +92,7 @@ class LoadReport:
model_map = {
LoadScenario.gRPC: "closed model",
LoadScenario.S3: "closed model",
LoadScenario.S3_MULTIPART: "closed model",
LoadScenario.HTTP: "closed model",
LoadScenario.gRPC_CAR: "open model",
LoadScenario.S3_CAR: "open model",

View File

@@ -1,12 +1,6 @@
import sys
from importlib.metadata import entry_points
from typing import Any
if sys.version_info < (3, 10):
# On Python prior 3.10 we need to use backport of entry points
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points
def load_plugin(plugin_group: str, name: str) -> Any:
"""Loads plugin using entry point specification.

View File

@@ -39,7 +39,7 @@ class LocalShell(Shell):
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
try:
command_process = pexpect.spawn(command, timeout=options.timeout)
command_process = pexpect.spawn(command, timeout=options.timeout, use_poll=True)
except (pexpect.ExceptionPexpect, OSError) as exc:
raise RuntimeError(f"Command: {command}") from exc

View File

@@ -15,8 +15,7 @@ from frostfs_testlib.resources.cli import (
)
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch
from frostfs_testlib.steps.epoch import wait_for_epochs_align
from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils import datetime_utils
@@ -41,44 +40,6 @@ class HealthStatus:
return HealthStatus(network, health)
@reporter.step_deco("Stop random storage nodes")
def stop_random_storage_nodes(number: int, nodes: list[StorageNode]) -> list[StorageNode]:
"""
Shuts down the given number of randomly selected storage nodes.
Args:
number: the number of storage nodes to stop
nodes: the list of storage nodes to stop
Returns:
the list of nodes that were stopped
"""
nodes_to_stop = random.sample(nodes, number)
for node in nodes_to_stop:
node.stop_service()
return nodes_to_stop
@reporter.step_deco("Start storage node")
def start_storage_nodes(nodes: list[StorageNode]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes: the list of nodes to start
"""
for node in nodes:
node.start_service()
@reporter.step_deco("Stop storage node")
def stop_storage_nodes(nodes: list[StorageNode]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes: the list of nodes to start
"""
for node in nodes:
node.stop_service()
@reporter.step_deco("Get Locode from random storage node")
def get_locode_from_random_node(cluster: Cluster) -> str:
node = random.choice(cluster.services(StorageNode))
@@ -329,25 +290,3 @@ def _run_control_command(node: StorageNode, command: str) -> None:
f"--wallet {wallet_path} --config {wallet_config_path}"
)
return result.stdout
@reporter.step_deco("Start services s3gate ")
def start_s3gates(cluster: Cluster) -> None:
"""
The function starts specified storage nodes.
Args:
cluster: cluster instance under test
"""
for gate in cluster.services(S3Gate):
gate.start_service()
@reporter.step_deco("Stop services s3gate ")
def stop_s3gates(cluster: Cluster) -> None:
"""
The function starts specified storage nodes.
Args:
cluster: cluster instance under test
"""
for gate in cluster.services(S3Gate):
gate.stop_service()

View File

@@ -17,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
StorageNode,
)
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry
reporter = get_reporter()
@@ -121,6 +122,40 @@ class ClusterNode:
config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services
]
def get_all_interfaces(self) -> dict[str, str]:
return self.host.config.interfaces
def get_interface(self, interface: Interfaces) -> str:
return self.host.config.interfaces[interface.value]
def get_data_interfaces(self) -> list[str]:
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "data" in name_interface
]
def get_data_interface(self, search_interface: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_interface == interface
]
def get_internal_interfaces(self) -> list[str]:
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "internal" in name_interface
]
def get_internal_interface(self, search_internal: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_internal == interface
]
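A hedged sketch of the new accessors, assuming node is a ClusterNode whose host config defines the interfaces named by the Interfaces enum added below in the storage object info file:

from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces

data_ip = node.get_interface(Interfaces.DATA_O)  # one named interface
data_ips = node.get_data_interfaces()            # all interfaces with "data" in the name
internal_ips = node.get_internal_interfaces()    # all interfaces with "internal" in the name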
class Cluster:
"""
@@ -173,6 +208,42 @@ class Cluster:
def morph_chain(self) -> list[MorphChain]:
return self.services(MorphChain)
def nodes(self, services: list[ServiceClass]) -> list[ClusterNode]:
"""
Resolve which cluster nodes are hosting the specified services.
Args:
services: list of services to resolve hosting cluster nodes.
Returns:
list of cluster nodes which host specified services.
"""
cluster_nodes = set()
for service in services:
cluster_nodes.update(
[node for node in self.cluster_nodes if node.service(type(service)) == service]
)
return list(cluster_nodes)
def node(self, service: ServiceClass) -> ClusterNode:
"""
Resolve single cluster node hosting the specified service.
Args:
service: service to resolve the hosting cluster node.
Returns:
the cluster node which hosts the specified service.
"""
nodes = [node for node in self.cluster_nodes if node.service(type(service)) == service]
if not len(nodes):
raise RuntimeError(f"Cannot find service {service} on any node")
return nodes[0]
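Resolving hosts from service instances now works in both directions; a brief sketch, with cluster as a hypothetical Cluster instance:

storage_services = cluster.services(StorageNode)
hosting_nodes = cluster.nodes(storage_services)  # all nodes hosting these services
single_node = cluster.node(storage_services[0])  # the node hosting one service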
def services(self, service_type: type[ServiceClass]) -> list[ServiceClass]:
"""
Get all services in a cluster of specified type.

View File

@@ -10,6 +10,7 @@ class ConfigAttributes:
ENDPOINT_DATA_0 = "endpoint_data0"
ENDPOINT_DATA_1 = "endpoint_data1"
ENDPOINT_INTERNAL = "endpoint_internal0"
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint"
UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname"

View File

@@ -41,10 +41,10 @@ class ClusterStateController:
provider = SshConnectionProvider()
provider.drop(node.host_ip)
self.stopped_nodes.append(node)
with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node)
self.stopped_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Shutdown whole cluster")
@@ -135,9 +135,9 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode):
node.storage_node.stop_service()
def stop_storage_service(self, node: ClusterNode, mask: bool = True):
self.stopped_storage_nodes.append(node)
node.storage_node.stop_service(mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services")
@@ -171,9 +171,11 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]):
def stop_service_of_type(
self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True
):
service = node.service(service_type)
service.stop_service()
service.stop_service(mask)
self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@@ -207,8 +209,8 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode):
node.s3_gate.stop_service()
def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
node.s3_gate.stop_service(mask)
self.stopped_s3_gates.append(node)
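With the new mask flag, stopping a service also masks its unit so nothing restarts it behind the test's back; start_service (see the node_base changes below) unmasks before starting. A hedged sketch, where controller and node are hypothetical instances:

# Default: stop and mask, so the unit cannot be started until it is unmasked.
controller.stop_storage_service(node)
# Opt out of masking when an external watchdog is expected to restart the service.
controller.stop_s3_gate(node, mask=False)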
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)

View File

@@ -110,6 +110,10 @@ class MorphChain(NodeBase):
def label(self) -> str:
return f"{self.name}: {self.get_endpoint()}"
def get_http_endpoint(self) -> str:
return self._get_attribute("http_endpoint")
class StorageNode(NodeBase):
"""
Class represents storage node in a storage cluster
@@ -141,6 +145,9 @@ class StorageNode(NodeBase):
def get_shard_config_path(self) -> str:
return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH)
def get_shards_config(self) -> tuple[str, dict]:
return self.get_config(self.get_shard_config_path())
def get_control_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)
@@ -149,10 +156,10 @@
def get_data_directory(self) -> str:
return self.host.get_data_directory(self.name)
def get_http_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.HTTP_HOSTNAME)
def get_s3_hostname(self) -> str:
return self._get_attribute(ConfigAttributes.S3_HOSTNAME)

View File

@@ -1,6 +1,6 @@
from abc import abstractmethod
from dataclasses import dataclass
from typing import Optional, Tuple, TypedDict, TypeVar
from typing import Optional, TypedDict, TypeVar
import yaml
@@ -57,6 +57,9 @@ class NodeBase(HumanReadableABC):
return self._process_name
def start_service(self):
with reporter.step(f"Unmask {self.name} service on {self.host.config.address}"):
self.host.unmask_service(self.name)
with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name)
@@ -64,7 +67,14 @@
def service_healthcheck(self) -> bool:
"""Service healthcheck."""
def stop_service(self):
def get_metrics_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
def stop_service(self, mask: bool = True):
if mask:
with reporter.step(f"Mask {self.name} service on {self.host.config.address}"):
self.host.mask_service(self.name)
with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
self.host.stop_service(self.name)
@@ -103,8 +113,10 @@
ConfigAttributes.WALLET_CONFIG,
)
def get_config(self) -> Tuple[str, dict]:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
def get_config(self, config_file_path: Optional[str] = None) -> tuple[str, dict]:
if config_file_path is None:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
shell = self.host.get_shell()
result = shell.exec(f"cat {config_file_path}")
@@ -113,8 +125,10 @@
config = yaml.safe_load(config_text)
return config_file_path, config
def save_config(self, new_config: dict) -> None:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
def save_config(self, new_config: dict, config_file_path: Optional[str] = None) -> None:
if config_file_path is None:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
shell = self.host.get_shell()
config_str = yaml.dump(new_config)
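The optional config_file_path lets callers read and write secondary configs, such as the shard config exposed by get_shards_config() above. A hedged round-trip sketch (node is a hypothetical NodeBase instance and the config key is invented):

# Path defaults to ConfigAttributes.CONFIG_PATH when not given explicitly.
path, config = node.get_config()
config["pprof"] = {"enabled": True}  # hypothetical key, for illustration only
node.save_config(config, config_file_path=path)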

View File

@@ -1,6 +1,9 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from frostfs_testlib.testing.readable import HumanReadableEnum
@dataclass
class ObjectRef:
@@ -29,15 +32,24 @@ class StorageObjectInfo(ObjectRef):
class NodeNetmapInfo:
node_id: str = None
node_status: str = None
node_data_ip: str = None
node_data_ips: list[str] = None
cluster_name: str = None
continent: str = None
country: str = None
country_code: str = None
external_address: str = None
external_address: list[str] = None
location: str = None
node: str = None
price: int = None
sub_div: str = None
sub_div_code: int = None
un_locode: str = None
role: str = None
class Interfaces(HumanReadableEnum):
DATA_O: str = "data0"
DATA_1: str = "data1"
MGMT: str = "mgmt"
INTERNAL_0: str = "internal0"
INTERNAL_1: str = "internal1"
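node_data_ip and external_address become lists because a node may advertise several addresses; a tiny illustration of the changed shape, using the sample values from the parser docstring below:

info = NodeNetmapInfo(
    node_id="01234",
    node_status="ONLINE",
    node_data_ips=["/ip4/10.10.10.10/tcp/8080"],
    external_address=["/ip4/10.10.11.18/tcp/8080"],
)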

View File

@@ -8,6 +8,7 @@ Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
import csv
import json
import logging
import re
import subprocess
import sys
from contextlib import suppress
@@ -138,32 +139,47 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
"""
The CLI command will return something like:
Epoch: 240
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
Continent: Europe
Country: Russia
CountryCode: RU
ExternalAddr: /ip4/10.10.11.18/tcp/8080
Location: Moskva
Node: 10.10.10.12
Price: 5
SubDiv: Moskva
SubDivCode: MOW
UN-LOCODE: RU MOW
role: alphabet
The code parses each line and returns each node as a dataclass.
"""
netmap_list = output.split("Node ")[1:]
dataclass_list = []
for node in netmap_list:
node = node.replace("\t", "").split("\n")
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
dataclass_list.append(NodeNetmapInfo(*node))
netmap_nodes = output.split("Node ")[1:]
dataclasses_netmap = []
result_netmap = {}
return dataclass_list
regexes = {
"node_id": r"\d+: (?P<node_id>\w+)",
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
"continent": r"Continent: (?P<continent>\w+)",
"country": r"Country: (?P<country>\w+)",
"country_code": r"CountryCode: (?P<country_code>\w+)",
"external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
"location": r"Location: (?P<location>\w+.*)",
"node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
"price": r"Price: (?P<price>\d+)",
"sub_div": r"SubDiv: (?P<sub_div>.*)",
"sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
"un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
"role": r"role: (?P<role>\w+)",
}
for node in netmap_nodes:
for key, regex in regexes.items():
search_result = re.search(regex, node, flags=re.MULTILINE)
if key == "node_data_ips":
result_netmap[key] = search_result[key].strip().split(" ")
continue
if key == "external_address":
result_netmap[key] = search_result[key].strip().split(",")
continue
if search_result == None:
result_netmap[key] = None
continue
result_netmap[key] = search_result[key].strip()
dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
return dataclasses_netmap
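A usage sketch feeding the documented sample output through the new regex-based parser, run alongside the function above (trimmed to a single node):

sample = """Epoch: 240
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
    Continent: Europe
    Country: Russia
    CountryCode: RU
    ExternalAddr: /ip4/10.10.11.18/tcp/8080
    Location: Moskva
    Node: 10.10.10.12
    Price: 5
    SubDiv: Moskva
    SubDivCode: MOW
    UN-LOCODE: RU MOW
    role: alphabet"""

nodes = parse_netmap_output(sample)
assert nodes[0].node_data_ips == ["/ip4/10.10.10.10/tcp/8080"]
assert nodes[0].external_address == ["/ip4/10.10.11.18/tcp/8080"]
assert nodes[0].role == "alphabet"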
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]: