Compare commits

...

10 commits

Author SHA1 Message Date
3af4dfd977 multipart scenario
Signed-off-by: m.malygina <m.malygina@yadro.com>
2023-10-27 11:53:55 +03:00
8a360683ae [#104] Add mask/unmask for services
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-26 17:31:33 +03:00
f4111a1374 [#103] Add host_status method to Host
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-26 13:34:42 +03:00
b1a3d740e9 [#102] Updates for failover
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-25 15:57:38 +03:00
0c3bb20af5 Add method to interfaces
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-10-24 12:41:44 +00:00
e1f3444e92 [#100] Add new method for logs gathering
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-20 18:08:22 +03:00
cff5db5a67 Change func parsing netmap
Signed-off-by: Dmitriy Zayakin <d.zayakin@yadro.com>
2023-10-18 08:29:34 +00:00
1c3bbe26f7 [#98] Small dependency cleanup
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-17 17:45:23 +03:00
dd347dd8fb Added unit to logs getter
Signed-off-by: Dmitry Anurin <d.anurin@yadro.com>
2023-10-12 11:56:30 +00:00
98f9c78f09 [#97] Probe fix for filedescriptor issue
Signed-off-by: Andrey Berezin <a.berezin@yadro.com>
2023-10-11 18:21:40 +03:00
17 changed files with 290 additions and 124 deletions

View file

@ -18,11 +18,11 @@ keywords = ["frostfs", "test"]
dependencies = [ dependencies = [
"allure-python-commons>=2.13.2", "allure-python-commons>=2.13.2",
"docker>=4.4.0", "docker>=4.4.0",
"importlib_metadata>=5.0; python_version < '3.10'", "pyyaml==6.0.1",
"neo-mamba==1.0.0", "neo-mamba==1.0.0",
"paramiko>=2.10.3", "paramiko>=2.10.3",
"pexpect>=4.8.0", "pexpect>=4.8.0",
"requests>=2.28.0", "requests==2.28.1",
"docstring_parser>=0.15", "docstring_parser>=0.15",
"testrail-api>=1.12.0", "testrail-api>=1.12.0",
"pytest==7.1.2", "pytest==7.1.2",

View file

@ -1,6 +1,5 @@
allure-python-commons==2.13.2 allure-python-commons==2.13.2
docker==4.4.0 docker==4.4.0
importlib_metadata==5.0.0
neo-mamba==1.0.0 neo-mamba==1.0.0
paramiko==2.10.3 paramiko==2.10.3
pexpect==4.8.0 pexpect==4.8.0

View file

@ -11,7 +11,7 @@ import docker
from requests import HTTPError from requests import HTTPError
from frostfs_testlib.hosting.config import ParsedAttributes from frostfs_testlib.hosting.config import ParsedAttributes
from frostfs_testlib.hosting.interfaces import DiskInfo, Host from frostfs_testlib.hosting.interfaces import DiskInfo, Host, HostStatus
from frostfs_testlib.shell import LocalShell, Shell, SSHShell from frostfs_testlib.shell import LocalShell, Shell, SSHShell
from frostfs_testlib.shell.command_inspectors import SudoInspector from frostfs_testlib.shell.command_inspectors import SudoInspector
@ -87,6 +87,15 @@ class DockerHost(Host):
for service_config in self._config.services: for service_config in self._config.services:
self.start_service(service_config.name) self.start_service(service_config.name)
def get_host_status(self) -> HostStatus:
# We emulate host status by checking all services.
for service_config in self._config.services:
state = self._get_container_state(service_config.name)
if state != "running":
return HostStatus.OFFLINE
return HostStatus.ONLINE
def stop_host(self) -> None: def stop_host(self) -> None:
# We emulate stopping machine by stopping all services # We emulate stopping machine by stopping all services
# As an alternative we can probably try to stop docker service... # As an alternative we can probably try to stop docker service...
@ -117,6 +126,14 @@ class DockerHost(Host):
timeout=service_attributes.stop_timeout, timeout=service_attributes.stop_timeout,
) )
def mask_service(self, service_name: str) -> None:
# Not required for Docker
return
def unmask_service(self, service_name: str) -> None:
# Not required for Docker
return
def wait_success_suspend_process(self, service_name: str): def wait_success_suspend_process(self, service_name: str):
raise NotImplementedError("Not supported for docker") raise NotImplementedError("Not supported for docker")
@ -212,11 +229,36 @@ class DockerHost(Host):
with open(file_path, "wb") as file: with open(file_path, "wb") as file:
file.write(logs) file.write(logs)
def get_filtered_logs(
self,
filter_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> str:
client = self._get_docker_client()
filtered_logs = ""
for service_config in self._config.services:
container_name = self._get_service_attributes(service_config.name).container_name
try:
filtered_logs = client.logs(container_name, since=since, until=until)
except HTTPError as exc:
logger.info(f"Got exception while dumping logs of '{container_name}': {exc}")
continue
matches = re.findall(filter_regex, filtered_logs, re.IGNORECASE + re.MULTILINE)
found = list(matches)
if found:
filtered_logs += f"{container_name}:\n{os.linesep.join(found)}"
return filtered_logs
def is_message_in_logs( def is_message_in_logs(
self, self,
message_regex: str, message_regex: str,
since: Optional[datetime] = None, since: Optional[datetime] = None,
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> bool: ) -> bool:
client = self._get_docker_client() client = self._get_docker_client()
for service_config in self._config.services: for service_config in self._config.services:
@ -268,11 +310,16 @@ class DockerHost(Host):
# To speed things up, we break timeout in smaller iterations and check container state # To speed things up, we break timeout in smaller iterations and check container state
# several times. This way waiting stops as soon as container reaches the expected state # several times. This way waiting stops as soon as container reaches the expected state
for _ in range(iterations): for _ in range(iterations):
container = self._get_container_by_name(container_name) state = self._get_container_state(container_name)
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
if container and container["State"] == expected_state: if state == expected_state:
return return
time.sleep(iteration_wait_time) time.sleep(iteration_wait_time)
raise RuntimeError(f"Container {container_name} is not in {expected_state} state.") raise RuntimeError(f"Container {container_name} is not in {expected_state} state.")
def _get_container_state(self, container_name: str) -> str:
container = self._get_container_by_name(container_name)
logger.debug(f"Current container state\n:{json.dumps(container, indent=2)}")
return container.get("State", None)

View file

@ -4,6 +4,13 @@ from typing import Optional
from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig from frostfs_testlib.hosting.config import CLIConfig, HostConfig, ServiceConfig
from frostfs_testlib.shell.interfaces import Shell from frostfs_testlib.shell.interfaces import Shell
from frostfs_testlib.testing.readable import HumanReadableEnum
class HostStatus(HumanReadableEnum):
ONLINE = "Online"
OFFLINE = "Offline"
UNKNOWN = "Unknown"
class DiskInfo(dict): class DiskInfo(dict):
@ -79,6 +86,10 @@ class Host(ABC):
def start_host(self) -> None: def start_host(self) -> None:
"""Starts the host machine.""" """Starts the host machine."""
@abstractmethod
def get_host_status(self) -> HostStatus:
"""Check host status."""
@abstractmethod @abstractmethod
def stop_host(self, mode: str) -> None: def stop_host(self, mode: str) -> None:
"""Stops the host machine. """Stops the host machine.
@ -107,6 +118,26 @@ class Host(ABC):
service_name: Name of the service to stop. service_name: Name of the service to stop.
""" """
@abstractmethod
def mask_service(self, service_name: str) -> None:
"""Prevent the service from start by any activity by masking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to mask.
"""
@abstractmethod
def unmask_service(self, service_name: str) -> None:
"""Allow the service to start by any activity by unmasking it.
The service must be hosted on this host.
Args:
service_name: Name of the service to unmask.
"""
@abstractmethod @abstractmethod
def restart_service(self, service_name: str) -> None: def restart_service(self, service_name: str) -> None:
"""Restarts the service with specified name and waits until it starts. """Restarts the service with specified name and waits until it starts.
@ -115,7 +146,6 @@ class Host(ABC):
service_name: Name of the service to restart. service_name: Name of the service to restart.
""" """
@abstractmethod @abstractmethod
def get_data_directory(self, service_name: str) -> str: def get_data_directory(self, service_name: str) -> str:
""" """
@ -126,7 +156,6 @@ class Host(ABC):
service_name: Name of storage node service. service_name: Name of storage node service.
""" """
@abstractmethod @abstractmethod
def wait_success_suspend_process(self, process_name: str) -> None: def wait_success_suspend_process(self, process_name: str) -> None:
"""Search for a service ID by its name and stop the process """Search for a service ID by its name and stop the process
@ -251,12 +280,34 @@ class Host(ABC):
filter_regex: regex to filter output filter_regex: regex to filter output
""" """
@abstractmethod
def get_filtered_logs(
self,
filter_regex: str,
since: Optional[datetime] = None,
until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> str:
"""Get logs from host filtered by regex.
Args:
filter_regex: regex filter for logs.
since: If set, limits the time from which logs should be collected. Must be in UTC.
until: If set, limits the time until which logs should be collected. Must be in UTC.
unit: required unit.
Returns:
Found entries as str if any found.
Empty string otherwise.
"""
@abstractmethod @abstractmethod
def is_message_in_logs( def is_message_in_logs(
self, self,
message_regex: str, message_regex: str,
since: Optional[datetime] = None, since: Optional[datetime] = None,
until: Optional[datetime] = None, until: Optional[datetime] = None,
unit: Optional[str] = None,
) -> bool: ) -> bool:
"""Checks logs on host for specified message regex. """Checks logs on host for specified message regex.
@ -270,9 +321,10 @@ class Host(ABC):
False otherwise. False otherwise.
""" """
@abstractmethod @abstractmethod
def wait_for_service_to_be_in_state(self, systemd_service_name: str, expected_state: str, timeout: int) -> None: def wait_for_service_to_be_in_state(
self, systemd_service_name: str, expected_state: str, timeout: int
) -> None:
""" """
Waites for service to be in specified state. Waites for service to be in specified state.

View file

@ -19,6 +19,7 @@ class LoadScenario(Enum):
gRPC_CAR = "grpc_car" gRPC_CAR = "grpc_car"
S3 = "s3" S3 = "s3"
S3_CAR = "s3_car" S3_CAR = "s3_car"
S3_MULTIPART = "s3_multipart"
HTTP = "http" HTTP = "http"
VERIFY = "verify" VERIFY = "verify"
LOCAL = "local" LOCAL = "local"
@ -37,10 +38,11 @@ all_load_scenarios = [
LoadScenario.S3_CAR, LoadScenario.S3_CAR,
LoadScenario.gRPC_CAR, LoadScenario.gRPC_CAR,
LoadScenario.LOCAL, LoadScenario.LOCAL,
LoadScenario.S3_MULTIPART
] ]
all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY] all_scenarios = all_load_scenarios.copy() + [LoadScenario.VERIFY]
constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL] constant_vus_scenarios = [LoadScenario.gRPC, LoadScenario.S3, LoadScenario.HTTP, LoadScenario.LOCAL, LoadScenario.S3_MULTIPART]
constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR] constant_arrival_rate_scenarios = [LoadScenario.gRPC_CAR, LoadScenario.S3_CAR]
grpc_preset_scenarios = [ grpc_preset_scenarios = [
@ -49,7 +51,7 @@ grpc_preset_scenarios = [
LoadScenario.gRPC_CAR, LoadScenario.gRPC_CAR,
LoadScenario.LOCAL, LoadScenario.LOCAL,
] ]
s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR] s3_preset_scenarios = [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART]
@dataclass @dataclass
@ -172,7 +174,7 @@ class LoadParams:
k6_url: Optional[str] = None k6_url: Optional[str] = None
# No ssl verification flag # No ssl verification flag
no_verify_ssl: Optional[bool] = metadata_field( no_verify_ssl: Optional[bool] = metadata_field(
[LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.VERIFY, LoadScenario.HTTP], [LoadScenario.S3, LoadScenario.S3_CAR, LoadScenario.S3_MULTIPART, LoadScenario.VERIFY, LoadScenario.HTTP],
"no-verify-ssl", "no-verify-ssl",
"NO_VERIFY_SSL", "NO_VERIFY_SSL",
False, False,
@ -258,6 +260,14 @@ class LoadParams:
constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True constant_arrival_rate_scenarios, None, "MAX_DELETERS", False, True
) )
# Multipart
# Number of parts to upload in parallel
writers_multipart: Optional[int] = metadata_field(
[LoadScenario.S3_MULTIPART], None, "WRITERS_MULTIPART", False, True
)
# part size must be greater than (5 MB)
write_object_part_size: Optional[int] = metadata_field([LoadScenario.S3_MULTIPART], None, "WRITE_OBJ_PART_SIZE", False)
# Period of time to apply the rate value. # Period of time to apply the rate value.
time_unit: Optional[str] = metadata_field( time_unit: Optional[str] = metadata_field(
constant_arrival_rate_scenarios, None, "TIME_UNIT", False constant_arrival_rate_scenarios, None, "TIME_UNIT", False

View file

@ -196,6 +196,7 @@ def get_metrics_object(load_type: LoadScenario, summary: dict[str, Any]) -> Metr
LoadScenario.HTTP: GrpcMetrics, LoadScenario.HTTP: GrpcMetrics,
LoadScenario.S3: S3Metrics, LoadScenario.S3: S3Metrics,
LoadScenario.S3_CAR: S3Metrics, LoadScenario.S3_CAR: S3Metrics,
LoadScenario.S3_MULTIPART: S3Metrics,
LoadScenario.VERIFY: VerifyMetrics, LoadScenario.VERIFY: VerifyMetrics,
LoadScenario.LOCAL: LocalMetrics, LoadScenario.LOCAL: LocalMetrics,
} }

View file

@ -92,6 +92,7 @@ class LoadReport:
model_map = { model_map = {
LoadScenario.gRPC: "closed model", LoadScenario.gRPC: "closed model",
LoadScenario.S3: "closed model", LoadScenario.S3: "closed model",
LoadScenario.S3_MULTIPART: "closed model",
LoadScenario.HTTP: "closed model", LoadScenario.HTTP: "closed model",
LoadScenario.gRPC_CAR: "open model", LoadScenario.gRPC_CAR: "open model",
LoadScenario.S3_CAR: "open model", LoadScenario.S3_CAR: "open model",

View file

@ -1,11 +1,5 @@
import sys
from typing import Any
if sys.version_info < (3, 10):
# On Python prior 3.10 we need to use backport of entry points
from importlib_metadata import entry_points
else:
from importlib.metadata import entry_points from importlib.metadata import entry_points
from typing import Any
def load_plugin(plugin_group: str, name: str) -> Any: def load_plugin(plugin_group: str, name: str) -> Any:

View file

@ -39,7 +39,7 @@ class LocalShell(Shell):
log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output log_file = tempfile.TemporaryFile() # File is reliable cross-platform way to capture output
try: try:
command_process = pexpect.spawn(command, timeout=options.timeout) command_process = pexpect.spawn(command, timeout=options.timeout, use_poll=True)
except (pexpect.ExceptionPexpect, OSError) as exc: except (pexpect.ExceptionPexpect, OSError) as exc:
raise RuntimeError(f"Command: {command}") from exc raise RuntimeError(f"Command: {command}") from exc

View file

@ -15,8 +15,7 @@ from frostfs_testlib.resources.cli import (
) )
from frostfs_testlib.resources.common import MORPH_BLOCK_TIME from frostfs_testlib.resources.common import MORPH_BLOCK_TIME
from frostfs_testlib.shell import Shell from frostfs_testlib.shell import Shell
from frostfs_testlib.steps.epoch import tick_epoch from frostfs_testlib.steps.epoch import tick_epoch, wait_for_epochs_align
from frostfs_testlib.steps.epoch import wait_for_epochs_align
from frostfs_testlib.storage.cluster import Cluster, StorageNode from frostfs_testlib.storage.cluster import Cluster, StorageNode
from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate from frostfs_testlib.storage.dataclasses.frostfs_services import S3Gate
from frostfs_testlib.utils import datetime_utils from frostfs_testlib.utils import datetime_utils
@ -41,44 +40,6 @@ class HealthStatus:
return HealthStatus(network, health) return HealthStatus(network, health)
@reporter.step_deco("Stop random storage nodes")
def stop_random_storage_nodes(number: int, nodes: list[StorageNode]) -> list[StorageNode]:
"""
Shuts down the given number of randomly selected storage nodes.
Args:
number: the number of storage nodes to stop
nodes: the list of storage nodes to stop
Returns:
the list of nodes that were stopped
"""
nodes_to_stop = random.sample(nodes, number)
for node in nodes_to_stop:
node.stop_service()
return nodes_to_stop
@reporter.step_deco("Start storage node")
def start_storage_nodes(nodes: list[StorageNode]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes: the list of nodes to start
"""
for node in nodes:
node.start_service()
@reporter.step_deco("Stop storage node")
def stop_storage_nodes(nodes: list[StorageNode]) -> None:
"""
The function starts specified storage nodes.
Args:
nodes: the list of nodes to start
"""
for node in nodes:
node.stop_service()
@reporter.step_deco("Get Locode from random storage node") @reporter.step_deco("Get Locode from random storage node")
def get_locode_from_random_node(cluster: Cluster) -> str: def get_locode_from_random_node(cluster: Cluster) -> str:
node = random.choice(cluster.services(StorageNode)) node = random.choice(cluster.services(StorageNode))
@ -329,25 +290,3 @@ def _run_control_command(node: StorageNode, command: str) -> None:
f"--wallet {wallet_path} --config {wallet_config_path}" f"--wallet {wallet_path} --config {wallet_config_path}"
) )
return result.stdout return result.stdout
@reporter.step_deco("Start services s3gate ")
def start_s3gates(cluster: Cluster) -> None:
"""
The function starts specified storage nodes.
Args:
cluster: cluster instance under test
"""
for gate in cluster.services(S3Gate):
gate.start_service()
@reporter.step_deco("Stop services s3gate ")
def stop_s3gates(cluster: Cluster) -> None:
"""
The function starts specified storage nodes.
Args:
cluster: cluster instance under test
"""
for gate in cluster.services(S3Gate):
gate.stop_service()

View file

@ -17,6 +17,7 @@ from frostfs_testlib.storage.dataclasses.frostfs_services import (
StorageNode, StorageNode,
) )
from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass from frostfs_testlib.storage.dataclasses.node_base import NodeBase, ServiceClass
from frostfs_testlib.storage.dataclasses.storage_object_info import Interfaces
from frostfs_testlib.storage.service_registry import ServiceRegistry from frostfs_testlib.storage.service_registry import ServiceRegistry
reporter = get_reporter() reporter = get_reporter()
@ -121,6 +122,40 @@ class ClusterNode:
config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services config.attributes[ConfigAttributes.SERVICE_NAME] for config in self.host.config.services
] ]
def get_all_interfaces(self) -> dict[str, str]:
return self.host.config.interfaces
def get_interface(self, interface: Interfaces) -> str:
return self.host.config.interfaces[interface.value]
def get_data_interfaces(self) -> list[str]:
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "data" in name_interface
]
def get_data_interface(self, search_interface: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_interface == interface
]
def get_internal_interfaces(self) -> list[str]:
return [
ip_address
for name_interface, ip_address in self.host.config.interfaces.items()
if "internal" in name_interface
]
def get_internal_interface(self, search_internal: str) -> list[str]:
return [
self.host.config.interfaces[interface]
for interface in self.host.config.interfaces.keys()
if search_internal == interface
]
class Cluster: class Cluster:
""" """
@ -173,6 +208,42 @@ class Cluster:
def morph_chain(self) -> list[MorphChain]: def morph_chain(self) -> list[MorphChain]:
return self.services(MorphChain) return self.services(MorphChain)
def nodes(self, services: list[ServiceClass]) -> list[ClusterNode]:
"""
Resolve which cluster nodes hosting the specified services.
Args:
services: list of services to resolve hosting cluster nodes.
Returns:
list of cluster nodes which host specified services.
"""
cluster_nodes = set()
for service in services:
cluster_nodes.update(
[node for node in self.cluster_nodes if node.service(type(service)) == service]
)
return list(cluster_nodes)
def node(self, service: ServiceClass) -> ClusterNode:
"""
Resolve single cluster node hosting the specified service.
Args:
services: list of services to resolve hosting cluster nodes.
Returns:
list of cluster nodes which host specified services.
"""
nodes = [node for node in self.cluster_nodes if node.service(type(service)) == service]
if not len(nodes):
raise RuntimeError(f"Cannot find service {service} on any node")
return nodes[0]
def services(self, service_type: type[ServiceClass]) -> list[ServiceClass]: def services(self, service_type: type[ServiceClass]) -> list[ServiceClass]:
""" """
Get all services in a cluster of specified type. Get all services in a cluster of specified type.

View file

@ -10,6 +10,7 @@ class ConfigAttributes:
ENDPOINT_DATA_0 = "endpoint_data0" ENDPOINT_DATA_0 = "endpoint_data0"
ENDPOINT_DATA_1 = "endpoint_data1" ENDPOINT_DATA_1 = "endpoint_data1"
ENDPOINT_INTERNAL = "endpoint_internal0" ENDPOINT_INTERNAL = "endpoint_internal0"
ENDPOINT_PROMETHEUS = "endpoint_prometheus"
CONTROL_ENDPOINT = "control_endpoint" CONTROL_ENDPOINT = "control_endpoint"
UN_LOCODE = "un_locode" UN_LOCODE = "un_locode"
HTTP_HOSTNAME = "http_hostname" HTTP_HOSTNAME = "http_hostname"

View file

@ -41,10 +41,10 @@ class ClusterStateController:
provider = SshConnectionProvider() provider = SshConnectionProvider()
provider.drop(node.host_ip) provider.drop(node.host_ip)
self.stopped_nodes.append(node)
with reporter.step(f"Stop host {node.host.config.address}"): with reporter.step(f"Stop host {node.host.config.address}"):
node.host.stop_host(mode=mode) node.host.stop_host(mode=mode)
wait_for_host_offline(self.shell, node.storage_node) wait_for_host_offline(self.shell, node.storage_node)
self.stopped_nodes.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Shutdown whole cluster") @reporter.step_deco("Shutdown whole cluster")
@ -135,9 +135,9 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop storage service on {node}") @reporter.step_deco("Stop storage service on {node}")
def stop_storage_service(self, node: ClusterNode): def stop_storage_service(self, node: ClusterNode, mask: bool = True):
node.storage_node.stop_service()
self.stopped_storage_nodes.append(node) self.stopped_storage_nodes.append(node)
node.storage_node.stop_service(mask)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop all {service_type} services") @reporter.step_deco("Stop all {service_type} services")
@ -171,9 +171,11 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop {service_type} service on {node}") @reporter.step_deco("Stop {service_type} service on {node}")
def stop_service_of_type(self, node: ClusterNode, service_type: type[ServiceClass]): def stop_service_of_type(
self, node: ClusterNode, service_type: type[ServiceClass], mask: bool = True
):
service = node.service(service_type) service = node.service(service_type)
service.stop_service() service.stop_service(mask)
self.stopped_services.add(service) self.stopped_services.add(service)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@ -207,8 +209,8 @@ class ClusterStateController:
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)
@reporter.step_deco("Stop s3 gate on {node}") @reporter.step_deco("Stop s3 gate on {node}")
def stop_s3_gate(self, node: ClusterNode): def stop_s3_gate(self, node: ClusterNode, mask: bool = True):
node.s3_gate.stop_service() node.s3_gate.stop_service(mask)
self.stopped_s3_gates.append(node) self.stopped_s3_gates.append(node)
@run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED) @run_optionally(optionals.OPTIONAL_FAILOVER_ENABLED)

View file

@ -110,6 +110,10 @@ class MorphChain(NodeBase):
def label(self) -> str: def label(self) -> str:
return f"{self.name}: {self.get_endpoint()}" return f"{self.name}: {self.get_endpoint()}"
def get_http_endpoint(self) -> str:
return self._get_attribute("http_endpoint")
class StorageNode(NodeBase): class StorageNode(NodeBase):
""" """
Class represents storage node in a storage cluster Class represents storage node in a storage cluster
@ -141,6 +145,9 @@ class StorageNode(NodeBase):
def get_shard_config_path(self) -> str: def get_shard_config_path(self) -> str:
return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH) return self._get_attribute(ConfigAttributes.SHARD_CONFIG_PATH)
def get_shards_config(self) -> tuple[str, dict]:
return self.get_config(self.get_shard_config_path())
def get_control_endpoint(self) -> str: def get_control_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT) return self._get_attribute(ConfigAttributes.CONTROL_ENDPOINT)

View file

@ -1,6 +1,6 @@
from abc import abstractmethod from abc import abstractmethod
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Tuple, TypedDict, TypeVar from typing import Optional, TypedDict, TypeVar
import yaml import yaml
@ -57,6 +57,9 @@ class NodeBase(HumanReadableABC):
return self._process_name return self._process_name
def start_service(self): def start_service(self):
with reporter.step(f"Unmask {self.name} service on {self.host.config.address}"):
self.host.unmask_service(self.name)
with reporter.step(f"Start {self.name} service on {self.host.config.address}"): with reporter.step(f"Start {self.name} service on {self.host.config.address}"):
self.host.start_service(self.name) self.host.start_service(self.name)
@ -64,7 +67,14 @@ class NodeBase(HumanReadableABC):
def service_healthcheck(self) -> bool: def service_healthcheck(self) -> bool:
"""Service healthcheck.""" """Service healthcheck."""
def stop_service(self): def get_metrics_endpoint(self) -> str:
return self._get_attribute(ConfigAttributes.ENDPOINT_PROMETHEUS)
def stop_service(self, mask: bool = True):
if mask:
with reporter.step(f"Mask {self.name} service on {self.host.config.address}"):
self.host.mask_service(self.name)
with reporter.step(f"Stop {self.name} service on {self.host.config.address}"): with reporter.step(f"Stop {self.name} service on {self.host.config.address}"):
self.host.stop_service(self.name) self.host.stop_service(self.name)
@ -103,8 +113,10 @@ class NodeBase(HumanReadableABC):
ConfigAttributes.WALLET_CONFIG, ConfigAttributes.WALLET_CONFIG,
) )
def get_config(self) -> Tuple[str, dict]: def get_config(self, config_file_path: Optional[str] = None) -> tuple[str, dict]:
if config_file_path is None:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH) config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
shell = self.host.get_shell() shell = self.host.get_shell()
result = shell.exec(f"cat {config_file_path}") result = shell.exec(f"cat {config_file_path}")
@ -113,8 +125,10 @@ class NodeBase(HumanReadableABC):
config = yaml.safe_load(config_text) config = yaml.safe_load(config_text)
return config_file_path, config return config_file_path, config
def save_config(self, new_config: dict) -> None: def save_config(self, new_config: dict, config_file_path: Optional[str] = None) -> None:
if config_file_path is None:
config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH) config_file_path = self._get_attribute(ConfigAttributes.CONFIG_PATH)
shell = self.host.get_shell() shell = self.host.get_shell()
config_str = yaml.dump(new_config) config_str = yaml.dump(new_config)

View file

@ -1,6 +1,9 @@
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum
from typing import Optional from typing import Optional
from frostfs_testlib.testing.readable import HumanReadableEnum
@dataclass @dataclass
class ObjectRef: class ObjectRef:
@ -29,15 +32,24 @@ class StorageObjectInfo(ObjectRef):
class NodeNetmapInfo: class NodeNetmapInfo:
node_id: str = None node_id: str = None
node_status: str = None node_status: str = None
node_data_ip: str = None node_data_ips: list[str] = None
cluster_name: str = None cluster_name: str = None
continent: str = None continent: str = None
country: str = None country: str = None
country_code: str = None country_code: str = None
external_address: str = None external_address: list[str] = None
location: str = None location: str = None
node: str = None node: str = None
price: int = None
sub_div: str = None sub_div: str = None
sub_div_code: int = None sub_div_code: int = None
un_locode: str = None un_locode: str = None
role: str = None role: str = None
class Interfaces(HumanReadableEnum):
DATA_O: str = "data0"
DATA_1: str = "data1"
MGMT: str = "mgmt"
INTERNAL_0: str = "internal0"
INTERNAL_1: str = "internal1"

View file

@ -8,6 +8,7 @@ Helper functions to use with `frostfs-cli`, `neo-go` and other CLIs.
import csv import csv
import json import json
import logging import logging
import re
import subprocess import subprocess
import sys import sys
from contextlib import suppress from contextlib import suppress
@ -138,32 +139,47 @@ def log_command_execution(cmd: str, output: Union[str, TypedDict]) -> None:
def parse_netmap_output(output: str) -> list[NodeNetmapInfo]: def parse_netmap_output(output: str) -> list[NodeNetmapInfo]:
""" """
The cli command will return something like.
Epoch: 240
Node 1: 01234 ONLINE /ip4/10.10.10.10/tcp/8080
Continent: Europe
Country: Russia
CountryCode: RU
ExternalAddr: /ip4/10.10.11.18/tcp/8080
Location: Moskva
Node: 10.10.10.12
Price: 5
SubDiv: Moskva
SubDivCode: MOW
UN-LOCODE: RU MOW
role: alphabet
The code will parse each line and return each node as dataclass. The code will parse each line and return each node as dataclass.
""" """
netmap_list = output.split("Node ")[1:] netmap_nodes = output.split("Node ")[1:]
dataclass_list = [] dataclasses_netmap = []
for node in netmap_list: result_netmap = {}
node = node.replace("\t", "").split("\n")
node = *node[0].split(" ")[1:-1], *[row.split(": ")[-1] for row in node[1:-1]]
dataclass_list.append(NodeNetmapInfo(*node))
return dataclass_list regexes = {
"node_id": r"\d+: (?P<node_id>\w+)",
"node_data_ips": r"(?P<node_data_ips>/ip4/.+?)$",
"node_status": r"(?P<node_status>ONLINE|OFFLINE)",
"cluster_name": r"ClusterName: (?P<cluster_name>\w+)",
"continent": r"Continent: (?P<continent>\w+)",
"country": r"Country: (?P<country>\w+)",
"country_code": r"CountryCode: (?P<country_code>\w+)",
"external_address": r"ExternalAddr: (?P<external_address>/ip[4].+?)$",
"location": r"Location: (?P<location>\w+.*)",
"node": r"Node: (?P<node>\d+\.\d+\.\d+\.\d+)",
"price": r"Price: (?P<price>\d+)",
"sub_div": r"SubDiv: (?P<sub_div>.*)",
"sub_div_code": r"SubDivCode: (?P<sub_div_code>\w+)",
"un_locode": r"UN-LOCODE: (?P<un_locode>\w+.*)",
"role": r"role: (?P<role>\w+)",
}
for node in netmap_nodes:
for key, regex in regexes.items():
search_result = re.search(regex, node, flags=re.MULTILINE)
if key == "node_data_ips":
result_netmap[key] = search_result[key].strip().split(" ")
continue
if key == "external_address":
result_netmap[key] = search_result[key].strip().split(",")
continue
if search_result == None:
result_netmap[key] = None
continue
result_netmap[key] = search_result[key].strip()
dataclasses_netmap.append(NodeNetmapInfo(**result_netmap))
return dataclasses_netmap
def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]: def parse_cmd_table(output: str, delimiter="|") -> list[dict[str, str]]: